Loading src/common/mixins/influx.py +5 −5 Original line number Diff line number Diff line Loading @@ -8,11 +8,11 @@ from common.env import check_environment as ce class InfluxMixin: """Serve common connection method for InfluxDB.""" DEFAULT_HOST = ce("INFLUXDB_HOST", "localhost") DEFAULT_PORT = int(ce("INFLUXDB_PORT", 8086)) DEFAULT_USER = ce("INFLUXDB_ADMIN_USER", "admin") DEFAULT_PASSWORD = ce("INFLUXDB_ADMIN_PASSWORD", "secret") DEFAULT_DB = ce("INFLUX_DB", "cast") DEFAULT_HOST = ce('DATABASE_HOST_IN', ce("INFLUXDB_HOST", "localhost")) DEFAULT_PORT = int(ce('DATABASE_PORT_IN', ce("INFLUXDB_PORT", 8086))) DEFAULT_USER = ce('DATABASE_USER_IN', ce("INFLUXDB_ADMIN_USER", "admin")) DEFAULT_PASSWORD = ce('DATABASE_PW_IN', ce("INFLUXDB_ADMIN_PASSWORD", "secret")) DEFAULT_DB = ce('DATABASE_DB_IN', ce("INFLUX_DB", "cast")) def open(self): """Create the InfluxDB client instance.""" Loading src/common/mixins/multiple.py +52 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ class MultiDatabase: db_type: str = config.get("type") db_id: str = config.get("id", str(uuid4())) self.logger.debug(f"Creating {db_type} database with id {db_id}") config = self._rename_env_vars(db_type, config) with mock_env_vars(config): mixin = self._get_mixin_for_type(db_type=db_type) if mixin: Loading @@ -30,6 +31,57 @@ class MultiDatabase: else: self.logger.warning(f"Skipping unknown Database type: {db_type}") @staticmethod def _rename_env_vars(db_type, config): def get_pref_value(config, keys, default=None): for key in keys: if key in config: return config[key] return default suffix = None if db_type.startswith("postgres") or db_type.startswith("pg"): suffix = 'PG' elif db_type.startswith("mssql"): suffix = 'MS' elif db_type.startswith("influx"): suffix = 'IN' if suffix is None: return config new_config = {} # Define mappings of old keys to new keys key_mappings = { 'host': f'DATABASE_HOST_{suffix}', 'dbHost': f'DATABASE_HOST_{suffix}', 'port': f'DATABASE_PORT_{suffix}', 'dbPort': f'DATABASE_PORT_{suffix}', 'db': f'DATABASE_DB_{suffix}', 'database': f'DATABASE_DB_{suffix}', 'dbName': f'DATABASE_DB_{suffix}', 'user': f'DATABASE_USER_{suffix}', 'dbUser': f'DATABASE_USER_{suffix}', 'pw': f'DATABASE_PW_{suffix}', 'password': f'DATABASE_PW_{suffix}', 'pass': f'DATABASE_PW_{suffix}', 'dbPass': f'DATABASE_PW_{suffix}', 'dbPassword': f'DATABASE_PW_{suffix}', } # Rename keys and pop old keys if replacements exist for old_key, new_key in key_mappings.items(): new_value = get_pref_value(config, [old_key]) if new_value is not None: new_config[new_key] = new_value config.pop(old_key, None) # Update config with new entries config.update(new_config) return config @staticmethod def _get_mixin_for_type( db_type: str, Loading Loading
src/common/mixins/influx.py +5 −5 Original line number Diff line number Diff line Loading @@ -8,11 +8,11 @@ from common.env import check_environment as ce class InfluxMixin: """Serve common connection method for InfluxDB.""" DEFAULT_HOST = ce("INFLUXDB_HOST", "localhost") DEFAULT_PORT = int(ce("INFLUXDB_PORT", 8086)) DEFAULT_USER = ce("INFLUXDB_ADMIN_USER", "admin") DEFAULT_PASSWORD = ce("INFLUXDB_ADMIN_PASSWORD", "secret") DEFAULT_DB = ce("INFLUX_DB", "cast") DEFAULT_HOST = ce('DATABASE_HOST_IN', ce("INFLUXDB_HOST", "localhost")) DEFAULT_PORT = int(ce('DATABASE_PORT_IN', ce("INFLUXDB_PORT", 8086))) DEFAULT_USER = ce('DATABASE_USER_IN', ce("INFLUXDB_ADMIN_USER", "admin")) DEFAULT_PASSWORD = ce('DATABASE_PW_IN', ce("INFLUXDB_ADMIN_PASSWORD", "secret")) DEFAULT_DB = ce('DATABASE_DB_IN', ce("INFLUX_DB", "cast")) def open(self): """Create the InfluxDB client instance.""" Loading
src/common/mixins/multiple.py +52 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ class MultiDatabase: db_type: str = config.get("type") db_id: str = config.get("id", str(uuid4())) self.logger.debug(f"Creating {db_type} database with id {db_id}") config = self._rename_env_vars(db_type, config) with mock_env_vars(config): mixin = self._get_mixin_for_type(db_type=db_type) if mixin: Loading @@ -30,6 +31,57 @@ class MultiDatabase: else: self.logger.warning(f"Skipping unknown Database type: {db_type}") @staticmethod def _rename_env_vars(db_type, config): def get_pref_value(config, keys, default=None): for key in keys: if key in config: return config[key] return default suffix = None if db_type.startswith("postgres") or db_type.startswith("pg"): suffix = 'PG' elif db_type.startswith("mssql"): suffix = 'MS' elif db_type.startswith("influx"): suffix = 'IN' if suffix is None: return config new_config = {} # Define mappings of old keys to new keys key_mappings = { 'host': f'DATABASE_HOST_{suffix}', 'dbHost': f'DATABASE_HOST_{suffix}', 'port': f'DATABASE_PORT_{suffix}', 'dbPort': f'DATABASE_PORT_{suffix}', 'db': f'DATABASE_DB_{suffix}', 'database': f'DATABASE_DB_{suffix}', 'dbName': f'DATABASE_DB_{suffix}', 'user': f'DATABASE_USER_{suffix}', 'dbUser': f'DATABASE_USER_{suffix}', 'pw': f'DATABASE_PW_{suffix}', 'password': f'DATABASE_PW_{suffix}', 'pass': f'DATABASE_PW_{suffix}', 'dbPass': f'DATABASE_PW_{suffix}', 'dbPassword': f'DATABASE_PW_{suffix}', } # Rename keys and pop old keys if replacements exist for old_key, new_key in key_mappings.items(): new_value = get_pref_value(config, [old_key]) if new_value is not None: new_config[new_key] = new_value config.pop(old_key, None) # Update config with new entries config.update(new_config) return config @staticmethod def _get_mixin_for_type( db_type: str, Loading