404 CTF 2024 - Using miasm to automate RE

The 404 CTF is a French cybersecurity contest organized by the French intelligence services; its challenges are created by HackademINT, a cybersecurity club I’m part of.

For the competition I created various challenges, but we won’t talk about them here. Instead, I’d like to show you my solve of another challenge, created by a friend (Izipak). I think this solve might be useful for people trying to learn the miasm framework. Here we will play a bit with the Jitter and with Static Symbolic Execution (SSE).

As up-to-date miasm learning resources are quite hard to find, I will focus on this part the most. I will also quickly explain the reverse engineering part, but I won’t go into much detail, since I assume a rather “advanced” audience that may have already solved the challenge.

If you came here because you have no idea what happens in the binary: you won’t understand every little detail.

If you came here to learn how to automate the challenge in a smart way, or to learn miasm with examples: you are in the right place!

Discovery

The description of the challenge (French)

Quick summary: this is part 2 of another (easier) challenge. I did not try part 1 and jumped straight to this one. We are given a single-use binary to reverse, served on port 31990. We have 10 minutes to reverse it and give the solution to the verification server on port 31991.

Let’s download a binary and open it in IDA.

Cleaned IDA output of the main function

Let’s go through the reversing very quickly. The binary connects to port 31992 to authenticate (with a token identifying the challenge instance) and retrieves some executable code. Pointers to those functions are stored for later. The binary then enters the verification loop: it receives a 16-byte blob used later (I called it the seed), as well as a string to print. It reads the user input and verifies it.

Now let’s look at this verification phase in more detail.

Cleaned IDA output of the verify function

It takes three arguments: a table of function pointers, a pointer to the received seed, and a pointer to the input. It calls each function on the input (we’ll see later that each one modifies it in place with reversible operations). It then computes the check value from the seed. Finally, it compares the modified input with the check value.
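To make the data flow concrete, here is a small Python model of that verify logic. It is only a sketch: the two transforms and `toy_check` are hypothetical stand-ins I made up for illustration, not the code found in the binary.

```python
from typing import Callable, Sequence

# A transform mutates the 16-byte buffer in place, like the received functions.
Transform = Callable[[bytearray], None]


def verify(funcs: Sequence[Transform], seed: bytes, user_input: bytes,
           compute_check: Callable[[bytes], bytes]) -> bool:
    """Apply every transform to a copy of the input, then compare with the check."""
    buf = bytearray(user_input)
    for f in funcs:
        f(buf)
    return bytes(buf) == compute_check(seed)


# Hypothetical stand-ins, NOT the real challenge code.
def xor_5a(buf: bytearray) -> None:
    for i in range(len(buf)):
        buf[i] ^= 0x5A


def add_3(buf: bytearray) -> None:
    for i in range(len(buf)):
        buf[i] = (buf[i] + 3) & 0xFF


def toy_check(seed: bytes) -> bytes:
    return bytes(reversed(seed))
```

Solving the challenge amounts to finding, for each seed, the `user_input` that makes this function return True.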

At this point, we know how to get the flag:

  1. Get a new challenge (do not launch it !)
  2. Send the token to port 31992 and receive all the functions.
  3. Receive the seed.
  4. Emulate the compute_check function to get the check value from the seed.
  5. Reverse all function calls to get the input that gives the check value.
  6. Go back to step 3 if we did not receive the “done” message (that gives the 0x1337 return value).

Then we can send all the appropriate information to the verification server (this is explained in the received messages).
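The steps above boil down to a short driver loop. The sketch below keeps the networking and miasm parts abstract: `get_seed`, `compute_check` and `inverses` are hypothetical callables injected by the caller, standing for "receive the next seed (or None once the server says done)", "emulate compute_check" and "one inverse per received function".

```python
from typing import Callable, Optional, Sequence


def solve(get_seed: Callable[[], Optional[bytes]],
          compute_check: Callable[[bytes], bytes],
          inverses: Sequence[Callable[[bytes], bytes]]) -> bytes:
    """Build the full payload, one 16-byte answer per verification round."""
    payload = b""
    while True:
        seed = get_seed()
        if seed is None:          # the server sent its "done" message
            break
        goal = compute_check(seed)
        # Undo the functions starting with the last one the binary calls.
        for inverse in reversed(inverses):
            goal = inverse(goal)
        payload += goal
    return payload
```

Passing the pieces in as callables makes the loop easy to test offline with fake seeds before pointing it at the real server.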

All the networking can be done easily with python, with raw sockets or with pwntools. I chose to use Nclib, because I’m used to it.

Emulation of compute_check

IDA output of the compute_check function

This is quite ugly, and each challenge instance is different. We need to automatically compute the value returned by this function for any seed. It could be done via dynamic analysis (e.g., gdb or radare2 scripting), but let’s see how we can use miasm’s Jitter (its emulation engine) to do it.

Firstly, let’s talk quickly about some miasm concepts.

  • The LocationDB stores locations, which are abstract representations of “addresses”.
  • MiasmIR is miasm’s intermediate representation. You can lift a number of architectures to it. It is based on an expression tree.
  • The jitter is the emulation engine.

For this part you can also read this notebook which explains the jitter.

The imports

from miasm.jitter.csts import PAGE_READ, PAGE_WRITE, PAGE_EXEC
from miasm.analysis.machine import Machine
from miasm.core.locationdb import LocationDB

These are the imports we need: some constants for the jitter, the Machine object, and the LocationDB object. The Machine object is an abstraction of the architecture.

As the function to be emulated is always at the same address (0x13c5), we can hardcode it. This function also calls malloc, which we will need to hook.

So the first part of our emulate function is:

def emulate(arg):
    """
    Emulate fun_13c5 to retrieve the goal.
    """
    assert len(arg) == 16
    assert type(arg) is bytes

    loc_db = LocationDB()

    f_addr = 0x13c5
    arg_addr = 0x10000
    dest_addr = 0x20000
    text_addr = 0x40000000
    malloc_addr = 0x1110

We create a LocationDB to store the locations, and we set some constants.

  • f_addr: address of the emulated function
  • arg_addr: address of the argument
  • dest_addr: address of the destination
  • text_addr: where the executable code is loaded
  • malloc_addr: the address of malloc (as called by the binary)

Let’s set up our jitter. We can get one for x86-64 through the Machine object.

myjit = Machine("x86_64").jitter(loc_db)

myjit.init_stack()

# This is the common way to stop after the ret, 
# since it will try to jump to the address on the top of stack !
myjit.push_uint64_t(0x1337beef)
myjit.add_breakpoint(0x1337beef, code_sentinelle)
myjit.add_breakpoint(text_addr + malloc_addr, malloc_handler)

The beginning is self-explanatory. We then create what miasm calls “breakpoints”: this is the jitter’s hooking mechanism. Here we have two. The first one, the “code sentinelle” (French for sentinel code), tells us when to stop: by pushing a value on the stack as the saved RIP, we can break when the function returns. The second one is for malloc.

The second argument of the add_breakpoint function is a python function, launched when the address is met during the emulation. Let’s see how it works.

def code_sentinelle(jitter):
    """
    Stops the jitter
    """
    jitter.running = False
    return False

def malloc_handler(jitter):
    ret = jitter.get_stack_arg(0)
    jitter.cpu.RAX = dest_addr

    # Update the RIP register, AND the jitter pc (!)
    jitter.cpu.RIP = ret
    jitter.pc = ret

    return True

Their return value (True/False) dictates whether the jitter continues to run after the hook. For the malloc hook, I simply read the return address from the stack and resume normal execution there by setting both the RIP register (in case there are RIP-relative operations) AND the jitter pc (which is the address the jitter executes next).

The malloc hook just overwrites RAX with the fixed destination address we chose.
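If the sentinel trick and the True/False convention feel abstract, the following toy model may help. It is plain Python and deliberately NOT miasm's implementation: `ToyJitter`, `call` and `ret` are made-up names that only mimic the control flow described above (a fake return address on the stack, one hook that resumes execution, one hook that stops it). Note that this toy hook pops the return address, as a real ret would.

```python
SENTINEL = 0x1337BEEF


class ToyJitter:
    """Toy emulator: program maps an address to a callable acting as one instruction."""

    def __init__(self, program):
        self.program = program
        self.stack = []
        self.pc = None
        self.breakpoints = {}
        self.trace = []       # addresses actually executed
        self.result = None

    def add_breakpoint(self, addr, handler):
        self.breakpoints[addr] = handler

    def run(self, addr):
        self.pc = addr
        while True:
            handler = self.breakpoints.get(self.pc)
            if handler is not None:
                before = self.pc
                if handler(self) is not True:
                    return            # the hook asked us to stop
                if self.pc != before:
                    continue          # the hook redirected execution
            self.trace.append(self.pc)
            self.program[self.pc](self)


def call(target, ret_addr):
    def insn(jit):
        jit.stack.append(ret_addr)    # saved return address
        jit.pc = target
    return insn


def ret(jit):
    jit.pc = jit.stack.pop()


def demo():
    jit = ToyJitter({0x10: call(0x50, 0x11), 0x11: ret})
    jit.stack.append(SENTINEL)                      # fake saved return address
    jit.add_breakpoint(SENTINEL, lambda j: False)   # stop when the function returns

    def malloc_hook(j):
        j.result = 0x20000            # pretend malloc returned this buffer
        j.pc = j.stack.pop()          # go back to the caller
        return True                   # keep running

    jit.add_breakpoint(0x50, malloc_hook)
    jit.run(0x10)
    return jit
```

The hooked address 0x50 never appears in the trace: the hook replaces the call entirely, and the run ends only when the final ret lands on the sentinel.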

In our emulate function we can now set up the memory with the challenge code and run it! This is done through the vm interface of the jitter.

myjit.vm.add_memory_page(text_addr, PAGE_READ | PAGE_WRITE | PAGE_EXEC, data, "text")
myjit.vm.add_memory_page(arg_addr, PAGE_READ | PAGE_WRITE, arg, "arg")
myjit.vm.add_memory_page(dest_addr, PAGE_READ | PAGE_WRITE, b'\x00'*16, "dest")

myjit.cpu.RDI = arg_addr
myjit.run(text_addr + f_addr)
return myjit.vm.get_mem(dest_addr, 16)

data holds the raw bytes of the challenge binary that was downloaded.

Here is the full forward_verifier.py file. (I am bad at naming stuff)

from miasm.jitter.csts import PAGE_READ, PAGE_WRITE, PAGE_EXEC
from miasm.analysis.machine import Machine
from miasm.core.locationdb import LocationDB

def emulate(arg):
    """
    Emulate fun_13c5 to retrieve the goal.
    """
    assert len(arg) == 16
    assert type(arg) is bytes

    loc_db = LocationDB()

    f_addr = 0x13c5
    arg_addr = 0x10000
    dest_addr = 0x20000
    text_addr = 0x40000000
    malloc_addr = 0x1110

    def code_sentinelle(jitter):
        """
        Stops the jitter
        """
        jitter.running = False
        return False

    def malloc_handler(jitter):
        ret = jitter.get_stack_arg(0)
        jitter.cpu.RAX = dest_addr

        # Update the RIP register, AND the jitter pc (!)
        jitter.cpu.RIP = ret
        jitter.pc = ret

        return True


    with open("chall/crackme.bin", "rb") as f:
        # We assume it has been downloaded before :)
        data = f.read()

    myjit = Machine("x86_64").jitter(loc_db)

    myjit.init_stack()

    # This is the common way to stop after the ret, 
    # since it will try to jump to the address on the top of stack !
    myjit.push_uint64_t(0x1337beef)
    myjit.add_breakpoint(0x1337beef, code_sentinelle)
    myjit.add_breakpoint(text_addr + malloc_addr, malloc_handler)
    
    myjit.vm.add_memory_page(text_addr, PAGE_READ | PAGE_WRITE | PAGE_EXEC, data, "text")
    myjit.vm.add_memory_page(arg_addr, PAGE_READ | PAGE_WRITE, arg, "arg")
    myjit.vm.add_memory_page(dest_addr, PAGE_READ | PAGE_WRITE, b'\x00'*16, "dest")

    myjit.cpu.RDI = arg_addr
    myjit.run(text_addr + f_addr)
    return myjit.vm.get_mem(dest_addr, 16)


if __name__ == "__main__":
    """
    Simple stand alone use.
    """
    import binascii
    print(emulate(binascii.unhexlify("100B8E1BD2655EA36B3AAE875A71AEF2")))

Symbolic execution of the functions

As stated before, we need to reverse the functions we receive through the socket. I believe most people used some sort of byte-by-byte brute-force attack, since the functions only modify bytes one at a time. However, I will present a much more general way of solving, which could handle more complex transformations.
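For comparison, the brute-force approach mentioned above could look like the sketch below. It assumes that output byte i depends only on input byte i, and it treats the function as a black box; `toy_forward` is a hypothetical transform invented for the example, not one of the real functions.

```python
from typing import Callable


def invert_bytewise(forward: Callable[[bytes], bytes], goal: bytes) -> bytes:
    """Recover an input such that forward(input) == goal, one byte at a time.

    Only valid when each output byte depends solely on the input byte
    at the same position.
    """
    n = len(goal)
    recovered = bytearray(n)
    for i in range(n):
        for candidate in range(256):
            probe = bytearray(n)
            probe[i] = candidate
            if forward(bytes(probe))[i] == goal[i]:
                recovered[i] = candidate
                break
        else:
            raise ValueError(f"no preimage found for byte {i}")
    return bytes(recovered)


# Hypothetical per-byte transform, for illustration only.
def toy_forward(data: bytes) -> bytes:
    return bytes(((b ^ 0x5A) + i) & 0xFF for i, b in enumerate(data))
```

This costs at most 256 × 16 evaluations per function, which is cheap, but it breaks as soon as bytes start influencing each other. That limitation is what the symbolic approach avoids.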

I will assume a tiny bit of theoretical knowledge about SSE. If you do not know what it is, you can read about it here.

Firstly, what are those functions like ? Let’s get some by connecting to the server, and open them in radare2.

The block @0x14 is quite long

In the end it’s just a loop of the same transformation on each byte of the 16 byte argument. It is done in-place. Here we could do many things, for instance, only symbolically execute that one block and reverse it, in order to apply the reverse transformation to each byte. However, for the sake of simplicity, we will do the symbolic execution of the whole function (since the loop is concrete, there is no path explosion problem).

In the end I will:

  • Create a function that reverses the function at a given offset in the file containing all the functions.
  • It will do SSE to extract the constraints
  • Transform those constraints to z3
  • Let z3 find the input

We can already see a possible improvement here: since the functions are the same on every iteration of the main binary’s while loop, we could do the SSE once per function and save the resulting z3 expressions. We would then only have to solve for each new goal. I did not do it, since the whole thing is fast anyway and this blog post is about miasm, not regular Python programming. We’ll stick with redoing the SSE on each iteration, wasting maybe 5 seconds of our life (for harder problems, the caching might be needed).
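If you do want that improvement, one possible shape is a small cache around the expensive step. This is only a sketch: `analyse` is a hypothetical callable that would do the lifting and symbolic execution for one function offset and return a closure that only has to solve for a given goal.

```python
from functools import lru_cache
from typing import Callable

Inverter = Callable[[bytes], bytes]


def make_cached_inverter(analyse: Callable[[int], Inverter]) -> Callable[[bytes, int], bytes]:
    """Wrap analyse so the expensive work runs once per function offset."""
    cached_analyse = lru_cache(maxsize=None)(analyse)

    def invert(goal: bytes, addr: int) -> bytes:
        # First call for addr pays for the analysis; later calls reuse it.
        return cached_analyse(addr)(goal)

    return invert
```

The returned `invert(goal, addr)` has the same signature as the rev function used in this post, so it could be dropped into the main loop without other changes.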

Let’s get into it !

The imports

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core.locationdb import LocationDB
from miasm.ir.symbexec import SymbolicExecutionEngine
from miasm.expression.expression import *
from miasm.analysis.simplifier import expr_simp
from miasm.ir.translators import Translator

import z3

Here we need some more objects from the miasm framework. You already know some from before ! What are the remaining ones about ?

  • Container: an abstraction over the file format
  • SymbolicExecutionEngine: This is what will do the SSE
  • miasm.expression.expression: contains all Expression objects (for the MiasmIR we talked about !)
  • expr_simp: a basic simplifier for expressions, it will help normalize them
  • Translator: This will transform the miasm constraints to z3
  • z3: the regular z3 you all know and love !

Setup and lifting

Since SSE is done on the IR, we need to lift ! (do you even lift ? /j)

def rev(goal, addr):

    # This is what will allow us to go from miasm expressions -> z3 expressions.
    z3trans = Translator.to_language("z3")

    # Let's setup miasm ----
    loc_db = LocationDB()

    with open("chall/funcs", "rb") as f:
        # We assume we retrieved the functions
        cont = Container.from_stream(f, loc_db)

    machine = Machine("x86_64")
    mdis = machine.dis_engine(cont.bin_stream, loc_db=cont.loc_db)

    asmcfg = mdis.dis_multiblock(addr)
    lifter = machine.lifter_model_call(mdis.loc_db)
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)

    # End of setup ----
    # At this point, we have an IR of the function, on which we'll do symbex.

With the machine we can create a disassembly engine, which disassembles the code. We can also create a lifter that translates native code to IR. In the end we have an IRCFG (the control flow graph of the IR).

SSE

regs = lifter.arch.regs

symb = SymbolicExecutionEngine(lifter)  # <--- Coolest object in the west.

# These are our input bytes, as symbols.
input_ids = [ExprId(f"input[{i}]", 8) for i in range(16)]

# We put those symbols in the argument of the function (@RDI)
initial_state = {
    expr_simp(ExprMem(regs.RDI + ExprInt(i, 64), 8)): input_ids[i]
    for i in range(16)
}
symb.set_state(initial_state)
symb.run_at(ircfg, addr)

The setup is quite simple if you understand the SSE theory (once again: go read about it!). ExprId is just an expression that has a name; we create one for each byte of the input. These are our initial symbols. The initial state is a dictionary that maps expressions to their values. Here we place our input_ids at RDI, RDI + 1, etc.

We use expr_simp in order to have the most simple form of expressions, for instance, we get rid of the RDI + 0.
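Why does the normalization matter? The state is looked up by expression, so two expressions that mean the same thing but are built differently will not match. The toy below illustrates the idea with plain tuples; it is not miasm code, and `simp` and `mem8` are made-up helpers.

```python
def simp(expr):
    """Toy simplifier: rewrite ('+', base, 0) to base, leave anything else alone."""
    if isinstance(expr, tuple) and expr[0] == "+" and expr[2] == 0:
        return expr[1]
    return expr


def mem8(addr):
    """Toy 8-bit memory access at a symbolic address."""
    return ("mem8", addr)


# Keys built without simplification keep the useless "+ 0".
raw_state = {mem8(("+", "RDI", i)): f"input[{i}]" for i in range(4)}

# Keys built through the simplifier are in canonical form.
simplified_state = {mem8(simp(("+", "RDI", i))): f"input[{i}]" for i in range(4)}

print(mem8("RDI") in raw_state)         # False
print(mem8("RDI") in simplified_state)  # True
```

Building every key through the same simplifier, both when writing the initial state and when reading the result, avoids this kind of silent mismatch.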

In the end, once the setup is done, we can just… run it. It will continue until it does not know where to go (non-solvable PC). This is exactly what we need here, since it will stop at the ret.

Let’s get this solution !

Now we have done the SSE. We have somewhere in the symb object an expression (probably too big to print) that gives the exact transformations done by the function. We will now translate those to a z3 solver.

# z3 time !
s = z3.Solver()

for i in range(16):
    # We extract the equation for each input, and add it as constraints to the z3solver.
    s.add(
        z3trans.from_expr(
            symb.symbols[expr_simp(ExprMem(regs.RDI + ExprInt(i, 64), 8))]
        )
        == goal[i]
    )

The only important thing to notice is the use of symb.symbols[], which lets us retrieve the symbolic value held by an expression after the execution. We just have to add one z3 constraint per byte, equating it with the corresponding byte of goal (the argument of rev). In the end we have a regular z3 Solver object, which you can handle the way you prefer. This is how I do it:

# z3 magic :sparkles:
if s.check() == z3.sat:
    m = s.model()
    to_ret = b""
    for c in input_ids:
        to_ret += bytes([m[z3trans.from_expr(c)].as_long()])
    return to_ret
else:
    # Raising a bare string is a TypeError in Python 3; use a real exception.
    raise ValueError("uh oh...")

The only unusual part is getting back the z3 variables used in the solver: we run the input symbols through the translator again to look them up in the model.

In the end, here is the full reverse_func.py file:

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core.locationdb import LocationDB
from miasm.ir.symbexec import SymbolicExecutionEngine
from miasm.expression.expression import *
from miasm.analysis.simplifier import expr_simp
from miasm.ir.translators import Translator

import z3


def rev(goal, addr):

    # This is what will allow us to go from miasm expressions -> z3 expressions.
    z3trans = Translator.to_language("z3")

    # Let's setup miasm ----
    loc_db = LocationDB()

    with open("chall/funcs", "rb") as f:
        # We assume we retrieved the functions
        cont = Container.from_stream(f, loc_db)

    machine = Machine("x86_64")
    mdis = machine.dis_engine(cont.bin_stream, loc_db=cont.loc_db)

    asmcfg = mdis.dis_multiblock(addr)
    lifter = machine.lifter_model_call(mdis.loc_db)
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)

    # End of setup ----
    # At this point, we have an IR of the function, on which we'll do symbex.

    regs = lifter.arch.regs

    symb = SymbolicExecutionEngine(lifter)  # <--- Coolest object in the west.

    # These are our input bytes, as symbols.
    input_ids = [ExprId(f"input[{i}]", 8) for i in range(16)]

    # We put those symbols in the argument of the function (@RDI)
    initial_state = {
        expr_simp(ExprMem(regs.RDI + ExprInt(i, 64), 8)): input_ids[i]
        for i in range(16)
    }
    symb.set_state(initial_state)
    symb.run_at(ircfg, addr)

    # z3 time !
    s = z3.Solver()

    for i in range(16):
        # We extract the equation for each input, and add it as constraints to the z3solver.
        s.add(
            z3trans.from_expr(
                symb.symbols[expr_simp(ExprMem(regs.RDI + ExprInt(i, 64), 8))]
            )
            == goal[i]
        )

    # z3 magic :sparkles:
    if s.check() == z3.sat:
        m = s.model()
        to_ret = b""
        for c in input_ids:
            to_ret += bytes([m[z3trans.from_expr(c)].as_long()])
        return to_ret
    else:
        # Raising a bare string is a TypeError in Python 3; use a real exception.
        raise ValueError("uh oh...")


if __name__ == "__main__":
    print(rev(b"JO\xF8=\x1A\x0F\v\xFDJN=J\xF8\xE5\x4A\x0F", 0))

Everything together.

Here is some Python to download the challenge and dump the functions to a file.

The file is retrieve.py

import nclib
import os

from colorama import Fore


def retrieve_funcs(until_token=False):
    # Clean time !
    os.system("rm -fr chall")

    nc1 = nclib.Netcat("challenges.404ctf.fr", 31990)
    with open("chall.zip", "wb") as f:
        f.write(nc1.recv_all())

    print(Fore.BLUE + "[ + ] Received " + Fore.GREEN + "chall.zip" + Fore.RESET)

    os.system("unzip chall.zip -d chall > /dev/null")
    print(
        Fore.BLUE
        + "[ + ] Extracted "
        + Fore.GREEN
        + "chall.zip"
        + Fore.BLUE
        + " -> "
        + Fore.GREEN
        + "chall"
        + Fore.RESET
    )

    with open("chall/token.txt", "r") as f:
        token = f.read()

    print(
        Fore.BLUE
        + "[ + ] Got challenge with token : "
        + Fore.GREEN
        + f"{token}"
        + Fore.RESET
    )

    if until_token:
        return
    nc2 = nclib.Netcat("162.19.101.153", 31992)
    nc2.sendline(token.encode())
    tmpmsg = nc2.recvuntil(b"endfunc")[:-7]

    # We gather all the functions in a single file, 
    # We will keep track of the offset of each function in it.
    funcs = tmpmsg
    funcs_addresses = [0]


    while tmpmsg != b"":
        # We don't know how many function to retrieve in advance.
        nc2 = nclib.Netcat("162.19.101.153", 31992)
        nc2.sendline(token.encode())
        tmpmsg = nc2.recvuntil(b"endfunc")[:-7]
        funcs_addresses.append(len(funcs))
        funcs += tmpmsg

    with open("chall/funcs", "wb") as f:
        f.write(funcs)

    print(Fore.BLUE + "[ + ] Retrieved all funcs" + Fore.RESET)
    return token.encode(), funcs_addresses[:-1]


if __name__ == "__main__":
    """
    Stand alone retrieve, it will only download the remote challenge
    And not initiate conversation (fresh state)
    """
    retrieve_funcs(True)

With some colored logging :).
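The offset bookkeeping in retrieve_funcs (append an offset, then drop the last one) is easy to get wrong. Once the blobs are collected in a list, it can be written more directly. `pack_blobs` is a hypothetical helper, not part of the original script.

```python
from typing import Iterable, List, Tuple


def pack_blobs(blobs: Iterable[bytes]) -> Tuple[bytes, List[int]]:
    """Concatenate blobs and return the offset at which each one starts."""
    packed = bytearray()
    offsets = []
    for blob in blobs:
        offsets.append(len(packed))   # this blob starts where the data currently ends
        packed += blob
    return bytes(packed), offsets
```

For example, `pack_blobs([b"ab", b"cde", b"f"])` gives the bytes `b"abcdef"` and the offsets `[0, 2, 5]`, which is the kind of list rev expects as function addresses.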

Finally, the main solve.py imports everything and finalizes the communication with the verification server. (Again, this is not about regular Python programming.)

from retrieve import retrieve_funcs
from forward_verifier import emulate
from reverse_func import rev

from binascii import hexlify
import nclib

from colorama import Fore

# We get the chall, token and function addresses.
token, funcs_addresses = retrieve_funcs()
final = b""

while 1:
    # The amount of layers is unknown.
    nc1 = nclib.Netcat("162.19.101.153", 31992)
    nc1.sendline(token)
    r = nc1.recv_all()

    if b"done" in r:
        break

    seed = r[:16]
    goal = emulate(seed)
    for addr in funcs_addresses[::-1]:
        # We reverse each function, starting by the last one to be called in the binary.
        goal = rev(goal, addr)
    final += goal

print(Fore.BLUE + "[ + ] Final payload in hex : " + Fore.GREEN + hexlify(final).decode() + Fore.RESET)


# Let's get our flag, by sending the solution to the validation server.

nc2 = nclib.Netcat("challenges.404ctf.fr", 31991)

nc2.recvline()
nc2.sendline(token)
nc2.recvline()
nc2.send_line(final)

print(Fore.BLUE + "[ + ] Validation server answer : " + Fore.YELLOW)
print(nc2.recv_all().decode().strip() + Fore.RESET)

# Great challenge !!

Let’s see it, at last !

It takes between 10 and 15s, depending on my network speed

Again, this could be improved a lot by saving the SSE results instead of recomputing them on each iteration.

In the end it was a super great challenge to train my miasm skills a bit. Thanks again Izipak !

The 404 CTF was again amazing this year, especially in RE :).

HRTC <3

Contact me !

If you want to get in touch with me, please do so on Discord ! My username is @redhpm.



2024-05-12
