dagster_msteams.msteams_resource ResourceDefinition
To send messages to an MS Teams channel, first create an incoming webhook, then pass the webhook URL to msteams_resource as part of the resource config in Dagster.
Default Value: 60
This resource is for connecting to Microsoft Teams.
The resource object is a dagster_msteams.TeamsClient.
By configuring this resource, you can post messages to MS Teams from any Dagster solid:
Examples:
import os

from dagster import ModeDefinition, execute_pipeline, pipeline, solid
from dagster_msteams import Card, msteams_resource


@solid(required_resource_keys={"msteams"})
def teams_solid(context):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    context.resources.msteams.post_message(payload=card.payload)


@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"msteams": msteams_resource})],
)
def teams_pipeline():
    teams_solid()


execute_pipeline(
    teams_pipeline,
    {"resources": {"msteams": {"config": {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}}}},
)
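For context, an incoming webhook is an HTTPS endpoint that accepts a JSON body. The sketch below is not the dagster_msteams implementation; it uses only the standard library to show roughly what posting to such a webhook involves. The helper names `build_text_payload` and `build_webhook_request` and the placeholder URL are hypothetical, and the bare `{"text": ...}` body is assumed to be the simplest payload a Teams incoming webhook accepts.

```python
import json
import urllib.request


def build_text_payload(text: str) -> dict:
    """Return a minimal JSON-serializable body for an incoming webhook (assumed shape)."""
    return {"text": text}


def build_webhook_request(hook_url: str, payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST request carrying the payload as JSON."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        hook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical placeholder URL; a real one comes from the Teams channel settings.
request = build_webhook_request(
    "https://example.invalid/webhook", build_text_payload("Hello There !!")
)
# Sending is deliberately left out: urllib.request.urlopen(request)
```

In a pipeline you would normally let the resource handle this and call `context.resources.msteams.post_message(...)` as in the example above.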
dagster_msteams.teams_on_failure HookDefinition
Create a hook on step failure events that will message the given MS Teams webhook URL.
message_fn (Optional[Callable[[HookContext], str]]) – Function that takes in the HookContext and outputs the message you want to send.
dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to allow messages to include deeplinks to the specific pipeline run that triggered the hook.
Examples
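from dagster import HookContext, pipeline, solid
from dagster_msteams import teams_on_failure


# Apply the hook to the whole pipeline.
@teams_on_failure(dagit_base_url="http://localhost:3000")
@pipeline(...)
def my_pipeline():
    pass


# Alternatively, attach the hook to a single solid with a custom message.
def my_message_fn(context: HookContext) -> str:
    return "Solid {solid_name} failed!".format(
        solid_name=context.solid
    )


@solid
def a_solid(context):
    pass


@pipeline(...)
def my_pipeline():
    a_solid.with_hooks(hook_defs={teams_on_failure(message_fn=my_message_fn)})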
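To illustrate what dagit_base_url is for, here is a minimal sketch of how a deeplink to a run could be appended to a message. It is not taken from the library: the function names `build_run_link` and `append_run_link` are hypothetical, and the `/instance/runs/<run_id>` path layout is an assumption about Dagit's URL scheme.

```python
def build_run_link(dagit_base_url: str, run_id: str) -> str:
    """Join a Dagit base URL and a run ID into a run page URL (path layout is assumed)."""
    return "{base}/instance/runs/{run_id}".format(
        base=dagit_base_url.rstrip("/"), run_id=run_id
    )


def append_run_link(message: str, dagit_base_url, run_id: str) -> str:
    """Add a link line to the message only when a base URL was configured."""
    if not dagit_base_url:
        return message
    return "{}\n{}".format(message, build_run_link(dagit_base_url, run_id))
```

When no base URL is given, the message is returned unchanged, which matches the parameter being optional.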
dagster_msteams.teams_on_success HookDefinition
Create a hook on step success events that will message the given MS Teams webhook URL.
message_fn (Optional[Callable[[HookContext], str]]) – Function that takes in the HookContext and outputs the message you want to send.
dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to allow messages to include deeplinks to the specific pipeline run that triggered the hook.
Examples
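from dagster import HookContext, pipeline, solid
from dagster_msteams import teams_on_success


# Apply the hook to the whole pipeline.
@teams_on_success(dagit_base_url="http://localhost:3000")
@pipeline(...)
def my_pipeline():
    pass


# Alternatively, attach the hook to a single solid with a custom message.
def my_message_fn(context: HookContext) -> str:
    return "Solid {solid_name} succeeded!".format(
        solid_name=context.solid
    )


@solid
def a_solid(context):
    pass


@pipeline(...)
def my_pipeline():
    a_solid.with_hooks(hook_defs={teams_on_success(message_fn=my_message_fn)})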
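Because message_fn is a plain function from a context object to a string, it can be exercised without running a pipeline. The sketch below uses a stand-in object in place of a real HookContext; the stand-in and its attribute values are hypothetical, and it assumes the context exposes `solid.name` and `pipeline_name`.

```python
from types import SimpleNamespace


def my_message_fn(context) -> str:
    """Format a success message from the fields the hook context is assumed to expose."""
    return "Solid {solid_name} succeeded in pipeline {pipeline_name}.".format(
        solid_name=context.solid.name,
        pipeline_name=context.pipeline_name,
    )


# Stand-in for HookContext, for local testing only (hypothetical attribute values).
fake_context = SimpleNamespace(
    solid=SimpleNamespace(name="a_solid"),
    pipeline_name="my_pipeline",
)
message = my_message_fn(fake_context)
```

Checking the formatted string this way catches typos in the template before the hook ever fires.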
dagster_msteams.make_teams_on_pipeline_failure_sensor(hook_url, message_fn=<function _default_failure_message>, http_proxy=None, https_proxy=None, timeout=60, verify=None, name=None, dagit_base_url=None, default_status=<DefaultSensorStatus.STOPPED: 'STOPPED'>)
Create a sensor on pipeline failures that will message the given MS Teams webhook URL.
hook_url (str) – MS Teams incoming webhook URL.
message_fn (Optional[Callable[[PipelineFailureSensorContext], str]]) – Function that takes in the PipelineFailureSensorContext and outputs the message you want to send. Defaults to a text message that contains the error message, pipeline name, and run ID.
http_proxy (Optional[str]) – Proxy for requests using the HTTP protocol.
https_proxy (Optional[str]) – Proxy for requests using the HTTPS protocol.
timeout (Optional[float]) – Connection timeout in seconds. Defaults to 60.
verify (Optional[bool]) – Whether to verify the server's TLS certificate.
name (Optional[str]) – The name of the sensor. Defaults to “teams_on_pipeline_failure”.
dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to allow messages to include deeplinks to the failed pipeline run.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
Examples
import os

from dagster import PipelineFailureSensorContext, repository
from dagster_msteams import make_teams_on_pipeline_failure_sensor

teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)


@repository
def my_repo():
    # my_pipeline is assumed to be defined elsewhere.
    return [my_pipeline, teams_on_pipeline_failure]


def my_message_fn(context: PipelineFailureSensorContext) -> str:
    return "Pipeline {pipeline_name} failed! Error: {error}".format(
        pipeline_name=context.pipeline_run.pipeline_name,
        error=context.failure_event.message,
    )


teams_on_pipeline_failure = make_teams_on_pipeline_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
    message_fn=my_message_fn,
    dagit_base_url="http://localhost:3000",
)
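The http_proxy, https_proxy, timeout, and verify parameters describe connection settings for the outgoing webhook call. As a rough sketch of how such settings are commonly combined, the hypothetical helper `build_request_options` below collects them into keyword arguments of the kind a requests-style HTTP client accepts. This is an illustration under that assumption, not the library's own code.

```python
from typing import Any, Dict, Optional


def build_request_options(
    http_proxy: Optional[str] = None,
    https_proxy: Optional[str] = None,
    timeout: Optional[float] = 60,
    verify: Optional[bool] = None,
) -> Dict[str, Any]:
    """Collect connection settings into keyword arguments for an HTTP client."""
    proxies = {}
    if http_proxy:
        proxies["http"] = http_proxy
    if https_proxy:
        proxies["https"] = https_proxy

    options: Dict[str, Any] = {"timeout": timeout}
    if proxies:
        options["proxies"] = proxies
    if verify is not None:
        # Only override certificate verification when explicitly requested.
        options["verify"] = verify
    return options
```

Leaving verify out of the options when it is None lets the HTTP client fall back to its own default rather than forcing a value.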