Dagster ops can perform computations using Spark.
Running computations on Spark presents unique challenges because, unlike other computations, Spark jobs typically execute on infrastructure that's specialized for Spark, i.e. infrastructure that can network sets of workers into clusters that Spark can run computations against. Spark applications are typically not containerized or executed on Kubernetes. Running Spark code often requires submitting code to a Databricks or EMR cluster.
There are two approaches to writing Dagster ops that invoke Spark computations:
In the first approach, the code inside the op submits a Spark job to an external system like Databricks or EMR, usually pointing to a jar or zip of Python files that contain the actual Spark data transformations and actions.
If you want to use this approach to run a Spark job on Databricks, create_databricks_job_op helps create an op that submits a Spark job using the Databricks REST API.
If you want to run a Spark job against YARN or a Spark Standalone cluster, you can use create_shell_command_op to create an op that invokes spark-submit.
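As a rough illustration of the spark-submit route, the sketch below assembles a spark-submit command line in plain Python. The helper name build_spark_submit_command, the application path, and the argument values are hypothetical; only the spark-submit flags (--master, --deploy-mode, --conf) are standard. It does not call Dagster itself.

```python
import shlex
from typing import Mapping, Optional, Sequence


def build_spark_submit_command(
    application: str,
    master: str = "yarn",
    deploy_mode: str = "cluster",
    conf: Optional[Mapping[str, str]] = None,
    app_args: Sequence[str] = (),
) -> str:
    """Return a shell-safe spark-submit command string (hypothetical helper)."""
    args = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    # Each Spark property is passed as its own --conf key=value pair.
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    # The application file comes after the options, followed by its own arguments.
    args.append(application)
    args.extend(app_args)
    # shlex.join quotes any token that needs quoting for the shell.
    return shlex.join(args)


command = build_spark_submit_command(
    "jobs/etl.py",  # hypothetical path to the PySpark application
    conf={"spark.executor.memory": "2g"},
    app_args=["--date", "2024-01-01"],
)
print(command)
```

A string like this is the kind of shell command you would hand to create_shell_command_op when creating the op.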
This is the easiest approach for migrating existing Spark jobs, and it's the only approach that works for Spark jobs written in Java or Scala. The downside is that it loses out on some of the benefits of Dagster - the implementation of each op is bound to the execution environment, so you can't run your Spark transformations without relying on external infrastructure. Writing unit tests is cumbersome.
In the second approach, the code inside the op consists of pure logical data transformations on Spark DataFrames or RDDs. The op-decorated function accepts DataFrames as parameters and returns DataFrames when it completes. An IO manager handles writing and reading the DataFrames to and from persistent storage. The example below that runs PySpark code in ops shows what this looks like.
If you want your Spark driver to run inside a Spark cluster, you use a "step launcher" resource that informs Dagster how to launch the op. The step launcher resource is responsible for invoking spark-submit or submitting the job to Databricks or EMR. The EMR example below shows what this looks like for EMR.
The advantage of this approach is a very clean local testing story. You can run an entire job of Spark ops in a single process. You can use IO managers to abstract away IO - storing outputs on the local filesystem during local development and in the cloud in production.
The downside is that this approach only works with PySpark, and setting up a step launcher can be difficult. We currently provide an emr_pyspark_step_launcher and a databricks_pyspark_step_launcher, but if you need to submit your Spark job to a different kind of cluster, writing your own can be time-consuming. You also need to install the Dagster library itself on the cluster.
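Passing PySpark DataFrames between ops requires a little extra care compared to other data types, for a couple of reasons. First, Spark has a lazy execution model, so PySpark won't process any data until an action like write or collect is called on a DataFrame. Second, PySpark DataFrames cannot be pickled, so IO managers that pickle their values, like the fs_io_manager, won't work for them.
In this example, we've defined an IOManager that knows how to store and retrieve PySpark DataFrames that are produced and consumed by ops.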
This example assumes that all the outputs within the Dagster job will be PySpark DataFrames and stored in the same way. To learn how to use different IO managers for different outputs within the same Dagster job, take a look at the IO Manager concept page.
This example writes out DataFrames to the local file system, but can be tweaked to write to cloud object stores like S3 by changing the write and read invocations.
import os

from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import IOManager, graph, io_manager, op


class LocalParquetIOManager(IOManager):
    def _get_path(self, context):
        # Build a relative path from the run ID, step key, and output name.
        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        # Writing is a Spark action, so this is where the DataFrame is computed.
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        # Read back the Parquet files written for the upstream output.
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def local_parquet_io_manager():
    return LocalParquetIOManager()


@op
def make_people() -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@op
def filter_over_50(people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@graph
def make_and_filter_data():
    filter_over_50(make_people())


make_and_filter_data_job = make_and_filter_data.to_job(
    resource_defs={"io_manager": local_parquet_io_manager}
)
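As a rough sketch of the cloud-storage tweak mentioned above, the output path can be built from a configurable prefix so that the same layout works on the local file system or in an object store. The helper below is plain Python with hypothetical names (OutputInfo, build_output_path, and the sample IDs); it only shows the path construction, and the resulting string is what would be passed to the write and read invocations.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OutputInfo:
    """Hypothetical stand-in for the fields an IO manager reads from its context."""

    run_id: str
    step_key: str
    name: str


def build_output_path(prefix: str, info: OutputInfo) -> str:
    """Join a storage prefix with the run ID, step key, and output name."""
    # Use "/" rather than os.path.join so URIs such as s3://... stay valid on any OS.
    parts = [prefix.rstrip("/"), info.run_id, info.step_key, info.name]
    return "/".join(parts)


info = OutputInfo(run_id="run-123", step_key="make_people", name="result")

# Same layout, different storage location depending on the prefix.
local_path = build_output_path(".", info)
s3_path = build_output_path("s3://my-s3-bucket/", info)
print(local_path)
print(s3_path)
```

Keeping the prefix as configuration means the transformation code stays unchanged when the storage location moves from local development to production.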
This example demonstrates how to use the emr_pyspark_step_launcher to have an op run as a Spark step on an EMR cluster. In it, each of the two ops will be executed as a separate EMR step on the same EMR cluster.
from pathlib import Path

from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_resource
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import IOManager, ResourceDefinition, graph, io_manager, op


class ParquetIOManager(IOManager):
    def _get_path(self, context):
        # Prefix the path with the configured location (local directory or S3 bucket).
        return "/".join(
            [context.resource_config["path_prefix"], context.run_id, context.step_key, context.name]
        )

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = context.resources.pyspark.spark_session
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager(required_resource_keys={"pyspark"}, config_schema={"path_prefix": str})
def parquet_io_manager():
    return ParquetIOManager()


@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@op(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


# Resources for running each op as a step on an EMR cluster.
emr_resource_defs = {
    "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
        {
            "cluster_id": {"env": "EMR_CLUSTER_ID"},
            "local_pipeline_package_path": str(Path(__file__).parent),
            "deploy_local_pipeline_package": True,
            "region_name": "us-west-1",
            "staging_bucket": "my_staging_bucket",
            "wait_for_logs": True,
        }
    ),
    "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
    "s3": s3_resource,
    "io_manager": parquet_io_manager.configured({"path_prefix": "s3://my-s3-bucket"}),
}

# Resources for running the same graph locally, with no step launcher.
local_resource_defs = {
    "pyspark_step_launcher": ResourceDefinition.none_resource(),
    "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    "io_manager": parquet_io_manager.configured({"path_prefix": "."}),
}


@graph
def make_and_filter_data():
    filter_over_50(make_people())


make_and_filter_data_local = make_and_filter_data.to_job(
    name="local", resource_defs=local_resource_defs
)

make_and_filter_data_emr = make_and_filter_data.to_job(name="prod", resource_defs=emr_resource_defs)
The EMR PySpark step launcher relies on S3 to shuttle config and events to and from EMR.