Welcome to Module 2 of the Data Engineering Zoomcamp! This week, we’ll dive into workflow orchestration using Kestra.
Kestra is an open-source, event-driven orchestration platform that simplifies building both scheduled and event-driven workflows. By adopting Infrastructure as Code practices for data and process orchestration, Kestra enables you to build reliable workflows with just a few lines of YAML.
Note
You can find all videos for this week in this YouTube Playlist.
- 2.1 - Introduction to Workflow Orchestration
- 2.2 - Getting Started With Kestra
- 2.3 - Hands-On Coding Project: Build ETL Data Pipelines with Kestra
- 2.4 - ELT Pipelines in Kestra: Google Cloud Platform
- 2.5 - Using AI for Data Engineering in Kestra
- 2.6 - Bonus
In this section, you’ll learn the foundations of workflow orchestration, its importance, and how Kestra fits into the orchestration landscape.
Think of a music orchestra. It has a variety of instruments, some in greater numbers than others, each with a different role in the music. To make sure they all come together at the right time, they follow a conductor who helps the orchestra play together.
Now replace the instruments with tools and the conductor with an orchestrator. We often have multiple tools and platforms that need to work together, sometimes on a routine schedule and other times in response to events. That's where the orchestrator comes in to help all of these tools work together.
A workflow orchestrator might do the following tasks:
- Run workflows which contain a number of predefined steps
- Monitor and log errors, and take extra steps when they occur
- Automatically run workflows based on schedules and events
In data engineering, you often need to move data from one place to another, sometimes with some modifications made to the data in the middle. This is where a workflow orchestrator can help by managing these steps while giving us visibility into them.
In this module, we're going to build our own data pipeline using ETL (Extract, Transform, Load) with Kestra at the core of the operation, but first we need to understand a bit more about how Kestra works before we can get building!
Kestra is an open-source, infinitely-scalable orchestration platform that enables all engineers to manage business-critical workflows.
Kestra is a great choice for workflow orchestration:
- Build with Flow code (YAML), No-code or with the AI Copilot - flexibility in how you build your workflows
- 1000+ Plugins - integrate with all the tools you use
- Support for any programming language - pick the right tool for the job
- Schedule or Event Based Triggers - have your workflows respond to data
In this section, you'll learn how to install Kestra, as well as the key concepts required to build your first workflow. Once our first workflow is built, we can extend this further by executing a Python script inside of a workflow.
You will:
- Install Kestra using Docker Compose
- Learn the concepts of Kestra to build your first workflow
- Execute a Python script inside of a Kestra Flow
To install Kestra, we are going to use Docker Compose. We already have a Postgres database set up, along with pgAdmin from Module 1. We can continue to use these with Kestra but we'll need to make a few modifications to our Docker Compose file.
Use this example Docker Compose file to add the 2 new services and set up the volumes correctly.
Add information about setting a username and password.
We'll set up Kestra using Docker Compose containing one container for the Kestra server and another for the Postgres database:
cd 02-workflow-orchestration
docker compose up -d
Note: Check that pgAdmin isn't running on the same ports as Kestra. If so, check out the FAQ at the bottom of the README.
Once the container starts, you can access the Kestra UI at http://localhost:8080.
To shut down Kestra, go to the same directory and run the following command:
docker compose down
Flows can be added to Kestra by copying and pasting the YAML directly into the editor, or by adding via Kestra's API. See below for adding programmatically.
Add Flows to Kestra programmatically
If you prefer to add flows programmatically using Kestra's API, run the following commands:
# Import all flows: assuming username [email protected] and password Admin1234! (adjust to match your username and password)
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/01_hello_world.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_python.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/03_getting_started_data_pipeline.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/04_postgres_taxi.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/05_postgres_taxi_scheduled.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_kv.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/07_gcp_setup.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/08_gcp_taxi.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/09_gcp_taxi_scheduled.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/10_chat_without_rag.yaml
curl -X POST -u '[email protected]:Admin1234!' http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/11_chat_with_rag.yaml
To start building workflows in Kestra, we need to understand a number of concepts.
- Flow - a container for tasks and their orchestration logic.
- Tasks - the steps within a flow.
- Inputs - dynamic values passed to the flow at runtime.
- Outputs - pass data between tasks and flows.
- Triggers - mechanism that automatically starts the execution of a flow.
- Execution - a single run of a flow with a specific state.
- Variables - key–value pairs that let you reuse values across tasks.
- Plugin Defaults - default values applied to every task of a given type within one or more flows.
- Concurrency - control how many executions of a flow can run at the same time.
While there are more concepts used for building powerful workflows, these are the ones we're going to use to build our data pipelines.
The flow 01_hello_world.yaml showcases all of these concepts inside of one workflow:
- The flow has 5 tasks: 3 log tasks, a return task and a sleep task
- The flow takes an input called name.
- There is a variable that takes the name input to generate a full welcome message.
- An output is generated from the return task and is logged in a later log task.
- There is a trigger to execute this flow every day at 10am.
- Plugin Defaults are used to make the log tasks send their messages as ERROR level.
- We have a concurrency limit of 2 executions. Any further ones made while 2 are running will fail.
Now that we've built our first workflow, we can take it a step further by adding Python code into our flow. In Kestra, we can run Python code from a dedicated file or write it directly inside of our workflow.
While Kestra has a huge variety of plugins available for building your workflows, you also have the option to write your own code and have Kestra execute it based on schedules or events. This means you can pick the right tools for your pipelines, rather than being limited to a fixed set.
In our example Python workflow, 02_python.yaml, our code fetches the number of Docker image pulls from DockerHub and returns it as an output to Kestra. This is useful as we can access this output with other tasks, even though it was generated inside of our Python script.
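As a rough illustration of what such a script does, here is a minimal standalone sketch. It assumes Docker Hub's public repository endpoint returns JSON with a pull_count field, and uses kestra/kestra as an example image; the function names are hypothetical and this is not the code from 02_python.yaml. Inside Kestra, the value would be returned to the flow as an output rather than printed.

```python
import json
from urllib.request import urlopen

# Assumed endpoint: Docker Hub's public repository API (not taken from the flow file).
DOCKER_HUB_URL = "https://hub.docker.com/v2/repositories/{image}/"


def parse_pull_count(payload: dict) -> int:
    """Extract the pull counter from a repository metadata payload."""
    # "pull_count" is the field name assumed here; fail loudly if it is missing.
    if "pull_count" not in payload:
        raise KeyError("response has no 'pull_count' field")
    return int(payload["pull_count"])


def fetch_pull_count(image: str = "kestra/kestra", timeout: float = 10.0) -> int:
    """Download the repository metadata and return the number of pulls."""
    with urlopen(DOCKER_HUB_URL.format(image=image), timeout=timeout) as response:
        return parse_pull_count(json.load(response))


if __name__ == "__main__":
    # Requires network access; prints the current pull count for the example image.
    print(fetch_pull_count())
```

Keeping the parsing separate from the HTTP call makes the logic easy to check without a network connection.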
Next, we're gonna build ETL pipelines for Yellow and Green Taxi data from NYC’s Taxi and Limousine Commission (TLC). You will:
- Extract data from CSV files.
- Load it into Postgres or Google Cloud (GCS + BigQuery).
- Explore scheduling and backfilling workflows.
This introductory flow is added just to demonstrate a simple data pipeline which extracts data via HTTP REST API, transforms that data in Python and then queries it using DuckDB. For this stage, a new separate Postgres database is created for the exercises.
graph LR
Extract[Extract Data via HTTP REST API] --> Transform[Transform Data in Python]
Transform --> Query[Query Data with DuckDB]
Add the flow 03_getting_started_data_pipeline.yaml from the UI if you haven't already and execute it to see the results. Inspect the Gantt and Logs tabs to understand the flow execution.
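To make the three stages concrete, here is a small sketch of the same extract, transform, query shape in plain Python. It is not the code from the flow: the records are made-up stand-ins for an API response, the field names are hypothetical, and SQLite (from the standard library) stands in for DuckDB so the example runs without extra dependencies.

```python
import sqlite3

# Made-up records standing in for the JSON returned by an HTTP REST API.
RAW_PRODUCTS = [
    {"id": 1, "brand": "Acme", "price": 10.0, "description": "unused field"},
    {"id": 2, "brand": "Acme", "price": 20.0, "description": "unused field"},
    {"id": 3, "brand": "Globex", "price": 5.0, "description": "unused field"},
]


def transform(records: list[dict]) -> list[tuple]:
    """Keep only the fields the query step needs."""
    return [(r["id"], r["brand"], r["price"]) for r in records]


def average_price_per_brand(rows: list[tuple]) -> list[tuple]:
    """Load the rows into an in-memory database and aggregate them with SQL."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE products (id INTEGER, brand TEXT, price REAL)")
    con.executemany("INSERT INTO products VALUES (?, ?, ?)", rows)
    result = con.execute(
        "SELECT brand, AVG(price) FROM products GROUP BY brand ORDER BY brand"
    ).fetchall()
    con.close()
    return result


if __name__ == "__main__":
    for brand, avg_price in average_price_per_brand(transform(RAW_PRODUCTS)):
        print(brand, avg_price)
```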
Before we start loading data to GCP, we'll first play with the Yellow and Green Taxi data using a local Postgres database running in a Docker container. We will use the same database from Module 1 which should be in the same Docker Compose file as Kestra.
The flow will extract CSV data partitioned by year and month, create tables, load data to the monthly table, and finally merge the data to the final destination table.
graph LR
Start[Select Year & Month] --> SetLabel[Set Labels]
SetLabel --> Extract[Extract CSV Data]
Extract -->|Taxi=Yellow| YellowFinalTable[Create Yellow Final Table]:::yellow
Extract -->|Taxi=Green| GreenFinalTable[Create Green Final Table]:::green
YellowFinalTable --> YellowMonthlyTable[Create Yellow Monthly Table]:::yellow
GreenFinalTable --> GreenMonthlyTable[Create Green Monthly Table]:::green
YellowMonthlyTable --> YellowCopyIn[Load Data to Monthly Table]:::yellow
GreenMonthlyTable --> GreenCopyIn[Load Data to Monthly Table]:::green
YellowCopyIn --> YellowMerge[Merge Yellow Data]:::yellow
GreenCopyIn --> GreenMerge[Merge Green Data]:::green
classDef yellow fill:#FFD700,stroke:#000,stroke-width:1px,color:#000;
classDef green fill:#32CD32,stroke:#000,stroke-width:1px,color:#000;
The flow code: 04_postgres_taxi.yaml.
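The value of loading into a monthly table and then merging is that a rerun does not duplicate rows. The sketch below shows that pattern under stated assumptions: SQLite stands in for Postgres, the column names are hypothetical, a hash of the row's fields is assumed to be an acceptable unique key, and INSERT OR IGNORE plays the role of the merge step.

```python
import hashlib
import sqlite3


def row_id(row: tuple) -> str:
    """Derive a deterministic key from the row's fields (an assumed strategy)."""
    return hashlib.md5("|".join(map(str, row)).encode()).hexdigest()


def load_month(con: sqlite3.Connection, rows: list[tuple]) -> int:
    """Stage one month of trips, merge it into the final table, return its size."""
    # Final destination table: created once, keyed on the derived row id.
    con.execute(
        "CREATE TABLE IF NOT EXISTS trips ("
        "unique_row_id TEXT PRIMARY KEY, pickup TEXT, dropoff TEXT, fare REAL)"
    )
    # Monthly staging table: rebuilt from scratch on every run.
    con.execute("DROP TABLE IF EXISTS trips_staging")
    con.execute(
        "CREATE TABLE trips_staging ("
        "unique_row_id TEXT, pickup TEXT, dropoff TEXT, fare REAL)"
    )
    con.executemany(
        "INSERT INTO trips_staging VALUES (?, ?, ?, ?)",
        [(row_id(r), *r) for r in rows],
    )
    # Merge: rows whose key already exists in the final table are skipped.
    con.execute("INSERT OR IGNORE INTO trips SELECT * FROM trips_staging")
    con.commit()
    return con.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
```

Calling load_month twice with the same rows leaves the final table unchanged, which is what makes reruns and backfills safe.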
Note
The NYC Taxi and Limousine Commission (TLC) Trip Record Data provided on the nyc.gov website is currently available only in a Parquet format, but this is NOT the dataset we're going to use in this course. For the purpose of this course, we'll use the CSV files available here on GitHub. This is because the Parquet format can be challenging to understand by newcomers, and we want to make the course as accessible as possible — the CSV format can be easily introspected using tools like Excel or Google Sheets, or even a simple text editor.
We can now schedule the same pipeline shown above to run daily at 9 AM UTC. We'll also demonstrate how to backfill the data pipeline to run on historical data.
Note: given the large dataset, we'll backfill only data for the green taxi dataset for the year 2019.
The flow code: 05_postgres_taxi_scheduled.yaml.
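A backfill amounts to running the flow once per missed schedule interval. As a sketch of what that means for monthly data, the helper below lists the files a 2019 green taxi backfill would touch. The function name is hypothetical, and the file names are assumed to follow the yellow_tripdata_2020-01.csv pattern.

```python
from datetime import date


def monthly_partitions(taxi: str, start: date, end: date) -> list[str]:
    """List one CSV file name per month from start to end, inclusive."""
    names = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        names.append(f"{taxi}_tripdata_{year}-{month:02d}.csv")
        # Advance to the next month, rolling over into the next year.
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return names


if __name__ == "__main__":
    for name in monthly_partitions("green", date(2019, 1, 1), date(2019, 12, 31)):
        print(name)
```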
Now that you've learned how to build ETL pipelines locally using Postgres, we are ready to move to the cloud. In this section, we'll load the same Yellow and Green Taxi data to Google Cloud Platform (GCP) using:
- Google Cloud Storage (GCS) as a data lake
- BigQuery as a data warehouse.
In 2.3, we made an ETL pipeline inside of Kestra:
- Extract: Firstly, we extract the dataset from GitHub
- Transform: Next, we transform it with Python
- Load: Finally, we load it into our Postgres database
While this is very standard across the industry, sometimes it makes sense to change the order when working with the cloud. If you're working with a large dataset, like the Yellow Taxi data, there can be benefits to extracting and loading straight into a data warehouse, and then performing transformations directly in the data warehouse. When working with BigQuery, we will use ELT:
- Extract: Firstly, we extract the dataset from GitHub
- Load: Next, we load this dataset (in this case, a CSV file) into a data lake (Google Cloud Storage)
- Transform: Finally, we can create a table inside of our data warehouse (BigQuery) which uses the data from our data lake to perform our transformations.
Loading into the data warehouse before transforming means we can use the cloud's performance benefits for transforming large datasets. What might take much longer on a local machine can take a fraction of the time in the cloud.
Over the next few videos, we'll look at setting up BigQuery and transforming the Yellow Taxi dataset.
Before we start loading data to GCP, we need to set up the Google Cloud Platform.
First, adjust the following flow 06_gcp_kv.yaml to include your service account, GCP project ID, BigQuery dataset and GCS bucket name (along with their location) as KV Store values:
- GCP_PROJECT_ID
- GCP_LOCATION
- GCP_BUCKET_NAME
- GCP_DATASET.
If you haven't already created the GCS bucket and BigQuery dataset in the first week of the course, you can use this flow to create them: 07_gcp_setup.yaml.
Warning
The GCP_CREDS service account contains sensitive information. Ensure you keep it secure and do not commit it to Git. Keep it as secure as your passwords.
Now that Google Cloud is set up with a storage bucket, we can start the ELT process.
graph LR
SetLabel[Set Labels] --> Extract[Extract CSV Data]
Extract --> UploadToGCS[Upload Data to GCS]
UploadToGCS -->|Taxi=Yellow| BQYellowTripdata[Main Yellow Tripdata Table]:::yellow
UploadToGCS -->|Taxi=Green| BQGreenTripdata[Main Green Tripdata Table]:::green
BQYellowTripdata --> BQYellowTableExt[External Table]:::yellow
BQGreenTripdata --> BQGreenTableExt[External Table]:::green
BQYellowTableExt --> BQYellowTableTmp[Monthly Table]:::yellow
BQGreenTableExt --> BQGreenTableTmp[Monthly Table]:::green
BQYellowTableTmp --> BQYellowMerge[Merge to Main Table]:::yellow
BQGreenTableTmp --> BQGreenMerge[Merge to Main Table]:::green
BQYellowMerge --> PurgeFiles[Purge Files]
BQGreenMerge --> PurgeFiles[Purge Files]
classDef yellow fill:#FFD700,stroke:#000,stroke-width:1px,color:#000
classDef green fill:#32CD32,stroke:#000,stroke-width:1px,color:#000
The flow code: 08_gcp_taxi.yaml.
We can now schedule the same pipeline shown above to run daily at 9 AM UTC for the green dataset and at 10 AM UTC for the yellow dataset. You can backfill historical data directly from the Kestra UI.
Since we now process data in a cloud environment with highly scalable storage and compute, we can backfill the entire dataset for both the yellow and green taxi data without the risk of running out of resources on our local machine.
The flow code: 09_gcp_taxi_scheduled.yaml.
This section builds on what you learned earlier in Module 2 to show you how AI can speed up workflow development.
By the end of this section, you will:
- Understand why context engineering matters when collaborating with LLMs
- Use AI Copilot to build Kestra flows faster
- Use Retrieval Augmented Generation (RAG) in data pipelines
- Completion of earlier sections in Module 2 (Workflow Orchestration with Kestra)
- Kestra running locally
- Google Cloud account with access to Gemini API (there's a generous free tier!)
As data engineers, we spend significant time writing boilerplate code, searching documentation, and structuring data pipelines. AI tools can help us:
- Generate workflows faster: Describe what you want to accomplish in natural language instead of writing YAML from scratch
- Avoid errors: Get syntax-correct, up-to-date workflow code that follows best practices
However, AI is only as good as the context we provide. This section teaches you how to engineer that context for reliable, production-ready data workflows.
Let's start by seeing what happens when AI lacks proper context.
- Open ChatGPT in a private browser window (to avoid any existing chat context): https://chatgpt.com
- Enter this prompt:
Create a Kestra flow that loads NYC taxi data from a CSV file to BigQuery. The flow should extract data, upload to GCS, and load to BigQuery.
- Observe the results. ChatGPT will generate a Kestra flow, but it likely contains:
  - Outdated plugin syntax, e.g., old task types that have been renamed
  - Incorrect property names, e.g., properties that don't exist in current versions
  - Hallucinated features, e.g., tasks, triggers or properties that never existed
Large Language Models (LLMs) like GPT models from OpenAI are trained on data up to a specific point in time (knowledge cutoff). They don't automatically know about:
- Software updates and new releases
- Renamed plugins or changed APIs
This is the fundamental challenge of using AI: the model can only work with information it has access to.
Without proper context:
- ❌ Generic AI assistants hallucinate outdated or incorrect code
- ❌ You can't trust the output for production use
With proper context:
- ✅ AI generates accurate, current, production-ready code
- ✅ You can iterate faster by letting AI generate boilerplate workflow code
In the next section, we'll see how Kestra's AI Copilot solves this problem.
Kestra's AI Copilot is specifically designed to generate and modify Kestra flows with full context about the latest plugins, workflow syntax, and best practices.
Before using AI Copilot, you need to configure Gemini API access in your Kestra instance.
Step 1: Get Your Gemini API Key
- Visit Google AI Studio: https://aistudio.google.com/app/apikey
- Sign in with your Google account
- Click "Create API Key"
- Copy the generated key (keep it secure!)
Warning
Never commit API keys to Git. Always use environment variables or Kestra's KV Store.
Step 2: Configure Kestra AI Copilot
Add the following to your Kestra configuration. You can do this by modifying your docker-compose.yml file from 2.2:
services:
  kestra:
    environment:
      KESTRA_CONFIGURATION: |
        kestra:
          ai:
            type: gemini
            gemini:
              model-name: gemini-2.5-flash
              api-key: ${GEMINI_API_KEY}
Then restart Kestra:
cd 02-workflow-orchestration/docker
export GEMINI_API_KEY="your-api-key-here"
docker compose up -d
Objective: Learn why context engineering matters.
- Open Kestra UI at http://localhost:8080
- Create a new flow and open the Code editor panel
- Click the AI Copilot button (sparkle icon ✨) in the top-right corner
- Enter the same exact prompt we used with ChatGPT:
Create a Kestra flow that loads NYC taxi data from a CSV file to BigQuery. The flow should extract data, upload to GCS, and load to BigQuery.
- Compare the outputs:
- ✅ Copilot generates executable, working YAML
- ✅ Copilot uses correct plugin types and properties
- ✅ Copilot follows current Kestra best practices
Key Learning: Context matters! AI Copilot has access to current Kestra documentation, so it generates more reliable Kestra flows than a generic assistant such as ChatGPT.
To further learn how to provide context to your prompts, this bonus section demonstrates how to use RAG.
RAG (Retrieval Augmented Generation) is a technique that:
- Retrieves relevant information from your data sources
- Augments the AI prompt with this context
- Generates a response grounded in real data
This reduces hallucinations by giving the AI access to current, accurate information at query time.
graph LR
A[Ask AI] --> B[Fetch Docs]
B --> C[Create Embeddings]
C --> D[Find Similar Content]
D --> E[Add Context to Prompt]
E --> F[LLM Answer]
The Process:
- Ingest documents: Load documentation, release notes, or other data sources
- Create embeddings: Convert text into vector representations using an LLM
- Store embeddings: Save vectors in Kestra's KV Store (or a vector database)
- Query with context: When you ask a question, retrieve relevant embeddings and include them in the prompt
- Generate response: The LLM has real context and provides accurate answers
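The steps above can be sketched in a few lines of Python. This is a toy version under loud assumptions: word counts stand in for real embeddings (a real pipeline would call an embedding model), a plain list stands in for the KV Store or vector database, the documents are made-up, and the final call to the LLM is left out. All function names are hypothetical.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts instead of a learned vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


def retrieve(question: str, store: list[tuple[str, Counter]], top_k: int = 1) -> list[str]:
    """Return the top_k stored texts most similar to the question."""
    query = embed(question)
    ranked = sorted(store, key=lambda item: cosine(query, item[1]), reverse=True)
    return [text for text, _ in ranked[:top_k]]


def build_prompt(question: str, documents: list[str], top_k: int = 1) -> str:
    """Ingest documents, retrieve context, and augment the prompt with it."""
    store = [(doc, embed(doc)) for doc in documents]  # ingest + embed + store
    context = "\n".join(retrieve(question, store, top_k))
    return f"Answer using only this context:\n{context}\n\nQuestion: {question}"


# Made-up snippets used purely for illustration.
DOCS = [
    "Backfills replay a schedule over past dates.",
    "The KV Store keeps key-value pairs for flows.",
    "Concurrency limits cap parallel executions.",
]
```

Calling build_prompt("What does the KV Store keep?", DOCS) yields a prompt whose context is the KV Store snippet, which is the text an LLM would then answer from.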
Objective: Understand how RAG reduces hallucinations by grounding LLM responses in real data.
Part A: Without RAG
- Navigate to the 10_chat_without_rag.yaml flow in your Kestra UI
- Click Execute
- Wait for the execution to complete
- Open the Logs tab
- Read the output - notice how the response about "Kestra 1.1 features" is:
- Vague or generic
- Potentially incorrect
- Missing specific details
- Based only on the model's training data (which may be outdated)
Part B: With RAG
- Navigate to the 11_chat_with_rag.yaml flow
- Click Execute
- Watch the execution:
- First task: Ingests Kestra 1.1 release documentation, creates embeddings and stores them
- Second task: Prompts LLM with context retrieved from stored embeddings
- Open the Logs tab
- Compare this output with the previous one - notice how it's:
- ✅ Specific and detailed
- ✅ Accurate with real features from the release
- ✅ Grounded in actual documentation
Key Learning: RAG (Retrieval Augmented Generation) grounds AI responses in current documentation, which reduces hallucinations and gives more accurate, context-aware answers.
- Keep documents updated: Regularly re-ingest to ensure current information
- Chunk appropriately: Break large documents into meaningful chunks
- Test retrieval quality: Verify that the right documents are retrieved
Kestra Documentation:
- AI Tools Overview
- AI Copilot
- RAG Workflows
- AI Workflows
- Kestra Blueprints - Pre-built workflow examples
Now that we've got all our pipelines working and we know how to quickly create new flows with Kestra's AI Copilot, we can deploy Kestra to the cloud so it can continue to orchestrate our scheduled pipelines.
In this bonus section, we'll cover how you can deploy Kestra on Google Cloud and automatically sync your workflows from a Git repository.
Note: When committing your workflows to Kestra, make sure your workflow doesn't contain any sensitive information. You can use Secrets and the KV Store to keep sensitive data out of your workflow logic.
- Install Kestra on Google Cloud
- Moving from Development to Production
- Using Git in Kestra
- Deploy Flows with GitHub Actions
- Check Kestra Docs
- Explore our Blueprints library
- Browse over 600 plugins available in Kestra
- Give us a star on GitHub
- Join our Slack community if you have any questions
- Find all the videos in this YouTube Playlist
If you face any issues with Kestra flows in Module 2, make sure to use the following Docker images/ports:
- image: kestra/kestra:v1.1 - pin your Kestra Docker image to this version so we can ensure reproducibility; do NOT use kestra/kestra:develop as this is a bleeding-edge development version that might contain bugs
- postgres:18 - make sure to pin your Postgres image to version 18
- If you run pgAdmin or something else on port 8080, you can adjust the Kestra docker-compose file to use a different port, e.g. change the port mapping to 18080 instead of 8080, and then access the Kestra UI in your browser at http://localhost:18080/ instead of http://localhost:8080/
If you are still facing any issues, stop and remove your existing Kestra + Postgres containers and start them again using docker-compose up -d. If this doesn't help, post your question on the DataTalksClub Slack or on Kestra's Slack http://kestra.io/slack.
If you encounter similar errors to:
BigQueryError{reason=invalid, location=null,
message=Error while reading table: kestra-sandbox.zooomcamp.yellow_tripdata_2020_01,
error message: CSV table references column position 17, but line contains only 14 columns.;
line_number: 2103925 byte_offset_to_start_of_line: 194863028
column_index: 17 column_name: "congestion_surcharge" column_type: NUMERIC
File: gs://anna-geller/yellow_tripdata_2020-01.csv}
It means that the CSV file you're trying to load into BigQuery has a mismatch in the number of columns between the external source table (i.e. the file in GCS) and the destination table in BigQuery. This can happen when, due to network or transfer issues, the file is not fully downloaded from GitHub or not correctly uploaded to GCS. The error suggests schema issues, but that's not the case. Simply rerun the entire execution, including redownloading the CSV file and reuploading it to GCS. This should resolve the issue.
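If you want to confirm that a downloaded file is truncated before rerunning, a quick local check along these lines can help. It is a minimal sketch, not part of the course flows: the function name is hypothetical, and it simply compares each row's column count with the header's.

```python
import csv
import io


def find_mismatched_rows(lines, max_report: int = 5) -> list[tuple[int, int]]:
    """Return (line_number, column_count) for rows that differ from the header."""
    reader = csv.reader(lines)
    header = next(reader, None)
    if header is None:
        return []  # empty input: nothing to compare against
    expected = len(header)
    problems = []
    for row in reader:
        if not row:
            continue  # skip blank lines
        if len(row) != expected:
            problems.append((reader.line_num, len(row)))
            if len(problems) >= max_report:
                break
    return problems


if __name__ == "__main__":
    # In-memory sample; the last data row is cut short, as in a partial download.
    sample = io.StringIO("a,b,c\n1,2,3\n4,5\n")
    print(find_mismatched_rows(sample))
```

For a real file, open it with open(path, newline="") and pass the file object to the function before uploading to GCS.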
See the 2026 cohort folder
Did you take notes? You can share them by creating a PR to this file!
- Add your notes above this line