refactor: small SuperPe improvements

This commit is contained in:
Dobin
2024-04-13 11:12:43 +01:00
parent f40d527783
commit b9d3a5a97e
3 changed files with 51 additions and 62 deletions
+1 -1
View File
@@ -37,7 +37,7 @@ class ExeHost():
logger.info("--[ Analyzing: {}".format(self.filepath)) logger.info("--[ Analyzing: {}".format(self.filepath))
self.superpe = SuperPe(self.filepath) self.superpe = SuperPe(self.filepath)
if self.superpe.arch != "x64": if not self.superpe.is_64():
raise Exception("Binary is not 64bit: {}".format(self.filepath)) raise Exception("Binary is not 64bit: {}".format(self.filepath))
self.ep = self.superpe.get_entrypoint() self.ep = self.superpe.get_entrypoint()
+4 -3
View File
@@ -114,9 +114,10 @@ class PeBackdoor:
trampoline = '' trampoline = ''
addrOffset = -1 addrOffset = -1
registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi'] if self.superpe.is_64():
registers = ['rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi']
if self.superpe.arch == 'x86': else:
# Not really used
registers = ['eax', 'ebx', 'ecx', 'edx', 'esi', 'edi'] registers = ['eax', 'ebx', 'ecx', 'edx', 'esi', 'edi']
reg = random.choice(registers).upper() reg = random.choice(registers).upper()
+46 -58
View File
@@ -1,10 +1,8 @@
import pefile import pefile
import capstone import capstone
from enum import IntEnum
import logging import logging
from typing import List from typing import List, Dict
from utils import hexdump
from model.defs import * from model.defs import *
logger = logging.getLogger("superpe") logger = logging.getLogger("superpe")
@@ -42,18 +40,17 @@ class SuperPe():
self.pe.parse_data_directories() self.pe.parse_data_directories()
self.ptrSize = 4
self.arch = self.getFileArch()
if self.arch == 'x64':
self.ptrSize = 8
## PE Properties
def is_dll(self) -> bool: def is_dll(self) -> bool:
return self.filepath.endswith(".dll") return self.filepath.endswith(".dll")
def is_64(self) -> bool: def is_64(self) -> bool:
return self.arch == 'x64' if self.pe.FILE_HEADER.Machine == 0x8664:
return True
return False
def is_dotnet(self) -> bool: def is_dotnet(self) -> bool:
@@ -63,30 +60,18 @@ class SuperPe():
return True return True
return False return False
## Entrypoint
def get_physical_address(self, virtual_address): def get_entrypoint(self) -> int:
# Iterate through the section headers to find which section contains the VA return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
for section in self.pe.sections:
# Check if the VA is within the range of this section
if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize: def set_entrypoint(self, entrypoint: int):
# Calculate the difference between the VA and the section's virtual address self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint
virtual_offset = virtual_address - section.VirtualAddress
# Add the difference to the section's pointer to raw data
return virtual_offset
#physical_address = section.PointerToRawData + virtual_offset
#return physical_address
return None
def getFileArch(self): ## Section Access
if self.pe.FILE_HEADER.Machine == 0x014c:
return "x86"
if self.pe.FILE_HEADER.Machine == 0x8664:
return "x64"
raise Exception("Unsupported PE file architecture.")
def get_code_section(self): def get_code_section(self):
entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint entrypoint = self.pe.OPTIONAL_HEADER.AddressOfEntryPoint
@@ -121,8 +106,18 @@ class SuperPe():
return section return section
return None return None
def write_code_section_data(self, data: bytes):
sect = self.get_code_section()
if len(data) != sect.SizeOfRawData:
logger.error(f'New code section data is larger than the original! {len(data)} != {sect.SizeOfRawData}')
return
self.pe.set_bytes_at_offset(sect.PointerToRawData, data)
def get_base_relocs(self):
## PE Specific Information
def get_base_relocs(self) -> List[PeRelocEntry]:
base_relocs = [] base_relocs = []
if hasattr(self.pe, 'DIRECTORY_ENTRY_BASERELOC'): if hasattr(self.pe, 'DIRECTORY_ENTRY_BASERELOC'):
for base_reloc in self.pe.DIRECTORY_ENTRY_BASERELOC: for base_reloc in self.pe.DIRECTORY_ENTRY_BASERELOC:
@@ -134,7 +129,7 @@ class SuperPe():
return base_relocs return base_relocs
def get_iat_entries(self): def get_iat_entries(self) -> Dict[str, IatEntry]:
iat = {} iat = {}
for entry in self.pe.DIRECTORY_ENTRY_IMPORT: for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports: for imp in entry.imports:
@@ -150,14 +145,6 @@ class SuperPe():
return iat return iat
def write_code_section_data(self, data: bytes):
sect = self.get_code_section()
if len(data) != sect.SizeOfRawData:
logger.error(f'New code section data is larger than the original! {len(data)} != {sect.SizeOfRawData}')
return
self.pe.set_bytes_at_offset(sect.PointerToRawData, data)
def getSectionIndexByDataDir(self, dirIndex): def getSectionIndexByDataDir(self, dirIndex):
addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress addr = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[dirIndex].VirtualAddress
@@ -175,17 +162,6 @@ class SuperPe():
relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC)
out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize out = self.pe.sections[relocsIndex].SizeOfRawData - self.pe.sections[relocsIndex].Misc_VirtualSize
return out return out
def getSectionIndexByName(self, name):
i = 0
for sect in self.pe.sections:
if sect.Name.decode().lower().startswith(name.lower()):
return i
i += 1
logger.error(f'Could not find section with name {name}!')
return -1
def addImageBaseRelocations(self, pageRva, relocs): def addImageBaseRelocations(self, pageRva, relocs):
@@ -195,9 +171,11 @@ class SuperPe():
logger.error("No .reloc section") logger.error("No .reloc section")
raise(Exception("No .reloc section")) raise(Exception("No .reloc section"))
imageBaseRelocType = SuperPe.IMAGE_REL_BASED_HIGHLOW if self.is_64():
if self.arch == 'x64':
imageBaseRelocType = SuperPe.IMAGE_REL_BASED_DIR64 imageBaseRelocType = SuperPe.IMAGE_REL_BASED_DIR64
else:
# Not really used
imageBaseRelocType = SuperPe.IMAGE_REL_BASED_HIGHLOW
relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size relocsSize = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC].Size
relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC) relocsIndex = self.getSectionIndexByDataDir(SuperPe.IMAGE_DIRECTORY_ENTRY_BASERELOC)
@@ -248,16 +226,26 @@ class SuperPe():
## Helpers ## Helpers
def get_entrypoint(self) -> int: def get_physical_address(self, virtual_address) -> int:
return self.pe.OPTIONAL_HEADER.AddressOfEntryPoint """Convert a virtual address to a physical address in the PE file"""
# Iterate through the section headers to find which section contains the VA
def set_entrypoint(self, entrypoint: int): for section in self.pe.sections:
self.pe.OPTIONAL_HEADER.AddressOfEntryPoint = entrypoint # Check if the VA is within the range of this section
if section.VirtualAddress <= virtual_address < section.VirtualAddress + section.Misc_VirtualSize:
# Calculate the difference between the VA and the section's virtual address
virtual_offset = virtual_address - section.VirtualAddress
# Add the difference to the section's pointer to raw data
return virtual_offset
#physical_address = section.PointerToRawData + virtual_offset
#return physical_address
return None
def write_pe_to_file(self, outfile: str): def write_pe_to_file(self, outfile: str):
self.pe.write(outfile) self.pe.write(outfile)
## Disassembly / Output
def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5): def disasmBytes(self, cs, ks, disasmData, startOffset, length, callback = None, maxDepth = 5):
return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1) return self._disasmBytes(cs, ks, disasmData, startOffset, length, callback, maxDepth, 1)