+
25
-

回答

可以,参考下面这个 Nginx 远程 HTTP 请求漏洞检测工具的代码:

点击查看全文

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Nginx rewrite vulnerability behavior detector (improved version)

Improvements:
1. No longer relies solely on the version number
2. Adds rewrite behavior probing
3. Raw-socket HTTP requests (avoids the automatic handling done by requests)
4. No automatic redirect following
5. Detects rewrite characteristics
6. Differentiated payload fuzzing
7. Detects signs of worker crashes
8. Control over URI encoding
9. More realistic risk scoring

Note:
This is a "behavior detector", not a 100% vulnerability confirmer,
because rewrite vulnerabilities depend on the nginx.conf configuration.

NOTE(review): --aggressive is accepted by the argument parser, but no code
visible in this file reads it -- confirm whether it is meant to do anything.

Usage:
python scanner.py example.com
python scanner.py example.com --ssl
python scanner.py example.com -p 8080
python scanner.py example.com --aggressive
"""

import argparse
import re
import socket
import ssl
import statistics
import time
from urllib.parse import urlparse

# =========================
# Configuration
# =========================

# Default timeout (seconds) used by raw_http_request for connecting/sending.
DEFAULT_TIMEOUT = 8

# User-Agent header value sent with every probe request.
USER_AGENT = "Mozilla/5.0 NginxRewriteScanner/2.0"

# Baseline paths.
# NOTE(review): not referenced by any code visible in this file.
NORMAL_PATHS = [
    "/",
    "/index.html",
]

# Path variants requested by detect_rewrite_behavior: trailing/double slashes,
# percent-encoded "/" and "?", a bare "?", and dot-segment traversal.
REWRITE_TEST_PATHS = [
    "/test",
    "/test/",
    "/test//",
    "/test%2f",
    "/test%3f",
    "/test?",
    "/a/../test",
    "/a//b//c",
]

# Request paths sent by fuzz_rewrite: runs of "A" of increasing length,
# followed by a few structured variants.
FUZZ_PAYLOADS = [
    "/" + "A" * 128,
    "/" + "A" * 512,
    "/" + "A" * 1024,
    "/" + "A" * 2048,
    "/" + "A" * 4096,

    # Key case: percent-encoded "?"
    "/" + "A" * 512 + "%3f" + "B" * 512,

    # Rewrite-capture style
    "/a/" + "A" * 1024,

    # Double slash
    "//" + "A" * 1024,

    # Percent-encoded dot segments
    "/%2e%2e/" + "A" * 1024,

    # Null-byte style
    "/" + "A" * 512 + "%00" + "B" * 512,
]

# =========================
# 工具函数
# =========================

def recv_all(sock, timeout=2):
    """Read from *sock* until EOF, a read timeout, or a socket error.

    Args:
        sock: Connected socket-like object exposing ``settimeout`` and
            ``recv``.
        timeout: Timeout in seconds applied to each individual ``recv``
            call (it is not an overall deadline).

    Returns:
        bytes: Everything received before reading stopped; ``b""`` when
        nothing arrived.
    """
    sock.settimeout(timeout)

    chunks = []

    while True:
        try:
            data = sock.recv(4096)
        except OSError:
            # socket.timeout, connection resets and ssl.SSLError are all
            # OSError subclasses: stop reading and keep what we already have.
            # Anything else (notably KeyboardInterrupt) must propagate so the
            # scan can be interrupted.
            break

        if not data:
            # Empty read means the peer closed the connection.
            break

        chunks.append(data)

    return b"".join(chunks)


def parse_status(response_bytes):
    """Extract the numeric status code from a raw HTTP response.

    Only the first line of the response (up to the first CRLF) is examined.

    Args:
        response_bytes: Raw response bytes beginning with the status line.

    Returns:
        int | None: The status code, or ``None`` when the input is not
        bytes-like or the first line contains no ``HTTP/x.y <code>`` pattern.
    """
    if not isinstance(response_bytes, (bytes, bytearray)):
        return None

    status_line = response_bytes.decode(errors="ignore").split("\r\n", 1)[0]

    match = re.search(r"HTTP/\d\.\d\s+(\d+)", status_line)
    if match is None:
        return None

    try:
        return int(match.group(1))
    except ValueError:
        # An absurdly long digit run from a hostile server can exceed the
        # interpreter's int-conversion limit; treat it as "no status".
        return None


def parse_headers(response_bytes):
    """Parse the header section of a raw HTTP response into a dict.

    Parsing stops at the blank line that terminates the header section, so
    body content containing ":" is never mistaken for a header. The status
    line is skipped. When a header name repeats, the last value wins.

    Args:
        response_bytes: Raw response bytes (status line, headers, optional
            body).

    Returns:
        dict[str, str]: Lower-cased header names mapped to stripped values;
        empty when the input is not bytes-like or carries no headers.
    """
    headers = {}

    if not isinstance(response_bytes, (bytes, bytearray)):
        return headers

    # Cut the body off before decoding: it may be large and/or binary.
    head = response_bytes.partition(b"\r\n\r\n")[0].decode(errors="ignore")

    for line in head.split("\r\n")[1:]:
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().lower()] = value.strip()

    return headers


def raw_http_request(
        host,
        port,
        path="/",
        use_ssl=False,
        timeout=DEFAULT_TIMEOUT
):
    """Send one raw HTTP/1.1 GET over a fresh socket and collect the reply.

    The request is written by hand (no HTTP client library), so *path* goes
    on the wire exactly as given and redirects are never followed.

    Args:
        host: Target host name or IP address (IPv4 or IPv6); also used as
            the ``Host`` header and, with TLS, as the SNI name.
        port: Target TCP port.
        path: Request target placed verbatim in the request line.
        use_ssl: Wrap the connection in TLS. Certificate and host-name
            verification are deliberately disabled.
        timeout: Timeout in seconds for connecting and sending.

    Returns:
        dict: On success ``{"ok": True, "status", "headers", "latency",
        "raw"}``; on any exception ``{"ok": False, "error", "latency"}``.
        ``latency`` is the elapsed time in seconds for the whole exchange.
        An empty reply still counts as success, with ``status`` of ``None``.
    """

    # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
    start = time.monotonic()

    sock = None

    try:
        # create_connection resolves the host and tries every returned
        # address, so IPv6-only targets work as well.
        sock = socket.create_connection((host, port), timeout=timeout)

        if use_ssl:
            context = ssl.create_default_context()
            # Verification is switched off on purpose: the scanner must be
            # able to talk to hosts with self-signed or mismatched certs.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            sock = context.wrap_socket(sock, server_hostname=host)

        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            f"User-Agent: {USER_AGENT}\r\n"
            f"Accept: */*\r\n"
            f"Connection: close\r\n"
            f"\r\n"
        )

        # sendall, not send: a multi-kilobyte request line may otherwise be
        # transmitted only partially.
        sock.sendall(request.encode())

        response = recv_all(sock)

        elapsed = time.monotonic() - start

        return {
            "ok": True,
            "status": parse_status(response),
            "headers": parse_headers(response),
            "latency": elapsed,
            "raw": response,
        }

    except Exception as e:
        # Best-effort probe: report the failure to the caller, never raise.
        return {
            "ok": False,
            "error": str(e),
            "latency": time.monotonic() - start,
        }

    finally:
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass


def extract_nginx_version(server_header):
    """Return the version from an ``nginx/<version>`` Server header value.

    Args:
        server_header: Value of the ``Server`` response header; may be empty
            or ``None``.

    Returns:
        str | None: The run of digits and dots following ``nginx/``, or
        ``None`` when the header is empty or has no such token.
    """
    found = (
        re.search(r"nginx/([\d\.]+)", server_header)
        if server_header
        else None
    )
    return found.group(1) if found else None


# =========================
# 核心逻辑
# =========================

class RiskScore:
    """Running risk total together with the reasons that contributed to it."""

    def __init__(self):
        # score: sum of all points added so far.
        # reasons: one "[+points] text" entry per add() call, in call order.
        self.score, self.reasons = 0, []

    def add(self, score, reason):
        """Add *score* points to the total and record *reason* for them."""
        self.score += score
        self.reasons.append(f"[+{score}] {reason}")

    def level(self):
        """Return the label of the highest threshold the score has reached."""
        bands = (
            (80, "高风险"),
            (50, "中风险"),
            (20, "低风险"),
        )

        for minimum, label in bands:
            if self.score >= minimum:
                return label

        return "未发现明显风险"


def fingerprint_nginx(host, port, use_ssl):
    """Request ``/`` once and report the Server banner and nginx version.

    Args:
        host: Target host.
        port: Target port.
        use_ssl: Whether to connect over TLS.

    Returns:
        dict | None: ``{"server", "version", "headers"}`` built from the
        response, or ``None`` when the request failed.
    """
    print("[1] 指纹识别...")

    reply = raw_http_request(host=host, port=port, path="/", use_ssl=use_ssl)

    if not reply["ok"]:
        print("    无法连接")
        return None

    response_headers = reply["headers"]
    banner = response_headers.get("server", "")
    version = extract_nginx_version(banner)

    print(f"    Server: {banner}")
    if version:
        print(f"    Nginx版本: {version}")

    return dict(server=banner, version=version, headers=response_headers)


def detect_rewrite_behavior(host, port, use_ssl):
    """Request every path in REWRITE_TEST_PATHS and record status/Location.

    Probes whose request failed are skipped and leave no entry.

    Args:
        host: Target host.
        port: Target port.
        use_ssl: Whether to connect over TLS.

    Returns:
        list[dict]: One ``{"path", "status", "location"}`` entry per
        successful probe, in probe order.
    """
    print("[2] rewrite 行为探测...")

    observations = []

    for probe in REWRITE_TEST_PATHS:
        reply = raw_http_request(
            host=host, port=port, path=probe, use_ssl=use_ssl
        )

        if reply["ok"]:
            code = reply["status"]
            target = reply["headers"].get("location")

            observations.append(
                {"path": probe, "status": code, "location": target}
            )

            print(f"    {probe} -> {code} {target or ''}")

    return observations


def fuzz_rewrite(host, port, use_ssl):
    """Send every entry of FUZZ_PAYLOADS as a request path and log the outcome.

    Sleeps 0.2 seconds after each request.

    Args:
        host: Target host.
        port: Target port.
        use_ssl: Whether to connect over TLS.

    Returns:
        list[dict]: One ``{"payload", "ok", "status", "latency", "error"}``
        entry per payload; ``payload`` is truncated to 80 characters, and
        fields absent from the underlying result are ``None``.
    """
    print("[3] rewrite fuzz...")

    outcomes = []

    for payload in FUZZ_PAYLOADS:
        reply = raw_http_request(
            host=host, port=port, path=payload, use_ssl=use_ssl
        )

        entry = {"payload": payload[:80], "ok": reply["ok"]}
        for field in ("status", "latency", "error"):
            entry[field] = reply.get(field)
        outcomes.append(entry)

        if reply["ok"]:
            summary = f"{reply['status']} {reply['latency']:.2f}s"
        else:
            summary = f"ERROR {reply['error']}"

        print(f"    {payload[:40]}... -> {summary}")

        # Pause between payloads.
        time.sleep(0.2)

    return outcomes


def detect_worker_instability(host, port, use_ssl):
    """Request ``/`` ten times and summarise latency and failure count.

    Args:
        host: Target host.
        port: Target port.
        use_ssl: Whether to connect over TLS.

    Returns:
        dict: ``{"avg_latency", "jitter", "failures"}``. ``avg_latency`` is
        the mean and ``jitter`` the population standard deviation of the
        successful requests' latencies; both are the sentinel ``999`` when
        every request failed.
    """
    print("[4] worker 稳定性检测...")

    attempts = 10
    samples = []

    for _ in range(attempts):
        reply = raw_http_request(
            host=host, port=port, path="/", use_ssl=use_ssl
        )
        if reply["ok"]:
            samples.append(reply["latency"])

    failed = attempts - len(samples)

    if samples:
        mean_latency = statistics.mean(samples)
        spread = statistics.pstdev(samples)
    else:
        mean_latency = spread = 999

    print(f"    平均延迟: {mean_latency:.2f}s")
    print(f"    抖动: {spread:.2f}")
    print(f"    失败次数: {failed}")

    return {
        "avg_latency": mean_latency,
        "jitter": spread,
        "failures": failed,
    }


def analyze(
        fingerprint,
        rewrite_results,
        fuzz_results,
        stability_results
):
    """Turn the results of the four probing stages into a RiskScore.

    Args:
        fingerprint: Dict with ``"server"`` and ``"version"`` keys, or a
            falsy value when fingerprinting failed.
        rewrite_results: Iterable of dicts with ``"status"`` and
            ``"location"`` keys.
        fuzz_results: Iterable of dicts with ``"ok"`` and ``"status"`` keys.
        stability_results: Dict with ``"failures"`` and ``"jitter"`` keys.

    Returns:
        RiskScore: The accumulated score and the reasons behind it.
    """
    print("[5] 风险分析...")

    risk = RiskScore()

    # Fingerprint: nginx banner and exposed version.
    if fingerprint:
        banner = fingerprint["server"].lower()

        if "nginx" in banner:
            risk.add(10, "检测到 nginx")

        if fingerprint["version"]:
            risk.add(10, "暴露 nginx 版本")

    # Rewrite: a redirect status or any Location header counts as evidence.
    redirect_codes = (301, 302, 307, 308)

    saw_rewrite = any(
        item["status"] in redirect_codes or item["location"]
        for item in rewrite_results
    )

    if saw_rewrite:
        risk.add(25, "检测到 rewrite/redirect 行为")

    # Fuzz: tally failed requests and 5xx replies.
    connection_failures = 0
    error_500 = 0

    for item in fuzz_results:
        if not item["ok"]:
            connection_failures += 1

        code = item["status"]
        if code and code >= 500:
            error_500 += 1

    if error_500 >= 2:
        risk.add(25, f"fuzz 导致多个 5xx ({error_500})")

    if connection_failures >= 2:
        risk.add(35, f"fuzz 后连接异常 ({connection_failures})")

    # Stability: repeated failures and high latency spread.
    if stability_results["failures"] >= 3:
        risk.add(40, "worker 疑似不稳定")

    if stability_results["jitter"] > 1:
        risk.add(10, "请求抖动异常")

    return risk


# =========================
# 主函数
# =========================

def main():
    """Parse the command line, run the five scan stages, and print a report.

    The port defaults to 443 with ``--ssl`` and to 80 otherwise. The stages
    run in order: fingerprinting, rewrite probing, fuzzing, stability check,
    and risk analysis.
    """

    parser = argparse.ArgumentParser(
        description="Nginx Rewrite 行为漏洞检测器"
    )

    parser.add_argument(
        "host",
        help="目标主机"
    )

    parser.add_argument(
        "-p",
        "--port",
        type=int,
        default=None,
        help="端口"
    )

    parser.add_argument(
        "--ssl",
        action="store_true",
        help="HTTPS"
    )

    # NOTE(review): --aggressive is parsed but never read below; it currently
    # has no effect on the scan.
    parser.add_argument(
        "--aggressive",
        action="store_true",
        help="激进模式"
    )

    args = parser.parse_args()

    if args.port is None:
        args.port = 443 if args.ssl else 80

    print("=" * 60)
    print(" Nginx Rewrite 行为漏洞检测器")
    print("=" * 60)

    print(f"目标: {args.host}:{args.port}")
    print(f"SSL: {'开启' if args.ssl else '关闭'}")
    print()

    # 1
    fingerprint = fingerprint_nginx(
        args.host,
        args.port,
        args.ssl
    )

    print()

    # 2
    rewrite_results = detect_rewrite_behavior(
        args.host,
        args.port,
        args.ssl
    )

    print()

    # 3
    fuzz_results = fuzz_rewrite(
        args.host,
        args.port,
        args.ssl
    )

    print()

    # 4
    stability_results = detect_worker_instability(
        args.host,
        args.port,
        args.ssl
    )

    print()

    # 5
    risk = analyze(
        fingerprint,
        rewrite_results,
        fuzz_results,
        stability_results
    )

    print("=" * 60)
    print(" 检测结果")
    print("=" * 60)

    print(f"风险等级: {risk.level()}")
    # The individual weights can add up to more than 100, so clamp the value
    # shown against the "/100" scale; the per-reason breakdown is unchanged.
    print(f"风险评分: {min(risk.score, 100)}/100")

    print()

    print("分析结果:")

    for reason in risk.reasons:
        print(f"  {reason}")

    print()

    print("注意:")
    print("1. rewrite漏洞依赖 nginx.conf")
    print("2. 无法仅通过远程100%确认")
    print("3. 本工具是行为检测,不是漏洞确认")
    print("4. 建议结合 nginx 配置审计")

    print("=" * 60)


if __name__ == "__main__":
    main()

网友回复

我知道答案,我要回答