    from mixtrain import (
        MixRoutine,
        RoutineInvocationError,
        on_dataset_append,
        on_schedule,
        on_workflow_success,
        on_workflow_failure,
        on_complete,
    )

MixRoutine is like a workflow that runs on a trigger. It supports the same setup(), run(), cleanup(), inputs, outputs, and sandbox configuration as MixFlow. Triggers can be dataset changes, schedules, or workflow completions.

Basic Structure

    from mixtrain import MixRoutine, on_schedule

    class HelloRoutine(MixRoutine):
        def run(self, trigger=on_schedule(every="1h"), word: str = "world"):
            print(f"hello {word}")

The trigger is declared as the default value of exactly one run() parameter. Other parameters must have defaults and become configurable inputs.

Validation Rules
| Requirement | Error |
|---|---|
| Exactly one trigger parameter | TypeError at class definition or spec extraction |
| Non-trigger parameters must have defaults | TypeError |
| on_schedule() must receive exactly one of cron or every | ValueError |
| Trigger-created runs must receive a fired event payload | RoutineInvocationError if called directly without one |
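
The first two rules amount to a check on the run() signature. The sketch below is not Mixtrain's implementation: `Trigger` and `check_run_signature` are hypothetical stand-ins, and the only assumption is that a trigger parameter is one whose default value is a trigger object. It uses the standard `inspect` module to show how "exactly one trigger, defaults everywhere else" might be enforced.

```python
import inspect


class Trigger:
    """Hypothetical stand-in for the object an on_* constructor returns."""


def check_run_signature(run):
    """Return (trigger_param_name, input_defaults) or raise TypeError.

    Assumption: a trigger parameter is one whose default is a Trigger instance.
    """
    triggers, inputs = [], {}
    for name, param in inspect.signature(run).parameters.items():
        if name == "self":
            continue
        if isinstance(param.default, Trigger):
            triggers.append(name)
        elif param.default is inspect.Parameter.empty:
            raise TypeError(f"parameter {name!r} must have a default")
        else:
            inputs[name] = param.default
    if len(triggers) != 1:
        raise TypeError(
            f"expected exactly one trigger parameter, found {len(triggers)}"
        )
    return triggers[0], inputs


def run(self, trigger=Trigger(), word: str = "world"):
    print(f"hello {word}")


def bad_run(self, word):  # no trigger, and no default for `word`
    print(word)
```

Here `check_run_signature(run)` returns the trigger parameter name together with the configurable inputs, while `check_run_signature(bad_run)` raises TypeError.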
Dataset Triggers
on_dataset_append()

    on_dataset_append(
        name: str,
        *,
        batch_rows: int | None = None,
        added_fraction: float | None = None,
    ) -> DatasetEvent

Fires when rows are appended to name.

    from mixtrain import MixRoutine, on_dataset_append

    class EmbedNewRows(MixRoutine):
        def run(self, rows=on_dataset_append("photos", batch_rows=100)):
            for batch in rows:
                print(batch.num_rows)

batch_rows and added_fraction let changes accumulate before the trigger fires. In this example, the trigger fires only once 100 new rows have been appended to the photos dataset since the previous firing.

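
The accumulation behaviour can be modelled with a simple counter. This is a hypothetical model, not Mixtrain code: `RowAccumulator` is an invented name, and the sketch assumes the pending count restarts from zero after each firing. The source does not say whether surplus rows carry over, nor how added_fraction is computed, so neither is modelled here.

```python
class RowAccumulator:
    """Hypothetical model of a batch_rows threshold; not Mixtrain code."""

    def __init__(self, batch_rows: int):
        self.batch_rows = batch_rows
        self.pending = 0  # rows appended since the last firing

    def append(self, n_rows: int) -> bool:
        """Record an append; return True if the trigger would fire."""
        self.pending += n_rows
        if self.pending >= self.batch_rows:
            self.pending = 0  # assumption: the count restarts after each firing
            return True
        return False


acc = RowAccumulator(batch_rows=100)
# Four appends of 40, 40, 40, and 10 rows: only the third crosses the threshold.
fired = [acc.append(n) for n in (40, 40, 40, 10)]
```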
DatasetEvent

    class DatasetEvent(Event):
        dataset: str
        on: str
        threshold: dict | None
        operation: str | None
        from_version: int | None
        to_version: int | None
        added_records: int | None

        def added_rows(self) -> Iterator[pyarrow.RecordBatch]: ...

from_version is exclusive and to_version is inclusive.
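
As a small worked example of the exclusive/inclusive bounds, the helper below lists the versions an event would cover. `covered_versions` is a hypothetical name, and the sketch assumes dataset versions are consecutive integers, which the source does not state.

```python
def covered_versions(from_version: int, to_version: int) -> list[int]:
    """Versions covered by the half-open interval (from_version, to_version]."""
    # from_version is excluded, to_version is included.
    return list(range(from_version + 1, to_version + 1))


versions = covered_versions(7, 10)  # [8, 9, 10]
```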
Dataset events are iterable:

    def run(self, event=on_dataset_append("photos")):
        for batch in event:
            process(batch)

Schedule Triggers
on_schedule()

    on_schedule(
        cron: str | None = None,
        *,
        every: str | None = None,
        tz: str = "UTC",
    ) -> ScheduleEvent

Provide exactly one of cron or every. tz is optional and defaults to UTC.

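
The "exactly one of" rule can be expressed as a single comparison. The function below is a hypothetical stand-in for illustration only; it is not how on_schedule() is implemented, and the error message is invented.

```python
def check_schedule_args(cron=None, every=None):
    """Raise ValueError unless exactly one of cron or every is given."""
    # Both None, or both set, makes the two comparisons equal.
    if (cron is None) == (every is None):
        raise ValueError("provide exactly one of cron or every")
    return ("cron", cron) if cron is not None else ("every", every)
```

Calling it with only a cron expression or only an interval succeeds; passing both, or neither, raises ValueError.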
    class Nightly(MixRoutine):
        def run(self, trigger=on_schedule("0 3 * * *")):
            ...

    class Hourly(MixRoutine):
        def run(self, trigger=on_schedule(every="1h")):
            ...

every accepts one or more integer duration parts with units: s seconds, m minutes, h hours, d days, or w weeks. Examples: 10s, 30m, 1h, 2d, 1w, 1h30m.

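
To make the duration format concrete, here is a minimal parser for such strings using only the standard library. `parse_every` is a hypothetical helper, not part of Mixtrain, and it assumes nothing beyond the grammar described above. Whether Mixtrain rejects repeated units or enforces an ordering is not stated, so this sketch simply sums the parts.

```python
import re
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}
_PART = re.compile(r"(\d+)([smhdw])")


def parse_every(text: str) -> timedelta:
    """Parse strings such as '30m' or '1h30m' into a timedelta."""
    # The whole string must consist of integer+unit parts and nothing else.
    if not text or not re.fullmatch(r"(?:\d+[smhdw])+", text):
        raise ValueError(f"invalid duration: {text!r}")
    total = timedelta()
    for amount, unit in _PART.findall(text):
        total += timedelta(**{_UNITS[unit]: int(amount)})
    return total
```

For example, `parse_every("1h30m")` equals `timedelta(hours=1, minutes=30)`, while a non-integer part such as `"1.5h"` raises ValueError.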
ScheduleEvent

    class ScheduleEvent(Event):
        cron: str | None
        every: str | None
        tz: str

fired_at is the schedule firing time.
Completion Triggers
on_workflow_success()

    on_workflow_success(name: str) -> CompletionEvent

Fires when the named workflow succeeds.
on_workflow_failure()

    on_workflow_failure(name: str) -> CompletionEvent

Fires when the named workflow fails.
on_complete()

    on_complete(
        name: str,
        *,
        status: str = "success",
    ) -> CompletionEvent

on_complete() triggers fire when the named workflow completes with the given status. status can be success, failure, or any. (Completion triggers currently watch workflows only.)
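
The status filter can be read as a simple predicate. The sketch below is a hypothetical illustration, not Mixtrain's matching logic: `status_matches` is an invented name, and it assumes a finished run reports either success or failure.

```python
VALID_STATUSES = {"success", "failure", "any"}


def status_matches(wanted: str, actual: str) -> bool:
    """Return True if a run that ended with `actual` satisfies the `wanted` filter."""
    if wanted not in VALID_STATUSES:
        raise ValueError(f"unknown status filter: {wanted!r}")
    # "any" accepts every outcome; otherwise the outcome must match exactly.
    return wanted == "any" or wanted == actual
```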
CompletionEvent

    class CompletionEvent(Event):
        resource: str
        kind: str
        status: str
        run_number: int | None
        outputs: dict | None

Use run_number and outputs to chain work from the upstream run.
Base Event
All events share:

    class Event:
        type: str
        source: str
        fired_at: datetime | None

        def to_dict(self) -> dict: ...

        @classmethod
        def from_dict(cls, data: dict) -> Event: ...

fired_at is None for trigger declarations returned by on_* constructors. Fired events have fired_at set and include runtime fields populated by Mixtrain.
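
The difference between a declaration and a fired event can be illustrated with a stand-in class. `SketchEvent` below is hypothetical and is not mixtrain.Event: the field values are placeholders, the `is_fired` property is invented for the example, and serializing fired_at as an ISO 8601 string is an assumption about the dict format.

```python
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in mirroring the three shared fields."""

    type: str
    source: str
    fired_at: Optional[datetime] = None

    def to_dict(self) -> dict:
        data = asdict(self)
        if self.fired_at is not None:
            # Assumption: timestamps are serialized as ISO 8601 strings.
            data["fired_at"] = self.fired_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "SketchEvent":
        fired = data.get("fired_at")
        return cls(
            data["type"],
            data["source"],
            datetime.fromisoformat(fired) if fired else None,
        )

    @property
    def is_fired(self) -> bool:
        # A declaration has no firing time; a fired event does.
        return self.fired_at is not None


# Placeholder values; real type and source strings are set by Mixtrain.
declared = SketchEvent(type="schedule", source="example")
fired = SketchEvent(
    "schedule", "example", datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc)
)
```

In this model a declaration and a fired event share one shape, and a round trip through to_dict() and from_dict() preserves the firing time.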
Creating Routines
Create a routine with the routine CLI:

    mixtrain routine create ./routine-dir --name routine-name

Use --entrypoint when the upload contains more than one candidate class:

    mixtrain routine create . --name routine-name --entrypoint routines/jobs.py:RoutineClass