Welcome to imfht.github.io!

imfht.github.io

Spark 与 ElasticSearch 实战 - 使用 Spark处理 Rapid7的HTTP 数据并使用 ES 做索引

Rapid7提供了全网常见端口的 HTTP 数据,数据的地址可以在https://opendata.rapid7.com/sonar.tcp/中被找到,数据的格式如下所示:

{"data":"SFRUUC8xLjEgNDA0IE5vdCBGb3VuZA0KU0VSVkVSOiBMaW51eC8yLjQuMzAsIFVQblAvMS4wLCBQb3J0YWJsZSBTREsgZm9yIFVQblAgZGV2aWNlcy8xLjMuMQ0KQ09OTkVDVElPTjogY2xvc2UNCkNPTlRFTlQtTEVOR1RIOiA0OA0KQ09OVEVOVC1UWVBFOiB0ZXh0L2h0bWwNCg0KPGh0bWw+PGJvZHk+PGgxPjQwNCBOb3QgRm91bmQ8L2gxPjwvYm9keT48L2h0bWw+","host":"5.12.214.213","ip":"5.12.214.213","path":"/","port":49153,"vhost":"5.12.214.213"}

其中 data 数据为 base64加密之后的HTTP 完整报文(包含 HTTP header 和 body)。
我们先写一个简单的脚本来解析这行数据:

linend = re.compile(r'\r\n\r\n')
title_regex = re.compile("<title>(.*?)</title>", re.I | re.M)
has_gzip_regex = re.compile(r'Content-Encoding: gzip', re.I)

def get_page(page_json):
    """
    给一个json 返回一个page对象
    """
    index = page_json['data'].find(b'\r\n\r\n')
    header = page_json['data'][:index].decode(errors='ignore')
    body = page_json['data'][index + 4:]
    server = re.findall(server_regex, header)
    if server:
        server = server[0].strip()
    else:
        server = ""
    if has_gzip_regex.findall(header):
        try:
            body = gzip.decompress(body)
        except Exception as e:
            pass
    encoding = get_charset(header, body)
    if encoding:
        try:
            body = body.decode(encoding)
        except:  # 一些奇怪的编码问题,不管了
            body = body.decode(errors='ignore')
            pass
    else:
        body = body.decode(errors='ignore')
    _ = re.findall(title_regex, body)

    title = _[0] if _ else ""
    print header, body, title

加入 spark 之后一个完整的脚本如下(略去了相关依赖库)。

import base64
import gzip
import json
import os
import re
import sys
from functools import reduce

import geoip2.database
import pyspark
from numpy import long

linend = re.compile(r'\r\n\r\n')
title_regex = re.compile("<title>(.*?)</title>", re.I | re.M)
has_gzip_regex = re.compile(r'Content-Encoding: gzip', re.I)

from Util import Util, Page

ip2int = lambda ip: reduce(lambda a, b: long(a) * 256 + long(b), ip.split('.'))

os.environ[
    'PYSPARK_SUBMIT_ARGS'] = '--jars ./elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar pyspark-shell'

def get_charset(header, html):
    try:
        m = re.search(b'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I)
        if m:
            return m.group(1).decode().replace('"', '')
    except Exception as e:
        pass
    try:
        if 'Content-Type' in header:
            m = re.search(r'.*?charset=(.*?)(;|$)', header, flags=re.I)
            if m:
                return m.group(1)
    except Exception as e:
        pass

server_regex = re.compile("server:(.*?)\r\n",re.I)
def get_page(page_json):
    """
    给一个json 返回一个page对象
    """
    index = page_json['data'].find(b'\r\n\r\n')
    header = page_json['data'][:index].decode(errors='ignore')
    body = page_json['data'][index + 4:]
    server = re.findall(server_regex, header)
    if server:
        server = server[0].strip()
    else:
        server = ""
    if has_gzip_regex.findall(header):
        try:
            body = gzip.decompress(body)
        except Exception as e:
            pass
    encoding = get_charset(header, body)
    if encoding:
        try:
            body = body.decode(encoding)
        except:  # 一些奇怪的编码问题,不管了
            body = body.decode(errors='ignore')
            pass
    else:
        body = body.decode(errors='ignore')
    _ = re.findall(title_regex, body)

    title = _[0] if _ else ""
    page = Page(title=title, body=body, ip=page_json['vhost'], port=page_json['port'], header=header,server=server)
    return page


ip2int = lambda ip: reduce(lambda a, b: long(a) * 256 + long(b), ip.split('.'))


def parse_line(line_json):
    reader = geoip2.database.Reader('./GeoLite2-ASN.mmdb')
    data = base64.b64decode(line_json['data'])
    line_json['data'] = data
    page = get_page(line_json)
    ip_int = ip2int(line_json['vhost'])
    ip_info = Util.get_ip_info(line_json['vhost'])
    line_json['ip_int'] = ip_int
    try:
        line_json['location'] = '%.2f,%.2f' % (float(ip_info['latitude']), float(ip_info['longitude']))
    except:
        pass # no location
    try:
        asn = reader.asn(line_json['ip'])
        line_json['asn_info'] = {"asn": asn.autonomous_system_number, 'asn_org': asn.autonomous_system_organization}
    except:
        pass
    if page.server:
        line_json['server'] = page.server
    line_json['title'] = page.title
    line_json['body'] = page.body
    line_json['header'] = page.header
    line_json['doc_id'] = '%s-%s-%s'%(line_json['ip'], line_json['port'], "http")
    line_json.pop('path')
    line_json.pop('vhost')
    line_json.pop('data')
    line_json.pop('host')
    return line_json


import hashlib


def get_md5(ip, port):
    return hashlib.md5(('%s%d' % (ip, port)).encode()).hexdigest()


def main():
    pyspark.SparkContext.setSystemProperty('spark.executor.memory', '28g')
    conf = pyspark.SparkConf().setMaster("local[*]").setAppName("Streamer")
    conf.set('spark.executor.cores', 64)
    conf.set("spark.executor.memory", '32g')
    es_write_conf = {
        # specify the node that we are sending data to (this should be the master)
        "es.nodes": 'http://127.0.0.1:9200',
        # specify the port in case it is not the default port
        'es.net.http.auth.user': '',
        'es.net.http.auth.pass': '',
        # "es.port": '9243',
        # 'es.net.ssl': 'true',
        # specify a resource in the form 'index/doc-type'
        "es.resource": 'http_test/_doc',
        # is the input JSON?
        'es.nodes.wan.only': 'true',
        'es.nodes.discovery': 'false',
        "es.input.json": "yes",
        # is there a field in the mapping that should be used to specify the ES document ID
        "es.mapping.id": "doc_id"
    }
    with pyspark.SparkContext("local[*]", "PySparkWordCount", conf=conf) as sc:
        # Get a RDD containing lines from this script file
        text_rdd = sc.textFile(sys.argv[1], 10)
        json_rdd = text_rdd.map(json.loads)
        result = json_rdd.map(parse_line)
        result = result.map(lambda x: (x['doc_id'], json.dumps(x, ensure_ascii=False)))
        # print(result.take(10))
        result.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            # critically, we must specify our `es_write_conf`
            conf=es_write_conf)


if __name__ == '__main__':
    main()

以上脚本演示了如何使用 pyspark 来读取一个文件,对每一行进行(并行)处理之后将结果写入到 ES 中。

其他

处理 Rapid7的数据有以下几个注意事项:
1. 对于header中存在 gzip 的HTTP报文,需要使用 gzip 解码 body 中的内容
2. 可以从 header 的 Content-Type,body 的 charset 字段中找到编码信息

TravisCi与 Gitbook 实战:使用 TravisCi自动编译Gitbook文档并发布 Release

最近有一个项目需要编写比较多的 Markdown 文档,于是使用 Gitbook来完成这个事情,并使用 github 进行版本管理。这篇文章实现了当新的 Markdown 文档被推送到 github 时TravisCi自动发起构建,构建完成之后发布一个PDF release。

什么是 Gitbook

GitBook 是一个基于Node.js 的命令行工具,可使用Github/Git 和Markdown 来制作精美的电子书。
- 百度百科

什么是 TravisCi

Travis CI是在软件开发领域中的一个在线的,分布式的持续集成服务,用来构建及测试在GitHub托管的代码。这个软件的代码同时也是开源的,可以在GitHub上下载到,尽管开发者当前并不推荐在闭源项目中单独使用它。
- 维基百科

实战

在项目的根目录添加如下的 .travis.yml 文件,在 Travis.yml设置自动 build即可。

language: node_js

node_js:
  - "9"

# 缓存依赖
cache:
  directories:
    - $HOME/.npm

before_install:
  - export TZ='Asia/Shanghai' # 更改时区
  - sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 6B05F25D762E3157
  - sudo apt update -qq
  - sudo apt-get install calibre xvfb -y
  - sudo apt-get install ttf-wqy-*
  - echo '#!/bin/bash' | sudo tee -a /usr/local/bin/ebook-convert
  - echo 'Run xvfb-run /usr/bin/ebook-convert $@' | sudo tee -a /usr/local/bin/ebook-convert
  - echo 'xvfb-run /usr/bin/ebook-convert "$@"' | sudo tee -a /usr/local/bin/ebook-convert
  - sudo chmod +x /usr/local/bin/ebook-convert

# 依赖安装
install:
  - npm install gitbook-cli -g
  # 安装 gitbook 插件
  - gitbook install

# 构建脚本
script:
    # 自定义输出目录 gitbook build src dest
  - gitbook pdf ./ ./book.pdf

# 分支白名单
branches:
  only:
    - master # 只对 master 分支进行构建

# GitHub Pages 部署
deploy:
  provider: releases
  skip_cleanup: true
  api_key: $CI_USER_TOKEN
  file:
     - book.pdf

MacOS 软件破解实战:无限试用图床软件 iPic

在使用 Typora 时发现这个编辑器可以使用 iPic 上传图片。下载之后发现这个软件是订阅制的,需要 ¥60/年。
iPic确实是一款很好用的软件,支持的图床种类也有很多,但是作为一个几乎是0运营成本的软件我不能接受订阅制这个东西。

无限试用

默认 iPic 支持试用7天无限制图床,试用采取的是本地验证。试用开启时间存储在

/Users/yourname/Library/Containers/net.toolinbox.ipic/Data/Library/SyncedPreferences/net.toolinbox.ipic.plist

values -> Markdown -> value 更换为七天之内的时间戳即可实现无限试用。

写一个小脚本

一个方便的做法是在启动 iPic 前将这个试用时间调到一个最近的时间。但将 /Applications/iPic.app/Contents/MacOS/iPic改为启动脚本会出现一些签名的奇怪问题。
所以我的解决方法是

mkdir -p /Applications/cipc.app/MacOS/

/Applications/cipc.app/MacOS/cipc 中填入如下内容

!/bin/bash
/usr/libexec/PlistBuddy -c "Set ':values:Markdown Flag:value' `date +%s`" /Users/`whoami`/Library/Containers/net.toolinbox.ipic/Data/Library/SyncedPreferences/net.toolinbox.ipic.plist && nohup /Applications/iPic.app/Contents/MacOS/iPic &

别忘了添加可执行权限 chmod +x /Applications/cipc.app/MacOS/cipc
然后运行cipc即可实现对 iPic 的无限试用。

批量注册实战:使用Appium 和安卓虚拟机完成 Telegram 批量注册

测试了一下使用 Python 操作 Telegram。记录一下大致过程供备忘。

0x01 安装 Genymotion与VirtualBox

Genymotion是一款非常好用的安卓模拟器,网络上有很多的安装教程,这里不再赘述。
这里要注意:需要使用安卓6.0以下的安卓虚拟机。原因是安卓6.0引入了运行时权限管理,使我们的自动化任务变得复杂很多,而且涉及到一些 Root 权限的问题。故推荐使用安卓5.0版本的虚拟机。

0x02 安装安卓 SDK

在MacOS 下,使用命令brew cask install android-sdk 进行安装。
然后使用 brew cask install homebrew/cask-versions/java8安装 Java8。
使用 brew cask info android-sdk 查看安卓 SDK 的 path。
上述步骤完成之后,启动 Genymotion 虚拟机,运行adb devices应该可以看到运行中的虚拟机。

☁  ~  adb devices
List of devices attached
192.168.56.101:5555 device

0x03 安装 & 配置Appium

前往 https://github.com/appium/appium-desktop/releases/latest 下载最新版本的 Appium。
点击 Appium,在 ANDROID_HOME 一行中填入安卓 SDK 的 path。JAVA_HOME 实测不用管。

0x04 运行我们的 Python代码

这里选择的号池是 fxhyd.cn。有很多的号池可以选择,不需要局限在这一个。

import re
import time

import requests
from appium import webdriver

TOKEN = ''


def get_number():
    """
    从号池中获取号码
    """
    req = requests.get(
        "http://api.fxhyd.cn/UserInterface.aspx?action=getmobile&token=%s&itemid=3988&excludeno=170.171.150" % TOKEN)
    if 'success' in req.text:
        return req.text.split("|")[-1]


def get_message(phone_number):
    """
    获取短信验证码
    """
    while True:
        req = requests.get(
            "http://api.fxhyd.cn/UserInterface.aspx?action=getsms&token=%s&itemid=3988&mobile=%s&release=1" % (
                TOKEN, phone_number))
        if "success" not in req.text:
            print(req.text)  # 30001
        else:
            return re.findall("\\d+", req.text)[0]
        time.sleep(10)


def release_number(phone_number):
    req = requests.get(
        "http://api.fxhyd.cn/UserInterface.aspx?action=release&token=%s&itemid=3988&mobile=%s," % (TOKEN, phone_number))
    print(req.text)


def main():
    desired_caps = {}
    desired_caps['platformName'] = 'Android'
    desired_caps['platformVersion'] = '5.0'
    desired_caps['deviceName'] = 'Android Emulator'
    desired_caps['appPackage'] = 'org.telegram.messenger'
    desired_caps['appActivity'] = 'org.telegram.ui.LaunchActivity'
    desired_caps['newCommandTimeout'] = 2000000  # Appium默认的超时是60s: 在60s 内没有接到指令的话会重启应用,这里加大这个值
    driver = webdriver.Remote('http://localhost:4723/wd/hub', desired_caps)
    driver.find_elements_by_class_name("android.widget.TextView")[-1].click()
    try:
        print(driver.page_source)
        country_num, phone_number = driver.find_elements_by_class_name("android.widget.EditText")
        for i in range(10):
            driver.keyevent(67)
        number = get_number()
        print("get number %s" % number)
        phone_number.send_keys("86%s" % number)
        driver.find_elements_by_class_name("android.widget.FrameLayout")[5].click()  # 对勾勾上
        message = get_message(number)
        driver.find_elements_by_class_name("android.widget.EditText")[0].send_keys(message)  # 输入验证码
        driver.find_elements_by_class_name("android.widget.EditText")[0].send_keys(number)  # 登录界面
        # print(release_number(number))
        driver.find_elements_by_class_name("android.widget.FrameLayout")[5].click()  # 对勾勾上
    except Exception as e:
        print(e)
    time.sleep(10)
    driver.quit()


def test_get_release():
    number = get_number()
    print(number)
    release_number(number)


if __name__ == '__main__':
    main()