Docs, Tests, SIGINT

This commit is contained in:
Evan Reichard 2021-03-20 23:15:22 -04:00
parent bdf6dd4e77
commit 0632c06978
7 changed files with 107 additions and 9 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
**/__pycache__/
**/.coverage
src/overseer.egg-info/
build/
dist/

View File

@ -4,9 +4,9 @@ repos:
hooks:
- id: black
language_version: python3.9
files: '^src/overseer/|^setup.py'
files: '^src/overseer/|^setup.py|^tests/overseer/'
- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.0
hooks:
- id: flake8
files: '^src/overseer/|^setup.py'
files: '^src/overseer/|^setup.py|^tests/overseer/'

View File

@ -15,5 +15,6 @@ setup(
"sqlalchemy",
"Flask",
],
tests_require=["pytest"],
extras_require={"dev": ["pre-commit", "black", "flake8", "pytest"]},
)

View File

@ -1,4 +1,6 @@
import click
import signal
import sys
from importlib.metadata import version
from overseer.config import EnvConfig
from overseer.scanner import ScanManager
@ -14,6 +16,11 @@ database = DatabaseConnector(config.DATA_PATH)
scan_manager = ScanManager()
def signal_handler(sig, frame):
scan_manager.shutdown()
sys.exit(0)
def create_app():
return app
@ -23,5 +30,8 @@ def cli():
"""Management script for the application."""
# Handle SIGINT
signal.signal(signal.SIGINT, signal_handler)
# Import all flask views
import overseer.overseer # noqa: E501,F401,E402

View File

@ -73,6 +73,11 @@ class DatabaseConnector:
**kwargs
Either hostname or ip_addr
Raises
------
ValueError
If hostname or ip_addr isn't provided in kwargs
Returns
-------
ScanTarget
@ -103,6 +108,11 @@ class DatabaseConnector:
**kwargs
Either hostname or ip_addr
Raises
------
ValueError
If hostname or ip_addr isn't provided in kwargs
Returns
-------
ScanTarget
@ -196,6 +206,11 @@ class DatabaseConnector:
error : str, optional
Error message, if any
Raises
------
NoResultFound
If we cannot find the desired ScanHistory by ID
Returns
-------
ScanHistory

View File

@ -1,9 +1,14 @@
import pytest
# import pytest
import ipaddress
from overseer import scan_manager
from overseer.database import DatabaseConnector
# We're not testing this & this will stall tests
scan_manager.shutdown()
def test_create_scan_target():
db = DatabaseConnector(None, True)
db = DatabaseConnector(None, in_memory=True)
hostname = db.create_scan_target(hostname="google.com")
ip_address = db.create_scan_target(ip_addr="1.1.1.1")
@ -12,8 +17,74 @@ def test_create_scan_target():
assert ip_address.id == 2
assert ip_address.ip == int(ipaddress.ip_address("1.1.1.1"))
# def test_get_scan_target():
# def test_get_all_scan_target():
# def test_create_scan_result():
# def test_update_scan_result():
# def test_get_scan_results_by_target():
def test_get_scan_target():
db = DatabaseConnector(None, in_memory=True)
created_target = db.create_scan_target(hostname="google.com")
found_target = db.get_scan_target(hostname="google.com")
assert created_target.id == found_target.id
assert created_target.ip == found_target.ip
assert created_target.hostname == found_target.hostname
assert created_target.updated_at == found_target.updated_at
assert found_target.hostname == "google.com"
def test_get_all_scan_targets():
db = DatabaseConnector(None, in_memory=True)
for i in range(1, 6):
db.create_scan_target(ip_addr="127.0.0." + str(i))
found_targets = db.get_all_scan_targets()
assert len(found_targets) == 5
# This checks for properly ordered items
for i, target in enumerate(found_targets):
desired_target_ip = "127.0.0." + str(5 - i)
assert target.id == 5 - i
assert target.ip == int(ipaddress.ip_address(desired_target_ip))
def test_create_scan_result():
db = DatabaseConnector(None, in_memory=True)
scan_target = db.create_scan_target(ip_addr="127.0.0.1")
scan_history = db.create_scan_result("IN_PROGRESS", ip_addr="127.0.0.1")
scan_history_2 = db.create_scan_result("COMPLETE", ip_addr="127.0.0.2")
assert scan_history.id == 1
assert scan_history.target_id == scan_target.id
assert scan_history.status == "IN_PROGRESS"
assert scan_history_2.id == 2
assert scan_history_2.error is None
assert scan_history_2.target_id != scan_target.id
assert scan_history_2.status == "COMPLETE"
def test_update_scan_result():
db = DatabaseConnector(None, in_memory=True)
scan_history = db.create_scan_result("IN_PROGRESS", ip_addr="127.0.0.1")
updated_scan_history = db.update_scan_result(
scan_history.id, "COMPLETE", ["53 UDP", "53 TCP"]
)
assert scan_history.id == updated_scan_history.id
assert scan_history.status == "IN_PROGRESS"
assert updated_scan_history.status == "COMPLETE"
assert updated_scan_history.results == "53 UDP,53 TCP"
def test_get_scan_results_by_target():
db = DatabaseConnector(None, in_memory=True)
for i in range(1, 6):
db.create_scan_result("IN_PROGRESS", ip_addr="127.0.0.1")
for i in range(1, 3):
db.create_scan_result("COMPLETE", ip_addr="127.0.0.2")
scan_history_1 = db.get_scan_results_by_target(ip_addr="127.0.0.1")
scan_history_2 = db.get_scan_results_by_target(ip_addr="127.0.0.2")
assert len(scan_history_1) == 5
assert len(scan_history_2) == 2
assert scan_history_1[3].status == "IN_PROGRESS"

View File