mirror of
https://github.com/dobin/SuperMega
synced 2026-06-02 17:27:10 +00:00
refactor: consolidate all three log things (cmd output, logger, files) into observer
This commit is contained in:
+2
-3
@@ -25,7 +25,6 @@ from phases.injector import verify_injected_exe
|
||||
from phases.compiler import compile_dev
|
||||
from phases.assembler import asm_to_shellcode
|
||||
from helper import run_process_checkret
|
||||
from log import MyLog
|
||||
|
||||
views = Blueprint('views', __name__)
|
||||
|
||||
@@ -111,7 +110,7 @@ def dev_build_route(name):
|
||||
asm_to_shellcode(asm_out, build_exe, shellcode_out)
|
||||
|
||||
with open(log, "w") as f:
|
||||
for log_line in MyLog.getlog():
|
||||
for log_line in observer.getlog():
|
||||
f.write("{}\n".format(log_line))
|
||||
|
||||
f.write("\n\n")
|
||||
@@ -267,7 +266,7 @@ def status_project(project_name):
|
||||
if thread_running:
|
||||
return render_template('status_project.html',
|
||||
project_name=project_name,
|
||||
logdata = "asdf")
|
||||
logdata = "\n".join(observer.get_logs()))
|
||||
else:
|
||||
return redirect("/project/{}".format(project_name), code=302)
|
||||
|
||||
|
||||
@@ -55,19 +55,19 @@ def run_process_checkret(args, check=True):
|
||||
cmd += "--- " + " ".join(args) + "\n"
|
||||
f.write(cmd.encode('utf-8'))
|
||||
if ret.stdout != None:
|
||||
observer.add_log(ret.stdout.decode('utf-8'))
|
||||
observer.add_cmd_output(ret.stdout.decode('utf-8'))
|
||||
f.write(ret.stdout)
|
||||
if ret.stderr != None:
|
||||
observer.add_log(ret.stderr.decode('utf-8'))
|
||||
observer.add_cmd_output(ret.stderr.decode('utf-8'))
|
||||
f.write(ret.stderr)
|
||||
|
||||
if ret.returncode != 0 and check:
|
||||
logger.info("----! FAILED Command: {}".format(" ".join(args)))
|
||||
if ret.stdout != None:
|
||||
observer.add_log(ret.stdout.decode('utf-8'))
|
||||
observer.add_cmd_output(ret.stdout.decode('utf-8'))
|
||||
logger.info(ret.stdout.decode('utf-8'))
|
||||
if ret.stderr != None:
|
||||
observer.add_log(ret.stderr.decode('utf-8'))
|
||||
observer.add_cmd_output(ret.stderr.decode('utf-8'))
|
||||
logger.info(ret.stderr.decode('utf-8'))
|
||||
raise Exception("Command failed: " + " ".join(args))
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
|
||||
from model.defs import *
|
||||
|
||||
from observer import observer
|
||||
|
||||
# ANSI escape sequences for colors
|
||||
class LogColors:
|
||||
@@ -34,49 +34,22 @@ class CustomFormatter(logging.Formatter):
|
||||
|
||||
|
||||
class ListHandler(logging.Handler):
|
||||
def __init__(self, log_list):
|
||||
super().__init__()
|
||||
self.log_list = log_list
|
||||
|
||||
def emit(self, record):
|
||||
# Format the log record and store it in the list
|
||||
log_entry = self.format(record)
|
||||
self.log_list.append(log_entry)
|
||||
observer.add_log(log_entry)
|
||||
|
||||
def setup_logging(level = logging.INFO):
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(level)
|
||||
|
||||
class _MyLog():
|
||||
def __init__(self):
|
||||
self.log_messages = []
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(level)
|
||||
ch.setFormatter(CustomFormatter())
|
||||
|
||||
def log(self, message):
|
||||
self.log_messages.append(message)
|
||||
list_handler = ListHandler()
|
||||
list_handler.setLevel(level)
|
||||
list_handler.setFormatter(CustomFormatter())
|
||||
|
||||
def getlog(self):
|
||||
return self.log_messages
|
||||
|
||||
def clearlog(self):
|
||||
self.log_messages.clear()
|
||||
|
||||
def setup_logging(self, level = logging.INFO):
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(level)
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(level)
|
||||
ch.setFormatter(CustomFormatter())
|
||||
|
||||
list_handler = ListHandler(self.log_messages)
|
||||
list_handler.setLevel(level)
|
||||
list_handler.setFormatter(CustomFormatter())
|
||||
|
||||
root_logger.addHandler(ch)
|
||||
root_logger.addHandler(list_handler)
|
||||
|
||||
def writelog(self):
|
||||
# write log to file
|
||||
with open(f"{logs_dir}/supermega.log", "w") as f:
|
||||
for line in self.log_messages:
|
||||
f.write(line + "\n")
|
||||
|
||||
|
||||
MyLog: _MyLog = _MyLog()
|
||||
root_logger.addHandler(ch)
|
||||
root_logger.addHandler(list_handler)
|
||||
|
||||
+14
@@ -9,17 +9,31 @@ from model.defs import *
|
||||
|
||||
class Observer():
|
||||
def __init__(self):
|
||||
self.cmd_output = []
|
||||
self.logs = []
|
||||
self.idx = 0
|
||||
self.active = True
|
||||
|
||||
def reset(self):
|
||||
self.cmd_output = []
|
||||
self.logs = []
|
||||
self.idx = 0
|
||||
|
||||
def add_cmd_output(self, cmd_output):
|
||||
self.cmd_output.append(cmd_output)
|
||||
|
||||
def add_log(self, log):
|
||||
self.logs.append(log)
|
||||
|
||||
def get_logs(self):
|
||||
return self.logs
|
||||
|
||||
def writelog(self):
|
||||
# write log to file
|
||||
with open(f"{logs_dir}/supermega.log", "w") as f:
|
||||
for line in self.logs:
|
||||
f.write(line + "\n")
|
||||
|
||||
def add_text(self, name, data):
|
||||
self.write_to_file(name + ".txt", data)
|
||||
self.idx += 1
|
||||
|
||||
+8
-9
@@ -17,7 +17,7 @@ from sender import scannerDetectsBytes
|
||||
from model.project import Project
|
||||
from model.settings import Settings
|
||||
from model.defs import *
|
||||
from log import MyLog
|
||||
from log import setup_logging
|
||||
from utils import delete_all_files_in_directory
|
||||
|
||||
def main():
|
||||
@@ -109,7 +109,6 @@ def start(settings: Settings):
|
||||
delete_all_files_in_directory(f"{logs_dir}/")
|
||||
# And logs
|
||||
observer.reset()
|
||||
MyLog.clearlog()
|
||||
exit_code = 0 # 0 = success
|
||||
|
||||
# Load our input
|
||||
@@ -146,7 +145,7 @@ def start(settings: Settings):
|
||||
short_call_patching = project.settings.short_call_patching)
|
||||
except Exception as e:
|
||||
logger.error(f'Error compiling: {e}')
|
||||
MyLog.writelog()
|
||||
observer.writelog()
|
||||
return 1
|
||||
|
||||
# Assemble: Assemble .asm to .shc (ASM -> SHC)
|
||||
@@ -158,7 +157,7 @@ def start(settings: Settings):
|
||||
shellcode_out = main_shc_file)
|
||||
except Exception as e:
|
||||
logger.error("Error: Assembling failed: {}".format(e))
|
||||
MyLog.writelog()
|
||||
observer.writelog()
|
||||
return 2
|
||||
#shutil.copy(main_shc_file, "working/build/shellcode.bin")
|
||||
|
||||
@@ -184,11 +183,11 @@ def start(settings: Settings):
|
||||
phases.injector.inject_exe(main_shc_file, settings, project)
|
||||
except PermissionError as e:
|
||||
logger.error(f'Error writing file: {e}')
|
||||
MyLog.writelog()
|
||||
observer.writelog()
|
||||
return 2
|
||||
except Exception as e:
|
||||
logger.error(f'Error injecting: {e}')
|
||||
MyLog.writelog()
|
||||
observer.writelog()
|
||||
return 3
|
||||
|
||||
observer.add_code("exe_final", extract_code_from_exe_file_ep(settings.inject_exe_out, 300))
|
||||
@@ -202,7 +201,7 @@ def start(settings: Settings):
|
||||
scannerDetectsBytes(data, filename, useBrotli=True, verify=settings.verify)
|
||||
except Exception as e:
|
||||
logger.error(f'Error scanning: {e}')
|
||||
MyLog.writelog()
|
||||
observer.writelog()
|
||||
return 4
|
||||
else:
|
||||
# Start/verify it at the end
|
||||
@@ -219,7 +218,7 @@ def start(settings: Settings):
|
||||
if settings.cleanup_files_on_exit:
|
||||
clean_files()
|
||||
|
||||
MyLog.writelog()
|
||||
observer.writelog()
|
||||
return exit_code
|
||||
|
||||
|
||||
@@ -265,5 +264,5 @@ def verify_shellcode(shc_name):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
MyLog.setup_logging()
|
||||
setup_logging()
|
||||
main()
|
||||
|
||||
@@ -5,7 +5,7 @@ from config import config
|
||||
from model.defs import *
|
||||
|
||||
from model.settings import Settings
|
||||
from log import MyLog
|
||||
from log import setup_logging
|
||||
from supermega import start
|
||||
|
||||
|
||||
@@ -56,5 +56,5 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
MyLog.setup_logging(level=logging.WARN)
|
||||
setup_logging(level=logging.WARN)
|
||||
main()
|
||||
|
||||
@@ -6,11 +6,11 @@ from flask import Flask
|
||||
import logging
|
||||
|
||||
from app.views import views
|
||||
from log import MyLog
|
||||
from log import setup_logging
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.getLogger('werkzeug').setLevel(logging.ERROR)
|
||||
MyLog.setup_logging()
|
||||
setup_logging()
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--listenip', type=str, help='IP to listen on', default="0.0.0.0")
|
||||
parser.add_argument('--listenport', type=int, help='Port to listen on', default=5001)
|
||||
|
||||
Reference in New Issue
Block a user