Familiar with ops and graphs? Want to understand when, why, and how to use software-defined assets in Dagster? If so, this guide is for you. We'll also demonstrate what some common Dagster jobs look like before and after using software-defined assets.
Before we jump in, here's a quick refresher:
Software-defined assets aren't a replacement for Dagster's core computational concepts. Ops remain the core unit of computation that occurs within an asset. Think of software-defined assets as a top layer that links ops, graphs, and jobs to the long-lived objects they interact with.
Using software-defined assets means building Dagster jobs in a way that declares ahead of time the assets they produce and consume. This is different from using the `AssetMaterialization` API, which only informs Dagster at runtime about the assets a job interacted with.
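To make the contrast concrete, here's a minimal sketch of both approaches. The `users` asset name and the DataFrame contents are illustrative only:

```python
from pandas import DataFrame

from dagster import AssetMaterialization, Output, asset, op


# Runtime approach: Dagster only learns that a "users" asset exists when this
# op yields an AssetMaterialization during a run.
@op
def build_users_op():
    users_df = DataFrame({"user_id": [1, 2, 3]})
    # ... write users_df to storage here ...
    yield AssetMaterialization(asset_key="users")
    yield Output(users_df)


# Declarative approach: Dagster knows about the "users" asset before any run.
@asset
def users() -> DataFrame:
    return DataFrame({"user_id": [1, 2, 3]})
```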
Preemptively declaring assets offers distinct advantages, including:
- Because software-defined assets know what other assets they depend on, an asset's lineage can be viewed easily in Dagit.
- Assets help track and define cross-job dependencies. For example, when viewing a job that materializes assets, you can navigate to the jobs that produce the assets it depends on. Additionally, when an upstream asset has been updated more recently than a downstream asset, Dagster will indicate that the downstream asset may be out of date.
- Software-defined assets can be operated on directly in Dagit, from each asset's Details page.
- Software-defined assets provide sizeable improvements in code ergonomics:
  - You'll usually write less code. Specifying the inputs to a software-defined asset defines the assets it depends on, so you don't need to use `@graph` and `@job` to wire dependencies between ops. This also improves scalability by cutting the number of times an asset's name appears in your codebase in half. Refer to the IO manager-based example below to see this in action.
  - You no longer have to choose between easy dependency tracking and manageable organization. Because assets track their own dependencies, you can split them across jobs without interrupting the dependency graph or resorting to root input managers.
You should use software-defined assets when:

- You're using Dagster to keep a set of assets up to date, and
- You know what those assets are before you launch any runs
Note that using software-defined assets in one job doesn’t mean they need to be used in all your jobs. If your use case doesn't meet these criteria, you can still use graphs and ops.
Still not sure? Check out these examples to see what's a good fit and what isn't:
| Use case | Good fit? | Explanation |
| --- | --- | --- |
| Every day, drop and recreate the `users` table and the `user_recommender_model` model that depends on it | Yes | Assets are known before a run and are being updated |
| Every hour, add a partition to the `events` table | Yes | Assets are known before a run and are being updated |
| Clicking a button refreshes the recommender model | Yes | Assets are known before a run and are being updated |
| Every day, send emails to a set of users | No | No assets are being updated |
| Every day, read a file of user IDs and change the value of a particular attribute for each user | No | The set of assets to update is not known before running the job |
| Every day, scan my warehouse for tables that haven't been used in months and delete them | No | The set of assets to update is not known before running the job |
Let's say you've written jobs that you want to enrich using software-defined assets. Assuming assets are known and being updated, what would upgrading look like?
Generally, every op output in a job that corresponds to a long-lived object in storage should have a software-defined asset. The following examples demonstrate some realistic Dagster jobs, both with and without software-defined assets:
This isn't an exhaustive list! We're adding the ability to define jobs that materialize software-defined assets and then run arbitrary ops. Interested? We'd love to hear from you in Slack or a GitHub discussion.
This example is a vanilla, op-based job that follows the idiomatic practice of delegating all IO to IO managers and root input managers.
The goal of each op in the job is to produce an asset. However, because the job doesn't use the software-defined asset APIs, Dagster is unaware of this:
```python
from pandas import DataFrame

from dagster import In, Out, job, op, repository

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model


@op(
    ins={"raw_users": In(root_manager_key="warehouse")},
    out={"users": Out(io_manager_key="warehouse")},
)
def build_users(raw_users: DataFrame) -> DataFrame:
    users_df = raw_users.dropna()
    return users_df


@op(out={"users_recommender_model": Out(io_manager_key="object_store")})
def build_user_recommender_model(users: DataFrame):
    users_recommender_model = train_recommender_model(users)
    return users_recommender_model


@job(resource_defs={"warehouse": snowflake_io_manager, "object_store": s3_io_manager})
def users_recommender_job():
    build_user_recommender_model(build_users())


@repository
def repo():
    return [users_recommender_job]
```
Here's what an equivalent job looks like using software-defined assets:
```python
from pandas import DataFrame

from dagster import SourceAsset, asset, define_asset_job, repository, with_resources

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")


@asset(io_manager_key="warehouse")
def users(raw_users: DataFrame) -> DataFrame:
    users_df = raw_users.dropna()
    return users_df


@asset(io_manager_key="object_store")
def user_recommender_model(users: DataFrame):
    users_recommender_model = train_recommender_model(users)
    return users_recommender_model


@repository
def repo():
    return [
        *with_resources(
            [raw_users, users, user_recommender_model],
            resource_defs={
                "warehouse": snowflake_io_manager,
                "object_store": s3_io_manager,
            },
        ),
        define_asset_job("users_recommender_job"),
    ]
```
This example does the same thing as the previous example, with one difference: this job performs IO inside the ops instead of delegating it to IO managers and root input managers:
```python
from pandas import read_sql

from dagster import In, Nothing, job, op, repository

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@op
def build_users():
    raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
    users_df = raw_users_df.dropna()
    users_df.to_sql(name="users", con=create_db_connection())


@op(ins={"users": In(Nothing)})
def build_user_recommender_model():
    users_df = read_sql("select * from users", con=create_db_connection())
    users_recommender_model = train_recommender_model(users_df)
    pickle_to_s3(users_recommender_model, key="users_recommender_model")


@job
def users_recommender_job():
    build_user_recommender_model(build_users())


@repository
def repo():
    return [users_recommender_job]
```
Here's an example of an equivalent job that uses software-defined assets:
```python
from pandas import read_sql

from dagster import asset, define_asset_job, repository

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@asset(non_argument_deps={"raw_users"})
def users():
    raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
    users_df = raw_users_df.dropna()
    users_df.to_sql(name="users", con=create_db_connection())


@asset(non_argument_deps={"users"})
def user_recommender_model():
    users_df = read_sql("select * from users", con=create_db_connection())
    users_recommender_model = train_recommender_model(users_df)
    pickle_to_s3(users_recommender_model, key="users_recommender_model")


@repository
def repo():
    return [users, user_recommender_model, define_asset_job("users_recommender_job")]
```
This example demonstrates a job where some of the ops (`extract_products` and `get_categories`) don't produce assets of their own. Instead, they produce transient data that downstream ops will use to produce assets:
```python
from pandas import DataFrame

from dagster import job, op, repository

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
    return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
    return DataFrame({"category": products["category"].unique()})


@op
def write_products_table(products: DataFrame) -> None:
    products.to_sql(name="products", con=create_db_connection())


@op
def write_categories_table(categories: DataFrame) -> None:
    categories.to_sql(name="categories", con=create_db_connection())


@job
def ingest_products_and_categories():
    products = extract_products()
    product_categories = get_categories(products)
    write_products_table(products)
    write_categories_table(product_categories)


@repository
def repo():
    return [ingest_products_and_categories]
```
Here's an equivalent job using software-defined assets. Note: Because some ops don't correspond to assets, this example uses the `@op` and `@graph` APIs, along with `AssetsDefinition.from_graph` to wrap the graph in a software-defined asset:
```python
from pandas import DataFrame

from dagster import AssetsDefinition, GraphOut, define_asset_job, graph, op, repository

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
    return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
    return DataFrame({"category": products["category"].unique()})


# The write ops return the DataFrames they store, so the graph can map
# these outputs to assets.
@op
def write_products_table(products: DataFrame) -> DataFrame:
    products.to_sql(name="products", con=create_db_connection())
    return products


@op
def write_categories_table(categories: DataFrame) -> DataFrame:
    categories.to_sql(name="categories", con=create_db_connection())
    return categories


@graph(out={"products": GraphOut(), "categories": GraphOut()})
def ingest_graph():
    products = extract_products()
    product_categories = get_categories(products)
    return write_products_table(products), write_categories_table(product_categories)


two_tables = AssetsDefinition.from_graph(ingest_graph)


@repository
def repo():
    return [two_tables, define_asset_job("products_and_categories_job")]
```
Still not sure how software-defined assets fit into your current Dagster usage? In this section, we'll touch on how software-defined assets work with some of Dagster's core concepts:
| Without software-defined assets | With software-defined assets |
| --- | --- |
| An op is the basic unit of computation | Every software-defined asset includes a graph or an op |
| A graph is a composite unit of computation that connects multiple ops | Every software-defined asset includes a graph or an op |
| Ops can have multiple outputs | Multiple assets can be produced by a single op when defined using the `@multi_asset` decorator (see the sketch after this table) |
| Ops can use config | Assets can use config |
| Ops can access `OpExecutionContext` | Assets can access `OpExecutionContext` |
| Ops can require resources | Software-defined assets can require resources |
| Ops can be tested by directly invoking them | Assets can be tested by directly invoking them |
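For example, here's a minimal sketch of the `@multi_asset` decorator in action. The asset names and DataFrame contents are illustrative only; like an op, the decorated function can accept the execution context as its first parameter:

```python
from pandas import DataFrame

from dagster import AssetOut, multi_asset


# One op-backed function that produces two assets.
@multi_asset(outs={"users_cleaned": AssetOut(), "users_summary": AssetOut()})
def users_assets(context):
    context.log.info("Materializing users_cleaned and users_summary")
    users_cleaned = DataFrame({"user_id": [1, 2, 3]})
    users_summary = DataFrame({"num_users": [3]})
    return users_cleaned, users_summary
```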
| Without software-defined assets | With software-defined assets |
| --- | --- |
| A job targets a graph of ops | An asset job targets a selection of software-defined assets (see the sketch after this table) |
| Jobs can be partitioned | Assets can be partitioned |
| Jobs can be put on schedules or sensors | Asset jobs can be put on schedules or sensors |
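For example, here's a minimal sketch of an asset job placed on a schedule. The job name and asset keys are illustrative and assume matching assets exist in the repository:

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# An asset job targets a selection of assets rather than a graph of ops.
users_job = define_asset_job(
    "users_job", selection=AssetSelection.keys("users", "user_recommender_model")
)

# Asset jobs can be scheduled just like op-based jobs.
users_job_schedule = ScheduleDefinition(job=users_job, cron_schedule="0 0 * * *")
```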
| Without software-defined assets | With software-defined assets |
| --- | --- |
| Op outputs and inputs can have Dagster types | Software-defined assets can have Dagster types |
| The `Nothing` Dagster type lets you declare that Dagster doesn't need to store or load the object corresponding to an op output or input | The `non_argument_deps` argument lets you define an asset's dependencies without relying on Dagster to store or load the objects they correspond to (compare the `In(Nothing)` and `non_argument_deps` examples above) |
| Without software-defined assets | With software-defined assets |
| --- | --- |
| Repositories can contain jobs, schedules, and sensors | Repositories can contain assets |
| Without software-defined assets | With software-defined assets |
| --- | --- |
| IO managers can control how op inputs and outputs are loaded and stored | IO managers can control how assets are loaded and stored |