from mixtrain import (
    MixRoutine,
    RoutineInvocationError,
    on_dataset_append,
    on_schedule,
    on_workflow_success,
    on_workflow_failure,
    on_complete,
)

MixRoutine is a workflow that runs in response to a trigger. It supports the same setup(), run(), cleanup(), inputs, outputs, and sandbox configuration as MixFlow. A trigger can be a dataset change, a schedule, or a workflow completion.

Basic Structure

from mixtrain import MixRoutine, on_schedule


class HelloRoutine(MixRoutine):
    def run(self, trigger=on_schedule(every="1h"), word: str = "world"):
        print(f"hello {word}")

The trigger is declared as the default value of exactly one run() parameter. Other parameters must have defaults and become configurable inputs.

Validation Rules

| Requirement | Error |
| --- | --- |
| Exactly one trigger parameter | TypeError at class definition or spec extraction |
| Non-trigger parameters must have defaults | TypeError |
| on_schedule() must receive exactly one of cron or every | ValueError |
| Trigger-created runs must receive a fired event payload | RoutineInvocationError if called directly without one |
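
The first two rules can be pictured as a check on the run() signature. The sketch below is not Mixtrain's implementation: Trigger and split_run_signature are hypothetical stand-ins, and it only assumes that a trigger is recognizable as a special default value. It uses the standard inspect module so it runs without mixtrain installed.

```python
import inspect


class Trigger:
    """Hypothetical stand-in for the object an on_* constructor returns."""

    def __init__(self, kind: str):
        self.kind = kind


def split_run_signature(run):
    """Return (trigger_parameter_name, inputs) for a run() function.

    Raises TypeError when a non-trigger parameter lacks a default or when
    the number of trigger parameters is not exactly one.
    """
    trigger_names = []
    inputs = {}
    for name, param in inspect.signature(run).parameters.items():
        if name == "self":
            continue
        if isinstance(param.default, Trigger):
            trigger_names.append(name)
        elif param.default is inspect.Parameter.empty:
            raise TypeError(f"parameter {name!r} must have a default")
        else:
            inputs[name] = param.default
    if len(trigger_names) != 1:
        raise TypeError(
            f"expected exactly one trigger parameter, found {len(trigger_names)}"
        )
    return trigger_names[0], inputs


class HelloRoutine:
    def run(self, trigger=Trigger("schedule"), word: str = "world"):
        print(f"hello {word}")


class NoTrigger:
    def run(self, word: str = "world"):
        print(word)


print(split_run_signature(HelloRoutine.run))
```

Passing NoTrigger.run to the same helper raises TypeError, which mirrors the first row of the table.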

Dataset Triggers

on_dataset_append()

on_dataset_append(
    name: str,
    *,
    batch_rows: int | None = None,
    added_fraction: float | None = None,
) -> DatasetEvent

Fires when rows are appended to name.

from mixtrain import MixRoutine, on_dataset_append


class EmbedNewRows(MixRoutine):
    def run(self, rows=on_dataset_append("photos", batch_rows=100)):
        for batch in rows:
            print(batch.num_rows)

batch_rows and added_fraction let changes accumulate before the trigger fires. In the example above, the trigger fires only once 100 new rows have been appended to the photos dataset since the previous firing.
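
One way to picture the accumulation is a counter that resets on each firing. This is a toy model rather than Mixtrain's scheduler: AppendAccumulator is a hypothetical name, and both the "at least batch_rows" comparison and the reset-to-zero behavior are assumptions made for illustration.

```python
class AppendAccumulator:
    """Hypothetical model of batch_rows accumulation; not Mixtrain's implementation."""

    def __init__(self, batch_rows: int):
        self.batch_rows = batch_rows
        self.pending = 0  # rows appended since the last firing

    def on_append(self, n_rows: int) -> bool:
        """Record an append and report whether the trigger would fire."""
        self.pending += n_rows
        if self.pending >= self.batch_rows:  # assumption: threshold is "at least"
            self.pending = 0  # assumption: a firing consumes all pending rows
            return True
        return False


acc = AppendAccumulator(batch_rows=100)
fired = [acc.on_append(n) for n in (40, 40, 40, 10)]
print(fired)  # [False, False, True, False]
```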

DatasetEvent

class DatasetEvent(Event):
    dataset: str
    on: str
    threshold: dict | None
    operation: str | None
    from_version: int | None
    to_version: int | None
    added_records: int | None

    def added_rows(self) -> Iterator[pyarrow.RecordBatch]: ...

from_version is exclusive and to_version is inclusive.
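
Read as an interval, the event covers the versions in (from_version, to_version]. The helper below is a small illustration of that arithmetic; versions_in_event is a hypothetical name and not part of mixtrain.

```python
def versions_in_event(from_version: int, to_version: int) -> list:
    """Dataset versions covered by an event: from_version excluded, to_version included."""
    return list(range(from_version + 1, to_version + 1))


first = versions_in_event(4, 7)
second = versions_in_event(7, 9)
print(first)   # [5, 6, 7]
print(second)  # [8, 9]
```

With this convention, an event that starts at the version where another ended covers no version twice.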

Dataset events are iterable:

def run(self, event=on_dataset_append("photos")):
    for batch in event:
        process(batch)

Schedule Triggers

on_schedule()

on_schedule(
    cron: str | None = None,
    *,
    every: str | None = None,
    tz: str = "UTC",
) -> ScheduleEvent

Provide exactly one of cron or every. tz is optional and defaults to UTC.

class Nightly(MixRoutine):
    def run(self, trigger=on_schedule("0 3 * * *")):
        ...


class Hourly(MixRoutine):
    def run(self, trigger=on_schedule(every="1h")):
        ...

every accepts one or more integer duration parts with units: s seconds, m minutes, h hours, d days, or w weeks. Examples: 10s, 30m, 1h, 2d, 1w, 1h30m.
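
To make the every format concrete, here is a minimal parser for it, together with the exactly-one-of check described above. Both functions are hypothetical sketches written with the standard library; how Mixtrain itself parses or validates these arguments is not documented here.

```python
import re
from datetime import timedelta

# Unit letters from the documentation mapped to timedelta keyword names.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}
_PART = re.compile(r"(\d+)([smhdw])")


def parse_every(text: str) -> timedelta:
    """Parse strings such as '30m' or '1h30m' into a timedelta."""
    if not re.fullmatch(r"(?:\d+[smhdw])+", text):
        raise ValueError(f"invalid duration: {text!r}")
    total = timedelta()
    for amount, unit in _PART.findall(text):
        total += timedelta(**{_UNITS[unit]: int(amount)})
    return total


def check_schedule_args(cron=None, every=None) -> None:
    """Mirror the documented rule: exactly one of cron or every."""
    if (cron is None) == (every is None):
        raise ValueError("provide exactly one of cron or every")


print(parse_every("1h30m"))  # 1:30:00
print(parse_every("2d"))     # 2 days, 0:00:00
```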

ScheduleEvent

class ScheduleEvent(Event):
    cron: str | None
    every: str | None
    tz: str

fired_at, inherited from Event, is the time at which the schedule fired.

Completion Triggers

on_workflow_success()

on_workflow_success(name: str) -> CompletionEvent

Fires when the named workflow succeeds.

on_workflow_failure()

on_workflow_failure(name: str) -> CompletionEvent

Fires when the named workflow fails.

on_complete()

on_complete(
    name: str,
    *,
    status: str = "success",
) -> CompletionEvent

on_complete() fires when the named workflow completes with the given status. status can be success, failure, or any, and defaults to success. (Completion triggers currently watch workflows only.)
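
The status filter can be read as a simple predicate over the finished run's status. The function below is an illustrative sketch; status_matches is a hypothetical name, and rejecting unknown filter values with ValueError is an assumption.

```python
def status_matches(wanted: str, actual: str) -> bool:
    """Return True when a run that ended with `actual` satisfies the `wanted` filter."""
    if wanted not in ("success", "failure", "any"):
        raise ValueError(f"unknown status filter: {wanted!r}")
    return wanted == "any" or wanted == actual


print(status_matches("success", "success"))  # True
print(status_matches("success", "failure"))  # False
print(status_matches("any", "failure"))      # True
```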

CompletionEvent

class CompletionEvent(Event):
    resource: str
    kind: str
    status: str
    run_number: int | None
    outputs: dict | None

Use run_number and outputs to chain work from the upstream run.
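
As a rough illustration of chaining, the sketch below reads both fields from a completion event and guards against their being None. CompletionEvent here is a local stand-in with the documented fields, not the class imported from mixtrain, and the workflow name, the kind value, and the "checkpoint" output key are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompletionEvent:
    """Stand-in carrying the documented fields; not imported from mixtrain."""

    resource: str
    kind: str
    status: str
    run_number: Optional[int] = None
    outputs: Optional[dict] = None


def describe_upstream(event: CompletionEvent) -> str:
    """Summarize the upstream run, tolerating a missing outputs dict."""
    outputs = event.outputs or {}
    checkpoint = outputs.get("checkpoint", "<none>")  # hypothetical output key
    return (
        f"{event.resource} run #{event.run_number} "
        f"({event.status}): checkpoint={checkpoint}"
    )


event = CompletionEvent(
    resource="train-model",  # hypothetical workflow name
    kind="workflow",         # assumed value for illustration
    status="success",
    run_number=12,
    outputs={"checkpoint": "ckpt-12"},
)
print(describe_upstream(event))
```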

Base Event

All events share:

class Event:
    type: str
    source: str
    fired_at: datetime | None

    def to_dict(self) -> dict: ...
    @classmethod
    def from_dict(cls, data: dict) -> Event: ...

fired_at is None for trigger declarations returned by on_* constructors. Fired events have fired_at set and include runtime fields populated by Mixtrain.
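
The difference between a declaration and a fired event, and the to_dict()/from_dict() round trip, can be sketched with a stand-in class. Everything below is an assumption made for illustration: the real serialization format is not documented here, the ISO 8601 encoding of fired_at is a guess, and the type and source values are hypothetical.

```python
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Event:
    """Stand-in for the base event; field names follow the documentation."""

    type: str
    source: str
    fired_at: Optional[datetime] = None

    def to_dict(self) -> dict:
        data = asdict(self)
        if self.fired_at is not None:
            # Assumption: timestamps are serialized as ISO 8601 strings.
            data["fired_at"] = self.fired_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "Event":
        fired_at = data.get("fired_at")
        return cls(
            type=data["type"],
            source=data["source"],
            fired_at=datetime.fromisoformat(fired_at) if fired_at else None,
        )


# A declaration has no firing time; a fired event does.
declaration = Event(type="schedule", source="nightly")
fired = Event(
    type="schedule",
    source="nightly",
    fired_at=datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc),
)

print(declaration.fired_at is None)                # True
print(Event.from_dict(fired.to_dict()) == fired)   # True
```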

Creating Routines

Create a routine with the routine CLI:

mixtrain routine create ./routine-dir --name routine-name

Use --entrypoint when the upload contains more than one candidate class:

mixtrain routine create . --name routine-name --entrypoint routines/jobs.py:RoutineClass
