Loading src/common/mixins/pandas.py 0 → 100644 +45 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """Provide a `pandas` mixin for the Database class.""" import pandas as pd import sqlalchemy class Pandas_Mixin(): """Group together all pandas logic.""" def query_pandas(self, query=None, table=None, schema=None): """Return a query as a pandas table. Provided a query, uses the default cursor assumming a PostgreSQL connection. :param query: a query string to return as a table :param table: the table name to retrieve data from :param schema: the schema to select the table from :returns: a pandas.DataFframe object """ def make_dataframe(query): """Create a pandas dataframe from a query.""" if self.cursor is None: # TODO raise an appropriate error here self.logger.error('Cursor is not open, please create one.') # TODO wrap in a try except block self.logger.debug(f'Returning {query} as pandas dataframe.') data_fetch_query = self.cursor.execute(query) if data_fetch_query is None: # FIXME better way to handle this? self.logger.error(f"Unable to fetch {query}.") # FIXME raise an Exception here? dataframe = pd.DataFrame(data_fetch_query.fetchall()) dataframe.columns = data_fetch_query.keys() return dataframe if query is not None and not isinstance(self.connection, sqlalchemy.engine.base.Engine): return make_dataframe(query) if table is not None and schema is not None and \ isinstance(self.connection, sqlalchemy.engine.base.Engine): return pd.read_from_table(table_name=table, schema=schema) if query is not None and \ isinstance(self.connection, sqlalchemy.engine.base.Engine): return pd.read_sql_from_query(query, ur=self.connection_inf['uri']) Loading
src/common/mixins/pandas.py 0 → 100644 +45 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- """Provide a `pandas` mixin for the Database class.""" import pandas as pd import sqlalchemy class Pandas_Mixin(): """Group together all pandas logic.""" def query_pandas(self, query=None, table=None, schema=None): """Return a query as a pandas table. Provided a query, uses the default cursor assumming a PostgreSQL connection. :param query: a query string to return as a table :param table: the table name to retrieve data from :param schema: the schema to select the table from :returns: a pandas.DataFframe object """ def make_dataframe(query): """Create a pandas dataframe from a query.""" if self.cursor is None: # TODO raise an appropriate error here self.logger.error('Cursor is not open, please create one.') # TODO wrap in a try except block self.logger.debug(f'Returning {query} as pandas dataframe.') data_fetch_query = self.cursor.execute(query) if data_fetch_query is None: # FIXME better way to handle this? self.logger.error(f"Unable to fetch {query}.") # FIXME raise an Exception here? dataframe = pd.DataFrame(data_fetch_query.fetchall()) dataframe.columns = data_fetch_query.keys() return dataframe if query is not None and not isinstance(self.connection, sqlalchemy.engine.base.Engine): return make_dataframe(query) if table is not None and schema is not None and \ isinstance(self.connection, sqlalchemy.engine.base.Engine): return pd.read_from_table(table_name=table, schema=schema) if query is not None and \ isinstance(self.connection, sqlalchemy.engine.base.Engine): return pd.read_sql_from_query(query, ur=self.connection_inf['uri'])