From 919aca0d70e6870b9c69d8cac5d850485acc604d Mon Sep 17 00:00:00 2001 From: Dobin Rutishauser Date: Sat, 25 Jan 2025 18:07:24 +0100 Subject: [PATCH] refactor: fix some things shown by newer VS syntax checker, mostly type related --- app/views_project.py | 11 +++++----- config.py | 2 +- helper.py | 2 +- model/defs.py | 2 +- model/settings.py | 48 +++++++++++++++++++++---------------------- pe/dllresolver.py | 2 +- pe/pehelper.py | 4 ++-- pe/superpe.py | 34 ++++++++++++++++-------------- phases/injector.py | 12 ++++++----- supermega.py | 14 ++++++------- tests/test_superpe.py | 8 +++++--- 11 files changed, 73 insertions(+), 66 deletions(-) diff --git a/app/views_project.py b/app/views_project.py index bb32c09..20a2425 100644 --- a/app/views_project.py +++ b/app/views_project.py @@ -81,8 +81,9 @@ def project(name): if superpe.is_dll(): exports = superpe.get_exports_full() code_sect_size = superpe.get_code_section().Misc_VirtualSize - if superpe.get_section_by_name(".rdata") != None: - data_sect_size = superpe.get_section_by_name(".rdata").virt_size + rdata_section = superpe.get_section_by_name(".rdata") + if rdata_section != None: + data_sect_size = rdata_section.virt_size else: logger.warning("No .rdata section found in {}".format(project.settings.inject_exe_in)) @@ -178,8 +179,8 @@ def add_project(): if storage.get_project(project_name) == None: # Default values for web create settings.init_payload_injectable( - "messagebox.bin", - "data/binary/exes/procexp64.exe", + FilePath("messagebox.bin"), + FilePath("data/binary/exes/procexp64.exe"), "" ) settings.decoder_style = "xor_2" @@ -347,7 +348,7 @@ def get_logfiles(directory): elif '.log' in file: data = conv.convert(data, full=False) else: - data = escape(data) + data = data entry = { "name": file, diff --git a/config.py b/config.py index 7d3b30a..ffb6634 100644 --- a/config.py +++ b/config.py @@ -63,7 +63,7 @@ class Config(object): def getConfig(self): return self.data - def get(self, value): + def get(self, value) -> str: return self.data.get(value, "") config = Config() \ No newline at end of file diff --git a/helper.py b/helper.py index 9ac3636..af16f0f 100644 --- a/helper.py +++ b/helper.py @@ -110,7 +110,7 @@ def run_process_checkret(args, check=True): def try_start_shellcode(shc_file): logger.info("--[ Blindly execute shellcode: {}".format(shc_file)) subprocess.run([ - config.get["path_runshc"], + config.get("path_runshc"), shc_file, ]) diff --git a/model/defs.py b/model/defs.py index 2ec80f8..5516d6a 100644 --- a/model/defs.py +++ b/model/defs.py @@ -5,7 +5,7 @@ class FilePath(str): pass # with data/shellcodes/createfile.bin -VerifyFilename: FilePath = r'C:\Temp\a' +VerifyFilename: FilePath = FilePath("C:\\Temp\\a") # Directory structure PATH_EXES = "data/binary/exes/" diff --git a/model/settings.py b/model/settings.py index b7b4856..cd76ba8 100644 --- a/model/settings.py +++ b/model/settings.py @@ -6,20 +6,20 @@ logger = logging.getLogger("Views") class Settings(): def __init__(self, project_name: str = "default"): - self.project_name = project_name - self.payload_path: FilePath = "" + self.project_name: str = project_name + self.payload_path: FilePath = FilePath("") # Settings self.carrier_name: str = "" self.decoder_style: str = "xor_2" self.short_call_patching: bool = False - self.plugin_antiemulation = "none" - self.plugin_decoy = "none" - self.plugin_guardrail = "none" - self.plugin_guardrail_data = "C:\\\\Users\\\\hacker" - self.plugin_virtualprotect = "standard" - self.plugin_virtualprotect_data = "" + self.plugin_antiemulation: str = "none" + self.plugin_decoy: str = "none" + self.plugin_guardrail: str = "none" + self.plugin_guardrail_data: str = "C:\\\\Users\\\\hacker" + self.plugin_virtualprotect: str = "standard" + self.plugin_virtualprotect_data: str = "" self.dllfunc: str = "" # For DLL injection @@ -29,34 +29,34 @@ class Settings(): # Injectable self.carrier_invoke_style: CarrierInvokeStyle = CarrierInvokeStyle.BackdoorCallInstr - self.inject_exe_in: FilePath = "" - self.inject_exe_out: FilePath = "" + self.inject_exe_in: FilePath = FilePath("") + self.inject_exe_out: FilePath = FilePath("") # Debug - self.show_command_output = False + self.show_command_output: bool = False self.verify: bool = False self.try_start_final_infected_exe: bool = False self.cleanup_files_on_start: bool = True self.cleanup_files_on_exit: bool = True self.generate_asm_from_c: bool = True - self.generate_shc_from_asm: bool = True # More self.fix_missing_iat = True self.patch_show_window = True - self.payload_location = PayloadLocation.DATA + self.payload_location: PayloadLocation = PayloadLocation.DATA # directories and filenames - self.main_dir = "{}{}/".format(PATH_WEB_PROJECT, self.project_name) - self.main_c_path = self.main_dir + "main.c" - self.main_asm_path = self.main_dir + "main.asm" - self.main_exe_path = self.main_dir + "main.exe" - self.main_shc_path = self.main_dir + "main.bin" - self.inject_exe_out = "{}{}".format( - self.main_dir, os.path.basename(self.inject_exe_in).replace(".exe", ".infected.exe")) + self.main_dir: FilePath = FilePath("{}{}/".format(PATH_WEB_PROJECT, self.project_name)) + self.main_c_path: FilePath = FilePath(self.main_dir + "main.c") + self.main_asm_path: FilePath = FilePath(self.main_dir + "main.asm") + self.main_exe_path: FilePath = FilePath(self.main_dir + "main.exe") + self.main_shc_path: FilePath = FilePath(self.main_dir + "main.bin") + self.inject_exe_out: FilePath = FilePath("{}{}".format( + self.main_dir, os.path.basename(self.inject_exe_in).replace(".exe", ".infected.exe"))) - def init_payload_injectable(self, shellcode, injectable, dll_func): - self.payload_path = PATH_SHELLCODES + shellcode + + def init_payload_injectable(self, shellcode: FilePath, injectable: FilePath, dll_func: str ): + self.payload_path = FilePath(PATH_SHELLCODES + shellcode) if shellcode == "createfile.bin": self.verify = True self.try_start_final_infected_exe = False @@ -64,9 +64,9 @@ class Settings(): self.cleanup_files_on_exit = False self.inject_exe_in = injectable - self.inject_exe_out = "{}{}".format( + self.inject_exe_out = FilePath("{}{}".format( self.main_dir, os.path.basename(self.inject_exe_in).replace(".exe", ".infected.exe") - ) + )) self.dllfunc = dll_func \ No newline at end of file diff --git a/pe/dllresolver.py b/pe/dllresolver.py index 2909d62..abadd7f 100644 --- a/pe/dllresolver.py +++ b/pe/dllresolver.py @@ -62,4 +62,4 @@ def search_for_dll(dll_name) -> str: full_path = os.path.join(path, dll_name) if os.path.exists(full_path): return full_path - return None + return "" diff --git a/pe/pehelper.py b/pe/pehelper.py index ff8df78..4510c9a 100644 --- a/pe/pehelper.py +++ b/pe/pehelper.py @@ -43,13 +43,13 @@ def extract_code_from_exe_file_ep(exe_file: FilePath, len: int) -> bytes: return data -def get_physical_address_tmp(pe, virtual_address): +def get_physical_address_tmp(pe, virtual_address) -> int: for section in pe.sections: if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize: virtual_offset = virtual_address - section.VirtualAddress physical_address = section.PointerToRawData + virtual_offset return physical_address - return None + raise Exception("pehelper::get_physical_address(): Address not found") def extract_code_from_exe_file(exe_file: FilePath) -> bytes: diff --git a/pe/superpe.py b/pe/superpe.py index f898530..1448260 100644 --- a/pe/superpe.py +++ b/pe/superpe.py @@ -3,6 +3,7 @@ import capstone import logging from typing import List, Dict import random +from typing import Optional from model.defs import * from model.rangemanager import RangeManager @@ -40,7 +41,7 @@ class SuperPe(): for section in self.pe.sections: self.pe_sections.append(PeSection(section)) - self.iat_entries: Dict[str, IatEntry] = {} + self.iat_entries: Dict[str, List[IatEntry]] = {} self.init_iat_entries() @@ -121,7 +122,7 @@ class SuperPe(): return None - def get_section_by_name(self, name: str) -> PeSection: + def get_section_by_name(self, name: str) -> Optional[PeSection]: for section in self.pe_sections: if section.name == name: return section @@ -165,7 +166,7 @@ class SuperPe(): return base_relocs - def getExportEntryPoint(self, exportName: str): + def getExportEntryPoint(self, exportName: str) -> int: dec = lambda x: '???' if x is None else x.decode() d = [pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] #self.pe.parse_data_directories(directories=d) @@ -174,7 +175,7 @@ class SuperPe(): raise Exception('No DLL exports found!') exports = [(e.ordinal, dec(e.name)) for e in self.pe.DIRECTORY_ENTRY_EXPORT.symbols] - chosen_export = None + chosen_export = [] for export in exports: if export[1].lower() == exportName.lower(): chosen_export = export @@ -183,8 +184,9 @@ class SuperPe(): name = chosen_export[1] for exp in self.pe.DIRECTORY_ENTRY_EXPORT.symbols: if exp.name.decode() == name: - addr = exp.address - return addr + return exp.address + raise Exception("Cant find entry point for export {}".format(exportName)) + def get_exports(self) -> List[str]: @@ -229,12 +231,12 @@ class SuperPe(): return res - def get_size_of_exported_function(self, dllfunc): + def get_size_of_exported_function(self, dllfunc) -> int: exports = self.get_exports_full() for exp in exports: if exp["name"] == dllfunc: return exp["size"] - return None + return 0 ## IAT @@ -245,14 +247,14 @@ class SuperPe(): for entry in iat[dll_name]: if entry.func_name == func_name: return entry.iat_vaddr - return None + raise Exception(f"Function {func_name} not found in IAT") - def get_iat_entries(self) -> Dict[str, IatEntry]: + def get_iat_entries(self) -> Dict[str, List[IatEntry]]: return self.iat_entries - def make_iat_entries(self) -> Dict[str, IatEntry]: + def make_iat_entries(self): iat = {} for entry in self.pe.DIRECTORY_ENTRY_IMPORT: for imp in entry.imports: @@ -280,7 +282,7 @@ class SuperPe(): possible.append(entry.func_name) if len(possible) == 0: - return None + raise Exception("No alternatives found for function name") else: # Hope there wont be many collisions return random.choice(possible) @@ -301,7 +303,7 @@ class SuperPe(): if funcname == encoded_funcname: return imp.name_offset break - return None + raise Exception(f"Import {func_name} not found.") def patch_iat_entry(self, dll_name: str, func_name: str, new_func_name: str): @@ -325,6 +327,8 @@ class SuperPe(): ## .rdata manager def get_rdata_rangemanager(self) -> RangeManager: section = self.get_section_by_name(".rdata") + if section == None: + raise Exception("No .rdata section found in PE file") relocs = self.get_relocations_for_section(".rdata") rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size) for reloc in relocs: @@ -347,8 +351,8 @@ class SuperPe(): def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]: - section: PeSection = self.get_section_by_name(section_name) ret = [] + section = self.get_section_by_name(section_name) if section is None: return ret for reloc in self.get_base_relocs(): @@ -396,7 +400,7 @@ class SuperPe(): # Add the difference to the section's pointer to raw data physical_address = section.PointerToRawData + virtual_offset return physical_address - return None + raise Exception("Cant translate VA to offset") def write_pe_to_file(self, outfile: str): diff --git a/phases/injector.py b/phases/injector.py index c3c7d2e..70959ac 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -42,8 +42,8 @@ class Injector(): self.rdata_manager = self.superpe.get_rdata_rangemanager() self.code_manager = self.superpe.get_code_rangemanager() - self.payload_rva = None - self.carrier_rva = None + self.payload_rva: int = 0 + self.carrier_rva: int = 0 self.init_addresses() @@ -100,7 +100,11 @@ class Injector(): raise Exception('No hole found in code section to fit payload!') largest_gap_size = largest_gap[0][1] - largest_gap[0][0] offset = largest_gap[0][0] - self.payload_rva = self.superpe.get_section_by_name(".rdata").virt_addr + offset + + rdata_section = self.superpe.get_section_by_name(".rdata") + if rdata_section == None: + raise Exception("No .rdata section found in PE file") + self.payload_rva = rdata_section.virt_addr + offset self.rdata_manager.add_range(offset, offset+len(self.payload.payload_data)) ## Inject @@ -137,8 +141,6 @@ class Injector(): else: # EXE/DLL carrier_offset = self.superpe.get_offset_from_rva(self.carrier_rva) - if carrier_offset == None: - raise Exception("Carrier Offset is None, invalid carrier RVA? 0x{:X}".format(self.carrier_rva)) #logger.info("{} {}".format(self.carrier_rva, carrier_offset)) logger.info("--[ Inject: Write Carrier to 0x{:X} (0x{:X})".format( self.carrier_rva, carrier_offset)) diff --git a/supermega.py b/supermega.py index 0e3ac78..d9d5beb 100644 --- a/supermega.py +++ b/supermega.py @@ -80,10 +80,10 @@ def main(): logger.info("Could not find: {}".format(args.inject)) return settings.inject_exe_in = args.inject - settings.inject_exe_out = "{}{}".format( + settings.inject_exe_out = FilePath("{}{}".format( settings.main_dir, os.path.basename(args.inject).replace(".exe", ".injected.exe") - ) + )) settings.inject_exe_out = args.inject.replace(".exe", ".infected.exe").replace(".dll", ".infected.dll") write_webproject("default", settings) @@ -200,12 +200,10 @@ def start_real(settings: Settings): raise Exception("IAT entry not found: {}".format(", ".join(functions))) # ASSEMBLE: Assemble .asm to .shc (ASM -> SHC) - if settings.generate_shc_from_asm: - carrier_shellcode: bytes = phases.assembler.asm_to_shellcode( - asm_in = settings.main_asm_path, - build_exe = settings.main_exe_path) - observer.add_code_file("carrier_shc", carrier_shellcode) - + carrier_shellcode: bytes = phases.assembler.asm_to_shellcode( + asm_in = settings.main_asm_path, + build_exe = settings.main_exe_path) + observer.add_code_file("carrier_shc", carrier_shellcode) logging.info("> Carrier Size: {} Payload Size: {}".format( len(carrier_shellcode), len(project.payload.payload_data) )) diff --git a/tests/test_superpe.py b/tests/test_superpe.py index 87979fc..8b2ecec 100644 --- a/tests/test_superpe.py +++ b/tests/test_superpe.py @@ -29,7 +29,8 @@ class SuperPeTest(unittest.TestCase): self.assertEqual(code_sect.Misc_VirtualSize, 0x11B0CE) # Text Section 2 (PeSection) - code_pesect: PeSection = superpe.get_section_by_name(".text") + code_pesect = superpe.get_section_by_name(".text") + self.assertIsNotNone(code_pesect) self.assertEqual(code_pesect.name, ".text") self.assertEqual(code_pesect.virt_addr, 0x1000) self.assertEqual(code_pesect.virt_size, 0x11B0CE) @@ -43,7 +44,7 @@ class SuperPeTest(unittest.TestCase): self.assertEqual(base_reloc.offset, 0x618) # IAT - iat_entries: Dict[str, IatEntry] = superpe.get_iat_entries() + iat_entries: Dict[str, List[IatEntry]] = superpe.get_iat_entries() self.assertEqual(len(iat_entries), 24) self.assertTrue("kernel32.dll" in iat_entries) self.assertTrue("uxtheme.dll" in iat_entries) @@ -89,6 +90,7 @@ class SuperPeTest(unittest.TestCase): # Text Section 2 (PeSection) code_pesect: PeSection = superpe.get_section_by_name(".text") + self.assertIsNotNone(code_pesect) self.assertEqual(code_pesect.name, ".text") self.assertEqual(code_pesect.virt_addr, 0x1000) self.assertEqual(code_pesect.virt_size, 0x12D08) @@ -102,7 +104,7 @@ class SuperPeTest(unittest.TestCase): self.assertEqual(base_reloc.offset, 0xCE8) # IAT - iat_entries: Dict[str, IatEntry] = superpe.get_iat_entries() + iat_entries: Dict[str, List[IatEntry]] = superpe.get_iat_entries() self.assertEqual(len(iat_entries), 2) self.assertTrue("kernel32.dll" in iat_entries) self.assertTrue("msvcrt.dll" in iat_entries)