fix: find gaps in the relocation for data inject

This commit is contained in:
Dobin
2024-03-03 19:19:58 +00:00
parent 903add2c4f
commit 5e46432d96
4 changed files with 116 additions and 40 deletions
+19 -31
View File
@@ -7,6 +7,7 @@ from model.defs import *
import pe.pehelper as pehelper import pe.pehelper as pehelper
from pe.superpe import SuperPe, PeSection from pe.superpe import SuperPe, PeSection
from model.carrier import Carrier from model.carrier import Carrier
from model.rangemanager import RangeManager
logger = logging.getLogger("ExeHost") logger = logging.getLogger("ExeHost")
@@ -36,6 +37,7 @@ class ExeHost():
self.iat: Dict[str, IatEntry] = {} self.iat: Dict[str, IatEntry] = {}
self.base_relocs: List[PeRelocEntry] = [] self.base_relocs: List[PeRelocEntry] = []
self.base_reloc_ranges: RangeManager = None
self.image_base: int = 0 self.image_base: int = 0
self.dynamic_base: bool = False self.dynamic_base: bool = False
@@ -127,43 +129,29 @@ class ExeHost():
section: PeSection = self.superpe.get_section_by_name(section_name) section: PeSection = self.superpe.get_section_by_name(section_name)
if section is None: if section is None:
return [] return []
return [reloc for reloc in self.base_relocs if reloc.base_rva == section.virt_addr] ret = []
def get_reloc_largest_gap(self, section_name: str = ".rdata"):
tree = IntervalTree()
section: PeSection = self.superpe.get_section_by_name(section_name)
print("-- Relocations: {}".format(len(self.base_relocs)))
print("-- section: 0x{:x}".format(section.virt_addr))
#return [reloc for reloc in self.base_relocs if reloc.base_rva == section.virt_addr]
for reloc in self.base_relocs: for reloc in self.base_relocs:
if reloc.base_rva == section.virt_addr: reloc_addr = reloc.rva
print("--- Adding reloc: {} {}".format(reloc.offset, reloc.offset + 8)) if reloc_addr >= section.virt_addr and reloc_addr < section.virt_addr + section.virt_size:
tree.add(Interval(reloc.offset, reloc.offset + 8)) ret.append(reloc)
tree.add(Interval(section.virt_size, section.virt_size + 1)) return ret
# Initialize variables to track the largest gap and its bounds
max_gap = 0
gap_start = None
gap_end = None
# Sort intervals for sequential comparison def get_rdata_relocmanager(self) -> RangeManager:
sorted_intervals = sorted(tree) section = self.superpe.get_section_by_name(".rdata")
for i in range(len(sorted_intervals) - 1): relocs = self.get_relocations_for_section(".rdata")
current_end = sorted_intervals[i].end #print("Relocs for .rdata: {} of {}".format(len(relocs), len(self.base_relocs)))
next_start = sorted_intervals[i + 1].begin
gap = next_start - current_end
if gap > max_gap:
max_gap = gap
gap_start = current_end # Adjusted for the actual start of the gap
gap_end = next_start - 1 # Adjusted for the actual end of the gap
# Adjust for the artificial +1 in interval ends rm = RangeManager(section.virt_addr, section.virt_addr + section.virt_size)
if gap_start is not None and gap_end is not None: for reloc in relocs:
gap_start -= 1 # Reloc destination is probably 8 bytes
# But i add another 8 to skip over small holes (common in .rdata)
rm.add_range(reloc.rva, reloc.rva + 8 + 8)
rm.merge_overlaps()
return rm
return max_gap - 1, gap_start, gap_end
def has_all_carrier_functions(self, carrier: Carrier): def has_all_carrier_functions(self, carrier: Carrier):
is_ok = True is_ok = True
+43
View File
@@ -0,0 +1,43 @@
from intervaltree import Interval, IntervalTree
class RangeManager:
def __init__(self, min=0, max=1000):
self.intervals = IntervalTree()
self.min = min
self.max = max
def merge_overlaps(self):
self.intervals.merge_overlaps(strict=False)
def add_range(self, start, end):
if start < self.min or end > self.max:
raise ValueError("Ranges must be within 0x{:X} and 0x{:X}, not 0x{:X}/0x{:X}".format(
self.min, self.max, start, end
))
self.intervals.add(Interval(start, end))
def find_hole(self, hole_size):
sorted_intervals = sorted(self.intervals)
last_end = self.min
for interval in sorted_intervals:
start, end = interval.begin, interval.end
if start - last_end >= hole_size:
return (last_end + 1, start - 1)
last_end = max(last_end, end)
def find_holes(self, hole_size):
sorted_intervals = sorted(self.intervals)
holes = []
last_end = self.min
for interval in sorted_intervals:
start, end = interval.begin, interval.end
if start - last_end >= hole_size:
holes.append((last_end + 1, start - 1))
last_end = max(last_end, end)
if last_end < self.max and self.max - last_end >= hole_size:
holes.append((last_end + 1, self.max))
return holes
+19 -7
View File
@@ -110,18 +110,30 @@ def injected_fix_data(superpe: SuperPe, carrier: Carrier, exe_host: ExeHost):
peSection = exe_host.superpe.get_section_by_name(".rdata") peSection = exe_host.superpe.get_section_by_name(".rdata")
if peSection == None: if peSection == None:
raise Exception("No .rdata section found, abort") raise Exception("No .rdata section found, abort")
sect_data_copy = peSection.pefile_section.get_data()
string_off = find_first_utf16_string_offset(sect_data_copy) rm = exe_host.get_rdata_relocmanager()
if string_off == None: if False: # seems i dont need this, even tho i dont understand why
raise Exception("Strings not found in .rdata section, abort") sect_data_copy = peSection.pefile_section.get_data()
if string_off < 100: string_off = find_first_utf16_string_offset(sect_data_copy)
logging.warn("weird: Strings in .rdata section at offset {} < 100".format(string_off)) if string_off == None:
fixup_offset_rdata = peSection.raw_addr + string_off raise Exception("Strings not found in .rdata section, abort")
if string_off < 100:
logging.warn("weird: Strings in .rdata section at offset {} < 100".format(string_off))
rm.add_range(peSection.virt_addr, peSection.virt_addr + string_off)
# Do all .rdata patches # Do all .rdata patches
for datareuse_fixup in reusedata_fixups: for datareuse_fixup in reusedata_fixups:
# get a hole in the .rdata section to put our data
hole = rm.find_hole(len(datareuse_fixup.data))
if hole == None:
raise Exception("No hole found in .rdata section, abort")
fixup_offset_rdata = hole[0] # the start address of the hole (from start of .rdata)
rm.add_range(hole[0], hole[1]) # mark it as used
var_data = datareuse_fixup.data var_data = datareuse_fixup.data
superpe.pe.set_bytes_at_offset(fixup_offset_rdata, var_data) superpe.pe.set_bytes_at_offset(fixup_offset_rdata, var_data)
datareuse_fixup.addr = fixup_offset_rdata + peSection.virt_addr + exe_host.image_base - peSection.raw_addr datareuse_fixup.addr = fixup_offset_rdata + peSection.virt_addr + exe_host.image_base - peSection.raw_addr
logging.info(" Add data to .rdata at 0x{:X} (off: {}): {}".format(
datareuse_fixup.addr, fixup_offset_rdata, var_data.decode('utf-16le')))
fixup_offset_rdata += len(var_data) + 8 fixup_offset_rdata += len(var_data) + 8
# patch code section # patch code section
+33
View File
@@ -0,0 +1,33 @@
from typing import List
import unittest
import logging
from pe.superpe import SuperPe
from model.exehost import ExeHost
from model.rangemanager import RangeManager
class RangeManagerTest(unittest.TestCase):
def test_rangemanager(self):
rm = RangeManager(0, 100)
rm.add_range(0, 10)
rm.add_range(20, 30)
rm.add_range(50, 60)
hole = rm.find_hole(10)
self.assertEqual((11, 19), hole)
holes = rm.find_holes(20)
self.assertEqual([(31, 49), (61, 100)], holes)
def test_relocmanager(self):
exehost = ExeHost("data/exes/procexp64.exe")
exehost.init()
section = exehost.superpe.get_section_by_name(".rdata")
rm = exehost.get_rdata_relocmanager()
self.assertEqual(69, len(rm.intervals))
# 0x1ab0 is magic currently (should use find_first_utf16_string_offset()
#rm.add_range(section.virt_addr, section.virt_addr + 0x1AB0)
hole = rm.find_hole(20)
self.assertEqual(hole, (1167361, 1173015))