Loading simulation_server/util/druid.py +26 −8 Original line number Diff line number Diff line Loading @@ -122,6 +122,23 @@ earliest = _size_func(sqla.func.earliest) earliest_py = _size_func(sqla.func.earliest_py) def table_is_ready(engine, tbl: str): """ sqla.inspect(conn.engine) returns tables that have streaming ingestion but no data yet and so still cause errors. Only workaround to check for this I've found is to just check for the error. This only seems to be an issue on older versions of Druid """ try: return sqla.inspect(engine).has_table(tbl) except Exception as e: # druid throws errors like "has_table() got an unexpected keyword argument 'info_cache'" if "has_table" in str(e): return False else: raise e def execute_ignore_missing(conn, stmt) -> sqla.CursorResult: """ Wrapper conn.execute that handles missing tables. Loading @@ -132,13 +149,14 @@ def execute_ignore_missing(conn, stmt) -> sqla.CursorResult: try: return conn.execute(stmt) except Exception as e: existing_tables = set(sqla.inspect(conn.engine).get_table_names()) try: stmt_tables = set([t.name for t in stmt.get_final_froms()]) missing_tables = stmt_tables - existing_tables missing_tables = [tbl for tbl in stmt_tables if not table_is_ready(conn.engine, tbl)] if missing_tables: logger.info(f"table(s) {', '.join(stmt_tables)} missing, returning empty result") logger.info(f"table(s) {', '.join(missing_tables)} missing, returning empty result") return conn.execute(sqla.text("SELECT 1 FROM (VALUES (1)) AS tbl(a) WHERE 1 != 1")) else: except: pass # Just raise the original error raise e Loading Loading
simulation_server/util/druid.py +26 −8 Original line number Diff line number Diff line Loading @@ -122,6 +122,23 @@ earliest = _size_func(sqla.func.earliest) earliest_py = _size_func(sqla.func.earliest_py) def table_is_ready(engine, tbl: str): """ sqla.inspect(conn.engine) returns tables that have streaming ingestion but no data yet and so still cause errors. Only workaround to check for this I've found is to just check for the error. This only seems to be an issue on older versions of Druid """ try: return sqla.inspect(engine).has_table(tbl) except Exception as e: # druid throws errors like "has_table() got an unexpected keyword argument 'info_cache'" if "has_table" in str(e): return False else: raise e def execute_ignore_missing(conn, stmt) -> sqla.CursorResult: """ Wrapper conn.execute that handles missing tables. Loading @@ -132,13 +149,14 @@ def execute_ignore_missing(conn, stmt) -> sqla.CursorResult: try: return conn.execute(stmt) except Exception as e: existing_tables = set(sqla.inspect(conn.engine).get_table_names()) try: stmt_tables = set([t.name for t in stmt.get_final_froms()]) missing_tables = stmt_tables - existing_tables missing_tables = [tbl for tbl in stmt_tables if not table_is_ready(conn.engine, tbl)] if missing_tables: logger.info(f"table(s) {', '.join(stmt_tables)} missing, returning empty result") logger.info(f"table(s) {', '.join(missing_tables)} missing, returning empty result") return conn.execute(sqla.text("SELECT 1 FROM (VALUES (1)) AS tbl(a) WHERE 1 != 1")) else: except: pass # Just raise the original error raise e Loading