Run configuration allows providing parameters to jobs at the time they're executed.
Name | Description |
---|---|
ConfigSchema | See details with code examples in the API documentation. |
It's often useful to provide user-chosen values to Dagster jobs or software-defined assets at runtime. For example, you might want to choose what dataset an op runs against, or provide a connection URL for a database resource. Dagster exposes this functionality through a configuration API.
Various Dagster entities (ops, assets, resources) can be individually configured. When launching a job that executes (ops), materializes (assets), or instantiates (resources) a configurable entity, you can provide run configuration for each entity. Within the function that defines the entity, you can access the passed-in configuration off of the context
. Typically, the provided run configuration values correspond to a configuration schema attached to the op/asset/resource definition. Dagster validates the run configuration against the schema and proceeds only if validation is successful.
A common use of configuration is for a schedule or sensor to provide configuration to the job run it is launching. For example, a daily schedule might provide the day it's running on to one of the ops as a config value, and that op might use that config value to decide what day's data to read.
Configurable parameters accepted by an op, asset, or resource are specified by providing a config_schema
to the corresponding decorator. The structure of a config_schema
is flexible and fully documented in the API Reference. However, most of the time you will want to provide a Python dictionary, with keys the names of parameters and values the types of those parameters.
During execution, the specified parameters are accessible within the body of the op/asset/resource under context.op_config
(for ops/assets) or context.resource_config
(for resources). It might seem confusing that asset config is accessed under context.op_config
instead of context.asset_config
. However, assets are wrappers for ops, so when we access asset config we are literally just accessing config for the underlying op.
Below we define a simple op and asset with identical config_schemas
defining a single configurable parameter, person_name
, as well as a resource with a configurable url
parameter:
@op(config_schema={"person_name": str})
def op_using_config(context):
return f'hello {context.op_config["person_name"]}'
@asset(config_schema={"person_name": str})
def asset_using_config(context):
# Note how asset config is also accessed with context.op_config
return f'hello {context.op_config["person_name"]}'
@resource(config_schema={"url": str})
def resource_using_config(context):
return MyDatabaseConnection(context.resource_config["url"])
It is technically possible to access context.op_config
inside ops (not assets) without defining a config_schema
. However, this is not recommended.
You can also build config into jobs, as described in the Jobs documentation.
If you want to execute op_using_config
or materialize asset_using_config
, we'll need to provide values for the parameters specified in config_schema
. How we provide these values depends on the interface we are using:
From the Python API, we can use the run_config
argument for JobDefinition.execute_in_process
or materialize
. This takes a dictionary where configuration values for ops/assets are specified under ops.<op_or_asset_name>.config
(for resources under resources.<resource_name>.config
):
@job
def example_job():
op_using_config()
job_result = example_job.execute_in_process(
run_config={"ops": {"op_using_config": {"config": {"person_name": "Alice"}}}}
)
asset_result = materialize(
[asset_using_config],
run_config={
"ops": {"asset_using_config": {"config": {"person_name": "Alice"}}}
},
)
From Dagit's Launchpad, we supply config as YAML using the config editor. The editor has typeahead, schema validation, and schema documentation. You can also click the "Scaffold Missing Config" button to generate dummy values based on the config schema. Note that a modal containing the launchpad editor will pop up if we attempt to materialize an asset with a defined config_schema
:
When executing a job from Dagster's CLI with dagster job execute, we can put config in a YAML file and pass the file path with the --config
option:
ops:
op_using_config:
config:
person_name: Alice
dagster job execute --config my_config.yaml
Dagster validates any provided run config against the corresponding config schemas. It will abort execution with a DagsterInvalidConfigError
if validation fails. For example, both of the following will fail, because there is no nonexistent_config_value
in the config schema:
@job
def example_job():
op_using_config()
op_result = example_job.execute_in_process(
run_config={
"ops": {"op_using_config": {"config": {"nonexistent_config_value": 1}}}
}
)
asset_result = materialize(
[asset_using_config],
run_config={
"ops": {"asset_using_config": {"config": {"nonexistent_config_value": 1}}}
},
)
If you want multiple ops to share values, You can use make_values_resource
to pass the values via a resource and reference that resource from any op that needs it.
It defaults to Any
type, meaning Dagster will accept any config value provided for the resource:
from dagster import job, make_values_resource, op
@op(required_resource_keys={"value"})
def needs_value(context):
context.log.info(f"value: {context.resources.value}")
@op(required_resource_keys={"value"})
def also_needs_value(context):
context.log.info(f"value: {context.resources.value}")
@job(resource_defs={"value": make_values_resource()})
def basic_job():
needs_value()
also_needs_value()
basic_result = basic_job.execute_in_process(
run_config={"resources": {"value": {"config": "some_value"}}}
)
You can also specify the schema of the values like:
from dagster import job, make_values_resource, op
@op(required_resource_keys={"values"})
def needs_value(context):
context.log.info(f"my str: {context.resources.values['my_str']}")
@op(required_resource_keys={"values"})
def needs_different_value(context):
context.log.info(f"my int: {context.resources.values['my_int']}")
@job(resource_defs={"values": make_values_resource(my_str=str, my_int=int)})
def different_values_job():
needs_value()
needs_different_value()
result = different_values_job.execute_in_process(
run_config={"resources": {"values": {"config": {"my_str": "foo", "my_int": 1}}}}
)
And pass the values via a run config like so:
resources:
values:
config:
my_str: foo
my_int: 1
For more examples of jobs, check out the following in our Hacker News example: