Last Updated: 2025-12-27
This document provides a high-level overview of the PyB3 data ingestion pipeline's architecture. Its purpose is to explain how data flows from the source (B3) to the destination (Supabase database) and how the different components of the repository work together.
The PyB3 project is designed as a focused and robust ETL (Extract, Transform, Load) pipeline. The architecture prioritizes:
- Modularity: Each part of the process (download, parse, upload) is handled by a distinct component.
- Automation: The entire daily pipeline is automated via GitHub Actions, requiring zero manual intervention.
- Scalability: The system is designed to handle both daily incremental updates and large-scale historical backfills.
- Consistency: All data ingestion scripts follow a similar, class-based structure for maintainability.
The pipeline follows a classic ETL pattern, orchestrated by scripts and automated by workflows.
```mermaid
graph TD
    subgraph "A. Extract"
        B3[("B3 FTP/HTTP Servers")] -->|pyb3.fetch_marketdata| Cache["fa:fa-folder-open Local Cache<br>(~/.pyb3_cache/)"]
    end
    subgraph "B. Transform"
        Cache -->|pyb3.*_get()| Parse["fa:fa-cogs Parse & Structure<br>(Polars DataFrame)"]
    end
    subgraph "C. Load"
        Parse -->|Pyb3Client| DB[("fa:fa-database Supabase<br>PostgreSQL")]
    end
    subgraph "D. Orchestration"
        GH_Actions[fa:fa-robot GitHub Actions] -->|Triggers| IngestionScripts["fa:fa-file-code Ingestion Scripts<br>(scripts/*.py)"]
        IngestionScripts --> B3
    end
    style B3 fill:#D5F5E3
    style DB fill:#D5F5E3
    style GH_Actions fill:#D6EAF8
```
- Extract: The `pyb3` core library connects to B3's public servers to download raw data files (e.g., ZIP, TXT). These files are stored in a local cache (`~/.pyb3_cache/`) to avoid re-downloading.
- Transform: The library then parses these raw files into structured, in-memory Polars DataFrames. This step involves cleaning data, casting types, and filtering for relevant information (e.g., equity-only trades).
- Load: The ingestion scripts take these DataFrames, convert them into records, and use the `Pyb3Client` to perform efficient, bulk `UPSERT` operations into the target tables in the Supabase PostgreSQL database.
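The Extract step's cache-then-download behavior can be sketched in plain Python. This is a minimal illustration of the pattern, not the actual `pyb3` implementation: the `download` callable and `cache_dir` parameter are stand-ins, while the real library defaults to `~/.pyb3_cache/` and its own transport layer.

```python
from pathlib import Path


def fetch_marketdata(filename: str, download, cache_dir: Path) -> Path:
    """Return a cached copy of `filename`, downloading only on a cache miss.

    `download` is a hypothetical stand-in for the real HTTP/FTP call.
    """
    cache_dir.mkdir(parents=True, exist_ok=True)
    cached = cache_dir / filename
    if not cached.exists():          # cache miss: hit B3's servers once
        cached.write_bytes(download(filename))
    return cached                    # subsequent calls skip the network
```

Because the cache key is the file name, re-running a pipeline for a date that was already fetched costs no network traffic.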
The repository is organized into four main components:
1. The `pyb3` core library
   - Purpose: Provides the fundamental tools for interacting with B3 data. It is completely agnostic of the database or how the data will be used.
   - Key Modules:
     - `data/`: Low-level functions for downloading and reading raw files.
     - `api/`: High-level, user-facing functions (`cotahist_get`, `futures_get`, etc.) that return Polars DataFrames.
     - `core/`: Contains the YAML templates that define each B3 data source.
     - `db/`: The database client (`Pyb3Client`) for communicating with PostgreSQL.
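The raw B3 files are fixed-width text, so the Transform step boils down to slicing lines into typed columns. The sketch below illustrates that kind of cleaning with invented field positions and names; the real layouts are defined by the YAML templates in the `core/` module, and the real output is a Polars DataFrame rather than a dict.

```python
from datetime import date


def parse_quote(line: str) -> dict:
    """Parse one fixed-width quote line into a typed record.

    Field positions here are illustrative only, not B3's actual layout.
    """
    return {
        "refdate": date.fromisoformat(line[0:10]),  # cast text to a real date
        "symbol": line[10:22].strip(),              # strip fixed-width padding
        "close": int(line[22:35]) / 100,            # prices stored as integer cents
    }
```

Casting every field at parse time means the Load step can rely on consistent types without re-validating each record.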
2. The ingestion scripts (`scripts/*.py`)
   - Purpose: These are the executable entry points for the ETL process. Each script is responsible for one specific data type.
   - Function: A script uses the `pyb3` library to fetch and parse data, then uses the `Pyb3Client` to load it into the database.
   - Examples: `ingest_cotahist_data.py`, `ingest_futures_data.py`.
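The similar, class-based structure the scripts share could be sketched as follows. All class and method names here are assumptions for illustration, not the repository's actual code; the real `load` step delegates to `Pyb3Client`.

```python
from abc import ABC, abstractmethod


class BaseIngestor(ABC):
    """Illustrative skeleton of the shared shape of scripts/ingest_*.py."""

    table: str  # destination table, e.g. "futures"

    @abstractmethod
    def fetch(self, refdate: str) -> list[dict]:
        """Download and parse one day's data (Extract + Transform)."""

    def load(self, records: list[dict]) -> None:
        """Bulk-upsert records; the real scripts delegate this to Pyb3Client."""
        raise NotImplementedError

    def run(self, refdate: str) -> int:
        """Fetch then load; returns the number of records processed."""
        records = self.fetch(refdate)
        self.load(records)
        return len(records)


class FuturesIngestor(BaseIngestor):
    table = "futures"

    def __init__(self):
        self.loaded: list[dict] = []

    def fetch(self, refdate):
        # Stand-in data; a real script would call pyb3's futures_get()
        return [{"refdate": refdate, "symbol": "DI1F26", "settlement": 98.7}]

    def load(self, records):
        self.loaded.extend(records)
```

Keeping `fetch` and `load` as separate methods lets each script override only the data-type-specific part while the orchestration logic stays identical.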
3. The GitHub Actions workflows
   - Purpose: To orchestrate the execution of the ingestion scripts automatically.
   - Key Workflows:
     - `data-ingestion-daily.yml`: Runs every night. It uses a matrix strategy to run a parallel job for each daily data type.
     - `data-ingestion-backfill-*.yml`: Manually triggered workflows that allow a user to populate historical data for specific date ranges or years.
     - `data-ingestion-test.yml`: A manual workflow for testing all ingestion scripts against a single day's data.
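A matrix-driven nightly workflow of the kind described above might look like the following sketch. The cron schedule, matrix values, and derived script paths are illustrative assumptions, not the repository's actual workflow file.

```yaml
# Illustrative sketch of a matrix-based daily ingestion workflow.
name: data-ingestion-daily
on:
  schedule:
    - cron: "0 3 * * *"            # assumed nightly trigger time
jobs:
  ingest:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        dataset: [cotahist, futures]   # one parallel job per data type
    steps:
      - uses: actions/checkout@v4
      - run: python scripts/ingest_${{ matrix.dataset }}_data.py
        env:
          DATABASE_URL: ${{ secrets.DATABASE_URL }}  # injected from GitHub Secrets
```

The matrix keeps the workflow definition to a single job block while still running each data type in its own isolated runner.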
4. The SQL schema files
   - Purpose: Contain the SQL `CREATE TABLE` statements that define the structure of the database.
   - Function: These files are the single source of truth for the database schema. They are used to set up the database initially.
   - Examples: `001_initial_schema.sql`, `003_add_futures_and_yieldcurves_tables.sql`.
The database is designed to store raw and structured data from B3.
- Technology: PostgreSQL (hosted on Supabase).
- Interaction: All database operations are performed via the `Pyb3Client` using the `asyncpg` driver.
- Key Tables:
  - `cotahist`: Daily stock and option quotes.
  - `futures`: Daily settlement prices for futures contracts.
  - `yield_curves`: Daily reference interest rate curves.
  - `index_historical`: Historical daily values of B3 indexes.
  - `index_composition`: Portfolio composition of B3 indexes.
  - `symbols`: Metadata about each financial instrument.
- Primary Keys: Each table uses a composite primary key, typically `(refdate, <id_columns>)`, to uniquely identify a record.
- Upsert Logic: All ingestion scripts use an `ON CONFLICT (...) DO UPDATE` clause. This makes the ingestion process idempotent: running the script for the same day twice will update existing records rather than creating duplicates.
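The idempotent upsert pattern can be demonstrated end to end with SQLite, whose `ON CONFLICT ... DO UPDATE` syntax matches PostgreSQL's. The simplified `futures` table below is a stand-in; the production database is PostgreSQL accessed via asyncpg.

```python
import sqlite3

# In-memory stand-in for the real futures table (simplified columns).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE futures (
        refdate    TEXT NOT NULL,
        symbol     TEXT NOT NULL,
        settlement REAL,
        PRIMARY KEY (refdate, symbol)   -- composite key: date + identifier
    )
""")


def upsert(refdate: str, symbol: str, settlement: float) -> None:
    """Insert or update one row; re-running for the same key never duplicates."""
    conn.execute(
        """
        INSERT INTO futures (refdate, symbol, settlement)
        VALUES (?, ?, ?)
        ON CONFLICT (refdate, symbol)
        DO UPDATE SET settlement = excluded.settlement
        """,
        (refdate, symbol, settlement),
    )


upsert("2024-01-15", "DI1F26", 98.70)
upsert("2024-01-15", "DI1F26", 98.75)   # second run updates in place
```

After both calls the table holds a single row with the latest settlement value, which is exactly why re-running an ingestion script for the same day is safe.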
A developer can run any ingestion script locally. This is the primary method for development and debugging.
1. Set the `DATABASE_URL` environment variable.
2. Execute the desired script from the command line:

```shell
python scripts/ingest_futures_data.py --date 2024-01-15
```
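A `--date` flag like the one shown above is typically wired up with `argparse`. This is a minimal sketch of that pattern under the assumption that the scripts accept ISO dates; it is not the scripts' actual option set.

```python
import argparse
from datetime import date


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command line of an ingestion script (illustrative sketch)."""
    parser = argparse.ArgumentParser(description="Ingest one day of B3 data")
    parser.add_argument(
        "--date",
        type=date.fromisoformat,   # accepts YYYY-MM-DD, as in the example above
        required=True,
        help="reference date to ingest",
    )
    return parser.parse_args(argv)
```

Using `date.fromisoformat` as the argument `type` rejects malformed dates at the command line, before any download or database work starts.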
In the automated environment, GitHub Actions takes over the role of the developer.
1. The `DATABASE_URL` is securely provided via GitHub Secrets.
2. A workflow (e.g., `data-ingestion-daily.yml`) is triggered by a schedule or a manual event.
3. The workflow runs one or more jobs, each of which executes an ingestion script with the appropriate command-line arguments.
This separation of concerns ensures that the core library and scripts are independent of the automation environment, making them highly portable and easy to test.