From 9ed046988402d7458859cf846961731479c775c0 Mon Sep 17 00:00:00 2001 From: Dobin Rutishauser Date: Mon, 9 Jun 2025 10:10:02 +0200 Subject: [PATCH] refactor: rework logging --- app/views_project.py | 10 +++++----- helper.py | 14 +++++++------- model/injectable.py | 2 +- model/payload.py | 4 +++- model/project.py | 8 ++------ pe/derbackdoorer.py | 6 +++--- pe/pehelper.py | 2 +- phases/assembler.py | 10 +++++++--- phases/compiler.py | 11 +++++++---- phases/injector.py | 36 ++++++++++++++++++------------------ phases/templater.py | 33 +++++++++++++++++++++++++++++++-- supermega.py | 36 ++++++++++++------------------------ 12 files changed, 97 insertions(+), 75 deletions(-) diff --git a/app/views_project.py b/app/views_project.py index 20a2425..0851254 100644 --- a/app/views_project.py +++ b/app/views_project.py @@ -19,7 +19,7 @@ from app.storage import storage, WebProject from sender import scannerDetectsBytes from phases.injector import verify_injected_exe from phases.templater import get_template_names -from helper import run_process_checkret, run_exe +from helper import run_exe from model.project import prepare_project from pe.superpe import SuperPe import pe.dllresolver @@ -280,10 +280,10 @@ def start_project(project_name): if no_exec_arg == "true": no_exec = True - logger.info("--[ Exec project: {} remote: {} no_exec: {}".format(project_name, remote, no_exec)) + logger.info(" Exec project: {} remote: {} no_exec: {}".format(project_name, remote, no_exec)) if remote: - logger.info("--[ Exec {} on server {}".format(project.settings.inject_exe_out, config.get("avred_server"))) + logger.info(" Exec {} on server {}".format(project.settings.inject_exe_out, config.get("avred_server"))) with open(project.settings.inject_exe_out, "rb") as f: data = f.read() filename = os.path.basename(project.settings.inject_exe_out) @@ -301,13 +301,13 @@ def start_project(project_name): else: # Start/verify it at the end if project.settings.verify: - logger.info("--[ Verify infected exe") + logger.info(" Verify infected exe") exit_code = verify_injected_exe(project.settings.inject_exe_out) elif no_exec == False: run_exe(project.settings.inject_exe_out, dllfunc=project.settings.dllfunc, check=False) elif no_exec == True: dirname = os.path.dirname(os.path.abspath(project.settings.inject_exe_out)) - logger.info("--[ Open folder: {}".format(dirname)) + logger.info(" Open folder: {}".format(dirname)) subprocess.run(['explorer', dirname]) return redirect("/project/{}".format(project_name), code=302) diff --git a/helper.py b/helper.py index fcc3dd7..9d39008 100644 --- a/helper.py +++ b/helper.py @@ -37,8 +37,6 @@ def clean_tmp_files(): def clean_files(settings): - logger.info("--( Remove old files") - files_to_clean = [ # temporary files settings.main_c_path, @@ -56,8 +54,8 @@ def run_exe(exefile, dllfunc="", check=True): if exefile.endswith(".dll"): if dllfunc == "": dllfunc = "dllMain" - logger.info("---[ No DLL function specified, using default: {}".format(dllfunc)) - #raise Exception("---[ No DLL function specified") + logger.info(" No DLL function specified, using default: {}".format(dllfunc)) + #raise Exception(" No DLL function specified") args = [ "rundll32.exe", "{},{}".format(exefile, dllfunc) ] elif exefile.endswith(".exe"): args = [ exefile ] @@ -97,8 +95,10 @@ def run_process_checkret(args, check=True): # check return code (optional) if ret.returncode != 0 and check: - logger.info("----! FAILED Command: {}".format(" ".join(args))) - raise Exception("Command failed: " + " ".join(args)) + logger.error("----! FAILED Command: {}".format(" ".join(args))) + logger.warning("----! Stdout:\n {}".format(stdout_s)) + logger.warning("----! Stderr:\n {}".format(stderr_s)) + raise ChildProcessError("Command failed: " + " ".join(args)) # debug: show command output if config.ShowCommandOutput: @@ -108,7 +108,7 @@ def run_process_checkret(args, check=True): def try_start_shellcode(shc_file): - logger.info("--[ Blindly execute shellcode: {}".format(shc_file)) + logger.info(" Blindly execute shellcode: {}".format(shc_file)) subprocess.run([ config.get("path_runshc"), shc_file, diff --git a/model/injectable.py b/model/injectable.py index 9f69920..f0563e3 100644 --- a/model/injectable.py +++ b/model/injectable.py @@ -82,7 +82,7 @@ class Injectable(): # Data Reuse def add_datareuse_fixup(self, fixup: DataReuseEntry): - logger.debug("---( Add datareuse: {}".format(fixup.string_ref)) + logger.debug(" Add datareuse: {}".format(fixup.string_ref)) self.reusedata_fixups.append(fixup) def get_all_reusedata_fixups(self) -> List[DataReuseEntry]: diff --git a/model/payload.py b/model/payload.py index 682ea0c..ede6d73 100644 --- a/model/payload.py +++ b/model/payload.py @@ -13,12 +13,14 @@ class Payload(): def init(self) -> bool: - logging.info("-( Load payload: {}".format(self.payload_path)) + logging.info("-[ Payload: {}".format(self.payload_path)) if not os.path.exists(self.payload_path): logger.error("Payload file does not exist: {}".format(self.payload_path)) return False with open(self.payload_path, 'rb') as f: self.payload_data = f.read() + + logging.info(" Size: {} bytes".format(len(self.payload_data))) return True diff --git a/model/project.py b/model/project.py index d82649d..c9d7e70 100644 --- a/model/project.py +++ b/model/project.py @@ -40,6 +40,8 @@ def prepare_project(project_name, settings): src = "{}{}/".format(PATH_CARRIER, settings.carrier_name) dst = "{}{}/".format(PATH_WEB_PROJECT, project_name) + logger.info("-[ Cleanup project: {}".format(project_name)) + if not os.path.exists(dst): os.makedirs(dst) @@ -59,9 +61,3 @@ def prepare_project(project_name, settings): continue os.remove(dst + file) - - # copy *.c *.h files from src directory to dst directory - for file in os.listdir(src): - if file.endswith(".c") or file.endswith(".h"): - logger.info("--( Copy {} to {}".format(src + file, dst)) - shutil.copy2(src + file, dst) \ No newline at end of file diff --git a/pe/derbackdoorer.py b/pe/derbackdoorer.py index 04b6d6d..44642dd 100644 --- a/pe/derbackdoorer.py +++ b/pe/derbackdoorer.py @@ -32,14 +32,14 @@ class FunctionBackdoorer: def backdoor_function(self, function_addr: int, shellcode_addr: int, shellcode_len: int): - logger.debug("--[ Backdooring exe function at 0x{:X} with jump to carrier at 0x{:X}".format(function_addr, shellcode_addr)) + logger.debug(" Backdooring exe function at 0x{:X} with jump to carrier at 0x{:X}".format(function_addr, shellcode_addr)) addr = self.find_suitable_instruction_addr(function_addr) if addr is None: raise Exception("Couldn't find a suitable instruction to backdoor") compiled_trampoline = assemble_relative_jmp(addr, shellcode_addr) - logger.debug("---[ Backdoor Instruction at 0x{:X} (offset to shellcode: 0x{:X})".format(addr, shellcode_addr - addr)) + logger.debug(" Backdoor Instruction at 0x{:X} (offset to shellcode: 0x{:X})".format(addr, shellcode_addr - addr)) # Check for overlap it = IntervalTree() @@ -63,7 +63,7 @@ class FunctionBackdoorer: def find_suitable_instruction_addr(self, startOffset, length=256): """Find a instruction to backdoor. Recursively.""" - logger.debug("---[ find suitable instruction to hijack starting from 0x{:X} len:{} depthopt:{}".format( + logger.debug(" find suitable instruction to hijack starting from 0x{:X} len:{} depthopt:{}".format( startOffset, length, self.depth_option)) if self.depth_option == DEPTH_OPTIONS.LEVEL1: diff --git a/pe/pehelper.py b/pe/pehelper.py index 4510c9a..ab5b910 100644 --- a/pe/pehelper.py +++ b/pe/pehelper.py @@ -57,7 +57,7 @@ def extract_code_from_exe_file(exe_file: FilePath) -> bytes: section = get_code_section(pe) data: bytes = section.get_data() data = remove_trailing_null_bytes(data) - logger.debug("---[ Extract code section size: {} / {}".format( + logger.debug(" Extract code section size: {} / {}".format( len(data), section.Misc_VirtualSize)) pe.close() return data diff --git a/phases/assembler.py b/phases/assembler.py index 3471a4d..a4d8cd3 100644 --- a/phases/assembler.py +++ b/phases/assembler.py @@ -11,7 +11,8 @@ logger = logging.getLogger("Assembler") def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath) -> bytes: """Takes ASM source file asm_in, compiles it into build_exe, extracts its code section and write into shellcode_out""" - logger.info("-[ Assemble to exe: {} -> {}".format(asm_in, build_exe)) + logger.info("-[ Carrier: ASM to EXE".format()) + logger.info(" Carrier: {} -> {}".format(asm_in, build_exe)) run_process_checkret([ config.get("path_ml64"), asm_in, @@ -22,6 +23,9 @@ def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath) -> bytes: if not os.path.isfile(build_exe): raise Exception("Compiling failed") code = extract_code_from_exe_file(build_exe) + logging.info(" Carrier Size: {}".format( + len(code) + )) return code @@ -30,12 +34,12 @@ def encode_payload(payload: bytes, decoder_style: str) -> bytes: return bytes(payload) elif decoder_style == "xor_1": xor_key = config.xor_key - logger.info("---[ XOR payload with key 0x{:X}".format(xor_key)) + logger.debug(" XOR payload with key 0x{:X}".format(xor_key)) xored = bytes([byte ^ xor_key for byte in payload]) return bytes(xored) elif decoder_style == "xor_2": xor_key = config.xor_key2 - logger.info("---[ XOR2 payload with key {}".format(xor_key)) + logger.debug(" XOR2 payload with key {}".format(xor_key)) xored = bytearray(payload) for i in range(len(xored)): xored[i] ^= xor_key[i % 2] diff --git a/phases/compiler.py b/phases/compiler.py index 26964b1..875288a 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -22,7 +22,7 @@ def compile_dev( asm_out: FilePath, short_call_patching: bool = False, ): - logger.info("-( Compile C to ASM: {} -> {} ".format(c_in, asm_out)) + logger.info("-( Carrier: Compile C to ASM: {} -> {} ".format(c_in, asm_out)) # Compile C To Assembly (text) run_process_checkret([ @@ -39,8 +39,10 @@ def compile_dev( asm_text: str = file_readall_text(asm_out) observer.add_text_file("carrier_asm_orig", asm_text) - logger.info("---[ ASM masm_shc: {} ".format(asm_out)) - asm_text_lines: List[str] = parse_asm_text_file(Carrier(), asm_text) + logger.info(" ASM masm_shc: {} ".format(asm_out)) + settings = Settings() + injectable = Injectable("test.exe") # ??? + asm_text_lines: List[str] = parse_asm_text_file(injectable, asm_text, settings) asm_text = masm_shc(asm_text_lines) observer.add_text_file("carrier_asm_cleanup", asm_text) @@ -54,7 +56,8 @@ def compile( injectable: Injectable, settings: Settings, ): - logger.info("-[ Compile C to ASM: {} -> {} ".format(c_in, asm_out)) + logger.info("-[ Carrier: Compile C to ASM".format()) + logger.info(" Carrier: {} -> {} ".format(c_in, asm_out)) # Compile C To Assembly (text) run_process_checkret([ diff --git a/phases/injector.py b/phases/injector.py index 98b5f4c..ab697f8 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -114,7 +114,8 @@ class Injector(): exe_out = self.settings.inject_exe_out carrier_invoke_style: CarrierInvokeStyle = self.settings.carrier_invoke_style - logger.info("-[ Injecting into {} -> {}".format(exe_in, exe_out)) + logger.info("-[ Injecting Carrier".format()) + logger.info(" Injectable: {} -> {}".format(exe_in, exe_out)) # Patch IAT (if necessary and wanted) self.injectable_patch_iat() @@ -124,7 +125,7 @@ class Injector(): # Special case: DLL exported function direct overwrite if self.superpe.is_dll() and self.settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: - logger.warning("---[ Inject DLL: Overwrite exported function {} with shellcode".format(self.settings.dllfunc)) + logger.warning(" Inject DLL: Overwrite exported function {} with shellcode".format(self.settings.dllfunc)) rva = self.superpe.getExportEntryPoint(self.settings.dllfunc) # Size and sanity checks @@ -136,13 +137,13 @@ class Injector(): # Inject carrier_offset = self.superpe.get_offset_from_rva(rva) - logger.info(f'----[ Using DLL Export "{self.settings.dllfunc}" at RVA 0x{rva:X} offset 0x{carrier_offset:X} to overwrite') + logger.info(f'- Using DLL Export "{self.settings.dllfunc}" at RVA 0x{rva:X} offset 0x{carrier_offset:X} to overwrite') self.superpe.pe.set_bytes_at_offset(carrier_offset, self.carrier_shc) else: # EXE/DLL carrier_offset = self.superpe.get_offset_from_rva(self.carrier_rva) #logger.info("{} {}".format(self.carrier_rva, carrier_offset)) - logger.info("--( Inject: Write Carrier to 0x{:X} (0x{:X})".format( + logger.info(" Inject: Write Carrier to 0x{:X} (0x{:X})".format( self.carrier_rva, carrier_offset)) # Copy the carrier @@ -156,27 +157,27 @@ class Injector(): elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: addr = self.superpe.getExportEntryPoint(self.settings.dllfunc) - logger.info("---( Inject DLL: Backdoor {} (0x{:X})".format( + logger.info(" Inject: Backdoor DLL {} (0x{:X})".format( self.settings.dllfunc, addr)) self.function_backdoorer.backdoor_function( addr, self.carrier_rva, carrier_shc_len) else: # EXE if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: - logger.info("--( Inject EXE: Change Entry Point to 0x{:X}".format( + logger.info(" Inject: Change Entry Point to 0x{:X}".format( self.carrier_rva)) self.superpe.set_entrypoint(self.carrier_rva) elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: addr = self.superpe.get_entrypoint() - logger.info("--( Inject EXE: Backdoor function at entrypoint (0x{:X})".format( + logger.info(" Inject EXE: Backdoor function at entrypoint (0x{:X})".format( addr)) self.function_backdoorer.backdoor_function( addr, self.carrier_rva, carrier_shc_len) - logger.info("--( Fix imports and make carrier reference IAT") + logger.info(" Fix imports and make carrier reference IAT") self.injectable_write_iat_references() - logger.info("--( Insert and reference carrier data") + logger.info(" Insert and reference carrier data") self.inject_and_reference_data() # changes from console to UI (no console window) if necessary @@ -188,7 +189,7 @@ class Injector(): self.superpe.pe.OPTIONAL_HEADER.CheckSum = new_checksum # We done - logger.info("--( Write to file: {}".format(exe_out)) + logger.info("-[ Write to file: {}".format(exe_out)) self.superpe.write_pe_to_file(exe_out) # Log @@ -198,13 +199,13 @@ class Injector(): def injectable_patch_iat(self): - logger.info("--( Checking if IAT entries required by carrier are available") + logger.info(" Checking if IAT entries required by carrier are available") # Patch IAT (if necessary and wanted) for iatRequest in self.injectable.get_all_iat_requests(): # skip available addr = self.superpe.get_vaddr_of_iatentry(iatRequest.name) if addr != None: - logger.debug("---[ Request IAT {} is available at 0x{:X}".format( + logger.debug(" Request IAT {} is available at 0x{:X}".format( iatRequest.name, addr)) continue iat_name = self.superpe.get_replacement_iat_for("KERNEL32.dll", iatRequest.name) @@ -265,9 +266,8 @@ class Injector(): return # insert data - logger.info("--( DataReuseFixups: Inject the data") for datareuse_fixup in reusedata_fixups: - logger.debug(" Handling DataReuse Fixup: {} (.code: {})".format( + logger.debug(" Handling DataReuse Fixup: {} (.code: {})".format( datareuse_fixup.string_ref, datareuse_fixup.in_code)) if datareuse_fixup.in_code: # .text @@ -277,7 +277,7 @@ class Injector(): if payload_rva == None: raise Exception("DataReuseFixup: payload_rva is None") datareuse_fixup.addr = payload_rva + self.injectable.superpe.get_image_base() - logging.debug(" Add to .text at 0x{:X} ({}): {} with size {}".format( + logging.debug(" Add to .text at 0x{:X} ({}): {} with size {}".format( datareuse_fixup.addr, payload_rva, datareuse_fixup.string_ref, len(datareuse_fixup.data))) else: # .rdata @@ -294,11 +294,11 @@ class Injector(): self.superpe.pe.set_bytes_at_rva(data_rva, var_data) datareuse_fixup.addr = data_rva + self.injectable.superpe.get_image_base() ## - logging.debug(" Add to .rdata at 0x{:X} ({}): {}: {}".format( + logging.debug(" Add to .rdata at 0x{:X} ({}): {}: {}".format( datareuse_fixup.addr, data_rva, datareuse_fixup.string_ref, ui_string_decode(var_data))) # replace the placeholder in .text with a LEA instruction to the data we written above - logger.info("--( Datareusefixups: patch code to reference the data") + logger.info(" Datareusefixups: patch code to reference the data") code = self.superpe.get_code_section_data() for datareuse_fixup in reusedata_fixups: ref: DataReuseReference @@ -327,7 +327,7 @@ class Injector(): def verify_injected_exe(exefile: FilePath, dllfunc="") -> int: - logger.info("---[ Verify infected exe: {} ".format(exefile)) + logger.info(" Verify infected exe: {} ".format(exefile)) # remove indicator file pathlib.Path(VerifyFilename).unlink(missing_ok=True) diff --git a/phases/templater.py b/phases/templater.py index 63bd309..98559d7 100644 --- a/phases/templater.py +++ b/phases/templater.py @@ -23,10 +23,39 @@ def get_template_names() -> List[str]: def create_c_from_template(settings: Settings, payload_len: int): - logger.info("-[ Create C from template: {} -> {}".format( - PATH_DECODER, settings.main_c_path)) plugin_decoder = "" + src = "{}{}/".format(PATH_CARRIER, settings.carrier_name) + dst = "{}{}/".format(PATH_WEB_PROJECT, settings.project_name) + + logger.info("-[ Carrier create Template: {}".format( + settings.main_c_path)) + + # copy *.c *.h files from src directory to dst directory + for file in os.listdir(src): + if file.endswith(".c") or file.endswith(".h"): + logger.debug(" Copy {} to {}".format(src + file, dst)) + shutil.copy2(src + file, dst) + + logger.info(" Carrier: {}".format( + settings.carrier_name)) + logger.info(" Carrier: Code into: {}".format( + settings.payload_location.value)) + logger.info(" Carrier: Decoder: {}".format( + settings.decoder_style)) + logger.info(" Carrier: Invoker: {}".format( + settings.carrier_invoke_style.value)) + + logger.info(" Carrier AntiEmulation: {}".format( + settings.plugin_antiemulation) + ) + logger.info(" Carrier Guardrail: {}".format( + settings.plugin_guardrail) + ) + logger.info(" Carrier Decoy: {}".format( + settings.plugin_decoy) + ) + # Plugin: VirtualAlloc filepath_virtualprotect = PATH_VIRTUALPROTECT + "{}.c".format( settings.plugin_virtualprotect) diff --git a/supermega.py b/supermega.py index c04a81a..8d6c408 100644 --- a/supermega.py +++ b/supermega.py @@ -53,7 +53,6 @@ def main(): else: setup_logging(logging.INFO) - settings.try_start_final_infected_exe = args.start_injected settings.cleanup_files_on_start = not args.no_clean_at_start settings.cleanup_files_on_exit =not args.no_clean_at_exit @@ -146,18 +145,6 @@ def start_real(settings: Settings): if not project.injectable.superpe.is_64(): raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in)) - logger.info("--[ Config: {} {} {} {}".format( - project.settings.carrier_name, - settings.payload_location.value, - project.settings.decoder_style, - project.settings.carrier_invoke_style.value)) - - logger.info("--[ Plugins: AntiEmulation={} Decoy={} Guardrail={}".format( - project.settings.plugin_antiemulation, - project.settings.plugin_decoy, - project.settings.plugin_guardrail) - ) - # Tell user if they attempt to do something stupid sanity_checks(project.settings) @@ -184,11 +171,15 @@ def start_real(settings: Settings): # COMPILE: Carrier to .asm (C -> ASM) if settings.generate_asm_from_c: - phases.compiler.compile( - c_in = settings.main_c_path, - asm_out = settings.main_asm_path, - injectable = project.injectable, - settings = project.settings) + try: + phases.compiler.compile( + c_in = settings.main_c_path, + asm_out = settings.main_asm_path, + injectable = project.injectable, + settings = project.settings) + except ChildProcessError as e: + logger.error("Error compiling C to ASM: {}".format(e)) + return # we have the carrier-required IAT entries in carrier.iat_requests # CHECK if all are available in infectable, or abort (early check) @@ -204,9 +195,6 @@ def start_real(settings: Settings): asm_in = settings.main_asm_path, build_exe = settings.main_exe_path) observer.add_code_file("carrier_shc", carrier_shellcode) - logging.info("> Carrier Size: {} Payload Size: {}".format( - len(carrier_shellcode), len(project.payload.payload_data) - )) # INJECT loader into an exe and do IAT & data references. Big task. injector = phases.injector.Injector( @@ -228,7 +216,7 @@ def start_real(settings: Settings): else: # Support automated verification (dev) if settings.verify: - logger.info("--[ Verify infected exe") + logger.info(" Verify infected exe") payload_exit_code = phases.injector.verify_injected_exe( settings.inject_exe_out, dllfunc=settings.dllfunc) @@ -239,7 +227,7 @@ def start_real(settings: Settings): def obfuscate_shc_loader(file_shc_in, file_shc_out): - logger.info("--[ Obfuscate shellcode with SGN") + logger.info(" Obfuscate shellcode with SGN") run_process_checkret([ config.get("path_sgn"), "--arch=64", @@ -255,7 +243,7 @@ def obfuscate_shc_loader(file_shc_in, file_shc_out): def verify_shellcode(shc_name): - logger.info("---[ Verify shellcode: {}".format(shc_name)) + logger.info(" Verify shellcode: {}".format(shc_name)) # check if directory exists if not os.path.exists(os.path.dirname(VerifyFilename)):