Commit 16168f0b authored by Powell, Eric's avatar Powell, Eric
Browse files

Refactored to consilidaste PG functions into class, and support the concept of...

Refactored to consilidaste PG functions into class, and support the concept of a project hierarchy above workorders to allow multiple files to be associated with a project name for tracking.
parent 61bdf28b
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+87 −85
Original line number Diff line number Diff line
from parse_res import ResEstimateData
import psycopg2
from psycopg2 import sql

class LoadRes(ResEstimateData):
    def __init__(self, project_id, workorder_title, schema_name, resource_pool_name, filename, pg_conn):
        super().__init__(filename)
        self.pg_con = pg_conn
# import psycopg2
# from psycopg2 import sql
from .pg_tools import PGTools

class LoadRes(ResEstimateData, PGTools):
    def __init__(self, project_id, workorder_title, schema_name, resource_pool_name, filename, **DB_PARAMS):
        ResEstimateData.__init__(self, filename)
        PGTools.__init__(self, **DB_PARAMS)
        # self.pg_con = pg_conn
        self.resource_poolid = self._get_id(schema_name,
                                            "resource_pools",
                                            "resource_pool_id",
@@ -22,84 +24,84 @@ class LoadRes(ResEstimateData):
                print('WorkOrder / Team /Project combination or Filename exists')
                exit(-999)

    def _get_id(self, schema_name, table_name, column, filter_column, value):

        # Construct the SQL query safely to prevent SQL injection
        # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
        query = sql.SQL("SELECT {col} FROM {tbl} WHERE {key} = {val}").format(
            col=sql.Identifier(column),
            tbl=sql.SQL('.').join([
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ]),
            key=sql.Identifier(filter_column),
            val=sql.Placeholder()  # For the data value
        )

        try:
            # Use a 'with' statement for the cursor to ensure it's closed automatically
            with self.pg_con.cursor() as cursor:
                # Execute the query with the data values
                cursor.execute(query, (value,))

                # Fetch the returned primary key from the cursor
                return cursor.fetchone()[0]

        except Exception as e:
            print(query, (value,))
            raise e

    def _insert_record_and_get_pk(self, schema_name, table_name, data, pk_column='id'):
        """
        Inserts a record into a table and returns the new primary key.

        Args:
            table_name (str): The name of the target table.
            data (dict): A dictionary where keys are column names and values are the data to insert.
            pk_column (str): The name of the serial primary key column (defaults to 'id').

        Returns:
            The value of the primary key for the newly inserted row, or None if insertion fails.
        """
        new_id = None
        columns = data.keys()

        # Construct the SQL query safely to prevent SQL injection
        # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
        query = sql.SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals}) RETURNING {pk}").format(
            tbl=sql.SQL('.').join([
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ]),
            cols=sql.SQL(', ').join(map(sql.Identifier, columns)),
            vals=sql.SQL(', ').join(sql.Placeholder() * len(columns)),
            pk=sql.Identifier(pk_column)
        )

        try:
            # Use a 'with' statement for the cursor to ensure it's closed automatically
            with self.pg_con.cursor() as cursor:
                # Execute the query with the data values
                cursor.execute(query, list(data.values()))

                # Fetch the returned primary key from the cursor
                new_id = cursor.fetchone()[0]

                # Commit the transaction to make the insert permanent
                self.pg_con.commit()
                print(f"✅ Record inserted successfully into '{table_name}'. New ID: {new_id}")

        except (Exception, psycopg2.Error) as error:
            print(f"❌ Error inserting record: {error}")
            # Roll back the transaction in case of an error
            if self.pg_con:
                self.pg_con.rollback()
            # return None
            raise error
        # TO DO - On insert fail, return the Record ID of the existing record


        return new_id
    # def _get_id(self, schema_name, table_name, column, filter_column, value):
    #
    #     # Construct the SQL query safely to prevent SQL injection
    #     # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
    #     query = sql.SQL("SELECT {col} FROM {tbl} WHERE {key} = {val}").format(
    #         col=sql.Identifier(column),
    #         tbl=sql.SQL('.').join([
    #             sql.Identifier(schema_name),
    #             sql.Identifier(table_name)
    #         ]),
    #         key=sql.Identifier(filter_column),
    #         val=sql.Placeholder()  # For the data value
    #     )
    #
    #     try:
    #         # Use a 'with' statement for the cursor to ensure it's closed automatically
    #         with self.pg_con.cursor() as cursor:
    #             # Execute the query with the data values
    #             cursor.execute(query, (value,))
    #
    #             # Fetch the returned primary key from the cursor
    #             return cursor.fetchone()[0]
    #
    #     except Exception as e:
    #         print(query, (value,))
    #         raise e
    #
    # def _insert_record_and_get_pk(self, schema_name, table_name, data, pk_column='id'):
    #     """
    #     Inserts a record into a table and returns the new primary key.
    #
    #     Args:
    #         table_name (str): The name of the target table.
    #         data (dict): A dictionary where keys are column names and values are the data to insert.
    #         pk_column (str): The name of the serial primary key column (defaults to 'id').
    #
    #     Returns:
    #         The value of the primary key for the newly inserted row, or None if insertion fails.
    #     """
    #     new_id = None
    #     columns = data.keys()
    #
    #     # Construct the SQL query safely to prevent SQL injection
    #     # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
    #     query = sql.SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals}) RETURNING {pk}").format(
    #         tbl=sql.SQL('.').join([
    #             sql.Identifier(schema_name),
    #             sql.Identifier(table_name)
    #         ]),
    #         cols=sql.SQL(', ').join(map(sql.Identifier, columns)),
    #         vals=sql.SQL(', ').join(sql.Placeholder() * len(columns)),
    #         pk=sql.Identifier(pk_column)
    #     )
    #
    #     try:
    #         # Use a 'with' statement for the cursor to ensure it's closed automatically
    #         with self.pg_con.cursor() as cursor:
    #             # Execute the query with the data values
    #             cursor.execute(query, list(data.values()))
    #
    #             # Fetch the returned primary key from the cursor
    #             new_id = cursor.fetchone()[0]
    #
    #             # Commit the transaction to make the insert permanent
    #             self.pg_con.commit()
    #             print(f"✅ Record inserted successfully into '{table_name}'. New ID: {new_id}")
    #
    #     except (Exception, psycopg2.Error) as error:
    #         print(f"❌ Error inserting record: {error}")
    #         # Roll back the transaction in case of an error
    #         if self.pg_con:
    #             self.pg_con.rollback()
    #         # return None
    #         raise error
    #     # TO DO - On insert fail, return the Record ID of the existing record
    #
    #
    #     return new_id

    def load(self):
        """Function to coordinate loading of tables"""
+13 −6
Original line number Diff line number Diff line
@@ -2,14 +2,21 @@ import csv
import psycopg2
from psycopg2 import sql

class ExportPowerBInputs:
    def __init__(self, Project_title, export_path, **kwargs):
from libraries.pg_tools import PGTools


class ExportPowerBInputs(PGTools):
    def __init__(self, Project_title, export_path, **DB_PARAMS):
        super().__init__(**DB_PARAMS)
        self.Project_title = Project_title
        self.export_path = export_path
        self.pg_con = psycopg2.connect(**kwargs)
        # self.pg_con = psycopg2.connect(**DB_PARAMS)

    def _get_Data(self):
    def _get_Data(self, view):
        ...

    def expoer(self):
        self._get_Data()
 No newline at end of file
    def export(self):
        lst_views = ['v_labor_plan_data', 'vint_spend_rate_task', 'vint_spend_rate_activity']
        for view in lst_views:
            lst_rows = self._get_Data(view)
            # Write rows to CSV
+86 −0
Original line number Diff line number Diff line
import psycopg2
from psycopg2 import sql


class PGTools():
    def __init__(self, **DB_PARAMS):
        self.pg_con = psycopg2.connect(**DB_PARAMS)

    def _get_id(self, schema_name, table_name, column, filter_column, value):

        # Construct the SQL query safely to prevent SQL injection
        # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
        query = sql.SQL("SELECT {col} FROM {tbl} WHERE {key} = {val}").format(
            col=sql.Identifier(column),
            tbl=sql.SQL('.').join([
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ]),
            key=sql.Identifier(filter_column),
            val=sql.Placeholder()  # For the data value
        )

        try:
            # Use a 'with' statement for the cursor to ensure it's closed automatically
            with self.pg_con.cursor() as cursor:
                # Execute the query with the data values
                cursor.execute(query, (value,))

                # Fetch the returned primary key from the cursor
                return cursor.fetchone()[0]

        except Exception as e:
            print(query, (value,))
            raise e

    def _insert_record_and_get_pk(self, schema_name, table_name, data, pk_column='id'):
        """
        Inserts a record into a table and returns the new primary key.

        Args:
            table_name (str): The name of the target table.
            data (dict): A dictionary where keys are column names and values are the data to insert.
            pk_column (str): The name of the serial primary key column (defaults to 'id').

        Returns:
            The value of the primary key for the newly inserted row, or None if insertion fails.
        """
        new_id = None
        columns = data.keys()

        # Construct the SQL query safely to prevent SQL injection
        # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
        query = sql.SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals}) RETURNING {pk}").format(
            tbl=sql.SQL('.').join([
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ]),
            cols=sql.SQL(', ').join(map(sql.Identifier, columns)),
            vals=sql.SQL(', ').join(sql.Placeholder() * len(columns)),
            pk=sql.Identifier(pk_column)
        )

        try:
            # Use a 'with' statement for the cursor to ensure it's closed automatically
            with self.pg_con.cursor() as cursor:
                # Execute the query with the data values
                cursor.execute(query, list(data.values()))

                # Fetch the returned primary key from the cursor
                new_id = cursor.fetchone()[0]

                # Commit the transaction to make the insert permanent
                self.pg_con.commit()
                print(f"✅ Record inserted successfully into '{table_name}'. New ID: {new_id}")

        except (Exception, psycopg2.Error) as error:
            print(f"❌ Error inserting record: {error}")
            # Roll back the transaction in case of an error
            if self.pg_con:
                self.pg_con.rollback()
            # return None
            raise error
        # TO DO - On insert fail, return the Record ID of the existing record


        return new_id
+87 −87
Original line number Diff line number Diff line
import psycopg2
from psycopg2 import sql
# import psycopg2
from psycopg2 import errors
from .pg_tools import PGTools



class ProjectBaseline:
class ProjectBaseline(PGTools):
    def __init__(self, schema_name, project_title, **KWargs):
        self.pg_con = psycopg2.connect(**KWargs)
        super().__init__(**KWargs)
        try:
            self.project_id = self._insert_record_and_get_pk(schema_name, 'Projects', project_title, 'project_id')
        except Exception as e:
            if e.pgcode == '23505':
                print('Project Exists, using existing project_id')
                self.project_id = self._get_id(schema_name, 'projects', 'proiject_id', 'project_name', project_title)


    def _get_id(self, schema_name, table_name, column, filter_column, value):

        # Construct the SQL query safely to prevent SQL injection
        # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
        query = sql.SQL("SELECT {col} FROM {tbl} WHERE {key} = {val}").format(
            col=sql.Identifier(column),
            tbl=sql.SQL('.').join([
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ]),
            key=sql.Identifier(filter_column),
            val=sql.Placeholder()  # For the data value
        )

        try:
            # Use a 'with' statement for the cursor to ensure it's closed automatically
            with self.pg_con.cursor() as cursor:
                # Execute the query with the data values
                cursor.execute(query, (value,))

                # Fetch the returned primary key from the cursor
                return cursor.fetchone()[0]

        except Exception as e:
            print(query, (value,))
            raise e


    def _insert_record_and_get_pk(self, schema_name, table_name, data, pk_column='id'):
        """
        Inserts a record into a table and returns the new primary key.

        Args:
            table_name (str): The name of the target table.
            data (dict): A dictionary where keys are column names and values are the data to insert.
            pk_column (str): The name of the serial primary key column (defaults to 'id').

        Returns:
            The value of the primary key for the newly inserted row, or None if insertion fails.
        """
        new_id = None
        columns = data.keys()

        # Construct the SQL query safely to prevent SQL injection
        # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
        query = sql.SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals}) RETURNING {pk}").format(
            tbl=sql.SQL('.').join([
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ]),
            cols=sql.SQL(', ').join(map(sql.Identifier, columns)),
            vals=sql.SQL(', ').join(sql.Placeholder() * len(columns)),
            pk=sql.Identifier(pk_column)
        )

        try:
            # Use a 'with' statement for the cursor to ensure it's closed automatically
            with self.pg_con.cursor() as cursor:
                # Execute the query with the data values
                cursor.execute(query, list(data.values()))

                # Fetch the returned primary key from the cursor
                new_id = cursor.fetchone()[0]

                # Commit the transaction to make the insert permanent
                self.pg_con.commit()
                print(f"✅ Record inserted successfully into '{table_name}'. New ID: {new_id}")

        except (Exception, psycopg2.Error) as error:
            print(f"❌ Error inserting record: {error}")
            # Roll back the transaction in case of an error
            if self.pg_con:
                self.pg_con.rollback()
            # return None
            raise error
        # TO DO - On insert fail, return the Record ID of the existing record


        return new_id
                self.project_id = self._get_id(schema_name, 'projects', 'project_id', 'project_name', project_title)


    # def _get_id(self, schema_name, table_name, column, filter_column, value):
    #
    #     # Construct the SQL query safely to prevent SQL injection
    #     # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
    #     query = sql.SQL("SELECT {col} FROM {tbl} WHERE {key} = {val}").format(
    #         col=sql.Identifier(column),
    #         tbl=sql.SQL('.').join([
    #             sql.Identifier(schema_name),
    #             sql.Identifier(table_name)
    #         ]),
    #         key=sql.Identifier(filter_column),
    #         val=sql.Placeholder()  # For the data value
    #     )
    #
    #     try:
    #         # Use a 'with' statement for the cursor to ensure it's closed automatically
    #         with self.pg_con.cursor() as cursor:
    #             # Execute the query with the data values
    #             cursor.execute(query, (value,))
    #
    #             # Fetch the returned primary key from the cursor
    #             return cursor.fetchone()[0]
    #
    #     except Exception as e:
    #         print(query, (value,))
    #         raise e
    #
    #
    # def _insert_record_and_get_pk(self, schema_name, table_name, data, pk_column='id'):
    #     """
    #     Inserts a record into a table and returns the new primary key.
    #
    #     Args:
    #         table_name (str): The name of the target table.
    #         data (dict): A dictionary where keys are column names and values are the data to insert.
    #         pk_column (str): The name of the serial primary key column (defaults to 'id').
    #
    #     Returns:
    #         The value of the primary key for the newly inserted row, or None if insertion fails.
    #     """
    #     new_id = None
    #     columns = data.keys()
    #
    #     # Construct the SQL query safely to prevent SQL injection
    #     # This uses psycopg2.sql objects to safely handle identifiers (table/column names)
    #     query = sql.SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals}) RETURNING {pk}").format(
    #         tbl=sql.SQL('.').join([
    #             sql.Identifier(schema_name),
    #             sql.Identifier(table_name)
    #         ]),
    #         cols=sql.SQL(', ').join(map(sql.Identifier, columns)),
    #         vals=sql.SQL(', ').join(sql.Placeholder() * len(columns)),
    #         pk=sql.Identifier(pk_column)
    #     )
    #
    #     try:
    #         # Use a 'with' statement for the cursor to ensure it's closed automatically
    #         with self.pg_con.cursor() as cursor:
    #             # Execute the query with the data values
    #             cursor.execute(query, list(data.values()))
    #
    #             # Fetch the returned primary key from the cursor
    #             new_id = cursor.fetchone()[0]
    #
    #             # Commit the transaction to make the insert permanent
    #             self.pg_con.commit()
    #             print(f"✅ Record inserted successfully into '{table_name}'. New ID: {new_id}")
    #
    #     except (Exception, psycopg2.Error) as error:
    #         print(f"❌ Error inserting record: {error}")
    #         # Roll back the transaction in case of an error
    #         if self.pg_con:
    #             self.pg_con.rollback()
    #         # return None
    #         raise error
    #     # TO DO - On insert fail, return the Record ID of the existing record
    #
    #
    #     return new_id

    def get_project_id(self):
        return self.project_id
 No newline at end of file
Loading