An orchestration platform for the development, production, and observation of data assets.
- The Python GraphQL client now includes a `shutdown_repository_location` API call that shuts down a gRPC server. This is useful in situations where you want Kubernetes to restart your server and re-create your repository definitions, even though the underlying Python code hasn't changed (for example, if your pipelines are loaded programmatically from a database). A sketch follows.
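A minimal usage sketch, assuming a Dagit host at `localhost:3000` and a repository location named `my_location` (both hypothetical):

```python
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("localhost", port_number=3000)

# Ask the gRPC server behind the "my_location" repository location to shut
# down, so an orchestrator such as Kubernetes restarts it with fresh definitions.
client.shutdown_repository_location("my_location")
```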
- `io_manager_key` and `root_manager_key` are disallowed on composite solids' `InputDefinition`s and `OutputDefinition`s. Instead, custom IO managers on the solids inside composite solids will be respected:
```python
@solid(input_defs=[InputDefinition("data", dagster_type=str, root_manager_key="my_root")])
def inner_solid(_, data):
    return data

@composite_solid
def my_composite():
    return inner_solid()
```
- Schedules can now be directly invoked. This is intended to be used for testing; a sketch follows. To learn more, see https://docs.dagster.io/master/concepts/partitions-schedules-sensors/schedules#testing-schedules
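A minimal sketch of direct invocation, assuming a hypothetical schedule `my_schedule` that returns run config for a pipeline named `my_pipeline`:

```python
from dagster import build_schedule_context, schedule

@schedule(cron_schedule="0 0 * * *", pipeline_name="my_pipeline")
def my_schedule(context):
    return {"solids": {"process": {"config": {"date": "2021-07-01"}}}}

# Directly invoking the schedule returns the run config it would produce.
run_config = my_schedule(build_schedule_context())
assert run_config["solids"]["process"]["config"]["date"] == "2021-07-01"
```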
- Dagster library packages (such as `dagster-postgres` or `dagster-graphql`) are now pinned to the same version as the core `dagster` package. This should reduce instances of issues due to backwards compatibility problems between Dagster packages.
- Invoking a generator solid now yields a generator, and output objects are not unpacked:
```python
@solid
def my_solid():
    yield Output("hello")

assert isinstance(list(my_solid())[0], Output)
```
- Added a new `EcsRunLauncher`. This creates a new ECS Task Definition and launches a new ECS Task for each run. You can use the new ECS Reference Deployment to experiment with the `EcsRunLauncher`. We'd love your feedback in our #dagster-ecs Slack channel!
- The `S3ComputeLogManager` now takes a boolean config argument `skip_empty_files`, which skips uploading empty log files to S3. This should enable a workaround for timeout errors when using the `S3ComputeLogManager` to persist logs to MinIO object storage.
- Running `dagit` with the `--suppress-warnings` flag will now ignore all warnings, such as `ExperimentalWarning`s.
- Updated the `dagster-dbt` library documentation with some helpful tips and example code (thanks @makotonium!).
- Updated the `dagster-pyspark` documentation for providing and accessing the pyspark resource (thanks @Andrew-Crosby!).
- Check out `examples/hacker_news`!
- `retry_number` is now available on `SolidExecutionContext`, allowing you to determine within a solid function how many times the solid has been previously retried. A sketch of its use follows.
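A sketch of combining `retry_number` with `RetryRequested`; `call_flaky_service` is a hypothetical helper that may raise:

```python
from dagster import RetryRequested, solid

@solid
def flaky_solid(context):
    try:
        return call_flaky_service()  # hypothetical helper that may raise
    except Exception as e:
        # Wait longer on each successive retry attempt.
        raise RetryRequested(max_retries=3, seconds_to_wait=2 ** context.retry_number) from e
```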
- The `PIPELINE_INIT_FAILURE` event type has been removed. A failure that occurs during pipeline initialization will now produce a `PIPELINE_FAILURE`, as with all other pipeline failures.
- The `get_run_status` method on the Python GraphQL client now returns a `PipelineRunStatus` enum instead of the raw string value, in order to align with the mypy type annotation. Thanks to Dylan Bienstock for surfacing this bug!
- Added the `k8s_job_executor`, which executes solids in separate Kubernetes jobs. With the addition of this executor, you can now choose at runtime between single-pod and multi-pod isolation for the solids in your run. Previously this was only configurable for the entire deployment: you could either use the `K8sRunLauncher` with the default executors (`in_process` and `multiprocess`) for low isolation, or the `CeleryK8sRunLauncher` with the `celery_k8s_job_executor` for pod-level isolation. Now, your instance can be configured with the `K8sRunLauncher`, and you can choose between the default executors or the `k8s_job_executor` at runtime. A configuration sketch follows.
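A sketch of opting into the new executor on a single pipeline, assuming `dagster-k8s` is installed (the pipeline is hypothetical):

```python
from dagster import ModeDefinition, default_executors, pipeline
from dagster_k8s import k8s_job_executor

# Make k8s_job_executor available alongside in_process and multiprocess, so
# the level of isolation can be chosen at runtime via run config.
@pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [k8s_job_executor])])
def my_pipeline():
    ...
```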
- The `DagsterGraphQLClient` now allows you to specify whether to use HTTP or HTTPS when connecting to the GraphQL server. In addition, error messages during query execution or connecting to Dagit are now clearer. Thanks to @emily-hawkins for raising this issue!
- Hooks can now be directly invoked for testing, using `build_hook_context` to construct a context:
```python
from dagster import build_hook_context

my_hook(build_hook_context(resources={"foo_resource": "foo"}))
```
- Resources can now be directly invoked for testing, using `build_init_resource_context` to construct a context:
```python
from dagster import build_init_resource_context, resource

@resource(config_schema=str)
def my_basic_resource(init_context):
    return init_context.resource_config

context = build_init_resource_context(config="foo")
assert my_basic_resource(context) == "foo"
```
- `ScheduleDefinition` and `SensorDefinition` now carry over properties from functions decorated by `@sensor` and `@schedule` (e.g., docstrings).
- Fixed a bug where a property set on a `ResourceDefinition` was not being passed to the `ResourceDefinition` created by the call to `configured`.
- Previously, if an error was raised in an `IOManager` `handle_output` implementation that was a generator, it would not be wrapped in a `DagsterExecutionHandleOutputError`. Now, it is wrapped.
- Added support for the `queuedRunCoordinator`. See the docs for more information on setup.
- `RetryPolicy` now supports backoff and jitter settings, to allow for modulating the `delay` as a function of attempt number and randomness; a sketch follows.
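A sketch of a policy with exponential backoff and jitter (the parameter values are illustrative):

```python
from dagster import Backoff, Jitter, RetryPolicy

# Waits delay * 2^attempt_number seconds between retries, plus or minus a
# random jitter component.
policy = RetryPolicy(
    max_retries=3,
    delay=1,
    backoff=Backoff.EXPONENTIAL,
    jitter=Jitter.PLUS_MINUS,
)
```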
- The `build_schedule_context` and `validate_run_config` functions are still in an experimental state. https://docs.dagster.io/master/concepts/partitions-schedules-sensors/schedules#testing-schedules
- The `validate_run_config` function is still in an experimental state. https://docs.dagster.io/master/concepts/partitions-schedules-sensors/partitions#experimental-testing-a-partition-set
- Added `dagit.enableReadOnly`. When enabled, a separate Dagit instance is deployed in `--read-only` mode. You can use this feature to serve Dagit to users who you do not want to be able to kick off new runs or make other changes to application state.
- Added `EventMetadata.asset`.
- Added a new event type, `LOGS_CAPTURED`, which explicitly links to the captured stdout/stderr logs for a given step, as determined by the configured `ComputeLogManager` on the Dagster instance. Previously, these links were available on the `STEP_START` event.
- The `network` key on `DockerRunLauncher` config can now be sourced from an environment variable.
- The `get_execution_data` method of `SensorDefinition` and `ScheduleDefinition` has been renamed to `evaluate_tick`. We expect few to no users of the previous name, and are renaming to prepare for improved testing support for schedules and sensors.
- Added the `build_sensor_context` API. See Testing sensors; a sketch follows.
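A sketch of testing a hypothetical sensor `my_sensor` with the renamed method:

```python
from dagster import DagsterInstance, build_sensor_context

with DagsterInstance.get() as instance:
    # evaluate_tick replaces the old get_execution_data name.
    result = my_sensor.evaluate_tick(build_sensor_context(instance))
```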
- Bugs when using Dagster types (e.g. `dagster.Int`) as type annotations on functions decorated with `@solid` have been resolved.
- Fixed a bug where the `K8sRunLauncher` sometimes hung while launching a run due to holding a stale Kubernetes client.
- Sensors can now set a string cursor using `context.update_cursor(str_value)` that is persisted across evaluations, to save unnecessary computation. The persisted string value is made available on the context as `context.cursor`. Previously, we encouraged cursor-like behavior by exposing `last_run_key` on the sensor context to keep track of the last time the sensor successfully requested a run; this, however, was not useful for avoiding unnecessary computation when the sensor evaluation did not result in a run request. A sketch follows.
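A sketch of a cursor-based sensor; `fetch_records_after` is a hypothetical helper returning records with increasing ids:

```python
from dagster import RunRequest, sensor

@sensor(pipeline_name="my_pipeline")
def my_records_sensor(context):
    last_id = int(context.cursor) if context.cursor else 0
    records = fetch_records_after(last_id)  # hypothetical helper
    for record in records:
        yield RunRequest(run_key=str(record.id), run_config={})
    if records:
        # Persist progress so the next evaluation skips already-seen records.
        context.update_cursor(str(records[-1].id))
```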
- Dagit can now be run in `--read-only` mode, which disables mutations in the user interface and on the server. You can use this feature to run instances of Dagit that are visible to users who you do not want to be able to kick off new runs or make other changes to application state.
- In `dagster-pandas`, the `event_metadata_fn` parameter to the function `create_dagster_pandas_dataframe_type` may now return a dictionary of `EventMetadata` values, keyed by their string labels. This is now consistent with the parameters accepted by Dagster events, including the `TypeCheck` event:
```python
# old
MyDataFrame = create_dagster_pandas_dataframe_type(
    "MyDataFrame",
    event_metadata_fn=lambda df: [
        EventMetadataEntry.int(len(df), "number of rows"),
        EventMetadataEntry.int(len(df.columns), "number of columns"),
    ],
)

# new
MyDataFrame = create_dagster_pandas_dataframe_type(
    "MyDataFrame",
    event_metadata_fn=lambda df: {
        "number of rows": len(df),
        "number of columns": len(df.columns),
    },
)
```
- `PandasColumn.datetime_column()` now has a new `tz` parameter, allowing you to constrain the column to a specific timezone (thanks @mrdavidlaing!). A sketch follows.
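A sketch of constraining a column to UTC, assuming `dagster-pandas` is installed:

```python
from dagster_pandas import PandasColumn

# The type check fails if "event_time" values are not tz-aware UTC timestamps.
event_time_column = PandasColumn.datetime_column("event_time", tz="UTC")
```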
- The `DagsterGraphQLClient` now takes in an optional `transport` argument, which may be useful in cases where you need to authenticate your GQL requests:
```python
from gql.transport.requests import RequestsHTTPTransport

authed_client = DagsterGraphQLClient(
    "my_dagit_url.com",
    transport=RequestsHTTPTransport(..., auth=<some auth>),
)
```
- Added an `ecr_public_resource` to get login credentials for the AWS ECR Public Gallery. This is useful if any of your pipelines need to push images.
- Added the `max_completion_wait_time_seconds` configuration option, which controls how long to wait for a Databricks job to complete before exiting.
- Solids can now be invoked directly. If your solid has a context argument, the `build_solid_context` function can be used to provide a context to the invocation:
```python
from dagster import build_solid_context, solid

@solid
def basic_solid():
    return "foo"

assert basic_solid() == "foo"

@solid
def add_one(x):
    return x + 1

assert add_one(5) == 6

@solid(required_resource_keys={"foo_resource"})
def solid_reqs_resources(context):
    return context.resources.foo_resource + "bar"

context = build_solid_context(resources={"foo_resource": "foo"})
assert solid_reqs_resources(context) == "foobar"
```
- `build_schedule_context` allows you to build a `ScheduleExecutionContext` using a `DagsterInstance`. This can be used to test schedules:
```python
from dagster import DagsterInstance, build_schedule_context

with DagsterInstance.get() as instance:
    context = build_schedule_context(instance)
    my_schedule.get_execution_data(context)
```
- `build_sensor_context` allows you to build a `SensorExecutionContext` using a `DagsterInstance`. This can be used to test sensors:
```python
from dagster import DagsterInstance, build_sensor_context

with DagsterInstance.get() as instance:
    context = build_sensor_context(instance)
    my_sensor.get_execution_data(context)
```
- `build_input_context` and `build_output_context` allow you to construct `InputContext` and `OutputContext` respectively. These can be used to test IO managers:
```python
from dagster import build_input_context, build_output_context

io_manager = MyIoManager()

io_manager.load_input(build_input_context())
io_manager.handle_output(build_output_context(), val)  # val is the output value to store
```
- When the provided resources are context managers, `build_input_context`/`build_output_context` must themselves be used as a context manager:
```python
with build_input_context(resources={"cm_resource": my_cm_resource}) as context:
    io_manager.load_input(context)
```
- `validate_run_config` can be used to validate a run config blob against a pipeline definition and mode. If the run config is invalid for the pipeline and mode, this function will throw an error; if valid, it will return a dictionary representing the validated run config that Dagster uses during execution:
```python
# usage for a pipeline that requires config
validate_run_config(
    pipeline_contains_a,
    {"solids": {"a": {"config": {"foo": "bar"}}}},
)

# usage for a pipeline that has no required config
validate_run_config(pipeline_no_required_config)
```
- A new `RetryPolicy` has been added. This allows you to declare automatic retry behavior when exceptions occur during solid execution. You can set `retry_policy` on a solid invocation, `@solid` definition, or `@pipeline` definition:
```python
@solid(retry_policy=RetryPolicy(max_retries=3, delay=5))
def fickle_solid():
    ...

@pipeline(
    # set a default policy for all solids
    solid_retry_policy=RetryPolicy()
)
def my_pipeline():
    # will use the pipeline's policy by default
    some_solid()

    # solid definition takes precedence over pipeline default
    fickle_solid()

    # invocation setting takes precedence over definition
    fickle_solid.with_retry_policy(RetryPolicy(max_retries=2))
```
- Setting the `dagster/priority` tag directly on pipeline definitions would cause an error. This has been fixed.
- The `create_dagster_pandas_dataframe_type()` function would, in some scenarios, not use the specified `materializer` argument when provided. This has been fixed (thanks @drewsonne!).
- `dagster-graphql --remote` now sends the query and variables as POST body data, avoiding URI length limit issues.
- Fixed a bug where `sqlalchemy.Engine` objects would be invalidated after 8 hours of idle time due to MySQL's default configuration, resulting in an `sqlalchemy.exc.OperationalError` when attempting to view pages in Dagit in long-running deployments.
- The asset details page now supports an `asOf` URL parameter, which shows a snapshot of the asset at the provided timestamp, including parent materializations as of that time.
- `make dev_install` has been fixed.
- Bugs in `reload_repository_location` and `submit_pipeline_execution` have been fixed: the underlying GraphQL queries had a missing inline fragment case.
- The `@solid` decorator can now wrap a function without a `context` argument, if no context information is required. For example, you can now do:
```python
@solid
def basic_solid():
    return 5

@solid
def solid_with_inputs(x, y):
    return x + y
```
However, if your solid requires config or resources, you will receive an error at definition time.
- Events that accept a `metadata_entries` argument may now instead accept a `metadata` argument, which should allow for a more convenient API. The `metadata` argument takes a dictionary with string labels as keys and `EventMetadata` values. Some base types (`str`, `int`, `float`, and JSON-serializable `list`/`dict`s) are also accepted as values and will be automatically coerced to the appropriate `EventMetadata` value. For example:
```python
@solid
def old_metadata_entries_solid(df):
    yield AssetMaterialization(
        "my_asset",
        metadata_entries=[
            EventMetadataEntry.text("users_table", "table name"),
            EventMetadataEntry.int(len(df), "row count"),
            EventMetadataEntry.url("http://mysite/users_table", "data url"),
        ],
    )

@solid
def new_metadata_solid(df):
    yield AssetMaterialization(
        "my_asset",
        metadata={
            "table name": "users_table",
            "row count": len(df),
            "data url": EventMetadata.url("http://mysite/users_table"),
        },
    )
```
- The `dagster-daemon` process now takes a `--heartbeat-tolerance` argument that allows you to configure how long the process can run before shutting itself down due to a hanging thread. This parameter can be used to troubleshoot failures with the daemon process.
- In `PartitionSetDefinition.create_schedule_definition`, the `partition_selector` function that determines which partition to use for a given schedule tick can now return a list of partitions or a single partition, allowing you to create schedules that create multiple runs for each schedule tick. A sketch follows.
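A sketch of a selector that returns multiple partitions; `my_partition_set` and the selection logic are hypothetical:

```python
def select_last_two(context, partition_set):
    # Returning a list (rather than a single partition) creates one run per
    # selected partition on each schedule tick.
    return partition_set.get_partitions()[-2:]

schedule_def = my_partition_set.create_schedule_definition(
    "my_multi_run_schedule",
    "0 0 * * *",
    partition_selector=select_last_two,
)
```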
- Fixed a bug that caused a `KeyError`.
- Using `Dict` and `Set` types for solid inputs/outputs now works as expected. Previously, a structure like `Dict[str, Dict[str, Dict[str, SomeClass]]]` could result in confusing errors.
- Fixed a bug involving `solid_config`.
- A bug that caused `map` and `collect` steps downstream of other `map` and `collect` steps to mysteriously not execute when using multiprocess executors has been resolved.
- Added `solid_exception` on `HookContext`, which returns the actual exception object thrown in a failed solid. See the example "Accessing failure information in a failure hook" for more details, and the sketch below.
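A sketch of a failure hook that uses `solid_exception`; the hook name and log message are illustrative:

```python
from dagster import failure_hook

@failure_hook
def log_failure(context):
    # context.solid_exception is the exception raised by the failed solid.
    context.log.error(f"Solid {context.solid.name} failed: {context.solid_exception}")
```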
- Added `solid_output_values` on `HookContext`, which returns the computed output values.
- Added the `make_values_resource` helper for defining a resource that passes in user-defined values. This is useful when you want multiple solids to share values. See the example for more details; a sketch also follows.
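A sketch of sharing a value across solids with `make_values_resource`; the resource key, field name, and pipeline are hypothetical:

```python
from dagster import ModeDefinition, execute_pipeline, make_values_resource, pipeline, solid

@solid(required_resource_keys={"values"})
def solid_one(context):
    context.log.info(f"shared: {context.resources.values['my_str']}")

@solid(required_resource_keys={"values"})
def solid_two(context):
    context.log.info(f"shared: {context.resources.values['my_str']}")

@pipeline(mode_defs=[ModeDefinition(resource_defs={"values": make_values_resource(my_str=str)})])
def my_pipeline():
    solid_one()
    solid_two()

execute_pipeline(
    my_pipeline,
    run_config={"resources": {"values": {"config": {"my_str": "foo"}}}},
)
```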
- When Dagit was deployed behind a `--path-prefix`, the color-coded favicons denoting the success or failure of a run were not loading properly. This has been fixed.
- `DagsterInstance.get()` no longer falls back to an ephemeral instance if `DAGSTER_HOME` is not set. We don't expect this to break normal workflows. This change allows our tooling to be more consistent around its expectations. If you were relying on getting an ephemeral instance, you can use `DagsterInstance.ephemeral()` directly.
- Undocumented attributes on `HookContext` have been removed. `step_key` and `mode_def` have been documented as attributes.
- The default `config_schema` for all configurable objects (solids, resources, IO managers, composite solids, executors, loggers) is now `Any`. This means that you can now use configuration without explicitly providing a `config_schema`. Refer to the docs for more details: https://docs.dagster.io/concepts/configuration/config-schema.
- `input_defs` and `output_defs` on `@solid` will now flexibly combine data that can be inferred from the function signature with data that is declared explicitly via `InputDefinition`/`OutputDefinition`. This allows solids to be defined more concisely, with less repetition of information.
- Dagit now displays errors that have occurred in the `dagster-daemon` process within the last 5 minutes. Previously, it would only display errors from the last 30 seconds.
- Fixed an issue affecting the `dagster-daemon` process.
- `DockerRunLauncher` now accepts a `container_kwargs` config parameter, allowing you to specify any argument for the launched container that can be passed into the Docker `containers.run` method. See https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run for the full list of available options.
- `celery_k8s_job_executor` now accepts a `job_wait_timeout`, allowing you to override the default of 24 hours.
- Fixed an issue when running Dagit with the `--prefix-path` argument.
- Fixed an issue with runs in the `Requested` state.
- Previously, when using the `output_config_schema` or `input_config_schema` arguments of `@io_manager`, the config would still be required. Now, the config is not required.
- Fixed an issue in the `Assets` catalog where the view switcher would flip back from the directory view to the flat view when navigating into subdirectories.
- Fixed an issue where the `dagster-daemon` process would crash if it experienced a transient connection error while connecting to the Dagster database.
- Fixed an issue where the `dagster-airflow scaffold` command would raise an exception if a preset was specified.