Dagster & Python scripts with components

Dagster provides a ready-to-use `PythonScriptComponent`, which can be used to execute Python scripts as assets in your Dagster project. This component runs your Python scripts in a subprocess using Dagster Pipes, allowing you to leverage existing Python scripts while benefiting from Dagster's orchestration and observability features. This guide walks you through how to use the `PythonScriptComponent` to execute your Python scripts.
1. Prepare a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:
```shell
uvx create-dagster project my-project && cd my-project/src
```
Activate the project virtual environment:
```shell
source ../.venv/bin/activate
```
2. Scaffold a Python script component
Now that you have a Dagster project, you can scaffold a Python script component. You'll need to provide a name for your component. In this example, we'll create a component that will execute a Python script to process sales data and generate a revenue report.
```shell
dg scaffold defs dagster.PythonScriptComponent generate_revenue_report
```

```
Creating defs at /.../my-project/src/my_project/defs/generate_revenue_report.
```
The scaffold call will generate a `defs.yaml` file:

```shell
tree my_project/defs
```

```
my_project/defs
├── __init__.py
└── generate_revenue_report
    └── defs.yaml

2 directories, 2 files
```
3. Create your Python script
Create a Python script that will be executed by the component, or use any existing Python script you've already written. Dagster will orchestrate it without requiring changes to your code. For this example, we'll create a simple data processing script and save it as `process_sales_data.py` in the component directory:

```python
import pandas as pd

# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
    "date": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "product": ["A", "B", "A"],
    "quantity": [10, 5, 8],
    "price": [100.0, 200.0, 100.0],
}

df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]

# Calculate total revenue
total_revenue = df["revenue"].sum()

print(f"Generated revenue report with total revenue: ${total_revenue}")
print(f"Number of transactions: {len(df)}")
print(f"Average transaction: ${df['revenue'].mean():.2f}")
```
This script will be executed by Dagster in a subprocess. Any output printed to stdout/stderr will be captured and displayed in the Dagster UI logs.
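Because the script is ordinary Python, you can extend the same dataframe logic freely before wiring it into Dagster. For instance, a per-product revenue breakdown (a standalone sketch reusing the sample data above; the `by_product` name is just for illustration):

```python
import pandas as pd

# Same sample data as the script above
sales_data = {
    "date": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "product": ["A", "B", "A"],
    "quantity": [10, 5, 8],
    "price": [100.0, 200.0, 100.0],
}
df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]

# Revenue per product: A = 10*100 + 8*100, B = 5*200
by_product = df.groupby("product")["revenue"].sum().to_dict()
print(by_product)  # {'A': 1800.0, 'B': 1000.0}
```

Anything printed here would show up in the Dagster UI logs just like the totals in the main script.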
4. Configure your component
Update your `defs.yaml` file to specify the Python script and define the assets that will be created. You can also specify properties for the asset in Dagster, such as a group name and description:

```yaml
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report generated from transaction data"
      group_name: "analytics"
      kinds: ["python", "report"]
```
You can run `dg list defs` to see the asset corresponding to your component:

```shell
dg list defs
```

```
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key                  ┃ Group     ┃ Deps ┃ Kinds  ┃ Description                                         ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ sales_revenue_report │ analytics │      │ python │ Daily sales revenue report generated from           │ │
│         │ │                      │           │      │ report │ transaction data                                    │ │
│         │ └──────────────────────┴───────────┴──────┴────────┴─────────────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
5. Launch your assets
Once your component is configured, you can launch your assets to execute the Python scripts:
```shell
dg dev
```
Navigate to the Dagster UI and you'll see your assets. Click on the asset and then "Materialize" to execute your Python script. The script will run in a subprocess, and you'll be able to see the logs and metadata in the Dagster UI.
6. Advanced configuration
Log metadata inside Python script
For more advanced use cases, you can use Dagster Pipes to pass metadata from your Python script back to Dagster. This allows you to provide rich information about your assets directly in the Dagster UI:
```python
import pandas as pd
from dagster_pipes import open_dagster_pipes

# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
    "date": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "product": ["A", "B", "A"],
    "quantity": [10, 5, 8],
    "price": [100.0, 200.0, 100.0],
}

with open_dagster_pipes() as context:
    df = pd.DataFrame(sales_data)
    df["revenue"] = df["quantity"] * df["price"]

    # Calculate total revenue
    total_revenue = df["revenue"].sum()

    # Log the result to Dagster
    context.log.info(f"Generated revenue report with total revenue: ${total_revenue}")
    context.log.info(f"Processed {len(df)} transactions")

    # Report asset materialization with rich metadata
    context.report_asset_materialization(
        metadata={
            "total_revenue": total_revenue,
            "num_transactions": len(df),
            "average_transaction": df["revenue"].mean(),
            "top_product": df.loc[df["revenue"].idxmax(), "product"],
        }
    )
```
With Dagster Pipes, you can:

- Log structured information: Use `context.log.info()` to send logs directly to Dagster
- Report asset metadata: Use `context.report_asset_materialization()` to attach rich metadata that appears in the Dagster UI
- Handle errors: Exception information is automatically captured and reported to Dagster
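Metadata values can be passed as raw Python values, as in the example above, or as explicitly typed entries so the Dagster UI knows how to render them. A minimal sketch of the typed form (the `report_url` key, URL, and markdown snippet are made up for illustration):

```python
# dagster-pipes accepts metadata either as raw values (type inferred)
# or as {"raw_value": ..., "type": ...} entries.
metadata = {
    "total_revenue": 2800.0,  # raw value, type inferred
    "report_url": {"raw_value": "https://example.com/report", "type": "url"},
    "preview": {"raw_value": "| product | revenue |", "type": "md"},
}

# Inside a Pipes session this would be passed along as:
# context.report_asset_materialization(metadata=metadata)
print(sorted(metadata))  # ['preview', 'report_url', 'total_revenue']
```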
Orchestrate multiple Python scripts
You can define multiple Python script components in a single `defs.yaml` file using the `---` separator syntax. This allows you to run different scripts for different assets:

```yaml
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report"
      group_name: "analytics"
---
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_customer_data.py
  assets:
    - key: customer_summary_stats
      description: "Summary statistics for customer data"
      group_name: "analytics"
```
Each component instance runs independently and can execute different Python scripts. This approach is useful when you have multiple related data processing tasks that should be organized together but run separately.
Set up dependencies
You can specify dependencies between assets from different scripts. Using the multiple scripts example above, you can make one script depend on another:
```yaml
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report"
      group_name: "analytics"
---
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_customer_data.py
  assets:
    - key: customer_summary_stats
      description: "Summary statistics for customer data"
      group_name: "analytics"
      deps: [sales_revenue_report]
```
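Note that `deps` controls ordering and lineage; Dagster does not move data between the two subprocesses for you. One common pattern is a shared handoff location, sketched below with a made-up file path and payload:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical handoff file shared by the two scripts
report_path = Path(tempfile.gettempdir()) / "sales_revenue_report.json"

# process_sales_data.py (upstream) would end by persisting its result:
report_path.write_text(json.dumps({"total_revenue": 2800.0}))

# process_customer_data.py (downstream) would start by reading it:
report = json.loads(report_path.read_text())
print(report["total_revenue"])  # 2800.0
```

In production you would typically swap the temp file for a database table or object store path.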
Automate Python scripts
You can configure when assets should be automatically materialized using automation conditions:
```yaml
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report"
      group_name: "analytics"
      automation_condition: "{{ automation_condition.on_cron('@daily') }}"
```
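Other automation conditions can be used the same way; for example, a sketch that materializes the downstream component from the earlier example whenever its upstream updates, assuming the template also exposes `eager()`:

```yaml
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: process_customer_data.py
  assets:
    - key: customer_summary_stats
      group_name: "analytics"
      deps: [sales_revenue_report]
      automation_condition: "{{ automation_condition.eager() }}"
```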
Creating scripts in subdirectories
You can organize your scripts in subdirectories within your component:
```
my-project/src/my_project/defs/generate_revenue_report/
├── defs.yaml
├── scripts/
│   ├── process_sales_data.py
│   └── generate_reports.py
└── utils/
    └── data_helpers.py
```
Reference scripts in subdirectories in your `defs.yaml`:

```yaml
type: dagster.PythonScriptComponent
attributes:
  execution:
    path: scripts/process_sales_data.py
  assets:
    - key: sales_revenue_report
```
Best practices
- Start simple: Begin with standard Python scripts that print output for basic orchestration needs.
- Log structured metadata and information with Dagster Pipes: Use print statements for simple cases, or use the `open_dagster_pipes()` context manager for full Pipes support, such as structured logging with `context.log.info()` and streaming asset materialization events back to Dagster.
- Keep scripts focused: Each script should have a clear, single responsibility, and offload complex dependencies to Dagster to benefit from native observability like lineage tracking and asset metadata.