
Routines are workflows that start automatically from dataset, schedule, or completion triggers. Use routines for recurring jobs such as embedding newly appended rows, scheduled evaluation, retraining after a data refresh, or chaining work after another workflow succeeds.

Build a routine by extending MixRoutine and declaring its trigger as a parameter default in the run() signature.

Quick Start

from mixtrain import MixRoutine, on_dataset_append


class EmbedNewRows(MixRoutine):
    def run(
        self,
        new_rows=on_dataset_append("photos", batch_rows=100),
        batch_size: int = 256,
    ):
        for batch in new_rows:
            print(f"Process {batch.num_rows} new rows")

Create it with the routine command:

mixtrain routine create ./embed-new-rows --name embed-new-rows

This creates a routine that fires once at least 100 new rows have been appended to the photos dataset. You can view the routine and its run history, or start a run manually, from the CLI, the SDK, or the web app.

Trigger Types

Dataset Append

Dataset triggers fire when rows are appended to a dataset. Declare them with on_dataset_append(). The trigger parameter receives a DatasetEvent; iterating it streams only the rows added since the previous firing.

from mixtrain import MixRoutine, on_dataset_append


class IndexDocuments(MixRoutine):
    def run(
        self,
        rows=on_dataset_append("documents", batch_rows=500),
        index_name: str = "main",
    ):
        for batch in rows:
            upsert_to_index(index_name, batch)

batch_rows delays firing until at least that many new rows have accumulated. Alternatively, added_fraction fires once the new rows amount to a given fraction of the dataset, for example 0.05 for 5%:

rows=on_dataset_append("training-data", added_fraction=0.05)

DatasetEvent includes from_version, to_version, and added_records.
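
As a rough illustration of how the two thresholds gate a firing, the check can be pictured as below. This is a sketch, not Mixtrain's implementation: should_fire is a hypothetical helper, and it assumes that added_fraction is measured against the dataset's row count at the previous firing, which the text above does not specify.

```python
from typing import Optional


def should_fire(
    pending_rows: int,
    rows_at_last_fire: int,
    batch_rows: Optional[int] = None,
    added_fraction: Optional[float] = None,
) -> bool:
    """Return True when enough new rows have accumulated (hypothetical helper)."""
    if pending_rows <= 0:
        return False
    if batch_rows is not None and pending_rows < batch_rows:
        return False
    if added_fraction is not None:
        # Assumption: the fraction is relative to the size at the previous firing.
        if pending_rows < added_fraction * rows_at_last_fire:
            return False
    return True
```

With batch_rows=500, 499 pending rows do not fire and 500 do; with added_fraction=0.05 on a 10,000-row dataset, 400 pending rows fall short of the threshold.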

Schedule

Schedule triggers run from a cron expression or an interval. Provide exactly one of cron or every.

from mixtrain import MixRoutine, on_schedule


class NightlyEval(MixRoutine):
    def run(self, trigger=on_schedule("0 3 * * *"), model: str = "candidate"):
        run_eval(model)


class HourlyHealthCheck(MixRoutine):
    def run(self, trigger=on_schedule(every="1h")):
        check_outputs()

every accepts one or more integer duration parts, each followed by a unit: s (seconds), m (minutes), h (hours), d (days), or w (weeks). Examples: 10s, 30m, 1h, 2d, 1w, 1h30m. Set the timezone with the tz argument of on_schedule; it defaults to UTC.
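
To make the interval format concrete, here is a small standalone parser that turns such a string into seconds. It is an illustrative sketch of the format described above, not Mixtrain's own parser; parse_every is a hypothetical name.

```python
import re

# Seconds per unit, following the units listed above.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
_PART = re.compile(r"(\d+)([smhdw])")


def parse_every(spec: str) -> int:
    """Convert a string such as "1h30m" into seconds (illustrative parser)."""
    # Require the whole string to be integer+unit parts, e.g. reject "1.5h".
    if not spec or not re.fullmatch(r"(?:\d+[smhdw])+", spec):
        raise ValueError(f"invalid interval: {spec!r}")
    return sum(int(n) * _UNIT_SECONDS[u] for n, u in _PART.findall(spec))
```

For example, parse_every("1h30m") adds 3600 and 1800 seconds, while a non-integer part such as "1.5h" raises ValueError.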

Completion

Completion triggers run after another resource reaches a terminal status.

from mixtrain import MixRoutine, on_workflow_success


class EvaluateAfterTrain(MixRoutine):
    def run(self, trigger=on_workflow_success("train-model"), eval_name: str = "regression"):
        print(f"Training run {trigger.run_number} finished")
        start_eval(eval_name, trigger.outputs)

Use on_workflow_failure("name") for failure handling, or on_complete("name", status="any") for the generic form.

Routine Contract

A MixRoutine class follows the MixFlow lifecycle (setup(), run(), cleanup()), with these additional rules:

| Rule | Why |
| --- | --- |
| Exactly one run() parameter must default to an on_* trigger constructor | This is the routine trigger |
| All other run() parameters must have defaults | These become configurable inputs for trigger-created runs |
| The trigger parameter is supplied by Mixtrain at runtime | Direct local calls without a fired event raise RoutineInvocationError |

from mixtrain import MixRoutine, on_schedule


class GoodRoutine(MixRoutine):
    def run(self, trigger=on_schedule(every="1h"), limit: int = 100):
        ...

The platform creates trigger runs with the configured defaults. Manual runs from the app use the same event payload shape and mark the source as manual.
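
The first two rules can be checked mechanically from a run() signature. The sketch below shows one way to do that with the standard inspect module; it is not how Mixtrain validates routines. Trigger is a hypothetical stand-in for whatever the on_* constructors return, and check_run_signature is a hypothetical helper.

```python
import inspect


class Trigger:
    """Stand-in for the value returned by an on_* constructor (hypothetical)."""


def check_run_signature(run) -> str:
    """Return the trigger parameter name, or raise TypeError if a rule is broken."""
    params = [
        p for name, p in inspect.signature(run).parameters.items() if name != "self"
    ]
    # Rule: every parameter needs a default.
    missing = [p.name for p in params if p.default is inspect.Parameter.empty]
    if missing:
        raise TypeError(f"parameters without defaults: {missing}")
    # Rule: exactly one default is a trigger.
    triggers = [p.name for p in params if isinstance(p.default, Trigger)]
    if len(triggers) != 1:
        raise TypeError(f"expected exactly one trigger parameter, found {len(triggers)}")
    return triggers[0]
```

A signature shaped like GoodRoutine.run passes and yields the name of its trigger parameter; a signature with two triggers, or with a parameter that has no default, raises TypeError.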

Reading New Dataset Rows

For dataset append routines, the event is iterable:

def run(self, new_rows=on_dataset_append("photos")):
    for batch in new_rows:
        process(batch)

You can also call added_rows() explicitly:

def run(self, event=on_dataset_append("photos")):
    rows = event.added_rows()
    for batch in rows:
        process(batch)

Under the hood, Mixtrain tracks the last fired dataset version and reads added files in the version range (from_version, to_version]. If a threshold is not met, the cursor is not advanced, so the next append continues accumulating rows.
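
The cursor behaviour described above can be modelled in a few lines. This toy class is only a sketch for intuition: AppendCursor is a hypothetical name, versions are assumed to be increasing integers, and a batch_rows threshold stands in for whichever threshold is configured.

```python
from typing import Optional, Tuple


class AppendCursor:
    """Toy model of the last-fired version cursor (not Mixtrain's implementation)."""

    def __init__(self, batch_rows: int, start_version: int = 0):
        self.batch_rows = batch_rows
        self.last_fired = start_version
        self.pending = 0

    def on_append(self, version: int, rows: int) -> Optional[Tuple[int, int]]:
        """Record an append; return (from_version, to_version) when the routine fires."""
        self.pending += rows
        if self.pending < self.batch_rows:
            # Threshold not met: the cursor stays put and rows keep accumulating.
            return None
        # The fired run reads versions in the range (from_version, to_version].
        fired = (self.last_fired, version)
        self.last_fired, self.pending = version, 0
        return fired
```

With batch_rows=100, appending 60 rows at version 1 does not fire; a further 50 rows at version 2 fires with the range (0, 2], so the rows from both appends are delivered together.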

Do not append the routine's output back to the dataset it watches: that append re-fires the routine on its own output. To write results back, either update or overwrite a column on the watched dataset (column writes are not appends and do not re-fire), or append to a separate dataset.

Entrypoints

If a directory contains multiple routines, pass an entrypoint hint:

mixtrain routine create . --name embed-new-rows --entrypoint routines/embed.py:EmbedNewRows
mixtrain routine create . --name nightly-eval --entrypoint NightlyEval

When a directory contains only MixRoutine classes and no MixFlow classes, Mixtrain can auto-detect the routine entrypoint.
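
The two hint forms shown above can be read as an optional file path followed by a class name. The sketch below splits a hint along those lines; it only illustrates the format and makes no claim about how the Mixtrain CLI parses it. split_entrypoint is a hypothetical helper.

```python
from typing import Optional, Tuple


def split_entrypoint(hint: str) -> Tuple[Optional[str], str]:
    """Split "path/to/file.py:ClassName" or "ClassName" into (path, class_name)."""
    # rpartition splits on the last colon, so earlier colons stay in the path.
    path, sep, class_name = hint.rpartition(":")
    if not class_name.isidentifier():
        raise ValueError(f"not a valid class name: {class_name!r}")
    return (path if sep else None), class_name
```

Here "routines/embed.py:EmbedNewRows" yields a path and a class name, while "NightlyEval" yields no path and just the class name.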

Crash Loop Protection

Completion triggers include loop protection so a chain of routines cannot recurse indefinitely. Repeated dispatch failures can disable a trigger until the routine is updated.
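
Loop protection is a safety net, and it can still be useful to check your own completion chains before deploying them. The sketch below is a user-side check, not part of Mixtrain: find_cycle is a hypothetical helper that takes a hand-written mapping from each routine name to the workflow it waits on and reports the first cycle it finds.

```python
from typing import Dict, List, Optional


def find_cycle(waits_on: Dict[str, str]) -> Optional[List[str]]:
    """Return a list of names forming a trigger cycle, or None if there is none."""
    for start in waits_on:
        seen: List[str] = []
        node = start
        # Follow the chain until it leaves the mapping or revisits a name.
        while node in waits_on and node not in seen:
            seen.append(node)
            node = waits_on[node]
        if node in seen:
            return seen[seen.index(node):] + [node]
    return None
```

A mapping such as {"eval": "train", "report": "eval"} has no cycle, whereas {"train": "eval", "eval": "train"} is reported as train, eval, train.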
