diff --git a/pe/asmdisasm.py b/pe/asmdisasm.py new file mode 100644 index 0000000..17849ed --- /dev/null +++ b/pe/asmdisasm.py @@ -0,0 +1,62 @@ +import sys +import pefile +import pprint +from keystone import Ks, KS_ARCH_X86, KS_MODE_64 +from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CS_MODE_LITTLE_ENDIAN +import logging + +from model.defs import * + +logger = logging.getLogger("AsmDisasm") + + +cs = Cs(CS_ARCH_X86, CS_MODE_64 + CS_MODE_LITTLE_ENDIAN) +cs.detail = True # from RedBackdoorer + + +ks = Ks(KS_ARCH_X86, KS_MODE_64) + +def assemble_lea(current_address: int, destination_address: int, reg: str) -> bytes: + #print("LEAH: 0x{:X} - 0x{:X} = 0x{:X}".format( + # current_address, destination_address, destination_address - current_address)) + offset = destination_address - current_address + encoding, _ = ks.asm(f"lea {reg}, qword ptr ds:[{offset}]") + machine_code = bytes(encoding) + return machine_code + + +def assemble_relative_call(current_address: int, destination_address: int) -> bytes: + # Calculate the relative offset + # For a near jump, the instruction length is typically 5 bytes (E9 xx xx xx xx) + offset = destination_address - current_address + + # Assemble the jump instruction using Keystone + encoding, _ = ks.asm(f"call qword ptr ds:[{offset}]") + machine_code = bytes(encoding) + + # Disassemble the machine code using Capstone + #cs = Cs(CS_ARCH_X86, CS_MODE_64) + #disassembled = next(cs.disasm(machine_code, current_address)) + #logger.info(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}") + #logger.info(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}") + return machine_code + + +def assemble_relative_jmp(current_address: int, destination_address: int) -> bytes: + offset = destination_address - current_address + encoding, _ = ks.asm(f"jmp {offset}") + machine_code = bytes(encoding) + return machine_code + + +def asm_disasm(asm_text, offset=0): + for instr in cs.disasm(asm_text, offset): + printInstr(instr) + + +def printInstr(instr, depth=0): + _bytes = [f'{x:02x}' for x in instr.bytes[:8]] + if len(instr.bytes) < 8: + _bytes.extend([' ',] * (8 - len(instr.bytes))) + instrBytes = ' '.join([f'{x}' for x in _bytes]) + logger.info('\t' * 1 + f' [{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}') diff --git a/pe/derbackdoorer.py b/pe/derbackdoorer.py index 8aa8edc..f662759 100644 --- a/pe/derbackdoorer.py +++ b/pe/derbackdoorer.py @@ -13,7 +13,7 @@ from intervaltree import * from utils import hexdump from pe.superpe import SuperPe from model.defs import * -from pe.pehelper import assemble_relative_call, assemble_relative_jmp +from pe.asmdisasm import assemble_relative_jmp, asm_disasm, cs, ks, printInstr logger = logging.getLogger("DerBackdoorer") @@ -28,10 +28,6 @@ class FunctionBackdoorer: def __init__(self, superpe: SuperPe, depth_option=DEPTH_OPTIONS.LEVEL1): self.superpe: SuperPe = superpe self.pe_data = self.superpe.pe.get_memory_mapped_image() - - self.cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64 + capstone.CS_MODE_LITTLE_ENDIAN) - self.ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64 + keystone.KS_MODE_LITTLE_ENDIAN) - self.cs.detail = True self.depth_option: DEPTH_OPTIONS = depth_option @@ -60,12 +56,7 @@ class FunctionBackdoorer: # Show Result logger.info("--[ Patched result of function: ".format()) data = self.pe_data[function_addr:addr+len(compiled_trampoline)] - self.asm_disasm(data, offset=function_addr) - - - def asm_disasm(self, asm_text, offset=0): - for instr in self.cs.disasm(asm_text, offset): - self.printInstr(instr, 0) + asm_disasm(data, offset=function_addr) def find_suitable_instruction_addr(self, startOffset, length=256): @@ -90,8 +81,8 @@ class FunctionBackdoorer: def _find_suitable_instruction_addr(self, startOffset, length, option): # iterate through every instruction. starting from startOffset data = self.pe_data[startOffset:startOffset + length] - for instr in self.cs.disasm(data, startOffset): - self.printInstr(instr, 0) + for instr in cs.disasm(data, startOffset): + printInstr(instr, 0) if instr.mnemonic.lower() in ['ret']: return None @@ -124,7 +115,7 @@ class FunctionBackdoorer: full_shellcode_addr = shellcode_addr + self.superpe.pe.OPTIONAL_HEADER.ImageBase enc, count = self.ks.asm(f'MOV {reg}, 0x{full_shellcode_addr:X}') - for instr2 in self.cs.disasm(bytes(enc), 0): + for instr2 in cs.disasm(bytes(enc), 0): addrOffset = len(instr2.bytes) - instr2.addr_size break @@ -141,16 +132,7 @@ class FunctionBackdoorer: ]) trampoline_text = f'MOV {reg}, 0x{full_shellcode_addr:X} ; {jump}' - trampoline_compiled, count = self.ks.asm(trampoline_text) + trampoline_compiled, count = ks.asm(trampoline_text) return trampoline_compiled, trampoline_text, addrOffset - - - def printInstr(self, instr, depth=0): - _bytes = [f'{x:02x}' for x in instr.bytes[:8]] - if len(instr.bytes) < 8: - _bytes.extend([' ',] * (8 - len(instr.bytes))) - - instrBytes = ' '.join([f'{x}' for x in _bytes]) - logger.info('\t' * 1 + f'[{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}') - + \ No newline at end of file diff --git a/pe/pehelper.py b/pe/pehelper.py index 2b08479..7f11ca8 100644 --- a/pe/pehelper.py +++ b/pe/pehelper.py @@ -1,8 +1,4 @@ -import sys import pefile -import pprint -from keystone import Ks, KS_ARCH_X86, KS_MODE_64 -from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CS_MODE_LITTLE_ENDIAN import logging from model.defs import * @@ -66,57 +62,6 @@ def get_code_section(pe: pefile.PE) -> pefile.SectionStructure: raise Exception("pehelper::get_code_section(): Code section not found") -# keystone/capstone stuff -cs = Cs(CS_ARCH_X86, CS_MODE_64 + CS_MODE_LITTLE_ENDIAN) - -def assemble_lea(current_address: int, destination_address: int, reg: str) -> bytes: - #print("LEAH: 0x{:X} - 0x{:X} = 0x{:X}".format( - # current_address, destination_address, destination_address - current_address)) - offset = destination_address - current_address - ks = Ks(KS_ARCH_X86, KS_MODE_64) - encoding, _ = ks.asm(f"lea {reg}, qword ptr ds:[{offset}]") - machine_code = bytes(encoding) - return machine_code - - -def assemble_relative_call(current_address: int, destination_address: int) -> bytes: - # Calculate the relative offset - # For a near jump, the instruction length is typically 5 bytes (E9 xx xx xx xx) - offset = destination_address - current_address - - # Assemble the jump instruction using Keystone - ks = Ks(KS_ARCH_X86, KS_MODE_64) - encoding, _ = ks.asm(f"call qword ptr ds:[{offset}]") - machine_code = bytes(encoding) - - # Disassemble the machine code using Capstone - #cs = Cs(CS_ARCH_X86, CS_MODE_64) - #disassembled = next(cs.disasm(machine_code, current_address)) - #logger.info(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}") - #logger.info(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}") - return machine_code - - -def assemble_relative_jmp(current_address: int, destination_address: int) -> bytes: - offset = destination_address - current_address - ks = Ks(KS_ARCH_X86, KS_MODE_64) - encoding, _ = ks.asm(f"jmp {offset}") - machine_code = bytes(encoding) - return machine_code - - -def asm_disasm(asm_text, offset=0): - for instr in cs.disasm(asm_text, offset): - printInstr(instr) - -def printInstr(instr, depth=0): - _bytes = [f'{x:02x}' for x in instr.bytes[:8]] - if len(instr.bytes) < 8: - _bytes.extend([' ',] * (8 - len(instr.bytes))) - instrBytes = ' '.join([f'{x}' for x in _bytes]) - logger.info('\t' * 1 + f' [{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}') - - ## Utils def remove_trailing_null_bytes(data: bytes) -> bytes: diff --git a/phases/injector.py b/phases/injector.py index 3b6dba4..bf777d3 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -11,6 +11,7 @@ from pe.derbackdoorer import FunctionBackdoorer from pe.superpe import SuperPe from model.project import Project from model.settings import Settings +from pe.asmdisasm import * logger = logging.getLogger("Injector")