refactor: make log/observer nice

This commit is contained in:
Dobin
2024-03-29 12:32:24 +00:00
parent 32f89a1b20
commit a46e0e4f13
7 changed files with 132 additions and 139 deletions
+12 -28
View File
@@ -45,50 +45,34 @@ def run_process_checkret(args, check=True):
logger.warn("Caught KeyboardInterrupt, exiting gracefully...")
except subprocess.CalledProcessError as e:
logger.warn(f"Command '{e.cmd}' returned non-zero exit status {e.returncode}.")
# Handle the error case
except Exception as e:
logger.warn(f"An error occurred: {e}")
# Handle other exceptions
logger.warn(f"An error occurred executing {e}")
# handle output
stdout_s = ""
stdout_b = b""
if ret.stdout != None:
stdout_b = ret.stdout
stdout_s = ret.stdout.decode('utf-8')
stderr_s = ""
stderr_b = b""
if ret.stderr != None:
stderr_b = ret.stderr
stderr_s = ret.stderr.decode('utf-8')
# write directly to logs/ dir
with open(f"{logs_dir}/cmdoutput.log", "ab") as f:
cmd = "------------------------------------\n"
cmd += "--- " + " ".join(args) + "\n"
f.write(cmd.encode('utf-8'))
observer.add_cmd_output(stdout_s)
f.write(stdout_b)
observer.add_cmd_output(stderr_s)
f.write(stderr_b)
# log it
observer.add_cmd_output(">>> {}\n".format(" ".join(args)))
for line in stdout_s.split("\n"):
observer.add_cmd_output(line)
for line in stderr_s.split("\n"):
observer.add_cmd_output(line)
# check return code (optional)
if ret.returncode != 0 and check:
logger.info("----! FAILED Command: {}".format(" ".join(args)))
if ret.stdout != None:
observer.add_cmd_output(stdout_s)
logger.info(stdout_s)
if ret.stderr != None:
observer.add_cmd_output(stderr_s)
logger.info(stderr_s)
raise Exception("Command failed: " + " ".join(args))
# debug: show command output
if config.ShowCommandOutput:
logger.info("> " + " ".join(args))
if ret.stdout != None:
logger.info(stdout_s)
if ret.stderr != None:
logger.info(stderr_s)
logger.info(">>> " + " ".join(args))
logger.info(stdout_s)
logger.info(stderr_s)
def try_start_shellcode(shc_file):
+47 -40
View File
@@ -1,6 +1,7 @@
import json
import pprint
from capstone import Cs, CS_ARCH_X86, CS_MODE_64
from typing import List, Dict
from pe.r2helper import r2_disas
from utils import delete_all_files_in_directory
@@ -8,68 +9,74 @@ from model.defs import *
class Observer():
"""Central class to store all logs and files created during the build process"""
def __init__(self):
self.cmd_output = []
self.logs = []
self.idx = 0
self.cmd_output = [] # output of external programs (cmdoutput.log)
self.logs: List[str] = [] # internal log messages (supermega.log)
self.files = [] # content of generated files
self.active = True
def reset(self):
self.cmd_output = []
self.logs = []
self.idx = 0
def add_cmd_output(self, cmd_output):
self.cmd_output.append(cmd_output)
def add_log(self, log):
def get_cmd_output(self):
return self.cmd_output
def add_log(self, log: str):
self.logs.append(log)
def get_logs(self):
return self.logs
def writelog(self):
# write log to file
with open(f"{logs_dir}/supermega.log", "w") as f:
for line in self.logs:
f.write(line + "\n")
def add_text(self, name, data):
self.write_to_file(name + ".txt", data)
self.idx += 1
def add_text_file(self, name, data):
self.files.append((name + ".txt", data))
def add_code(self, name, data: bytes):
def add_code_file(self, name, data: bytes):
ret = r2_disas(data)
self.write_to_file(name + ".disas.txt", ret['text'])
self.write_to_file(name + ".disas.ascii", ret['color'])
self.write_to_file(name + ".hex", ret['hexdump'])
self.write_to_file_bin(name + ".bin", data)
self.idx += 1
self.files.append((name + ".disas.ascii", ret['color']))
#self.write_to_file(name + ".disas.txt", ret['text'])
#self.write_to_file(name + ".disas.ascii", ret['color'])
#self.write_to_file(name + ".hex", ret['hexdump'])
#self.write_to_file_bin(name + ".bin", data)
#self.idx += 1
def add_json(self, name, data):
self.write_to_file(name, pprint.pformat(data, indent=4))
self.idx += 1
def write_to_file(self, filename, data):
if not self.active:
return
with open("{}/{}-{}".format(logs_dir, self.idx, filename), "w") as f:
f.write(data)
def write_to_file_bin(self, filename, data):
if not self.active:
return
with open("{}/{}-{}".format(logs_dir, self.idx, filename), "wb") as f:
f.write(data)
def clean_files(self):
delete_all_files_in_directory(f"{logs_dir}/")
self.idx = 0
self.logs = []
#def write_to_file(self, filename, data):
# if not self.active:
# return
# with open("{}/{}-{}".format(logs_dir, self.idx, filename), "w") as f:
# f.write(data)
def __str__(self):
s = "<todo>"
return s
#def write_to_file_bin(self, filename, data):
# if not self.active:
# return
# with open("{}/{}-{}".format(logs_dir, self.idx, filename), "wb") as f:
# f.write(data)
#def clean_files(self):
# delete_all_files_in_directory(f"{logs_dir}/")
# self.idx = 0
# self.logs = []
#def __str__(self):
# s = "<todo>"
# return s
observer = Observer()
+3 -3
View File
@@ -22,7 +22,7 @@ def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath, shellcode_out: FileP
if not os.path.isfile(build_exe):
raise Exception("Compiling failed")
code = extract_code_from_exe_file(build_exe)
observer.add_code("carrier_shc", code)
observer.add_code_file("carrier_shc", code)
with open(shellcode_out, 'wb') as f:
f.write(code)
@@ -35,7 +35,7 @@ def merge_loader_payload(
):
logger.info("--[ Merge stager with payload -> {}".format(
shellcode_out))
observer.add_code("payload_shc", payload_data)
observer.add_code_file("payload_shc", payload_data)
with open(shellcode_in, 'rb') as input1:
data_stager = input1.read()
@@ -55,5 +55,5 @@ def merge_loader_payload(
# append them
data = data_stager + payload_data
output.write(data)
observer.add_code("loader_shc", data)
observer.add_code_file("loader_shc", data)
+7 -7
View File
@@ -36,7 +36,7 @@ def compile_dev(
if not os.path.isfile(asm_out):
raise Exception("Error: Compiling failed")
file_to_lf(asm_out)
observer.add_text("carrier_asm_orig", file_readall_text(asm_out))
observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out))
# Assembly cleanup (masm_shc)
asm_clean_file = asm_out + ".clean"
@@ -55,7 +55,7 @@ def compile_dev(
# Move to destination we expect
shutil.move(asm_clean_file, asm_out)
if config.debug:
observer.add_text("carrier_asm_cleanup", file_readall_text(asm_out))
observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out))
def compile(
@@ -81,7 +81,7 @@ def compile(
if not os.path.isfile(asm_out):
raise Exception("Error: Compiling failed")
file_to_lf(asm_out)
observer.add_text("carrier_asm_orig", file_readall_text(asm_out))
observer.add_text_file("carrier_asm_orig", file_readall_text(asm_out))
# DataReuse first
asmFileParser = ReusedataAsmFileParser(asm_out)
@@ -96,7 +96,7 @@ def compile(
raise Exception("Error: Fixup failed")
if config.debug:
observer.add_text("carrier_asm_fixup", file_readall_text(asm_out))
observer.add_text_file("carrier_asm_fixup", file_readall_text(asm_out))
# Assembly cleanup (masm_shc)
asm_clean_file = asm_out + ".clean"
@@ -114,7 +114,7 @@ def compile(
if source_style == SourceStyle.iat_reuse:
fixup_iat_reuse(asm_clean_file, carrier)
observer.add_text("carrier_asm_updated", file_readall_text(asm_clean_file))
observer.add_text_file("carrier_asm_updated", file_readall_text(asm_clean_file))
if not exe_host.has_all_carrier_functions(carrier):
logger.error("Error: Not all carrier functions are available in the target exe")
@@ -123,7 +123,7 @@ def compile(
# Move to destination we expect
shutil.move(asm_clean_file, asm_out)
if config.debug:
observer.add_text("carrier_asm_cleanup", file_readall_text(asm_out))
observer.add_text_file("carrier_asm_cleanup", file_readall_text(asm_out))
def bytes_to_asm_db(byte_data: bytes) -> bytes:
@@ -224,4 +224,4 @@ def fixup_iat_reuse(filename: FilePath, carrier: Carrier):
asmfile.writelines(lines)
if config.debug:
observer.add_text("carrier_asm_iat_patch", file_readall_text(filename))
observer.add_text_file("carrier_asm_iat_patch", file_readall_text(filename))
+2 -2
View File
@@ -67,8 +67,8 @@ def inject_exe(
in_code = code[peinj.shellcodeOffsetRel:peinj.shellcodeOffsetRel+shellcode_len]
jmp_code = code[peinj.backdoorOffsetRel:peinj.backdoorOffsetRel+12]
if config.debug:
observer.add_code("exe_extracted_loader", in_code)
observer.add_code("exe_extracted_jmp", jmp_code)
observer.add_code_file("exe_extracted_loader", in_code)
observer.add_code_file("exe_extracted_jmp", jmp_code)
#if in_code != shellcode:
# raise Exception("Shellcode injection error")
+6 -6
View File
@@ -57,7 +57,7 @@ def create_c_from_template(
if use_templates:
with open(PATH_PEB_WALK + "template.c", 'r', encoding='utf-8') as file:
template_content = file.read()
observer.add_text("main_c_template", template_content)
observer.add_text_file("main_c_template", template_content)
template = Template(template_content)
rendered_template = template.render({
@@ -68,12 +68,12 @@ def create_c_from_template(
})
with open(main_c_file, "w", encoding='utf-8') as file:
file.write(rendered_template)
observer.add_text("main_c_rendered", rendered_template)
observer.add_text_file("main_c_rendered", rendered_template)
# TODO PEB
shutil.copy(PATH_PEB_WALK + "peb_lookup.h", f"{build_dir}/peb_lookup.h")
else:
observer.add_text("main_c", file_readall_text(PATH_PEB_WALK + "main.c"))
observer.add_text_file("main_c", file_readall_text(PATH_PEB_WALK + "main.c"))
shutil.copy(PATH_PEB_WALK + "main.c", main_c_file)
# TODO PEB
shutil.copy(PATH_PEB_WALK + "peb_lookup.h", f"{build_dir}/peb_lookup.h")
@@ -82,7 +82,7 @@ def create_c_from_template(
if use_templates:
with open(PATH_IAT_REUSE + "template.c", 'r', encoding='utf-8') as file:
template_content = file.read()
observer.add_text("main_c_template", template_content)
observer.add_text_file("main_c_template", template_content)
template = Template(template_content)
rendered_template = template.render({
'plugin_allocator': plugin_allocator,
@@ -92,7 +92,7 @@ def create_c_from_template(
})
with open(main_c_file, "w", encoding='utf-8') as file:
file.write(rendered_template)
observer.add_text("main_c_rendered", rendered_template)
observer.add_text_file("main_c_rendered", rendered_template)
else:
observer.add_text("main_c", file_readall_text(PATH_IAT_REUSE + "main.c"))
observer.add_text_file("main_c", file_readall_text(PATH_IAT_REUSE + "main.c"))
shutil.copy(PATH_IAT_REUSE + "main.c", main_c_file)
+55 -53
View File
@@ -20,6 +20,7 @@ from model.defs import *
from log import setup_logging
from utils import delete_all_files_in_directory
def main():
"""Argument parsing for when called from command line"""
logger.info("Super Mega")
@@ -101,15 +102,48 @@ def main():
def start(settings: Settings):
"""Main entry point for the application. This is where the magic happens, based on settings"""
# Delete: all old files
if settings.cleanup_files_on_start:
clean_files()
delete_all_files_in_directory(f"{logs_dir}/")
# And logs
observer.reset()
exit_code = 0 # 0 = success
try:
start_real(settings)
except Exception as e:
logger.error(f'Error compiling: {e}')
write_logs()
return 1
# Cleanup files
if settings.cleanup_files_on_exit:
clean_files()
write_logs()
def write_logs():
# Our log output
with open(f"{logs_dir}/supermega.log", "w") as f:
for line in observer.get_logs():
f.write(line + "\n")
# Stdout of executed commands
with open(f"{logs_dir}/cmdoutput.log", "w") as f:
for line in observer.get_cmd_output():
f.write(line)
# Write all files
idx = 0
for name, data in observer.files:
with open(f"{logs_dir}/{idx}-{name}", "w") as f:
f.write(data)
idx += 1
def start_real(settings: Settings):
"""Main entry point for the application. This is where the magic happens, based on settings"""
# Load our input
project = Project(settings)
@@ -134,32 +168,21 @@ def start(settings: Settings):
# Compile: Carrier to .asm (C -> ASM)
if settings.generate_asm_from_c:
try:
phases.compiler.compile(
c_in = main_c_file,
asm_out = main_asm_file,
payload_len = project.payload.len,
carrier = project.carrier,
source_style = project.settings.source_style,
exe_host = project.exe_host,
short_call_patching = project.settings.short_call_patching)
except Exception as e:
logger.error(f'Error compiling: {e}')
observer.writelog()
return 1
phases.compiler.compile(
c_in = main_c_file,
asm_out = main_asm_file,
payload_len = project.payload.len,
carrier = project.carrier,
source_style = project.settings.source_style,
exe_host = project.exe_host,
short_call_patching = project.settings.short_call_patching)
# Assemble: Assemble .asm to .shc (ASM -> SHC)
if settings.generate_shc_from_asm:
try:
phases.assembler.asm_to_shellcode(
asm_in = main_asm_file,
build_exe = main_exe_file,
shellcode_out = main_shc_file)
except Exception as e:
logger.error("Error: Assembling failed: {}".format(e))
observer.writelog()
return 2
#shutil.copy(main_shc_file, "working/build/shellcode.bin")
phases.assembler.asm_to_shellcode(
asm_in = main_asm_file,
build_exe = main_exe_file,
shellcode_out = main_shc_file)
# Merge: shellcode/loader with payload (SHC + PAYLOAD -> SHC)
if settings.dataref_style == DataRefStyle.APPEND:
@@ -175,52 +198,31 @@ def start(settings: Settings):
project.exe_host.rwx_section.Name.decode().rstrip('\x00')
))
obfuscate_shc_loader(main_shc_file, main_shc_file + ".sgn")
observer.add_code("payload_sgn", file_readall_binary(main_shc_file + ".sgn"))
observer.add_code_file("payload_sgn", file_readall_binary(main_shc_file + ".sgn"))
shutil.move(main_shc_file + ".sgn", main_shc_file)
# inject merged loader into an exe
try:
phases.injector.inject_exe(main_shc_file, settings, project)
except PermissionError as e:
logger.error(f'Error writing file: {e}')
observer.writelog()
return 2
except Exception as e:
logger.error(f'Error injecting: {e}')
observer.writelog()
return 3
observer.add_code("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300))
phases.injector.inject_exe(main_shc_file, settings, project)
observer.add_code_file("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300))
if config.get("avred_server") != "":
if settings.verify or settings.try_start_final_infected_exe:
filename = os.path.basename(settings.inject_exe_in)
with open(settings.inject_exe_out, "rb") as f:
data = f.read()
try:
scannerDetectsBytes(data, filename, useBrotli=True, verify=settings.verify)
except Exception as e:
logger.error(f'Error scanning: {e}')
observer.writelog()
return 4
scannerDetectsBytes(data, filename, useBrotli=True, verify=settings.verify)
else:
# Start/verify it at the end
if settings.verify:
logger.info("--[ Verify infected exe")
exit_code = phases.injector.verify_injected_exe(settings.inject_exe_out)
payload_exit_code = phases.injector.verify_injected_exe(settings.inject_exe_out)
logging.info("Payload xit code: {}".format(payload_exit_code))
elif settings.try_start_final_infected_exe:
logger.info("--[ Start infected exe: {}".format(settings.inject_exe_out))
run_process_checkret([
settings.inject_exe_out,
], check=False)
# Cleanup files
if settings.cleanup_files_on_exit:
clean_files()
observer.writelog()
return exit_code
def obfuscate_shc_loader(file_shc_in, file_shc_out):
logger.info("--[ Obfuscate shellcode with SGN")