Using dbt with Dagster#

This guide explains how to run a dbt project as part of a Dagster job. It focuses on dbt Core, the open-source dbt offering. Dagster also offers an integration with dbt Cloud, the hosted version of dbt. Check out the API docs to learn more.

What is dbt?#

dbt (data build tool) helps engineers transform data in their warehouses by simply writing SELECT statements. dbt automatically builds a dependency graph for your transformations and turns these SELECT statements into tables and views in your data warehouse.
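
The dependency graph idea can be pictured with a small, standard-library-only sketch. This is not dbt's implementation: the model names are hypothetical, and the edges are written by hand here, whereas dbt infers them from the ref() calls in your SQL.

```python
from graphlib import TopologicalSorter

# Hypothetical models mapped to the models they select from.
# In a real project, dbt infers these edges from ref() calls in the SQL.
model_dependencies = {
    "stg_orders": set(),
    "stg_customers": set(),
    "customer_orders": {"stg_orders", "stg_customers"},
    "daily_revenue": {"customer_orders"},
}


def build_order(dependencies):
    """Return one valid build order in which every model follows its upstreams."""
    return list(TopologicalSorter(dependencies).static_order())


order = build_order(model_dependencies)
print(order)
```

Any order that places each model after the models it depends on is valid, so the exact sequence printed may differ between equally correct orderings.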

dbt not only runs your data transformations, but can also create data quality tests and generate documentation for your data, right out of the box. To learn more about dbt, visit the official dbt documentation website.

How does dbt work with Dagster?#

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single job. Dagster also provides built-in operational and data observability capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.

dagster-dbt is an integration library that provides pre-built resources for using dbt together with Dagster. These are all designed to be configurable for any dbt project.

The resources that dagster_dbt provides are:

  • dbt_cli_resource (DbtCliResource): for running dbt CLI commands
  • dbt_rpc_resource (DbtRpcResource): for sending dbt commands to an RPC server
  • dbt_rpc_sync_resource (DbtRpcSyncResource): for sending dbt commands to an RPC server and polling until they complete

Each resource derives from the same base class, DbtResource, which provides a consistent Python interface for interacting with dbt.

This library also provides pre-built ops that take advantage of these resources. These exist solely for convenience (they just run the underlying resource command and return the result). If you have more complex use cases that require some degree of customization, it's recommended that you build your own ops that use these resources. The pre-built ops, such as dbt_run_op and dbt_test_op, are compatible with all of the aforementioned dbt resources.

To view example jobs that take advantage of this dbt integration, check out either the Hacker News example repo or the dbt example repo.

Using the dbt CLI resource in a Dagster job#

dagster-dbt provides a dbt_cli_resource to help make it easy to run commands through the dbt CLI.

When you supply this resource to an op, you can call any of the many provided methods to invoke that particular CLI command. You can check out a full list of functions (as well as their signatures) in the DbtCliResource API Docs. All methods on the resource will return a DbtCliOutput object.

To run dbt CLI commands, your dbt project directory must be on your local filesystem and you must have a dbt profile already set up to connect to your data warehouse. Visit the official dbt CLI documentation for more details.

Configuration#

When configuring the dbt_cli_resource, you can specify any command line options that you want to pass into all of your dbt commands.

Typically, you'll want to configure your project_dir here. In most cases, you will only be working with a single dbt project in a given job, and you won't want to pass this option to every function call. For similar reasons, you might also want to configure your profiles_dir or the specific profile you'll be using.

from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {
        "project_dir": "path/to/dbt/project",
        "profiles_dir": "path/to/dbt/profiles",
    }
)
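
One way to picture "configure once, use in every call" is a dictionary of defaults merged with per-call arguments. The sketch below is illustrative only and is not dagster-dbt's implementation: merge_flags is a hypothetical helper, and it assumes that a value given at call time takes precedence over the configured default.

```python
def merge_flags(default_flags, **call_flags):
    """Combine resource-level defaults with flags passed to a single call.

    Assumption: a value given at call time overrides the configured default.
    """
    merged = dict(default_flags)  # copy so the defaults are never mutated
    merged.update(call_flags)
    return merged


defaults = {
    "project_dir": "path/to/dbt/project",
    "profiles_dir": "path/to/dbt/profiles",
}

# Every call inherits project_dir and profiles_dir without repeating them.
run_flags = merge_flags(defaults, models=["tag:staging"])
seed_flags = merge_flags(defaults, profiles_dir="other/profiles")
```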

While the config schema doesn't have an option for every single dbt flag (as some flags only work with certain commands), if you configure a flag that is not in the schema, it will still get passed into every CLI invocation, exactly the same as the pre-defined config options.

There are also a few options that are not associated with command line flags, which may be useful. These are:

  • warn_error: will raise an error for issues that dbt would normally just warn on
  • target_path: the dbt target path, if you set it to something other than the default
  • dbt_executable: the name of the specific dbt executable you're using, if it's not just dbt

CLI Examples#

Below are some examples of using the dagster_dbt library to run a dbt project via the CLI.

For these examples, we make heavy use of the dbt_cli_resource. This resource has methods corresponding to most dbt commands (such as run, seed, test, ls, etc.). The methods behave essentially identically to each other, so these examples will focus on dbt run for simplicity.

The dagster_dbt library provides ops that simplify cases where you simply wish to run a single one of these commands (with no special arguments) and return the result. This saves you from having to write out the boilerplate associated with creating an op from scratch. However, it's fairly common to have more complex needs, and in those cases you should create your own custom op that invokes the resource directly. The below examples will mix and match these two approaches where appropriate.

Note that you can pass in any keyword to these functions that you wish, and they will get added as flags to the underlying dbt command (e.g. my_flag_name = 'foo' will get converted to --my-flag-name foo). If there is a dbt option that you would like to set, but is not reflected in the function signature, this is how you would do so.
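
The keyword-to-flag conversion described above can be sketched as follows. This is a simplified illustration, not the library's code: kwargs_to_cli_args is a hypothetical helper, and its handling of booleans, lists, and None values is an assumption made for the example.

```python
def kwargs_to_cli_args(**kwargs):
    """Turn keyword arguments into a list of dbt-style command line arguments.

    Assumptions for this sketch: underscores become hyphens, True becomes a
    bare flag, False and None are skipped, and list values are expanded into
    several arguments after the flag.
    """
    args = []
    for name, value in kwargs.items():
        flag = "--" + name.replace("_", "-")
        if value is None or value is False:
            continue
        if value is True:
            args.append(flag)
        elif isinstance(value, (list, tuple)):
            args.append(flag)
            args.extend(str(item) for item in value)
        else:
            args.extend([flag, str(value)])
    return args


print(kwargs_to_cli_args(my_flag_name="foo"))  # ['--my-flag-name', 'foo']
```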

Using dbt_cli_resource to run your entire dbt project#

One common way to use this integration is to have a step in your job run all of the models in a dbt project. For this case, the easiest method is to configure the resource so it knows where your dbt project is, and import the dbt_run_op to use in your job.

from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project"}
)

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_run_op()

Using dbt_cli_resource to run a specific set of models#

Sometimes, you just want to run a select set of models in your dbt project, rather than the entire thing. The below examples show two ways of doing this, depending on your use case.

Note that in both cases, the models option takes in a list of strings. The string "tag:staging" uses dbt's node selection syntax to filter models with the tag "staging". For more details, visit the official dbt documentation on the node selection syntax.
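
To make the "tag:staging" selector more concrete, here is a toy selector that handles only two forms: tag selectors and exact model names. The model names and tags are hypothetical, and dbt's real node selection syntax supports far more (graph operators, paths, intersections, and so on).

```python
# Hypothetical models and their tags, standing in for a real dbt project.
MODEL_TAGS = {
    "stg_orders": {"staging"},
    "stg_customers": {"staging", "nightly"},
    "daily_revenue": {"nightly"},
}


def select_models(selectors, model_tags=MODEL_TAGS):
    """Return the sorted names of models matched by any selector (a union)."""
    selected = set()
    for selector in selectors:
        if selector.startswith("tag:"):
            tag = selector[len("tag:"):]
            selected.update(
                name for name, tags in model_tags.items() if tag in tags
            )
        elif selector in model_tags:
            # A bare model name selects exactly that model.
            selected.add(selector)
    return sorted(selected)


print(select_models(["tag:staging"]))  # ['stg_customers', 'stg_orders']
```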

... with configuration#

If you know which models you want to select ahead of time, you might prefer specifying this while configuring your resource. Because you aren't passing any arguments at runtime, you can use the prebuilt dbt_run_op instead of writing your own.

from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project", "models": ["tag:staging"]}
)

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_run_op()

... supplying an argument#

If you want to change which models you select depending on what happens during execution, you should create your own op that invokes the dbt resource (rather than using a prebuilt op). This allows you to explicitly set the models parameter at runtime. This would look similar to the following:

from dagster import op

@op(required_resource_keys={"dbt"})
def run_models(context, some_condition: bool):
    if some_condition:
        context.resources.dbt.run(models=["tag:staging"])
    else:
        context.resources.dbt.run(models=["tag:other"])

Using a different dbt profile for different Dagster jobs#

Dagster supports creating multiple jobs from the same graph. dbt has a similar concept, profiles. You might want to run a dev version of your graph that targets the development-specific dbt profile, and then have a prod version that runs using the prod dbt profile. This example shows how to accomplish this.

from dagster_dbt import dbt_cli_resource, dbt_run_op

from dagster import graph

@graph
def my_dbt():
    dbt_run_op()

my_dbt_graph_dev = my_dbt.to_job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt/project", "profile": "dev"}
        )
    }
)

my_dbt_graph_prod = my_dbt.to_job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt/project", "profile": "prod"}
        )
    }
)
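
Since the two resource configurations differ only in the profile name, one option is to build them from a small helper. This is a sketch under stated assumptions: dbt_config_for is a hypothetical function, and the "dev" and "prod" profile names are assumed to exist in your dbt profiles.

```python
ALLOWED_ENVIRONMENTS = ("dev", "prod")


def dbt_config_for(environment, project_dir="path/to/dbt/project"):
    """Build the resource config dict for one environment.

    Assumption: the dbt profile is named after the environment.
    """
    if environment not in ALLOWED_ENVIRONMENTS:
        raise ValueError(f"unknown environment: {environment!r}")
    return {"project_dir": project_dir, "profile": environment}


dev_config = dbt_config_for("dev")
prod_config = dbt_config_for("prod")
```

Each resulting dict has the same shape as the dictionaries passed to dbt_cli_resource.configured in the example above.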

Invoking multiple dbt commands in the same job#

Sometimes, you'll want to run multiple different dbt commands in the same job. The dbt_cli_resource makes this convenient, as you only need to configure your dbt resource once, and all of that configuration will already be set for any ops that are using this resource.

One common use case is to first run dbt run to update all of your models, and then run dbt test to check that they are all working as expected, as shown below.

from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op

from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project"}
)

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_test_op(start_after=dbt_run_op())

Using a dbt RPC server in a Dagster job#

This integration provides two separate resources to help run commands against a dbt RPC server. The dbt_rpc_resource will fire off requests against the server and return immediately, while the dbt_rpc_sync_resource will poll the server until each request completes.

The prebuilt ops provided by dagster-dbt (such as dbt_run_op, dbt_test_op, etc.) are all compatible with both of these resources.

Configuration#

Your dbt RPC server can be running locally or remotely. To interact with the RPC server in your Dagster job, you will need to create a resource for your dbt RPC server.

dagster_dbt.dbt_rpc_resource can be configured with your specific host and port.

from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

For convenience during local development, you may also use dagster_dbt.local_dbt_rpc_resource, which is preconfigured for a dbt RPC server that is running on http://localhost:8580.

RPC Examples#

Running a dbt project over RPC#

All of the prebuilt dbt ops in this library are compatible with the dbt_rpc_resource (which sends requests and then returns immediately) and the dbt_rpc_sync_resource (which sends requests and waits for the task to complete). Therefore, we can use the same ops as in the CLI examples as long as we provide the correct resource to the job.

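from dagster_dbt import dbt_rpc_resource, dbt_run_op

from dagster import job

# The same RPC resource as in the Configuration section above.
my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@job(resource_defs={"dbt": my_remote_rpc})
def my_dbt_job():
    dbt_run_op()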

Running specific models in a dbt project over RPC#

This is similar to having "params": {"models": "tag:staging"} in your dbt RPC request body.

from dagster_dbt import dbt_rpc_resource

from dagster import job, op

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@op(required_resource_keys={"dbt"})
def run_staging_models(context):
    context.resources.dbt.run(models=["tag:staging"])

@job(resource_defs={"dbt": my_remote_rpc})
def my_dbt_job():
    run_staging_models()

Note that the job above will NOT wait until the dbt RPC server has finished executing your request. Instead, it will return immediately with a request token from the dbt RPC server. If you want the op to wait until execution is finished, instead supply a dbt_rpc_sync_resource (seen below).

Running a dbt project and polling the RPC server until it has finished executing#

from dagster_dbt import dbt_rpc_sync_resource

from dagster import job, op

my_remote_sync_rpc = dbt_rpc_sync_resource.configured(
    {"host": "80.80.80.80", "port": 8080}
)

@op(required_resource_keys={"dbt_sync"})
def run_staging_models_and_wait(context):
    # The resource is registered under the "dbt_sync" key, so access it by that name.
    context.resources.dbt_sync.run(models=["tag:staging"])

@job(resource_defs={"dbt_sync": my_remote_sync_rpc})
def my_dbt_job():
    run_staging_models_and_wait()
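
The difference between returning immediately and polling until completion can be sketched with a toy in-memory server. Nothing here talks to a real dbt RPC server: FakeRpcServer, run_async, and run_sync are hypothetical names, and a real client would normally sleep between polls.

```python
import itertools


class FakeRpcServer:
    """Hypothetical stand-in for a dbt RPC server; not a real client."""

    def __init__(self, polls_until_done=3):
        self._polls_until_done = polls_until_done
        self._polls = {}
        self._tokens = itertools.count(1)

    def submit(self, command):
        """Accept a command and return a request token immediately."""
        token = f"token-{next(self._tokens)}"
        self._polls[token] = 0
        return token

    def poll(self, token):
        """Report 'running' until enough polls have happened, then 'success'."""
        self._polls[token] += 1
        if self._polls[token] >= self._polls_until_done:
            return "success"
        return "running"


def run_async(server, command):
    """Fire and forget: hand back the request token without waiting."""
    return server.submit(command)


def run_sync(server, command, max_polls=10):
    """Submit the command, then poll until it leaves the 'running' state."""
    token = server.submit(command)
    for _ in range(max_polls):
        state = server.poll(token)
        if state != "running":
            return state
    raise TimeoutError(f"{token} did not finish after {max_polls} polls")
```

With this sketch, run_async gives you only a token to check later, while run_sync gives you the final state of the request.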

Advanced Configuration#

For full documentation on all available config, visit the API docs for dagster-dbt.

Generating Asset Materializations when you run your dbt project

dagster_dbt is configured to automatically create Asset Materializations for each of your dbt models when you run the run command (via either the CLI or over RPC). These materializations are populated with metadata that is automatically parsed from the dbt response. The available metadata differs between dbt versions. If there's anything you would like to add to these materializations, you can explicitly invoke the underlying generate_materializations utility function, and modify each of the resulting materializations like so:

from dagster_dbt.utils import generate_materializations

from dagster import op

@op(required_resource_keys={"dbt"})
def dbt_run_with_custom_assets(context):
    dbt_result = context.resources.dbt.run()
    for materialization in generate_materializations(dbt_result):
        context.log_event(
            materialization._replace(
                metadata_entries=[...]  # insert whatever metadata you want here
            )
        )
    return dbt_result
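
As a rough picture of what "metadata parsed from the dbt response" can mean, the sketch below summarizes a hand-written result payload per model. It does not use dagster_dbt: the payload is made-up sample data, summarize_results is a hypothetical helper, and the field names (unique_id, status, execution_time) are assumed from the shape of dbt's run results, which can differ between dbt versions.

```python
# Made-up sample shaped like a dbt run result; real payloads contain more fields.
sample_run_results = {
    "results": [
        {
            "unique_id": "model.my_project.stg_orders",
            "status": "success",
            "execution_time": 1.25,
        },
        {
            "unique_id": "model.my_project.daily_revenue",
            "status": "error",
            "execution_time": 0.5,
        },
    ]
}


def summarize_results(run_results):
    """Map each node's unique_id to a small metadata dict."""
    summary = {}
    for result in run_results.get("results", []):
        summary[result["unique_id"]] = {
            "status": result.get("status"),
            "execution_time_seconds": result.get("execution_time"),
        }
    return summary


summary = summarize_results(sample_run_results)
```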

dbt CLI: Set the dbt profile and target to load

from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    # ...

dbt CLI: Set the path to the dbt executable

from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"dbt_executable": "path/to/dbt/executable"}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    # ...

dbt CLI: Select specific models to run

from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    # ...

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Exclude specific models

from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    # ...

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Set key-values for dbt vars

from dagster_dbt import dbt_cli_resource

from dagster import job

config = {"vars": {"key": "value"}}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    # ...

For more details, visit the official documentation on using variables in dbt.

dbt RPC: Configure a remote dbt RPC resource

from dagster_dbt import dbt_rpc_resource

custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})

Conclusion#

If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions on using dbt with Dagster, we'd love to hear from you:

Join us on Slack.