DVC-Stage

  1. About The Project
  2. Getting Started
    1. Prerequisites
    2. Installation
  3. Usage
    1. Basic Stage Structure
    2. Examples
    3. Built-in Transformations
    4. Built-in Validations
    5. Using Custom Functions
  4. Contributing
  5. License
  6. Contact
  7. Acknowledgments

About The Project

This Python package provides an easy and parameterizable way of defining typical DVC (sub-)stages for:

  • data preprocessing
  • data transformation
  • data splitting
  • data validation

Getting Started

To get a local copy of DVC-Stage up and running, follow these simple steps.

Prerequisites

  • pandas>=0.20
  • dvc>=2.12
  • pyyaml>=5

Installation

This package is available on PyPI. You can install it and all of its dependencies using pip:

pip install dvc-stage

Usage

DVC-Stage works on top of two files: dvc.yaml and params.yaml, which are expected to be at the root of an initialized DVC project. From there you can execute dvc-stage -h to see the available commands, or dvc-stage get-config STAGE to generate the stage definition from the params.yaml file. The tool prints the respective YAML, which you can manually paste into the dvc.yaml file. Existing stages can then be updated in place using dvc-stage update-stage STAGE.
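
A typical workflow thus looks like this, where STAGE stands for the name of a stage defined in params.yaml:

# List the available commands
dvc-stage -h

# Print the generated stage definition, ready to paste into dvc.yaml
dvc-stage get-config STAGE

# Update an existing stage in dvc.yaml in place
dvc-stage update-stage STAGE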

Basic Stage Structure

Stages are defined inside params.yaml in the following schema:

STAGE_NAME:
  load: {}
  transformations: []
  validations: []
  write: {}

The load and write sections both require the YAML keys path and format to read and save data, respectively.
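
For example, reading a CSV file and writing the result back to CSV could look like this (the paths here are illustrative):

load:
  path: data/input.csv
  format: csv
write:
  path: outdir/output.csv
  format: csv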

The transformations and validations sections require a sequence of functions to apply, where transformations return data and validations return a truth value (derived from data). Functions are defined by the key id and can be either:

  • Methods defined on Pandas DataFrames, e.g.

    transformations:
      - id: transpose
  • Imported from any python module, e.g.

    transformations:
      - id: custom
        description: duplicate rows
        import_from: demo.duplicate
  • Predefined by DVC-Stage, e.g.

    validations:
      - id: validate_pandera_schema
        schema:
          import_from: demo.get_schema

When writing a custom function, make sure it gracefully handles data being None, which is required for type inference. The data is passed as the first argument. Further arguments can be provided as additional keys, as shown above for validate_pandera_schema, where schema is passed as the second argument to the function.
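
For illustration, the demo.duplicate transformation referenced above could be implemented like this (a minimal sketch, not necessarily the actual code shipped with the examples):

import pandas as pd


def duplicate(data):
    # data is None during type inference, so handle it gracefully
    if data is None:
        return None
    # duplicate all rows by concatenating the frame with itself
    return pd.concat([data, data], ignore_index=True)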

Examples

The examples directory contains a complete working demonstration:

  1. Setup: Navigate to the examples directory
  2. Data: Sample data files are provided in data
  3. Configuration: params.yaml contains all pipeline definitions
  4. Custom functions: src/demo.py contains example custom functions
  5. DVC configuration: dvc.yaml contains the generated DVC stages

To run all examples:

cd examples

# Update all stage definitions
dvc-stage update-all -y

# Reproduce pipeline
dvc repro
  1. Example 1: Basic Demo Pipeline

    The simplest example demonstrates basic data loading, transformation, validation, and writing:

    demo_pipeline:
      dvc_stage_args:
        log-level: ${log_level}
        log-file: ${log_file}
      load:
        path: load.csv
        format: csv
      transformations:
      - id: custom
        description: duplicate rows
        import_from: demo.duplicate
      - id: transpose
      - id: rename
        columns:
          0.0: O1
          1.0: O2
          2.0: D1
          3.0: D2
      validations:
      - id: custom
        description: check none
        import_from: demo.isNotNone
      - id: isnull
        reduction: any
        expected: false
      - id: validate_pandera_schema
        schema:
          import_from: demo.get_schema
      write:
        format: csv

    What this pipeline does:

    1. Load: Reads data from load.csv
    2. Transform:
      • Duplicates all rows using a custom function
      • Transposes the DataFrame
      • Renames columns from numeric to meaningful names
    3. Validate:
      • Checks that data is not None
      • Ensures no null values exist
      • Validates against a Pandera schema
    4. Write: Saves the result to outdir/out.csv

    Run with:

    cd examples
    dvc repro demo_pipeline
  2. Example 2: Foreach Pipeline

    Process multiple datasets with the same pipeline using foreach stages:

    foreach_pipeline:
      dvc_stage_args:
        log-level: ${log_level}
        log-file: ${log_file}
      foreach: [dataset_a, dataset_b, dataset_c]
      load:
        path: data/${item}/input.csv
        format: csv
      transformations:
      - id: fillna
        value: 0
      - id: custom
        description: normalize data
        import_from: demo.normalize_data
        columns: [value1, value2]
      validations:
      - id: validate_pandera_schema
        schema:
          import_from: demo.get_foreach_schema
      - id: custom
        description: check data quality
        import_from: demo.check_data_quality
        min_rows: 5
      write:
        path: outdir/${item}_${key}_processed.csv

    What this pipeline does:

    1. Foreach: Processes three datasets (dataset_a, dataset_b, dataset_c)
    2. Load: Reads from data/${item}/input.csv where ${item} is replaced with each dataset name
    3. Transform:
      • Fills missing values with 0
      • Normalizes specified columns using min-max scaling
    4. Validate:
      • Validates against a Pandera schema
      • Checks data quality (minimum row count)
    5. Write: Saves each processed dataset to outdir/${item}_${key}_processed.csv

    Run with:

    cd examples
    dvc repro foreach_pipeline
  3. Example 3: Advanced Multi-Input Pipeline

    Handle multiple input files with data splitting:

    advanced_pipeline:
      dvc_stage_args:
        log-level: ${log_level}
        log-file: ${log_file}
      load:
        path:
        - data/features.csv
        - data/labels.csv
        format: csv
        key_map:
          features: data/features.csv
          labels: data/labels.csv
      transformations:
      - id: split
        include: [features]
        by: id
        id_col: category
        left_split_key: train
        right_split_key: test
        size: 0.5
        seed: 42
      - id: combine
        include: [train, test]
        new_key: combined_data
      validations:
      - id: validate_pandera_schema
        schema:
          import_from: demo.get_advanced_schema
        include: [combined]
      write:
        path: outdir/${key}.csv

    What this pipeline does:

    1. Load: Reads multiple files and maps them to keys (features, labels)
    2. Transform:
      • The features table is split along the categories into two DataFrames, each containing 50% of the data
      • The split data is then combined back into a single table
    3. Validate: Validates the combined data against a schema
    4. Write: Saves each resulting DataFrame to outdir/${key}.csv

    Run with:

    cd examples
    dvc repro advanced_pipeline
  4. Example 4: Time Series Pipeline

    Process time series data with date-based splitting:

    timeseries_pipeline:
      dvc_stage_args:
        log-level: ${log_level}
        log-file: ${log_file}
      load:
        path: data/timeseries.csv
        format: csv
        parse_dates: [timestamp]
        index_col: timestamp
      transformations:
      - id: reset_index
      - id: add_date_offset_to_column
        column: timestamp
        days: 1
      - id: split
        by: date_time
        left_split_key: train
        right_split_key: test
        size: 0.8
        freq: D
        date_time_col: timestamp
      - id: set_index
        keys: timestamp
      validations:
      - id: validate_pandera_schema
        schema:
          import_from: demo.get_timeseries_schema
      - id: custom
        description: validate split ratio
        pass_dict_to_fn: true
        import_from: demo.validate_split_ratio
        reduction: none
        expected_ratio: 0.8
        tolerance: 0.05
      write:
        path: outdir/timeseries_${key}.csv

    What this pipeline does:

    1. Load: Reads time series data with proper datetime parsing
    2. Transform:
      • Resets the pandas index
      • Adds a date offset to the timestamps
      • Splits the data chronologically (80% train, 20% test) by date
      • Sets the timestamp as the index
    3. Validate:
      • Validates against a time-series-specific schema
      • Validates the split ratio (see the sketch after this list)
    4. Write: Saves timeseries_train.csv and timeseries_test.csv

    Run with:

    cd examples
    dvc repro timeseries_pipeline
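
The validate_split_ratio validation above is invoked with pass_dict_to_fn: true, which presumably hands the whole dict of DataFrames (here: the train and test splits) to the function instead of applying it per key. A hypothetical sketch of such a validator:

def validate_split_ratio(data, expected_ratio, tolerance):
    """Check that the train/test split matches the expected ratio."""
    # data is None during type inference, so handle it gracefully
    if data is None:
        return True
    n_train = len(data["train"])
    n_total = n_train + len(data["test"])
    # accept the split if the actual ratio lies within the tolerance
    return abs(n_train / n_total - expected_ratio) <= tolerance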

Built-in Transformations

DVC-Stage provides several built-in transformations:

  • split: Split data (random, datetime, or id-based)

  • combine: Combine multiple DataFrames

  • column_transformer_fit: Fit sklearn column transformers

  • column_transformer_transform: Apply fitted transformers

  • add_date_offset_to_column: Add time offsets to date columns

Additionally, all pandas DataFrame methods can be used, e.g.:

  • fillna: Fill missing values

  • dropna: Drop rows with missing values

  • transpose: Transpose the DataFrame

  • rename: Rename columns

Built-in Validations

DVC-Stage provides several built-in validations:

  • validate_pandera_schema: Validate against Pandera schemas
  • Custom validations: Import your own validation functions

Additionally, all pandas DataFrame methods can be used, e.g.:

  • isnull: Check for null values
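
As shown in Example 1 above, such element-wise checks can be reduced to a single truth value via the reduction key and compared against an expected result:

validations:
  - id: isnull
    reduction: any
    expected: false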

Using Custom Functions

When creating custom functions for transformations or validations:

  1. Handle None gracefully: Your function should return appropriate values when data is None
  2. First argument is data: The DataFrame or data structure is always the first parameter
  3. Additional parameters: Pass extra arguments as YAML keys in your stage definition
  4. Return appropriate types: Transformations return data, validations return boolean values

Example custom function:

from typing import List

import pandas as pd


def normalize_data(data: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """Normalize specified columns using min-max scaling."""
    if data is None:
        return None

    result = data.copy()
    for col in columns:
        if col in result.columns:
            min_val = result[col].min()
            max_val = result[col].max()
            if max_val > min_val:
                result[col] = (result[col] - min_val) / (max_val - min_val)
    return result
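
A custom validation follows the same pattern but returns a truth value instead of data. The check_data_quality function used in Example 2 might, for instance, look roughly like this (a hypothetical sketch):

def check_data_quality(data, min_rows):
    """Check that the data contains at least min_rows rows."""
    # data is None during type inference, so handle it gracefully
    if data is None:
        return True
    return len(data) >= min_rows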

Contributing

Any contributions are greatly appreciated! If you have a question, an issue, or would like to contribute, please read our contributing guidelines.

License

Distributed under the GNU General Public License v3.

Contact

Marcel Arpogaus - [email protected] (encrypted with ROT13)

Project Link: https://github.com/MArpogaus/dvc-stage

Acknowledgments

Parts of this work have been funded by the Federal Ministry for the Environment, Nature Conservation and Nuclear Safety due to a decision of the German Federal Parliament (AI4Grids: 67KI2012A).
