This example demonstrates how to use the GE op factory from dagster-ge to test incoming data against a set of expectations built with Great Expectations' tooling.
For this example, we'll use two versions of a dataset of baseball team payroll and wins, one of which has been modified to hold incorrect data.
You can use ge_validation_op_factory to generate Dagster ops that integrate with Great Expectations. For example, here is a basic call to this GE op factory with two required arguments: datasource_name and suite_name.
payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op", datasource_name="getest", suite_name="basic.warning"
)
The GE validation happens inside the op created above. The op yields an ExpectationResult with a structured dict of metadata from the GE suite. The structured metadata contains both summary stats from the suite and expectation-by-expectation results. The op also outputs the full result in case you want to process it differently. Here's how other ops could use the full result, where expectation is that result:
@op
def postprocess_payroll(numrows, expectation):
    if expectation["success"]:
        return numrows
    else:
        raise ValueError
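To try the branching logic in isolation, here's a minimal plain-Python sketch that uses a hand-built stand-in for the GE result dict (no real suite run or Dagster machinery involved):

```python
def postprocess_payroll(numrows, expectation):
    # Pass the row count through when the suite succeeded; fail loudly otherwise.
    if expectation["success"]:
        return numrows
    raise ValueError("payroll data failed the expectation suite")

# Hand-built stand-in for the "success" flag a GE result dict carries.
print(postprocess_payroll(1000, {"success": True}))  # 1000
```

A failing result dict ({"success": False}) would raise ValueError, halting the job before bad data reaches downstream ops.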
You can configure the GE Data Context via the ge_data_context resource from the dagster-ge integration package. All we need to do to expose GE to Dagster is provide the root of the GE directory (the path to the great_expectations directory on your machine).
Finally, here's the full job definition using the GE op, with a default run configuration to use the correct set of data:
@job(
    resource_defs={"ge_data_context": ge_data_context},
    config={
        "resources": {
            "ge_data_context": {
                "config": {"ge_root_dir": file_relative_path(__file__, "./great_expectations")}
            }
        },
        "ops": {
            "read_in_datafile": {
                "inputs": {
                    "csv_path": {"value": file_relative_path(__file__, "./data/succeed.csv")}
                }
            }
        },
    },
)
def payroll_data():
    output_df = read_in_datafile()
    postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))
To exercise the job with incorrect data, simply swap the path for succeed.csv with fail.csv.
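One way to do this without editing the job definition is to override the default run config at launch time. A sketch of such an override (the op and input names match the job above; the path is relative to wherever you launch from):

```yaml
ops:
  read_in_datafile:
    inputs:
      csv_path:
        value: "./data/fail.csv"
```

With this config, the GE suite should report failures and postprocess_payroll will raise, stopping the run.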