Commit ae53c7a8 authored by Grant, Josh's avatar Grant, Josh
Browse files

Merge branch 'feature/scraper' into 'develop'

Feature/scraper

See merge request !32
parents 20f4f01f c3523219
Loading
Loading
Loading
Loading
+32 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
This module defines a data class 'ErrorCodes' that instantiates various error codes used throughout the application.
It categorizes error codes into distinct sections for database operations, scraping processes,
templating issues, and provides a default error code for general use. Each error type is associated with specific
integer values, making it easier to manage and identify errors consistently across different components of the application.
'''
from dataclasses import dataclass


@dataclass
class ErrorCodes:

    # Database Errors
    DB_CONNECTION_FAILED: int = 1001
    DB_TIMEOUT: int = 1002
    DB_INTEGRITY_ERROR: int = 1003

    # Web Scraping Errors
    SCRAPE_CONNECTION_ERROR: int = 2001
    SCRAPE_TIMEOUT: int = 2002
    SCRAPE_PARSING_FAILED: int = 2003
    SCRAPE_DATA_NOT_FOUND: int = 2004

    # Templating Errors
    TEMPLATE_RENDERING_FAILED: int = 3001
    TEMPLATE_NOT_FOUND: int = 3002
    TEMPLATE_SYNTAX_ERROR: int = 3003

    # Catch all Error
    DEFAULT_ERROR: int = 9999
+67 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
This module defines custom exception classes for various application components. Each exception class validates
specific error codes from 'ErrorCodes'. Included are 'ScraperException', 'DatabaseException', 'TemplateException', and 'ParserError',
each tailored to a specific component and type of error within the application.
'''
from common.error_codes import ErrorCodes


class CommonException(Exception):
    def __init__(self, message=None, code=None):

        self.code = code if code else ErrorCodes.DEFAULT_ERROR
        self.validate_code(self.code)
        self.message = str(message)
        super().__init__(message)

    def __str__(self):
        return f"{self.message} (Error Code: {self.code})"

    @staticmethod
    def validate_code(code: int = None):
        if code and not any(value == code for value in vars(ErrorCodes).values()):
            raise ValueError(f"Invalid error code: {code}")

    def __repr__(self):
        return f"{self.__class__.__name__}(message={self.args[0]!r}, code={self.code})"


class ScraperException(CommonException):
    @staticmethod
    def validate_code(code):
        db_codes = {
            value
            for name, value in vars(ErrorCodes).items()
            if name.startswith("SCRAPE_")
        }
        if code and code not in db_codes:
            raise ValueError(f"Invalid scraper error code: {code}")


class DatabaseException(CommonException):
    @staticmethod
    def validate_code(code):
        db_codes = {
            value for name, value in vars(ErrorCodes).items() if name.startswith("DB_")
        }
        if code and code not in db_codes:
            raise ValueError(f"Invalid database error code: {code}")


class TemplateException(CommonException):
    @staticmethod
    def validate_code(code):
        db_codes = {
            value
            for name, value in vars(ErrorCodes).items()
            if name.startswith("TEMPLATE_")
        }
        if code and code not in db_codes:
            raise ValueError(f"Invalid template error code: {code}")


class ParserError(ScraperException):
    def __init__(self, message=None):
        super().__init__(message, 2003)
+8 −6
Original line number Diff line number Diff line
@@ -52,8 +52,10 @@ def delete_directory(dirc=None):
                directory_to_delete = Path(dirc)

                for item in directory_to_delete.iterdir():
                    if item.is_file(): item.unlink()
                    if item.is_dir(): item.rmdir()
                    if item.is_file():
                        item.unlink()
                    if item.is_dir():
                        item.rmdir()

                directory_to_delete.rmdir()
                return f"{dirc} deleted successfully."
@@ -132,7 +134,7 @@ def write_file(file_name, file_data=""):
    try:
        if Path(file_name).suffix == ".json":
            if not Path(file_name).exists():
                with open(f"{file_name}", 'w') as json_file:
                with open(f"{file_name}", "w") as json_file:
                    json_file.write(json.dumps({}))
            with open(f"{file_name}", "r+") as file:
                file_contents = json.load(file)
+5 −1
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
"""Provide a standard logger for all use cases."""
import logging

# standard python imports
import os

@@ -42,7 +43,10 @@ def create_logger(file_out=None, mode=None, encoding=None):
        file_handler = None
        handlers = [rich_handler]
    logging.basicConfig(
        level=log_level, format="%(message)s", datefmt="[%Y/%m/%d %H:%M;%S]", handlers=handlers
        level=log_level,
        format="%(message)s",
        datefmt="[%Y/%m/%d %H:%M;%S]",
        handlers=handlers,
    )
    # if there is a file_handler set, close it before leaving :)
    # This prevents leaving open files
+10 −3
Original line number Diff line number Diff line
@@ -58,7 +58,9 @@ def send_email(subject, text, files=None, recipients=None):
            if os.path.isfile(f):
                with open(f, "rb") as att_file:
                    part = MIMEApplication(att_file.read())
                part["Content-Disposition"] = "attachment; filename=" f'"{os.path.basename(f)}"'
                part["Content-Disposition"] = (
                    "attachment; filename=" f'"{os.path.basename(f)}"'
                )
                msg.attach(part)
            else:
                logger.error(f"Failed to attach file {f}. {f} is not a file.")
@@ -66,7 +68,9 @@ def send_email(subject, text, files=None, recipients=None):
        if os.path.isfile(files):
            with open(files, "rb") as att_file:
                part = MIMEApplication(att_file.read())
            part["Content-Disposition"] = "attachment; filename=" f'"{os.path.basename(files)}"'
            part["Content-Disposition"] = (
                "attachment; filename=" f'"{os.path.basename(files)}"'
            )
            msg.attach(part)
        else:
            logger.error(f"Failed to attach file {files}. {files} is not a " "file.")
@@ -80,7 +84,10 @@ def send_email(subject, text, files=None, recipients=None):
            server.send_message(msg)
            server.quit()
        logger.info(f"Email successfully sent to {to_address}.")
    elif all(var is not None for var in [relay_address, relay_port, relay_password, relay_user]):
    elif all(
        var is not None
        for var in [relay_address, relay_port, relay_password, relay_user]
    ):
        # allow ssl connection
        context = ssl.create_default_context()
        with smtplib.SMTP(relay_address, relay_port) as conn:
Loading