refactor: make IAT_REUSE check work well

This commit is contained in:
Dobin
2024-02-16 15:26:56 +00:00
parent 20c4749e8c
commit ca8e830ec6
6 changed files with 149 additions and 122 deletions
+25 -35
View File
@@ -7,14 +7,14 @@ import pehelper
logger = logging.getLogger("Model")
class Capability():
def __init__(self, name):
self.name = name
self.id: bytes = b""
self.addr: int = 0
class IatResolve():
def __init__(self, name: str, placeholder: bytes, addr: int):
self.name: str = name # Function Name, like "VirtualAlloc"
self.id: bytes = placeholder # Random bytes
self.addr: int = addr # The address of the IAT entry (incl. image_base)
def __str__(self):
def __str__(self) -> str:
return "0x{:X}: {} ({})".format(
self.addr,
self.name,
@@ -23,8 +23,8 @@ class Capability():
class ExeInfo():
def __init__(self, capabilities):
self.capabilities: Dict[str, Capability] = {}
def __init__(self):
self.iat_resolves: Dict[str, IatResolve] = {}
self.image_base = 0
self.dynamic_base = False
@@ -36,8 +36,10 @@ class ExeInfo():
self.base_relocs = []
self.rwx_section = None
for cap in capabilities:
self.capabilities[cap] = Capability(cap)
def add_capability(self, func_name, placeholder):
self.iat_resolves[func_name] = IatResolve(
func_name, placeholder, pehelper.get_addr_for(self.iat, func_name))
def parse_from_exe(self, filepath):
@@ -61,10 +63,7 @@ class ExeInfo():
self.code_rawsize = self.code_section.SizeOfRawData
# iat
iat = pehelper.extract_iat(pe)
for _, cap in self.capabilities.items():
cap.addr = pehelper.get_addr_for(iat, cap.name)
self.iat = iat
self.iat = pehelper.extract_iat(pe)
# relocs
if hasattr(pe, 'DIRECTORY_ENTRY_BASERELOC'):
@@ -81,32 +80,23 @@ class ExeInfo():
self.rwx_section = pehelper.get_rwx_section(pe)
def get(self, func_name):
if not func_name in self.capabilities:
return None
if self.capabilities[func_name].addr == 0:
return None
return self.capabilities[func_name]
def get_all_iat_resolvs(self) -> Dict[str, IatResolve]:
return self.iat_resolves
def get_all(self) -> Dict[str, Capability]:
return self.capabilities
def has_all(self):
needs = [ 'GetEnvironmentVariableW', 'VirtualAlloc']
for need in needs:
if not need in self.capabilities:
return False
if self.capabilities[need].addr == 0:
return False
return True
def has_all_functions(self, needs):
is_ok = True
for func_name in needs:
addr = pehelper.get_addr_for(self.iat, func_name)
if addr == 0:
logging.warn("Not available as import: {}".format(func_name))
is_ok = False
return is_ok
def print(self):
logger.info("--( Capabilities: ")
for _, cap in self.capabilities.items():
logger.info("--( Required IAT Resolves: ")
for _, cap in self.iat_resolves.items():
if cap.addr == 0:
logger.info(" {:28} {}".format(cap.name, "N/A"))
else:
-7
View File
@@ -113,13 +113,6 @@ def get_addr_for(iat, func_name: str) -> int:
return 0
def resolve_iat_capabilities(needed_capabilities, inject_exe):
pe = pefile.PE(inject_exe)
iat = extract_iat(pe)
for _, cap in needed_capabilities.items():
cap.addr = get_addr_for(iat, cap.name)
## Utils
def remove_trailing_null_bytes(data: bytes) -> bytes:
+54 -26
View File
@@ -20,7 +20,7 @@ def compile(
):
logger.info("--[ Compile C to ASM: {} -> {} ".format(c_in, asm_out))
# Phase 1: C To Assembly
# Compile C To Assembly (text)
logger.info("---[ Make ASM from C: {} ".format(c_in))
run_process_checkret([
config.get("path_cl"),
@@ -34,13 +34,13 @@ def compile(
raise Exception("Error: Compiling failed")
observer.add_text("payload_asm_orig", file_readall_text(asm_out))
# Phase 1.2: Assembly fixup
# Assembly text fixup (SuperMega)
logger.info("---[ Fixup : {} ".format(asm_out))
if not fixup_asm_file(asm_out, payload_len, exe_info):
raise Exception("Error: Fixup failed")
observer.add_text("payload_asm_fixup", file_readall_text(asm_out))
# Phase 1.1: Assembly cleanup
# Assembly cleanup (masm_shc)
asm_clean_file = asm_out + ".clean"
logger.info("---[ Cleanup: {} ".format(asm_out))
run_process_checkret([
@@ -51,6 +51,7 @@ def compile(
if not os.path.isfile(asm_clean_file):
raise Exception("Error: Cleanup filed")
# Move to destination we expect
shutil.move(asm_clean_file, asm_out)
observer.add_text("payload_asm_cleanup", file_readall_text(asm_out))
@@ -63,7 +64,7 @@ def bytes_to_asm_db(byte_data: bytes) -> bytes:
return "\tDB " + formatted_string
def fixup_asm_file(filename: FilePath, payload_len: int, capabilities: ExeInfo):
def fixup_asm_file(filename: FilePath, payload_len: int, exe_info: ExeInfo):
with open(filename, 'r', encoding='utf-8') as asmfile:
lines = asmfile.readlines()
@@ -72,27 +73,6 @@ def fixup_asm_file(filename: FilePath, payload_len: int, capabilities: ExeInfo):
# if "jmp\tSHORT" in lines[idx]:
# lines[idx] = lines[idx].replace("SHORT", "")
# do IAT reuse
for idx, line in enumerate(lines):
# Remove EXTRN, we dont need it
if "EXTRN __imp_" in lines[idx]:
lines[idx] = "; " + lines[idx]
continue
# Fix call
if "call" in lines[idx] and "__imp_" in lines[idx]:
func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip()
exeCapability = capabilities.get(func_name)
if exeCapability == None:
logger.error("Error Capabilities not: {}".format(func_name))
else:
randbytes: bytes = os.urandom(6)
lines[idx] = bytes_to_asm_db(randbytes) + "\r\n"
exeCapability.id = randbytes
logger.info(" > Replace func name: {} with {}".format(
func_name, randbytes))
# replace external reference with shellcode reference
for idx, line in enumerate(lines):
if "supermega_payload" in lines[idx]:
@@ -128,4 +108,52 @@ def fixup_asm_file(filename: FilePath, payload_len: int, capabilities: ExeInfo):
with open(filename, 'w') as asmfile:
asmfile.writelines(lines)
return True
return True
def get_function_stubs(asm_in: FilePath):
functions = []
with open(asm_in, 'r', encoding='utf-8') as asmfile:
lines = asmfile.readlines()
# EXTRN __imp_GetEnvironmentVariableW:PROC
for line in lines:
if "EXTRN __imp_" in line:
a = line
a = a.split("__imp_")[1]
a = a.split(":PROC")[0]
func_name = a
#func_name = line.strip("\r\n ")
#func_name = line.replace("EXTRN\t__imp_", "")
#func_name = line.replace(":PROC", "")
functions.append(func_name)
return functions
def fixup_iat_reuse(filename: FilePath, exe_info):
with open(filename, 'r', encoding='utf-8') as asmfile:
lines = asmfile.readlines()
# do IAT reuse
for idx, line in enumerate(lines):
# Remove EXTRN, we dont need it
if "EXTRN __imp_" in lines[idx]:
lines[idx] = "; " + lines[idx]
continue
# Fix call
# call QWORD PTR __imp_GetEnvironmentVariableW
if "call" in lines[idx] and "__imp_" in lines[idx]:
func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip()
randbytes: bytes = os.urandom(6)
lines[idx] = bytes_to_asm_db(randbytes) + "\r\n"
exe_info.add_capability(func_name, randbytes)
logger.info(" > Replace func name: {} with {}".format(
func_name, randbytes))
with open(filename, 'w') as asmfile:
asmfile.writelines(lines)
+22 -22
View File
@@ -14,7 +14,6 @@ def inject_exe(
shellcode_in: FilePath,
exe_in: FilePath,
exe_out: FilePath,
exe_info: ExeInfo,
):
logger.info("--[ Injecting: {} into: {} -> {} ".format(
shellcode_in, exe_in, exe_out
@@ -33,28 +32,29 @@ def inject_exe(
exe_out
])
# replace IAT in shellcode in code
# and re-implant it
if project.source_style == SourceStyle.iat_reuse:
# get code section of exe_out
code = extract_code_from_exe(exe_out)
for cap in exe_info.get_all().values():
if not cap.id in code:
raise Exception("Capability ID {} not found, abort".format(cap.id))
off = code.index(cap.id)
current_address = off + exe_info.image_base + exe_info.code_virtaddr
destination_address = cap.addr
logger.info(" Replace at 0x{:x} with call to 0x{:x}".format(
current_address, destination_address
))
jmp = assemble_and_disassemble_jump(
current_address, destination_address
)
code = code.replace(cap.id, jmp)
# write back our patched code into the exe
write_code_section(exe_file=exe_out, new_data=code)
def injected_fix_iat(exe_out: FilePath, exe_info: ExeInfo):
"""replace IAT in shellcode in code and re-implant it"""
# get code section of exe_out
code = extract_code_from_exe(exe_out)
for cap in exe_info.get_all_iat_resolvs().values():
if not cap.id in code:
raise Exception("IatResolve ID {} not found, abort".format(cap.id))
off = code.index(cap.id)
current_address = off + exe_info.image_base + exe_info.code_virtaddr
destination_address = cap.addr
logger.info(" Replace at 0x{:x} with call to 0x{:x}".format(
current_address, destination_address
))
jmp = assemble_and_disassemble_jump(
current_address, destination_address
)
code = code.replace(cap.id, jmp)
# write back our patched code into the exe
write_code_section(exe_file=exe_out, new_data=code)
def verify_injected_exe(exefile: FilePath) -> int:
+5 -3
View File
@@ -39,13 +39,15 @@ class Project():
def load_payload(self):
logging.info("Load payload: {}".format(self.payload_path))
with open(self.payload_path, 'rb') as input2:
self.payload_data = input2.read()
def load_injectable(self, tmp_caps):
self.exe_info = ExeInfo(tmp_caps)
def load_injectable(self):
logging.info("Load injectable: {}".format(self.inject_exe_in))
self.exe_info = ExeInfo()
self.exe_info.parse_from_exe(self.inject_exe_in)
project = Project()
+43 -29
View File
@@ -81,23 +81,20 @@ def main():
project.try_start_final_shellcode = False
if args.verify == "peb":
project.source_style = SourceStyle.peb_walk
project.inject = True
project.inject_mode = "1,1"
project.inject_exe_in = "exes/7z.exe"
project.inject_exe_out = "out/7z-verify.exe"
elif args.verify == "iat":
project.inject = True
project.inject_mode = "1,1"
project.inject_exe_in = "exes/procexp64.exe"
project.inject_exe_out = "out/procexp64-a.exe"
elif args.verify == "iat":
project.source_style = SourceStyle.iat_reuse
project.inject = True
project.inject_mode = "1,1"
project.inject_exe_in = "exes/iattest-full.exe"
project.inject_exe_out = "out/iatttest-full-a.exe"
project.inject_exe_out = "out/procexp64-verify.exe"
elif args.verify == "rwx":
project.source_style = SourceStyle.peb_walk
project.inject = True
project.inject_mode = "1,1"
project.inject_exe_in = "exes/wifiinfoview.exe"
project.inject_exe_out = "out/wifiinfoview.exe-a.exe"
project.inject_exe_out = "out/wifiinfoview.exe-verify.exe"
else:
logger.info("Unknown verify option {}, use std/iat".format(args.verify))
@@ -139,35 +136,50 @@ def start():
# Load our input
project.load_payload()
project.load_injectable([
"GetEnvironmentVariableW",
"VirtualAlloc"
])
project.exe_info.print()
project.load_injectable()
# choose which source / technique we gonna use
if project.exe_info.has_all():
project.source_style = SourceStyle.iat_reuse
else:
logger.info("--[ Some imports are missing for the shellcode to use IAT_REUSE")
project.source_style = SourceStyle.peb_walk
logger.warning("--[ SourceStyle: {}".format(project.source_style.name))
# Copy: loader C files into working directory: build/
# Copy: IAT_REUSE loader C files into working directory: build/
phases.templater.create_c_from_template(
source_style = project.source_style,
source_style = SourceStyle.iat_reuse,
alloc_style = project.alloc_style,
exec_style = project.exec_style,
decoder_style= project.decoder_style,
)
# Compile: C -> ASM
# Compile: IAT_REUSE loader C -> ASM
if project.generate_asm_from_c:
phases.compiler.compile(
c_in = main_c_file,
asm_out = main_asm_file,
payload_len = len(project.payload_data),
exe_info = project.exe_info)
# Decide if we can use IAT_REUSE (all function calls available as import)
required_functions = phases.compiler.get_function_stubs(main_asm_file)
if project.exe_info.has_all_functions(required_functions):
project.source_style = SourceStyle.iat_reuse
logger.warning("--[ SourceStyle: Using IAT_REUSE".format())
# all good, patch ASM
phases.compiler.fixup_iat_reuse(main_asm_file, project.exe_info)
else:
# Not good, Fall back to PEB_WALK
project.source_style = SourceStyle.peb_walk
logger.warning("--[ SourceStyle: Fall back to PEB_WALK".format())
clean_files()
# Copy: PEB_WALK loader C files into working directory: build/
phases.templater.create_c_from_template(
source_style = SourceStyle.peb_walk,
alloc_style = project.alloc_style,
exec_style = project.exec_style,
decoder_style= project.decoder_style,
)
# Compile: PEB_WALK C -> ASM
if project.generate_asm_from_c:
phases.compiler.compile(
c_in = main_c_file,
asm_out = main_asm_file,
payload_len = len(project.payload_data),
exe_info = project.exe_info)
# Assemble: ASM -> Shellcode
if project.generate_shc_from_asm:
@@ -216,9 +228,11 @@ def start():
phases.injector.inject_exe(
shellcode_in = main_shc_file,
exe_in = project.inject_exe_in,
exe_out = project.inject_exe_out,
exe_info = project.exe_info
exe_out = project.inject_exe_out
)
if project.source_style == SourceStyle.iat_reuse:
phases.injector.injected_fix_iat(project.inject_exe_out, project.exe_info)
if project.verify:
logger.info("--[ Verify infected exe")
exit_code = phases.injector.verify_injected_exe(project.inject_exe_out)