| Name | Description |
| --- | --- |
| `@root_input_manager` | The decorator used to define a root input manager. |
| `RootInputManager` | The base class used to define how to load inputs to ops at the root of a job. |
| `@dagster_type_loader` | The decorator used to define a dagster type loader. |
| `DagsterTypeLoader` | The base class used to specify how to load inputs in a way that depends on the type. |
Ops in a job may have input definitions that don't correspond to the outputs of upstream ops. You can provide values for these inputs in a few different ways. Dagster checks each, in order, and uses the first that's available:
- `RootInputManager`, which can be referenced from `In`s, is a resource that loads unconnected inputs.
- `DagsterTypeLoader` provides a way to specify how to load inputs in a way that depends on the type. A `DagsterTypeLoader` can be placed on a `DagsterType`, which can be placed on an `In`.
- `In` accepts a `default_value` argument (see the sketch after this list).
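A minimal sketch of the `default_value` option, using a hypothetical `greet` op (not part of the examples below). The default is used when the input has no upstream output and no value is provided via run config:

```python
from dagster import In, job, op


# "greeting" falls back to "hello" when nothing else supplies it.
@op(ins={"greeting": In(str, default_value="hello")})
def greet(context, greeting):
    context.log.info(greeting)


@job
def greeting_job():
    greet()
```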
When you have an op at the beginning of a job that operates on a built-in dagster type like string or int, you can provide a value for that input via run config.
Here's a basic job with an unconnected string input:
@op(ins={"input_string": In(String)})
def my_op(context, input_string):
context.log.info(f"input string: {input_string}")
@job
def my_job():
my_op()
The `String` dagster type has a dagster type loader that allows it to load inputs from config:
```python
my_job.execute_in_process(
    run_config={"ops": {"my_op": {"inputs": {"input_string": {"value": "marmot"}}}}}
)
```
When you have an op at the beginning of your job that operates on a dagster type that you've defined, you can write your own `DagsterTypeLoader` to define how to load that input via run config.
```python
from dagster import In, dagster_type_loader, job, op, usable_as_dagster_type


@dagster_type_loader(
    config_schema={"diameter": float, "juiciness": float, "cultivar": str}
)
def apple_loader(_context, config):
    return Apple(
        diameter=config["diameter"],
        juiciness=config["juiciness"],
        cultivar=config["cultivar"],
    )


@usable_as_dagster_type(loader=apple_loader)
class Apple:
    def __init__(self, diameter, juiciness, cultivar):
        self.diameter = diameter
        self.juiciness = juiciness
        self.cultivar = cultivar


@op(ins={"input_apple": In(Apple)})
def my_op(context, input_apple):
    context.log.info(f"input apple diameter: {input_apple.diameter}")


@job
def my_job():
    my_op()
```
With this, you can specify the input via run config:
```python
my_job.execute_in_process(
    run_config={
        "ops": {
            "my_op": {
                "inputs": {
                    "input_apple": {
                        "diameter": 2.4,
                        "juiciness": 6.0,
                        "cultivar": "honeycrisp",
                    }
                }
            }
        }
    },
)
```
When you have an op at the beginning of a job that operates on data from an external source, you might wish to separate that I/O from your op's business logic, in the same way you would with an IO manager if the op were loading from an upstream output.
To accomplish this, you can define a `RootInputManager`.
@op(ins={"dataframe": In(root_manager_key="my_root_manager")})
def my_op(dataframe):
"""Do some stuff"""
@root_input_manager
def table1_loader(_):
return read_dataframe_from_table(name="table1")
@job(resource_defs={"my_root_manager": table1_loader})
def my_job():
my_op()
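Since `table1_loader` takes no per-input config, this job can be invoked without any run config for the `dataframe` input. A minimal sketch:

```python
# The "dataframe" input is loaded by table1_loader, so no run config
# for it is required.
my_job.execute_in_process()
```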
Setting the `root_manager_key` on an `In` controls how that input is loaded in jobs where it has no upstream output.
The `root_input_manager` decorator behaves nearly identically to the `resource` decorator. It yields a `RootInputManagerDefinition`, which is a `ResourceDefinition` that will produce a `RootInputManager`.
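Because the result is an ordinary resource definition, you can bind a different loader to the same key in another job, for example to supply fixture data in tests. A minimal sketch, where `make_test_dataframe` is a hypothetical helper:

```python
from dagster import job, root_input_manager


# A loader that returns fixture data instead of reading a real table;
# make_test_dataframe is a hypothetical stand-in helper.
@root_input_manager
def test_loader(_):
    return make_test_dataframe()


@job(resource_defs={"my_root_manager": test_loader})
def my_test_job():
    my_op()
```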
When launching a run, you might want to parameterize how particular root inputs are loaded.
To accomplish this, you can define an `input_config_schema` on the input manager definition. The load function can access this config when loading data, via the `InputContext`.
```python
from dagster import root_input_manager


@root_input_manager(input_config_schema={"table_name": str})
def table_loader(context):
    return read_dataframe_from_table(name=context.config["table_name"])
```
Then, when executing a job, you can pass in this per-input config.
@job(resource_defs={"my_root_manager": table_loader})
def my_job():
my_op()
my_job.execute_in_process(
run_config={
"ops": {"my_op": {"inputs": {"dataframe": {"table_name": "table1"}}}}
},
)
You might want to execute a subset of ops in your job and control how the inputs of those ops are loaded. Root input managers also help in these situations, because the inputs at the beginning of the subset become the new "roots".
For example, you might have `op1` that normally produces a table that `op2` consumes. To debug `op2`, you might want to run it on a different table than the one normally produced by `op1`.
To accomplish this, you can set up the `root_manager_key` on `op2`'s `In` to point to an input manager with the desired loading behavior. As before, setting the `root_manager_key` on an `In` controls how that input is loaded when it has no upstream output.
```python
from dagster import IOManager, In, Out, io_manager, job, op, root_input_manager


@root_input_manager(input_config_schema={"table_name": str})
def my_root_input_manager(context):
    return read_dataframe_from_table(name=context.config["table_name"])


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        # Store each output in a table named after the output
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        # Load connected inputs from the upstream output's table
        return read_dataframe_from_table(name=context.upstream_output.name)


@io_manager
def my_io_manager(_):
    return MyIOManager()


@op(out=Out(io_manager_key="my_io_manager"))
def op1():
    """Do stuff"""


@op(ins={"dataframe": In(root_manager_key="my_root_input_manager")})
def op2(dataframe):
    """Do stuff"""


@job(
    resource_defs={
        "my_io_manager": my_io_manager,
        "my_root_input_manager": my_root_input_manager,
    }
)
def my_job():
    op2(op1())
```
When running the full job, `op2`'s input will be loaded using the IO manager on the output of `op1`. When running the job subset, `op2`'s input has no upstream output, so the input manager corresponding to its `root_manager_key` is used.
```python
my_job.execute_in_process(
    run_config={
        "ops": {"op2": {"inputs": {"dataframe": {"table_name": "tableX"}}}}
    },
    op_selection=["op2"],
)
```
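For contrast, a full run needs no input config for `op2`'s `dataframe`, since it's loaded from `op1`'s output by the IO manager:

```python
# Full run: op2's "dataframe" comes from op1's output via MyIOManager,
# so no input config is required.
my_job.execute_in_process()
```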