mirror of
https://github.com/dobin/SuperMega
synced 2026-06-02 17:27:10 +00:00
refactor: make asm cleanup/fixup work in memory
This commit is contained in:
+3
-6
@@ -8,10 +8,9 @@ from model.carrier import Carrier, DataReuseEntry, IatRequest
|
|||||||
logger = logging.getLogger("AsmParser")
|
logger = logging.getLogger("AsmParser")
|
||||||
|
|
||||||
|
|
||||||
def parse_asm_file(carrier, filename):
|
def parse_asm_file(carrier: Carrier, asm_text: str) -> List[str]:
|
||||||
lines_out = []
|
lines_out = []
|
||||||
with open(filename, 'r', encoding='utf-8') as asmfile:
|
lines = asm_text.split("\n")
|
||||||
lines = asmfile.readlines()
|
|
||||||
|
|
||||||
current_segment = None
|
current_segment = None
|
||||||
current_datareuse_entry= None
|
current_datareuse_entry= None
|
||||||
@@ -143,9 +142,7 @@ def parse_asm_file(carrier, filename):
|
|||||||
|
|
||||||
lines_out.append(line)
|
lines_out.append(line)
|
||||||
|
|
||||||
with open(filename, "w") as f:
|
return lines_out
|
||||||
for line in lines_out:
|
|
||||||
f.write(line + "\n")
|
|
||||||
|
|
||||||
|
|
||||||
def convert_asm_db_to_bytes(line: str) -> bytes:
|
def convert_asm_db_to_bytes(line: str) -> bytes:
|
||||||
|
|||||||
+19
-46
@@ -8,7 +8,7 @@ from helper import *
|
|||||||
from config import config
|
from config import config
|
||||||
from observer import observer
|
from observer import observer
|
||||||
from model import *
|
from model import *
|
||||||
from phases.masmshc import process_file, Params
|
from phases.masmshc import masm_shc, Params
|
||||||
from model.carrier import Carrier
|
from model.carrier import Carrier
|
||||||
from model.exehost import ExeHost
|
from model.exehost import ExeHost
|
||||||
from phases.asmparser import parse_asm_file
|
from phases.asmparser import parse_asm_file
|
||||||
@@ -35,37 +35,23 @@ def compile_dev(
|
|||||||
])
|
])
|
||||||
if not os.path.isfile(asm_out):
|
if not os.path.isfile(asm_out):
|
||||||
raise Exception("Error: Compiling failed")
|
raise Exception("Error: Compiling failed")
|
||||||
file_to_lf(asm_out)
|
|
||||||
observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out))
|
|
||||||
|
|
||||||
# Assembly cleanup (masm_shc)
|
|
||||||
asm_clean_file = asm_out + ".clean"
|
|
||||||
logger.info("---[ ASM masm_shc: {} ".format(asm_out))
|
|
||||||
params = Params(asm_out, asm_clean_file,
|
|
||||||
inline_strings=False, # not for DATA_REUSE
|
|
||||||
remove_crt=True,
|
|
||||||
append_rsp_stub=True) # required atm
|
|
||||||
process_file(params)
|
|
||||||
|
|
||||||
if not os.path.isfile(asm_clean_file):
|
|
||||||
raise Exception("Error: Cleaned up ASM file {} was not created".format(
|
|
||||||
asm_clean_file
|
|
||||||
))
|
|
||||||
|
|
||||||
# Move to destination we expect
|
asm_text: str = file_readall_text(asm_out)
|
||||||
shutil.move(asm_clean_file, asm_out)
|
observer.add_text_file("carrier_asm_orig", asm_text)
|
||||||
if config.debug:
|
|
||||||
observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out))
|
logger.info("---[ ASM masm_shc: {} ".format(asm_out))
|
||||||
|
asm_text_lines: List[str] = parse_asm_file(Carrier(), asm_text)
|
||||||
|
asm_text = masm_shc(asm_text_lines)
|
||||||
|
observer.add_text_file("carrier_asm_cleanup", asm_text)
|
||||||
|
|
||||||
|
with open(asm_out, "w") as f:
|
||||||
|
f.write(asm_text)
|
||||||
|
|
||||||
|
|
||||||
def compile(
|
def compile(
|
||||||
c_in: FilePath,
|
c_in: FilePath,
|
||||||
asm_out: FilePath,
|
asm_out: FilePath,
|
||||||
payload_len: int,
|
|
||||||
carrier: Carrier,
|
carrier: Carrier,
|
||||||
source_style: FunctionInvokeStyle,
|
|
||||||
exe_host: ExeHost,
|
|
||||||
short_call_patching: bool = False,
|
|
||||||
):
|
):
|
||||||
logger.info("--[ Compile C to ASM: {} -> {} ".format(c_in, asm_out))
|
logger.info("--[ Compile C to ASM: {} -> {} ".format(c_in, asm_out))
|
||||||
|
|
||||||
@@ -80,26 +66,13 @@ def compile(
|
|||||||
])
|
])
|
||||||
if not os.path.isfile(asm_out):
|
if not os.path.isfile(asm_out):
|
||||||
raise Exception("Error: Compiling failed")
|
raise Exception("Error: Compiling failed")
|
||||||
file_to_lf(asm_out)
|
asm_text = file_readall_text(asm_out)
|
||||||
observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out))
|
observer.add_text_file("carrier_asm_orig", asm_text)
|
||||||
|
|
||||||
# Fixup assembly file
|
asm_text_lines = parse_asm_file(carrier, asm_text) # Fixup assembly file
|
||||||
parse_asm_file(carrier, asm_out)
|
asm_text = masm_shc(asm_text_lines) # Cleanup assembly file
|
||||||
|
observer.add_text_file("carrier_asm_final", asm_text)
|
||||||
|
|
||||||
# Cleanup assembly file
|
# write back. Next step would be compiling this file
|
||||||
asm_clean_file = asm_out + ".clean"
|
with open(asm_out, "w") as f:
|
||||||
logger.info("---[ ASM masm_shc: {} ".format(asm_out))
|
f.write(asm_text)
|
||||||
params = Params(asm_out, asm_clean_file,
|
|
||||||
inline_strings=False, # not for DATA_REUSE
|
|
||||||
remove_crt=True,
|
|
||||||
append_rsp_stub=True) # required atm
|
|
||||||
process_file(params)
|
|
||||||
if not os.path.isfile(asm_clean_file):
|
|
||||||
raise Exception("Error: Cleaned up ASM file {} was not created".format(
|
|
||||||
asm_clean_file
|
|
||||||
))
|
|
||||||
# Move to destination we expect
|
|
||||||
shutil.move(asm_clean_file, asm_out)
|
|
||||||
|
|
||||||
# Log result
|
|
||||||
observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out))
|
|
||||||
|
|||||||
+101
-104
@@ -1,10 +1,13 @@
|
|||||||
import re
|
import re
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
import io
|
||||||
|
from typing import List
|
||||||
|
|
||||||
logger = logging.getLogger("masmshc")
|
logger = logging.getLogger("masmshc")
|
||||||
|
|
||||||
g_is32bit = False
|
# original source: https://github.com/hasherezade/masm_shc/blob/master/masm_shc/main.cpp
|
||||||
|
# Converted to python by chatgpt, with some manual fixups
|
||||||
|
|
||||||
|
|
||||||
class Params:
|
class Params:
|
||||||
@@ -71,121 +74,115 @@ _TEXT ENDS
|
|||||||
"""
|
"""
|
||||||
ofile.write(stub)
|
ofile.write(stub)
|
||||||
|
|
||||||
def process_file(params):
|
def masm_shc(asm_text_lines: List[str]) -> str:
|
||||||
global g_is32bit
|
g_is32bit = False
|
||||||
try:
|
consts_lines = {}
|
||||||
with open(params.infile, "r") as file, open(params.outfile, "w") as ofile:
|
seg_name = ""
|
||||||
consts_lines = {}
|
const_name = ""
|
||||||
seg_name = ""
|
code_start = False
|
||||||
const_name = ""
|
|
||||||
code_start = False
|
|
||||||
|
|
||||||
line_count = 0
|
params = Params("", "",
|
||||||
for line in file.readlines():
|
inline_strings=False, # not for DATA_REUSE
|
||||||
#for line_count, line in enumerate(file):
|
remove_crt=True,
|
||||||
tokens = split_to_tokens(line)
|
append_rsp_stub=True) # required atm
|
||||||
|
ofile = io.StringIO()
|
||||||
|
|
||||||
#print("Tokens: {}".format(" ".join(tokens)))
|
line_count = 0
|
||||||
|
for line in asm_text_lines:
|
||||||
|
line = line + "\n" # lol
|
||||||
|
tokens = split_to_tokens(line)
|
||||||
|
|
||||||
if not tokens:
|
if not tokens:
|
||||||
ofile.write(line)
|
ofile.write(line)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if tokens[0] == ".686P":
|
||||||
|
g_is32bit = True
|
||||||
|
|
||||||
|
if tokens[0] == "EXTRN":
|
||||||
|
print(f"[ERROR] Line {line_count + 1}: External dependency detected:\n{line}")
|
||||||
|
|
||||||
|
in_skipped = False
|
||||||
|
in_const = False
|
||||||
|
|
||||||
|
if len(tokens) >= 2:
|
||||||
|
# TMP better stack alignment
|
||||||
|
#if tokens[0] == "sub" and tokens[1] == "rsp,":
|
||||||
|
# ofile.write(line)
|
||||||
|
# #ofile.write("\tand\trsp, 0FFFFFFFFFFFFFFF0h; Align RSP to 16 bytes\n")
|
||||||
|
# #ofile.write("\tsub\trsp, 8")
|
||||||
|
# continue
|
||||||
|
|
||||||
|
if tokens[1] == "SEGMENT":
|
||||||
|
seg_name = tokens[0]
|
||||||
|
if not code_start and seg_name == "_TEXT":
|
||||||
|
code_start = True
|
||||||
|
if g_is32bit:
|
||||||
|
ofile.write("assume fs:nothing\n")
|
||||||
|
# TMP better stack alignment alternative
|
||||||
|
#else:
|
||||||
|
# ofile.write("\tjmp\tmain\n")
|
||||||
|
elif params.append_rsp_stub:
|
||||||
|
append_align_rsp(ofile)
|
||||||
|
logger.debug("[INFO] Entry Point: AlignRSP")
|
||||||
|
|
||||||
|
if seg_name == "_BSS":
|
||||||
|
logger.error(f"[ERROR] Line {line_count + 1}: _BSS segment detected! Remove all global and static variables!\n")
|
||||||
|
|
||||||
|
if seg_name in ("pdata", "xdata", "voltbl"):
|
||||||
|
in_skipped = True
|
||||||
|
elif seg_name in ("CONST", "_DATA"):
|
||||||
|
in_const = True
|
||||||
|
elif tokens[1] == "ENDS" and tokens[0] == seg_name:
|
||||||
|
seg_name = ""
|
||||||
|
if in_const:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if tokens[0] == ".686P":
|
if in_skipped:
|
||||||
g_is32bit = True
|
continue
|
||||||
|
|
||||||
if tokens[0] == "EXTRN":
|
if params.remove_crt and tokens[0] == "INCLUDELIB":
|
||||||
print(f"[ERROR] Line {line_count + 1}: External dependency detected:\n{line}")
|
if tokens[1] in ("LIBCMT", "OLDNAMES"):
|
||||||
|
ofile.write(f"; {line}\n") # copy commented out line
|
||||||
|
continue
|
||||||
|
print(f"[ERROR] Line {line_count + 1}: INCLUDELIB detected! Remove all external dependencies!\n")
|
||||||
|
|
||||||
in_skipped = False
|
if params.inline_strings and in_const:
|
||||||
in_const = False
|
if tokens[1] == "DB":
|
||||||
|
const_name = tokens[0]
|
||||||
|
if const_name != "":
|
||||||
|
if const_name not in consts_lines:
|
||||||
|
consts_lines[const_name] = line
|
||||||
|
else:
|
||||||
|
consts_lines[const_name] += "\n" + line
|
||||||
|
continue
|
||||||
|
|
||||||
if len(tokens) >= 2:
|
if tokens[0] == "rex_jmp":
|
||||||
# TMP better stack alignment
|
line = re.sub(r"rex_jmp", "JMP", line)
|
||||||
#if tokens[0] == "sub" and tokens[1] == "rsp,":
|
|
||||||
# ofile.write(line)
|
|
||||||
# #ofile.write("\tand\trsp, 0FFFFFFFFFFFFFFF0h; Align RSP to 16 bytes\n")
|
|
||||||
# #ofile.write("\tsub\trsp, 8")
|
|
||||||
# continue
|
|
||||||
|
|
||||||
if tokens[1] == "SEGMENT":
|
curr_const = get_constant(consts_lines, tokens)
|
||||||
seg_name = tokens[0]
|
if params.inline_strings and curr_const != "":
|
||||||
if not code_start and seg_name == "_TEXT":
|
label_after = f"after_{curr_const}"
|
||||||
code_start = True
|
ofile.write(f"\tCALL {label_after}\n")
|
||||||
if g_is32bit:
|
ofile.write(consts_lines[curr_const] + "\n")
|
||||||
ofile.write("assume fs:nothing\n")
|
ofile.write(f"{label_after}:\n")
|
||||||
# TMP better stack alignment alternative
|
if len(tokens) > 2 and (tokens[0] in ("lea", "mov")):
|
||||||
#else:
|
offset_index = tokens.index("OFFSET", 1)
|
||||||
# ofile.write("\tjmp\tmain\n")
|
instructions = tokens[1]
|
||||||
elif params.append_rsp_stub:
|
if offset_index == 4:
|
||||||
append_align_rsp(ofile)
|
instructions = f"{tokens[1]} {tokens[2]} {tokens[3]}"
|
||||||
logger.debug("[INFO] Entry Point: AlignRSP")
|
ofile.write(f"\tPOP {instructions}\n")
|
||||||
|
ofile.write("\n")
|
||||||
|
ofile.write(f"; {line}\n") # copy commented out line
|
||||||
|
continue
|
||||||
|
|
||||||
if seg_name == "_BSS":
|
if not g_is32bit and any(token in tokens for token in ["gs:96"]):
|
||||||
logger.error(f"[ERROR] Line {line_count + 1}: _BSS segment detected! Remove all global and static variables!\n")
|
#line = re.sub(r"gs:96", "gs[96]\r\n", line)
|
||||||
|
line = line.replace("gs:96", "gs:[96]")
|
||||||
|
|
||||||
if seg_name in ("pdata", "xdata", "voltbl"):
|
ofile.write(line) # copy line
|
||||||
in_skipped = True
|
|
||||||
elif seg_name in ("CONST", "_DATA"):
|
|
||||||
in_const = True
|
|
||||||
elif tokens[1] == "ENDS" and tokens[0] == seg_name:
|
|
||||||
seg_name = ""
|
|
||||||
if in_const:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if in_skipped:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if params.remove_crt and tokens[0] == "INCLUDELIB":
|
|
||||||
if tokens[1] in ("LIBCMT", "OLDNAMES"):
|
|
||||||
ofile.write(f"; {line}\n") # copy commented out line
|
|
||||||
continue
|
|
||||||
print(f"[ERROR] Line {line_count + 1}: INCLUDELIB detected! Remove all external dependencies!\n")
|
|
||||||
|
|
||||||
if params.inline_strings and in_const:
|
|
||||||
if tokens[1] == "DB":
|
|
||||||
const_name = tokens[0]
|
|
||||||
if const_name != "":
|
|
||||||
if const_name not in consts_lines:
|
|
||||||
consts_lines[const_name] = line
|
|
||||||
else:
|
|
||||||
consts_lines[const_name] += "\n" + line
|
|
||||||
continue
|
|
||||||
|
|
||||||
if tokens[0] == "rex_jmp":
|
|
||||||
line = re.sub(r"rex_jmp", "JMP", line)
|
|
||||||
|
|
||||||
curr_const = get_constant(consts_lines, tokens)
|
|
||||||
if params.inline_strings and curr_const != "":
|
|
||||||
label_after = f"after_{curr_const}"
|
|
||||||
ofile.write(f"\tCALL {label_after}\n")
|
|
||||||
ofile.write(consts_lines[curr_const] + "\n")
|
|
||||||
ofile.write(f"{label_after}:\n")
|
|
||||||
if len(tokens) > 2 and (tokens[0] in ("lea", "mov")):
|
|
||||||
offset_index = tokens.index("OFFSET", 1)
|
|
||||||
instructions = tokens[1]
|
|
||||||
if offset_index == 4:
|
|
||||||
instructions = f"{tokens[1]} {tokens[2]} {tokens[3]}"
|
|
||||||
ofile.write(f"\tPOP {instructions}\n")
|
|
||||||
ofile.write("\n")
|
|
||||||
ofile.write(f"; {line}\n") # copy commented out line
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not g_is32bit and any(token in tokens for token in ["gs:96"]):
|
|
||||||
#line = re.sub(r"gs:96", "gs[96]\r\n", line)
|
|
||||||
line = line.replace("gs:96", "gs:[96]")
|
|
||||||
|
|
||||||
ofile.write(line) # copy line
|
|
||||||
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
print(f"[ERROR] {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
if params.inline_strings:
|
if params.inline_strings:
|
||||||
print("[INFO] Strings have been inlined. It may require to change some short jumps (jmp SHORT) into jumps (jmp)")
|
print("[INFO] Strings have been inlined. It may require to change some short jumps (jmp SHORT) into jumps (jmp)")
|
||||||
return True
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
return ofile.getvalue()
|
||||||
# Example usage
|
|
||||||
params = Params("test.asm", "testout.asm", True, True, True)
|
|
||||||
process_file(params)
|
|
||||||
|
|||||||
+9
-10
@@ -109,12 +109,15 @@ def start(settings: Settings) -> int:
|
|||||||
prepare_project("default", settings)
|
prepare_project("default", settings)
|
||||||
|
|
||||||
# Do the thing and catch the errors
|
# Do the thing and catch the errors
|
||||||
try:
|
if False:
|
||||||
start_real(settings)
|
start_real(settings)
|
||||||
except Exception as e:
|
else:
|
||||||
logger.error(f'Error compiling: {e}')
|
try:
|
||||||
observer.write_logs(settings.main_dir)
|
start_real(settings)
|
||||||
return 1
|
except Exception as e:
|
||||||
|
logger.error(f'Error compiling: {e}')
|
||||||
|
observer.write_logs(settings.main_dir)
|
||||||
|
return 1
|
||||||
|
|
||||||
# Cleanup files
|
# Cleanup files
|
||||||
clean_tmp_files()
|
clean_tmp_files()
|
||||||
@@ -146,11 +149,7 @@ def start_real(settings: Settings):
|
|||||||
phases.compiler.compile(
|
phases.compiler.compile(
|
||||||
c_in = settings.main_c_path,
|
c_in = settings.main_c_path,
|
||||||
asm_out = settings.main_asm_path,
|
asm_out = settings.main_asm_path,
|
||||||
payload_len = project.payload.len,
|
carrier = project.carrier)
|
||||||
carrier = project.carrier,
|
|
||||||
source_style = project.settings.source_style,
|
|
||||||
exe_host = project.exe_host,
|
|
||||||
short_call_patching = project.settings.short_call_patching)
|
|
||||||
|
|
||||||
# Assemble: Assemble .asm to .shc (ASM -> SHC)
|
# Assemble: Assemble .asm to .shc (ASM -> SHC)
|
||||||
if settings.generate_shc_from_asm:
|
if settings.generate_shc_from_asm:
|
||||||
|
|||||||
Reference in New Issue
Block a user