refactor: bix cleanup

This commit is contained in:
Dobin
2024-02-16 08:34:46 +00:00
parent cb40434aae
commit 7d6e726fab
9 changed files with 125 additions and 167 deletions
+30
View File
@@ -0,0 +1,30 @@
from enum import Enum
class FilePath(str):
pass
class AllocStyle(Enum):
RWX = "rwx_1"
#RW_X = "rw_x"
#REUSE = "reuse"
class ExecStyle(Enum):
CALL = "direct_1",
#JMP = 2,
#FIBER = 3,
class DecoderStyle(Enum):
PLAIN_1 = "plain_1"
XOR_1 = "xor_1"
class DataRefStyle(Enum):
APPEND = 1
#class InjectStyle(Enum):
class SourceStyle(Enum):
peb_walk = 1
iat_reuse = 2
+9 -75
View File
@@ -1,84 +1,18 @@
import subprocess
import os
import time
import shutil
import pathlib
import sys
import pefile
import glob
import logging
from config import config
from project import project
from pehelper import *
from defs import *
logger = logging.getLogger("Helper")
SHC_VERIFY_SLEEP = 0.1
verify_filename = r'C:\Temp\a'
build_dir = "build"
def remove_trailing_null_bytes(data):
for i in range(len(data) - 1, -1, -1):
if data[i] != b'\x00'[0]: # Check for a non-null byte
return data[:i + 1]
return b'' # If the entire sequence is null bytes
def get_code_section_data(pe_file):
try:
# Load the PE file
pe = pefile.PE(pe_file)
# Iterate over the sections
#for section in pe.sections:
# # Check if this is the code section
# if '.text' in section.Name.decode().rstrip('\x00'):
# data = section.get_data()
# data = remove_trailing_null_bytes(data)
# logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format(
# section.VirtualAddress,
# len(data), section.SizeOfRawData))
# return data
section = get_code_section(pe)
if section == None:
raise Exception("Code section not found.")
logger.info("--[ Code section: {}".format(section.Name.decode().rstrip('\x00')))
data = section.get_data()
data = remove_trailing_null_bytes(data)
logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format(
section.VirtualAddress,
len(data), section.SizeOfRawData))
return data
except FileNotFoundError:
logger.info(f"File not found: {pe_file}")
except pefile.PEFormatError:
logger.info(f"Invalid PE file: {pe_file}")
def write_code_section(pe_file, new_data):
# Load the PE file
pe = pefile.PE(pe_file)
# Iterate over the sections
for section in pe.sections:
# Check if this is the code section
if '.text' in section.Name.decode().rstrip('\x00'):
file_offset = section.PointerToRawData
with open(pe_file, 'r+b') as f:
f.seek(file_offset)
f.write(new_data)
#logger.info("Successfully overwritten the .text section with new data.")
break
def clean_files():
logger.info("--[ Remove old files")
@@ -89,13 +23,13 @@ def clean_files():
"mllink$.lnk",
# out/ stuff
os.path.join(build_dir, "main.asm"),
os.path.join(build_dir, "main.bin"),
os.path.join(build_dir, "main.c"),
os.path.join(build_dir, "peb_lookup.h"),
#os.path.join(build_dir, "main.exe"),
os.path.join(project.build_dir, "main.asm"),
os.path.join(project.build_dir, "main.bin"),
os.path.join(project.build_dir, "main.c"),
os.path.join(project.build_dir, "peb_lookup.h"),
#os.path.join(project.build_dir, "main.exe"),
verify_filename,
project.verify_filename,
]
for file in files_to_clean:
pathlib.Path(file).unlink(missing_ok=True)
@@ -133,7 +67,7 @@ def try_start_shellcode(shc_file):
subprocess.run([
config.get["path_runshc"],
shc_file,
]) # , check=True
])
def file_readall_text(filepath) -> str:
@@ -155,4 +89,4 @@ def delete_all_files_in_directory(directory_path):
os.remove(file_path)
#logger.info(f"Deleted {file_path}")
except Exception as e:
logger.info(f"Error deleting {file_path}: {e}")
logger.info(f"Error deleting {file_path}: {e}")
+3 -31
View File
@@ -1,40 +1,12 @@
from typing import Dict
import pehelper
import pefile
from enum import Enum
import logging
import pefile
import pehelper
logger = logging.getLogger("Model")
class FilePath(str):
pass
class AllocStyle(Enum):
RWX = "rwx_1"
#RW_X = "rw_x"
#REUSE = "reuse"
class ExecStyle(Enum):
CALL = "direct_1",
#JMP = 2,
#FIBER = 3,
class DecoderStyle(Enum):
PLAIN_1 = "plain_1"
XOR_1 = "xor_1"
class DataRefStyle(Enum):
APPEND = 1
#class InjectStyle(Enum):
class SourceStyle(Enum):
peb_walk = 1
iat_reuse = 2
class Capability():
def __init__(self, name):
self.name = name
+35 -23
View File
@@ -5,38 +5,50 @@ from keystone import Ks, KS_ARCH_X86, KS_MODE_64
from capstone import Cs, CS_ARCH_X86, CS_MODE_64
import logging
from model import *
from helper import *
logger = logging.getLogger("PEHelper")
def get_code_section(pe):
def extract_code_from_exe(exe_file: FilePath) -> bytes:
pe = pefile.PE(exe_file)
section = get_code_section(pe)
logger.info("--[ Code section: {}".format(section.Name.decode().rstrip('\x00')))
data: bytes = section.get_data()
data = remove_trailing_null_bytes(data)
logger.info(" > 0x{:X} Code Size: {} (raw code section size: {})".format(
section.VirtualAddress,
len(data), section.SizeOfRawData))
return data
def write_code_section(exe_file: FilePath, new_data: bytes):
pe = pefile.PE(exe_file)
section = get_code_section(pe)
file_offset = section.PointerToRawData
with open(exe_file, 'r+b') as f:
f.seek(file_offset)
f.write(new_data)
def get_code_section(pe) -> pefile.SectionStructure:
entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint
for sect in pe.sections:
name = sect.Name.decode()
#logger.info("Checking: {} and 0x{:x}".format(name, sect.Characteristics))
if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']:
if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.SizeOfRawData:
return sect
#else:
# logger.info("NOOO: 0x{:x} 0x{:x} 0x{:x}".format(
# entrypoint,
# sect.VirtualAddress,
# sect.VirtualAddress + sect.SizeOfRawData,
# ))
return None
raise Exception("Code section not found")
# RWX
def get_rwx_section(pe):
def get_rwx_section(pe: pefile.PE) -> pefile.SectionStructure:
entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint
for section in pe.sections:
if (section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_READ'] and
section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_WRITE'] and
section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']
):
#name = section.Name.decode().rstrip('\x00')
if entrypoint > section.VirtualAddress and entrypoint < section.VirtualAddress + section.SizeOfRawData:
return section
return None
@@ -44,7 +56,7 @@ def get_rwx_section(pe):
# keystone/capstone stuff
def assemble_and_disassemble_jump(current_address, destination_address):
def assemble_and_disassemble_jump(current_address: int, destination_address: int) -> bytes:
#logger.info(" Make jmp from 0x{:X} to 0x{:X}".format(
# current_address, destination_address
#))
@@ -67,7 +79,7 @@ def assemble_and_disassemble_jump(current_address, destination_address):
# IAT Stuff
def extract_iat(pe):
def extract_iat(pe: pefile.PE):
iat = {}
# If the PE file was loaded using the fast_load=True argument, we will need to parse the data directories:
@@ -109,10 +121,10 @@ def resolve_iat_capabilities(needed_capabilities, inject_exe):
cap.addr = get_addr_for(iat, cap.name)
def main():
pe = pefile.PE(sys.argv[1])
iat = extract_iat(pe)
## Utils
if __name__ == "__main__":
main()
def remove_trailing_null_bytes(data: bytes) -> bytes:
for i in range(len(data) - 1, -1, -1):
if data[i] != b'\x00'[0]: # Check for a non-null byte
return data[:i + 1]
return b'' # If the entire sequence is null bytes
+13 -16
View File
@@ -8,37 +8,34 @@ from config import config
from observer import observer
from project import project
from helper import *
from pehelper import *
logger = logging.getLogger("Assembler")
def make_shc_from_asm(asm_file, exe_file, shc_file):
logger.info("--[ Assemble to exe: {} -> {} -> {}".format(asm_file, exe_file, shc_file))
logger.info("---[ Assemble ASM to EXE: {} -> {}".format(asm_file, exe_file))
def asm_to_shellcode(asm_in: FilePath, build_exe: FilePath, shellcode_out: FilePath):
"""Takes ASM source file asm_in, compiles it into build_exe, extracts its code section and write into shellcode_out"""
logger.info("--[ Assemble to exe: {} -> {} -> {}".format(asm_in, build_exe, shellcode_out))
run_process_checkret([
config.get("path_ml64"),
asm_file,
asm_in,
"/link",
"/OUT:{}".format(exe_file),
"/OUT:{}".format(build_exe),
"/entry:AlignRSP"
])
if not os.path.isfile(exe_file):
if not os.path.isfile(build_exe):
logger.error("Error")
return
logger.info("---[ EXE to SHC: {} -> {} ".format(exe_file, shc_file))
code = get_code_section_data(exe_file)
with open(shc_file, 'wb') as f:
code = extract_code_from_exe(build_exe)
with open(shellcode_out, 'wb') as f:
f.write(code)
return code
#logger.info("---[ Shellcode from {} written to: {} (size: {}) ".format(exe_file, shc_file, len(code)))
def merge_loader_payload(main_shc_file):
def merge_loader_payload(shellcode_in: FilePath, shellcode_out: FilePath, payload: FilePath, decoder_style: DecoderStyle):
logger.info("--[ Merge stager: {} + {} -> {}".format(
main_shc_file, project.payload, main_shc_file))
with open(main_shc_file, 'rb') as input1:
shellcode_in, project.payload, shellcode_out))
with open(shellcode_in, 'rb') as input1:
data_stager = input1.read()
with open(project.payload, 'rb') as input2:
data_payload = input2.read()
@@ -53,7 +50,7 @@ def merge_loader_payload(main_shc_file):
logger.info("---[ Size: Stager: {} and Payload: {} Sum: {} ".format(
len(data_stager), len(data_payload), len(data_stager)+len(data_payload)))
with open(main_shc_file, 'wb') as output:
with open(shellcode_out, 'wb') as output:
data = data_stager + data_payload
output.write(data)
observer.add_code("final_shellcode", data)
+4 -3
View File
@@ -1,10 +1,11 @@
from helper import *
from config import config
import os
import pprint
from observer import observer
import logging
import shutil
from helper import *
from config import config
from observer import observer
from project import project
from model import *
+5 -4
View File
@@ -2,6 +2,7 @@ from helper import *
import shutil
import pprint
import logging
import time
from pehelper import *
from model import *
@@ -36,7 +37,7 @@ def inject_exe(shc_file: FilePath):
# and re-implant it
if project.source_style == SourceStyle.iat_reuse:
# get code section of exe_out
code = get_code_section_data(exe_out)
code = extract_code_from_exe(exe_out)
for cap in exe_capabilities.get_all().values():
if not cap.id in code:
logger.error("Capability ID {} not found, abort".format(cap.id))
@@ -58,16 +59,16 @@ def inject_exe(shc_file: FilePath):
def verify_injected_exe(exefile):
logger.info("---[ Verify infected exe: {} ".format(exefile))
# remove indicator file
pathlib.Path(verify_filename).unlink(missing_ok=True)
pathlib.Path(project.verify_filename).unlink(missing_ok=True)
run_process_checkret([
exefile,
], check=False)
time.sleep(SHC_VERIFY_SLEEP)
if os.path.isfile(verify_filename):
if os.path.isfile(project.verify_filename):
logger.info("---> Verify OK. Infected exe works (file was created)")
# better to remove it immediately
os.remove(verify_filename)
os.remove(project.verify_filename)
return True
else:
logger.error("---> Verify FAIL. Infected exe does not work (no file created)")
+4 -1
View File
@@ -1,5 +1,5 @@
from model import *
from defs import *
class Project():
def __init__(self):
@@ -32,5 +32,8 @@ class Project():
self.generate_asm_from_c: bool = True
self.generate_shc_from_asm: bool = True
self.verify_filename = r'C:\Temp\a'
self.build_dir = "build"
project = Project()
+22 -14
View File
@@ -1,14 +1,15 @@
import shutil
from enum import Enum
from helper import *
import argparse
from typing import Dict
import pickle
import os
import logging
import time
from defs import *
from model import *
from helper import *
from config import config
from pehelper import *
import phases.templater
import phases.compiler
import phases.assembler
@@ -16,10 +17,10 @@ import phases.injector
from observer import observer
from project import project
main_c_file = os.path.join(build_dir, "main.c")
main_asm_file = os.path.join(build_dir, "main.asm")
main_exe_file = os.path.join(build_dir, "main.exe")
main_shc_file = os.path.join(build_dir, "main.bin")
main_c_file = os.path.join(project.build_dir, "main.c")
main_asm_file = os.path.join(project.build_dir, "main.asm")
main_exe_file = os.path.join(project.build_dir, "main.exe")
main_shc_file = os.path.join(project.build_dir, "main.bin")
# ANSI escape sequences for colors
@@ -178,7 +179,10 @@ def start():
# Convert: ASM -> Shellcode
if project.generate_shc_from_asm:
code = phases.assembler.make_shc_from_asm(main_asm_file, main_exe_file, main_shc_file)
code = phases.assembler.asm_to_shellcode(
asm_in = main_asm_file,
build_exe = main_exe_file,
shellcode_out = main_shc_file)
observer.add_code("generate_shc_from_asm", code)
# Try: Starting the shellcode (rarely useful)
@@ -187,7 +191,11 @@ def start():
# Merge shellcode/loader with payload
if project.dataref_style == DataRefStyle.APPEND:
phases.assembler.merge_loader_payload(main_shc_file)
phases.assembler.merge_loader_payload(
shellcode_in = main_shc_file,
shellcode_out = main_shc_file,
payload = project.payload,
decoder_style = project.decoder_style)
if project.verify and project.source_style == SourceStyle.peb_walk:
logger.info("--[ Verify final shellcode")
@@ -275,21 +283,21 @@ def verify_shellcode(shc_name):
logger.info("---[ Verify shellcode: {}".format(shc_name))
# check if directory exists
if not os.path.exists(os.path.dirname(verify_filename)):
logger.info("Error, directory does not exist for: {}".format(verify_filename))
if not os.path.exists(os.path.dirname(project.verify_filename)):
logger.info("Error, directory does not exist for: {}".format(project.verify_filename))
return
# remove indicator file
pathlib.Path(verify_filename).unlink(missing_ok=True)
pathlib.Path(project.verify_filename).unlink(missing_ok=True)
run_process_checkret([
config.get("path_runshc"),
"{}".format(shc_name),
], check=False)
time.sleep(SHC_VERIFY_SLEEP)
if os.path.isfile(verify_filename):
if os.path.isfile(project.verify_filename):
logger.info("---> Verify OK. Shellcode works (file was created)")
os.remove(verify_filename)
os.remove(project.verify_filename)
return True
else:
logger.info("---> Verify FAIL. Shellcode doesnt work (file was not created)")