dagster.
RunRequest
(run_key, run_config=None, tags=None, job_name=None)[source]¶Represents all the information required to launch a single run. Must be returned by a SensorDefinition or ScheduleDefinition’s evaluation function for a run to be launched.
run_key
¶A string key to identify this launched run. For sensors, ensures that only one run is created per run key across all sensor evaluations. For schedules, ensures that one run is created per tick, across failure recoveries. Passing in a None value means that a run will always be launched per evaluation.
str | None
run_config
¶The config that parameterizes the run execution to be launched, as a dict.
Optional[Dict]
tags
¶A dictionary of tags (string key-value pairs) to attach to the launched run.
Optional[Dict[str, str]]
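The run_key behaviour described above can be illustrated with a small standalone sketch. This is not Dagster's implementation: the helper name select_runs_to_launch and the in-memory seen set are assumptions made for illustration, and only the standard library is used.

```python
from typing import Iterable, List, Optional, Set


def select_runs_to_launch(
    requested_keys: Iterable[Optional[str]], seen_keys: Set[str]
) -> List[Optional[str]]:
    """Return the run keys that would lead to a new run (hypothetical helper)."""
    launched: List[Optional[str]] = []
    for key in requested_keys:
        if key is None:
            # No run key: there is nothing to deduplicate on, so always launch.
            launched.append(key)
        elif key not in seen_keys:
            # First time this key is seen: launch and remember it.
            seen_keys.add(key)
            launched.append(key)
        # Otherwise a run with this key already exists, so it is skipped.
    return launched


seen: Set[str] = set()
first_tick = select_runs_to_launch(["file_a", "file_b", None], seen)
second_tick = select_runs_to_launch(["file_a", "file_c", None], seen)
print(first_tick)   # ['file_a', 'file_b', None]
print(second_tick)  # ['file_c', None]
```

In the second evaluation "file_a" is dropped because a run with that key was already requested, while the None key launches again.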
@
dagster.
schedule
(cron_schedule, pipeline_name=None, name=None, tags=None, tags_fn=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, execution_timezone=None, description=None, job=None, default_status=<DefaultScheduleStatus.STOPPED: 'STOPPED'>)[source]¶Creates a schedule following the provided cron schedule and requests runs for the provided job.
The decorated function takes in a ScheduleEvaluationContext as its only argument, and does one of the following:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason)
Return a run config dictionary.
Yield a SkipReason or yield one or more RunRequest objects.
Returns a ScheduleDefinition.
cron_schedule (str) – A valid cron string specifying when the schedule will run, e.g., '45 23 * * 6' for a schedule that runs at 11:45 PM every Saturday.
pipeline_name (Optional[str]) – (legacy) The name of the pipeline to execute when the schedule runs.
name (Optional[str]) – The name of the schedule to create.
tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.
tags_fn (Optional[Callable[[ScheduleEvaluationContext], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedule's runs. Takes a ScheduleEvaluationContext and returns a dictionary of tags (string key-value pairs). You may set only one of tags and tags_fn.
solid_selection (Optional[List[str]]) – A list of solid subselections (including single solid names) to execute when the schedule runs, e.g. ['*some_solid+', 'other_solid'].
mode (Optional[str]) – The pipeline mode in which to execute this schedule. (Default: ‘default’)
should_execute (Optional[Callable[[ScheduleEvaluationContext], bool]]) – A function that runs at schedule execution time to determine whether a schedule should execute or skip. Takes a ScheduleEvaluationContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.
environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.
execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
description (Optional[str]) – A human-readable description of the schedule.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job that should execute when this schedule runs.
default_status (DefaultScheduleStatus) – Whether the schedule starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
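The return shapes listed above for the decorated function can be pictured as all reducing to a flat list of requests or skip reasons. The following is a minimal standalone sketch of that idea, not Dagster's code: StubRunRequest, StubSkipReason and normalize are hypothetical stand-ins defined here so the example runs with the standard library only.

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StubRunRequest:  # stand-in for dagster.RunRequest, not the real class
    run_key: Optional[str] = None
    run_config: Dict[str, Any] = field(default_factory=dict)


@dataclass
class StubSkipReason:  # stand-in for dagster.SkipReason, not the real class
    skip_message: str = ""


def normalize(result: Any) -> List[Any]:
    """Flatten any of the allowed return shapes into a list of stub objects."""
    if inspect.isgenerator(result):
        return list(result)  # items yielded by the function
    if result is None:
        return []  # skipped without a reason
    if isinstance(result, dict):
        return [StubRunRequest(run_config=result)]  # bare run config dictionary
    if isinstance(result, list):
        return result  # list of run requests
    return [result]  # a single run request or skip reason


def yields_two():
    yield StubRunRequest(run_key="a")
    yield StubRunRequest(run_key="b")


print(normalize(None))
print(normalize({"ops": {}}))
print(normalize(StubSkipReason("no new data")))
print(normalize(yields_two()))
```

The bare run config dictionary case is listed only for schedules; the sensor decorators in this reference do not list it.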
dagster.
ScheduleDefinition
(name=None, cron_schedule=None, pipeline_name=None, run_config=None, run_config_fn=None, tags=None, tags_fn=None, solid_selection=None, mode='default', should_execute=None, environment_vars=None, execution_timezone=None, execution_fn=None, description=None, job=None, default_status=<DefaultScheduleStatus.STOPPED: 'STOPPED'>)[source]¶Define a schedule that targets a job
name (Optional[str]) – The name of the schedule to create. Defaults to the job name plus “_schedule”.
cron_schedule (str) – A valid cron string specifying when the schedule will run, e.g., ‘45 23 * * 6’ for a schedule that runs at 11:45 PM every Saturday.
pipeline_name (Optional[str]) – (legacy) The name of the pipeline to execute when the schedule runs.
execution_fn (Callable[ScheduleEvaluationContext]) – The core evaluation function for the schedule, which is run at an interval to determine whether a run should be launched or not. Takes a ScheduleEvaluationContext.
This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
run_config (Optional[Dict]) – The config that parameterizes this execution, as a dict.
run_config_fn (Optional[Callable[[ScheduleEvaluationContext], Dict]]) – A function that takes a ScheduleEvaluationContext object and returns the run configuration that parameterizes this execution, as a dict. You may set only one of run_config, run_config_fn, and execution_fn.
tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.
tags_fn (Optional[Callable[[ScheduleEvaluationContext], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedule's runs. Takes a ScheduleEvaluationContext and returns a dictionary of tags (string key-value pairs). You may set only one of tags, tags_fn, and execution_fn.
solid_selection (Optional[List[str]]) – A list of solid subselections (including single solid names) to execute when the schedule runs, e.g. ['*some_solid+', 'other_solid'].
mode (Optional[str]) – (legacy) The mode to apply when executing this schedule. (default: ‘default’)
should_execute (Optional[Callable[[ScheduleEvaluationContext], bool]]) – A function that runs at schedule execution time to determine whether a schedule should execute or skip. Takes a ScheduleEvaluationContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.
environment_vars (Optional[Dict[str, str]]) – The environment variables to set for the schedule.
execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
description (Optional[str]) – A human-readable description of the schedule.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job that should execute when this schedule runs.
default_status (DefaultScheduleStatus) – Whether the schedule starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
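To make the cron_schedule example concrete, here is a minimal sketch that checks a datetime against a five-field cron string such as '45 23 * * 6'. It is an illustration only: matches_cron is a hypothetical helper, it handles only plain numbers and '*', and it is not the parser Dagster uses.

```python
from datetime import datetime


def matches_cron(cron: str, when: datetime) -> bool:
    """Check `when` against a five-field cron string (numbers and '*' only)."""
    fields = cron.split()
    if len(fields) != 5:
        raise ValueError("expected five fields: minute hour day month weekday")
    # Cron counts weekdays from Sunday = 0; isoweekday() counts Monday = 1 .. Sunday = 7.
    actual = [when.minute, when.hour, when.day, when.month, when.isoweekday() % 7]
    # Simplification: every restricted field must match (no ranges, lists, or steps).
    return all(f == "*" or int(f) == value for f, value in zip(fields, actual))


saturday_night = datetime(2022, 3, 12, 23, 45)  # 2022-03-12 is a Saturday
sunday_night = datetime(2022, 3, 13, 23, 45)
print(matches_cron("45 23 * * 6", saturday_night))  # True
print(matches_cron("45 23 * * 6", sunday_night))    # False
```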
dagster.
ScheduleEvaluationContext
(instance_ref, scheduled_execution_time)[source]¶Schedule-specific execution context.
An instance of this class is made available as the first argument to various ScheduleDefinition functions. It is passed as the first argument to run_config_fn, tags_fn, and should_execute.
instance_ref
¶The serialized instance configured to run the schedule
Optional[InstanceRef]
scheduled_execution_time
¶The time at which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed. Not available in all schedulers; currently only set in deployments using DagsterDaemonScheduler.
datetime
dagster.
build_schedule_context
(instance=None, scheduled_execution_time=None)[source]¶Builds schedule execution context using the provided parameters.
The instance provided to build_schedule_context must be persistent; DagsterInstance.ephemeral() will result in an error.
instance (Optional[DagsterInstance]) – The dagster instance configured to run the schedule.
scheduled_execution_time (datetime) – The time at which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed.
Examples
context = build_schedule_context(instance)
daily_schedule.evaluate_tick(context)
dagster.core.scheduler.
DagsterDaemonScheduler
Scheduler[source]¶For partitioned schedules, controls the maximum number of past partitions for each schedule that will be considered when looking for missing runs. Generally this parameter will only come into play if the scheduler falls behind or launches after experiencing downtime. This parameter will not be checked for schedules without partition sets (for example, schedules created using the @schedule decorator) - only the most recent execution time will be considered for those schedules.
Note that no matter what this value is, the scheduler will never launch a run from a time before the schedule was turned on (even if the start_date on the schedule is earlier) - if you want to launch runs for earlier partitions, launch a backfill.
Default Value: 5
For each schedule tick that raises an error, how many times to retry that tick.
Default Value: 0
Default scheduler implementation that submits runs from the dagster-daemon long-lived process. Periodically checks each running schedule for execution times that don’t have runs yet and launches them.
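The catch-up limit described above can be sketched as a filter over missed tick times. This is a rough illustration under stated assumptions, not the scheduler's actual logic: ticks_to_consider, turned_on_at and max_catchup are hypothetical names, and the sketch only captures two rules from the text (nothing before the schedule was turned on, and at most a fixed number of the most recent past ticks).

```python
from datetime import datetime, timedelta
from typing import List


def ticks_to_consider(
    tick_times: List[datetime],
    turned_on_at: datetime,
    now: datetime,
    max_catchup: int = 5,
) -> List[datetime]:
    """Pick the past ticks that a catch-up pass would look at (hypothetical)."""
    # Never consider ticks from before the schedule was turned on, or future ticks.
    eligible = [t for t in sorted(tick_times) if turned_on_at <= t <= now]
    # Keep only the most recent `max_catchup` of them.
    return eligible[-max_catchup:] if max_catchup > 0 else []


daily_ticks = [datetime(2022, 3, 1) + timedelta(days=i) for i in range(10)]
now = datetime(2022, 3, 10, 12, 0)

after_long_downtime = ticks_to_consider(daily_ticks, datetime(2022, 3, 3), now)
recently_enabled = ticks_to_consider(daily_ticks, datetime(2022, 3, 8), now)
print([t.day for t in after_long_downtime])  # [6, 7, 8, 9, 10]
print([t.day for t in recently_enabled])     # [8, 9, 10]
```

Anything older than the limit would need a backfill, as the note above says.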
dagster.
build_schedule_from_partitioned_job
(job, description=None, name=None, minute_of_hour=None, hour_of_day=None, day_of_week=None, day_of_month=None, default_status=<DefaultScheduleStatus.STOPPED: 'STOPPED'>)[source]¶Creates a schedule from a time window-partitioned job.
The schedule executes at the cadence specified by the partitioning of the given job.
dagster.
PartitionScheduleDefinition
(name, cron_schedule, pipeline_name, tags_fn, solid_selection, mode, should_execute, environment_vars, partition_set, run_config_fn=None, execution_timezone=None, execution_fn=None, description=None, decorated_fn=None, job=None, default_status=<DefaultScheduleStatus.STOPPED: 'STOPPED'>)[source]¶
@
dagster.
hourly_partitioned_config
(start_date, minute_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]Defines run config over a set of hourly partitions.
The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can be provided as either a datetime or a string.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
@hourly_partitioned_config(start_date=datetime(2022, 3, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...
@hourly_partitioned_config(start_date=datetime(2022, 3, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
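The partition bounds in the comments above can be reproduced with plain datetime arithmetic. The sketch below is an illustration only, not the decorator's implementation: hourly_windows is a hypothetical helper, and it assumes start_date falls on the hour.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Window = Tuple[datetime, datetime]


def hourly_windows(start_date: datetime, count: int, minute_offset: int = 0) -> List[Window]:
    """Return the first `count` hourly (start, end) windows (hypothetical helper)."""
    # Shift the first boundary to `minute_offset` minutes past the hour.
    first = start_date.replace(minute=minute_offset, second=0, microsecond=0)
    step = timedelta(hours=1)
    return [(first + i * step, first + (i + 1) * step) for i in range(count)]


for start, end in hourly_windows(datetime(2022, 3, 12), count=2, minute_offset=15):
    print(start.strftime("%Y-%m-%d-%H:%M"), end.strftime("%Y-%m-%d-%H:%M"))
# 2022-03-12-00:15 2022-03-12-01:15
# 2022-03-12-01:15 2022-03-12-02:15
```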
@
dagster.
daily_partitioned_config
(start_date, minute_offset=0, hour_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]Defines run config over a set of daily partitions.
The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can be provided as either a datetime or a string.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
@daily_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...
@daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
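The effect of end_offset is easiest to see by counting windows relative to a fixed "current time". The following sketch is an illustration under assumptions: daily_windows is a hypothetical helper, the current time is passed in explicitly as now, and a window that ends exactly at now is treated as complete.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Window = Tuple[datetime, datetime]


def daily_windows(start_date: datetime, now: datetime, end_offset: int = 0) -> List[Window]:
    """List daily windows that have ended by `now`, plus `end_offset` extra ones."""
    day = timedelta(days=1)
    windows: List[Window] = []
    start = start_date
    # Windows that are already complete at `now`.
    while start + day <= now:
        windows.append((start, start + day))
        start += day
    # end_offset extends the set by that many additional windows.
    for _ in range(end_offset):
        windows.append((start, start + day))
        start += day
    return windows


now = datetime(2022, 3, 15, 10, 0)
default_set = daily_windows(datetime(2022, 3, 12), now)
extended_set = daily_windows(datetime(2022, 3, 12), now, end_offset=1)
print(len(default_set), default_set[-1][1])    # 3 2022-03-15 00:00:00
print(len(extended_set), extended_set[-1][1])  # 4 2022-03-16 00:00:00
```

With end_offset=1 the window for 2022-03-15 is included even though it has not finished yet.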
@
dagster.
weekly_partitioned_config
(start_date, minute_offset=0, hour_offset=0, day_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]Defines run config over a set of weekly partitions.
The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be the day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be the Sunday at midnight following start_date. Can be provided as either a datetime or a string.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
@weekly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...
@weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
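The weekly bounds in the comments above follow from finding the first matching weekday and time on or after start_date and then stepping by seven days. The sketch below reproduces them with the standard library. It is not the decorator's implementation: weekly_windows is a hypothetical helper, and treating a start_date that already matches as the first boundary is an assumption.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Window = Tuple[datetime, datetime]


def weekly_windows(
    start_date: datetime,
    count: int,
    day_offset: int = 0,
    hour_offset: int = 0,
    minute_offset: int = 0,
) -> List[Window]:
    """Return the first `count` weekly (start, end) windows (hypothetical helper)."""
    first = start_date.replace(
        hour=hour_offset, minute=minute_offset, second=0, microsecond=0
    )
    # day_offset counts from Sunday = 0; isoweekday() % 7 uses the same numbering.
    while first < start_date or first.isoweekday() % 7 != day_offset:
        first += timedelta(days=1)
    week = timedelta(days=7)
    return [(first + i * week, first + (i + 1) * week) for i in range(count)]


for start, end in weekly_windows(datetime(2022, 3, 12), count=2):
    print(start.strftime("%Y-%m-%d-%H:%M"), end.strftime("%Y-%m-%d-%H:%M"))
# 2022-03-13-00:00 2022-03-20-00:00
# 2022-03-20-00:00 2022-03-27-00:00
```

2022-03-12 is a Saturday, which is why the default (Sunday) boundary lands on 2022-03-13, while day_offset=6 keeps the first boundary on 2022-03-12.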
@
dagster.
monthly_partitioned_config
(start_date, minute_offset=0, hour_offset=0, day_offset=1, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]Defines run config over a set of monthly partitions.
The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be midnight on the soonest first of the month following start_date. Can be provided as either a datetime or a string.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
day_offset (int) – Day of the month to “split” the partition. Defaults to 1.
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
@monthly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...
@monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
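The monthly bounds in the comments above can be reproduced the same way, stepping by calendar month instead of a fixed number of days. This is a sketch under assumptions rather than the decorator's implementation: monthly_windows and next_month are hypothetical helpers, and day_offset is assumed to be at most 28 so that every month contains that day.

```python
from datetime import datetime
from typing import List, Tuple

Window = Tuple[datetime, datetime]


def next_month(dt: datetime) -> datetime:
    """Same day and time, one calendar month later (assumes day <= 28)."""
    year = dt.year + dt.month // 12  # roll the year over after December
    month = dt.month % 12 + 1
    return dt.replace(year=year, month=month)


def monthly_windows(
    start_date: datetime,
    count: int,
    day_offset: int = 1,
    hour_offset: int = 0,
    minute_offset: int = 0,
) -> List[Window]:
    """Return the first `count` monthly (start, end) windows (hypothetical helper)."""
    first = start_date.replace(
        day=day_offset, hour=hour_offset, minute=minute_offset, second=0, microsecond=0
    )
    if first < start_date:
        # The boundary in start_date's own month has already passed.
        first = next_month(first)
    windows: List[Window] = []
    start = first
    for _ in range(count):
        end = next_month(start)
        windows.append((start, end))
        start = end
    return windows


for start, end in monthly_windows(datetime(2022, 3, 12), 2, day_offset=5, hour_offset=3, minute_offset=15):
    print(start.strftime("%Y-%m-%d-%H:%M"), end.strftime("%Y-%m-%d-%H:%M"))
# 2022-04-05-03:15 2022-05-05-03:15
# 2022-05-05-03:15 2022-06-05-03:15
```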
@
dagster.
sensor
(pipeline_name=None, name=None, solid_selection=None, mode=None, minimum_interval_seconds=None, description=None, job=None, jobs=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Creates a sensor where the decorated function is used as the sensor’s evaluation function. The decorated function may:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason)
Yield a SkipReason or yield one or more RunRequest objects.
Takes a SensorEvaluationContext.
pipeline_name (Optional[str]) – (legacy) Name of the target pipeline. Cannot be used in conjunction with job or jobs parameters.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
solid_selection (Optional[List[str]]) – (legacy) A list of solid subselections (including single solid names) to execute for runs for this sensor, e.g. ['*some_solid+', 'other_solid']. Cannot be used in conjunction with job or jobs parameters.
mode (Optional[str]) – (legacy) The mode to apply when executing runs for this sensor. Cannot be used in conjunction with job or jobs parameters. (default: ‘default’)
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job to be executed when the sensor fires.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
dagster.
SensorDefinition
(name=None, evaluation_fn=None, pipeline_name=None, solid_selection=None, mode=None, minimum_interval_seconds=None, description=None, job=None, jobs=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Define a sensor that initiates a set of runs based on some external state
evaluation_fn (Callable[[SensorEvaluationContext]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorEvaluationContext.
This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
name (Optional[str]) – The name of the sensor to create. Defaults to the name of evaluation_fn.
pipeline_name (Optional[str]) – (legacy) The name of the pipeline to execute when the sensor fires. Cannot be used in conjunction with job or jobs parameters.
solid_selection (Optional[List[str]]) – (legacy) A list of solid subselections (including single solid names) to execute when the sensor runs, e.g. ['*some_solid+', 'other_solid']. Cannot be used in conjunction with job or jobs parameters.
mode (Optional[str]) – (legacy) The mode to apply when executing runs triggered by this sensor. Cannot be used in conjunction with job or jobs parameters. (default: ‘default’)
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job to execute when this sensor fires.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]) – (experimental) A list of jobs to execute when this sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
dagster.
SensorEvaluationContext
(instance_ref, last_completion_time, last_run_key, cursor, repository_name, instance=None)[source]¶Sensor execution context.
An instance of this class is made available as the first argument to the evaluation function on SensorDefinition.
instance_ref
¶The serialized instance configured to run the sensor
Optional[InstanceRef]
cursor
¶The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest
Optional[str]
last_run_key
¶DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
instance
¶The deserialized instance can also be passed in directly (primarily useful in testing contexts).
Optional[DagsterInstance]
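The cursor attribute lets an evaluation pick up where the previous one left off. The sketch below shows that pattern in isolation. It is an illustration, not Dagster code: evaluate is a hypothetical function, the cursor is modelled as the string form of the last processed event id, and the returned strings stand in for run keys.

```python
from typing import List, Optional, Tuple


def evaluate(cursor: Optional[str], event_ids: List[int]) -> Tuple[List[str], Optional[str]]:
    """Return run keys for unseen events and the cursor to store for next time."""
    # The cursor is a string (or None on the first evaluation).
    last_seen = int(cursor) if cursor is not None else -1
    new_ids = sorted(i for i in event_ids if i > last_seen)
    run_keys = [f"event_{i}" for i in new_ids]
    # Advance the cursor only when something new was processed.
    new_cursor = str(new_ids[-1]) if new_ids else cursor
    return run_keys, new_cursor


keys, cursor = evaluate(None, [1, 2, 3])
print(keys, cursor)  # ['event_1', 'event_2', 'event_3'] 3
keys, cursor = evaluate(cursor, [1, 2, 3, 4])
print(keys, cursor)  # ['event_4'] 4
```

Because the stored cursor records progress, a later evaluation with no new events returns no run keys and leaves the cursor unchanged.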
dagster.
build_sensor_context
(instance=None, cursor=None, repository_name=None)[source]¶Builds sensor execution context using the provided parameters.
This function can be used to provide a context to the invocation of a sensor definition. If provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in an error.
instance (Optional[DagsterInstance]) – The dagster instance configured to run the sensor.
cursor (Optional[str]) – A cursor value to provide to the evaluation of the sensor.
repository_name (Optional[str]) – The name of the repository that the sensor belongs to.
Examples
context = build_sensor_context()
my_sensor(context)
dagster.
AssetSensorDefinition
(name, asset_key, pipeline_name, asset_materialization_fn, solid_selection=None, mode=None, minimum_interval_seconds=None, description=None, job=None, jobs=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Define an asset sensor that initiates a set of runs based on the materialization of a given asset.
name (str) – The name of the sensor to create.
asset_key (AssetKey) – The asset_key this sensor monitors.
pipeline_name (Optional[str]) – (legacy) The name of the pipeline to execute when the sensor fires. Cannot be used in conjunction with job or jobs parameters.
asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorEvaluationContext and an EventLogEntry corresponding to an AssetMaterialization event.
This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
solid_selection (Optional[List[str]]) – (legacy) A list of solid subselections (including single solid names) to execute when the sensor runs, e.g. ['*some_solid+', 'other_solid']. Cannot be used in conjunction with job or jobs parameters.
mode (Optional[str]) – (legacy) The mode to apply when executing runs triggered by this sensor. (default: ‘default’). Cannot be used in conjunction with job or jobs parameters.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job object to target with this sensor.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
@
dagster.
asset_sensor
(asset_key, pipeline_name=None, name=None, solid_selection=None, mode=None, minimum_interval_seconds=None, description=None, job=None, jobs=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Creates an asset sensor where the decorated function is used as the asset sensor’s evaluation function. The decorated function may:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason)
Yield a SkipReason or yield one or more RunRequest objects.
Takes a SensorEvaluationContext and an EventLogEntry corresponding to an AssetMaterialization event.
asset_key (AssetKey) – The asset_key this sensor monitors.
pipeline_name (Optional[str]) – (legacy) Name of the target pipeline. Cannot be used in conjunction with job or jobs parameters.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
solid_selection (Optional[List[str]]) – (legacy) A list of solid subselections (including single solid names) to execute for runs for this sensor, e.g. ['*some_solid+', 'other_solid']. Cannot be used in conjunction with job or jobs parameters.
mode (Optional[str]) – (legacy) The mode to apply when executing runs for this sensor. Cannot be used in conjunction with job or jobs parameters. (default: ‘default’)
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job to be executed when the sensor fires.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
dagster.
RunStatusSensorDefinition
(name, pipeline_run_status, run_status_sensor_fn, pipeline_selection=None, minimum_interval_seconds=None, description=None, job_selection=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Define a sensor that reacts to a given status of pipeline execution, where the decorated function will be evaluated when a run is at the given status.
name (str) – The name of the sensor. Defaults to the name of the decorated function.
pipeline_run_status (PipelineRunStatus) – The status of a run which will be monitored by the sensor.
run_status_sensor_fn (Callable[[RunStatusSensorContext], Union[SkipReason, PipelineRunReaction]]) – The core evaluation function for the sensor. Takes a RunStatusSensorContext.
pipeline_selection (Optional[List[str]]) – (legacy) Names of the pipelines that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job_selection (Optional[List[Union[JobDefinition, GraphDefinition]]]) – The jobs that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
dagster.
RunStatusSensorContext
(sensor_name, dagster_run, dagster_event, instance)[source]¶The context object available to a decorated function of run_status_sensor.
dagster_run
¶the run of the job or pipeline.
DagsterRun
dagster_event
¶the event associated with the job or pipeline run status.
instance
¶the current instance.
dagster.
RunFailureSensorContext
(sensor_name, dagster_run, dagster_event, instance)[source]¶The context object available to a decorated function of run_failure_sensor.
pipeline_run
¶the failed pipeline run.
failure_event
¶the pipeline failure event.
dagster.
build_run_status_sensor_context
(sensor_name, dagster_event, dagster_instance, dagster_run)[source]¶Builds run status sensor context from provided parameters.
This function can be used to provide the context argument when directly invoking a function decorated with @run_status_sensor or @run_failure_sensor, such as when writing unit tests.
sensor_name (str) – The name of the sensor the context is being constructed for.
dagster_event (DagsterEvent) – A DagsterEvent with the same event type as the one that triggers the run_status_sensor
dagster_instance (DagsterInstance) – The dagster instance configured for the context.
dagster_run (DagsterRun) – DagsterRun object from running a job
Examples
instance = DagsterInstance.ephemeral()
result = my_job.execute_in_process(instance=instance)
dagster_run = result.dagster_run
dagster_event = result.get_job_success_event()  # or get_job_failure_event()
context = build_run_status_sensor_context(
    sensor_name="run_status_sensor_to_invoke",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
)
run_status_sensor_to_invoke(context)
@
dagster.
run_status_sensor
(pipeline_run_status, pipeline_selection=None, name=None, minimum_interval_seconds=None, description=None, job_selection=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Creates a sensor that reacts to a given status of pipeline execution, where the decorated function will be run when a pipeline is at the given status.
Takes a RunStatusSensorContext.
pipeline_run_status (PipelineRunStatus) – The status of pipeline execution which will be monitored by the sensor.
pipeline_selection (Optional[List[str]]) – Names of the pipelines that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job_selection (Optional[List[Union[PipelineDefinition, GraphDefinition]]]) – Jobs that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
@
dagster.
run_failure_sensor
(name=None, minimum_interval_seconds=None, description=None, job_selection=None, pipeline_selection=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Creates a sensor that reacts to job failure events, where the decorated function will be run when a run fails.
Takes a RunFailureSensorContext.
name (Optional[str]) – The name of the job failure sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job_selection (Optional[List[Union[JobDefinition, GraphDefinition]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
pipeline_selection (Optional[List[str]]) – (legacy) Names of the pipelines that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
@
dagster.
pipeline_failure_sensor
(name=None, minimum_interval_seconds=None, description=None, pipeline_selection=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)[source]¶Creates a sensor that reacts to pipeline failure events, where the decorated function will be run when a pipeline run fails.
Takes a PipelineFailureSensorContext.
name (Optional[str]) – The name of the pipeline failure sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
pipeline_selection (Optional[List[str]]) – Names of the pipelines that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.