pre-commit black & flake8

This commit is contained in:
Evan Reichard 2021-03-18 09:01:07 -04:00
parent 2e0284c33e
commit 042f83a035
11 changed files with 116 additions and 47 deletions

12
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,12 @@
repos:
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: black
language_version: python3.9
files: '^src/overseer/|^setup.py'
- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.0
hooks:
- id: flake8
files: '^src/overseer/|^setup.py'

View File

@ -6,17 +6,14 @@ setup(
author="Evan Reichard", author="Evan Reichard",
version="0.0.1", version="0.0.1",
packages=find_packages("src"), packages=find_packages("src"),
package_dir= {'': 'src'}, package_dir={"": "src"},
zip_safe=False, zip_safe=False,
include_package_data=True, include_package_data=True,
entry_points = { entry_points={"console_scripts": ["overseer = overseer:cli"]},
"console_scripts": [
"overseer = overseer:cli"
]
},
install_requires=[ install_requires=[
"Flask", "Flask",
"flask_socketio", "flask_socketio",
"sqlalchemy", "sqlalchemy",
], ],
extras_require={"dev": ["pre-commit", "black", "flake8"]},
) )

View File

@ -6,11 +6,14 @@ from flask.cli import FlaskGroup
app = Flask(__name__) app = Flask(__name__)
config = EnvConfig() config = EnvConfig()
def create_app(): def create_app():
return app return app
@click.group(cls=FlaskGroup, create_app=create_app) @click.group(cls=FlaskGroup, create_app=create_app)
def cli(): def cli():
"""Management script for the Wiki application.""" """Management script for the Wiki application."""
import overseer.overseer
import overseer.overseer # noqa: E501,F401,E402

View File

@ -1,3 +1,4 @@
from flask import Blueprint from flask import Blueprint
api = Blueprint('v1', __name__, url_prefix="/api/v1")
from overseer.api.v1 import routes, events api = Blueprint("v1", __name__, url_prefix="/api/v1")
from overseer.api.v1 import routes, events # noqa: F401,E402

View File

@ -3,10 +3,12 @@ from flask_socketio import SocketIO
socketio = SocketIO(app, path="/api/v1/socket.io") socketio = SocketIO(app, path="/api/v1/socket.io")
@socketio.on("message") @socketio.on("message")
def handle_message(data): def handle_message(data):
print("RAW DATA: %s" % data) print("RAW DATA: %s" % data)
@socketio.on("json") @socketio.on("json")
def handle_json(json): def handle_json(json):
print("JSON DATA: %s" % json) print("JSON DATA: %s" % json)

View File

@ -1,9 +1,11 @@
from overseer.api.v1 import api from overseer.api.v1 import api
@api.route('/status', methods=["GET"])
@api.route("/status", methods=["GET"])
def status(): def status():
return "STATUS PLACEHOLDER" return "STATUS PLACEHOLDER"
@api.route('/scan', methods=["GET"])
@api.route("/scan", methods=["GET"])
def scan(): def scan():
return "SCAN PLACEHOLDER" return "SCAN PLACEHOLDER"

View File

@ -1,9 +1,11 @@
import os import os
def get_env(key, default=None, required=False): def get_env(key, default=None, required=False):
if required: if required:
assert key in os.environ, "Missing Environment Variable: %s" % key assert key in os.environ, "Missing Environment Variable: %s" % key
return os.environ.get(key, default) return os.environ.get(key, default)
class EnvConfig: class EnvConfig:
DATABASE = get_env("OVERSEER_DB", default="sqlite") DATABASE = get_env("OVERSEER_DB", default="sqlite")

View File

@ -2,58 +2,91 @@ import models
import ipaddress import ipaddress
from datetime import datetime from datetime import datetime
from os import path from os import path
from sqlalchemy import create_engine, or_, insert, Table, Column, Integer, String, ForeignKey, DateTime from sqlalchemy import (
from sqlalchemy.orm import declarative_base, Session create_engine,
or_,
)
from sqlalchemy.orm import Session
class DatabaseConnector: class DatabaseConnector:
def __init__(self, data_path, in_memory=False): def __init__(self, data_path, in_memory=False):
if in_memory: if in_memory:
self.engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True) self.engine = create_engine(
"sqlite+pysqlite:///:memory:", echo=True, future=True
)
else: else:
self.engine = create_engine("sqlite+pysqlite:///%s" % path.join(data_path, "overseer.sqlite"), echo=True, future=True) db_path = path.join(data_path, "overseer.sqlite")
self.engine = create_engine(
"sqlite+pysqlite:///%s" % db_path,
echo=True,
future=True,
)
models.Base.metadata.create_all(self.engine) models.Base.metadata.create_all(self.engine)
def create_scan_result(self, ip_address, scan_results, hostname=None): def create_scan_result(self, ip_addr, scan_results, hostname=None):
int_ip_address = int(ipaddress.ip_address(ip_address)) int_ip_addr = int(ipaddress.ip_addr(ip_addr))
session = Session(bind=self.engine) session = Session(bind=self.engine)
# Does an existing target exist? # Does an existing target exist?
scan_target = session.query(models.ScanTarget).filter(or_( scan_target = (
models.ScanTarget.ip==int_ip_address, session.query(models.ScanTarget)
.filter(
or_(
models.ScanTarget.ip == int_ip_addr,
models.ScanTarget.hostname == hostname, models.ScanTarget.hostname == hostname,
)).first() )
)
.first()
)
# TODO: Do we need to update hostname? # TODO: Do we need to update hostname?
# Nope, create one # Nope, create one
if not scan_target: if not scan_target:
scan_target = models.ScanTarget(ip=int_ip_address, hostname=hostname) scan_target = models.ScanTarget(ip=int_ip_addr, hostname=hostname)
session.add(scan_target) session.add(scan_target)
session.commit() session.commit()
# Create scan history # Create scan history
scan_history = models.ScanHistory(target_id=scan_target.id, results=",".join(map(str, scan_results)), datetime=datetime.now()) scan_history = models.ScanHistory(
target_id=scan_target.id,
results=",".join(map(str, scan_results)),
datetime=datetime.now(),
)
session.add(scan_history) session.add(scan_history)
session.commit() session.commit()
session.close() session.close()
def get_scan_results(self, **kwargs): def get_scan_results(self, **kwargs):
if len(kwargs.keys() & {'ip_address', 'hostname'}) != 1: """Returns scan results for the queries target.
raise ValueError('Missing keyword argument: ip_address or hostname')
Keyword arguments:
hostname -- The hostname of the target.
ip_addr -- The IP address of the target.
"""
if len(kwargs.keys() & {"ip_addr", "hostname"}) != 1:
raise ValueError("Missing keyword argument: ip_addr or hostname")
hostname = kwargs["hostname"] if "hostname" in kwargs else None hostname = kwargs["hostname"] if "hostname" in kwargs else None
ip_address = kwargs["ip_address"] if "ip_address" in kwargs else None ip_addr = kwargs["ip_addr"] if "ip_addr" in kwargs else None
int_ip_address = int(ipaddress.ip_address(ip_address)) if ip_address else None int_ip_addr = int(ipaddress.ip_addr(ip_addr)) if ip_addr else None
session = Session(bind=self.engine) session = Session(bind=self.engine)
# Get all scan histories # Get all scan histories
scan_histories = session.query(models.ScanHistory).join(models.ScanHistory.target).filter(or_( scan_histories = (
models.ScanTarget.ip==int_ip_address, session.query(models.ScanHistory)
.join(models.ScanHistory.target)
.filter(
or_(
models.ScanTarget.ip == int_ip_addr,
models.ScanTarget.hostname == hostname, models.ScanTarget.hostname == hostname,
)).all() )
)
.all()
)
session.close() session.close()
@ -62,8 +95,11 @@ class DatabaseConnector:
# FOR TESTING PURPOSES # FOR TESTING PURPOSES
def main(): def main():
db = DatabaseConnector("/Users/evanreichard/Development/git/overseer/src/overseer") data_path = "/Users/evanreichard/Development/git/overseer/src/overseer"
db = DatabaseConnector(data_path)
db.create_scan_result("1.2.3.4", [5, 6, 7, 8], "test222.com") db.create_scan_result("1.2.3.4", [5, 6, 7, 8], "test222.com")
db.get_scan_results(ip_address="1.2.3.4") db.get_scan_results(ip_addr="1.2.3.4")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -3,20 +3,25 @@ from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base() Base = declarative_base()
class ScanTarget(Base): class ScanTarget(Base):
__tablename__ = "scan_target" __tablename__ = "scan_target"
# Unique ID # Unique ID
id = Column(Integer, primary_key=True, unique=True) id = Column(Integer, primary_key=True, unique=True)
# Integer representation of an IP Address. Can use SQL queries to filter subnets. # Integer representation of an IP Address
ip = Column(Integer, index=True, unique=True) ip = Column(Integer, index=True, unique=True)
# Corresponding hostname. # Corresponding hostname.
hostname = Column(String, index=True, unique=True) hostname = Column(String, index=True, unique=True)
def __repr__(self): def __repr__(self):
return f"ScanTarget(id={self.id!r}, ip={self.ip!r}, hostname={self.hostname!r})" return (
f"ScanTarget(id={self.id!r}, ip={self.ip!r}, "
f"hostname={self.hostname!r})"
)
class ScanHistory(Base): class ScanHistory(Base):
__tablename__ = "scan_history" __tablename__ = "scan_history"
@ -37,4 +42,7 @@ class ScanHistory(Base):
target = relationship("ScanTarget", foreign_keys=[target_id]) target = relationship("ScanTarget", foreign_keys=[target_id])
def __repr__(self): def __repr__(self):
return f"ScanHistory(id={self.id!r}, target={self.target!r}, results={self.results!r}, datetime={self.datetime!r})" return (
f"ScanHistory(id={self.id!r}, target={self.target!r}, "
f"results={self.results!r}, datetime={self.datetime!r})"
)

View File

@ -1,20 +1,26 @@
from overseer import app from overseer import app
from flask import Flask, make_response, render_template, send_from_directory from flask import make_response, render_template, send_from_directory
from overseer.api.v1 import api as api_v1 from overseer.api.v1 import api as api_v1
""" """
Initial Entrypoint to the SPA (i.e. 'index.html') Initial Entrypoint to the SPA (i.e. 'index.html')
""" """
@app.route("/", methods=["GET"]) @app.route("/", methods=["GET"])
def main_entry(): def main_entry():
return make_response(render_template("index.html")) return make_response(render_template("index.html"))
""" """
Front End Static Resources Front End Static Resources
""" """
@app.route("/static/<path:path>") @app.route("/static/<path:path>")
def static_resources(path): def static_resources(path):
return send_from_directory("static", path) return send_from_directory("static", path)
# Version API's # Version API's
app.register_blueprint(api_v1) app.register_blueprint(api_v1)

Binary file not shown.