关于Faster的那些事...

May 24, 2020

听说最近的网鼎杯基本每场都放了faster这种类型的题,故开篇博客总结一下。

faster这样的题大概是意在锻炼自动化漏洞挖掘技术,在pwn出题方向越来越窄(?)的今天打开了一条新的道路。

这种题型最早出现是在De1taCTF2020的code_runner,code_runner这道题mips架构,而且要求1.4s内跑完n层关卡,即输入一次必须满足这所有n层关卡的要求才可以达到最后栈溢出的位置。这个题给我的第一反应就是要用到angr、z3这类自动化求解工具。

先介绍一下angr,官网在这,如果想更深一步了解的话强烈建议angr的blog,比如从这篇开始

CLE: binary loader

Archinfo: 对应CLE的结果给出大小端等有用信息提供给angr

SimEngine: angr中的模拟执行引擎

PyVEX: angr中把汇编代码转化为中间语言(IR)的部件

Claripy: 约束求解器,后端用的微软的z3

SimOS: 负责模拟各类文件以及设备

跟angr相关的东西实在是太多,可以跑example或者这个自己体会下

当时code_runner的关卡中不仅有简单的字节相等判断,还有平方差绝对值的比较运算,后者就对angr和z3不太友好,所以经过我各种调参最后最快只能到3秒..

import angr
import claripy
import logging
import time
#logging.getLogger("angr").setLevel("ERROR")

def getaddr(p):
    cfg = p.analyses.CFGFast(regions=[(p.loader.main_object.min_addr, p.loader.main_object.max_addr)], force_complete_scan=False)
    addrs = []
    for address,function in cfg.functions.items():
        addrs.append(address)
    #print("\n".join(hex(i) for i in addrs[-14-17+1:-14+1]))
    return addrs[-14-17+1:-14+1]

def getav(addr,p):
    av = []
    for i in range(len(addr)):
        irsb = p.factory.block(addr[i])
        while(True):
            if i+1 == len(addr):
                break
            if irsb.addr == addr[i+1]:
                break
            road = list(irsb.vex.constant_jump_targets)
            if len(road) == 2 and irsb.vex.operations[-1] == 'Iop_CmpNE32':
                if road[0] == int(str(irsb.vex.next),16):
                    av.append(road[1])
                else:
                    av.append(road[0])
            if len(road) == 1 and irsb.vex.jumpkind == 'Ijk_Call':
                av.append(irsb.instruction_addrs[-1]+0xC)
            irsb = p.factory.block(int(str(irsb.vex.next),16))
    av = list(set(av))
    return av

def angr_run(filename): 
    p = angr.Project(filename, load_options={
        'auto_load_libs':False,
        'main_opts': {
            'backend': 'blob',
            'base_addr': 0x400000,
            'arch': 'mipsel',
        },
    })
    pass_addr = getaddr(p)
    pass_addr.reverse()
    result = b""  
    state = p.factory.blank_state(addr=pass_addr[0])
    password = [claripy.BVS("flag_%d"%i,8*4) for i in range(16)]
    state.regs.a0 = 0x4140e0
    k = 0
    for i in range(16):
        state.memory.store(0x4140e0+4*i,password[k])
        k+=1    
    sm = p.factory.simgr(state)
    sm.active[0].options.add(angr.options.LAZY_SOLVES)
    sm.use_technique(angr.exploration_techniques.DFS())
    av = getav(pass_addr,p)
    sm.explore(find=pass_addr[-1],avoid=av)
    #print(",".join(hex(i) for i in av))
    print(len(av))
    st = sm.found[0]
    for i in password:
        result+=(st.solver.eval(i,cast_to=bytes))
    return result

首先这样的题,既然它能不断批量生成这么一个随机的关卡,那这些关卡的位置一定是可以找到规律的,基本上都可以从程序的CFG(控制流图)或者可获取到的函数列表中找到规律。CFG可以通过angr生成(比如说上面代码中的project.analyses.CFGFast),函数列表可以通过radare2生成。这样我们就找到了要动态解决的起点。

如果要让angr跑程序的话,有两点会提高效率,一种是hook代码,一种是设定avoid条件。hook代码即人工把某部分汇编代码翻译成python之后告诉angr,当你执行到这里的时候它其实是这个意思,就不用让他再跑VEX了。路径爆炸是符号执行一定要解决的问题,所以说可以提前帮助angr做剪枝,使得它不会再其他不可能进行下去的分支上耽误太长时间。上面的getav就是获取avoid地址列表。

getav的思路: 其实还是找规律,我写getav之前学长手动把所有avoid地址找出来了,就可以找规律。上面提到了angr会把汇编转化成IR语言,那我们也可以通过翻译完的IR语言来找到每一个条件判断之后的跳转信息。一般来说,基本都是不符合条件就直接跳下去,不再继续执行函数,所以说这就是明显的特征之一,我们找到跳下去的部分,用vex.next看如果不跳下去会到什么地址,然后直接把另一条路拉到avoid里就OK了,这是第一个特征。

第二个特征就是,经过调试发现,angr在执行完函数之后还会返回来看call之后的部分,其实call之后的部分不用angr管了,所以说call之后的部分也要avoid掉。

下面在angr执行的时候我多加了两个参数,一个是angr.options.LAZY_SOLVES,另一个是angr.exploration_techniques.DFS(),我的理解是,使用LAZY_SOLVES可以不用特别频繁的调用z3来约束求解,可以省时间,而使用angr.exploration_techniques.DFS()则是想要让angr一条路走到黑,让angr只管往前走,尽量不管其他分支,这样也有副作用,就是取绝对值部分可能就会被统一处理成取负或者什么都不做,不过这都是随机的,什么情况都有可能出现,为了程序运行效率达到最大就也把这个加上了。

最后看了各路大神的WP,发现做出来的基本都是找运算pattern识别运算总结出来式子之后自己解的,这样的话就可以轻松1.3秒以内。

接下来就是网鼎第一天的faster。网鼎的要求就比De1taCTF要求要低得多,基本上跑出来就算你过,时间要求可忽略不计。然后faster就不是运算类的关卡了,而是类似于走迷宫,100个或者1000个函数走switch,通过输入确定路径,到了目的地之后会给你一个栈溢出让你pwn。

在汇编中,switch是有jump table做支撑的,可以直接分析jump table而避免使用angr...刚开始我没有想到这点,依旧用angr做的,然后angr 100秒跑出来,直接分析似乎不到一秒就出了...

import angr
import r2pipe
import logging
import claripy
import time

logging.getLogger("angr").setLevel("INFO")

r2bin = r2pipe.open("pwn-1")
addr_list = []
r2bin.cmd("aaa")
func100_addr = r2bin.cmd("?= @sym.func100")[:-1]

for i in range(0,101):
    tmp = r2bin.cmd("?= @sym.func%s" % str(i).rjust(3,"0"))[:-1]
    addr_list.append(int(tmp,16))

print(",".join([hex(i) for i in addr_list]))

#addr_list.reverse()


result = b""

flag_chars = [claripy.BVS('flag_%d' % i, 8) for i in range(100)]
flag = claripy.Concat(*flag_chars + [claripy.BVV(b'\n')])

start_time = time.time()

# solution 1
if 1:
    p = angr.Project("pwn-1")
    next = addr_list[0]
    state = p.factory.blank_state(addr=addr_list[0])#,stdin=flag)

    if 1:
        for k in flag_chars:
            state.solver.add(k >= ord("0"))
            state.solver.add(k <= ord("z"))

    sm = p.factory.simgr(state)
    sm.step(until=lambda pg: len(pg.active)>1 )
    for i in range(1,101):
        next = addr_list[i]
        if len(sm.active) >1 :
            if sm.active[0].addr == sm.active[1].addr:
                sm.merge()                
                sm.step(until=lambda pg: len(pg.active)>1 )
        sm.step(until=lambda pg: len(pg.active)>1 )
        sm.move(from_stash='active', to_stash='deadended', filter_func=lambda s: not next == s.addr)
        sm.drop(stash='deadended')
        sm.step(until=lambda pg: len(pg.active)>1 )
        print(sm.active[0].posix.dumps(0),hex(next))

#solution 2
if 0:
    p = angr.Project("pwn-1")
    next = addr_list[0]
    result = []
    flag = claripy.BVS('flag', 8)
    for i in range(1,101):
        state = p.factory.blank_state(addr=addr_list[i-1],stdin=flag)
        num = claripy.BVV(i-1, 8)
        state.memory.store(0x6090AC,num)
        if 1:
            state.solver.add(flag >= ord("0"))
            state.solver.add(flag <= ord("9"))
        sm = p.factory.simgr(state)
        sm.explore(find=addr_list[i],avoid=0x4006B0)
        result.append(sm.found[0].posix.dumps(0))

end_time = time.time()
print(end_time - start_time)


这里体现了两个思路,第一个就是让angr一口气按流程执行下来,最终结果为总输入,第二个就是让angr一步一步求,最终结果为每一次explore完毕的输入合到一起的字符串。中间加了对输入的约束,因为angr总会给我们带来各种惊喜(x),似乎中间总会有可能约束到一些奇奇怪怪的字符,然后输入到程序中还能通过的情况...这个就没再细研究了,以后再说。

在angr中是有机器状态(state)这个概念的,也就是说这个state记录了执行到某地址时其内存寄存器等部件的状态,每多出一种分支就会多一个state,然后angr会默认对这些state进行类似于bfs算法的队列操作,而state也是也是有状态归类的,比如说如果执行了explore最后找到一组state满足条件,则这个state被划归到found目录下,即simgr.found这个list中,若状态刚生成或者状态还没有跑完则会被划归到active目录下,也算是给不同情况下的list归类。

由上面已经了解到的知识我详细说说第一种思路,首先我创建一个state,设定起始位置,然后用sm.step单步跑,在until中写清停止条件,这里我设置终止条件是,一旦这个state开始分叉我就停止,因为有switch所以一定会有一个时刻会分到switch个的state,这时还没有进下一个call函数,才在基本块的开头停了下来,这时我们就再让angr执行一次,各个state的起始位置就到了各个函数的开头位置。这里需要注意一点,step的最小单位是一个基本块,即遇到更改ip指令(比如说jmp,jz,jnz,call等等)的下一条指令处停止,而且停止的范围要设立一个最大值,要不然就会断到angr自动hook的函数里,因为angr hook函数具体执行的地址是远大于程序基址范围的。所以说在思路一中我跑了两次step。

接下来就是对state的分类,我们已经知道路径了就可以指导angr进行下一步操作,所有的state只要不符合下一步地址的全都kill掉,这就有了move和drop操作。但是在跑的过程中,angr又一次给我惊喜(x),发现执行的过程中总会有几次是不符合预期的state分叉,若是顺着其中一个state做还推不下去,这时就要用到merge这个操作,merge会把所有的状态合并到一块去,这样就可以继续顺利进行了。

当时我第一想法就是想写第二种的,又再想第二种这样断断续续的会不会使得angr效率很低,结果恰好相反,第二种速度在我这边做是最快的,后来分析了一下,大概是在merge操作的时候耗时有一点点长,这微小的代价导致了第二种会更快一点...

以上是angr部分的一些见解..其实我也不是很了解angr,多记录以后回来再看吧。

之后我们队大佬写了一个分析jmp table的exp,比我的要快好多好多倍...这种题分析pattern还是王道啊...

后来在网鼎第四天又碰到了faster,这次题就没有第一天的容易,首先这次的题把符号全删了,其次把规定路径的部分删掉,要求在迷宫里必须跑1000步跑到出口,多了不行少了也不行。如果没有自成环的switch(即自己跳自己)这就相当于图论的问题了,肯定是不能让angr跑的。那就先做个基本的寻址再考虑凑步数的问题。

可能有人发现我还没提到脚本里比较关键的r2pipe,我解释一下,r2pipe是radare2的python binding。radare2 在我看来是可以跟没有hexray的ida平起平坐的,甚至还能高出一小点(指r2里面的模拟执行),很厉害,ida最基本能完成的东西他都能做到,而且还写了其他语言可以调用的接口,r2部分指令内也提供了json输出的关键字(j),在python中还可以直接把json对象反序列化,也非常方便。

radare厉害但是指令有点难记,听说它的指令类似于windbg,反正r2 <filename>之后一个劲用"?"来看帮助就好了。比如说

(r2载入的第一句即白色部分通常来说都是一句俏皮话,跟msf的字符画一样,别被突然的一句see you in Defcon吓到2333)

然后就问号呗

这样就可以递归学习自己不知道的指令了。如果说觉得太费劲了,为啥不求助万能的google呢?直接用英文搜radare+功能就好了2333

在这种自动利用的题目中大概率会用到以下几个命令:

aaa : 全面分析binary,不分析binary好多功能没法用,一般用在开头。

?= @xxx : xxx表示函数名(当然地址其实也行),这句话可以输出有符号函数的地址。

axt [addr] : find data/code references to this address,即ida中的xref,对着地址按个x的作用,这个可以转成json格式即axtj。

axf [addr] : find data/code references from this address,即ida中的双击(x),对着引用地址双击就跳到那个地址了,这个也可以加j转json格式。

afi [addr|fcn.name] : show function(s) information (verbose afl),我通常用这个看函数调用关系,比如main函数调了func_1234就可以直接用afi或者afij看func_1234的地址。

有用的指令还有很多大家可以回去慢慢发掘...这种自动化挖掘的提醒用xref会多一点,上面这些应该是够了。值得一提的是,radare里也有一个看jump table的,但是支持可能不太给力,还是手写parser吧...

先上网鼎第四天的faster寻路代码

import r2pipe
import random
from pwn import *

def deal_list(addr,x):
    tmp = []
    for i in x:
        tmp.append(i["addr"])
    tmp = set(tmp)
    tmp -= {addr+204}
    return list(tmp)
    
def find_way(start_addr,end_addr):
    aim_before_addr_list = []
    result = r2.cmdj("axtj %d"% end_addr)
    aim_addr_2d = result[0]["fcn_addr"]
    result = r2.cmdj("axtj %d"% aim_addr_2d)
    for i in result:
        if not "main" == i["fcn_name"]:
            aim_before_addr_list.append(i["fcn_addr"])

    way_list = []
    jump_addr = None
    find_list = deal_list(start_addr,r2.cmdj("afij %d"% start_addr)[0]["callrefs"][12:-2])
    tmp_addr = None
    add_addr = None
    while(True):
        status = 0
        for i in find_list:
            if i in aim_before_addr_list:
                status = 1
                jump_addr = i
        if status: #and len(way_list) == 999:
            break
        if len(way_list) >1000 :
            pass
        else:
            tmp_addr = random.choice(find_list)
            #tmp_addr = find_list[-1]
            #tmp_addr = r2.cmdj("afij %d"% start_addr)[0]["callrefs"][-2]["addr"]
            way_list.append(tmp_addr)
            find_list = deal_list(tmp_addr,r2.cmdj("afij %d"% tmp_addr)[0]["callrefs"][12:-2])
    way_list = [start_addr]+way_list+[jump_addr]+[aim_addr_2d]+[end_addr]
    return way_list


def find_input(src,dst):
    tmp_addr = src+48
    jpt_addr = r2.cmdj("axfj %d" % tmp_addr)[0]["to"]
    #print(hex(jpt_addr))
    datas = elf.read(jpt_addr, 40)
    for i in range(0,40,4):
        jmp_tmp = u32(datas[i:i+4])
        # print(jmp_tmp)
        #print(b32i(jmp_tmp))
        jmp_addr = b32i(jmp_tmp) + jpt_addr
        #print(hex(jmp_addr))
        call_offset = u32(elf.read(jmp_addr, 10)[6:])
        #print(call_offset)
        # B8 00 00 00 00 E8 67 4C 00 00
        call_addr = b32i(call_offset) + jmp_addr + 10
        # print(hex(call_addr))
        if call_addr == dst:
            # print('success',j+1,100)
            index = i//4
            return str(index)

def b32i(i):
    if i >= (1 << 31):
        return i - (1 << 32)
    else:
        return i

def find_input_1(src,dst):
    tmp_addr = src+48
    jpt_addr = r2.cmdj("axfj %d" % tmp_addr)[0]["to"]
    datas = elf.read(jpt_addr, 40)
    for i in range(0,40,4):
        jmp_tmp = u32(datas[i:i+4])
        # print(jmp_tmp)
        #print(b32i(jmp_tmp))
        jmp_addr = b32i(jmp_tmp) + jpt_addr
        #print(hex(jmp_addr))
        call_offset = u32(elf.read(jmp_addr, 10)[6:])
        #print(call_offset)
        # B8 00 00 00 00 E8 67 4C 00 00
        call_addr = b32i(call_offset) + jmp_addr + 10
        #print(hex(call_addr))
        if call_addr == dst:
            # print('success',j+1,100)
            index = i//4
            circle_list.append(src)
    #return str(index)

def find_circle():
    for i in range(0x432EA4,0x43CAE4,40):
        addr = r2.cmdj("axtj %d"%i)[0]["fcn_addr"]
        #print(hex(addr))
        find_input_1(addr,addr)


binary_name = "pwn-1"
elf = ELF(binary_name)
r2 = r2pipe.open(binary_name)
r2.cmd("aaa") # r2 init

result = r2.cmdj("aflj")
circle_list = []
find_circle() # find circle_jmp

start_addr = r2.cmdj("afij main")[0]["callrefs"][-1]['addr']# find beginning

aim_addr = None # find end
for i in result:
    if i["nbbs"] == 5 and "init" not in i["name"]:
        aim_addr = i["offset"]


way1 = find_way(start_addr,circle_list[0]) # find start to circle_jmp
way1_result = ""
for i in range(len(way1)-1):
    way1_result += find_input(way1[i],way1[i+1])

way2 = find_way(circle_list[0],aim_addr) # find circle_jmp to end
way2_result = ""
for i in range(len(way2)-1):
    way2_result += find_input(way2[i],way2[i+1])

fin_result = way1_result.ljust(1000-len(way2_result),way1_result[-1])+way2_result
print(fin_result)

if 1:
    io = process(binary_name)
    if 0:
        gdb.attach(io,
        '''
        b *0x432D55
        c
        ''')
    io.sendline(fin_result)
    io.interactive()

基本的思路就是找自己跳自己的switch(即代码里的circle_jmp),然后找到开头到circle_jmp的路径,再找circle_jmp到目的地的路径,最后用circle_jmp填充到1000步,就可以了。因为写得比较急又懒得改,find_input和find_input_1其实功能是一样的,只不过后者多了一个append而已...

还有一个细节在寻路函数(find_way)中。因为1000个函数中找一个函数的调用基本属于大海捞针,或者代价很高(至少我在写的时候是这么想的,没验证),我就想,把搜索的范围扩大到 找到 调用目标地址的地址 的调用,即aim_addr_2d与aim_before_addr_list,这样搜索要求就降低了好多,也更好找。比较有意思的是,也是在写这里代码的时候我发现有switch可以自成环,所以说我在里面气急败坏的写了个random进去,效果竟出奇的好...你只管大力,剩下的全靠奇迹23333

所以说,总结一下这种自动化pwn题的解题步骤:

  1. 先从nc中提出两个二进制(找规律的时候两点确定一条直线),一般都是gzip+upx。
  2. 找规律分析关卡入口与出口(上面这个题我是找的函数基本块信息锁定的出口)。
  3. 写出寻路exp(在时间要求严格或者要求图算法的时候能不用angr就不用angr,找pattern还是挺香的)
  4. 测试寻路,找个程序跑跑试试,关键时刻建议下断点跟踪,没问题了再继续。
  5. 写出口后的处理脚本(就像网鼎faster后面那个栈溢出利用一样),然后本地测试。
  6. 写nc传binary脚本,做最后的合并与完善。
  7. Get flag。

大概就这些,文章有什么出纰漏的地方或者各位有什么其他好的想法都可以联系我,联系方式的话...仔细读文章就有啦~

Great! You've successfully subscribed.
Great! Next, complete checkout for full access.
Welcome back! You've successfully signed in.
Success! Your account is fully activated, you now have access to all content.
京ICP备18053813号-1