From c82c99e0ebefebaaa784c84eb5864b617f686b1e Mon Sep 17 00:00:00 2001 From: Dobin Date: Sun, 28 Apr 2024 17:56:12 +0100 Subject: [PATCH] refactor: all asm text parsing into asmparser.py --- model/carrier.py | 24 ++++-- phases/asmparser.py | 168 ++++++++++++++++++++++++++++++++++++++++ phases/compiler.py | 135 ++------------------------------ phases/datareuse.py | 103 ------------------------ tests/test_asm.py | 9 ++- tests/test_datareuse.py | 34 ++++---- 6 files changed, 218 insertions(+), 255 deletions(-) create mode 100644 phases/asmparser.py delete mode 100644 phases/datareuse.py diff --git a/model/carrier.py b/model/carrier.py index c109fa1..7539610 100644 --- a/model/carrier.py +++ b/model/carrier.py @@ -12,10 +12,11 @@ class IatRequest(): class DataReuseEntry(): - def __init__(self, string_ref: str, register: str, randbytes: bytes): - self.string_ref = string_ref - self.register = register - self.randbytes = randbytes + def __init__(self, string_ref: str): + self.string_ref = string_ref # "$SG72513" + + self.register = "" # "rcx" + self.randbytes = b"" # placeholder self.data = b'' self.addr = 0 @@ -30,6 +31,8 @@ class Carrier(): pass + # IAT + def add_iat_request(self, func_name: str, placeholder: bytes): self.iat_requests.append(IatRequest(func_name, placeholder)) @@ -37,8 +40,17 @@ class Carrier(): return self.iat_requests - def set_datareuse_fixups(self, fixups: List[DataReuseEntry]): - self.reusedata_fixups = fixups + # Data Reuse + + def add_datareuse_fixup(self, fixup: DataReuseEntry): + self.reusedata_fixups.append(fixup) def get_all_reusedata_fixups(self) -> List[DataReuseEntry]: return self.reusedata_fixups + + def get_all_reusedata_fixup(self, string_ref) -> DataReuseEntry: + for entry in self.reusedata_fixups: + if entry.string_ref == string_ref: + return entry + return None + \ No newline at end of file diff --git a/phases/asmparser.py b/phases/asmparser.py new file mode 100644 index 0000000..6dd3e20 --- /dev/null +++ b/phases/asmparser.py @@ -0,0 +1,168 @@ +import os +from typing import List, Dict + +from helper import * +from model import * +from model.carrier import Carrier, DataReuseEntry, IatRequest + +logger = logging.getLogger("AsmParser") + + +def parse_asm_file(carrier, filename): + lines_out = [] + with open(filename, 'r', encoding='utf-8') as asmfile: + lines = asmfile.readlines() + + current_segment = None + current_datareuse_entry= None + line_idx = -1 + for line in lines: + line = line.rstrip() + line_idx += 1 + tokens = line.split() + + # skip irrelevant + #if not tokens: + # lines_out.append(line) + # continue + if len(tokens) <= 1: + lines_out.append(line) + continue + + # TRACK in which segment we currently are + if tokens[1] == "SEGMENT": + current_segment = tokens[0] + lines_out.append(line) + continue + + # PATCH SHORT + if "jmp\tSHORT" in line: + updated_line = line.replace("SHORT", "") + lines_out.append(updated_line) + continue + + # REMOVE EXTRN, we dont need it + ## EXTRN __imp_GetEnvironmentVariableW:PROC + ## to + ## ; EXTRN __imp_GetEnvironmentVariableW:PROC + if tokens[0] == "EXTRN": + updated_line = "; " + line + "; Removed" + lines_out.append(updated_line) + continue + + # PATCH external shellcode reference + ## mov rdi, QWORD PTR supermega_payload + ## to + ## lea rdi, [shcstart] ; get payload shellcode address + if "supermega_payload" in line: + updated_line = line + updated_line = updated_line.replace( + "mov ", + "lea " + ) + updated_line = updated_line.replace( + "QWORD PTR supermega_payload", + "[shcstart] ; get payload shellcode address" + ) + lines_out.append(updated_line) + continue + + # ADD label at end of code + # we cant reliably identify in which function, so we just add it at the end + ## get_time_raw ENDP + ## <---- add here + ## _TEXT ENDS + ## END + if line_idx > len(lines) - 5 and tokens[1] == "ENDP": + lines_out.append(line) + lines_out.append("shcstart: ; start of payload shellcode") + continue + + # COLLECT AND PATCH all functions that need to be resolved in loader shellcode + # we replace the function call invocation with a random byte sequence + ## call QWORD PTR __imp_GetEnvironmentVariableW + ## to + ## DB 07cH, 04cH, 028H, 0b0H, 006H, 07eH ; IAT Reuse for GetEnvironmentVariableW + if "QWORD PTR __imp_" in line: + # just the function name, without __imp_ + func_name = line[line.find("__imp_")+6:].rstrip() + randbytes: bytes = os.urandom(6) # exact size or the result + carrier.add_iat_request(func_name, randbytes) + new_line = bytes_to_asm_db(randbytes) + " ; IAT Reuse for {}".format(func_name) + lines_out.append(new_line) + continue + + # COLLECT data strings + # these are usually multi-line, and at the beginning of the file + # $SG72513 DB 'U', 00H, 'S', 00H, 'E', 00H, 'R', 00H, 'P', 00H, 'R', 00H + # DB 'O', 00H, 'F', 00H, 'I', 00H, 'L', 00H, 'E', 00H, 00H, 00H + if line.startswith("$SG"): + # fuck me. if we start a new definition, and have an old one, add the old one... + if current_datareuse_entry != None: + carrier.add_datareuse_fixup(current_datareuse_entry) + current_datareuse_entry = None # reset it here + + var_name = tokens[0] + data = convert_asm_db_to_bytes(line[line.index("DB"):]) + current_datareuse_entry = DataReuseEntry(var_name) + current_datareuse_entry.data = data + lines_out.append("; " + line) + continue + if line.startswith("\tDB"): + if current_datareuse_entry == None: + raise("Found DB without $SG, corrupted asm file?") + current_datareuse_entry.data += convert_asm_db_to_bytes(line) + lines_out.append("; " + line) + continue + if current_datareuse_entry != None: + # when we reach here, $SG with its DB should be done. + carrier.add_datareuse_fixup(current_datareuse_entry) + current_datareuse_entry = None # reset it here + + # PATCH data reuse code (data from C) + # put $SGxxxxxx into .rdata section + ## lea rcx, OFFSET FLAT:$SG72751 + ## to + ## DB 07cH, 04cH, 028H, 0b0H, 006H, 07eH ; IAT Reuse for GetEnvironmentVariableW + if "OFFSET FLAT:$SG" in line: + string_ref = line.split("OFFSET FLAT:")[1] + register = line.split("lea\t")[1].split(",")[0] + randbytes: bytes = os.urandom(7) + + datareuse_fixup = carrier.get_all_reusedata_fixup(string_ref) + if datareuse_fixup == None: + raise("Data reuse entry not found: {}".format(string_ref)) + + datareuse_fixup.register = register + datareuse_fixup.randbytes = randbytes + + line = bytes_to_asm_db(randbytes) + " ; .rdata Reuse for {} ({})".format( + string_ref, register) + lines_out.append(line) + continue + + lines_out.append(line) + + with open(filename, "w") as f: + for line in lines_out: + f.write(line + "\n") + + +def convert_asm_db_to_bytes(line: str) -> bytes: + value = b'' + parts = line.split() + for part in parts: + if part.startswith('\''): + value += str.encode(part.split('\'')[1]) + elif part.endswith('H') or part.endswith('H,'): + hex = part.split('H')[0] + value += bytes.fromhex(hex) + return value + + +def bytes_to_asm_db(byte_data: bytes) -> bytes: + # Convert each byte to a string in hexadecimal format + # prefixed with '0' and suffixed with 'h' + hex_values = [f"0{byte:02x}H" for byte in byte_data] + formatted_string = ', '.join(hex_values) + return "\tDB " + formatted_string diff --git a/phases/compiler.py b/phases/compiler.py index 4bcc18e..e8c231e 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -9,12 +9,13 @@ from config import config from observer import observer from model import * from phases.masmshc import process_file, Params -from phases.datareuse import * from model.carrier import Carrier from model.exehost import ExeHost +from phases.asmparser import parse_asm_file logger = logging.getLogger("Compiler") + # NOTE: Mostly copy-pasted from compiler.py::compile() def compile_dev( c_in: FilePath, @@ -82,22 +83,10 @@ def compile( file_to_lf(asm_out) observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out)) - # DataReuse first - asmFileParser = ReusedataAsmFileParser(asm_out) - asmFileParser.init() - asmFileParser.process() - carrier.set_datareuse_fixups(asmFileParser.get_reusedata_fixups()) - asmFileParser.write_lines_to(asm_out) + # Fixup assembly file + parse_asm_file(carrier, asm_out) - # Assembly text fixup (SuperMega) - logger.info("---[ ASM Fixup : {} ".format(asm_out)) - if not fixup_asm_file(asm_out, payload_len, short_call_patching=short_call_patching): - raise Exception("Error: Fixup failed") - - if config.debug: - observer.add_text_file("carrier_asm_fixup", file_readall_text(asm_out)) - - # Assembly cleanup (masm_shc) + # Cleanup assembly file asm_clean_file = asm_out + ".clean" logger.info("---[ ASM masm_shc: {} ".format(asm_out)) params = Params(asm_out, asm_clean_file, @@ -105,122 +94,12 @@ def compile( remove_crt=True, append_rsp_stub=True) # required atm process_file(params) - if not os.path.isfile(asm_clean_file): raise Exception("Error: Cleaned up ASM file {} was not created".format( asm_clean_file )) - - if source_style == FunctionInvokeStyle.iat_reuse: - fixup_iat_reuse(asm_clean_file, carrier) - observer.add_text_file("carrier_asm_updated", file_readall_text(asm_clean_file)) - - if not exe_host.has_all_carrier_functions(carrier): - logger.error("Error: Not all carrier functions are available in the target exe") - return - # Move to destination we expect shutil.move(asm_clean_file, asm_out) - if config.debug: - observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out)) - -def bytes_to_asm_db(byte_data: bytes) -> bytes: - # Convert each byte to a string in hexadecimal format - # prefixed with '0' and suffixed with 'h' - hex_values = [f"0{byte:02x}H" for byte in byte_data] - formatted_string = ', '.join(hex_values) - return "\tDB " + formatted_string - - -def fixup_asm_file(filename: FilePath, payload_len: int, short_call_patching: bool = False): - with open(filename, 'r') as asmfile: # None = translate to \n - lines = asmfile.readlines() - - # When it breaks, enable this - if short_call_patching: - for idx, line in enumerate(lines): - if "jmp\tSHORT" in lines[idx]: - lines[idx] = lines[idx].replace("SHORT", "") - - for idx, line in enumerate(lines): - # Remove EXTRN, we dont need it - # Even tho it is part of IAT_REUSE process (see fixup_iat_reuse()) - if "EXTRN __imp_" in lines[idx]: - lines[idx] = "; " + lines[idx] - - # replace external reference with shellcode reference - for idx, line in enumerate(lines): - if "supermega_payload" in lines[idx]: - logger.info(" > Replace external reference at line: {}".format(idx)) - #lines[idx] = lines[idx].replace( - # "mov r8, QWORD PTR supermega_payload", - # "lea r8, [shcstart]" - #) - # better keep register (hack) - lines[idx] = lines[idx].replace( - "mov ", - "lea " - ) - lines[idx] = lines[idx].replace( - "QWORD PTR supermega_payload", - "[shcstart] ; get payload shellcode address" - ) - - # add label at end of code - for idx, line in enumerate(lines): - if lines[idx].startswith("END"): - logger.info(" > Add end of code label at line: {}".format(idx)) - lines.insert(idx-1, "shcstart: ; start of payload shellcode\n") - break - - with open(filename, 'w',) as asmfile: # write back with CRLF - #for line in lines: - # asmfile.write(line + "\n") - asmfile.writelines(lines) - - return True - - -def get_function_stubs(asm_in: FilePath) -> List[str]: - functions = [] - - with open(asm_in, 'r', encoding='utf-8') as asmfile: - lines = asmfile.readlines() - - # EXTRN __imp_GetEnvironmentVariableW:PROC - for line in lines: - if "QWORD PTR __imp_" in line: - a = line - a = a.split("__imp_")[1] - func_name = a.strip("\r\n") - print(" > loader shellcode IAT requirement: {}".format(func_name)) - functions.append(func_name) - - return functions - - -def fixup_iat_reuse(filename: FilePath, carrier: Carrier): - with open(filename, 'r', encoding='utf-8') as asmfile: - lines = asmfile.readlines() - - # do IAT reuse - for idx, line in enumerate(lines): - # Fix call - # call QWORD PTR __imp_GetEnvironmentVariableW - if "call" in lines[idx] and "__imp_" in lines[idx]: - func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip() - - randbytes: bytes = os.urandom(6) - lines[idx] = bytes_to_asm_db(randbytes) + " ; IAT Reuse for {}".format(func_name) - lines[idx] += "\n" - carrier.add_iat_request(func_name, randbytes) - - logger.info(" > Replace func name: {} with {}".format( - func_name, randbytes.hex())) - - with open(filename, 'w') as asmfile: - asmfile.writelines(lines) - - if config.debug: - observer.add_text_file("carrier_asm_iat_patch", file_readall_text(filename)) + # Log result + observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out)) diff --git a/phases/datareuse.py b/phases/datareuse.py deleted file mode 100644 index e324553..0000000 --- a/phases/datareuse.py +++ /dev/null @@ -1,103 +0,0 @@ -import sys -import pefile -from intervaltree import Interval, IntervalTree -from typing import List, Dict -import os - -from model.carrier import DataReuseEntry - - -def bytes_to_asm_db(byte_data: bytes) -> bytes: - # Convert each byte to a string in hexadecimal format - # prefixed with '0' and suffixed with 'h' - hex_values = [f"0{byte:02x}H" for byte in byte_data] - formatted_string = ', '.join(hex_values) - return "\tDB " + formatted_string - - -class ReusedataAsmFileParser(): - def __init__(self, filepath): - self.filepath = filepath - self.lines = [] - self.fixups: Dict[str, DataReuseEntry] = {} - - - def get_reusedata_fixups(self) -> List[DataReuseEntry]: - return list(self.fixups.values()) - - - def init(self): - with open(self.filepath, "r") as f: - self.lines = f.readlines() - self.lines = [line.rstrip() for line in self.lines] - - - def process(self): - self.fixup_data_reuse_code() - self.fixup_data_reuse_data() - - - def fixup_data_reuse_code(self): - fixups = [] - # lea rcx, OFFSET FLAT:$SG72513 - for idx, line in enumerate(self.lines): - if "OFFSET FLAT:$SG" in line: - string_ref = line.split("OFFSET FLAT:")[1] - register = line.split("lea\t")[1].split(",")[0] - randbytes: bytes = os.urandom(7) # lea is 7 bytes - self.fixups[string_ref] = DataReuseEntry(string_ref, register, randbytes) - self.lines[idx] = bytes_to_asm_db(randbytes) + " ; .rdata Reuse for {} ({})".format( - string_ref, register) - return fixups - - - def fixup_data_reuse_data(self) -> List[str]: - current_entry_name = "" - - for line in self.lines: - # $SG72513 DB 'U', 00H, 'S', 00H, 'E', 00H, 'R', 00H, 'P', 00H, 'R', 00H - # DB 'O', 00H, 'F', 00H, 'I', 00H, 'L', 00H, 'E', 00H, 00H, 00H - if line.startswith("$SG"): - parts = line.split() - name = parts[0] - current_entry_name = name - value = b'' - for part in parts: - if part.startswith('\''): - value += str.encode(part.split('\'')[1]) - elif part.endswith('H') or part.endswith('H,'): - hex = part.split('H')[0] - value += bytes.fromhex(hex) - - if not name in self.fixups: - raise Exception("DataReuse: Entry {} not found in fixups".format(name)) - self.fixups[name].data = value - - - elif line.startswith("\tDB"): - if current_entry_name == "": - continue - value = b'' - parts = line.split() - for part in parts: - if part.startswith('\''): - value += str.encode(part.split('\'')[1]) - elif part.endswith('H') or part.endswith('H,'): - hex = part.split('H')[0] - if len(hex) == 3: - hex = hex.lstrip('0') - value += bytes.fromhex(hex) - - if not name in self.fixups: - raise Exception("DataReuse: Entry {} not found in fixups".format(name)) - self.fixups[name].data += value - - else: - current_entry_name = "" - - - def write_lines_to(self, filename): - with open(filename, 'w',) as asmfile: - for line in self.lines: - asmfile.write(line + "\n") - diff --git a/tests/test_asm.py b/tests/test_asm.py index 45f74d0..6133546 100644 --- a/tests/test_asm.py +++ b/tests/test_asm.py @@ -3,7 +3,7 @@ from typing import List import unittest import logging -from phases.compiler import fixup_asm_file, fixup_iat_reuse +from phases.asmparser import parse_asm_file from model.exehost import ExeHost from model.defs import * from model.carrier import Carrier @@ -18,9 +18,10 @@ class AsmTest(unittest.TestCase): def test_asm_fixup(self): path_in: FilePath = "tests/data/peb_walk_pre_fixup.asm" path_working: FilePath = "tests/data/peb_walk_pre_fixup.asm.test" + carrier = Carrier() shutil.copy(path_in, path_working) - fixup_asm_file(path_working, 272) + parse_asm_file(carrier, path_working) with open(path_working, "r") as f: lines = f.readlines() @@ -35,7 +36,7 @@ class AsmTest(unittest.TestCase): self.assertTrue("supermega_payload" not in lines[198-1]) # shcstart: - self.assertTrue("shcstart:" in lines[213-1]) + self.assertTrue("shcstart:" in lines[212-1]) os.remove(path_working) @@ -46,7 +47,7 @@ class AsmTest(unittest.TestCase): shutil.copy(path_in, path_working) carrier = Carrier() - fixup_iat_reuse(path_working, carrier) + parse_asm_file(carrier, path_working) self.assertEqual(len(carrier.iat_requests), 2) diff --git a/tests/test_datareuse.py b/tests/test_datareuse.py index 5645a90..eb4d6a9 100644 --- a/tests/test_datareuse.py +++ b/tests/test_datareuse.py @@ -5,7 +5,9 @@ import logging import os from model.defs import * from model.exehost import ExeHost -from phases.datareuse import ReusedataAsmFileParser +from model.carrier import Carrier +from phases.asmparser import parse_asm_file + class DataReuseTest(unittest.TestCase): def test_relocation_list(self): @@ -40,12 +42,12 @@ class DataReuseTest(unittest.TestCase): def test_data_reuse_entries(self): asm_in = "tests/data/data_reuse_pre_fixup.asm" - data_reuse_entries = [] - - asmFileParser = ReusedataAsmFileParser(asm_in) - asmFileParser.init() - asmFileParser.process() - data_reuse_entries = asmFileParser.get_reusedata_fixups() + asm_working = "tests/data/data_reuse_pre_fixup.asm.test" + + shutil.copy(asm_in, asm_working) + carrier = Carrier() + parse_asm_file(carrier, asm_working) + data_reuse_entries = carrier.get_all_reusedata_fixups() self.assertEqual(2, len(data_reuse_entries)) @@ -59,16 +61,20 @@ class DataReuseTest(unittest.TestCase): entry = data_reuse_entries[1] self.assertTrue('$SG72514' in entry.string_ref) + os.remove(asm_working) + def test_data_reuse_fixup(self): asm_in = "tests/data/data_reuse_pre_fixup.asm" - asm_out = asm_in + ".test" - asmFileParser = ReusedataAsmFileParser(asm_in) - asmFileParser.init() - asmFileParser.process() - asmFileParser.write_lines_to(asm_out + ".test") - with open(asm_out + ".test", "r") as f: + asm_working = asm_in + ".test" + + shutil.copy(asm_in, asm_working) + carrier = Carrier() + parse_asm_file(carrier, asm_working) + + with open(asm_working, "r") as f: lines = f.readlines() self.assertTrue("\tDB " in lines[108-1]) self.assertFalse("OFFSET FLAT:$SG" in lines[108-1]) - os.remove(asm_out + ".test") + + os.remove(asm_working)