
Workflows are reusable ML pipelines built with MixFlow. They provide a structured lifecycle for setup, execution, and cleanup with configurable parameters.

Workflow inputs and outputs use Mixtrain Types for rich UI rendering.

Quick Start

from mixtrain import Workflow

workflow = Workflow("data-pipeline")
result = workflow.run({"input": "value"})
print(result["outputs"])

Key Concepts

Running Workflows

workflow = Workflow("data-pipeline")

# Synchronous - blocks until complete
result = workflow.run({"batch_size": 64})

# Asynchronous - returns immediately
run_info = workflow.submit({"batch_size": 64})
print(f"Started run #{run_info['run_number']}")
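The distinction between the two calls can be illustrated with a plain-Python sketch (a hypothetical stand-in using the standard library, not mixtrain's actual implementation): submitting schedules the work and returns immediately, while a blocking run is just a submit followed by waiting on the result.

```python
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=4)

def submit(fn, inputs):
    """Asynchronous style: schedule fn(inputs) and return a handle immediately."""
    return _executor.submit(fn, inputs)

def run(fn, inputs):
    """Synchronous style: submit, then block until the result is ready."""
    return submit(fn, inputs).result()
```

Here `submit()` returns a `Future` you can poll or wait on later, mirroring how `workflow.submit()` hands back run info while the run proceeds in the background.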

Workflow Properties

workflow = Workflow("data-pipeline")

print(workflow.name)        # Workflow name
print(workflow.description) # Description
print(workflow.runs)        # List of runs

Building Custom Workflows

Create workflows by subclassing MixFlow. Inputs are declared in the run() method signature:

from mixtrain import MixFlow

class TrainingWorkflow(MixFlow):
    def setup(self):
        print("Initializing...")

    def run(self, learning_rate: float = 0.001, epochs: int = 10):
        """Train the model.

        Args:
            learning_rate: Learning rate
            epochs: Training epochs
        """
        print(f"Training for {epochs} epochs with lr={learning_rate}")
        return {"status": "completed"}

    def cleanup(self):
        print("Cleaning up...")

Lifecycle Methods

Method    | Description
----------|------------
setup()   | Initialize resources, load data. Can receive inputs it declares.
run()     | Execute main logic; inputs defined in the signature. (required)
cleanup() | Release resources, save artifacts.
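The lifecycle order can be sketched in plain Python (no mixtrain import; the `execute` runner below is a hypothetical stand-in for what the platform does, including running cleanup() even when run() fails):

```python
class LifecycleSketch:
    """Stand-in for a MixFlow subclass: setup -> run -> cleanup."""

    def __init__(self):
        self.calls = []  # records the order lifecycle methods fire

    def setup(self):
        self.calls.append("setup")

    def run(self, epochs: int = 10):
        self.calls.append(f"run(epochs={epochs})")
        return {"status": "completed"}

    def cleanup(self):
        self.calls.append("cleanup")


def execute(workflow, inputs):
    """Hypothetical runner: invoke lifecycle methods in order,
    guaranteeing cleanup() runs even if run() raises."""
    workflow.setup()
    try:
        return workflow.run(**inputs)
    finally:
        workflow.cleanup()
```

Running `execute(LifecycleSketch(), {"epochs": 3})` fires the three methods in order and returns run()'s result.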

Configurable Inputs

Define inputs in the run() method signature:

class DataPipeline(MixFlow):
    def run(
        self,
        input_path: str,  # Required (no default)
        batch_size: int = 32,
        use_gpu: bool = True,
    ):
        """Process data pipeline.

        Args:
            input_path: Path to input data
            batch_size: Batch size
            use_gpu: Enable GPU
        """
        return {"status": "completed"}

Both calling styles work:

workflow = DataPipeline()

# Keyword arguments
result = workflow.run(input_path="/data", batch_size=64)

# Dict input
result = workflow.run({"input_path": "/data", "batch_size": 64})
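One way dual calling styles can be supported is to normalize a dict input into keyword arguments and validate them against run()'s signature. This is a sketch of the mechanism, not mixtrain's actual implementation:

```python
import inspect

def call_run(run_fn, arg=None, **kwargs):
    """Dispatch either a dict of inputs or keyword arguments to run_fn.
    Hypothetical helper illustrating how both calling styles can map
    onto one signature."""
    if isinstance(arg, dict) and not kwargs:
        kwargs = arg
    sig = inspect.signature(run_fn)
    sig.bind(**kwargs)  # raises TypeError if a required input is missing
    return run_fn(**kwargs)

def run(input_path: str, batch_size: int = 32, use_gpu: bool = True):
    # Toy run() matching the DataPipeline signature above
    return {"input_path": input_path, "batch_size": batch_size, "use_gpu": use_gpu}
```

Binding against the signature is what makes a missing required input (like `input_path`) fail fast with a TypeError instead of deep inside the pipeline.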

GPU and Resource Configuration

Configure runtime environment with sandbox():

from mixtrain import MixFlow, sandbox

class GPUWorkflow(MixFlow):
    _sandbox = sandbox(
        image="nvcr.io/nvidia/pytorch:24.01-py3",
        gpu="T4",           # T4, A10G, L4, A100, H100, H200, B200
        memory=8192,        # MB
        timeout=1800,       # seconds
    )

Dependencies

Add a pyproject.toml, requirements.txt, or Dockerfile in your workflow folder to specify dependencies:

my-workflow/
├── my_workflow.py
└── pyproject.toml  # or requirements.txt or Dockerfile

Using pyproject.toml:

[project]
name = "my-workflow"
version = "0.1.0"
dependencies = [
    "pandas>=2.0.0",
    "torch>=2.0.0",
    "transformers>=4.40.0",
]

Using requirements.txt:

pandas>=2.0.0
torch>=2.0.0
transformers>=4.40.0

Using Dockerfile for full control over the environment:

FROM python:3.11-slim
RUN pip install pandas torch transformers

The platform automatically installs dependencies before running your workflow.
