A graph is a set of interconnected ops or sub-graphs. While individual ops typically perform simple tasks, ops can be assembled into a graph to accomplish complex tasks.
Dagster supports using graphs to create assets, allowing you to produce multiple assets with a single graph and easily track cross-job lineage.
Graphs form the core of jobs.
| Name | Description |
|---|---|
| @graph | The decorator used to define a graph, which can form the basis for multiple jobs. |
| GraphDefinition | A graph definition, which is a set of ops (or sub-graphs) wired together. Forms the core of a job. Typically constructed using the @graph decorator. |
To create a graph, use the @graph decorator.
In the following example, we return one output from the root op (return_one) and pass data along through single inputs and outputs:
from dagster import graph, op


@op
def return_one(context) -> int:
    return 1


@op
def add_one(context, number: int) -> int:
    return number + 1


@graph
def linear():
    add_one(add_one(add_one(return_one())))
Graphs can be used to create software-defined assets. Graph-backed assets are useful if you have an existing graph that produces and consumes assets.
Wrapping your graph inside a software-defined asset gives you all the benefits of software-defined assets — like cross-job lineage — without requiring you to change the code inside your graph. Refer to the Software-defined assets documentation for more info and examples.
Need some inspiration? Using these patterns, you can build graphs:
You can use the same op definition multiple times in the same graph. Note that this approach can also apply to jobs.
@graph
def multiple_usage():
    add_one(add_one(return_one()))
To differentiate between the two invocations of add_one, Dagster automatically aliases the op names to be add_one and add_one_2.
You can also manually define the alias by using the .alias method on the op invocation:
@graph
def alias():
    add_one.alias("second_addition")(add_one(return_one()))
A single output can be passed to multiple inputs on downstream ops. In this example, the output from the first op is passed to two different ops. The outputs of those ops are combined and passed to the final op:
from dagster import graph, op


@op
def return_one(context) -> int:
    return 1


@op
def add_one(context, number: int) -> int:
    return number + 1


@op
def adder(context, a: int, b: int) -> int:
    return a + b


@graph
def inputs_and_outputs():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)
As an op only starts to execute once all its inputs have been resolved, you can use this behavior to model conditional execution.
In this example, the branching_op outputs either the branch_1 result or branch_2 result. Since op execution is skipped for ops that have unresolved inputs, only one of the downstream ops will execute:
import random

from dagster import Out, Output, graph, op


@op(out={"branch_1": Out(is_required=False), "branch_2": Out(is_required=False)})
def branching_op():
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@op
def branch_1_op(_input):
    pass


@op
def branch_2_op(_input):
    pass


@graph
def branching():
    branch_1, branch_2 = branching_op()
    branch_1_op(branch_1)
    branch_2_op(branch_2)
Note: When using conditional branching, Output objects must be yielded instead of returned.
If you have a fixed set of ops that all return the same output type, you can collect the outputs into a list and pass them into a single downstream op.
The downstream op executes only if all of the outputs were successfully created by the upstream ops:
from typing import List

from dagster import graph, op


@op
def return_one() -> int:
    return 1


@op
def sum_fan_in(nums: List[int]) -> int:
    return sum(nums)


@graph
def fan_in():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias(f"return_one_{i}")())
    sum_fan_in(fan_outs)
In this example, we have 10 ops that all output the number 1. The sum_fan_in op takes all of these outputs as a list and sums them.
Graphs can contain other graphs, or sub-graphs. Refer to the Nesting graphs documentation for more info and examples.
Using dynamic outputs, you can duplicate portions of a graph at runtime. Refer to the Dynamic graphs documentation for more info and examples.
Dependencies in Dagster are primarily data dependencies. Using data dependencies means each input of an op depends on the output of an upstream op.
If an op, say Op A, does not depend on any output of another op, say Op B, there is in theory no reason for Op A to run after Op B. In most cases, these two ops should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.
If you need to model an explicit ordering dependency, you can use the Nothing Dagster type on the input definition of the downstream op. This type specifies that you are passing "nothing" via Dagster between the ops, while still using inputs and outputs to model the dependency between the two ops.
from dagster import In, Nothing, graph, op

# get_database_connection is assumed to be defined elsewhere; it is not part of Dagster.


@op
def create_table_1():
    get_database_connection().execute(
        "create table table_1 as select * from some_source_table"
    )


@op(ins={"start": In(Nothing)})
def create_table_2():
    get_database_connection().execute(
        "create table table_2 as select * from table_1"
    )


@graph
def nothing_dependency():
    create_table_2(start=create_table_1())
In this example, create_table_2 has an input of type Nothing, meaning that it doesn't expect any data to be provided by the upstream op. This lets us connect them in the graph definition so that create_table_2 executes only after create_table_1 successfully executes.
Nothing type inputs do not have a corresponding parameter in the function since there is no data to pass. When connecting the dependencies, it is recommended to use keyword args to prevent mix-ups with other positional inputs.
In most cases, it is possible to pass some data dependency instead. In the example above, even though we probably don't want to pass the table data itself between the ops, we could pass table pointers. For example, create_table_1 could return a table_pointer output of type str with a value of table_1, and this table name can be used in create_table_2 to more accurately model the data dependency.
Dagster also provides more advanced abstractions to handle dependencies and IO. If you find it difficult to model data dependencies when using external storage, check out IO managers.
You may run into a situation where you need to programmatically construct the dependencies for a graph. In that case, you can directly define the GraphDefinition object.
To construct a GraphDefinition, you need to pass the constructor a graph name, a list of op or graph definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each op's inputs on the outputs of other ops in the graph. The top-level keys of the dependency dictionary are the string names of ops or graphs. If you are using op aliases, be sure to use the aliased name. The value for each top-level key is itself a dictionary, which maps input names to a DependencyDefinition.
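from dagster import DependencyDefinition, GraphDefinition

# return_one and add_one are the ops defined in the earlier examples.
one_plus_one_from_constructor = GraphDefinition(
    name="one_plus_one",
    node_defs=[return_one, add_one],
    # add_one's "number" input depends on the output of return_one.
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
).to_job()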
Sometimes you may want to construct the dependencies of a graph definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.
For example, you can have a YAML file like this:
name: some_example
description: blah blah blah
ops:
  - def: add_one
    alias: A
  - def: add_one
    alias: B
    deps:
      num:
        op: A
  - def: add_two
    alias: C
    deps:
      num:
        op: A
  - def: subtract
    deps:
      left:
        op: B
      right:
        op: C
You can programmatically generate a GraphDefinition from this YAML:
import os

from dagster import DependencyDefinition, GraphDefinition, NodeInvocation, op
from dagster.utils.yaml_utils import load_yaml_from_path


@op
def add_one(num: int) -> int:
    return num + 1


@op
def add_two(num: int) -> int:
    return num + 2


@op
def subtract(left: int, right: int) -> int:
    return left - right


def construct_graph_with_yaml(yaml_file, op_defs) -> GraphDefinition:
    yaml_data = load_yaml_from_path(yaml_file)

    deps = {}
    for op_yaml_data in yaml_data["ops"]:
        def_name = op_yaml_data["def"]
        # Fall back to the definition name when no alias is given.
        alias = op_yaml_data.get("alias", def_name)
        op_deps_entry = {}
        for input_name, input_data in op_yaml_data.get("deps", {}).items():
            # The upstream node is passed positionally; "result" is the default output name.
            op_deps_entry[input_name] = DependencyDefinition(
                input_data["op"], output=input_data.get("output", "result")
            )
        deps[NodeInvocation(name=def_name, alias=alias)] = op_deps_entry

    return GraphDefinition(
        name=yaml_data["name"],
        description=yaml_data.get("description"),
        node_defs=op_defs,
        dependencies=deps,
    )


def define_dep_dsl_graph() -> GraphDefinition:
    path = os.path.join(os.path.dirname(__file__), "my_graph.yaml")
    return construct_graph_with_yaml(path, [add_one, add_two, subtract])
Ready to start using your graphs in Dagster jobs? Refer to the Jobs documentation for detailed info and examples.