Kubernetes is a container orchestration system for automating deployment, scaling, and management of containerized applications. Dagster uses Kubernetes in combination with Helm, a package manager for Kubernetes applications. Using Helm, users specify the configuration of the Kubernetes resources required to deploy Dagster through a values file or command-line overrides. References to values.yaml in the following sections refer to Dagster's values.yaml.
Dagster publishes a fully-featured Helm chart to manage installing and running a production-grade Kubernetes deployment of Dagster. For each Dagster component in the chart, Dagster publishes a corresponding Docker image on DockerHub.
kubectl should be configured with your desired Kubernetes cluster. You should understand the basics of Helm, and Helm 3 should be installed. If you are creating your own user code images, Docker should be installed as well.
The Dagster Helm chart is versioned with the same version numbers as the Dagster Python library. Ideally, the chart and the library should only be used together when their version numbers match.
In the following tutorial, we install the most recent version of the Dagster Helm chart. To use an older version of the chart, pass a --version flag to helm upgrade. If you are using a chart version before 0.11.13, you will also need to update the tags of the Dagster-provided images to match the chart version. After 0.11.13, this is done for you automatically.
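As a rough illustration of the version rule above, the following Python sketch builds the helm upgrade command for a pinned chart version and reports whether the image tags would need to be set by hand. The helper names, the values.yaml path, and the choice to treat 0.11.13 itself as handled automatically are assumptions made for this example; none of them come from Dagster or Helm.

```python
# Hypothetical helpers for illustration; not part of Dagster or Helm.

# Threshold taken from the text above. Treating 0.11.13 itself as
# "automatic" is an assumption of this sketch.
AUTO_TAG_VERSION = (0, 11, 13)


def parse_version(version):
    """Turn a plain 'X.Y.Z' string into a tuple of ints for comparison."""
    return tuple(int(part) for part in version.split("."))


def needs_manual_image_tags(chart_version):
    """True when the chart predates automatic image tag matching."""
    return parse_version(chart_version) < AUTO_TAG_VERSION


def helm_upgrade_command(chart_version, values_path="values.yaml"):
    """Build the argv for installing or upgrading a pinned chart version."""
    return [
        "helm", "upgrade", "--install", "dagster", "dagster/dagster",
        "-f", values_path,
        "--version", chart_version,
    ]


if __name__ == "__main__":
    version = "0.11.12"
    print(" ".join(helm_upgrade_command(version)))
    if needs_manual_image_tags(version):
        print("Set the Dagster image tags in values.yaml to", version)
```

Versions are compared as integer tuples rather than strings, because a string comparison would rank "0.9.10" above "0.11.13".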
Component Name | Type | Image |
---|---|---|
Daemon | Deployment | dagster/dagster-k8s (released weekly) |
Dagit | Deployment behind a Service | dagster/dagster-k8s (released weekly) |
Database | PostgreSQL | postgres (Optional) |
Run Worker | Job | User-provided or dagster/user-code-example (released weekly) |
User Code Deployment | Deployment behind a Service | User-provided or dagster/user-code-example (released weekly) |
The daemon periodically checks the Runs table in PostgreSQL for runs that are ready to be launched. The daemon also submits runs from schedules and sensors.
The daemon launches runs via the K8sRunLauncher, creating a run worker Job with the image specified in the user code deployment.
The Dagit webserver communicates with the user code deployments via gRPC to fetch the information needed to populate the Dagit UI. Dagit does not load or execute user-written code, and will remain available even when user code contains errors. Dagit frequently checks whether the user code deployment has been updated and, if so, fetches the new information.
Dagit can be horizontally scaled by setting the dagit.replicaCount field in values.yaml.
By default, runs are launched via the K8sRunLauncher, which creates a new Kubernetes Job per run.
The user can connect an external database (e.g., a cloud provider's managed database service, like RDS) or run PostgreSQL on Kubernetes. This database stores runs, event logs, and other metadata, and powers much of the real-time and historical data visible in Dagit. To maintain a referenceable history of events, we recommend connecting an external database for most use cases.
The run worker is responsible for executing launched Dagster runs. The run worker uses the same image as the user code deployment at the time the run was submitted. The run worker uses ephemeral compute, and completes once the run is finished. Events that occur during the run are written to the database, and are displayed in Dagit.
The run worker jobs and pods are not automatically deleted, so that users are able to inspect results. It is up to the user to periodically delete old jobs and pods.
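Since cleanup is left to the user, one possible approach is sketched below in Python: given the parsed output of kubectl get jobs -o json, it selects run worker Jobs that completed more than a chosen time ago. The function names and the dagster-run- name prefix are assumptions (the prefix is inferred from the sample kubectl output later in this guide). The sketch relies on the completionTime field of the Kubernetes Job status, which is only set for Jobs that finished successfully, so failed Jobs are left alone here.

```python
from datetime import datetime, timedelta, timezone


def parse_k8s_time(value):
    """Parse a Kubernetes timestamp such as '2021-06-01T12:00:00Z'."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def stale_run_jobs(job_list, now, max_age, prefix="dagster-run-"):
    """Return names of completed run worker Jobs older than max_age.

    job_list is the dict parsed from `kubectl get jobs -o json`.
    The prefix is an assumption based on the job names shown in this guide.
    """
    names = []
    for job in job_list.get("items", []):
        name = job["metadata"]["name"]
        completed = job.get("status", {}).get("completionTime")
        if not name.startswith(prefix) or completed is None:
            continue  # skip unrelated Jobs and Jobs without a completion time
        if now - parse_k8s_time(completed) > max_age:
            names.append(name)
    return names


if __name__ == "__main__":
    # Made-up sample data standing in for real kubectl output.
    sample = {
        "items": [
            {
                "metadata": {"name": "dagster-run-old"},
                "status": {"completionTime": "2021-06-01T12:00:00Z"},
            },
            {"metadata": {"name": "dagster-run-active"}, "status": {}},
        ]
    }
    now = datetime(2021, 6, 10, tzinfo=timezone.utc)
    for name in stale_run_jobs(sample, now, timedelta(days=7)):
        print("kubectl delete job", name)
```

The sketch only prints the delete commands so the list can be reviewed before anything is removed.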
Each Dagster job specifies an executor that determines how the run worker will execute each step of the job. Different executors offer different levels of isolation and concurrency. Common choices are in_process_executor (all steps run serially in a single process in a single pod), multiprocess_executor (multiple processes in a single pod), and k8s_job_executor (each step runs in a separate Kubernetes job). Generally, increasing isolation incurs some additional overhead per step (e.g. starting up a new Kubernetes job vs starting a new process within a pod). The executor can be configured per-run in the execution block.
A user code deployment runs a gRPC server and responds to Dagit's requests for information (such as: "List all of the jobs in each repository" or "What is the dependency structure of job X?"). The user-provided image for the user code deployment must contain a repository definition and all of the packages needed to execute within the repository.
Users can have multiple user code deployments. A common pattern is for each user code deployment to correspond to a different repository.
User code deployments can be updated independently from other Dagster components, including Dagit. As a result, updates to repositories can occur without causing downtime to any other repository or to Dagit. After updating, if there is an error with any repository, an error is surfaced for that repository within Dagit; all other repositories and Dagit will still operate normally.
First, configure the kubectl CLI to point at a Kubernetes cluster.
You can use docker-desktop to set up a local k8s cluster to develop against, or substitute with another k8s cluster as desired.
If you're using docker-desktop and you have a local cluster set up, configure the kubectl CLI to point to the local k8s cluster:
$ kubectl config set-context dagster --namespace default --cluster docker-desktop --user=docker-desktop
$ kubectl config use-context dagster
Skip this step if using Dagster's example user code image dagster/user-code-example.
Build a Docker image containing your Dagster repository and any dependencies needed to execute the business logic in your code.
For reference, here is an example Dockerfile and the corresponding user code directory. Here, we install all the Dagster-related dependencies in the Dockerfile, and then copy the directory containing the implementation of the Dagster repository into the root folder. We'll need to remember the path of this repository in a subsequent step to set up the gRPC server as a deployment.
The example user code repository includes a step_isolated_job job that uses the k8s_job_executor to run each op in its own pod, and a single_pod_job job that runs all ops in a single pod.
For projects with many dependencies, it is recommended that you publish your Python project as a package and install that package in your Dockerfile.
Skip this step if using Dagster's example user code image.
Publish the image to a registry that is accessible from the Kubernetes cluster, such as AWS ECR or DockerHub.
Several of the jobs in dagster/user-code-example use an S3 IO Manager.
To run these jobs, you'll need an AWS S3 bucket available, and access to a pair of AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY values. This is because the IO Manager uses boto.
This tutorial also has the option of using minio to mock an S3 endpoint locally in K8s. Note that this option uses host.docker.internal to access the host from within Docker. This behavior has only been tested on macOS, so other platforms may need a different configuration.
Skip this step if you'd like to use minio for a local S3 endpoint.
If using S3, create a bucket in your AWS account. For this tutorial, we'll create a bucket called test-bucket. Also, keep your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY credentials handy. Now, you can create your k8s secrets:
$ kubectl create secret generic dagster-aws-access-key-id --from-literal=AWS_ACCESS_KEY_ID=<YOUR ACCESS KEY ID>
$ kubectl create secret generic dagster-aws-secret-access-key --from-literal=AWS_SECRET_ACCESS_KEY=<YOUR SECRET ACCESS KEY>
Skip this step if you're using AWS S3.
First, set up minio locally:
brew install minio/stable/minio # server
brew install minio/stable/mc # client
mkdir $HOME/miniodata # Prepare a directory for data
minio server $HOME/miniodata # start a server with default user/pass and no TLS (keeps running; use a separate terminal for the next commands)
mc --insecure alias set minio http://localhost:9000 minioadmin minioadmin
# See it work
mc ls minio
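date > date1.txt # create a sample file
mc mb minio/testbucket # create a bucket to hold the sample file
mc cp date1.txt minio/testbucket/date1.txt # mc paths take the form ALIAS/BUCKET/OBJECT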
export AWS_ACCESS_KEY_ID="minioadmin"
export AWS_SECRET_ACCESS_KEY="minioadmin"
# See the aws cli work
aws --endpoint-url http://localhost:9000 s3 mb s3://test-bucket
aws --endpoint-url http://localhost:9000 s3 cp date1.txt s3://test-bucket/
Now, create your k8s AWS secrets:
$ kubectl create secret generic dagster-aws-access-key-id --from-literal=AWS_ACCESS_KEY_ID=minioadmin
$ kubectl create secret generic dagster-aws-secret-access-key --from-literal=AWS_SECRET_ACCESS_KEY=minioadmin
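For readers who prefer to keep manifests in version control tooling rather than run imperative commands, the following Python sketch builds roughly what kubectl create secret generic with --from-literal produces: an Opaque Secret whose values are base64-encoded. The generic_secret helper is hypothetical, and the minioadmin values are the local minio defaults used above, not real credentials.

```python
import base64
import json


def generic_secret(name, literals, namespace="default"):
    """Build an Opaque Secret manifest from plain-text key/value pairs.

    Hypothetical helper for illustration; mirrors the shape of a Secret
    created with `kubectl create secret generic --from-literal`.
    """
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "Opaque",
        # Kubernetes stores Secret data as base64-encoded strings.
        "data": {
            key: base64.b64encode(value.encode("utf-8")).decode("ascii")
            for key, value in literals.items()
        },
    }


if __name__ == "__main__":
    secrets = [
        generic_secret(
            "dagster-aws-access-key-id", {"AWS_ACCESS_KEY_ID": "minioadmin"}
        ),
        generic_secret(
            "dagster-aws-secret-access-key", {"AWS_SECRET_ACCESS_KEY": "minioadmin"}
        ),
    ]
    for manifest in secrets:
        print(json.dumps(manifest, indent=2))
```

Keep in mind that base64 is an encoding, not encryption, so a manifest generated this way should be handled as carefully as the plain-text credentials.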
The Dagster chart repository contains the versioned charts for all Dagster releases. Add the remote URL under the name dagster to install the Dagster charts.
$ helm repo add dagster https://dagster-io.github.io/helm
Update the dagster-user-deployments.deployments section of the Dagster chart's values.yaml to include your deployment. Here, we can specify the configuration of the Kubernetes Deployment that will create the gRPC server for Dagit and the daemon to access the user code. The gRPC server is created through the arguments passed to dagsterApiGrpcArgs, which expects a list of arguments for dagster api grpc.
To get access to the Dagster values.yaml, run:
$ helm show values dagster/dagster > values.yaml
The following snippet works for Dagster's example user code image. Since our Dockerfile contains the repository definition in a path, we specify arguments for the gRPC server to find this path under dagsterApiGrpcArgs. Note that if you haven't set up an S3 endpoint, you can only run the job called single_pod_job.
dagster-user-deployments:
  enabled: true
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        repository: "docker.io/dagster/user-code-example"
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "--python-file"
        - "/example_project/example_repo/repo.py"
      port: 3030
dagsterApiGrpcArgs also supports loading repository definitions from a package name. To find the applicable arguments, see the documentation for the dagster api grpc command.
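When several user code deployments share the same shape, it can be convenient to generate the dagster-user-deployments section instead of writing each entry by hand. The Python sketch below mirrors the keys from the snippet above. The helper names and the second deployment (my-registry/my-user-code and its path) are made up for illustration. Because JSON is also valid YAML, the printed output should be usable as a values file, though that is worth verifying against your Helm version.

```python
import json


def user_deployment(name, repository, python_file, tag="latest", port=3030):
    """Build one entry for dagster-user-deployments.deployments.

    Hypothetical helper; the keys mirror the values.yaml snippet in this guide.
    """
    return {
        "name": name,
        "image": {"repository": repository, "tag": tag, "pullPolicy": "Always"},
        "dagsterApiGrpcArgs": ["--python-file", python_file],
        "port": port,
    }


def user_deployments_values(deployments):
    """Wrap deployment entries in the top-level chart section."""
    return {
        "dagster-user-deployments": {
            "enabled": True,
            "deployments": list(deployments),
        }
    }


if __name__ == "__main__":
    values = user_deployments_values(
        [
            user_deployment(
                "k8s-example-user-code-1",
                "docker.io/dagster/user-code-example",
                "/example_project/example_repo/repo.py",
            ),
            # Made-up second deployment, shown only to illustrate the pattern.
            user_deployment(
                "my-user-code-2",
                "my-registry/my-user-code",
                "/my_project/repo.py",
            ),
        ]
    )
    print(json.dumps(values, indent=2))
```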
You can also specify configuration like configmaps, secrets, volumes, resource limits, and labels for individual user code deployments:
dagster-user-deployments:
  enabled: true
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        repository: "docker.io/dagster/user-code-example"
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "--python-file"
        - "/example_project/example_repo/repo.py"
      port: 3030
      envConfigMaps:
        - name: my-config-map
      envSecrets:
        - name: my-secret
      labels:
        foo_label: bar_value
      volumes:
        - name: my-volume
          configMap:
            name: my-config-map
      volumeMounts:
        # The mount name must match the name of a volume defined above
        - name: my-volume
          mountPath: /opt/dagster/test_folder
          subPath: test_file.yaml
      resources:
        limits:
          cpu: 100m
          memory: 128Mi
        requests:
          cpu: 100m
          memory: 128Mi
      includeConfigInLaunchedRuns:
        enabled: true
By default, this configuration will also be included in the pods for any runs that are launched for the user code deployment. You can disable this behavior for a user code deployment by setting includeConfigInLaunchedRuns.enabled to false for that deployment.
step_isolated_job (Optional)
You'll need a slightly different configuration to run the job called step_isolated_job. This is because step_isolated_job uses an s3_pickle_io_manager, so you'll need to provide the user code k8s pods with AWS S3 credentials.
See the set up S3 section for setup instructions. The snippet below works for both AWS S3 and a local S3 endpoint via minio.
dagster-user-deployments:
  enabled: true
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        repository: "docker.io/dagster/user-code-example"
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "--python-file"
        - "/example_project/example_repo/repo.py"
      port: 3030
      envSecrets:
        - name: dagster-aws-access-key-id
        - name: dagster-aws-secret-access-key
runLauncher:
  type: K8sRunLauncher
  config:
    k8sRunLauncher:
      envSecrets:
        - name: dagster-aws-access-key-id
        - name: dagster-aws-secret-access-key
Install the Helm chart and create a release. Below, we've named our release dagster. We use helm upgrade --install to create the release if it does not exist; otherwise, the existing dagster release will be modified:
helm upgrade --install dagster dagster/dagster -f /path/to/values.yaml
Helm will launch several pods, including PostgreSQL. You can check the status of the installation with kubectl. Note that it might take a few minutes for the pods to move to a Running state.
If everything worked correctly, you should see output like the following:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
dagster-dagit-645b7d59f8-6lwxh 1/1 Running 0 11m
dagster-k8s-example-user-code-1-88764b4f4-ds7tn 1/1 Running 0 9m24s
dagster-postgresql-0 1/1 Running 0 17m
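To automate the same check, for example in a deployment script, one option is to inspect the pod phases from kubectl get pods -o json. The Python sketch below assumes that output has already been parsed into a dict. The function name is hypothetical, and treating Succeeded as healthy (so that finished run worker pods are not reported) is an assumption of this example. Note that a Running phase is a weaker signal than the READY column shown above.

```python
def pods_not_healthy(pod_list, ok_phases=("Running", "Succeeded")):
    """Return (name, phase) pairs for pods outside the accepted phases.

    pod_list is the dict parsed from `kubectl get pods -o json`.
    """
    unhealthy = []
    for pod in pod_list.get("items", []):
        phase = pod.get("status", {}).get("phase", "Unknown")
        if phase not in ok_phases:
            unhealthy.append((pod["metadata"]["name"], phase))
    return unhealthy


if __name__ == "__main__":
    # Made-up sample standing in for real kubectl output.
    sample = {
        "items": [
            {"metadata": {"name": "dagster-dagit-abc"}, "status": {"phase": "Running"}},
            {"metadata": {"name": "dagster-postgresql-0"}, "status": {"phase": "Pending"}},
        ]
    }
    for name, phase in pods_not_healthy(sample):
        print(f"{name} is still {phase}")
```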
After Helm has successfully installed all the required Kubernetes resources, start port forwarding to the Dagit pod via:
export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
-l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" \
-o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80
Now try running a job. Visit http://127.0.0.1:8080, navigate to the launchpad and click Launch Run.
You can introspect the jobs that were launched with kubectl:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
dagster-run-5ee8a0b3-7ca5-44e6-97a6-8f4bd86ee630 1/1 4s 11s
Now, you can try a run with step isolation. Switch to the step_isolated_job job, changing the default config to point to your S3 bucket if needed, and launch the run.
If you're using minio, change your config to look like this:
resources:
  io_manager:
    config:
      s3_bucket: "test-bucket"
  s3:
    config:
      # This use of host.docker.internal is unique to Mac
      endpoint_url: http://host.docker.internal:9000
      region_name: us-east-1
ops:
  multiply_the_word:
    config:
      factor: 0
    inputs:
      word: ""
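If you switch between AWS S3 and minio often, the two variants of this run config can be produced from one place. The Python sketch below mirrors the keys shown above. The function name is hypothetical, and omitting the s3 resource block when no endpoint is given is an assumption about what the AWS case needs, not something confirmed by the example project.

```python
import json


def step_isolated_run_config(bucket, endpoint_url=None, region_name="us-east-1"):
    """Build run config for step_isolated_job as a plain dict.

    Hypothetical helper; keys mirror the config shown in this guide.
    Pass endpoint_url only when targeting a local endpoint such as minio.
    """
    config = {
        "resources": {"io_manager": {"config": {"s3_bucket": bucket}}},
        "ops": {
            "multiply_the_word": {
                "config": {"factor": 0},
                "inputs": {"word": ""},
            }
        },
    }
    if endpoint_url is not None:
        # Assumption: the s3 resource only needs overriding for non-AWS endpoints.
        config["resources"]["s3"] = {
            "config": {"endpoint_url": endpoint_url, "region_name": region_name}
        }
    return config


if __name__ == "__main__":
    minio_config = step_isolated_run_config(
        "test-bucket", endpoint_url="http://host.docker.internal:9000"
    )
    print(json.dumps(minio_config, indent=2))
```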
Again, you can view the launched jobs:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
dagster-run-5ee8a0b3-7ca5-44e6-97a6-8f4bd86ee630 1/1 4s 11s
dagster-run-733baf75-fab2-4366-9542-0172fa4ebc1f 1/1 4s 100s
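The job names in this output appear to embed the Dagster run ID after a dagster-run- prefix. Assuming that pattern holds (it is inferred from the sample output above, not from a documented guarantee), a small Python helper can recover the run ID from a job name, which is handy when a debugging command expects a run ID. The function name is hypothetical.

```python
import uuid

# Assumption: prefix inferred from the sample `kubectl get jobs` output.
RUN_JOB_PREFIX = "dagster-run-"


def run_id_from_job_name(job_name):
    """Return the run ID embedded in a run worker Job name, or None."""
    if not job_name.startswith(RUN_JOB_PREFIX):
        return None
    candidate = job_name[len(RUN_JOB_PREFIX):]
    try:
        uuid.UUID(candidate)  # run IDs in the sample output are UUIDs
    except ValueError:
        return None
    return candidate


if __name__ == "__main__":
    name = "dagster-run-733baf75-fab2-4366-9542-0172fa4ebc1f"
    print(run_id_from_job_name(name))
```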
Some of the following commands will be useful if you'd like to debug issues with deploying on Helm:
# Get the Dagit pod's name
$ export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
-l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" \
-o jsonpath="{.items[0].metadata.name}")
# Start a shell in the dagit pod
$ kubectl exec --stdin --tty $DAGIT_POD_NAME -- /bin/bash
# Get debug data from $RUN_ID
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export $RUN_ID debug_info.gzip
# Get a list of recently failed runs
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export fakename fakename.gzip
# Get debug output of a failed run - note that this information is also available in Dagit
$ kubectl exec $DAGIT_POD_NAME -- dagster debug export 360d7882-e631-4ac7-8632-43c75cb4d426 debug.gzip
# Extract the debug.gzip from the pod
$ kubectl cp $DAGIT_POD_NAME:debug.gzip debug.gzip
# List config maps
$ kubectl get configmap # Make note of the "user-deployments" configmap
$ kubectl get configmap dagster-dagster-user-deployments-$NAME
We deployed Dagster, configured with the default K8sRunLauncher, onto a Kubernetes cluster using Helm.