Loading src/common/mixins/mssql.py 0 → 100644 +71 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """Allow opening with a pymssql connection.""" import pymssql import traceback class MSSQLMixin(): """Provides a convenient API-like interface to an/a Azure/MS SQL database. The database object is the interface to the database. :param db_name: the name of the database to connect to """ #user = 'natgas_demo@gasdatafeeddemo' #passwd = '3Ss5$6Uz8#' #host = 'gasdatafeeddemo.database.windows.net' #database = 'gasdatafeed_demo' def open(self): """Open the connection to the database. :return: True if connection established, else false""" self.logger.debug('Opening Database Object') try: msg = "\nConnecting information\n" + \ f"Database: {self.connection_info['dbName']}\n" + \ f"Host: {self.connection_info['dbHost']}\n" + \ f"User: {self.connection_info['dbUser']}\n" self.logger.debug(msg) self.connection = pymssql.connect( database=self.connection_info['dbName'], user=self.connection_info['dbUser'], password=self.connection_info['dbPassword'], host=self.connection_info['dbHost']) #connect_timeout=self.connection_info['dbTimeout']) #application_name='ongpy') self.logger.debug('Successfully opened database connection') self.cur = self.connection.cursor() self.logger.debug('Successfully created database cursor') # self.cur.execute("SET search_path TO covidb;") except pymssql.OperationalError as e: self.logger.error(f'Database error: {e}') return False return True def close(self): """Close the database connection.""" self.logger.debug('Closing the database coinnection') if hasattr(self, 'cur') and self.cur is not None: self.logger.debug('Closing cursor and setting to None') self.cur.close() self.cur = None if hasattr(self, 'con') and self.con is not None: self.logger.debug('Closing connection and setting to None') self.connection.commit() self.connection.close() self.connection = None if hasattr(self, 'engine') and self.engine is not None: self.logger.debug('Closing sqlalchemy connection') self.engine.dispose() self.logger.debug('Database object successfully closed.') def query(self, query): """Send a query to the database and retrieve the results.""" self.logger.debug(f'Initiating query: {query}') try: self.cur.execute(query) res = self.cur.fetchall() except Exception as e: traceback.print_stack() self.logger.error(f'Problem querying database: {e}') res = None return res Loading
src/common/mixins/mssql.py 0 → 100644 +71 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """Allow opening with a pymssql connection.""" import pymssql import traceback class MSSQLMixin(): """Provides a convenient API-like interface to an/a Azure/MS SQL database. The database object is the interface to the database. :param db_name: the name of the database to connect to """ #user = 'natgas_demo@gasdatafeeddemo' #passwd = '3Ss5$6Uz8#' #host = 'gasdatafeeddemo.database.windows.net' #database = 'gasdatafeed_demo' def open(self): """Open the connection to the database. :return: True if connection established, else false""" self.logger.debug('Opening Database Object') try: msg = "\nConnecting information\n" + \ f"Database: {self.connection_info['dbName']}\n" + \ f"Host: {self.connection_info['dbHost']}\n" + \ f"User: {self.connection_info['dbUser']}\n" self.logger.debug(msg) self.connection = pymssql.connect( database=self.connection_info['dbName'], user=self.connection_info['dbUser'], password=self.connection_info['dbPassword'], host=self.connection_info['dbHost']) #connect_timeout=self.connection_info['dbTimeout']) #application_name='ongpy') self.logger.debug('Successfully opened database connection') self.cur = self.connection.cursor() self.logger.debug('Successfully created database cursor') # self.cur.execute("SET search_path TO covidb;") except pymssql.OperationalError as e: self.logger.error(f'Database error: {e}') return False return True def close(self): """Close the database connection.""" self.logger.debug('Closing the database coinnection') if hasattr(self, 'cur') and self.cur is not None: self.logger.debug('Closing cursor and setting to None') self.cur.close() self.cur = None if hasattr(self, 'con') and self.con is not None: self.logger.debug('Closing connection and setting to None') self.connection.commit() self.connection.close() self.connection = None if hasattr(self, 'engine') and self.engine is not None: self.logger.debug('Closing sqlalchemy connection') self.engine.dispose() self.logger.debug('Database object successfully closed.') def query(self, query): """Send a query to the database and retrieve the results.""" self.logger.debug(f'Initiating query: {query}') try: self.cur.execute(query) res = self.cur.fetchall() except Exception as e: traceback.print_stack() self.logger.error(f'Problem querying database: {e}') res = None return res