diff --git a/model/exehost.py b/model/exehost.py index de46231..602f47a 100644 --- a/model/exehost.py +++ b/model/exehost.py @@ -37,7 +37,7 @@ class ExeHost(): logger.info("--[ Analyzing: {}".format(self.filepath)) self.superpe = SuperPe(self.filepath) - if self.superpe.arch != "x64": + if not self.superpe.is_64(): raise Exception("Binary is not 64bit: {}".format(self.filepath)) self.ep = self.superpe.get_entrypoint() diff --git a/pe/derbackdoorer.py b/pe/derbackdoorer.py index 5ff444f..4205961 100644 --- a/pe/derbackdoorer.py +++ b/pe/derbackdoorer.py @@ -114,9 +114,10 @@ class PeBackdoor: trampoline = '' addrOffset = -1 - registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi'] - - if self.superpe.arch == 'x86': + if self.superpe.is_64(): + registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi'] + else: + # Not really used registers = ['eax', 'ebx', 'ecx', 'edx', 'esi', 'edi'] reg = random.choice(registers).upper() diff --git a/pe/superpe.py b/pe/superpe.py index f0183cc..31c79a5 100644 --- a/pe/superpe.py +++ b/pe/superpe.py @@ -1,10 +1,8 @@ import pefile import capstone -from enum import IntEnum import logging -from typing import List +from typing import List, Dict -from utils import hexdump from model.defs import * logger = logging.getLogger("superpe") @@ -42,18 +40,17 @@ class SuperPe(): self.pe.parse_data_directories() - self.ptrSize = 4 - self.arch = self.getFileArch() - if self.arch == 'x64': - self.ptrSize = 8 + ## PE Properties def is_dll(self) -> bool: return self.filepath.endswith(".dll") def is_64(self) -> bool: - return self.arch == 'x64' + if self.pe.FILE_HEADER.Machine == 0x8664: + return True + return False def is_dotnet(self) -> bool: @@ -63,30 +60,18 @@ class SuperPe(): return True return False + + ## Entrypoint - def get_physical_address(self, virtual_address): - # Iterate through the section headers to find which section contains the VA - for section in self.pe.sections: - # Check if the VA is within the range of this section - if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize: - # Calculate the difference between the VA and the section's virtual address - virtual_offset = virtual_address - section.VirtualAddress - # Add the difference to the section's pointer to raw data - return virtual_offset - #physical_address = section.PointerToRawData + virtual_offset - #return physical_address - return None + def get_entrypoint(self) -> int: + return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint + + + def set_entrypoint(self, entrypoint: int): + self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint - def getFileArch(self): - if self.pe.FILE_HEADER.Machine == 0x014c: - return "x86" - - if self.pe.FILE_HEADER.Machine == 0x8664: - return "x64" - - raise Exception("Unsupported PE file architecture.") - + ## Section Access def get_code_section(self): entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint @@ -121,8 +106,18 @@ class SuperPe(): return section return None + + def write_code_section_data(self, data: bytes): + sect = self.get_code_section() + if len(data) != sect.SizeOfRawData: + logger.error(f'New code section data is larger than the original! {len(data)} != {sect.SizeOfRawData}') + return + self.pe.set_bytes_at_offset(sect.PointerToRawData, data) - def get_base_relocs(self): + + ## PE Specific Information + + def get_base_relocs(self) -> List[PeRelocEntry]: base_relocs = [] if hasattr(self.pe, 'DIRECTORY_ENTRY_BASERELOC'): for base_reloc in self.pe.DIRECTORY_ENTRY_BASERELOC: @@ -134,7 +129,7 @@ class SuperPe(): return base_relocs - def get_iat_entries(self): + def get_iat_entries(self) -> Dict[str, IatEntry]: iat = {} for entry in self.pe.DIRECTORY_ENTRY_IMPORT: for imp in entry.imports: @@ -150,14 +145,6 @@ class SuperPe(): return iat - def write_code_section_data(self, data: bytes): - sect = self.get_code_section() - if len(data) != sect.SizeOfRawData: - logger.error(f'New code section data is larger than the original! {len(data)} != {sect.SizeOfRawData}') - return - self.pe.set_bytes_at_offset(sect.PointerToRawData, data) - - def getSectionIndexByDataDir(self, dirIndex): addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress @@ -175,17 +162,6 @@ class SuperPe(): relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize return out - - - def getSectionIndexByName(self, name): - i = 0 - for sect in self.pe.sections: - if sect.Name.decode().lower().startswith(name.lower()): - return i - i += 1 - - logger.error(f'Could not find section with name {name}!') - return -1 def addImageBaseRelocations(self, pageRva, relocs): @@ -195,9 +171,11 @@ class SuperPe(): logger.error("No .reloc section") raise(Exception("No .reloc section")) - imageBaseRelocType = SuperPe.IMAGE_REL_BASED_HIGHLOW - if self.arch == 'x64': + if self.is_64(): imageBaseRelocType = SuperPe.IMAGE_REL_BASED_DIR64 + else: + # Not really used + imageBaseRelocType = SuperPe.IMAGE_REL_BASED_HIGHLOW relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) @@ -248,16 +226,26 @@ class SuperPe(): ## Helpers - def get_entrypoint(self) -> int: - return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint - - def set_entrypoint(self, entrypoint: int): - self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint - + def get_physical_address(self, virtual_address) -> int: + """Convert a virtual address to a physical address in the PE file""" + # Iterate through the section headers to find which section contains the VA + for section in self.pe.sections: + # Check if the VA is within the range of this section + if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize: + # Calculate the difference between the VA and the section's virtual address + virtual_offset = virtual_address - section.VirtualAddress + # Add the difference to the section's pointer to raw data + return virtual_offset + #physical_address = section.PointerToRawData + virtual_offset + #return physical_address + return None + def write_pe_to_file(self, outfile: str): self.pe.write(outfile) + + ## Disassembly / Output def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5): return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1)