Create a sensor

Sensors allow you to automate workflows based on external events or conditions, making them useful for event-driven automation, especially in situations where jobs occur at irregular cadences or in rapid succession.

Consider using sensors in the following situations:

  • Event-driven workflows: When your workflow depends on external events, such as the arrival of a new data file or a change in an API response.
  • Conditional execution: When you want to execute jobs only if certain conditions are met, reducing unnecessary computations.
  • Real-time processing: When you need to process data as soon as it becomes available, rather than waiting for a scheduled time.

In this step you will:

  • Create an asset that runs as part of an event-driven workflow
  • Create a sensor to listen for conditions to materialize the asset

1. Create an event-driven asset

For our pipeline, we want to model a situation where an executive requests a pivot table report of sales results by department and product, and wants it processed as soon as the request arrives.

For this asset, we need to define the structure of the request that it is expecting in the materialization context.

Other than that, defining this asset is the same as our previous assets. Copy the following code beneath product_performance.

src/etl_tutorial/defs/assets.py
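class AdhocRequestConfig(dg.Config):
    # department and product arrive with each request file; the preview query
    # below filters on the date range only.
    department: str
    product: str
    start_date: str
    end_date: str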


@dg.asset(
    deps=["stg_orders"],
    kinds={"python"},
    description="Adhoc order requests",
)
def adhoc_request(
    config: AdhocRequestConfig, duckdb: DuckDBResource
) -> dg.MaterializeResult:
    table_name = "jaffle_platform.main.stg_orders"

    with duckdb.get_connection() as conn:
        preview_df = conn.execute(
            f"""
            select
                order_id,
                customer_id
            from {table_name}
            where date >= '{config.start_date}'
            and date < '{config.end_date}'
            """
        ).fetchdf()

    return dg.MaterializeResult(
        metadata={"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))}
    )
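The query above places the request dates directly into the SQL text and selects a half-open window (start date included, end date excluded). The following is a minimal sketch of the same filter using bound parameters. It uses Python's built-in sqlite3 module as a stand-in for DuckDB so that it runs on its own; the helper names and the table rows are made up for illustration and are not part of the tutorial project.

```python
import sqlite3


def orders_in_window(conn: sqlite3.Connection, start_date: str, end_date: str) -> list:
    """Return (order_id, customer_id) rows with start_date <= date < end_date."""
    # Bound parameters (?) keep the request values out of the SQL text itself.
    query = """
        select order_id, customer_id
        from stg_orders
        where date >= ? and date < ?
        order by order_id
    """
    return conn.execute(query, (start_date, end_date)).fetchall()


def make_demo_connection() -> sqlite3.Connection:
    """Build an in-memory table with made-up rows (illustration only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table stg_orders (order_id integer, customer_id integer, date text)"
    )
    conn.executemany(
        "insert into stg_orders values (?, ?, ?)",
        [
            (1, 10, "2023-12-31"),  # before the window
            (2, 11, "2024-01-01"),  # on the start date: included
            (3, 12, "2024-06-04"),  # inside the window
            (4, 13, "2024-06-05"),  # on the end date: excluded
        ],
    )
    return conn


rows = orders_in_window(make_demo_connection(), "2024-01-01", "2024-06-05")
print(rows)
```

With ISO-formatted date strings, string comparison and date comparison agree, which is why the filter works on text columns in this sketch.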

2. Build the sensor

To define a sensor in Dagster, use the @sensor decorator. This decorator is applied to a function that evaluates whether the conditions for triggering a job are met.

Sensors include the following elements:

  • Job: The job that the sensor will trigger when the conditions are met.
  • RunRequest: An object that specifies the configuration for the job run. It includes a run_key to ensure idempotency and a run_config for job-specific settings.
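To make the role of run_key concrete, here is a toy model of the idempotency it provides: a request whose key has already been seen does not launch a second run. This is only a sketch. FakeRunRequest and launch_new_runs are hypothetical names, and Dagster's own bookkeeping is more involved than a Python set.

```python
from dataclasses import dataclass, field


@dataclass
class FakeRunRequest:
    """Hypothetical stand-in for dg.RunRequest: just a key plus config."""

    run_key: str
    run_config: dict = field(default_factory=dict)


def launch_new_runs(requests: list, seen_keys: set) -> list:
    """Return the run keys that would launch, skipping keys already seen."""
    launched = []
    for request in requests:
        if request.run_key in seen_keys:
            continue  # same key as an earlier request: no second run
        seen_keys.add(request.run_key)
        launched.append(request.run_key)
    return launched


seen = set()
first_tick = [FakeRunRequest("adhoc_request_request.json_1700000000.0")]
# The second tick repeats the old key and adds one for a modified file.
second_tick = first_tick + [FakeRunRequest("adhoc_request_request.json_1700000500.0")]

launched_first = launch_new_runs(first_tick, seen)
launched_second = launch_new_runs(second_tick, seen)
print(launched_first)
print(launched_second)
```

Because the key in this tutorial combines the filename with its modification time, editing a request file produces a new key and therefore a new run.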

First we will use dg to create the sensor file:

dg scaffold dagster.sensor sensors.py

Now copy the following sensor code into the sensors.py file:

src/etl_tutorial/defs/sensors.py
import json
import os

import dagster as dg


@dg.sensor(target="adhoc_request")
def adhoc_request_sensor(context: dg.SensorEvaluationContext):
    PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../", "data/requests")

    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    for filename in os.listdir(PATH_TO_REQUESTS):
        file_path = os.path.join(PATH_TO_REQUESTS, filename)
        if filename.endswith(".json") and os.path.isfile(file_path):
            last_modified = os.path.getmtime(file_path)

            current_state[filename] = last_modified

            # if the file is new or has been modified since the last run, add it to the request queue
            if (
                filename not in previous_state
                or previous_state[filename] != last_modified
            ):
                with open(file_path) as f:
                    request_config = json.load(f)

                runs_to_request.append(
                    dg.RunRequest(
                        run_key=f"adhoc_request_{filename}_{last_modified}",
                        run_config={
                            "ops": {"adhoc_request": {"config": {**request_config}}}
                        },
                    )
                )

    return dg.SensorResult(
        run_requests=runs_to_request, cursor=json.dumps(current_state)
    )
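The cursor logic in the sensor above can be tried out without Dagster. The sketch below isolates the change detection into a plain function; scan_requests is a hypothetical helper, and a temporary directory stands in for data/requests. It shows that an unchanged file is reported only once, and that it is reported again after its modification time changes.

```python
import json
import os
import tempfile
from typing import Optional


def scan_requests(directory: str, cursor: Optional[str]) -> tuple:
    """Return (changed .json filenames, new cursor) for one evaluation."""
    previous_state = json.loads(cursor) if cursor else {}
    current_state = {}
    changed = []
    for filename in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, filename)
        if filename.endswith(".json") and os.path.isfile(file_path):
            last_modified = os.path.getmtime(file_path)
            current_state[filename] = last_modified
            # New file, or modification time differs from the stored one.
            if previous_state.get(filename) != last_modified:
                changed.append(filename)
    return changed, json.dumps(current_state)


with tempfile.TemporaryDirectory() as requests_dir:
    path = os.path.join(requests_dir, "request.json")
    with open(path, "w") as f:
        json.dump({"start_date": "2024-01-01", "end_date": "2024-06-05"}, f)

    # First evaluation: no cursor yet, so the file counts as new.
    first_changed, cursor = scan_requests(requests_dir, None)
    # Second evaluation: nothing changed, so nothing is reported.
    second_changed, cursor = scan_requests(requests_dir, cursor)

    # Simulate an edit by moving the modification time forward explicitly.
    os.utime(path, (os.path.getatime(path), os.path.getmtime(path) + 10))
    third_changed, cursor = scan_requests(requests_dir, cursor)

print(first_changed, second_changed, third_changed)
```

Storing the whole filename-to-timestamp mapping in the cursor means a file that is deleted and later recreated is treated as new again.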

3. Materialize the sensor asset

  1. Reload your Definitions.
  2. Navigate to the Automation page.
  3. Turn on the adhoc_request_sensor.
  4. Click on the adhoc_request_sensor details.

  5. Create a data/requests directory in src/etl_tutorial, which is where the sensor's PATH_TO_REQUESTS resolves. Then add a request.json file with the following contents:
{
    "department": "South",
    "product": "Driftwood Denim Jacket",
    "start_date": "2024-01-01",
    "end_date": "2024-06-05"
}
  6. Click on the green tick to see the run for this request.
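The request file from the steps above can also be written from a script, which is closer to how an upstream system would drop requests for the sensor to pick up. This is a minimal sketch: write_request is a hypothetical helper, and a temporary directory stands in for the project's data/requests folder so the example runs anywhere.

```python
import json
import tempfile
from pathlib import Path


def write_request(requests_dir: Path, name: str, payload: dict) -> Path:
    """Write payload as <name>.json inside requests_dir and return the path."""
    requests_dir.mkdir(parents=True, exist_ok=True)
    target = requests_dir / f"{name}.json"
    target.write_text(json.dumps(payload, indent=2))
    return target


payload = {
    "department": "South",
    "product": "Driftwood Denim Jacket",
    "start_date": "2024-01-01",
    "end_date": "2024-06-05",
}

# A temporary directory stands in for the project's data/requests folder.
with tempfile.TemporaryDirectory() as tmp:
    written = write_request(Path(tmp) / "data" / "requests", "request", payload)
    round_trip = json.loads(written.read_text())
    written_name = written.name

print(written_name, round_trip["start_date"], round_trip["end_date"])
```

To use it against the real project, point requests_dir at the directory the sensor watches instead of the temporary one.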

Summary

One new file, sensors.py, has been added to the etl_tutorial module:

src
└── etl_tutorial
    ├── __init__.py
    └── defs
        ├── __init__.py
        ├── ingest_files
        │   ├── defs.yaml
        │   └── replication.yaml
        ├── jdbt
        │   └── defs.yaml
        ├── assets.py
        ├── resources.py
        └── sensors.py

Sensors provide a fine-grained way to build event-driven systems. Together with declarative automation, they give you several ways to automate your pipelines.

Next steps

  • Continue this tutorial by adding Evidence