refactor: split views into -project and -shcdev

This commit is contained in:
Dobin
2024-03-31 19:11:04 +01:00
parent 6a760b3496
commit b8ab7d02b9
4 changed files with 375 additions and 351 deletions
-351
View File
@@ -1,366 +1,15 @@
from flask import Flask, Blueprint, current_app, request, redirect, url_for, render_template, send_file, make_response, session, escape, jsonify
from threading import Thread
from werkzeug.utils import secure_filename
import os
import logging
from typing import List, Tuple
from pygments import highlight
from pygments.lexers import CLexer, NasmLexer, DiffLexer, HexdumpLexer
from pygments.formatters import HtmlFormatter
import difflib
from ansi2html import Ansi2HTMLConverter
import subprocess
from datetime import datetime
from observer import observer
from config import config
from model.settings import Settings
from model.defs import *
from supermega import start
from app.storage import storage, WebProject
from sender import scannerDetectsBytes
from phases.injector import verify_injected_exe
from phases.compiler import compile_dev
from phases.assembler import asm_to_shellcode
from helper import run_process_checkret, clean_tmp_files
from model.project import prepare_project
views = Blueprint('views', __name__)
conv = Ansi2HTMLConverter()
config.load()
thread_running = False
logger = logging.getLogger("Views")
@views.route("/")
def index():
return render_template('index.html')
@views.route("/projects")
def projects_route():
projects = storage.get_projects()
return render_template('projects.html', projects=projects)
@views.route("/shcdev")
def devs_route():
data = []
for filename in os.listdir(PATH_PAYLOAD):
file_path = PATH_PAYLOAD + filename
creation_time = os.path.getctime(file_path)
readable_time = datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S')
data.append({
"name": filename,
"date": readable_time,
})
return render_template('devs.html', data=data)
@views.route("/shcdev/<name>")
def dev_route(name):
data = []
log = ""
path = PATH_PAYLOAD + name
for filename in os.listdir(path):
filepath = path + "/" + filename
creation_time = os.path.getmtime(filepath)
readable_time = datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S')
info = ""
if filename.endswith(".asm"):
info = "text assembly (cleaned, from compiled .c)"
elif filename.endswith(".bin"):
info = "generated shellcode (from .exe)"
elif filename.endswith(".c"):
info = "input C code"
elif filename.endswith(".exe"):
info = "temporary shellcode holder (from .c)"
elif filename.endswith("cmdoutput.log"):
info = "command output"
with open(path + "/" + filename, "r") as f:
log += f.read() + "\n-----------------------------------\n"
elif filename.endswith("supermega.log"):
info = "supermega logging output"
with open(path + "/" + filename, "r") as f:
log += f.read()
data.append({
"name": filename,
"date": readable_time,
"info": info,
})
return render_template('dev.html',
name=name, files=data, log=log)
@views.route("/shcdev/<name>/build")
def dev_build_route(name):
c_in = PATH_PAYLOAD + "{}/main.c".format(name)
asm_out = PATH_PAYLOAD + "{}/main.asm".format(name)
build_exe = PATH_PAYLOAD + "{}/main.exe".format(name)
shellcode_out = PATH_PAYLOAD + "{}/main.bin".format(name)
compile_dev(c_in, asm_out)
asm_to_shellcode(asm_out, build_exe, shellcode_out)
observer.write_logs(PATH_PAYLOAD + "{}/".format(name))
clean_tmp_files()
return redirect("/shcdev/{}".format(name), code=302)
@views.route("/project/<name>")
def project(name):
project = storage.get_project(name)
if project == None:
return redirect("/projects", code=302)
log_files = get_logfiles(project.settings.main_dir)
exes = []
for file in os.listdir(PATH_EXES):
exes.append(file)
shellcodes = []
for file in os.listdir(PATH_SHELLCODES):
shellcodes.append(file)
sourcestyles = [(color.name, color.value) for color in SourceStyle]
allocstyles = [(color.name, color.value) for color in AllocStyle]
decoderstyles = [(color.name, color.value) for color in DecoderStyle]
execstyles = [(color.name, color.value) for color in ExecStyle]
injectstyles = [(color.name, color.value) for color in InjectStyle]
return render_template('project.html',
project_name = name,
project=project,
exes=exes,
shellcodes=shellcodes,
sourcestyles=sourcestyles,
allocstyles=allocstyles,
decoderstyles=decoderstyles,
execstyles=execstyles,
injectstyles=injectstyles,
log_files=log_files,
)
@views.route("/project_add", methods=['POST', 'GET'])
def add_project():
if request.method == 'POST':
settings = Settings()
project_name = request.form['project_name']
comment = request.form['comment']
settings.payload_path = PATH_SHELLCODES + request.form['shellcode']
if request.form['shellcode'] == "createfile.bin":
settings.verify = True
settings.try_start_final_infected_exe = False
else:
settings.cleanup_files_on_exit = False
settings.inject_exe_in = PATH_EXES + request.form['exe']
settings.inject_exe_out = PATH_EXES + request.form['exe'].replace(".exe", ".infected.exe")
source_style = request.form['source_style']
settings.source_style = SourceStyle[source_style]
alloc_style = request.form['alloc_style']
settings.alloc_style = AllocStyle[alloc_style]
decoder_style = request.form['decoder_style']
settings.decoder_style = DecoderStyle[decoder_style]
if storage.get_project(project_name) != None:
# overwrite project
project = storage.get_project(project_name)
project.settings = settings
project.comment = comment
storage.save_project(project)
else:
# add new project
project = WebProject(project_name, settings)
project.comment = comment
storage.add_project(project)
return redirect("/project/{}".format(project_name), code=302)
else: # GET
exes = []
for file in os.listdir(PATH_EXES):
exes.append(file)
shellcodes = []
for file in os.listdir(PATH_SHELLCODES):
shellcodes.append(file)
sourcestyles = [(color.name, color.value) for color in SourceStyle]
allocstyles = [(color.name, color.value) for color in AllocStyle]
decoderstyles = [(color.name, color.value) for color in DecoderStyle]
execstyles = [(color.name, color.value) for color in ExecStyle]
injectstyles = [(color.name, color.value) for color in InjectStyle]
return render_template('project_add_get.html',
exes=exes,
shellcodes=shellcodes,
sourcestyles=sourcestyles,
allocstyles=allocstyles,
decoderstyles=decoderstyles,
execstyles=execstyles,
injectstyles=injectstyles,
)
def supermega_thread(settings: Settings):
global thread_running
start(settings)
thread_running = False
@views.route("/project/<project_name>/build", methods=['POST', 'GET'])
def build_project(project_name):
global thread_running
project = storage.get_project(project_name)
project.settings.try_start_final_infected_exe = False
prepare_project(project_name, project.settings)
thread = Thread(target=supermega_thread, args=(project.settings, ))
thread.start()
thread_running = True
return redirect("/project/{}/status".format(project_name), code=302)
@views.route("/project/<project_name>/status")
def status_project(project_name):
global thread_running
if thread_running:
return render_template('status_project.html',
project_name=project_name,
logdata = "\n".join(observer.get_logs()))
else:
return redirect("/project/{}".format(project_name), code=302)
@views.route("/project/<project_name>/exec", methods=['POST', 'GET'])
def start_project(project_name):
project = storage.get_project(project_name)
if project == None:
return redirect("/", code=302)
remote = False
remote_arg = request.args.get('remote')
if remote_arg == "true":
remote = True
no_exec = False
no_exec_arg = request.args.get('no_exec')
if no_exec_arg == "true":
no_exec = True
logger.info("--[ Exec project: {} remote: {} no_exec: {}".format(project_name, remote, no_exec))
if remote:
logger.info("--[ Exec {} on server {}".format(project.project_exe, config.get("avred_server")))
filepath = "{}/{}".format(project.project_dir, project.project_exe)
with open(filepath, "rb") as f:
data = f.read()
try:
scannerDetectsBytes(data,
project.project_exe,
useBrotli=True,
verify=project.settings.verify,
no_exec=no_exec)
except Exception as e:
logger.error(f'Error scanning: {e}')
return jsonify({
"exception": str(e)
}), 500
else:
# Start/verify it at the end
if project.settings.verify:
logger.info("--[ Verify infected exe")
exit_code = verify_injected_exe(project.settings.inject_exe_out)
elif no_exec == False:
logger.info("--[ Start infected exe: {}".format(project.settings.inject_exe_out))
run_process_checkret([
project.settings.inject_exe_out,
], check=False)
elif no_exec == True:
dirname = os.path.dirname(os.path.abspath(project.settings.inject_exe_out))
logger.info("--[ Open folder: {}".format(dirname))
subprocess.run(['explorer', dirname])
return redirect("/project/{}".format(project_name), code=302)
def get_logfiles(directory):
log_files = []
id = 0
asm_a = "" # for diff
asm_b = ""
for file in os.listdir(f"{directory}/"):
if file.startswith("."):
continue
if not file.startswith("log-"):
continue
if file.endswith(".bin"):
continue
with open(os.path.join(f"{directory}/", file), "r") as f:
data = f.read()
if 'main_c' in file:
data = highlight(data, CLexer(), HtmlFormatter(full=False))
elif '_asm_' in file:
# handle special cases
if '_orig' in file:
asm_a = data
if '_updated' in file:
asm_b = data
data = highlight(data, NasmLexer(), HtmlFormatter(full=False))
elif '.ascii' in file:
data = conv.convert(data, full=False)
elif '.txt' in file:
continue # skip it
elif '.hex' in file:
continue # skip it
#data = escape(data)
#data = highlight(data, HexdumpLexer(), HtmlFormatter(full=False))
elif '.log' in file:
data = conv.convert(data, full=False)
else:
data = escape(data)
entry = {
"name": file,
"id": str(id),
"content": data,
}
log_files.append(entry)
id += 1
# more
if asm_a != "" and asm_b != "":
# do the diff from the content of the two files
a = asm_a.splitlines()
b = asm_b.splitlines()
diff_generator = difflib.unified_diff(a, b, lineterm='')
diff_string = '\n'.join(diff_generator)
diff_l = highlight(diff_string, DiffLexer(), HtmlFormatter(full=False))
entry = {
"name": "Summary: ASM Diff".format(),
"id": str(id),
"content": diff_l,
}
log_files.append(entry)
id += 1
#asm_a = ""
asm_b = ""
return log_files
+288
View File
@@ -0,0 +1,288 @@
from flask import Flask, Blueprint, current_app, request, redirect, url_for, render_template, send_file, make_response, session, escape, jsonify
from threading import Thread
import os
import logging
from typing import List, Tuple
from pygments import highlight
from pygments.lexers import CLexer, NasmLexer, DiffLexer, HexdumpLexer
from pygments.formatters import HtmlFormatter
import difflib
import subprocess
from ansi2html import Ansi2HTMLConverter
from observer import observer
from config import config
from model.settings import Settings
from model.defs import *
from supermega import start
from app.storage import storage, WebProject
from sender import scannerDetectsBytes
from phases.injector import verify_injected_exe
from helper import run_process_checkret
from model.project import prepare_project
logger = logging.getLogger("ViewsProjects")
views_project = Blueprint('views_project', __name__)
conv = Ansi2HTMLConverter()
config.load()
thread_running = False
@views_project.route("/projects")
def projects_route():
projects = storage.get_projects()
return render_template('projects.html', projects=projects)
@views_project.route("/project/<name>")
def project(name):
project = storage.get_project(name)
if project == None:
return redirect("/projects", code=302)
log_files = get_logfiles(project.settings.main_dir)
exes = []
for file in os.listdir(PATH_EXES):
exes.append(file)
shellcodes = []
for file in os.listdir(PATH_SHELLCODES):
shellcodes.append(file)
sourcestyles = [(color.name, color.value) for color in SourceStyle]
allocstyles = [(color.name, color.value) for color in AllocStyle]
decoderstyles = [(color.name, color.value) for color in DecoderStyle]
execstyles = [(color.name, color.value) for color in ExecStyle]
injectstyles = [(color.name, color.value) for color in InjectStyle]
return render_template('project.html',
project_name = name,
project=project,
exes=exes,
shellcodes=shellcodes,
sourcestyles=sourcestyles,
allocstyles=allocstyles,
decoderstyles=decoderstyles,
execstyles=execstyles,
injectstyles=injectstyles,
log_files=log_files,
)
@views_project.route("/project_add", methods=['POST', 'GET'])
def add_project():
if request.method == 'POST':
settings = Settings()
project_name = request.form['project_name']
comment = request.form['comment']
settings.payload_path = PATH_SHELLCODES + request.form['shellcode']
if request.form['shellcode'] == "createfile.bin":
settings.verify = True
settings.try_start_final_infected_exe = False
else:
settings.cleanup_files_on_exit = False
settings.inject_exe_in = PATH_EXES + request.form['exe']
settings.inject_exe_out = PATH_EXES + request.form['exe'].replace(".exe", ".infected.exe")
source_style = request.form['source_style']
settings.source_style = SourceStyle[source_style]
alloc_style = request.form['alloc_style']
settings.alloc_style = AllocStyle[alloc_style]
decoder_style = request.form['decoder_style']
settings.decoder_style = DecoderStyle[decoder_style]
if storage.get_project(project_name) != None:
# overwrite project
project = storage.get_project(project_name)
project.settings = settings
project.comment = comment
storage.save_project(project)
else:
# add new project
project = WebProject(project_name, settings)
project.comment = comment
storage.add_project(project)
return redirect("/project/{}".format(project_name), code=302)
else: # GET
exes = []
for file in os.listdir(PATH_EXES):
exes.append(file)
shellcodes = []
for file in os.listdir(PATH_SHELLCODES):
shellcodes.append(file)
sourcestyles = [(color.name, color.value) for color in SourceStyle]
allocstyles = [(color.name, color.value) for color in AllocStyle]
decoderstyles = [(color.name, color.value) for color in DecoderStyle]
execstyles = [(color.name, color.value) for color in ExecStyle]
injectstyles = [(color.name, color.value) for color in InjectStyle]
return render_template('project_add_get.html',
exes=exes,
shellcodes=shellcodes,
sourcestyles=sourcestyles,
allocstyles=allocstyles,
decoderstyles=decoderstyles,
execstyles=execstyles,
injectstyles=injectstyles,
)
def supermega_thread(settings: Settings):
global thread_running
start(settings)
thread_running = False
@views_project.route("/project/<project_name>/build", methods=['POST', 'GET'])
def build_project(project_name):
global thread_running
project = storage.get_project(project_name)
project.settings.try_start_final_infected_exe = False
prepare_project(project_name, project.settings)
thread = Thread(target=supermega_thread, args=(project.settings, ))
thread.start()
thread_running = True
return redirect("/project/{}/status".format(project_name), code=302)
@views_project.route("/project/<project_name>/status")
def status_project(project_name):
global thread_running
if thread_running:
return render_template('status_project.html',
project_name=project_name,
logdata = "\n".join(observer.get_logs()))
else:
return redirect("/project/{}".format(project_name), code=302)
@views_project.route("/project/<project_name>/exec", methods=['POST', 'GET'])
def start_project(project_name):
project = storage.get_project(project_name)
if project == None:
return redirect("/", code=302)
remote = False
remote_arg = request.args.get('remote')
if remote_arg == "true":
remote = True
no_exec = False
no_exec_arg = request.args.get('no_exec')
if no_exec_arg == "true":
no_exec = True
logger.info("--[ Exec project: {} remote: {} no_exec: {}".format(project_name, remote, no_exec))
if remote:
logger.info("--[ Exec {} on server {}".format(project.project_exe, config.get("avred_server")))
filepath = "{}/{}".format(project.project_dir, project.project_exe)
with open(filepath, "rb") as f:
data = f.read()
try:
scannerDetectsBytes(data,
project.project_exe,
useBrotli=True,
verify=project.settings.verify,
no_exec=no_exec)
except Exception as e:
logger.error(f'Error scanning: {e}')
return jsonify({
"exception": str(e)
}), 500
else:
# Start/verify it at the end
if project.settings.verify:
logger.info("--[ Verify infected exe")
exit_code = verify_injected_exe(project.settings.inject_exe_out)
elif no_exec == False:
logger.info("--[ Start infected exe: {}".format(project.settings.inject_exe_out))
run_process_checkret([
project.settings.inject_exe_out,
], check=False)
elif no_exec == True:
dirname = os.path.dirname(os.path.abspath(project.settings.inject_exe_out))
logger.info("--[ Open folder: {}".format(dirname))
subprocess.run(['explorer', dirname])
return redirect("/project/{}".format(project_name), code=302)
def get_logfiles(directory):
log_files = []
id = 0
asm_a = "" # for diff
asm_b = ""
for file in os.listdir(f"{directory}/"):
if file.startswith("."):
continue
if not file.startswith("log-"):
continue
if file.endswith(".bin"):
continue
with open(os.path.join(f"{directory}/", file), "r") as f:
data = f.read()
if 'main_c' in file:
data = highlight(data, CLexer(), HtmlFormatter(full=False))
elif '_asm_' in file:
# handle special cases
if '_orig' in file:
asm_a = data
if '_updated' in file:
asm_b = data
data = highlight(data, NasmLexer(), HtmlFormatter(full=False))
elif '.ascii' in file:
data = conv.convert(data, full=False)
elif '.txt' in file:
continue # skip it
elif '.hex' in file:
continue # skip it
#data = escape(data)
#data = highlight(data, HexdumpLexer(), HtmlFormatter(full=False))
elif '.log' in file:
data = conv.convert(data, full=False)
else:
data = escape(data)
entry = {
"name": file,
"id": str(id),
"content": data,
}
log_files.append(entry)
id += 1
# more
if asm_a != "" and asm_b != "":
# do the diff from the content of the two files
a = asm_a.splitlines()
b = asm_b.splitlines()
diff_generator = difflib.unified_diff(a, b, lineterm='')
diff_string = '\n'.join(diff_generator)
diff_l = highlight(diff_string, DiffLexer(), HtmlFormatter(full=False))
entry = {
"name": "Summary: ASM Diff".format(),
"id": str(id),
"content": diff_l,
}
log_files.append(entry)
id += 1
#asm_a = ""
asm_b = ""
return log_files
+83
View File
@@ -0,0 +1,83 @@
from flask import Flask, Blueprint, current_app, request, redirect, url_for, render_template, send_file, make_response, session, escape, jsonify
from threading import Thread
import os
import logging
from typing import List, Tuple
from datetime import datetime
from observer import observer
from model.defs import *
from supermega import start
from phases.compiler import compile_dev
from phases.assembler import asm_to_shellcode
from helper import clean_tmp_files
views_shcdev = Blueprint('views_shcdev', __name__)
logger = logging.getLogger("ViewsShcdev")
@views_shcdev.route("/shcdev")
def devs_route():
data = []
for filename in os.listdir(PATH_PAYLOAD):
file_path = PATH_PAYLOAD + filename
creation_time = os.path.getctime(file_path)
readable_time = datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S')
data.append({
"name": filename,
"date": readable_time,
})
return render_template('devs.html', data=data)
@views_shcdev.route("/shcdev/<name>")
def dev_route(name):
data = []
log = ""
path = PATH_PAYLOAD + name
for filename in os.listdir(path):
filepath = path + "/" + filename
creation_time = os.path.getmtime(filepath)
readable_time = datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S')
info = ""
if filename.endswith(".asm"):
info = "text assembly (cleaned, from compiled .c)"
elif filename.endswith(".bin"):
info = "generated shellcode (from .exe)"
elif filename.endswith(".c"):
info = "input C code"
elif filename.endswith(".exe"):
info = "temporary shellcode holder (from .c)"
elif filename.endswith("cmdoutput.log"):
info = "command output"
with open(path + "/" + filename, "r") as f:
log += f.read() + "\n-----------------------------------\n"
elif filename.endswith("supermega.log"):
info = "supermega logging output"
with open(path + "/" + filename, "r") as f:
log += f.read()
data.append({
"name": filename,
"date": readable_time,
"info": info,
})
return render_template('dev.html',
name=name, files=data, log=log)
@views_shcdev.route("/shcdev/<name>/build")
def dev_build_route(name):
c_in = PATH_PAYLOAD + "{}/main.c".format(name)
asm_out = PATH_PAYLOAD + "{}/main.asm".format(name)
build_exe = PATH_PAYLOAD + "{}/main.exe".format(name)
shellcode_out = PATH_PAYLOAD + "{}/main.bin".format(name)
compile_dev(c_in, asm_out)
asm_to_shellcode(asm_out, build_exe, shellcode_out)
observer.write_logs(PATH_PAYLOAD + "{}/".format(name))
clean_tmp_files()
return redirect("/shcdev/{}".format(name), code=302)
+4
View File
@@ -6,6 +6,8 @@ from flask import Flask
import logging
from app.views import views
from app.views_project import views_project
from app.views_shcdev import views_shcdev
from log import setup_logging
if __name__ == "__main__":
@@ -33,4 +35,6 @@ if __name__ == "__main__":
app.config.from_prefixed_env()
app.register_blueprint(views)
app.register_blueprint(views_project)
app.register_blueprint(views_shcdev)
app.run(host=args.listenip, port=args.listenport, debug=args.debug)