IO Managers#

IO Managers are user-provided objects that store asset and op outputs and load them as inputs to downstream assets and ops.

IO Manager Diagram

Relevant APIs#

Name | Description
@io_manager | A decorator used to define IO managers.
IOManager | Base class for user-provided IO managers.
build_input_context | Function for directly constructing an InputContext, to be passed to the IOManager.load_input method. Designed primarily for testing purposes.
build_output_context | Function for directly constructing an OutputContext, to be passed to the IOManager.handle_output method. Designed primarily for testing purposes.

Overview#

Dagster ops have inputs and outputs. When an op returns an output and a downstream op takes that output as an input, where does the data live in between? IOManagers let the user decide. Similarly, IOManagers are responsible for storing asset outputs and loading inputs in downstream assets.

The IO manager APIs make it easy to separate code that's responsible for logical data transformation from code that's responsible for reading and writing the results. Assets and ops can focus on business logic, while IO managers handle I/O. This separation makes it easier to test the business logic and run it in different environments.

Not all inputs depend on upstream outputs. The Unconnected Inputs overview covers DagsterTypeLoaders and RootInputManagers (experimental), which let you decide how inputs at the beginning of a job are loaded.

Outputs and downstream inputs#

IOManagers are user-provided objects that are responsible for storing the output of an asset or op and loading it as input to downstream assets or ops. For example, an IO Manager might store and load objects from files on a filesystem.

For ops, each op output can have its own IO manager, or multiple op outputs can share an IO manager. The IO manager that's used for handling a particular op output is automatically used for loading it in downstream ops.


This diagram shows a job with two IO managers, each of which is shared across a few inputs and outputs.

For assets, each asset can have its own IO manager. In the multi-asset case, where a single function produces multiple assets, each of those assets can be handled with a different IO manager.

The default IO manager, fs_io_manager, pickles values and stores and retrieves them from files on the local filesystem. If a job is invoked via JobDefinition.execute_in_process, the default IO manager is switched to mem_io_manager, which stores outputs in memory.
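
For example, the assertion in the sketch below only holds because execute_in_process keeps outputs in memory via mem_io_manager; emit_number and default_io_job are illustrative names, not part of this guide's examples.

from dagster import job, op


@op
def emit_number():
    return 1


@job
def default_io_job():
    emit_number()


# execute_in_process swaps in mem_io_manager, so the output is held in memory
# and can be retrieved from the result without touching the filesystem.
result = default_io_job.execute_in_process()
assert result.output_for_node("emit_number") == 1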

Dagster provides out-of-the-box IO managers that pickle objects and save them to popular cloud object stores: s3_pickle_io_manager, adls2_pickle_io_manager, and gcs_pickle_io_manager. Along with fs_io_manager, these IO managers store op outputs at a unique path determined by the run ID, step key, and output name, and store asset outputs at a unique path determined by the asset key.
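
These object-store IO managers need to know where to write, which is supplied through resource config. As a minimal sketch (the bucket name and prefix below are placeholders, not values from this guide), the S3 pickle IO manager can be pre-configured like this:

from dagster_aws.s3 import s3_pickle_io_manager

# Supply the target bucket (required) and an optional key prefix up front;
# "my-dagster-bucket" and "my-prefix" are placeholder values.
configured_s3_io_manager = s3_pickle_io_manager.configured(
    {"s3_bucket": "my-dagster-bucket", "s3_prefix": "my-prefix"}
)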

IO managers are resources, which means users can supply different IO managers for the same op outputs in different situations. For example, you might use an in-memory IO manager for unit-testing a job and an S3 IO manager in production.


Using an IO manager#

Applying IO managers to assets#

By default, materializing an asset named my_asset will pickle it to a local file named my_asset in a temporary directory. You can specify this directory by providing a value for the local_artifact_storage property in your dagster.yaml file.

IO managers enable fully overriding this behavior and storing asset contents in any way you wish - e.g. writing them as tables in a database or as objects in a cloud object store. You can use one of Dagster's built-in IO managers that pickle assets to popular services - AWS S3 (s3_pickle_io_manager), Azure Blob Storage (adls2_pickle_io_manager), or GCS (gcs_pickle_io_manager) - or you can write your own.

To apply an IO manager to a set of assets, you can use with_resources:

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import asset, with_resources


@asset
def upstream_asset():
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


assets_with_io_manager = with_resources(
    [upstream_asset, downstream_asset],
    resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)

This example also includes "s3": s3_resource because the s3_pickle_io_manager depends on an S3 resource.

When upstream_asset is materialized, the value [1, 2, 3] will be pickled and stored in an object on S3. When downstream_asset is materialized, the value of upstream_asset will be read from S3 and unpickled, and [1, 2, 3, 4] will be pickled and stored in a different object on S3.

Per-asset IO manager#

Different assets can have different IO managers:

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import asset, fs_io_manager, with_resources


@asset(io_manager_key="s3_io_manager")
def upstream_asset():
    return [1, 2, 3]


@asset(io_manager_key="fs_io_manager")
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


assets_with_io_managers = with_resources(
    [upstream_asset, downstream_asset],
    resource_defs={
        "s3_io_manager": s3_pickle_io_manager,
        "s3": s3_resource,
        "fs_io_manager": fs_io_manager,
    },
)

When upstream_asset is materialized, the value [1, 2, 3] will be pickled and stored in an object on S3. When downstream_asset is materialized, the value of upstream_asset will be read from S3 and unpickled, and [1, 2, 3, 4] will be pickled and stored in a file on the local filesystem.

In the multi-asset case, you can customize how each asset is materialized by specifying an io_manager_key on each output of the multi-asset.

from dagster import Out, multi_asset


@multi_asset(
    outs={
        "s3_asset": Out(io_manager_key="s3_io_manager"),
        "adls_asset": Out(io_manager_key="adls2_io_manager"),
    },
)
def my_assets():
    return "store_me_on_s3", "store_me_on_adls2"

The same assets can be bound to different resources and IO managers in different environments. For example, for local development, you might want to store assets on your local filesystem while in production, you might want to store the assets in S3.

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import asset, fs_io_manager, with_resources


@asset
def upstream_asset():
    return [1, 2, 3]


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]


prod_assets = with_resources(
    [upstream_asset, downstream_asset],
    resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
)

local_assets = with_resources(
    [upstream_asset, downstream_asset],
    resource_defs={"io_manager": fs_io_manager},
)

Job-wide IO manager#

By default, all the inputs and outputs in a job use the same IO manager. This IO manager is determined by the ResourceDefinition provided for the "io_manager" resource key. "io_manager" is a resource key that Dagster reserves specifically for this purpose.

Here’s how to specify that all op outputs are stored using the fs_io_manager, which pickles outputs and stores them on the local filesystem. It stores files in a directory with the run ID in the path, so that outputs from prior runs will never be overwritten.

from dagster import fs_io_manager, job, op


@op
def op_1():
    return 1


@op
def op_2(a):
    return a + 1


@job(resource_defs={"io_manager": fs_io_manager})
def my_job():
    op_2(op_1())

Per-output IO manager#

Not all the outputs in a job should necessarily be stored the same way. Maybe some of the outputs should live on the filesystem so they can be inspected, and others can be transiently stored in memory.

To select the IO manager for a particular output, you can set an io_manager_key on Out, and then refer to that io_manager_key when setting IO managers in your job. In this example, the output of op_1 will go to fs_io_manager and the output of op_2 will go to s3_pickle_io_manager.

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Out, fs_io_manager, job, op


@op(out=Out(io_manager_key="fs"))
def op_1():
    return 1


@op(out=Out(io_manager_key="s3_io"))
def op_2(a):
    return a + 1


@job(
    resource_defs={
        "fs": fs_io_manager,
        "s3_io": s3_pickle_io_manager,
        "s3": s3_resource,
    }
)
def my_job():
    op_2(op_1())

Defining an IO manager#

If you have specific requirements for where and how your outputs should be stored and retrieved, you can define your own IO manager. This boils down to implementing two functions: one that stores outputs and one that loads inputs.

To define an IO manager, use the @io_manager decorator.

from dagster import IOManager, io_manager


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        write_csv(obj, "some/path")

    def load_input(self, context):
        return read_csv("some/path")


@io_manager
def my_io_manager(init_context):
    return MyIOManager()

The io_manager decorator behaves nearly identically to the resource decorator. It yields an IOManagerDefinition, which is a subclass of ResourceDefinition that will produce an IOManager.

The context argument provided to handle_output is an OutputContext, and the context argument provided to load_input is an InputContext. The API documentation for these classes lists all the fields that are available on them.
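
Because an IO manager definition is resource-like, it can also accept its own config via config_schema, which is then available on init_context.resource_config. The sketch below is illustrative only: PathIOManager, write_csv, and read_csv are hypothetical helpers, not part of Dagster.

from dagster import IOManager, io_manager


class PathIOManager(IOManager):
    def __init__(self, base_path):
        self._base_path = base_path

    def _path(self, step_key, name):
        # One file per output, identified by step key and output name
        return f"{self._base_path}/{step_key}_{name}.csv"

    def handle_output(self, context, obj):
        write_csv(obj, self._path(context.step_key, context.name))

    def load_input(self, context):
        upstream = context.upstream_output
        return read_csv(self._path(upstream.step_key, upstream.name))


# Like a resource, an IO manager definition can take config,
# which is exposed on init_context.resource_config.
@io_manager(config_schema={"base_path": str})
def path_io_manager(init_context):
    return PathIOManager(init_context.resource_config["base_path"])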

Accessing the partition key#

IO managers interoperate smoothly with partitions. You can access the partition key for the current run using the context for both load_input and handle_output. If working with assets, you can also access the asset-specific partition key or partition key range, though most of the time the run partition key will be equal to the asset partition key.

from dagster import IOManager


class MyPartitionedIOManager(IOManager):
    def path_for_partition(self, partition_key):
        return f"some/path/{partition_key}.csv"

    # `context.partition_key` is the run-scoped partition key
    def handle_output(self, context, obj):
        write_csv(self.path_for_partition(context.partition_key), obj)

    # `context.asset_partition_key` is set to the partition key for an asset
    # (if the `IOManager` is handling an asset). This is usually equal to the
    # run `partition_key`.
    def load_input(self, context):
        return read_csv(self.path_for_partition(context.asset_partition_key))

Examples#

A custom IO manager that stores Pandas DataFrames in tables#

If your ops produce Pandas DataFrames that populate tables in a data warehouse, you might write something like the following. This IO manager uses the name assigned to the output as the name of the table to write the output to.

from dagster import IOManager, io_manager, job


class DataframeTableIOManager(IOManager):
    def handle_output(self, context, obj):
        # name is the name given to the Out that we're storing for
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        # upstream_output.name is the name given to the Out that we're loading for
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)


@io_manager
def df_table_io_manager(_):
    return DataframeTableIOManager()


# op_1 and op_2 are the ops defined earlier in this guide
@job(resource_defs={"io_manager": df_table_io_manager})
def my_job():
    op_2(op_1())

Providing per-output config to an IO manager#

When launching a run, you might want to parameterize how particular outputs are stored.

For example, if your job produces DataFrames to populate tables in a data warehouse, you might want to specify the table that each output goes to at run launch time.

To accomplish this, you can define an output_config_schema on the IO manager definition. The IO manager methods can access this config when storing or loading data, via the OutputContext.

from dagster import IOManager, io_manager


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.config["table"]
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        table_name = context.upstream_output.config["table"]
        return read_dataframe_from_table(name=table_name)


@io_manager(output_config_schema={"table": str})
def my_io_manager(_):
    return MyIOManager()

Then, when executing a job, you can pass in this per-output config.

def execute_my_job_with_config():
    @job(resource_defs={"io_manager": my_io_manager})
    def my_job():
        op_2(op_1())

    my_job.execute_in_process(
        run_config={
            "ops": {
                "op_1": {"outputs": {"result": {"table": "table1"}}},
                "op_2": {"outputs": {"result": {"table": "table2"}}},
            }
        },
    )

Providing per-output metadata to an IO manager#

You might want to provide static metadata that controls how particular outputs are stored. You don't plan to change the metadata at runtime, so it makes more sense to attach it to a definition rather than expose it as a configuration option.

For example, if your job produces DataFrames to populate tables in a data warehouse, you might want to specify that each output always goes to a particular table. To accomplish this, you can define metadata on each Out:

from dagster import Out, op


@op(out=Out(metadata={"schema": "some_schema", "table": "some_table"}))
def op_1():
    """Return a Pandas DataFrame"""


@op(out=Out(metadata={"schema": "other_schema", "table": "other_table"}))
def op_2(_input_dataframe):
    """Return a Pandas DataFrame"""

The IO manager can then access this metadata when storing or retrieving data, via the OutputContext.

In this case, the table names are encoded in the job definition. If, instead, you want to be able to set them at run time, the previous section describes how.

from dagster import IOManager, io_manager


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.metadata["table"]
        schema = context.metadata["schema"]
        write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj)

    def load_input(self, context):
        table_name = context.upstream_output.metadata["table"]
        schema = context.upstream_output.metadata["schema"]
        return read_dataframe_from_table(name=table_name, schema=schema)


@io_manager
def my_io_manager(_):
    return MyIOManager()

Testing an IO manager#

The easiest way to test an IO manager is to construct an OutputContext or InputContext and pass it to the handle_output or load_input method of the IO manager. The build_output_context and build_input_context functions allow for easy construction of these contexts.

Here's an example of a simple IO manager that stores outputs in an in-memory dictionary keyed on the step key and name of the output.

from dagster import IOManager, build_input_context, build_output_context, io_manager


class MyIOManager(IOManager):
    def __init__(self):
        self.storage_dict = {}

    def handle_output(self, context, obj):
        self.storage_dict[(context.step_key, context.name)] = obj

    def load_input(self, context):
        return self.storage_dict[
            (context.upstream_output.step_key, context.upstream_output.name)
        ]


@io_manager
def my_io_manager(_):
    return MyIOManager()


def test_my_io_manager_handle_output():
    manager = my_io_manager(None)
    context = build_output_context(name="abc", step_key="123")
    manager.handle_output(context, 5)
    assert manager.storage_dict[("123", "abc")] == 5


def test_my_io_manager_load_input():
    manager = my_io_manager(None)
    manager.storage_dict[("123", "abc")] = 5

    context = build_input_context(
        upstream_output=build_output_context(name="abc", step_key="123")
    )
    assert manager.load_input(context) == 5

Yielding metadata from an IO manager (Experimental)#

Sometimes, you may want to record some metadata while handling an output in an IO manager. To do this, you can optionally yield MetadataEntry objects from within the body of the handle_output function. Using this, we can modify one of the examples above to include some helpful metadata in the log:

from dagster import IOManager, MetadataEntry


class DataframeTableIOManagerWithMetadata(IOManager):
    def handle_output(self, context, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

        # attach these to the Handled Output event
        yield MetadataEntry.int(len(obj), label="number of rows")
        yield MetadataEntry.text(table_name, label="table name")

    def load_input(self, context):
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)

Any entries yielded this way will be attached to the Handled Output event for this output.

Additionally, if you have specified that this handle_output function will be writing to an asset by defining a get_output_asset_key function, these metadata entries will also be attached to the materialization event created for that asset. You can learn more about this functionality in the Asset Docs.
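
For example, building on the DataframeTableIOManagerWithMetadata class above, a get_output_asset_key implementation might look like the following minimal sketch; the "warehouse" key prefix is an illustrative assumption, not a Dagster convention.

from dagster import AssetKey


class DataframeTableIOManagerWithAssetKey(DataframeTableIOManagerWithMetadata):
    def get_output_asset_key(self, context):
        # Associate each handled output with an asset key, so the metadata
        # yielded in handle_output is also attached to the materialization
        # event created for that asset.
        return AssetKey(["warehouse", context.name])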

See it in action#

For more examples of IO managers, check out our Hacker News example. Our Bollinger example and New York Times example also cover writing custom IO managers.