Conversation
Pull request overview
Adds a new “separate” execution mode that runs SQL directly against each datasource’s native SQLAlchemy engine (instead of routing through DuckDB), including Snowflake engine creation and schema introspection to populate the system prompt.
Changes:
- Introduce `SeparateExecutor` + `SeparateGraph` with `run_sql_query`/`submit_result` tools and a dedicated system prompt template.
- Add SQLAlchemy-based Snowflake schema inspection (`information_schema.tables`/`columns`) to generate `TableInfo`/`ColumnInfo` for the prompt schema.
- Extend database adapter plumbing with `try_create_sqlalchemy_engine()` and a Snowflake `create_sqlalchemy_engine()` implementation.
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| databao/agent/sqlalchemy/schema_inspection.py | New SQLAlchemy schema inspector (currently Snowflake-only) producing TableInfo/ColumnInfo. |
| databao/agent/sqlalchemy/__init__.py | Initializes the databao.agent.sqlalchemy package. |
| databao/agent/executors/separate/system_prompt.jinja | New system prompt template for the separate executor (includes Snowflake quoting guidance). |
| databao/agent/executors/separate/separate_executor.py | New executor that builds per-datasource SQLAlchemy engines and injects schema into the prompt. |
| databao/agent/executors/separate/graph.py | New LangGraph tool loop to run SQL via SQLAlchemy engines and submit results. |
| databao/agent/databases/snowflake_adapter.py | Adds Snowflake SQLAlchemy engine creation from connection config (password/keypair/SSO). |
| databao/agent/databases/databases.py | Adds try_create_sqlalchemy_engine() helper to delegate engine creation to adapters. |
| databao/agent/databases/database_adapter.py | Adds optional create_sqlalchemy_engine() hook on adapters (default None). |
```python
@classmethod
def create_sqlalchemy_engine(cls, config: DBConnectionConfig) -> Engine | None:
    if not isinstance(config, SnowflakeConnectionProperties):
        return None

    from snowflake.sqlalchemy import URL  # type: ignore[import-untyped]

    url_kwargs: dict[str, str] = {"account": config.account}
    if config.user:
        url_kwargs["user"] = config.user
    if config.database:
        url_kwargs["database"] = config.database
    if config.warehouse:
        url_kwargs["warehouse"] = config.warehouse
    if config.role:
        url_kwargs["role"] = config.role

    connect_args: dict[str, Any] = {}
    auth = config.auth
    if isinstance(auth, SnowflakePasswordAuth):
        url_kwargs["password"] = auth.password
    elif isinstance(auth, SnowflakeKeyPairAuth):
        connect_args["private_key"] = cls._load_private_key_bytes(auth)
    elif isinstance(auth, SnowflakeSSOAuth):
        url_kwargs["authenticator"] = auth.authenticator
    else:
        return None

    if connect_args:
        return create_engine(URL(**url_kwargs), connect_args=connect_args)
    return create_engine(URL(**url_kwargs))
```
New create_sqlalchemy_engine() behavior is not covered by tests. There is already comprehensive coverage for SnowflakeAdapter helpers in tests/test_snowflake_adapter.py; please add unit tests that validate URL/connect_args generation for password, key-pair, and SSO auth (and that additional_properties are preserved).
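A sketch of what such a unit test could look like. Note that the class and helper names here are illustrative stand-ins, not the real fixtures from tests/test_snowflake_adapter.py, which are not visible in this review:

```python
from dataclasses import dataclass


# Illustrative stand-in for the real auth class (name is an assumption).
@dataclass
class PasswordAuth:
    password: str


def build_url_kwargs(account: str, user: str, auth: PasswordAuth) -> dict[str, str]:
    # Mirrors the kwarg assembly performed by create_sqlalchemy_engine()
    # for the password-auth branch.
    kwargs = {"account": account, "user": user}
    kwargs["password"] = auth.password
    return kwargs


def test_password_auth_populates_url_kwargs() -> None:
    kwargs = build_url_kwargs("acct", "alice", PasswordAuth(password="s3cret"))
    assert kwargs == {"account": "acct", "user": "alice", "password": "s3cret"}
```

Analogous cases for key-pair and SSO auth would assert on `connect_args["private_key"]` and `url_kwargs["authenticator"]` respectively, typically by patching `snowflake.sqlalchemy.URL` and `create_engine` with mocks.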
@kosstbarz please address Copilot's comments somehow :) I personally won't look into the code until the AI is happy.
… additional properties

Force-pushed from ca1d38d to 863a5c9.
Pull request overview
Copilot reviewed 10 out of 13 changed files in this pull request and generated 4 comments.
```python
db_tables = inspect_sqlalchemy_schema(engine)
# Use the registered name as table_catalog so the LLM can derive
# the datasource argument directly from the schema prefix.
tables.extend(replace(t, table_catalog=name) for t in db_tables)
```
_inspect_database_schema() rewrites TableInfo.table_catalog to the datasource name, but leaves columns_catalog unchanged. Downstream helpers (e.g. summarize_duckdb_schema_overview) assume a 1:1 mapping between table_catalog and columns_catalog, so this can lead to duplicated/incorrect grouping. Consider rewriting both fields (or constructing a new TableInfo) so columns_catalog stays consistent with the synthetic datasource catalog name.
Suggested change:

```python
tables.extend(
    replace(t, table_catalog=name, columns_catalog=name) for t in db_tables
)
```
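The fix relies on `dataclasses.replace` rewriting both fields in a single call. A minimal standalone illustration, using a simplified stand-in for the real `TableInfo` class:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TableInfo:  # simplified stand-in for the real class
    table_name: str
    table_catalog: str
    columns_catalog: str


t = TableInfo(table_name="orders", table_catalog="SNOWFLAKE_DB", columns_catalog="SNOWFLAKE_DB")

# Rewriting only table_catalog leaves the two fields out of sync:
bad = replace(t, table_catalog="my_datasource")

# Rewriting both keeps the 1:1 mapping downstream helpers rely on:
good = replace(t, table_catalog="my_datasource", columns_catalog="my_datasource")
```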
```python
def exception_to_string(e: Exception | str) -> str:
    if isinstance(e, str):
        return e
    return f"Exception Name: {type(e).__name__}. Exception Desc: {e}"
```
This file reimplements exception_to_string() even though an identical helper exists in databao.agent.executors.utils. Reusing the shared helper avoids divergence and keeps error formatting consistent across executors.
Suggested change (drop the local definition and import the shared helper instead):

```python
from databao.agent.executors.utils import exception_to_string
```
```python
def trim_string_middle(
    content: str, max_length: int | None, sep: str = "[...trimmed...]", front_percentage: float = 0.7
) -> str:
    if max_length is None or len(content) <= max_length:
        return content
    take_front = max(0, math.ceil(max_length * front_percentage) - len(sep) // 2)
    take_end = max(0, max_length - take_front - len(sep))
    return content[:take_front] + sep + content[len(content) - take_end :]


def trim_dataframe_values(df: pd.DataFrame, max_cell_chars: int | None) -> pd.DataFrame:
    df_sanitized = df.copy()
    if max_cell_chars is None:
        return df_sanitized

    def trim_cell(val: Any) -> str:
        return trim_string_middle(str(val), max_cell_chars)

    for col, dtype in zip(df_sanitized.columns, df_sanitized.dtypes, strict=True):
        if not pd.api.types.is_object_dtype(dtype) and not pd.api.types.is_string_dtype(dtype):
            continue
        df_sanitized[col] = df_sanitized[col].apply(trim_cell)
    return df_sanitized
```
trim_string_middle() / trim_dataframe_values() duplicate logic already present in databao.agent.executors.utils. Consider importing and reusing the shared implementations to reduce maintenance overhead and keep truncation behavior consistent across executors.
```python
if auth.private_key:
    pem_data = auth.private_key.encode()
elif auth.private_key_file:
    pem_data = Path(auth.private_key_file).read_bytes()
```
_load_private_key_bytes() reads private_key_file directly and will propagate raw OSError/FileNotFoundError exceptions. For consistency with _create_secret_params() (which wraps file read errors in a ValueError with a clear message), consider catching OSError here and raising a ValueError that points to the invalid private_key_file path.
Suggested change:

```python
try:
    pem_data = Path(auth.private_key_file).read_bytes()
except OSError as exc:
    raise ValueError(f"Failed to read private key file at '{auth.private_key_file}'.") from exc
```
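The wrapping pattern the suggestion describes, as a standalone sketch. The helper name here is illustrative; the point is that callers see one exception type (`ValueError`) with an actionable message, regardless of whether the underlying failure was a missing file, a permission error, or any other `OSError`:

```python
from pathlib import Path


def read_private_key_bytes(private_key_file: str) -> bytes:
    # Wrap low-level file errors in a ValueError with a clear message,
    # matching the error style of _create_secret_params().
    try:
        return Path(private_key_file).read_bytes()
    except OSError as exc:
        raise ValueError(f"Failed to read private key file at '{private_key_file}'.") from exc
```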
…comma in Snowflake OAuth example.