IO Managers

IO managers are user-provided objects that store op outputs and load them as inputs to downstream ops.

@dagster.io_manager(config_schema=None, description=None, output_config_schema=None, input_config_schema=None, required_resource_keys=None, version=None)

Define an IO manager.

IOManagers are used to store op outputs and load them as inputs to downstream ops.

The decorated function should accept an InitResourceContext and return an IOManager.

Parameters
  • config_schema (Optional[ConfigSchema]) – The schema for the resource config. Configuration data is available in init_context.resource_config. If not set, Dagster will accept any config provided.

  • description (Optional[str]) – A human-readable description of the resource.

  • output_config_schema (Optional[ConfigSchema]) – The schema for per-output config. If not set, no per-output configuration will be allowed.

  • input_config_schema (Optional[ConfigSchema]) – The schema for per-input config. If not set, Dagster will accept any config provided.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the IO manager.

  • version (Optional[str]) – (Experimental) The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.

Examples:

from dagster import IOManager, Out, io_manager, job, op

# write_csv, read_csv, and do_stuff are placeholder helpers.
class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        # Persist the object returned by the op.
        write_csv("some/path", obj)

    def load_input(self, context):
        return read_csv("some/path")

@io_manager
def my_io_manager(init_context):
    return MyIOManager()

@op(out=Out(io_manager_key="my_io_manager_key"))
def my_op(_):
    return do_stuff()

@job(resource_defs={"my_io_manager_key": my_io_manager})
def my_job():
    my_op()

class dagster.IOManager

Base class for user-provided IO managers.

IOManagers are used to store op outputs and load them as inputs to downstream ops.

Extend this class to handle how objects are loaded and stored. Users should implement handle_output to store an object and load_input to retrieve an object.
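The sketch below shows the handle_output/load_input contract without Dagster. It stores each output under a key built from the step key and output name, then loads it back for a downstream input. `ToyOutputContext`, `ToyInputContext`, and `InMemoryStore` are hypothetical stand-ins, not Dagster classes. A real IO manager would subclass `dagster.IOManager` and receive `OutputContext` and `InputContext` objects.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class ToyOutputContext:
    # Hypothetical stand-in for the step key and output name an OutputContext carries.
    step_key: str
    name: str = "result"


@dataclass
class ToyInputContext:
    # Hypothetical stand-in: an InputContext exposes the upstream output it loads from.
    upstream_output: ToyOutputContext


class InMemoryStore:
    """Sketch of the handle_output/load_input contract, backed by a dict."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def handle_output(self, context: ToyOutputContext, obj: Any) -> None:
        # Store the op's return value under (step_key, output name).
        self._values[(context.step_key, context.name)] = obj

    def load_input(self, context: ToyInputContext) -> Any:
        # Look up the value that the upstream output stored.
        upstream = context.upstream_output
        return self._values[(upstream.step_key, upstream.name)]


store = InMemoryStore()
out_ctx = ToyOutputContext(step_key="op_a")
store.handle_output(out_ctx, [1, 2, 3])
loaded = store.load_input(ToyInputContext(upstream_output=out_ctx))
```

Loading works only because the input context points back at the output that produced the value. That is the role `upstream_output` plays on a real `InputContext`.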

get_input_asset_key(context)

User-defined method that associates inputs loaded by this IOManager with a particular AssetKey.

Parameters

context (InputContext) – The input context, which describes the input that’s being loaded and the upstream output that’s being loaded from.

get_input_asset_partitions(context)

User-defined method that associates inputs loaded by this IOManager with a set of partitions of an AssetKey.

Parameters

context (InputContext) – The input context, which describes the input that’s being loaded and the upstream output that’s being loaded from.

get_output_asset_key(_context)

User-defined method that associates outputs handled by this IOManager with a particular AssetKey.

Parameters

context (OutputContext) – The context of the step output that produces this object.

get_output_asset_partitions(_context)

User-defined method that associates outputs handled by this IOManager with a set of partitions of an AssetKey.

Parameters

context (OutputContext) – The context of the step output that produces this object.

abstract handle_output(context, obj)

User-defined method that stores an output of an op.

Parameters
  • context (OutputContext) – The context of the step output that produces this object.

  • obj (Any) – The object, returned by the op, to be stored.

abstract load_input(context)

User-defined method that loads an input to an op.

Parameters

context (InputContext) – The input context, which describes the input that’s being loaded and the upstream output that’s being loaded from.

Returns

The data object.

Return type

Any

class dagster.IOManagerDefinition(resource_fn, config_schema=None, description=None, required_resource_keys=None, version=None, input_config_schema=None, output_config_schema=None)

Definition of an IO manager resource.

IOManagers are used to store op outputs and load them as inputs to downstream ops.

An IOManagerDefinition is a ResourceDefinition whose resource_fn returns an IOManager.

The easiest way to create an IOManagerDefinition is with the @io_manager decorator.

static hardcoded_io_manager(value, description=None)

A helper function that creates an IOManagerDefinition with a hardcoded IOManager.

Parameters
  • value (IOManager) – A hardcoded IO Manager which helps mock the definition.

  • description (Optional[str]) – The description of the IO Manager. Defaults to None.

Returns

A hardcoded resource.

Return type

IOManagerDefinition
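Conceptually, a hardcoded IO manager definition wraps a resource function that ignores its initialization context and always returns the same pre-built object. That is what makes it convenient for mocking in tests. The stdlib-only sketch below illustrates the idea. `make_hardcoded_resource_fn` is a hypothetical helper, not part of Dagster.

```python
from typing import Any, Callable


def make_hardcoded_resource_fn(value: Any) -> Callable[[Any], Any]:
    """Return a resource function that always yields the same `value`."""

    def resource_fn(_init_context: Any) -> Any:
        # The init context (config, other resources) is ignored entirely.
        return value

    return resource_fn


mock_manager = object()  # stands in for a pre-built IOManager instance
resource_fn = make_hardcoded_resource_fn(mock_manager)
```

With Dagster itself, you would pass an IOManager instance to IOManagerDefinition.hardcoded_io_manager instead.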

property input_config_schema

The schema for per-input configuration for inputs that are managed by this IO manager.

property output_config_schema

The schema for per-output configuration for outputs that are managed by this IO manager.

Input and Output Contexts

class dagster.InputContext(name=None, pipeline_name=None, solid_def=None, config=None, metadata=None, upstream_output=None, dagster_type=None, log_manager=None, resource_config=None, resources=None, step_context=None, op_def=None)

The context object available to the load_input method of IOManager and RootInputManager.

name

The name of the input that we’re loading.

Type

Optional[str]

pipeline_name

The name of the pipeline.

Type

Optional[str]

solid_def

The definition of the solid that’s loading the input.

Type

Optional[SolidDefinition]

config

The config attached to the input that we’re loading.

Type

Optional[Any]

metadata

A dict of metadata that is assigned to the InputDefinition of the input being loaded.

Type

Optional[Dict[str, Any]]

upstream_output

Info about the output that produced the object we’re loading.

Type

Optional[OutputContext]

dagster_type

The type of this input.

Type

Optional[DagsterType]

log

The log manager to use for this input.

Type

Optional[DagsterLogManager]

resource_config

The config associated with the resource that initializes the input manager (an IOManager or RootInputManager).

Type

Optional[Dict[str, Any]]

resources

The resources required by the resource that initializes the input manager. If using the @root_input_manager() decorator, these resources correspond to those requested with the required_resource_keys parameter.

Type

Optional[Resources]

op_def

The definition of the op that’s loading the input.

Type

Optional[OpDefinition]

add_input_metadata(metadata, description=None)

Accepts a dictionary of metadata. Metadata entries will appear on the LOADED_INPUT event. If the input is an asset, metadata will be attached to an asset observation.

The asset observation will be yielded from the run and appear in the event log. Only valid if the context has an asset key.

property asset_partition_key

The partition key for the input asset.

Raises an error if the input asset has no partitioning, or if the run covers a partition range for the input asset.

property asset_partition_key_range

The partition key range for the input asset.

Raises an error if the input asset has no partitioning.

property asset_partitions_time_window

The time window for the partitions of the input asset.

Raises an error if either of the following is true:

  • The input asset has no partitioning.

  • The input asset is not partitioned with a TimeWindowPartitionsDefinition.
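Daily partitions show how these three properties relate. The sketch below makes three assumptions:

  • Partition keys are ISO dates (YYYY-MM-DD).
  • Each partition's window spans one day and is end-exclusive.
  • A run covers either a single key or a contiguous range.

`PartitionKeyRange`, `single_partition_key`, and `daily_time_window` are hypothetical names. The sketch does not model unpartitioned assets, and Dagster's own time-window logic supports many more partition schemes.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Tuple


@dataclass(frozen=True)
class PartitionKeyRange:
    # Hypothetical stand-in for the range of partition keys a run covers.
    start: str
    end: str


def single_partition_key(key_range: PartitionKeyRange) -> str:
    # Like asset_partition_key: only valid when the run covers one partition.
    if key_range.start != key_range.end:
        raise ValueError("run covers a partition range, not a single partition")
    return key_range.start


def daily_time_window(key_range: PartitionKeyRange) -> Tuple[datetime, datetime]:
    # From the start of the first day to the start of the day after the last one.
    start = datetime.fromisoformat(key_range.start)
    end = datetime.fromisoformat(key_range.end) + timedelta(days=1)
    return start, end


one_day = PartitionKeyRange("2022-01-01", "2022-01-01")
week = PartitionKeyRange("2022-01-01", "2022-01-07")
```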

consume_events()

Pops and yields all user-generated events that have been recorded from this context.

If consume_events has not yet been called, this will yield all logged events since the call to load_input. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method.
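The "pop and yield since the last call" behavior amounts to draining a buffer. Below is a minimal sketch of that pattern using a hypothetical `EventBuffer` class. It is not Dagster's implementation; it only shows why a second call yields only the events recorded after the first.

```python
from typing import Any, Iterator, List


class EventBuffer:
    def __init__(self) -> None:
        self._events: List[Any] = []

    def record(self, event: Any) -> None:
        self._events.append(event)

    def consume_events(self) -> Iterator[Any]:
        # When iteration starts, swap out the buffer so each event is yielded at most once.
        events, self._events = self._events, []
        yield from events


buf = EventBuffer()
buf.record("a")
buf.record("b")
first = list(buf.consume_events())
buf.record("c")
second = list(buf.consume_events())
```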

get_identifier()

Utility method to get a collection of identifiers that as a whole represent a unique step input.

If not using memoization, the unique identifier collection consists of

  • run_id: the id of the run which generates the input.

    Note: This method also handles the re-execution memoization logic. If the step that generates the input is skipped in the re-execution, the run_id will be the id of its parent run.

  • step_key: the key for a compute step.

  • name: the name of the output. (default: ‘result’).

If using memoization, the version corresponding to the step output is used in place of the run_id.

Returns

A list of identifiers, i.e. (run_id or version), step_key, and output_name

Return type

List[str]
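The composition rule described above can be written as a small pure function. This is a hypothetical sketch of the rule as documented here, not Dagster's source. It rests on two assumptions:

  • "Using memoization" is modeled as a non-None version.
  • The re-execution case is modeled by passing the parent run's id.

```python
from typing import List, Optional


def step_identifier(
    run_id: str,
    step_key: str,
    name: str = "result",
    version: Optional[str] = None,
    parent_run_id: Optional[str] = None,
    step_skipped_in_reexecution: bool = False,
) -> List[str]:
    if version is not None:
        # Memoization: the step output's version stands in for the run id.
        first = version
    elif step_skipped_in_reexecution and parent_run_id is not None:
        # Skipped during re-execution: use the run that actually produced the value.
        first = parent_run_id
    else:
        first = run_id
    return [first, step_key, name]
```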

get_observations()

Retrieve the list of user-generated asset observations that were observed via the context.

User-generated events that were yielded will not appear in this list.

Examples:

from dagster import IOManager, build_input_context, AssetObservation

class MyIOManager(IOManager):
    # Both abstract methods must be implemented before the class can be instantiated.
    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...

def test_load_input():
    mgr = MyIOManager()
    context = build_input_context()
    mgr.load_input(context)
    observations = context.get_observations()
    ...

property has_input_name

Whether the input has a name. If the InputContext is being used to load the result of a run from outside the run, it won’t have an input name.

property has_partition_key

Whether the current run is a partitioned run.

property partition_key

The partition key for the current run.

Raises an error if the current run is not a partitioned run.

class dagster.OutputContext(step_key=None, name=None, pipeline_name=None, run_id=None, metadata=None, mapping_key=None, config=None, solid_def=None, dagster_type=None, log_manager=None, version=None, resource_config=None, resources=None, step_context=None, op_def=None, asset_info=None, warn_on_step_context_use=False)

The context object that is available to the handle_output method of an IOManager.

step_key

The step_key for the compute step that produced the output.

Type

Optional[str]

name

The name of the output.

Type

Optional[str]

pipeline_name

The name of the pipeline definition.

Type

Optional[str]

run_id

The id of the run that produced the output.

Type

Optional[str]

metadata

A dict of the metadata that is assigned to the OutputDefinition that produced the output.

Type

Optional[Dict[str, Any]]

mapping_key

The key that identifies a unique mapped output. None for regular outputs.

Type

Optional[str]

config

The configuration for the output.

Type

Optional[Any]

solid_def

The definition of the solid that produced the output.

Type

Optional[SolidDefinition]

dagster_type

The type of this output.

Type

Optional[DagsterType]

log

The log manager to use for this output.

Type

Optional[DagsterLogManager]

version

(Experimental) The version of the output.

Type

Optional[str]

resource_config

The config associated with the resource that initializes the IO manager.

Type

Optional[Dict[str, Any]]

resources

The resources required by the output manager, specified by the required_resource_keys parameter.

Type

Optional[Resources]

op_def

The definition of the op that produced the output.

Type

Optional[OpDefinition]

asset_info

(Experimental) Asset info corresponding to the output.

Type

Optional[AssetOutputInfo]

add_output_metadata(metadata)

Add a dictionary of metadata to the handled output.

Metadata entries added will show up in the HANDLED_OUTPUT and ASSET_MATERIALIZATION events for the run.

Parameters

metadata (Dict[str, Any]) – A metadata dictionary to log

Examples:

from dagster import IOManager

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        context.add_output_metadata({"foo": "bar"})

property asset_partition_key

The partition key for the output asset.

Raises an error if the output asset has no partitioning, or if the run covers a partition range for the output asset.

property asset_partition_key_range

The partition key range for the output asset.

Raises an error if the output asset has no partitioning.

property asset_partitions_time_window

The time window for the partitions of the output asset.

Raises an error if either of the following is true:

  • The output asset has no partitioning.

  • The output asset is not partitioned with a TimeWindowPartitionsDefinition.

consume_events()

Pops and yields all user-generated events that have been recorded from this context.

If consume_events has not yet been called, this will yield all logged events since the call to handle_output. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method.

consume_logged_metadata_entries()

Pops and yields all user-generated metadata entries that have been recorded from this context.

If consume_logged_metadata_entries has not yet been called, this will yield all logged metadata entries since the call to handle_output. If consume_logged_metadata_entries has been called, it will yield all entries since the last time consume_logged_metadata_entries was called. Designed for internal use. Users should never need to invoke this method.

get_identifier()

Utility method to get a collection of identifiers that as a whole represent a unique step output.

If not using memoization, the unique identifier collection consists of

  • run_id: the id of the run which generates the output.

    Note: This method also handles the re-execution memoization logic. If the step that generates the output is skipped in the re-execution, the run_id will be the id of its parent run.

  • step_key: the key for a compute step.

  • name: the name of the output. (default: ‘result’).

If using memoization, the version corresponding to the step output is used in place of the run_id.

Returns

A list of identifiers, i.e. (run_id or version), step_key, and output_name

Return type

List[str]

get_logged_events()

Retrieve the list of user-generated events that were logged via the context.

User-generated events that were yielded will not appear in this list.

Examples:

from dagster import IOManager, build_output_context, AssetMaterialization

class MyIOManager(IOManager):
    # Both abstract methods must be implemented before the class can be instantiated.
    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...

def test_handle_output():
    mgr = MyIOManager()
    context = build_output_context()
    mgr.handle_output(context, [1, 2, 3])
    all_user_events = context.get_logged_events()
    materializations = [event for event in all_user_events if isinstance(event, AssetMaterialization)]
    ...

get_logged_metadata_entries()

Get the list of metadata entries that have been logged for use with this output.

get_run_scoped_output_identifier()

Utility method to get a collection of identifiers that as a whole represent a unique step output.

The unique identifier collection consists of

  • run_id: the id of the run which generates the output.

    Note: This method also handles the re-execution memoization logic. If the step that generates the output is skipped in the re-execution, the run_id will be the id of its parent run.

  • step_key: the key for a compute step.

  • name: the name of the output. (default: ‘result’).

Returns

A list of identifiers, i.e. run id, step key, and output name

Return type

List[str]

property has_partition_key

Whether the current run is a partitioned run.

log_event(event)

Log an AssetMaterialization or AssetObservation from within the body of an IO manager’s handle_output method.

Events logged with this method will appear in the event log.

Parameters

event (Union[AssetMaterialization, Materialization, AssetObservation]) – The event to log.

Examples:

from dagster import IOManager, AssetMaterialization

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        context.log_event(AssetMaterialization("foo"))

property partition_key

The partition key for the current run.

Raises an error if the current run is not a partitioned run.

dagster.build_input_context(name=None, config=None, metadata=None, upstream_output=None, dagster_type=None, resource_config=None, resources=None, op_def=None, step_context=None)

Builds input context from provided parameters.

build_input_context can be used as either a function, or a context manager. If resources that are also context managers are provided, then build_input_context must be used as a context manager.
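The "function or context manager" behavior is a common Python pattern. The builder returns an object that is usable immediately. It also implements `__enter__`/`__exit__`, so any context-manager resources are set up on `with` and torn down on exit. The sketch uses `contextlib.ExitStack`. `ToyContext` and `managed_resource` are hypothetical names, and Dagster's internals may differ.

```python
from contextlib import ExitStack, contextmanager
from typing import Any, Dict, Iterator, List


class ToyContext:
    def __init__(self, resources: Dict[str, Any]) -> None:
        self._stack = ExitStack()
        self._pending = {k: v for k, v in resources.items() if hasattr(v, "__enter__")}
        # Plain objects are available immediately, without a `with` block.
        self.resources = {k: v for k, v in resources.items() if k not in self._pending}

    def __enter__(self) -> "ToyContext":
        # Context-manager resources are only set up inside a `with` block.
        for key, cm in self._pending.items():
            self.resources[key] = self._stack.enter_context(cm)
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Tear down entered resources in reverse order; exceptions are not suppressed.
        self._stack.close()


log: List[str] = []


@contextmanager
def managed_resource() -> Iterator[str]:
    log.append("setup")
    yield "connection"
    log.append("teardown")


plain = ToyContext(resources={"foo": 42})
with ToyContext(resources={"db": managed_resource()}) as ctx:
    inside = ctx.resources["db"]
```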

Parameters
  • name (Optional[str]) – The name of the input that we’re loading.

  • config (Optional[Any]) – The config attached to the input that we’re loading.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata that is assigned to the InputDefinition of the input being loaded.

  • upstream_output (Optional[OutputContext]) – Info about the output that produced the object we’re loading.

  • dagster_type (Optional[DagsterType]) – The type of this input.

  • resource_config (Optional[Dict[str, Any]]) – The resource config to make available from the input context. This usually corresponds to the config provided to the resource that loads the input manager.

  • resources (Optional[Dict[str, Any]]) – The resources to make available from the context. For a given key, you can provide either an actual instance of an object, or a resource definition.

  • asset_key (Optional[AssetKey]) – The asset key attached to the InputDefinition.

  • op_def (Optional[OpDefinition]) – The definition of the op that’s loading the input.

  • step_context (Optional[StepExecutionContext]) – For internal use.

Examples

build_input_context()

with build_input_context(resources={"foo": context_manager_resource}) as context:
    do_something

dagster.build_output_context(step_key=None, name=None, metadata=None, run_id=None, mapping_key=None, config=None, dagster_type=None, version=None, resource_config=None, resources=None, solid_def=None, op_def=None, asset_key=None)

Builds output context from provided parameters.

build_output_context can be used as either a function, or a context manager. If resources that are also context managers are provided, then build_output_context must be used as a context manager.

Parameters
  • step_key (Optional[str]) – The step_key for the compute step that produced the output.

  • name (Optional[str]) – The name of the output.

  • metadata (Optional[Dict[str, Any]]) – A dict of the metadata that is assigned to the OutputDefinition that produced the output.

  • mapping_key (Optional[str]) – The key that identifies a unique mapped output. None for regular outputs.

  • config (Optional[Any]) – The configuration for the output.

  • dagster_type (Optional[DagsterType]) – The type of this output.

  • version (Optional[str]) – (Experimental) The version of the output.

  • resource_config (Optional[Dict[str, Any]]) – The resource config to make available from the output context. This usually corresponds to the config provided to the resource that loads the output manager.

  • resources (Optional[Resources]) – The resources to make available from the context. For a given key, you can provide either an actual instance of an object, or a resource definition.

  • solid_def (Optional[SolidDefinition]) – The definition of the solid that produced the output.

  • op_def (Optional[OpDefinition]) – The definition of the op that produced the output.

  • asset_key (Optional[Union[AssetKey, Sequence[str], str]]) – The asset key corresponding to the output.

Examples

build_output_context()

with build_output_context(resources={"foo": context_manager_resource}) as context:
    do_something

Built-in IO Managers

dagster.mem_io_manager IOManagerDefinition

Built-in IO manager that stores and retrieves values in memory.

dagster.fs_io_manager IOManagerDefinition

Built-in filesystem IO manager that stores and retrieves values using pickling.

Allows users to specify a base directory where all the step outputs will be stored. By default, step outputs will be stored in the directory specified by local_artifact_storage in your dagster.yaml file (which will be a temporary directory if not explicitly set).

Serializes and deserializes output values using pickling and automatically constructs the filepaths for ops and assets.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. So, with a base directory of "/my/base/path", an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called "three" in a directory with path "/my/base/path/one/two/".
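The path layout and pickling described above can be sketched with the standard library. This is a simplified illustration, not Dagster's implementation, and it rests on two assumptions:

  • Op outputs go to <base_dir>/<run_id>/<step_key>/<output_name>. The exact nesting is an assumption; the text above only says the path contains all three.
  • Assets go to <base_dir>/<component>/.../<last component>.

`op_output_path`, `asset_path`, `store`, and `load` are hypothetical helpers.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Sequence


def op_output_path(base_dir: str, run_id: str, step_key: str, output_name: str) -> Path:
    # Assumed layout: one directory level per identifier component.
    return Path(base_dir, run_id, step_key, output_name)


def asset_path(base_dir: str, asset_key: Sequence[str]) -> Path:
    # Leading key components become directories; the last one is the file name.
    return Path(base_dir, *asset_key)


def store(path: Path, obj: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as f:
        pickle.dump(obj, f)  # replaces any earlier file at this path


def load(path: Path) -> Any:
    with path.open("rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as base:
    target = asset_path(base, ["one", "two", "three"])
    store(target, {"rows": 5})
    store(target, {"rows": 10})  # a later materialization overwrites the earlier one
    reloaded = load(target)
    relative = target.relative_to(base)
```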

Example usage:

1. Specify a job-level IO manager using the reserved resource key "io_manager", which will set the given IO manager on all ops in a job.

from dagster import fs_io_manager, job, op

@op
def op_a():
    # create df ...
    return df

@op
def op_b(df):
    return df[:5]

@job(
    resource_defs={
        "io_manager": fs_io_manager.configured({"base_dir": "/my/base/path"})
    }
)
def my_job():
    op_b(op_a())

2. Specify IO manager on Out, which allows the user to set different IO managers on different step outputs.

from dagster import fs_io_manager, job, op, Out

@op(out=Out(io_manager_key="my_io_manager"))
def op_a():
    # create df ...
    return df

@op
def op_b(df):
    return df[:5]

@job(resource_defs={"my_io_manager": fs_io_manager})
def my_job():
    op_b(op_a())

dagster.custom_path_fs_io_manager IOManagerDefinition

Built-in IO manager that allows users to customize the output file path per output definition.

It requires users to specify a base directory where all the step outputs will be stored. It serializes and deserializes output values (assets) using pickling and stores the pickled object in the user-provided file paths.

Example usage:

from dagster import Out, custom_path_fs_io_manager, job, op

@op(out=Out(metadata={"path": "path/to/sample_output"}))
def sample_data():
    # create df ...
    return df[:5]

my_custom_path_fs_io_manager = custom_path_fs_io_manager.configured(
    {"base_dir": "path/to/basedir"}
)

@job(resource_defs={"io_manager": my_custom_path_fs_io_manager})
def my_job():
    sample_data()
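In the example above, the file location comes from the output's metadata rather than from the run ID and step key. The stdlib sketch below assumes the path is read from the "path" metadata entry and joined onto base_dir, as the example's configuration suggests. `custom_output_path` is a hypothetical helper, not Dagster's code.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict


def custom_output_path(base_dir: str, metadata: Dict[str, Any]) -> Path:
    # The per-output "path" metadata entry is interpreted relative to base_dir.
    return Path(base_dir) / metadata["path"]


with tempfile.TemporaryDirectory() as base:
    target = custom_output_path(base, {"path": "path/to/sample_output"})
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(pickle.dumps([1, 2, 3]))  # store the pickled output
    restored = pickle.loads(target.read_bytes())
    relative = target.relative_to(base)
```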

Root Input Managers (Experimental)

Root input managers are user-provided objects that specify how to load inputs that aren’t connected to upstream outputs.

@dagster.root_input_manager(config_schema=None, description=None, input_config_schema=None, required_resource_keys=None, version=None)

Define a root input manager.

Root input managers load op inputs that aren’t connected to upstream outputs.

The decorated function should accept an InputContext, whose resource_config attribute holds the resource config, and return a loaded object that will be passed into one of the inputs of an op.

The decorator produces a RootInputManagerDefinition.

Parameters
  • config_schema (Optional[ConfigSchema]) – The schema for the resource-level config. If not set, Dagster will accept any config provided.

  • description (Optional[str]) – A human-readable description of the resource.

  • input_config_schema (Optional[ConfigSchema]) – A schema for the input-level config. Each input that uses this input manager can be configured separately using this config. If not set, Dagster will accept any config provided.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the input manager.

  • version (Optional[str]) – (Experimental) The version of the input manager definition.

Examples:

from dagster import root_input_manager, op, job, In

@root_input_manager
def csv_loader(_):
    return read_csv("some/path")

@op(ins={"input1": In(root_manager_key="csv_loader_key")})
def my_op(_, input1):
    do_stuff(input1)

@job(resource_defs={"csv_loader_key": csv_loader})
def my_job():
    my_op()

@root_input_manager(config_schema={"base_dir": str})
def csv_loader(context):
    return read_csv(context.resource_config["base_dir"] + "/some/path")

@root_input_manager(input_config_schema={"path": str})
def csv_loader(context):
    return read_csv(context.config["path"])
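The last two examples differ only in where the path comes from:

  • context.resource_config holds resource-level config shared by every input.
  • context.config holds per-input config.

The stdlib sketch below combines both in one loader. `types.SimpleNamespace` stands in for `InputContext`, and the `csv` module replaces the undefined `read_csv` helper. The "base_dir" and "path" keys are carried over from the examples, and `load_csv_rows` is a hypothetical name.

```python
import csv
import os
import tempfile
from types import SimpleNamespace
from typing import Any, List


def load_csv_rows(context: Any) -> List[List[str]]:
    # Join the resource-level base directory with the input-level relative path.
    full_path = os.path.join(context.resource_config["base_dir"], context.config["path"])
    with open(full_path, newline="") as f:
        return list(csv.reader(f))


with tempfile.TemporaryDirectory() as base_dir:
    with open(os.path.join(base_dir, "data.csv"), "w", newline="") as f:
        csv.writer(f).writerows([["a", "b"], ["1", "2"]])
    ctx = SimpleNamespace(resource_config={"base_dir": base_dir}, config={"path": "data.csv"})
    rows = load_csv_rows(ctx)
```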

class dagster.RootInputManager

RootInputManagers are used to load inputs to ops at the root of a job.

The easiest way to define a RootInputManager is with the @root_input_manager decorator.

abstract load_input(context)

The user-defined read method that loads data given its metadata.

Parameters

context (InputContext) – The input context, which describes the input that’s being loaded.

Returns

The data object.

Return type

Any

class dagster.RootInputManagerDefinition(resource_fn=None, config_schema=None, description=None, input_config_schema=None, required_resource_keys=None, version=None)

Definition of a root input manager resource.

Root input managers load op inputs that aren’t connected to upstream outputs.

A RootInputManagerDefinition is a ResourceDefinition whose resource_fn returns a RootInputManager.

The easiest way to create a RootInputManagerDefinition is with the @root_input_manager decorator.

property input_config_schema

The schema for per-input configuration for inputs that are managed by this input manager.