Create and materialize partitioned assets
Partitions are a core abstraction in Dagster that allow you to manage large datasets, process incremental updates, and improve pipeline performance. You can partition assets in the following ways:
- Time-based: Split data by time periods (e.g., daily, monthly)
- Category-based: Divide by known categories (e.g., country, product type)
- Two-dimensional: Combine two partition types (e.g., country + date)
- Dynamic: Create partitions based on runtime conditions
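To make the two-dimensional case concrete, here is a minimal sketch in plain Python (not Dagster's API) that treats the combined partition space as every pairing of a category with a date. The country and date values, and the helper name `two_dimensional_keys`, are hypothetical.

```python
from itertools import product

# Hypothetical dimensions, for illustration only
countries = ["US", "DE", "JP"]
dates = ["2024-01-01", "2024-01-02"]


def two_dimensional_keys(categories, days):
    """Return every (category, day) pair; each pair is one partition."""
    return list(product(categories, days))


keys = two_dimensional_keys(countries, dates)
print(len(keys), "partitions, for example", keys[0])
```

The number of partitions is the product of the two dimension sizes, which is worth keeping in mind before combining a long date range with many categories.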
In this step, you will:
- Create a time-based asset partitioned by month
- Create a category-based asset partitioned by payment method
1. Create a time-based partitioned asset
Dagster natively supports partitioning assets by datetime groups. We want to create an asset that counts orders by status for each month. To create the monthly partition, copy the following code below the missing_dimension_check asset check in assets.py:
monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2018-01-01")
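As an illustration of what this definition describes, the sketch below lists monthly partition keys in plain Python. It assumes the default key format (`YYYY-MM-DD`, the first day of each month), which the `[:-3]` slice in this step's asset relies on, and it assumes that only fully elapsed months get a partition. The helper names are hypothetical and this is not Dagster's implementation.

```python
from datetime import date


def next_month(d):
    """Return the first day of the month after d."""
    if d.month == 12:
        return date(d.year + 1, 1, 1)
    return date(d.year, d.month + 1, 1)


def monthly_partition_keys(start_date, today):
    """List first-of-month keys for every month that has fully elapsed by today."""
    current = date.fromisoformat(start_date).replace(day=1)
    stop = date.fromisoformat(today)
    keys = []
    while next_month(current) <= stop:
        keys.append(current.isoformat())
        current = next_month(current)
    return keys


def month_of(partition_key):
    """Trim the day from a key such as '2018-01-01', leaving '2018-01'."""
    return partition_key[:-3]


example_keys = monthly_partition_keys("2018-01-01", "2018-04-15")
print(example_keys)
print([month_of(k) for k in example_keys])
```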
Inside an asset, the current partition is available through the context as context.partition_key. We want the asset to run this calculation for the month given by the partition, deleting any previous values for that month first. Copy the following asset under the monthly_partition definition we just created:
@dg.asset(
    deps=["stg_orders"],
    kinds={"duckdb"},
    partitions_def=monthly_partition,
    automation_condition=dg.AutomationCondition.eager(),
    description="Monthly sales performance",
)
def monthly_orders(context: dg.AssetExecutionContext, duckdb: DuckDBResource):
    partition_date_str = context.partition_key
    month_to_fetch = partition_date_str[:-3]
    table_name = "jaffle_platform.main.monthly_orders"

    with duckdb.get_connection() as conn:
        conn.execute(
            f"""
            create table if not exists {table_name} (
                partition_date varchar,
                status varchar,
                order_num double
            );

            delete from {table_name} where partition_date = '{month_to_fetch}';

            insert into {table_name}
            select
                '{month_to_fetch}' as partition_date,
                status,
                count(*) as order_num
            from jaffle_platform.main.stg_orders
            where strftime(order_date, '%Y-%m') = '{month_to_fetch}'
            group by '{month_to_fetch}', status;
            """
        )

        preview_query = (
            f"select * from {table_name} where partition_date = '{month_to_fetch}';"
        )
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(
            f"""
            select count(*)
            from {table_name}
            where partition_date = '{month_to_fetch}'
            """
        ).fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
Do not worry about the automation_condition in the dg.asset decorator for now. It is not required here, but it will make more sense when we discuss automation later.
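The delete-then-insert pattern in the asset above is what makes re-materializing a month safe: running the same partition twice leaves one set of rows, and other months are untouched. Below is a minimal sketch of that behavior, using the standard library's sqlite3 in place of DuckDB, bound parameters in place of f-strings, and hypothetical sample rows. The table layout mirrors the asset, but the function name `materialize_month` is made up for this example.

```python
import sqlite3


def materialize_month(conn, month):
    """Recompute order counts for one month, replacing any earlier result."""
    conn.execute(
        "create table if not exists monthly_orders "
        "(partition_date text, status text, order_num integer)"
    )
    # Remove the previous result for this partition only
    conn.execute("delete from monthly_orders where partition_date = ?", (month,))
    # substr(order_date, 1, 7) extracts 'YYYY-MM' from an ISO date string
    conn.execute(
        """
        insert into monthly_orders
        select ?, status, count(*)
        from stg_orders
        where substr(order_date, 1, 7) = ?
        group by status
        """,
        (month, month),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("create table stg_orders (order_date text, status text)")
conn.executemany(
    "insert into stg_orders values (?, ?)",
    [  # hypothetical sample data
        ("2018-01-05", "completed"),
        ("2018-01-20", "completed"),
        ("2018-01-21", "returned"),
        ("2018-02-02", "completed"),
    ],
)

materialize_month(conn, "2018-01")
materialize_month(conn, "2018-02")
materialize_month(conn, "2018-01")  # re-run: replaces January, leaves February alone

rows = conn.execute(
    "select partition_date, status, order_num from monthly_orders "
    "order by partition_date, status"
).fetchall()
print(rows)
```

Without the delete step, the second January run would append duplicate rows instead of replacing them.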
2. Create a category-based partitioned asset
Statically defined partitions are a simple way to break up a dataset when you know in advance the groups you want to split it by. In our pipeline, we want to create an asset that represents the performance of each payment method.
- To create the statically defined partition for the payment method, copy this code beneath the monthly_orders asset:
payment_method_partition = dg.StaticPartitionsDefinition(
    ["credit_card", "coupon", "bank_transfer", "gift_card"]
)
- Now that the partition has been defined, we can use it in an asset that calculates the performance of each payment method:
@dg.asset(
    deps=[dg.AssetKey("stg_payments")],
    kinds={"duckdb"},
    partitions_def=payment_method_partition,
    automation_condition=dg.AutomationCondition.eager(),
    description="Payment performance",
)
def payment_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):
    # Key of the partition being materialized, e.g. "credit_card"
    payment_method = context.partition_key
    table_name = "jaffle_platform.main.payment_performance"

    # Assumes stg_payments has payment_method and amount columns
    with duckdb.get_connection() as conn:
        conn.execute(
            f"""
            create table if not exists {table_name} (
                payment_method varchar,
                order_num int,
                total_amount int
            );

            delete from {table_name} where payment_method = '{payment_method}';

            insert into {table_name}
            select
                '{payment_method}' as payment_method,
                count(*) as order_num,
                sum(amount) as total_amount
            from jaffle_platform.main.stg_payments
            where payment_method = '{payment_method}'
            group by '{payment_method}';
            """
        )

        preview_query = (
            f"select * from {table_name} where payment_method = '{payment_method}';"
        )
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(
            f"""
            select count(*)
            from {table_name}
            where payment_method = '{payment_method}';
            """
        ).fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
3. Materialize partitioned assets
To materialize these assets:
- Navigate to the assets page.
- Reload definitions.
- Select the monthly_orders asset, then Materialize selected.
- Ensure all partitions are selected, then launch a backfill.
- Select the payment_performance asset, then Materialize selected.
- Ensure all partitions are selected, then launch a backfill.
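Conceptually, a backfill over all partitions amounts to running the asset once for each partition key. The sketch below imitates that loop for the payment method partitions, with sqlite3 standing in for DuckDB, hypothetical sample rows, and an assumed `payment_method` column on the source table. The function names are made up for this example, and Dagster's real backfills also handle run tracking, retries, and concurrency, which are omitted here.

```python
import sqlite3

PAYMENT_METHODS = ["credit_card", "coupon", "bank_transfer", "gift_card"]


def materialize_payment_method(conn, method):
    """Recompute the summary row for a single payment method."""
    conn.execute("delete from payment_performance where payment_method = ?", (method,))
    conn.execute(
        """
        insert into payment_performance
        select payment_method, count(*), sum(amount)
        from stg_payments
        where payment_method = ?
        group by payment_method
        """,
        (method,),
    )


def backfill(conn, partition_keys):
    """Run the per-partition computation once for every key."""
    for key in partition_keys:
        materialize_payment_method(conn, key)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("create table stg_payments (payment_method text, amount integer)")
conn.execute(
    "create table payment_performance "
    "(payment_method text, order_num integer, total_amount integer)"
)
conn.executemany(
    "insert into stg_payments values (?, ?)",
    [  # hypothetical sample data; no gift_card payments on purpose
        ("credit_card", 1000),
        ("credit_card", 500),
        ("coupon", 200),
        ("bank_transfer", 1500),
    ],
)

backfill(conn, PAYMENT_METHODS)
summary = conn.execute(
    "select payment_method, order_num, total_amount "
    "from payment_performance order by payment_method"
).fetchall()
print(summary)
```

A partition with no matching source rows, such as gift_card in this sample, still runs but writes nothing.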
Summary
Partitions provide operational flexibility: you can launch runs that materialize only a subset of your data without affecting the rest, and you can launch backfills to reprocess historical data for specific time periods or categories. As you develop assets, consider where partitions might be helpful.
Next steps
Now that we have the main assets in our ETL pipeline, it's time to add automation to the pipeline.