Commit d071f133 authored by w2v1's avatar w2v1
Browse files

structure change, 100% code coverage

parent c5bbe899
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ class PostgresMixin():
                self.logger.error(f'Unknown type: {search_path}.')
                self.search_path = 'public'
            self.cursor.execute(f"""SET search_path TO {self.search_path};""")
            self.cursor.commit()
            self.connection.commit()
            self.logger.debug('Successfully set search_path')
        except psycopg2.OperationalError as error:
            self.logger.error(f'Database Error: {error}')
+34 −40
Original line number Diff line number Diff line
from os import times
import time

from common.database import Database
@@ -27,85 +28,76 @@ class PostgresLoggerMixin(Database, PostgresMixin):
        super().__init__()
        self.schema = schema
        self.log_collection = []
        self.logger = Database().logger

    def debug(self, message, timestamp=None, w=False, c=False):
    def debug(self, message, w=False, c=False):
        """Logs a debug message."""
        timestamp = self.check_for_timestamp(timestamp)
        if w:
            self.check_table_exists()
            self.log_to_database('DEBUG', timestamp, message)
            self.log_to_database('DEBUG', message)
        elif c:
            self.collect_logs('DEBUG', timestamp, message)
            self.collect_logs('DEBUG', message)
        self.logger.debug(message)

    def info(self, message, timestamp=None, w=False, c=False):
        """Logs an info message."""
        timestamp = self.check_for_timestamp(timestamp)
    def info(self, message, w=False, c=False):
        if w:
            self.check_table_exists()
            self.log_to_database('INFO', timestamp, message)
            self.log_to_database('INFO', message)
        elif c:
            self.collect_logs('INFO', timestamp, message)
            self.collect_logs('INFO', message)
        self.logger.info(message)

    def warning(self, message, timestamp=None, w=False, c=False):
        """Logs a warning message."""
        timestamp = self.check_for_timestamp(timestamp)
    def warning(self, message, w=False, c=False):
        if w:
            self.check_table_exists()
            self.log_to_database('WARNING', timestamp, message)
            self.log_to_database('WARNING', message)
        elif c:
            self.collect_logs('WARNING', timestamp, message)
            self.collect_logs('WARNING',  message)
        self.logger.warning(message)

    def error(self, message, timestamp=None, w=False, c=False):
    def error(self, message, w=False, c=False):
        """Logs an error message."""
        timestamp = self.check_for_timestamp(timestamp)
        if w:
            self.check_table_exists()
            self.log_to_database('ERROR', timestamp, message)
            self.log_to_database('ERROR', message)
        elif c:
            self.collect_logs('ERROR', timestamp, message)
            self.collect_logs('ERROR', message)
        self.logger.error(message)

    def critical(self, message, timestamp=None, w=False, c=False):
    def critical(self, message, w=False, c=False):
        """Logs a critical Message"""
        timestamp = self.check_for_timestamp(timestamp)
        if w:
            self.check_table_exists()
            self.log_to_database('CRITICAL', timestamp, message)
            self.log_to_database('CRITICAL', message)
        elif c:
            self.collect_logs('CRITICAL', timestamp, message)
            self.collect_logs('CRITICAL', message)
        self.logger.critical(message)

    # Only call from an exception handler
    def exception(self, message, timestamp=None, w=False, c=False):
    def exception(self, message, w=False, c=False):
        """Logs and exception message."""
        timestamp = self.check_for_timestamp(timestamp)
        if w:
            self.check_table_exists()
            self.log_to_database('EXCEPTION', timestamp, message)
            self.log_to_database('EXCEPTION', message)
        elif c:
            self.collect_logs('EXCEPTION', timestamp, message)
        self.logger.exceptionfo(message)
            self.collect_logs('EXCEPTION', message)
        self.logger.exception(message)

    def check_table_exists(self):
        """Checks if log table exists for workspace."""
        self.cursor.execute(f"CREATE TABLE IF NOT EXISTS {self.schema}.log (levelname TEXT, ts TIMESTAMP, message TEXT);")
        self.open()
        self.cursor.execute(f"CREATE TABLE IF NOT EXISTS {self.schema}.log (levelname TEXT, message TEXT, ts timestamp default now());")
        self.close()

    def check_for_timestamp(self, timestamp):
        """Checks if timestamp is provided."""
        if not timestamp:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            return timestamp

    def log_to_database(self, loglevel, timestamp, message):
    def log_to_database(self, loglevel, message):
        "Logs log message to database."
        self.cursor.execute(f"INSERT INTO {self.schema}.log (levelname, ts, message) VALUES('{loglevel}', '{timestamp}', '{message}');")
        self.open()
        self.cursor.execute(f"INSERT INTO {self.schema}.log (levelname, message) VALUES('{loglevel}', '{message}');")
        self.close()

    def collect_logs(self, level, timestamp, message):
    def collect_logs(self, level, message):
        """Appends log messages to list"""
        log = (level, timestamp, message)
        log = (level, message)
        self.log_collection.append(tuple(log))

    def write_log_collection_to_database(self, log_list=None):
@@ -113,12 +105,14 @@ class PostgresLoggerMixin(Database, PostgresMixin):
        self.check_table_exists()
        # This keeps us from mucking up the internal list, even though we double
        # the code
        self.open()
        if log_list:
            for l in log_list:
                self.cursor.execute(f"INSERT INTO {self.schema}.log VALUES('{l[0]}', '{l[1]}', '{l[2]}');")
                self.cursor.execute(f"INSERT INTO {self.schema}.log VALUES('{l[0]}', '{l[1]}');")
        elif self.log_collection:
            for l in self.log_collection:
                self.cursor.execute(f"INSERT INTO {self.schema}.log VALUES('{l[0]}', '{l[1]}', '{l[2]}');")
                self.cursor.execute(f"INSERT INTO {self.schema}.log VALUES('{l[0]}', '{l[1]}');")
            self.log_collection = []
        else:
            self.warning("No logs recorded.")
        self.close()
+69 −18
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ import subprocess
import logging
import time
from datetime import datetime
from webbrowser import get
from common.database import Database
from common.mixins.postgres import PostgresMixin
from common.mixins.postgres_logger import PostgresLoggerMixin
@@ -128,47 +129,97 @@ class LogzTestCase(unittest.TestCase):
        test_postgresloggermixin:
            Method to test the postgresloggermixin class.
        """
        with DBPG() as data_base:
            data_base.cursor.execute("DROP TABLE IF EXISTS public.log") 
        plm = PostgresLoggerMixin('public')
        self.assertTrue(isinstance(plm, PostgresLoggerMixin))

        # Test log message
        plm.info("Test Message")
        # # Test log message
        # plm.info("Test Message")

        # test writing log message to database
        test_log = "Test log to db"
        dt = datetime.now()
        new_dt = datetime.strptime(str(dt).split(".")[0], '%Y-%m-%d %H:%M:%S')
        # # test writing log message to database
        # test_log = "Test log to db"

        plm.open()
        plm.info(test_log, w=True)
        plm.close()
        plm.debug("Test debug log to db", w=True)
        plm.info("Test info log to db", w=True)
        plm.warning("Test warning log to db", w=True)
        plm.error("Test error log to db", w=True)
        plm.critical("Test critical log to db", w=True)
        plm.exception("Test exception log to db", w=True)

        with DBPG() as data_base:
            get_log = data_base.query("SELECT * FROM public.log")
            get_single_log = data_base.query("SELECT * FROM public.log")

        self.assertEqual([("INFO", new_dt, "Test log to db")], get_log)
        self.assertListEqual(
            [
                ("DEBUG", "Test debug log to db", get_single_log[0][2]),
                ("INFO", "Test info log to db", get_single_log[1][2]),
                ("WARNING", "Test warning log to db", get_single_log[2][2]),
                ("ERROR", "Test error log to db", get_single_log[3][2]),
                ("CRITICAL", "Test critical log to db", get_single_log[4][2]),
                ("EXCEPTION", "Test exception log to db", get_single_log[5][2])
            ],
            [
                get_single_log[0], 
                get_single_log[1], 
                get_single_log[2], 
                get_single_log[3], 
                get_single_log[4], 
                get_single_log[5]
            ]
        )

        # Test writing a collection to database
        plm.debug("test debug collection", c=True)
        plm.info("test info collection", c=True)
        plm.warning("test warning collection", c=True)
        plm.error("test error collection", c=True)
        plm.critical("test critical collection", c=True)
        plm.exception("test exception collection", c=True)

        plm.open()
        plm.write_log_collection_to_database()
        plm.close()

        with DBPG() as data_base:
            get_logs = data_base.query("SELECT * FROM public.log")

        self.assertListEqual(
            [
                ("INFO", new_dt, "test info collection"),
                ("WARNING", new_dt, "test warning collection"),
                ("CRITICAL", new_dt, "test critical collection")
                ("DEBUG", "test debug collection", get_logs[6][2]),
                ("INFO", "test info collection", get_logs[7][2]),
                ("WARNING", "test warning collection", get_logs[8][2]),
                ("ERROR", "test error collection", get_logs[9][2]),
                ("CRITICAL", "test critical collection", get_logs[10][2]),
                ("EXCEPTION", "test exception collection", get_logs[11][2])
            ],
            [
                get_logs[6], get_logs[7], get_logs[8], get_logs[9], get_logs[10], get_logs[11]
            ]
        )

        test_custom_list = [
            ("INFO", "custom_list_message"), 
            ("WARNING", "Warning Custom"),
            ("CRITICAL", "YO THIS IS A CRIT")
        ]

        plm.write_log_collection_to_database(log_list=test_custom_list)

        with DBPG() as data_base:
            get_cust = data_base.query("SELECT * FROM public.log")

        self.assertListEqual(
            [
                ("INFO", "custom_list_message", get_cust[12][2]),
                ("WARNING", "Warning Custom", get_cust[13][2]),
                ("CRITICAL", "YO THIS IS A CRIT", get_cust[14][2]),
            ],
            [
                get_logs[1], get_logs[2], get_logs[3]
                get_cust[12], get_cust[13], get_cust[14]
            ]
        )

        # test empty list:
        plm.write_log_collection_to_database()

if __name__ == "__main__":
    unittest.main(verbosity=2)