This Python package provides an easy and parameterizable way of defining typical DVC (sub-)stages for:
- data preprocessing
- data transformation
- data splitting
- data validation
To get a local copy up and running, follow these simple steps.
```
pandas>=0.20.*
dvc>=2.12.*
pyyaml>=5
```
This package is available on PyPI. You can install it and all of its dependencies using pip:

```sh
pip install dvc-stage
```

DVC-Stage works on top of two files: `dvc.yaml` and `params.yaml`. They are expected to be at the root of an initialized DVC project. From there you can execute `dvc-stage -h` to see the available commands, or `dvc-stage get-config STAGE` to generate the DVC stages from the `params.yaml` file. The tool generates the respective YAML, which you can manually paste into the `dvc.yaml` file. Existing stages can then be updated in place using `dvc-stage update-stage STAGE`.
Stages are defined inside `params.yaml` in the following schema:

```yaml
STAGE_NAME:
  load: {}
  transformations: []
  validations: []
  write: {}
```

The `load` and `write` sections both require the YAML keys `path` and `format` to read and save data, respectively.
The `transformations` and `validations` sections require a sequence of functions to apply, where transformations return data and validations return a truth value (derived from the data). Functions are defined by the key `id` and can be either:
- Methods defined on pandas DataFrames, e.g.

  ```yaml
  transformations:
    - id: transpose
  ```

- Imported from any Python module, e.g.

  ```yaml
  transformations:
    - id: custom
      description: duplicate rows
      import_from: demo.duplicate
  ```

- Predefined by DVC-Stage, e.g.

  ```yaml
  validations:
    - id: validate_pandera_schema
      schema:
        import_from: demo.get_schema
  ```
When writing a custom function, you need to make sure it gracefully handles the data being `None`, which is required for type inference. The data is passed as the first argument. Further arguments can be provided as additional keys, as shown above for `validate_pandera_schema`, where `schema` is passed as the second argument to the function.
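For illustration, the `demo.duplicate` transformation referenced above could be sketched as follows. This is a hypothetical implementation; the actual code in `src/demo.py` may differ. Note the explicit `None` check required for type inference:

```python
import pandas as pd


def duplicate(data: pd.DataFrame) -> pd.DataFrame:
    """Duplicate all rows of the incoming DataFrame."""
    # Gracefully handle None, as required for type inference
    if data is None:
        return None
    return pd.concat([data, data], ignore_index=True)
```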
The `examples` directory contains a complete working demonstration:
- Setup: Navigate to the `examples` directory
- Data: Sample data files are provided in `data`
- Configuration: `params.yaml` contains all pipeline definitions
- Custom functions: `src/demo.py` contains example custom functions
- DVC configuration: `dvc.yaml` contains the generated DVC stages
To run all examples:

```sh
cd examples
# Update all stage definitions
dvc-stage update-all -y
# Reproduce pipeline
dvc repro
```
Example 1: Basic Demo Pipeline
The simplest example demonstrates basic data loading, transformation, validation, and writing:
```yaml
demo_pipeline:
  dvc_stage_args:
    log-level: ${log_level}
    log-file: ${log_file}
  load:
    path: load.csv
    format: csv
  transformations:
    - id: custom
      description: duplicate rows
      import_from: demo.duplicate
    - id: transpose
    - id: rename
      columns:
        0.0: O1
        1.0: O2
        2.0: D1
        3.0: D2
  validations:
    - id: custom
      description: check none
      import_from: demo.isNotNone
    - id: isnull
      reduction: any
      expected: false
    - id: validate_pandera_schema
      schema:
        import_from: demo.get_schema
  write:
    format: csv
```
What this pipeline does:
- Load: Reads data from `load.csv`
- Transform:
  - Duplicates all rows using a custom function
  - Transposes the DataFrame
  - Renames columns from numeric to meaningful names
- Validate:
  - Checks that the data is not None
  - Ensures no null values exist
  - Validates against a Pandera schema
- Write: Saves the result to `outdir/out.csv`
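For intuition, the transformation and validation steps above roughly correspond to the following plain pandas operations (a sketch using hypothetical in-memory data standing in for `load.csv`):

```python
import pandas as pd

# Hypothetical stand-in for the contents of load.csv
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# Transformations: duplicate rows, transpose, rename numeric columns
df = pd.concat([df, df], ignore_index=True)
df = df.transpose()
df = df.rename(columns={0.0: "O1", 1.0: "O2", 2.0: "D1", 3.0: "D2"})

# Validations: data is not None, no null values exist
assert df is not None
assert not df.isnull().any().any()
```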
Run with:

```sh
cd examples
dvc repro demo_pipeline
```
Example 2: Foreach Pipeline
Process multiple datasets with the same pipeline using foreach stages:
```yaml
foreach_pipeline:
  dvc_stage_args:
    log-level: ${log_level}
    log-file: ${log_file}
  foreach: [dataset_a, dataset_b, dataset_c]
  load:
    path: data/${item}/input.csv
    format: csv
  transformations:
    - id: fillna
      value: 0
    - id: custom
      description: normalize data
      import_from: demo.normalize_data
      columns: [value1, value2]
  validations:
    - id: validate_pandera_schema
      schema:
        import_from: demo.get_foreach_schema
    - id: custom
      description: check data quality
      import_from: demo.check_data_quality
      min_rows: 5
  write:
    path: outdir/${item}_${key}_processed.csv
```
What this pipeline does:
- Foreach: Processes three datasets (`dataset_a`, `dataset_b`, `dataset_c`)
- Load: Reads from `data/${item}/input.csv`, where `${item}` is replaced with each dataset name
- Transform:
  - Fills missing values with 0
  - Normalizes specified columns using min-max scaling
- Validate:
  - Validates against a Pandera schema
  - Checks data quality (minimum row count)
- Write: Saves each processed dataset to `outdir/${item}_${key}_processed.csv`
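The `demo.check_data_quality` validation referenced above could look roughly like this. This is a hypothetical sketch; the real function in `src/demo.py` may differ, in particular in how it treats `None` during type inference:

```python
import pandas as pd


def check_data_quality(data: pd.DataFrame, min_rows: int = 1) -> bool:
    """Validate that the data contains at least min_rows rows."""
    # Assumption: validations pass through when data is None (type inference)
    if data is None:
        return True
    return len(data) >= min_rows
```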
Run with:

```sh
cd examples
dvc repro foreach_pipeline
```
Example 3: Advanced Multi-Input Pipeline
Handle multiple input files with data splitting:
```yaml
advanced_pipeline:
  dvc_stage_args:
    log-level: ${log_level}
    log-file: ${log_file}
  load:
    path:
      - data/features.csv
      - data/labels.csv
    format: csv
    key_map:
      features: data/features.csv
      labels: data/labels.csv
  transformations:
    - id: split
      include: [features]
      by: id
      id_col: category
      left_split_key: train
      right_split_key: test
      size: 0.5
      seed: 42
    - id: combine
      include: [train, test]
      new_key: combined_data
  validations:
    - id: validate_pandera_schema
      schema:
        import_from: demo.get_advanced_schema
      include: [combined]
  write:
    path: outdir/${key}.csv
```
What this pipeline does:
- Load: Reads multiple files and maps them to keys (`features`, `labels`)
- Transform:
  - The features table is split along the `category` column into two DataFrames, each containing 50% of the data
  - The split data is then recombined into a single table
- Validate: Validates the combined data against a Pandera schema
- Write: Saves the resulting tables, including `train.csv` and `test.csv`, to the output directory
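Conceptually, the id-based `split` used above behaves like the following pandas sketch (hypothetical data; the built-in transformation's exact semantics may differ):

```python
import pandas as pd

# Hypothetical features table with an id column named "category"
features = pd.DataFrame({"category": ["a", "a", "b", "c", "d", "d"],
                         "value": [1, 2, 3, 4, 5, 6]})

# Sample 50% of the unique ids for the left (train) split, seeded for
# reproducibility, so rows sharing an id always land in the same split
ids = pd.Series(features["category"].unique())
left_ids = ids.sample(frac=0.5, random_state=42)

train = features[features["category"].isin(left_ids)]
test = features[~features["category"].isin(left_ids)]

# "combine" then concatenates the splits back into one table
combined = pd.concat([train, test])
```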
Run with:

```sh
cd examples
dvc repro advanced_pipeline
```
Example 4: Time Series Pipeline
Process time series data with date-based splitting:
```yaml
timeseries_pipeline:
  dvc_stage_args:
    log-level: ${log_level}
    log-file: ${log_file}
  load:
    path: data/timeseries.csv
    format: csv
    parse_dates: [timestamp]
    index_col: timestamp
  transformations:
    - id: reset_index
    - id: add_date_offset_to_column
      column: timestamp
      days: 1
    - id: split
      by: date_time
      left_split_key: train
      right_split_key: test
      size: 0.8
      freq: D
      date_time_col: timestamp
    - id: set_index
      keys: timestamp
  validations:
    - id: validate_pandera_schema
      schema:
        import_from: demo.get_timeseries_schema
    - id: custom
      description: validate split ratio
      pass_dict_to_fn: true
      import_from: demo.validate_split_ratio
      reduction: none
      expected_ratio: 0.8
      tolerance: 0.05
  write:
    path: outdir/timeseries_${key}.csv
```
What this pipeline does:
- Load: Reads time series data with proper datetime parsing
- Transform:
  - Resets the pandas index
  - Adds a date offset to the timestamps
  - Splits the data chronologically (80% train, 20% test) by date
  - Sets the timestamp as index
- Validate:
  - Validates against a time-series-specific schema
  - Validates the split ratio
- Write: Saves `timeseries_train.csv` and `timeseries_test.csv`
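A chronological 80/20 split like the one above can be sketched in plain pandas (hypothetical data; the built-in `split` transformation handles this for you, including the `freq` grouping):

```python
import pandas as pd

# Hypothetical time series: one value per day for 10 days
ts = pd.DataFrame({
    "timestamp": pd.date_range("2021-01-01", periods=10, freq="D"),
    "value": range(10),
})

# Split chronologically: the first 80% of days go to train, the rest to test
days = ts["timestamp"].dt.floor("D").drop_duplicates().sort_values()
cutoff = days.iloc[int(len(days) * 0.8)]
train = ts[ts["timestamp"] < cutoff]
test = ts[ts["timestamp"] >= cutoff]
```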
Run with:

```sh
cd examples
dvc repro timeseries_pipeline
```
DVC-Stage provides several built-in transformations:
- `split`: Split data (random, datetime, or id-based)
- `combine`: Combine multiple DataFrames
- `column_transformer_fit`: Fit sklearn column transformers
- `column_transformer_transform`: Apply fitted transformers
- `add_date_offset_to_column`: Add time offsets to date columns
Additionally, all pandas DataFrame methods can be used, e.g.:
- `fillna`: Fill missing values
- `dropna`: Drop rows with missing values
- `transpose`: Transpose the DataFrame
- `rename`: Rename columns
DVC-Stage provides several built-in validations:
- `validate_pandera_schema`: Validate against Pandera schemas
- Custom validations: Import your own validation functions

Additionally, all pandas DataFrame methods can be used, e.g.:
- `isnull`: Check for null values
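To illustrate how a DataFrame method becomes a validation, the `isnull` check with `reduction: any` and `expected: false` from the demo pipeline corresponds roughly to the following (hypothetical data):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# isnull yields a boolean DataFrame; the "any" reduction collapses it
# to a single truth value, which is compared against `expected`
result = bool(df.isnull().any().any())
assert result is False  # no null values exist
```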
When creating custom functions for transformations or validations:
- Handle None gracefully: Your function should return appropriate values when data is None
- First argument is data: The DataFrame or data structure is always the first parameter
- Additional parameters: Pass extra arguments as YAML keys in your stage definition
- Return appropriate types: Transformations return data, validations return boolean values
Example custom function:

```python
from typing import List

import pandas as pd


def normalize_data(data: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """Normalize specified columns using min-max scaling."""
    # Gracefully handle None, as required for type inference
    if data is None:
        return None
    result = data.copy()
    for col in columns:
        if col in result.columns:
            min_val = result[col].min()
            max_val = result[col].max()
            if max_val > min_val:
                result[col] = (result[col] - min_val) / (max_val - min_val)
    return result
```

Any contributions are greatly appreciated! If you have a question, an issue, or would like to contribute, please read our contributing guidelines.
Distributed under the GNU General Public License v3
Marcel Arpogaus - [email protected] (encrypted with ROT13)
Project Link: https://github.com/MArpogaus/dvc-stage
Parts of this work have been funded by the Federal Ministry for the Environment, Nature Conservation and Nuclear Safety due to a decision of the German Federal Parliament (AI4Grids: 67KI2012A).