Spark and ElasticSearch in Practice - Processing Rapid7's HTTP Data with Spark and Indexing It with ES

Rapid7 publishes HTTP response data collected from common ports across the whole internet. The datasets can be found at https://opendata.rapid7.com/sonar.tcp/, and each record looks like this:

{"data":"SFRUUC8xLjEgNDA0IE5vdCBGb3VuZA0KU0VSVkVSOiBMaW51eC8yLjQuMzAsIFVQblAvMS4wLCBQb3J0YWJsZSBTREsgZm9yIFVQblAgZGV2aWNlcy8xLjMuMQ0KQ09OTkVDVElPTjogY2xvc2UNCkNPTlRFTlQtTEVOR1RIOiA0OA0KQ09OVEVOVC1UWVBFOiB0ZXh0L2h0bWwNCg0KPGh0bWw+PGJvZHk+PGgxPjQwNCBOb3QgRm91bmQ8L2gxPjwvYm9keT48L2h0bWw+","host":"5.12.214.213","ip":"5.12.214.213","path":"/","port":49153,"vhost":"5.12.214.213"}

The data field is the base64-encoded full HTTP response (including the HTTP header and body).
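As a quick check (a minimal sketch; `line` is assumed to hold the JSON record shown above), decoding the data field with Python's base64 module recovers the raw HTTP response:

import base64
import json

record = json.loads(line)                  # `line`: one raw line of the dataset, e.g. the record above
raw = base64.b64decode(record['data'])
print(raw.split(b'\r\n')[0])               # b'HTTP/1.1 404 Not Found'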
Let's first write a simple script to parse one of these records:

import gzip
import re

linend = re.compile(r'\r\n\r\n')
server_regex = re.compile(r'server:(.*?)\r\n', re.I)
title_regex = re.compile(r'<title>(.*?)</title>', re.I | re.M)
has_gzip_regex = re.compile(r'Content-Encoding: gzip', re.I)

def get_page(page_json):
    """
    Given a JSON record whose 'data' field has already been base64-decoded to bytes,
    extract the header, the body, the Server header and the page title.
    """
    index = page_json['data'].find(b'\r\n\r\n')
    header = page_json['data'][:index].decode(errors='ignore')
    body = page_json['data'][index + 4:]
    server = re.findall(server_regex, header)
    if server:
        server = server[0].strip()
    else:
        server = ""
    if has_gzip_regex.findall(header):
        try:
            body = gzip.decompress(body)
        except Exception:
            pass
    encoding = get_charset(header, body)  # get_charset is defined in the full script below
    if encoding:
        try:
            body = body.decode(encoding)
        except Exception:  # odd encodings; fall back to a lossy decode
            body = body.decode(errors='ignore')
    else:
        body = body.decode(errors='ignore')
    _ = re.findall(title_regex, body)

    title = _[0] if _ else ""
    print(header, body, title)
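To drive this against a single record, something like the following works (an illustrative sketch: the file name is a placeholder, and get_charset from the full script below needs to be in scope):

import base64
import json

with open('sonar_http_sample.json') as f:   # placeholder path to a slice of the Rapid7 dump
    record = json.loads(f.readline())
record['data'] = base64.b64decode(record['data'])
get_page(record)                            # prints the header, the decoded body and the extracted title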

With Spark added, a complete script looks like the following (the related dependency/helper libraries are omitted).

import base64
import gzip
import json
import os
import re
import sys
from functools import reduce

import geoip2.database
import pyspark

linend = re.compile(r'\r\n\r\n')
title_regex = re.compile("<title>(.*?)</title>", re.I | re.M)
has_gzip_regex = re.compile(r'Content-Encoding: gzip', re.I)

from Util import Util, Page

# convert a dotted-quad IP string to an integer (Python 3 ints are unbounded, so no long type is needed)
ip2int = lambda ip: reduce(lambda a, b: int(a) * 256 + int(b), ip.split('.'))

# make the ES-Hadoop connector jar available to pyspark (needed for the EsOutputFormat used below)
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars ./elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar pyspark-shell')

def get_charset(header, html):
    """Guess the charset: try the <meta ... charset=...> tag in the body first, then the Content-Type header."""
    try:
        m = re.search(b'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I)
        if m:
            return m.group(1).decode().replace('"', '')
    except Exception as e:
        pass
    try:
        if 'Content-Type' in header:
            m = re.search(r'.*?charset=(.*?)(;|$)', header, flags=re.I)
            if m:
                return m.group(1)
    except Exception as e:
        pass

server_regex = re.compile(r"server:(.*?)\r\n", re.I)
def get_page(page_json):
    """
    Given a JSON record (with 'data' already base64-decoded to bytes), return a Page object.
    """
    index = page_json['data'].find(b'\r\n\r\n')
    header = page_json['data'][:index].decode(errors='ignore')
    body = page_json['data'][index + 4:]
    server = re.findall(server_regex, header)
    if server:
        server = server[0].strip()
    else:
        server = ""
    if has_gzip_regex.findall(header):
        try:
            body = gzip.decompress(body)
        except Exception as e:
            pass
    encoding = get_charset(header, body)
    if encoding:
        try:
            body = body.decode(encoding)
        except Exception:  # odd encodings; fall back to a lossy decode
            body = body.decode(errors='ignore')
    else:
        body = body.decode(errors='ignore')
    _ = re.findall(title_regex, body)

    title = _[0] if _ else ""
    page = Page(title=title, body=body, ip=page_json['vhost'], port=page_json['port'], header=header, server=server)
    return page


def parse_line(line_json):
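    # NOTE: this opens the GeoLite2 database once for every record; the mapPartitions
    # sketch after the script shows how to create the reader once per partition instead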
    reader = geoip2.database.Reader('./GeoLite2-ASN.mmdb')
    data = base64.b64decode(line_json['data'])
    line_json['data'] = data
    page = get_page(line_json)
    ip_int = ip2int(line_json['vhost'])
    ip_info = Util.get_ip_info(line_json['vhost'])
    line_json['ip_int'] = ip_int
    try:
        line_json['location'] = '%.2f,%.2f' % (float(ip_info['latitude']), float(ip_info['longitude']))
    except:
        pass # no location
    try:
        asn = reader.asn(line_json['ip'])
        line_json['asn_info'] = {"asn": asn.autonomous_system_number, 'asn_org': asn.autonomous_system_organization}
    except:
        pass
    if page.server:
        line_json['server'] = page.server
    line_json['title'] = page.title
    line_json['body'] = page.body
    line_json['header'] = page.header
    line_json['doc_id'] = '%s-%s-%s'%(line_json['ip'], line_json['port'], "http")
    line_json.pop('path')
    line_json.pop('vhost')
    line_json.pop('data')
    line_json.pop('host')
    return line_json


import hashlib


# alternative helper for building a stable document id (not used in main() below)
def get_md5(ip, port):
    return hashlib.md5(('%s%d' % (ip, port)).encode()).hexdigest()


def main():
    pyspark.SparkContext.setSystemProperty('spark.executor.memory', '28g')
    conf = pyspark.SparkConf().setMaster("local[*]").setAppName("Streamer")
    conf.set('spark.executor.cores', 64)
    conf.set("spark.executor.memory", '32g')
    es_write_conf = {
        # the node(s) we are sending data to
        "es.nodes": 'http://127.0.0.1:9200',
        # credentials, if the cluster requires authentication
        'es.net.http.auth.user': '',
        'es.net.http.auth.pass': '',
        # specify the port / TLS in case the defaults do not apply
        # "es.port": '9243',
        # 'es.net.ssl': 'true',
        # the target resource in the form 'index/doc-type'
        "es.resource": 'http_test/_doc',
        # talk to the listed node directly instead of discovering the whole cluster
        'es.nodes.wan.only': 'true',
        'es.nodes.discovery': 'false',
        # the values we hand over are already JSON strings
        "es.input.json": "yes",
        # the field in the document that should be used as the ES document ID
        "es.mapping.id": "doc_id"
    }
    with pyspark.SparkContext("local[*]", "PySparkWordCount", conf=conf) as sc:
        # RDD of lines from the input file given on the command line
        text_rdd = sc.textFile(sys.argv[1], 10)
        json_rdd = text_rdd.map(json.loads)
        result = json_rdd.map(parse_line)
        result = result.map(lambda x: (x['doc_id'], json.dumps(x, ensure_ascii=False)))
        # print(result.take(10))
        result.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            # critically, we must specify our `es_write_conf`
            conf=es_write_conf)


if __name__ == '__main__':
    main()

The script above shows how to use pyspark to read a file, process each line in parallel, and write the results into ES.
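One detail worth noting: parse_line opens the GeoLite2 database for every single record. A common refinement is to do the per-record work inside mapPartitions, so the reader is created once per partition. A rough sketch of that idea (not part of the original script):

def parse_partition(lines):
    # open expensive resources (the GeoLite2 reader) once per partition instead of once per record
    reader = geoip2.database.Reader('./GeoLite2-ASN.mmdb')
    for line in lines:
        yield parse_line(json.loads(line))   # parse_line could take `reader` as an extra argument
    reader.close()

# in main(): text_rdd.mapPartitions(parse_partition) would replace the json.loads and parse_line maps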

Other notes

There are a few things to watch out for when processing Rapid7's data:
1. For HTTP responses whose header declares gzip (Content-Encoding: gzip), the body has to be gzip-decompressed.
2. The character encoding can be found in the Content-Type header or in the charset declaration inside the body.
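A related ES-side detail: parse_line writes location as a 'lat,lon' string, which Elasticsearch only treats as a coordinate if the field is mapped as geo_point. A minimal sketch of creating the index with such a mapping before the first write (assuming the http_test index from the config above and an ES 6.x cluster; the field choices are illustrative):

import requests

mapping = {
    "mappings": {
        "_doc": {                               # matches es.resource 'http_test/_doc'
            "properties": {
                "location": {"type": "geo_point"},
                "ip_int": {"type": "long"}
            }
        }
    }
}
requests.put('http://127.0.0.1:9200/http_test', json=mapping)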