Dagster & Python scripts with components

Dagster provides a ready-to-use PythonScriptComponent that executes Python scripts as assets in your Dagster project. The component runs each script in a subprocess using Dagster Pipes, so you can keep your existing Python scripts while gaining Dagster's orchestration and observability features. This guide walks through using the PythonScriptComponent to run your Python scripts.

1. Prepare a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:

uvx create-dagster project my-project && cd my-project/src

Activate the project virtual environment:

source ../.venv/bin/activate

2. Scaffold a Python script component

Now that you have a Dagster project, you can scaffold a Python script component. You'll need to provide a name for your component. In this example, we'll create a component that will execute a Python script to process sales data and generate a revenue report.

dg scaffold defs dagster.PythonScriptComponent generate_revenue_report
Creating defs at /.../my-project/src/my_project/defs/generate_revenue_report.

The scaffold call will generate a defs.yaml file:

tree my_project/defs
my_project/defs
├── __init__.py
└── generate_revenue_report
    └── defs.yaml

2 directories, 2 files

3. Create your Python script

Create a Python script that will be executed by the component, or use any existing Python script you've already written. Dagster will orchestrate it without requiring changes to your code. For this example, we'll create a simple data processing script:

my-project/src/my_project/defs/generate_revenue_report/process_sales_data.py
import pandas as pd

# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
    "date": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "product": ["A", "B", "A"],
    "quantity": [10, 5, 8],
    "price": [100.0, 200.0, 100.0],
}

df = pd.DataFrame(sales_data)
df["revenue"] = df["quantity"] * df["price"]

# Calculate total revenue
total_revenue = df["revenue"].sum()

print(f"Generated revenue report with total revenue: ${total_revenue}")
print(f"Number of transactions: {len(df)}")
print(f"Average transaction: ${df['revenue'].mean():.2f}")

This script will be executed by Dagster in a subprocess. Any output printed to stdout/stderr will be captured and displayed in the Dagster UI logs.
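
As a rough illustration of what "executed in a subprocess with captured output" means, the sketch below runs a small script with the standard library's subprocess module and collects its stdout and stderr. This is not Dagster's implementation: the run_script helper and the hello_report.py stand-in script are hypothetical names used only for this example.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_script(script_path: Path) -> subprocess.CompletedProcess:
    """Run a Python script in a child process and capture stdout and stderr."""
    return subprocess.run(
        [sys.executable, str(script_path)],  # same interpreter as the parent
        capture_output=True,
        text=True,
        check=False,  # inspect returncode ourselves instead of raising
    )


# Hypothetical stand-in script that prints one line, like the example above.
with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "hello_report.py"
    script.write_text('print("Number of transactions: 3")\n')
    result = run_script(script)

print(result.returncode)
print(result.stdout.strip())
```

A non-zero return code or text on result.stderr is what an orchestrator would typically treat as a failed run.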

4. Configure your component

Update your defs.yaml file to specify the Python script and define the assets that will be created. You can also specify properties for the asset in Dagster, such as a group name and description:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report generated from transaction data"
      group_name: "analytics"
      kinds: ["python", "report"]

You can run dg list defs to see the asset corresponding to your component:

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key                  ┃ Group     ┃ Deps ┃ Kinds  ┃ Description                                         ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ sales_revenue_report │ analytics │      │ python │ Daily sales revenue report generated from           │ │
│         │ │                      │           │      │ report │ transaction data                                    │ │
│         │ └──────────────────────┴───────────┴──────┴────────┴─────────────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

5. Launch your assets

Once your component is configured, start a local Dagster instance so you can launch your assets and execute the Python scripts:

dg dev

Navigate to the Dagster UI and you'll see your assets. Click on the asset and then "Materialize" to execute your Python script. The script will run in a subprocess, and you'll be able to see the logs and metadata in the Dagster UI.

6. Advanced configuration

Log metadata inside Python script

For more advanced use cases, you can use Dagster Pipes to pass metadata from your Python script back to Dagster. This allows you to provide rich information about your assets directly in the Dagster UI:

my-project/src/my_project/defs/generate_revenue_report/process_sales_data.py
import pandas as pd
from dagster_pipes import open_dagster_pipes

# Sample sales data (in a real scenario, this might come from a database or file)
sales_data = {
    "date": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "product": ["A", "B", "A"],
    "quantity": [10, 5, 8],
    "price": [100.0, 200.0, 100.0],
}

with open_dagster_pipes() as context:
    df = pd.DataFrame(sales_data)
    df["revenue"] = df["quantity"] * df["price"]

    # Calculate total revenue
    total_revenue = df["revenue"].sum()

    # Log the result to Dagster
    context.log.info(f"Generated revenue report with total revenue: ${total_revenue}")
    context.log.info(f"Processed {len(df)} transactions")

    # Report asset materialization with rich metadata
    context.report_asset_materialization(
        metadata={
            "total_revenue": total_revenue,
            "num_transactions": len(df),
            "average_transaction": df["revenue"].mean(),
            "top_product": df.loc[df["revenue"].idxmax(), "product"],
        }
    )

With Dagster Pipes, you can:

  • Log structured information: Use context.log.info() to send logs directly to Dagster
  • Report asset metadata: Use context.report_asset_materialization() to attach rich metadata that appears in the Dagster UI
  • Handle errors: Exception information is automatically captured and reported to Dagster
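
When you extend the reported metadata, it may help to convert pandas and NumPy results to built-in Python values first. For example, df["quantity"].sum() returns a NumPy integer, which Python's json module cannot serialize on its own. The sketch below is one possible approach, assuming metadata values should be plain JSON-friendly types; to_plain is a hypothetical helper, and FakeScalar is a stand-in for a NumPy scalar so the example runs without extra packages.

```python
import json
from typing import Any


class FakeScalar:
    """Hypothetical stand-in for a NumPy scalar such as numpy.int64."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def item(self) -> Any:
        # NumPy scalars expose item(), which returns the built-in equivalent.
        return self._value


def to_plain(value: Any) -> Any:
    """Recursively convert scalar wrappers to built-in Python values."""
    if isinstance(value, dict):
        return {str(key): to_plain(val) for key, val in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain(val) for val in value]
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    item = getattr(value, "item", None)
    return item() if callable(item) else value


# 23 is the sum of the sample quantities (10 + 5 + 8).
metadata = to_plain({"total_quantity": FakeScalar(23), "top_product": "A"})
print(json.dumps(metadata))
```

In a real script you would pass the converted dictionary as the metadata argument of context.report_asset_materialization().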

Orchestrate multiple Python scripts

You can define multiple Python script components in a single defs.yaml file using the --- separator syntax. This allows you to run different scripts for different assets:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report"
      group_name: "analytics"
---
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_customer_data.py
  assets:
    - key: customer_summary_stats
      description: "Summary statistics for customer data"
      group_name: "analytics"

Each component instance runs independently and can execute different Python scripts. This approach is useful when you have multiple related data processing tasks that should be organized together but run separately.
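
To make the role of the --- separator concrete, here is a deliberately naive sketch that splits a multi-document YAML string into its individual documents using only the standard library. It is an illustration, not how Dagster parses defs.yaml (a real YAML parser handles many cases this ignores), and split_yaml_documents is a hypothetical helper name.

```python
MULTI_DOC = """\
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_sales_data.py
---
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_customer_data.py
"""


def split_yaml_documents(text: str) -> list[str]:
    """Split text on lines that contain only the '---' document separator."""
    documents: list[str] = []
    current: list[str] = []
    for line in text.splitlines():
        if line.strip() == "---":
            documents.append("\n".join(current).strip())
            current = []
        else:
            current.append(line)
    documents.append("\n".join(current).strip())
    # Drop empty documents, such as one before a leading separator.
    return [doc for doc in documents if doc]


documents = split_yaml_documents(MULTI_DOC)
print(len(documents))
```

Each resulting document carries its own type and attributes, which is why every component instance can point at a different script.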

Set up dependencies

You can specify dependencies between assets produced by different scripts. Using the multiple-script example above, add a deps entry to make customer_summary_stats depend on sales_revenue_report:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report"
      group_name: "analytics"
---
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_customer_data.py
  assets:
    - key: customer_summary_stats
      description: "Summary statistics for customer data"
      group_name: "analytics"
      deps: [sales_revenue_report]
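
The ordering implied by a deps entry can be sketched with the standard library's graphlib module. This is only a model of the dependency graph declared above, not Dagster's scheduler: it shows that an asset listed in deps has to come before the asset that depends on it.

```python
from graphlib import TopologicalSorter

# Map each asset key to the set of asset keys it depends on,
# mirroring the deps fields in the YAML above.
asset_deps = {
    "sales_revenue_report": set(),
    "customer_summary_stats": {"sales_revenue_report"},
}

# static_order() yields dependencies before the assets that need them.
order = list(TopologicalSorter(asset_deps).static_order())
print(order)  # ['sales_revenue_report', 'customer_summary_stats']
```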

Automate Python scripts

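You can configure when assets are materialized automatically by setting an automation condition on the asset. The following configuration materializes sales_revenue_report on a daily cron schedule: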

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: process_sales_data.py
  assets:
    - key: sales_revenue_report
      description: "Daily sales revenue report"
      group_name: "analytics"
      automation_condition: "{{ automation_condition.on_cron('@daily') }}"
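
In standard cron notation, '@daily' is shorthand for "0 0 * * *", that is, once a day at midnight. The sketch below computes the next such tick for a given time. It assumes UTC and ignores everything else an automation condition may take into account; next_daily_tick is a hypothetical helper, not part of Dagster.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now: datetime) -> datetime:
    """Return the first midnight strictly after `now` (same timezone)."""
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight_today + timedelta(days=1)


now = datetime(2024, 1, 1, 15, 30, tzinfo=timezone.utc)
tick = next_daily_tick(now)
print(tick.isoformat())  # 2024-01-02T00:00:00+00:00
```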

Creating scripts in subdirectories

You can organize your scripts in subdirectories within your component:

my-project/src/my_project/defs/generate_revenue_report/
├── defs.yaml
├── scripts/
│   ├── process_sales_data.py
│   └── generate_reports.py
└── utils/
    └── data_helpers.py

Reference scripts in subdirectories in your defs.yaml:

my-project/src/my_project/defs/generate_revenue_report/defs.yaml
type: dagster.PythonScriptComponent

attributes:
  execution:
    path: scripts/process_sales_data.py
  assets:
    - key: sales_revenue_report
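
The path values in these examples are written relative to the component directory, the folder that contains defs.yaml. Assuming that is how the path is resolved, the sketch below shows the resulting location of the script; resolve_script is a hypothetical helper used only for illustration.

```python
from pathlib import PurePosixPath


def resolve_script(defs_yaml: PurePosixPath, script_path: str) -> PurePosixPath:
    """Join a relative script path onto the directory holding defs.yaml."""
    return defs_yaml.parent / script_path


defs_yaml = PurePosixPath(
    "my-project/src/my_project/defs/generate_revenue_report/defs.yaml"
)
resolved = resolve_script(defs_yaml, "scripts/process_sales_data.py")
print(resolved)
```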

Best practices

  • Start simple: Begin with standard Python scripts that print output for basic orchestration needs.
  • Add Dagster Pipes for structured output: Print statements are enough for simple cases. When you need structured logs or metadata, wrap your script in the open_dagster_pipes() context manager, log with context.log.info(), and stream asset materialization events back to Dagster.
  • Keep scripts focused: Give each script a single, clear responsibility, and let Dagster manage the dependencies between scripts so you benefit from native observability such as lineage tracking and asset metadata.