diff --git a/phases/asmparser.py b/phases/asmparser.py index 6dd3e20..70cf40c 100644 --- a/phases/asmparser.py +++ b/phases/asmparser.py @@ -8,10 +8,9 @@ from model.carrier import Carrier, DataReuseEntry, IatRequest logger = logging.getLogger("AsmParser") -def parse_asm_file(carrier, filename): +def parse_asm_file(carrier: Carrier, asm_text: str) -> List[str]: lines_out = [] - with open(filename, 'r', encoding='utf-8') as asmfile: - lines = asmfile.readlines() + lines = asm_text.split("\n") current_segment = None current_datareuse_entry= None @@ -143,9 +142,7 @@ def parse_asm_file(carrier, filename): lines_out.append(line) - with open(filename, "w") as f: - for line in lines_out: - f.write(line + "\n") + return lines_out def convert_asm_db_to_bytes(line: str) -> bytes: diff --git a/phases/compiler.py b/phases/compiler.py index e8c231e..0c873e2 100644 --- a/phases/compiler.py +++ b/phases/compiler.py @@ -8,7 +8,7 @@ from helper import * from config import config from observer import observer from model import * -from phases.masmshc import process_file, Params +from phases.masmshc import masm_shc, Params from model.carrier import Carrier from model.exehost import ExeHost from phases.asmparser import parse_asm_file @@ -35,37 +35,23 @@ def compile_dev( ]) if not os.path.isfile(asm_out): raise Exception("Error: Compiling failed") - file_to_lf(asm_out) - observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out)) - - # Assembly cleanup (masm_shc) - asm_clean_file = asm_out + ".clean" - logger.info("---[ ASM masm_shc: {} ".format(asm_out)) - params = Params(asm_out, asm_clean_file, - inline_strings=False, # not for DATA_REUSE - remove_crt=True, - append_rsp_stub=True) # required atm - process_file(params) - - if not os.path.isfile(asm_clean_file): - raise Exception("Error: Cleaned up ASM file {} was not created".format( - asm_clean_file - )) - # Move to destination we expect - shutil.move(asm_clean_file, asm_out) - if config.debug: - observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out)) + asm_text: str = file_readall_text(asm_out) + observer.add_text_file("carrier_asm_orig", asm_text) + + logger.info("---[ ASM masm_shc: {} ".format(asm_out)) + asm_text_lines: List[str] = parse_asm_file(Carrier(), asm_text) + asm_text = masm_shc(asm_text_lines) + observer.add_text_file("carrier_asm_cleanup", asm_text) + + with open(asm_out, "w") as f: + f.write(asm_text) def compile( c_in: FilePath, asm_out: FilePath, - payload_len: int, carrier: Carrier, - source_style: FunctionInvokeStyle, - exe_host: ExeHost, - short_call_patching: bool = False, ): logger.info("--[ Compile C to ASM: {} -> {} ".format(c_in, asm_out)) @@ -80,26 +66,13 @@ def compile( ]) if not os.path.isfile(asm_out): raise Exception("Error: Compiling failed") - file_to_lf(asm_out) - observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out)) + asm_text = file_readall_text(asm_out) + observer.add_text_file("carrier_asm_orig", asm_text) - # Fixup assembly file - parse_asm_file(carrier, asm_out) + asm_text_lines = parse_asm_file(carrier, asm_text) # Fixup assembly file + asm_text = masm_shc(asm_text_lines) # Cleanup assembly file + observer.add_text_file("carrier_asm_final", asm_text) - # Cleanup assembly file - asm_clean_file = asm_out + ".clean" - logger.info("---[ ASM masm_shc: {} ".format(asm_out)) - params = Params(asm_out, asm_clean_file, - inline_strings=False, # not for DATA_REUSE - remove_crt=True, - append_rsp_stub=True) # required atm - process_file(params) - if not os.path.isfile(asm_clean_file): - raise Exception("Error: Cleaned up ASM file {} was not created".format( - asm_clean_file - )) - # Move to destination we expect - shutil.move(asm_clean_file, asm_out) - - # Log result - observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out)) + # write back. Next step would be compiling this file + with open(asm_out, "w") as f: + f.write(asm_text) diff --git a/phases/masmshc.py b/phases/masmshc.py index 42d2549..35a393b 100644 --- a/phases/masmshc.py +++ b/phases/masmshc.py @@ -1,10 +1,13 @@ import re import os import logging +import io +from typing import List logger = logging.getLogger("masmshc") -g_is32bit = False +# original source: https://github.com/hasherezade/masm_shc/blob/master/masm_shc/main.cpp +# Converted to python by chatgpt, with some manual fixups class Params: @@ -71,121 +74,115 @@ _TEXT ENDS """ ofile.write(stub) -def process_file(params): - global g_is32bit - try: - with open(params.infile, "r") as file, open(params.outfile, "w") as ofile: - consts_lines = {} - seg_name = "" - const_name = "" - code_start = False +def masm_shc(asm_text_lines: List[str]) -> str: + g_is32bit = False + consts_lines = {} + seg_name = "" + const_name = "" + code_start = False - line_count = 0 - for line in file.readlines(): - #for line_count, line in enumerate(file): - tokens = split_to_tokens(line) + params = Params("", "", + inline_strings=False, # not for DATA_REUSE + remove_crt=True, + append_rsp_stub=True) # required atm + ofile = io.StringIO() - #print("Tokens: {}".format(" ".join(tokens))) + line_count = 0 + for line in asm_text_lines: + line = line + "\n" # lol + tokens = split_to_tokens(line) - if not tokens: - ofile.write(line) + if not tokens: + ofile.write(line) + continue + + if tokens[0] == ".686P": + g_is32bit = True + + if tokens[0] == "EXTRN": + print(f"[ERROR] Line {line_count + 1}: External dependency detected:\n{line}") + + in_skipped = False + in_const = False + + if len(tokens) >= 2: + # TMP better stack alignment + #if tokens[0] == "sub" and tokens[1] == "rsp,": + # ofile.write(line) + # #ofile.write("\tand\trsp, 0FFFFFFFFFFFFFFF0h; Align RSP to 16 bytes\n") + # #ofile.write("\tsub\trsp, 8") + # continue + + if tokens[1] == "SEGMENT": + seg_name = tokens[0] + if not code_start and seg_name == "_TEXT": + code_start = True + if g_is32bit: + ofile.write("assume fs:nothing\n") + # TMP better stack alignment alternative + #else: + # ofile.write("\tjmp\tmain\n") + elif params.append_rsp_stub: + append_align_rsp(ofile) + logger.debug("[INFO] Entry Point: AlignRSP") + + if seg_name == "_BSS": + logger.error(f"[ERROR] Line {line_count + 1}: _BSS segment detected! Remove all global and static variables!\n") + + if seg_name in ("pdata", "xdata", "voltbl"): + in_skipped = True + elif seg_name in ("CONST", "_DATA"): + in_const = True + elif tokens[1] == "ENDS" and tokens[0] == seg_name: + seg_name = "" + if in_const: continue - if tokens[0] == ".686P": - g_is32bit = True + if in_skipped: + continue - if tokens[0] == "EXTRN": - print(f"[ERROR] Line {line_count + 1}: External dependency detected:\n{line}") + if params.remove_crt and tokens[0] == "INCLUDELIB": + if tokens[1] in ("LIBCMT", "OLDNAMES"): + ofile.write(f"; {line}\n") # copy commented out line + continue + print(f"[ERROR] Line {line_count + 1}: INCLUDELIB detected! Remove all external dependencies!\n") - in_skipped = False - in_const = False + if params.inline_strings and in_const: + if tokens[1] == "DB": + const_name = tokens[0] + if const_name != "": + if const_name not in consts_lines: + consts_lines[const_name] = line + else: + consts_lines[const_name] += "\n" + line + continue - if len(tokens) >= 2: - # TMP better stack alignment - #if tokens[0] == "sub" and tokens[1] == "rsp,": - # ofile.write(line) - # #ofile.write("\tand\trsp, 0FFFFFFFFFFFFFFF0h; Align RSP to 16 bytes\n") - # #ofile.write("\tsub\trsp, 8") - # continue + if tokens[0] == "rex_jmp": + line = re.sub(r"rex_jmp", "JMP", line) - if tokens[1] == "SEGMENT": - seg_name = tokens[0] - if not code_start and seg_name == "_TEXT": - code_start = True - if g_is32bit: - ofile.write("assume fs:nothing\n") - # TMP better stack alignment alternative - #else: - # ofile.write("\tjmp\tmain\n") - elif params.append_rsp_stub: - append_align_rsp(ofile) - logger.debug("[INFO] Entry Point: AlignRSP") + curr_const = get_constant(consts_lines, tokens) + if params.inline_strings and curr_const != "": + label_after = f"after_{curr_const}" + ofile.write(f"\tCALL {label_after}\n") + ofile.write(consts_lines[curr_const] + "\n") + ofile.write(f"{label_after}:\n") + if len(tokens) > 2 and (tokens[0] in ("lea", "mov")): + offset_index = tokens.index("OFFSET", 1) + instructions = tokens[1] + if offset_index == 4: + instructions = f"{tokens[1]} {tokens[2]} {tokens[3]}" + ofile.write(f"\tPOP {instructions}\n") + ofile.write("\n") + ofile.write(f"; {line}\n") # copy commented out line + continue - if seg_name == "_BSS": - logger.error(f"[ERROR] Line {line_count + 1}: _BSS segment detected! Remove all global and static variables!\n") + if not g_is32bit and any(token in tokens for token in ["gs:96"]): + #line = re.sub(r"gs:96", "gs[96]\r\n", line) + line = line.replace("gs:96", "gs:[96]") - if seg_name in ("pdata", "xdata", "voltbl"): - in_skipped = True - elif seg_name in ("CONST", "_DATA"): - in_const = True - elif tokens[1] == "ENDS" and tokens[0] == seg_name: - seg_name = "" - if in_const: - continue - - if in_skipped: - continue - - if params.remove_crt and tokens[0] == "INCLUDELIB": - if tokens[1] in ("LIBCMT", "OLDNAMES"): - ofile.write(f"; {line}\n") # copy commented out line - continue - print(f"[ERROR] Line {line_count + 1}: INCLUDELIB detected! Remove all external dependencies!\n") - - if params.inline_strings and in_const: - if tokens[1] == "DB": - const_name = tokens[0] - if const_name != "": - if const_name not in consts_lines: - consts_lines[const_name] = line - else: - consts_lines[const_name] += "\n" + line - continue - - if tokens[0] == "rex_jmp": - line = re.sub(r"rex_jmp", "JMP", line) - - curr_const = get_constant(consts_lines, tokens) - if params.inline_strings and curr_const != "": - label_after = f"after_{curr_const}" - ofile.write(f"\tCALL {label_after}\n") - ofile.write(consts_lines[curr_const] + "\n") - ofile.write(f"{label_after}:\n") - if len(tokens) > 2 and (tokens[0] in ("lea", "mov")): - offset_index = tokens.index("OFFSET", 1) - instructions = tokens[1] - if offset_index == 4: - instructions = f"{tokens[1]} {tokens[2]} {tokens[3]}" - ofile.write(f"\tPOP {instructions}\n") - ofile.write("\n") - ofile.write(f"; {line}\n") # copy commented out line - continue - - if not g_is32bit and any(token in tokens for token in ["gs:96"]): - #line = re.sub(r"gs:96", "gs[96]\r\n", line) - line = line.replace("gs:96", "gs:[96]") - - ofile.write(line) # copy line - - except FileNotFoundError as e: - print(f"[ERROR] {e}") - return False + ofile.write(line) # copy line if params.inline_strings: print("[INFO] Strings have been inlined. It may require to change some short jumps (jmp SHORT) into jumps (jmp)") - return True -if __name__ == "__main__": - # Example usage - params = Params("test.asm", "testout.asm", True, True, True) - process_file(params) \ No newline at end of file + return ofile.getvalue() diff --git a/supermega.py b/supermega.py index 60de2a8..4bc0273 100644 --- a/supermega.py +++ b/supermega.py @@ -109,12 +109,15 @@ def start(settings: Settings) -> int: prepare_project("default", settings) # Do the thing and catch the errors - try: + if False: start_real(settings) - except Exception as e: - logger.error(f'Error compiling: {e}') - observer.write_logs(settings.main_dir) - return 1 + else: + try: + start_real(settings) + except Exception as e: + logger.error(f'Error compiling: {e}') + observer.write_logs(settings.main_dir) + return 1 # Cleanup files clean_tmp_files() @@ -146,11 +149,7 @@ def start_real(settings: Settings): phases.compiler.compile( c_in = settings.main_c_path, asm_out = settings.main_asm_path, - payload_len = project.payload.len, - carrier = project.carrier, - source_style = project.settings.source_style, - exe_host = project.exe_host, - short_call_patching = project.settings.short_call_patching) + carrier = project.carrier) # Assemble: Assemble .asm to .shc (ASM -> SHC) if settings.generate_shc_from_asm: