refactor: move capstone/keystone into its own file

This commit is contained in:
Dobin
2024-05-10 12:02:24 +01:00
parent 900c145557
commit 696599cb64
4 changed files with 70 additions and 80 deletions
+62
View File
@@ -0,0 +1,62 @@
import sys
import pefile
import pprint
from keystone import Ks, KS_ARCH_X86, KS_MODE_64
from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CS_MODE_LITTLE_ENDIAN
import logging
from model.defs import *
logger = logging.getLogger("AsmDisasm")
cs = Cs(CS_ARCH_X86, CS_MODE_64 + CS_MODE_LITTLE_ENDIAN)
cs.detail = True # from RedBackdoorer
ks = Ks(KS_ARCH_X86, KS_MODE_64)
def assemble_lea(current_address: int, destination_address: int, reg: str) -> bytes:
#print("LEAH: 0x{:X} - 0x{:X} = 0x{:X}".format(
# current_address, destination_address, destination_address - current_address))
offset = destination_address - current_address
encoding, _ = ks.asm(f"lea {reg}, qword ptr ds:[{offset}]")
machine_code = bytes(encoding)
return machine_code
def assemble_relative_call(current_address: int, destination_address: int) -> bytes:
# Calculate the relative offset
# For a near jump, the instruction length is typically 5 bytes (E9 xx xx xx xx)
offset = destination_address - current_address
# Assemble the jump instruction using Keystone
encoding, _ = ks.asm(f"call qword ptr ds:[{offset}]")
machine_code = bytes(encoding)
# Disassemble the machine code using Capstone
#cs = Cs(CS_ARCH_X86, CS_MODE_64)
#disassembled = next(cs.disasm(machine_code, current_address))
#logger.info(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}")
#logger.info(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}")
return machine_code
def assemble_relative_jmp(current_address: int, destination_address: int) -> bytes:
offset = destination_address - current_address
encoding, _ = ks.asm(f"jmp {offset}")
machine_code = bytes(encoding)
return machine_code
def asm_disasm(asm_text, offset=0):
for instr in cs.disasm(asm_text, offset):
printInstr(instr)
def printInstr(instr, depth=0):
_bytes = [f'{x:02x}' for x in instr.bytes[:8]]
if len(instr.bytes) < 8:
_bytes.extend([' ',] * (8 - len(instr.bytes)))
instrBytes = ' '.join([f'{x}' for x in _bytes])
logger.info('\t' * 1 + f' [{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}')
+6 -24
View File
@@ -13,7 +13,7 @@ from intervaltree import *
from utils import hexdump from utils import hexdump
from pe.superpe import SuperPe from pe.superpe import SuperPe
from model.defs import * from model.defs import *
from pe.pehelper import assemble_relative_call, assemble_relative_jmp from pe.asmdisasm import assemble_relative_jmp, asm_disasm, cs, ks, printInstr
logger = logging.getLogger("DerBackdoorer") logger = logging.getLogger("DerBackdoorer")
@@ -28,10 +28,6 @@ class FunctionBackdoorer:
def __init__(self, superpe: SuperPe, depth_option=DEPTH_OPTIONS.LEVEL1): def __init__(self, superpe: SuperPe, depth_option=DEPTH_OPTIONS.LEVEL1):
self.superpe: SuperPe = superpe self.superpe: SuperPe = superpe
self.pe_data = self.superpe.pe.get_memory_mapped_image() self.pe_data = self.superpe.pe.get_memory_mapped_image()
self.cs = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64 + capstone.CS_MODE_LITTLE_ENDIAN)
self.ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64 + keystone.KS_MODE_LITTLE_ENDIAN)
self.cs.detail = True
self.depth_option: DEPTH_OPTIONS = depth_option self.depth_option: DEPTH_OPTIONS = depth_option
@@ -60,12 +56,7 @@ class FunctionBackdoorer:
# Show Result # Show Result
logger.info("--[ Patched result of function: ".format()) logger.info("--[ Patched result of function: ".format())
data = self.pe_data[function_addr:addr+len(compiled_trampoline)] data = self.pe_data[function_addr:addr+len(compiled_trampoline)]
self.asm_disasm(data, offset=function_addr) asm_disasm(data, offset=function_addr)
def asm_disasm(self, asm_text, offset=0):
for instr in self.cs.disasm(asm_text, offset):
self.printInstr(instr, 0)
def find_suitable_instruction_addr(self, startOffset, length=256): def find_suitable_instruction_addr(self, startOffset, length=256):
@@ -90,8 +81,8 @@ class FunctionBackdoorer:
def _find_suitable_instruction_addr(self, startOffset, length, option): def _find_suitable_instruction_addr(self, startOffset, length, option):
# iterate through every instruction. starting from startOffset # iterate through every instruction. starting from startOffset
data = self.pe_data[startOffset:startOffset + length] data = self.pe_data[startOffset:startOffset + length]
for instr in self.cs.disasm(data, startOffset): for instr in cs.disasm(data, startOffset):
self.printInstr(instr, 0) printInstr(instr, 0)
if instr.mnemonic.lower() in ['ret']: if instr.mnemonic.lower() in ['ret']:
return None return None
@@ -124,7 +115,7 @@ class FunctionBackdoorer:
full_shellcode_addr = shellcode_addr + self.superpe.pe.OPTIONAL_HEADER.ImageBase full_shellcode_addr = shellcode_addr + self.superpe.pe.OPTIONAL_HEADER.ImageBase
enc, count = self.ks.asm(f'MOV {reg}, 0x{full_shellcode_addr:X}') enc, count = self.ks.asm(f'MOV {reg}, 0x{full_shellcode_addr:X}')
for instr2 in self.cs.disasm(bytes(enc), 0): for instr2 in cs.disasm(bytes(enc), 0):
addrOffset = len(instr2.bytes) - instr2.addr_size addrOffset = len(instr2.bytes) - instr2.addr_size
break break
@@ -141,16 +132,7 @@ class FunctionBackdoorer:
]) ])
trampoline_text = f'MOV {reg}, 0x{full_shellcode_addr:X} ; {jump}' trampoline_text = f'MOV {reg}, 0x{full_shellcode_addr:X} ; {jump}'
trampoline_compiled, count = self.ks.asm(trampoline_text) trampoline_compiled, count = ks.asm(trampoline_text)
return trampoline_compiled, trampoline_text, addrOffset return trampoline_compiled, trampoline_text, addrOffset
def printInstr(self, instr, depth=0):
_bytes = [f'{x:02x}' for x in instr.bytes[:8]]
if len(instr.bytes) < 8:
_bytes.extend([' ',] * (8 - len(instr.bytes)))
instrBytes = ' '.join([f'{x}' for x in _bytes])
logger.info('\t' * 1 + f'[{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}')
-55
View File
@@ -1,8 +1,4 @@
import sys
import pefile import pefile
import pprint
from keystone import Ks, KS_ARCH_X86, KS_MODE_64
from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CS_MODE_LITTLE_ENDIAN
import logging import logging
from model.defs import * from model.defs import *
@@ -66,57 +62,6 @@ def get_code_section(pe: pefile.PE) -> pefile.SectionStructure:
raise Exception("pehelper::get_code_section(): Code section not found") raise Exception("pehelper::get_code_section(): Code section not found")
# keystone/capstone stuff
cs = Cs(CS_ARCH_X86, CS_MODE_64 + CS_MODE_LITTLE_ENDIAN)
def assemble_lea(current_address: int, destination_address: int, reg: str) -> bytes:
#print("LEAH: 0x{:X} - 0x{:X} = 0x{:X}".format(
# current_address, destination_address, destination_address - current_address))
offset = destination_address - current_address
ks = Ks(KS_ARCH_X86, KS_MODE_64)
encoding, _ = ks.asm(f"lea {reg}, qword ptr ds:[{offset}]")
machine_code = bytes(encoding)
return machine_code
def assemble_relative_call(current_address: int, destination_address: int) -> bytes:
# Calculate the relative offset
# For a near jump, the instruction length is typically 5 bytes (E9 xx xx xx xx)
offset = destination_address - current_address
# Assemble the jump instruction using Keystone
ks = Ks(KS_ARCH_X86, KS_MODE_64)
encoding, _ = ks.asm(f"call qword ptr ds:[{offset}]")
machine_code = bytes(encoding)
# Disassemble the machine code using Capstone
#cs = Cs(CS_ARCH_X86, CS_MODE_64)
#disassembled = next(cs.disasm(machine_code, current_address))
#logger.info(f"Machine Code: {' '.join(f'{byte:02x}' for byte in machine_code)}")
#logger.info(f"Disassembled: {disassembled.mnemonic} {disassembled.op_str}")
return machine_code
def assemble_relative_jmp(current_address: int, destination_address: int) -> bytes:
offset = destination_address - current_address
ks = Ks(KS_ARCH_X86, KS_MODE_64)
encoding, _ = ks.asm(f"jmp {offset}")
machine_code = bytes(encoding)
return machine_code
def asm_disasm(asm_text, offset=0):
for instr in cs.disasm(asm_text, offset):
printInstr(instr)
def printInstr(instr, depth=0):
_bytes = [f'{x:02x}' for x in instr.bytes[:8]]
if len(instr.bytes) < 8:
_bytes.extend([' ',] * (8 - len(instr.bytes)))
instrBytes = ' '.join([f'{x}' for x in _bytes])
logger.info('\t' * 1 + f' [{instr.address:08x}]\t{instrBytes}' + '\t' * depth + f'{instr.mnemonic}\t{instr.op_str}')
## Utils ## Utils
def remove_trailing_null_bytes(data: bytes) -> bytes: def remove_trailing_null_bytes(data: bytes) -> bytes:
+1
View File
@@ -11,6 +11,7 @@ from pe.derbackdoorer import FunctionBackdoorer
from pe.superpe import SuperPe from pe.superpe import SuperPe
from model.project import Project from model.project import Project
from model.settings import Settings from model.settings import Settings
from pe.asmdisasm import *
logger = logging.getLogger("Injector") logger = logging.getLogger("Injector")