Loading src/common/mixins/influx.py 0 → 100644 +52 −0 Original line number Diff line number Diff line """Scratch file for showing how the mixins get used.""" from influxdb_client import InfluxDBClient from common.env import check_environment as ce class InfluxMixin: """Serve common connection method for InfluxDB.""" DEFAULT_URL = ce("INFLUXDB_URL", "http://localhost:8086") DEFAULT_TOKEN = ce("INFLUXDB_TOKEN", "my-token") DEFAULT_ORG = ce("INFLUXDB_ORG", "my-org") DEFAULT_BUCKET = ce("INFLUXDB_BUCKET", "my-bucket") DEFAULT_TIMEOUT = ce("INFLUXDB_TIMEOUT", 10000) # In milliseconds def open(self): """Explicitly create the InfluxDB client instance :return: True if client created successfully, else false """ self.logger.debug("Creating InfluxDB Client Instance") try: self.client = InfluxDBClient( url=self.DEFAULT_URL, token=self.DEFAULT_TOKEN, org=self.DEFAULT_ORG, timeout=self.DEFAULT_TIMEOUT, ) self.query_api = self.client.query_api() self.logger.debug("Successfully created InfluxDB client instance") except Exception as error: self.logger.error(f"Error creating InfluxDB Client: {error}") return False return True def query(self, query): """Query the InfluxDB. :param query: A valid Flux query statement. :return: Results of the query, or None if an error occurs. """ if not self.client: self.logger.info("InfluxDB client not created, creating now.") if not self.open(): return None self.logger.debug("Submitting query to InfluxDB.") try: result = self.query_api.query(query) return result except Exception as error: self.logger.error(f"Error during query execution: {error}") return None Loading
src/common/mixins/influx.py 0 → 100644 +52 −0 Original line number Diff line number Diff line """Scratch file for showing how the mixins get used.""" from influxdb_client import InfluxDBClient from common.env import check_environment as ce class InfluxMixin: """Serve common connection method for InfluxDB.""" DEFAULT_URL = ce("INFLUXDB_URL", "http://localhost:8086") DEFAULT_TOKEN = ce("INFLUXDB_TOKEN", "my-token") DEFAULT_ORG = ce("INFLUXDB_ORG", "my-org") DEFAULT_BUCKET = ce("INFLUXDB_BUCKET", "my-bucket") DEFAULT_TIMEOUT = ce("INFLUXDB_TIMEOUT", 10000) # In milliseconds def open(self): """Explicitly create the InfluxDB client instance :return: True if client created successfully, else false """ self.logger.debug("Creating InfluxDB Client Instance") try: self.client = InfluxDBClient( url=self.DEFAULT_URL, token=self.DEFAULT_TOKEN, org=self.DEFAULT_ORG, timeout=self.DEFAULT_TIMEOUT, ) self.query_api = self.client.query_api() self.logger.debug("Successfully created InfluxDB client instance") except Exception as error: self.logger.error(f"Error creating InfluxDB Client: {error}") return False return True def query(self, query): """Query the InfluxDB. :param query: A valid Flux query statement. :return: Results of the query, or None if an error occurs. """ if not self.client: self.logger.info("InfluxDB client not created, creating now.") if not self.open(): return None self.logger.debug("Submitting query to InfluxDB.") try: result = self.query_api.query(query) return result except Exception as error: self.logger.error(f"Error during query execution: {error}") return None