- [1.1 题目要求](https://eqqie.cn/#menu_index_2)
- [1.2 思路](https://eqqie.cn/#menu_index_3)
- [1.3 实现](https://eqqie.cn/#menu_index_4)
- [1.3.1 相关工具](https://eqqie.cn/#menu_index_5)
- [1.3.2 脚本](https://eqqie.cn/#menu_index_6)
无经验新手队伍的writeup,轻喷
- 即需要满足如下关系:`pointer_value - image_base = string_offset`所以实现方式大致为:
- 检索所有的字符串信息,并搜集`string_offset`
- 按照目标架构的`size_t`长度搜集所有的`pointer_value`
- 按照一定步长遍历`image_base`,计算所有`image_base` 取值下`string_offset`的正确数量,并统计出正确数量最多的前几个候选`image_base`输出在此基础上可以增加一些优化措施,比如可以像 rbasefind2 一样通过比较子字符串差异以获得
C0ss4ck 师傅使用的思路是通过 unicorn 手动加载并模拟固件,然后在
+0x0 offset
的位置添加一个读写和大跳转 Hook,以此实现的 rbase 效果和用字符串实现的效果基本一样,值得参考:github.com/Cossack9989/uni-rbasefind
基于 soyersoyer/basefind2 和 sgayou/rbasefind 项目以及 ReFirmLabs/binwalk 工具实现
rbasefind
主要提供了3个控制参数:搜索步长,最小有效字符串长度以及端序binwalk
用于通过指令比较的方式检查 Image 文件的架构和端序rbasefind
,可以得到可信度最高的 Image Base 值,将其作为答案提交import os
import sys
import subprocess
chall_1_data_path = "../dataset/1"
file_list = os.listdir(chall_1_data_path)
vxworks = {15, 21, 36, 37, 44, 45, 49}
ecos = {4, 2, 30, 49, 18, 45, 33, 5, 20, 32, 43}
answer = {}
def get_default_answer(data_i):
if int(data_i) in vxworks:
return hex(0x40205000)
elif int(data_i) in ecos:
return hex(0x80040000)
else:
return hex(0x80000000)
def check_endian(path):
out, err = subprocess.Popen(
f"binwalk -Y \'{path}\'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
# print(out)
if b", little endian, " in out:
return "little"
elif b", big endian, " in out:
return "big"
else:
return "unknown"
if __name__ == "__main__":
#file_list = ["2", "5"]
cnt = 0
for file in file_list:
cnt += 1
print(f"[{cnt}/{len(file_list)}] Processing file: {file}...")
file_path = os.path.join(chall_1_data_path, file)
endian = check_endian(file_path)
if endian == "little":
cmd = f"./rbase_find -o 0x100 -m 10 \'{file_path}\' 2>/dev/null | sed -n \"1p\""
elif endian == "big":
cmd = f"./rbase_find -o 0x100 -m 10 -b \'{file_path}\' 2>/dev/null | sed -n \"1p\""
elif endian == "unknown":
cmd = f"./rbase_find -o 0x100 -m 10 \'{file_path}\' 2>/dev/null | sed -n \"1p\""
try:
out, err = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
except Exception as e:
# error
print(f"Rbase file \'{file_path}\' failed with:", e)
answer[file] = get_default_answer(file)
continue
out = out.decode().strip()
print(f"File {file_path} done with:", out)
colsep = out.split(":")
if len(colsep) != 2:
answer[file] = get_default_answer(file)
continue
# success
base_address = colsep[0].strip()
base_address = hex(int(base_address, 16))
print(f"Add '{file}:{base_address}\' => answer")
answer[file] = base_address
# sort answer
answer = dict(sorted(answer.items(), key=lambda item: int(item[0])))
with open("rbase_answer.txt", "w") as f:
for key, value in answer.items():
f.write(f"{key}:{value}\n")
从题目要求来看应该是比较经典的二进制匹配问题了,相关工具和公开的思路都不少。最开始看到题目我们就有了如下两种思路。
第一种是传统的 静态二进制匹配 方式,提取目标函数的 CFG 特征或者 sig 等信息,将其与无符号二进制中的函数进行比较,并输出匹配结果的可信度。由于尝试了几个现成的工具后发现效果不尽人意,暂时也没想到优化措施,就暂时搁置了这个思路。
后续和 C0ss4ck 师傅交流了一下,他是通过魔改的 Gencoding 以及大量提取各种各样 Glibc 中的函数特征而实现的二进制匹配。一开始效果一般,和我分数差不多,但是后来他针对性的提取了很多特殊 RTOS 工具链构建出来的 kernel 的函数特征,效果突飞猛进,相关特征库也已经开源,可以参考一下:Cossack9989/BinFeatureDB: Binary Feature(ACFG) Database for DataCon2022-IoT-Challenge-2 (github.com)。
第二种是通过 动态模拟执行 来匹配函数。这个思路是比赛时想到的,之前没有见过相关工具,也没有阅读过相关资料,直觉上觉得效果会不错,而且很有挑战性,于是着手尝试实现这个思路。
- **测试用例**:为要匹配的所有函数设计输入和输出用例
- **函数行为**:为一些该函数特有的访存行为定义回调函数,如`memcpy`和`memcpy`会对两个指针参数指向的地址进行访存
- **系统调用**:监控某些函数会使用的系统调用,如`recv`, `recvmsg`,`send`,`sendto` 等socket函数依赖于某些底层调用提取出函数的起始地址,为该函数
图画得稍微有点不清楚的地方,Snapshot 在一个 Test Case 中只执行一次,往后只完成 Args Passing 就行,懒得改了...
- BinaryMatch 负责遍历加载目标文件,构建出可的模拟执行对象,并请求 Solver 匹配目标函数
- Solver 则是使用模拟执行的方式,将运行结果和预期结果作比较,判断是否匹配。而与一般匹配方式不同的是,不需要提前编译并搜集函数特征库,但是需要手动实现某个函数的 matcher
- 由于题目强调了基址不同时以 IDA 为准,这里绕了点弯使用 IDA 导出的结果
ida_res_file = os.path.join(ida_res_dir, f"{file_r_n}_ida_res.txt")
with open(ida_res_file, "r") as f:
ida_res = json.loads(f.read())
bin = BinaryMatch(
file_path, func_list=ida_res["func_list"], base=ida_res["image_base"])
res[file_path] = bin.match_all()
将函数列表和 ELF 文件作为参数构造一个 BinaryMatch 对象,该对象负责组织针对当前 ELF 的匹配工作:
识别 ELF 架构和端序,选用指定参数去创建一个 Qiling 虚拟机对象,用于后续模拟执行
_archtype = self._get_ql_arch()
_endian = self._get_ql_endian()
if _archtype == None or _endian == None:
self.log_warnning(f"Unsupported arch {self.arch}-{self.bit}-{self.endian}")
return False
...
ql = Qiling(
[self.file_path],
rootfs="./fake_root",
archtype=_archtype, endian=_endian, ostype=QL_OS.LINUX,
#multithread=True
verbose=QL_VERBOSE.DISABLED
#verbose=QL_VERBOSE.DEBUG
)
entry_point = ql.loader.images[0].base + _start - self.base
exit_point = ql.loader.images[0].base + _end - self.base
遍历 BinaryMatch 类中默认的或用户指定的匹配目标(Target),使用注册到 BinaryMatch 类上的对应架构的 Solver 创建一个 solver 实例,调用其中的 solve 方法发起匹配请求:
如:一个 x86_64 小端序的 ELF 会请求到 Amd64LittleSolver.solve()
...
"amd64": {
"64": {
"little": Amd64LittleSolver
}
}
...
每次请求可以看成传递了一个3元组:(虚拟机对象, 欲匹配函数名, 待匹配函数入口)
solver = self._get_solver()
res = solver.solve(ql, target, entry_point, exit_point) # solve
返回匹配结果
Solver.solve()
方法
def solve(self, ql: Qiling, target: str, _start: int, _end: int):
self._build_context(ql)
matcher = self._matchers.get(target, None)
if matcher == None:
self.log_warnning(f"No mather for \"{target}()\"")
return False
_test_cases = self._get_test_cases(target)
if _test_cases == None:
self.log_warnning(f"No test cases for {target}!")
return False
_case_i = 0
# Snapshot: save states of emulator
ql_all = ql.save(reg=True, cpu_context=True,
mem=True, loader=True, os=True, fd=True)
ql.log.info("Snapshot...")
for case in _test_cases:
_case_i += 1
# global hooks
self._set_global_hook(ql, target, _start, _end)
# match target funtion
if not matcher(ql, _start, _end, case):
self.log_warnning(f"Test Case {_case_i}/{len(_test_cases)} failed!")
return False
# Resume: recover states of emulator
ql.clear_hooks()
# note that it can not unmap the mapped memory. Fuck you Qiling! It is a shit bug!
ql.restore(ql_all)
ql.log.info("Restore...")
self.log_success(f"Test Case {_case_i}/{len(_test_cases)} passed!")
return True
调用 Solver.solve()
方法后,开始构建函数运行所需的上下,文这些上下文信息包括:
def _build_context(self, ql: Qiling):
# Due to Qiling's imperfect implementation of Hook, it's like a piece of shit here
mmap_start = self._default_mmap_start
mmap_size = self._default_mmap_size
# Prevent syscall read
def null_read_impl(ql: Qiling, abi: int, length: int, flags: int, sig: int):
self.log_warnning("Ingnore syscall READ!")
return 0
ql.os.set_syscall('read', null_read_impl, QL_INTERCEPT.CALL)
# Prevent syscall setrlimit
def null_setrlimit_impl(ql: Qiling, abi: int, length: int, flags: int, sig: int):
self.log_warnning("Ingnore syscall SETRLIMIT!")
return 0
ql.os.set_syscall('setrlimit', null_setrlimit_impl, QL_INTERCEPT.CALL)
# args buffer
ql.mem.map(mmap_start, mmap_size, UC_PROT_ALL)
# return point
ql.mem.map(self._default_return_point, 0x1000, UC_PROT_ALL)
char *buf
)创建对应的缓冲区 ql.mem.map(mmap_start, mmap_size, UC_PROT_ALL)
;_build_context
方法进行补充完善。如:x86_64 下需要直接调用底层 Unicorn 接口给 UC_X86_REG_FS_BASE
寄存器赋值,防止访问 TLS 结构体时出现异常;上下文构造完毕后,进入 预执行 状态,在这个状态下调用快照功能将 Qiling Machine 的状态保存下来。因为一个目标函数的测试用例可能有好几个,使用快照可以防止用例间产生干扰,并且避免了重复构建上下文信息
调用 _set_global_hook
设置全局 hook,主要是便于不同架构下单独进行 debug 调试
def _set_global_hook(self, ql: Qiling, target: str, _start: int, _end: int):
def _code_hook(ql: Qiling, address: int, size: int, md: Cs):
_insn = next(md.disasm(ql.mem.read(address,size), address, count=1))
_mnemonic = _insn.mnemonic
_op_str = _insn.op_str
_ins_str = f"{_mnemonic} {_op_str}"
self.log_warnning(f"Hook <{hex(address)}: {_ins_str}> instruction.")
ql.hook_code(_code_hook, user_data=ql.arch.disassembler)
return
借助 _set_global_hook 实现简单的调试功能,检查执行出错的指令
检查类的内部是否实现了名为 _match_xxxx
的私有方法,其中 xxxx
是待匹配目标函数的名称,如 strlen
对应 _match_strlen
。如果有实现该方法则取出作为 matcher 传入 Qiling Machine,函数地址,测试用例,并等待返回匹配结果
matcher = self._matchers.get(target, None)
...
if not matcher(ql, _start, _end, case):
self.log_warnning(f"Test Case {_case_i}/{len(_test_cases)} failed!")
return False
以 _match_strlen
为例,一个 matcher 的实现逻辑大致如下:
def _match_strlen(self, ql: Qiling, entry_point: int, exit_point: int, case):
match_result = False
# 需要注册一个_return_hook到返回点上
def _return_hook(ql: Qiling) -> None:
nonlocal match_result
nonlocal case
# check output
assert self._type_is_int(case["out"][0])
if case["out"][0].data == self._get_retval(ql)[0]:
match_result = True
ql.stop()
ql.hook_address(_return_hook, self._default_return_point)
self._pass_args(ql, case["in"])
self._run_emu(ql, entry_point, exit_point)
return match_result
有一些函数涉及到缓冲区访问,或者会将结果保存到缓冲区中,实现上则更麻烦,如 _match_memcmp
:
def _match_memcmp(self, ql: Qiling, entry_point: int, exit_point: int, case):
match_result = False
_dest_mem_read = False
_dest_mem_addr = self._get_arg_buffer_ptr(0)
_src_mem_read = False
_src_mem_addr = self._get_arg_buffer_ptr(1)
_mem_size = self._default_buffer_size
_cmp_len = case["in"][2].data
# memcmp() function must read this two mem
def _mem_read_hook(ql: Qiling, access: int, address: int, size: int, value: int):
nonlocal _dest_mem_read, _src_mem_read
nonlocal _dest_mem_addr, _src_mem_addr
nonlocal _mem_size
if access == UC_MEM_READ:
if address >= _dest_mem_addr and address < _dest_mem_addr + _mem_size:
_dest_mem_read = True
if address >= _src_mem_addr and address < _src_mem_addr + _mem_size:
_src_mem_read = True
return
_hook_start = self._default_mmap_start
_hook_end =_hook_start + self._default_mmap_size
ql.hook_mem_read(_mem_read_hook, begin=self._default_mmap_start, end=_hook_end)
def _return_hook(ql: Qiling) -> None:
nonlocal match_result
nonlocal case
_dst_buffer = case["in"][0].data
_src_buffer = case["in"][1].data
# Check whether the buffer is accessed
if _dest_mem_read and _src_mem_read:
# check memory consistency
if case["in"][0].data == self._get_arg_buffer(ql, 0, len(case["in"][0].data)) and\
case["in"][1].data == self._get_arg_buffer(ql, 1, len(case["in"][1].data)):
# check memcmp result
if _dst_buffer[:_cmp_len] == _src_buffer[:_cmp_len]:
if self._get_retval(ql)[0] == 0:
match_result = True
else:
match_result = False
else:
if self._get_retval(ql)[0] != 0:
match_result = True
else:
match_result = False
ql.stop()
ql.hook_address(_return_hook, self._default_return_point)
self._pass_args(ql, case["in"])
self._run_emu(ql, entry_point, exit_point)
return match_result
在 matcher 中会调用 _pass_args
方法,按照预先设置好的参数寄存器和传参约定,进行测试用例的参数传递
def _pass_args(self, ql: Qiling, input: list[EmuData]):
mmap_start = self._default_mmap_start
max_buffer_args = self._default_max_buffer_args
buffer_size = self._default_buffer_size
buffer_args_count = 0
_arg_i = 0
for _arg in input:
if _arg_i >= len(self._arg_regs):
ValueError(
f"Too many args: {len(input)} (max {len(self._arg_regs)})!")
if self._type_is_int(_arg):
ql.arch.regs.write(self._arg_regs[_arg_i], _arg.data)
elif _arg.type == DATA_TYPE.STRING:
if buffer_args_count == max_buffer_args:
ValueError(
f"Too many buffer args: {buffer_args_count} (max {max_buffer_args})!")
_ptr = mmap_start+buffer_args_count*buffer_size
ql.mem.write(_ptr, _arg.data+b"\x00") # "\x00" in the end
ql.arch.regs.write(self._arg_regs[_arg_i], _ptr)
buffer_args_count += 1
elif _arg.type == DATA_TYPE.BUFFER:
if buffer_args_count == self._default_max_buffer_args:
ValueError(
f"Too many buffer args: {buffer_args_count} (max {self._default_max_buffer_args})!")
_ptr = mmap_start+buffer_args_count*buffer_size
ql.mem.write(_ptr, _arg.data)
ql.arch.regs.write(self._arg_regs[_arg_i], _ptr)
buffer_aargs_count += 1
_arg_i += 1
目前简单将参数分为了:整数、字符串以及其它缓冲区(包括复杂结构体),未来可以继续扩展
调用 _run_emu
开始运行 Qiling Machine,运行时会不断触发实现设置的Hook,此处略过。由于实现将返回地址设置到了一块空内存上,并在这块内存设置了 Return Hook,所以最终停止执行只有三个原因:执行超时,内存错误,触发 Return Hook
运行前注册的 _return_hook
其实主要就是起到检查作用,检查测试用例的输入传入未知函数后得到的结果是否符合预期。很多时候函数的返回值并不能说明函数的执行效果。比如memmove
函数需要检查 dest 缓冲区是否拷贝了正确的字节;再比如 snprintf
需要模拟格式化字符串输出结果后,再与缓冲区中的字符串作比较。
在 matcher 退出后,需要清空本次测试用例挂上的 Hook,并恢复快照,准备比较下一个测试用例
for case in _test_cases:
...
ql.clear_hooks()
ql.restore(ql_all)
ql.log.info("Restore...")
self.log_success(f"Test Case {_case_i}/{len(_test_cases)} passed!")
给 strcmp 和 memcmp 设置带 \x00
截断的测试用例:
"memcmp": [
{
"in": [
EmuData(b"A"*0x20, DATA_TYPE.BUFFER),
EmuData(b"A"*0x20, DATA_TYPE.BUFFER),
EmuData(0x20, DATA_TYPE.INT32)
],
"out": [
EmuData(0, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"AAAAaaaa", DATA_TYPE.BUFFER),
EmuData(b"AAAAAAAA", DATA_TYPE.BUFFER),
EmuData(0x8, DATA_TYPE.INT32)
],
"out": [
EmuData(-1, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"aisudhakaisudhak", DATA_TYPE.BUFFER),
EmuData(b"AAAAAAAAaisudhak", DATA_TYPE.BUFFER),
EmuData(0x10, DATA_TYPE.INT32)
],
"out": [
EmuData(-1, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"AAAAAAAA\x00AAAAAAA", DATA_TYPE.BUFFER),
EmuData(b"AAAAAAAA\x00BBBBBBB", DATA_TYPE.BUFFER),
EmuData(0x10, DATA_TYPE.INT32)
],
"out": [
EmuData(-1, DATA_TYPE.INT32)
]
},
],
"strcmp": [
{
"in": [
EmuData(b"A"*0x20, DATA_TYPE.STRING),
EmuData(b"A"*0x20, DATA_TYPE.STRING),
],
"out": [
EmuData(0, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"AAAAaaaa", DATA_TYPE.STRING),
EmuData(b"AAAAAAAA", DATA_TYPE.STRING),
],
"out": [
EmuData(-1, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"AAAAAAAA\x00AAAAAAA", DATA_TYPE.STRING),
EmuData(b"AAAAAAAA\x00BBBBBBB", DATA_TYPE.STRING),
],
"out": [
EmuData(0, DATA_TYPE.INT32)
]
},
],
给 atoi 和 strtol 设计带有 base 参数的测试用例,并且在匹配 atoi 前将 base 参数(atoi 本身没有这个参数)对应的寄存器写 0
"atoi": [
{
"in": [
EmuData(b"12345", DATA_TYPE.STRING)
],
"out": [
EmuData(12345, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"1923689", DATA_TYPE.STRING)
],
"out": [
EmuData(1923689, DATA_TYPE.INT32)
]
},
],
"strtoul": [
{
"in": [
EmuData(b"12345", DATA_TYPE.STRING),
EmuData(0, DATA_TYPE.INT32), # endptr
EmuData(10, DATA_TYPE.INT32) # base
],
"out": [
EmuData(12345, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"12345", DATA_TYPE.STRING),
EmuData(0, DATA_TYPE.INT32), # endptr
EmuData(16, DATA_TYPE.INT32) # base
],
"out": [
EmuData(74565, DATA_TYPE.INT32)
]
},
{
"in": [
EmuData(b"0x100", DATA_TYPE.STRING),
EmuData(0, DATA_TYPE.INT32), # endptr
EmuData(16, DATA_TYPE.INT32) # base
],
"out": [
EmuData(256, DATA_TYPE.INT32)
]
},
]
...
# Easy to distinguish from strtoul/strtol
ql.arch.regs.write(self._arg_regs[1], 0xdeadbeef)
ql.arch.regs.write(self._arg_regs[2], 0xffff)
...
- 添加 dest 和 src 缓冲区的内存访问 Hook,保证运行时这两个参数都要被访问到
- 运行结束后检查 dest 和 src 缓冲区中的值是否不变,memcmp 函数不应该改变这两个缓冲区的值经过实际测试,增加额外检查后,近似函数导致的误报率大大降低
load_elf_segments
错误。虽然说可以通过手动 map ELF文件到指定的 Base 上,但是这总归是个极大影响使用体验的东西;本题难度还是比较大的,要想做好的话需要花不少时间,前期在第一第二题花了不少时间,第三题只能在3天里面极限整了一个仓促的解决方案,最后效果也不尽人意。但是如果继续修改,个人认为还是能产生不错效果。
几个关键的前提:
基于以上几点考虑,我们决定基于科恩实验室开发的一个比较成熟的漏洞检测框架 KeenSecurityLab/BinAbsInspector 开展具体工作
该框架支持使用 Ghidra 的 headless 模式,利于命令行处理数据。并且提供了P-code visitor,可以通过符号执行的方式遍历 P-code,判断指令中某个操作数是否存在潜在的溢出。还提供了各种自带的 Checker,每个 Checker 对应一种 CWE。当程序分析完成后,该框架就可以调用指定 Checker 分析反编译后的程序:
可以发现其中本身就提供了 CWE190 —— 也就是整数溢出的检测模块,但是非常遗憾的是这个模块实现得较为简单,没有针对漏洞特点进行进一步处理,所以漏报率和误报率都很高。
这是原生的代码实现:
/**
* CWE-190: Integer Overflow or Wraparound
*/
public class CWE190 extends CheckerBase {
private static final Set<String> interestingSymbols = Set.of("malloc", "xmalloc", "calloc", "realloc");
public CWE190() {
super("CWE190", "0.1");
description = "Integer Overflow or Wraparound: The software performs a calculation that "
+ "can produce an integer overflow or wraparound, when the logic assumes that the resulting value "
+ "will always be larger than the original value. This can introduce other weaknesses "
+ "when the calculation is used for resource management or execution control.";
}
private boolean checkCodeBlock(CodeBlock codeBlock, Reference ref) {
boolean foundWrapAround = false;
for (Address address : codeBlock.getAddresses(true)) {
Instruction instruction = GlobalState.flatAPI.getInstructionAt(address);
if (instruction == null) {
continue;
}
for (PcodeOp pCode : instruction.getPcode(true)) {
if (pCode.getOpcode() == PcodeOp.INT_LEFT || pCode.getOpcode() == PcodeOp.INT_MULT) {
foundWrapAround = true;
}
if (pCode.getOpcode() == PcodeOp.CALL && foundWrapAround && pCode.getInput(0).getAddress()
.equals(ref.getToAddress())) {
CWEReport report = getNewReport(
"(Integer Overflow or Wraparound) Potential overflow "
+ "due to multiplication before call to malloc").setAddress(
Utils.getAddress(pCode));
Logging.report(report);
return true;
}
}
}
return false;
}
@Override
public boolean check() {
boolean hasWarning = false;
try {
BasicBlockModel basicBlockModel = new BasicBlockModel(GlobalState.currentProgram);
for (Reference reference : Utils.getReferences(new ArrayList<>(interestingSymbols))) {
Logging.debug(reference.getFromAddress() + "->" + reference.getToAddress());
for (CodeBlock codeBlock : basicBlockModel.getCodeBlocksContaining(reference.getFromAddress(),
TaskMonitor.DUMMY)) {
hasWarning |= checkCodeBlock(codeBlock, reference);
}
}
} catch (Exception exception) {
exception.printStackTrace();
}
return hasWarning;
}
}
可以发现这个模块就是直接遍历 Reference 所在 BasicBlock 的指令流,判断是否有潜在的整数溢出运算指令,在此基础上检查是否遇到了调用 Sink 函数的 Call 指令,条件满足则输出。这样会导致肉眼可见的误报。
最终,基于 BinAbsInspector 框架,我们构思了以下的实现思路来实现整数溢出漏洞检测:
- PcodeVisitor 负责完成潜在整数溢出指令的**标记**
- Checker 负责检查 Sink 处的函数调用**参数**,以确认其是否受到了被标记指令的影响
- 这里暂时没有实现 Source 的约束,即使框架本身已经提供了 TaintMap 去回溯指令的 Source 函数,主要考虑是避免不小心整出更多 BUG 导致跑不出有分数的答案交上去...
不太擅长写 Java,写得蠢的地方不要见怪
在 Checker 模块添加自定义 Sink,并实现扫描程序 Symbol Table 自动提取 Sink 的功能(就是一暴力枚举):
SymbolTable symbolTable = GlobalState.currentProgram.getSymbolTable();
SymbolIterator si = symbolTable.getSymbolIterator();
...
while (si.hasNext()) {
Symbol s = si.next();
if ((s.getSymbolType() == SymbolType.FUNCTION) && (!s.isExternal()) && (!isSymbolThunk(s))) {
for(Reference reference: s.getReferences()){
Logging.debug(s.getName() + ":" + reference.getFromAddress() + "->" + reference.getToAddress());
hasWarning |= checkCodeBlock(reference, s.getName());
}
}
}
...
这里首先从符号表提取出所有符号,然后过滤出函数符号,过滤掉 External 符号,过滤掉 Thunk 符号剩下来的作为 Sink。其实这样的过滤还是太粗略的,可以大致总结一些基本不可能成为 Sink 但是又高频使用的常见函数构成黑名单,提取 Sink 时从中过滤一下实测效果会好很多。
不再直接遍历 CodeBlock 中的 Instruction,因为这样使用的是 Raw-Pcode。与 Raw-Pcode 相对应的是 High-Pcode。Raw-Pcode 只是将返汇编指令直接抽象出来得到中间的表示方式,它的 CALL 指令无法表示函数调用的参数信息。而 High-Pcode 是经过 AST 分析后得到的,其包含的 Varnode 具有语法树上的关联关系,CALL 指令也包含了传入的参数
先获取 Sink 函数的引用点所在函数,调用 decompileFunction 进行反编译,分析函数的AST结构,并得到 High Function,由 High Function 可以获得 PcodeOpAST,PcodeOpAST 继承自 PocdeOp 类,也就是上面所说的 High-Pcode
DecompileOptions options = new DecompileOptions();
DecompInterface ifc = new DecompInterface();
ifc.setOptions(options);
// caller function
Function func = GlobalState.flatAPI.getFunctionContaining(ref.getFromAddress());
if (func == null) {
Logging.debug("Function is null!!!");
return false;
}
if (!ifc.openProgram(GlobalState.currentProgram)) {
Logging.debug("Decompiler" + "Unable to initialize: " + ifc.getLastMessage());
return false;
}
ifc.setSimplificationStyle("decompile");
Logging.debug("Try to decompile function...");
DecompileResults res = ifc.decompileFunction(func, 3000, null);
if (res == null) {
Logging.debug("Decompile res is null!!!");
return false;
}
Logging.debug("Decompile success!");
HighFunction highFunction = res.getHighFunction();
if (highFunction == null) {
Logging.debug("highFunction is null!!!");
return false;
}
Iterator<PcodeOpAST> pCodeOps = highFunction.getPcodeOps();
if (pCodeOps == null) {
Logging.debug("pCodeOps is null!!!");
return false;
}
迭代遍历函数中所有的 pCode,判断是否属于4种算数运算之一,如果是的话则检查 PcodeVisitor 是否有将该指令标记为潜在溢出指令。如果条件都符合则标记 foundWrapAround
为真,并保存最后一条潜在溢出指令地址到 lastSinkAddress
while(pCodeOps.hasNext()) {
if(found){
break;
}
pCode = pCodeOps.next();
if (pCode.getOpcode() == PcodeOp.INT_LEFT
|| pCode.getOpcode() == PcodeOp.INT_MULT
|| pCode.getOpcode() == PcodeOp.INT_ADD
|| pCode.getOpcode() == PcodeOp.INT_SUB) {
if(PcodeVisitor.sink_address.contains(Utils.getAddress(pCode))){
foundWrapAround = true;
// get pCode's address and store it in lastSinkAddress
lastSinkAddress = Utils.getAddress(pCode);
} else{
Logging.debug("sink_address set does not contain: "+String.valueOf(Utils.getAddress(pCode).getOffset()));
}
}
...
}
其中
PcodeVisitor.sink_address
是下文添加的一个用于保存潜在溢出指令的数据结构
因为不能直接认为潜在整数溢出指令就一定会导致后续 CALL 所调用的 Sink 函数会受到整数溢出影响,所以还需要明确整数溢出的位置是否影响到了函数的参数。为了提高效率,可以只检查函数的 size 参数或者 length 参数的位置,将这些位置对应的 Varnode 的 def 地址和 lastSinkAddress
作比较来确定参数是否受到溢出影响(事实上这操作也有一些问题)。
switch(symbolName){
...
case "calloc":
if(pCode.getInput(1) == null && pCode.getInput(2) == null){
Logging.debug("Input(1) & Input(2) is null!");
break;
}
found = true;
if (Utils.getAddress(pCode.getInput(1).getDef()) == lastSinkAddress
|| Utils.getAddress(pCode.getInput(2).getDef()) == lastSinkAddress) {
found = true;
}
break;
case "realloc":
if(pCode.getInput(2) == null){
Logging.debug("Input(2) is null!");
break;
}
found = true;
if (Utils.getAddress(pCode.getInput(2).getDef()) == lastSinkAddress) {
found = true;
}
break;
...
}
这个模块主要完成符号执行的功能,如果某条指令发生了潜在的整数溢出可以通过 Kset 的
isTop()
方法来检查
添加一个 public 的静态 HashSet 变量,用于保存那些被符号执行认为存在潜在整数溢出的指令
public static HashSet<Address> sink_address = new HashSet<Address>();
在 PcodeVisitor 对之前提到的四种运算指令进行符号执行时,通过 isTop()
检查 Pcode 的两个 Input Varnode 和一个 Output Varnode 对应的符号值是否存在潜在的整数溢出,如果有则标记到 HashSet<Address> sink_address
中以便 Checker 访问
public void visit_INT_MULT(PcodeOp pcode, AbsEnv inOutEnv, AbsEnv tmpEnv) {
Varnode op1 = pcode.getInput(0);
Varnode op2 = pcode.getInput(1);
Varnode dst = pcode.getOutput();
KSet op1KSet = getKSet(op1, inOutEnv, tmpEnv, pcode);
KSet op2KSet = getKSet(op2, inOutEnv, tmpEnv, pcode);
KSet resKSet = op1KSet.mult(op2KSet);
setKSet(dst, resKSet, inOutEnv, tmpEnv, true);
updateLocalSize(dst, resKSet);
// CWE190: Integer Overflow
if (resKSet.isTop() || op1KSet.isTop() || op2KSet.isTop()) {
Logging.debug("Add new sink address: "+String.valueOf(Utils.getAddress(pcode).getOffset()));
sink_address.add(Utils.getAddress(pcode));
}
IntegerOverflowUnderflow.checkTaint(op1KSet, op2KSet, pcode, true);
}
lastSinkAddress
做了比较,导致如果在 CALL 之前出现多个潜在溢出指令时,可能会无法匹配到正确的那条指令,这也会导致大量的漏报情况