Create and materialize assets
We have integrated with Sling and dbt to bring in and model the data. Next we will define a custom asset to see some more of the internals of Dagster. In this step, you will:
- Build software-defined assets
- Add a DuckDB resource
Relationship between components and definitions
TODO
1. Create an asset
Before we begin, install the Python libraries that the asset will use:
uv pip install dagster-duckdb pandas
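A quick way to confirm the install worked is to check that the modules can be found in the active environment. The snippet below is only a sketch: the helper name `missing_packages` is hypothetical and not part of Dagster or this project. It also checks for `tabulate`, because pandas' `DataFrame.to_markdown` (called by the asset later in this step) relies on that optional package.

```python
from importlib.util import find_spec


def missing_packages(module_names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in module_names if find_spec(name) is None]


if __name__ == "__main__":
    # Import names used by the asset in this step; `tabulate` is needed by
    # pandas' DataFrame.to_markdown.
    missing = missing_packages(["dagster_duckdb", "pandas", "tabulate"])
    print("missing:", missing or "none")
```

If anything is reported as missing, install it with the same `uv pip install` command before continuing.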
When building our own assets, the first step is to scaffold the assets file with dg.
dg scaffold defs dagster.asset assets.py
This adds a file, assets.py, to the etl_tutorial module:
src
└── etl_tutorial
    ├── __init__.py
    └── defs
        └── assets.py
We will put the code for our first asset in this file. Assets are functions decorated with @dg.asset. This asset is downstream of the models in our dbt project, so we pass the deps argument to link it into the same graph.
The logic of our asset will create a new table in DuckDB that joins the staging orders and customers tables from the dbt models together.
import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(
    deps=["stg_orders", "stg_customers"],
    kinds={"duckdb"},
    automation_condition=dg.AutomationCondition.on_cron(
        "0 0 * * 1"
    ),  # every Monday at midnight
    description="Joined data between orders and customers",
)
def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
    table_name = "jaffle_platform.main.joined_data"

    with duckdb.get_connection() as conn:
        # Rebuild the joined table from the dbt staging models.
        conn.execute(
            f"""
            create or replace table {table_name} as (
                select
                    orders.customer_id,
                    orders.status as order_type,
                    count(*) as num_orders
                from jaffle_platform.main.stg_orders as orders
                left join jaffle_platform.main.stg_customers as customers
                    using(customer_id)
                group by 1, 2
            )
            """
        )

        # Sample a few rows and count the total for the asset metadata.
        preview_query = f"select * from {table_name} limit 10"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute(f"select count(*) from {table_name}").fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                # DataFrame.to_markdown needs the optional `tabulate` package.
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
As well as creating the table, the asset will sample a preview of the results and record the number of rows in the table as metadata within Dagster.
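To see what the query and the two metadata values amount to, here is a small stand-alone sketch. It is not the tutorial's code: it swaps in Python's built-in sqlite3 module for DuckDB so that it runs without extra packages, uses a plain `create table` with an explicit `on` clause, and the rows in the two staging tables are made up for illustration.

```python
import sqlite3

# Toy stand-ins for the dbt staging models (hypothetical rows).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table stg_customers (customer_id integer, name text);
    create table stg_orders (order_id integer, customer_id integer, status text);
    insert into stg_customers values (1, 'Ada'), (2, 'Grace');
    insert into stg_orders values
        (10, 1, 'completed'),
        (11, 1, 'completed'),
        (12, 1, 'returned'),
        (13, 2, 'completed');
    """
)

# Same join and aggregation as the asset: one row per customer and order status.
conn.execute(
    """
    create table joined_data as
    select
        orders.customer_id,
        orders.status as order_type,
        count(*) as num_orders
    from stg_orders as orders
    left join stg_customers as customers
        on orders.customer_id = customers.customer_id
    group by orders.customer_id, orders.status
    """
)

# The two pieces of metadata the asset records: a preview and a row count.
preview = conn.execute(
    "select * from joined_data order by customer_id, order_type limit 10"
).fetchall()
row_count = conn.execute("select count(*) from joined_data").fetchone()[0]

print(preview)    # [(1, 'completed', 2), (1, 'returned', 1), (2, 'completed', 1)]
print(row_count)  # 3
```

In the real asset the preview is rendered as a Markdown table and attached, together with the row count, to the materialization.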
Do not worry about the automation_condition in the dg.asset decorator for now. It is not required, and it will make more sense when we discuss automation later.
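For reference, the cron string "0 0 * * 1" reads as minute 0, hour 0, any day of the month, any month, day-of-week 1 (Monday). The sketch below only illustrates that schedule with plain datetime arithmetic; it is not how Dagster evaluates automation conditions, it ignores time zones, and the function name `next_monday_midnight` is hypothetical.

```python
from datetime import datetime, timedelta


def next_monday_midnight(now: datetime) -> datetime:
    """Return the first Monday 00:00 strictly after `now`."""
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # datetime.weekday(): Monday is 0, so this is the number of days until Monday.
    days_ahead = (7 - midnight_today.weekday()) % 7
    candidate = midnight_today + timedelta(days=days_ahead)
    if candidate <= now:
        # Already at or past this week's Monday midnight, so move to next week.
        candidate += timedelta(days=7)
    return candidate


# Wednesday 3 January 2024, 15:30 -> Monday 8 January 2024, 00:00
print(next_monday_midnight(datetime(2024, 1, 3, 15, 30)))
```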
There is one more thing we need to define. Our asset uses the same DuckDB database as Sling and dbt, and it relies on a DuckDBResource to connect to it. In Dagster, resources are the external services, tools, and storage backends that your assets depend on. We need to define that resource.
2. Define the DuckDB resource
We can use dg to scaffold our resource file in the same way as our assets.
dg scaffold defs dagster.resources resources.py
This adds a file, resources.py, to the etl_tutorial module:
src
└── etl_tutorial
    ├── __init__.py
    └── defs
        └── resources.py
Within this file we will define the resource and return it from a function decorated with @dg.definitions.
from dagster_duckdb import DuckDBResource

import dagster as dg

database_resource = DuckDBResource(database="/tmp/jaffle_platform.duckdb")


@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "duckdb": database_resource,
        }
    )
This will allow all assets within the Dagster project to access the DuckDB database connection.
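The link between the two files is the name: the key "duckdb" in the resources dictionary matches the duckdb parameter of the joined_data function. The toy below sketches that idea in plain Python. It is an illustration of matching by name only, not Dagster's implementation; `FakeDuckDB` and `call_with_resources` are hypothetical names.

```python
import inspect


class FakeDuckDB:
    """Hypothetical stand-in for a resource object such as DuckDBResource."""

    def __init__(self, database: str):
        self.database = database


def call_with_resources(fn, resources: dict):
    """Call `fn`, passing each parameter the resource registered under the same name."""
    params = inspect.signature(fn).parameters
    missing = [name for name in params if name not in resources]
    if missing:
        raise KeyError(f"no resource registered for: {missing}")
    return fn(**{name: resources[name] for name in params})


def joined_data(duckdb: FakeDuckDB) -> str:
    # The parameter name `duckdb` matches the key in the resources dictionary.
    return f"would write to {duckdb.database}"


resources = {"duckdb": FakeDuckDB(database="/tmp/jaffle_platform.duckdb")}
result = call_with_resources(joined_data, resources)
print(result)  # would write to /tmp/jaffle_platform.duckdb
```

If the key and the parameter name differ, the toy raises a KeyError, which mirrors why the key in resources.py and the parameter in assets.py need to agree.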
Summary
We have now started to add our own custom assets to the Dagster project. The etl_tutorial module should look like this:
src
└── etl_tutorial
    ├── __init__.py
    └── defs
        ├── __init__.py
        ├── ingest_files
        │   ├── defs.yaml
        │   └── replication.yaml
        ├── jdbt
        │   └── defs.yaml
        ├── assets.py
        └── resources.py
There is a lot more we can do with assets to make them more production ready.
Next steps
- Continue this tutorial with data quality