Loading Pipfile 0 → 100644 +12 −0 Original line number Diff line number Diff line [[source]] url = "https://pypi.org/simple" verify_ssl = true name = "pypi" [packages] common = {editable = true, path = "./src"} [dev-packages] [requires] python_version = "3.11" src/common/mixins/influx.py 0 → 100644 +51 −0 Original line number Diff line number Diff line """Scratch file for showing how the mixins get used.""" from influxdb import InfluxDBClient from common.env import check_environment as ce class InfluxMixin: """Serve common connection method for InfluxDB.""" DEFAULT_HOST = ce("INFLUXDB_HOST", "localhost") DEFAULT_PORT = int(ce("INFLUXDB_PORT", 8086)) DEFAULT_USER = ce("INFLUXDB_ADMIN_USER", "admin") DEFAULT_PASSWORD = ce("INFLUXDB_ADMIN_PASSWORD", "secret") DEFAULT_DB = ce("INFLUX_DB", "cast") def open(self): """Create the InfluxDB client instance.""" self.logger.debug("Creating InfluxDB Client Instance") try: self.client = InfluxDBClient( host=self.DEFAULT_HOST, port=self.DEFAULT_PORT, username=self.DEFAULT_USER, password=self.DEFAULT_PASSWORD, database=self.DEFAULT_DB, ) self.logger.debug("Successfully created InfluxDB client instance") except Exception as error: self.logger.error(f"Error creating InfluxDB Client: {error}") return False return True def query(self, query): """Query the InfluxDB.""" if not self.client: self.logger.info("InfluxDB client not created, creating now.") if not self.open(): return None self.logger.debug("Submitting query to InfluxDB.") try: result = self.client.query(query) return result except Exception as error: self.logger.error(f"Error during query execution: {error}") return None def list_databases(self): if not self.client: if not self.open(): return None return self.client.get_list_database() src/requirements.txt +2 −1 Original line number Diff line number Diff line Loading @@ -3,3 +3,4 @@ pandas==2.1.4 psycopg2-binary==2.9.9 pymssql==2.2.11 SQLAlchemy~=2.0.23 influxdb==5.3.1 No newline at end of file Loading
Pipfile 0 → 100644 +12 −0 Original line number Diff line number Diff line [[source]] url = "https://pypi.org/simple" verify_ssl = true name = "pypi" [packages] common = {editable = true, path = "./src"} [dev-packages] [requires] python_version = "3.11"
src/common/mixins/influx.py 0 → 100644 +51 −0 Original line number Diff line number Diff line """Scratch file for showing how the mixins get used.""" from influxdb import InfluxDBClient from common.env import check_environment as ce class InfluxMixin: """Serve common connection method for InfluxDB.""" DEFAULT_HOST = ce("INFLUXDB_HOST", "localhost") DEFAULT_PORT = int(ce("INFLUXDB_PORT", 8086)) DEFAULT_USER = ce("INFLUXDB_ADMIN_USER", "admin") DEFAULT_PASSWORD = ce("INFLUXDB_ADMIN_PASSWORD", "secret") DEFAULT_DB = ce("INFLUX_DB", "cast") def open(self): """Create the InfluxDB client instance.""" self.logger.debug("Creating InfluxDB Client Instance") try: self.client = InfluxDBClient( host=self.DEFAULT_HOST, port=self.DEFAULT_PORT, username=self.DEFAULT_USER, password=self.DEFAULT_PASSWORD, database=self.DEFAULT_DB, ) self.logger.debug("Successfully created InfluxDB client instance") except Exception as error: self.logger.error(f"Error creating InfluxDB Client: {error}") return False return True def query(self, query): """Query the InfluxDB.""" if not self.client: self.logger.info("InfluxDB client not created, creating now.") if not self.open(): return None self.logger.debug("Submitting query to InfluxDB.") try: result = self.client.query(query) return result except Exception as error: self.logger.error(f"Error during query execution: {error}") return None def list_databases(self): if not self.client: if not self.open(): return None return self.client.get_list_database()
src/requirements.txt +2 −1 Original line number Diff line number Diff line Loading @@ -3,3 +3,4 @@ pandas==2.1.4 psycopg2-binary==2.9.9 pymssql==2.2.11 SQLAlchemy~=2.0.23 influxdb==5.3.1 No newline at end of file