Mixtrain Docs

This guide enables Claude Code to write code for the Mixtrain ML platform, including models, workflows, evaluations, and datasets.

Platform Overview

Mixtrain is an ML platform with these core resources:

  • Models - ML inference endpoints (custom or provider-hosted)
  • Workflows - Multi-step orchestration pipelines
  • Datasets - Data management with pandas/arrow support
  • Evaluations - Model assessment configurations

Installation & Authentication

pip install mixtrain

from mixtrain import MixClient

# Option 1: Environment variable (recommended)
# export MIXTRAIN_API_KEY=mix-...
client = MixClient()

# Option 2: Explicit API key
client = MixClient(api_key="mix-...")
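
The two options above imply a lookup order: an explicit key if one is given, otherwise the MIXTRAIN_API_KEY environment variable. A minimal sketch of that order in plain Python follows. resolve_api_key is a hypothetical helper written for illustration, not part of the mixtrain SDK, and the SDK's own lookup may differ.

```python
import os
from typing import Optional


def resolve_api_key(explicit: Optional[str] = None) -> str:
    """Return the API key to use, preferring an explicit value.

    Hypothetical helper: mirrors the two options shown above, not SDK code.
    """
    key = explicit or os.environ.get("MIXTRAIN_API_KEY", "")
    if not key:
        raise RuntimeError(
            "No API key found: pass api_key=... or export MIXTRAIN_API_KEY"
        )
    return key
```

Failing early with a clear message like this can be easier to debug than an authentication error on the first request.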

Writing Models

Models are Python classes that define inference logic. Extend MixModel and implement run().

Basic Model Structure

from mixtrain import MixModel

class MyModel(MixModel):
    """Description of what this model does."""

    def setup(self):
        """Called once when model is loaded. Initialize resources here."""
        # Load weights, initialize clients, etc.
        pass

    def run(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 100,
    ) -> dict:
        """Run inference.

        Args:
            prompt: Text prompt to process (required)
            temperature: Sampling temperature
            max_tokens: Maximum output tokens

        Returns:
            Dictionary with outputs
        """
        # Placeholder: replace with your model's actual inference output
        result = prompt
        return {"output": result}

    def cleanup(self):
        """Optional: Called when model is unloaded."""
        pass
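
The lifecycle described in the docstrings above (setup once, run per request, cleanup on unload) can be exercised locally before deploying. The sketch below uses a stand-in base class so that it runs without the SDK. LocalModelBase, EchoModel, and run_locally are hypothetical names, and the platform may invoke these hooks differently.

```python
class LocalModelBase:
    """Stand-in for MixModel so this sketch runs without the SDK (assumption)."""

    def setup(self):
        pass

    def cleanup(self):
        pass


class EchoModel(LocalModelBase):
    """Toy model that records lifecycle calls and echoes a truncated prompt."""

    def setup(self):
        self.calls = ["setup"]

    def run(self, prompt: str, max_tokens: int = 100) -> dict:
        self.calls.append("run")
        # Whitespace-split words stand in for tokens in this toy example.
        words = prompt.split()[:max_tokens]
        return {"output": " ".join(words)}

    def cleanup(self):
        self.calls.append("cleanup")


def run_locally(model, **inputs) -> dict:
    """Call setup, run, and cleanup in order; cleanup runs even if run() raises."""
    model.setup()
    try:
        return model.run(**inputs)
    finally:
        model.cleanup()
```

Calling run_locally(EchoModel(), prompt="a b c d", max_tokens=2) walks through all three hooks and returns the dictionary produced by run().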

Sandbox Configuration

Configure compute resources using the sandbox() function:

from mixtrain import MixModel, sandbox

class GPUModel(MixModel):
    """Model that runs on GPU."""

    _sandbox = sandbox(
        image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime",
        gpu="A100",
        memory=32768,  # 32GB
        timeout=3600,  # Timeout after 1 hour
        mixtrain_version="0.1.23",
    )

    def setup(self):
        import torch
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def run(self, prompt: str) -> dict:
        """Process the prompt.

        Args:
            prompt: Input prompt
        """
        # Placeholder: replace with real inference on self.device
        result = prompt.upper()
        return {"result": result}

Sandbox options:

  • image: Docker image (e.g., "pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime")
  • gpu: GPU type ("T4", "L4", "A10", "A100", "A100-80GB", "L40S", "H100", "H200", "B200")
  • cpu: Number of CPU cores
  • memory: Memory in MB
  • timeout: Max execution time in seconds
  • cloud: Cloud provider ("gcp", "aws")
  • region: Cloud region
  • num_nodes: Number of nodes for distributed training
  • mixtrain_version: SDK version to install
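
Because memory is given in MB and timeout in seconds, unit mix-ups are easy to make. The following sketch checks a few of the options listed above before a config is submitted. SandboxSpec is a hypothetical local mirror written for illustration, not the SDK's sandbox() function, and the platform may apply different or additional rules.

```python
from dataclasses import dataclass
from typing import Optional

# GPU types copied from the option list above.
KNOWN_GPUS = {"T4", "L4", "A10", "A100", "A100-80GB", "L40S", "H100", "H200", "B200"}


@dataclass
class SandboxSpec:
    """Hypothetical local mirror of a few sandbox() options, for pre-flight checks."""

    gpu: Optional[str] = None
    memory: Optional[int] = None   # MB
    timeout: Optional[int] = None  # seconds

    def problems(self) -> list:
        """Return human-readable problems; an empty list means the spec looks sane."""
        issues = []
        if self.gpu is not None and self.gpu not in KNOWN_GPUS:
            issues.append(f"unknown gpu {self.gpu!r}")
        if self.memory is not None and self.memory <= 0:
            issues.append("memory must be a positive number of MB")
        if self.timeout is not None and self.timeout <= 0:
            issues.append("timeout must be a positive number of seconds")
        return issues
```

For example, SandboxSpec(gpu="A100", memory=32768, timeout=3600).problems() returns an empty list, while an unlisted GPU name is reported.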

Model Examples

Text Classification Model

from mixtrain import MixModel
from transformers import pipeline

class TextClassifier(MixModel):
    """Classify text into sentiment categories."""

    def setup(self, model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
        """Initialize the classifier.

        Args:
            model_name: HuggingFace model to use
        """
        self.classifier = pipeline("sentiment-analysis", model=model_name)

    def run(self, text: str) -> dict:
        """Classify text sentiment.

        Args:
            text: Text to classify
        """
        result = self.classifier(text)[0]
        return {
            "label": result["label"],
            "confidence": result["score"]
        }

Image Generation Model

from mixtrain import MixModel
from mixtrain.types import Image
import replicate

class ImageGenerator(MixModel):
    """Generate images from text prompts."""

    def setup(self):
        self.client = replicate.Client()

    def run(
        self,
        prompt: str,
        negative_prompt: str = "",
        width: int = 512,
        height: int = 512,
    ) -> dict:
        """Generate an image from a text prompt.

        Args:
            prompt: Text prompt for image generation
            negative_prompt: What to avoid in the image
            width: Image width in pixels
            height: Image height in pixels
        """
        output = self.client.run(
            "stability-ai/sdxl",
            input={
                "prompt": prompt,
                "negative_prompt": negative_prompt,
                "width": width,
                "height": height
            }
        )
        return {
            "image": Image(url=output[0], width=width, height=height)
        }

Deploy Model via CLI

mixtrain model create ./my-model/ --name my-model   # From directory
mixtrain model create model.py --name my-model      # Single file
mixtrain model run my-model --input '{"text": "Hello world"}'
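
Hand-writing the JSON for --input gets awkward once values contain quotes. One possible approach, sketched below, is to build the argument list in Python with json.dumps. build_model_run_command is a hypothetical helper, and it uses only the subcommand and flag shown above.

```python
import json
import shlex


def build_model_run_command(name: str, inputs: dict) -> list:
    """Build argv for `mixtrain model run NAME --input JSON` (flags as shown above)."""
    return ["mixtrain", "model", "run", name, "--input", json.dumps(inputs)]


argv = build_model_run_command("my-model", {"text": "It's a test"})
# shlex.join quotes each argument for display or copy-paste into a POSIX shell.
print(shlex.join(argv))
```

Passing argv as a list to subprocess.run avoids shell quoting altogether.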

Writing Workflows

Workflows orchestrate multi-step pipelines. Extend MixFlow and implement run().

Basic Workflow Structure

from mixtrain import MixFlow, Dataset

class MyWorkflow(MixFlow):
    """Description of what this workflow does."""

    def setup(self):
        """Called before run. Initialize resources."""
        pass

    def run(self, input_dataset: Dataset, batch_size: int = 10) -> dict:
        """Execute the workflow.

        Args:
            input_dataset: Dataset to process (required)
            batch_size: Batch size for processing

        Returns:
            Dictionary with outputs
        """
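        df = input_dataset.to_pandas()
        # Placeholder result: replace with your pipeline's real output
        output = {"rows": len(df), "batch_size": batch_size}
        return {"result": output}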

    def cleanup(self):
        """Optional: Called after run completes."""
        pass

Workflow Examples

Data Processing Pipeline

from mixtrain import MixFlow, Dataset
import pandas as pd

class DataProcessingPipeline(MixFlow):
    """Clean and transform a dataset."""

    def run(
        self,
        input_dataset: Dataset,
        output_dataset_name: str,
        drop_nulls: bool = True,
        normalize: bool = False,
    ) -> dict:
        """Clean and transform a dataset.

        Args:
            input_dataset: Source dataset
            output_dataset_name: Destination dataset name
            drop_nulls: Whether to drop null values
            normalize: Whether to normalize numeric columns
        """
        df = input_dataset.to_pandas()
        original_rows = len(df)

        # Clean data
        if drop_nulls:
            df = df.dropna()

        # Normalize numeric columns
        if normalize:
            numeric_cols = df.select_dtypes(include=['number']).columns
            df[numeric_cols] = (df[numeric_cols] - df[numeric_cols].mean()) / df[numeric_cols].std()

        # Save to new dataset
        output = Dataset.from_pandas(df).save(output_dataset_name)

        return {
            "original_rows": original_rows,
            "processed_rows": len(df),
            "output_dataset": output
        }
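
The normalization step above is a z-score: subtract the column mean and divide by the column standard deviation. A column whose values are all identical has a standard deviation of zero, so the pandas expression yields NaN for it. The standard-library sketch below shows the same formula with a guard for that case. zscore is an illustrative helper, not part of the pipeline above.

```python
from statistics import mean, stdev


def zscore(values):
    """Standardize values to zero mean and unit sample standard deviation.

    Constant columns (std == 0) are returned as all zeros instead of dividing by zero.
    """
    if len(values) < 2:
        return [0.0 for _ in values]
    mu = mean(values)
    sigma = stdev(values)  # sample std (n - 1), matching the pandas .std() default
    if sigma == 0:
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]
```

Whether constant columns should become zeros, be dropped, or be left untouched depends on the downstream model, so treat the zero fill as one option among several.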

Model Training Pipeline

from mixtrain import MixFlow, Dataset
from mixtrain.types import JSON, Markdown
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib

class TrainingPipeline(MixFlow):
    """Train a classification model on a dataset."""

    def run(
        self,
        input_dataset: Dataset,
        target_column: str,
        test_size: float = 0.2,
        n_estimators: int = 100,
    ) -> dict:
        """Train a classification model.

        Args:
            input_dataset: Training dataset
            target_column: Column to predict
            test_size: Fraction of data for testing
            n_estimators: Number of trees in the forest
        """
        df = input_dataset.to_pandas()

        # Split features and target
        X = df.drop(columns=[target_column])
        y = df[target_column]

        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        # Train model
        model = RandomForestClassifier(n_estimators=n_estimators)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True)

        # Save model
        model_path = "/tmp/model.joblib"
        joblib.dump(model, model_path)

        return {
            "accuracy": report["accuracy"],
            "metrics": JSON(data=report),
            "report": Markdown(content=f"# Training Complete\n\nAccuracy: {report['accuracy']:.2%}"),
            "model_path": model_path
        }

Batch Inference Pipeline

from mixtrain import MixFlow, Dataset, Model
import pandas as pd

class BatchInferencePipeline(MixFlow):
    """Run batch inference on a dataset."""

    def run(
        self,
        model: Model,
        input_dataset: Dataset,
        input_column: str,
        output_dataset_name: str,
        batch_size: int = 10,
    ) -> dict:
        """Run batch inference.

        Args:
            model: Model to use for inference
            input_dataset: Dataset with inputs
            input_column: Column containing model inputs
            output_dataset_name: Dataset name to save results
            batch_size: Number of items to process at once
        """
        df = input_dataset.to_pandas()

        results = []
        for _, row in df.iterrows():
            result = model.run(inputs={input_column: row[input_column]})
            results.append(result.outputs)

        # Add results to dataframe
        df["prediction"] = results

        # Save results
        output = Dataset.from_pandas(df).save(output_dataset_name)

        return {
            "total": len(results),
            "output_dataset": output
        }
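
The pipeline above accepts batch_size but processes one row per call. If batching is wanted, a small chunking helper such as the following could feed groups of rows to the model. iter_batches is a hypothetical helper, and how a batch is then passed to the model is left open here because it depends on the model's inputs.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])
```

With a DataFrame, the column values can be chunked via iter_batches(df[input_column].tolist(), batch_size).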

Multi-Model Comparison Pipeline

from mixtrain import MixFlow, Dataset, Model
from mixtrain.types import JSON, Markdown
import pandas as pd

class ModelComparisonPipeline(MixFlow):
    """Compare multiple models on a test dataset."""

    def run(
        self,
        models: list[Model],
        test_dataset: Dataset,
        input_column: str,
        label_column: str,
    ) -> dict:
        """Compare multiple models.

        Args:
            models: List of models to compare
            test_dataset: Dataset for evaluation
            input_column: Input column name
            label_column: Ground truth column
        """
        df = test_dataset.to_pandas()
        model_names = [m.name for m in models]

        inputs_list = [{input_column: row[input_column]} for _, row in df.iterrows()]
        labels = df[label_column].tolist()

        # Run batch comparison across all models
        batch_result = Model.batch(
            models=model_names,
            inputs_list=inputs_list,
            max_in_flight=50,
        )

        # Calculate accuracy for each model
        results = {}
        for model_name in model_names:
            predictions = [r.outputs.get("label") for r in batch_result[model_name]]
            correct = sum(1 for pred, label in zip(predictions, labels) if pred == label)
            results[model_name] = {
                "accuracy": correct / len(labels),
                "total": len(labels),
                "correct": correct
            }

        # Find best model
        best_model = max(results, key=lambda k: results[k]["accuracy"])

        return {
            "results": JSON(data=results),
            "best_model": best_model,
            "best_accuracy": results[best_model]["accuracy"],
            "report": Markdown(content=self._generate_report(results))
        }

    def _generate_report(self, results: dict) -> str:
        lines = ["# Model Comparison Results\n"]
        for model, metrics in sorted(results.items(), key=lambda x: -x[1]["accuracy"]):
            lines.append(f"- **{model}**: {metrics['accuracy']:.2%} ({metrics['correct']}/{metrics['total']})")
        return "\n".join(lines)
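
The scoring logic in the pipeline above can be tested on its own, without running any models. The sketch below isolates it into two plain functions and adds a guard for an empty test set, which would otherwise divide by zero. The names accuracy and best_model are illustrative.

```python
def accuracy(predictions, labels) -> float:
    """Fraction of positions where prediction equals label; 0.0 for empty input."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    if not labels:
        return 0.0
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


def best_model(scores: dict) -> str:
    """Name with the highest accuracy; ties go to the first name in dict order."""
    return max(scores, key=scores.get)
```

Note that ties are broken by insertion order, so the order of the models list can decide the winner when accuracies are equal.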

Deploy Workflow via CLI

mixtrain workflow create ./my-pipeline/ --name my-pipeline  # From directory
mixtrain workflow create workflow.py --name my-pipeline     # Single file
mixtrain workflow run my-pipeline --input '{"input_dataset": "training-data", "epochs": 10}'

Working with Datasets

SDK Methods

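# list_datasets is used below; importing it from mixtrain is an assumption (by analogy with list_models)
from mixtrain import Dataset, Image, Text, list_datasets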

# Get existing dataset (lazy reference)
dataset = Dataset("my-dataset")

# Create from file
dataset = Dataset.from_file("data.csv").save("new-dataset")
dataset = Dataset.from_file("data.parquet").save("new-dataset")

# Create from pandas DataFrame
dataset = Dataset.from_pandas(df).save("new-dataset")

# Create with column types for rich rendering
dataset = Dataset.from_pandas(df).save(
    "new-dataset",
    description="My dataset",
    column_types={"image_url": Image, "caption": Text}
)

# Load as pandas DataFrame
df = dataset.to_pandas()

# Load as PyArrow Table
table = dataset.to_table()

# Get metadata
print(dataset.metadata)  # row count, description, etc.
print(dataset.column_types)  # {"col": "image", "text_col": "text"}
print(dataset.detailed_metadata)  # Full schema info

# Update column types (for rich rendering in UI)
dataset.set_column_types({"url_column": Image, "text_column": Text})

# Update metadata
dataset.update_metadata(
    description="My training dataset",
    column_types={"image_url": Image}
)

# List all datasets
datasets = list_datasets()

# Delete dataset
dataset.delete()

Using Datasets in Workflows

from mixtrain import MixFlow, Dataset

class DataAnalysis(MixFlow):
    def run(self, input_dataset: Dataset):
        """Analyze a dataset.

        Args:
            input_dataset: Dataset to analyze
        """
        df = input_dataset.to_pandas()

        return {
            "rows": len(df),
            "columns": list(df.columns),
            "summary": df.describe().to_dict()
        }

CLI Commands

mixtrain dataset list
mixtrain dataset create my-data data.csv
mixtrain dataset browse my-data  # Interactive TUI browser
mixtrain dataset delete my-data

Working with Evaluations

Evaluations provide side-by-side comparison views for datasets. The visualization config is generated automatically from the dataset's column types.

SDK Methods

from mixtrain import Eval, Dataset, Image, Text

# Get existing evaluation (lazy reference)
# Named "evaluation" to avoid shadowing Python's built-in eval()
evaluation = Eval("my-eval")

# Create evaluation from dataset (recommended)
# Auto-generates viz_config from column types
ds = Dataset.from_pandas(df).save(
    "results",
    column_types={"prompt": Text, "model_a": Image, "model_b": Image}
)
evaluation = Eval.from_dataset(ds)  # Creates "results-eval"
evaluation = Eval.from_dataset(ds, name="my-eval")  # Custom name
evaluation = Eval.from_dataset(ds, columns=["prompt", "model_a"])  # Specific columns

# Create evaluation linked to existing dataset
evaluation = Eval.create("my-eval", dataset="results")

# Check if evaluation exists
if not Eval.exists("my-eval"):
    Eval.from_dataset(ds, name="my-eval")

# Access properties
print(evaluation.dataset)
print(evaluation.description)

# List all evaluations
evals = Eval.list()

# Delete
evaluation.delete()

Evaluation Workflow Pattern

from mixtrain import MixFlow, Dataset, Eval, Model, Image, Text, generate_name

class ModelComparisonEval(MixFlow):
    """Compare models and create evaluation view."""

    def run(
        self,
        input_dataset: Dataset,
        models: list[Model],
        prompt_column: str = "prompt",
    ):
        model_names = [m.name for m in models]
        output_name = generate_name("comparison", "results")
        eval_name = generate_name("comparison", "eval")

        # Run models and collect results
        prompts = input_dataset.to_pandas()[prompt_column].tolist()
        inputs_list = [{"prompt": p} for p in prompts]
        result = Model.batch(model_names, inputs_list, filter_failures=True)
        df = result.to_pandas()

        # Save with column types
        output_dataset = Dataset.from_pandas(df).save(
            output_name,
            column_types={
                prompt_column: Text,
                **dict.fromkeys(model_names, Image),
            },
        )

        # Create evaluation (auto-generates viz from column types)
        Eval.from_dataset(output_dataset, name=eval_name)

        return [Eval(eval_name), Dataset(output_name)]
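
The column_types argument in the workflow above relies on a compact idiom: dict.fromkeys assigns the same type to every model column, and ** merges that mapping into the literal. The standalone sketch below shows the resulting dictionary, using the strings "text" and "image" as stand-ins for the Text and Image types so that it runs without the SDK.

```python
prompt_column = "prompt"
model_names = ["model-a", "model-b"]

# dict.fromkeys maps every model name to the same value; ** merges it into the literal.
column_types = {prompt_column: "text", **dict.fromkeys(model_names, "image")}
print(column_types)
```

Keys merged later win, so a model that happens to be named like the prompt column would overwrite its entry.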

Types

Rich types for inputs and outputs that render well in the UI.

from mixtrain import Image, Video, Audio, Markdown, JSON, Text, Embedding, Model3D

# Media types
Image(url="https://...", width=512, height=512)
Video(url="https://...", duration_seconds=30)
Audio(url="https://...", duration_seconds=60)
Model3D(url="https://...model.glb")

# Content types
Text(content="Plain text")
Markdown(content="# Heading\n\n- Item 1")
JSON(data={"key": "value"})
Embedding(values=[0.1, 0.2, 0.3])

Returning Results

Return media types, resource links, and structured data from workflows and models:

from mixtrain import MixFlow, Dataset, Model, Eval, Image, Video, Markdown

class MyWorkflow(MixFlow):
    def run(self) -> dict:
        ds = Dataset.from_file("/tmp/data.csv").save("output-data")

        return {
            # Media types (render inline)
            "image": Image(url="https://...", width=512, height=512),
            "video": Video(url="https://...", duration_seconds=30),

            # Resource links (clickable in UI)
            "dataset": ds,
            "model": Model("my-model"),
            "evaluation": Eval("my-eval"),

            # Formatted content
            "report": Markdown(content="# Results\n\n- Success"),
        }

Running Models Programmatically

from mixtrain import get_model, list_models, Model

# Get model (lazy reference)
model = get_model("my-model")

# Synchronous run (blocks until complete)
# Can pass inputs as dict or kwargs, and optional sandbox config
result = model.run(text="Hello", timeout=300)
result = model.run(inputs={"text": "Hello"}, sandbox={"gpu": "A100", "timeout": 3600})
print(result.outputs)  # Raw outputs dict
print(result.status)   # RunStatus.COMPLETED
print(result.video)    # Typed accessor for video output (if present)
print(result.image)    # Typed accessor for image output (if present)

# Async run (returns immediately)
run_info = model.submit(text="Hello")
run_info = model.submit(inputs={"text": "Hello"}, sandbox={"gpu": "A100"})
print(run_info["run_number"])  # Check status later
run = model.get_run(run_info["run_number"])

# Batch inference across multiple models
batch_result = Model.batch(
    models=["model-a", "model-b", "model-c"],
    inputs_list=[{"text": "test1"}, {"text": "test2"}],
    max_in_flight=50,  # Control concurrency
    timeout=600,
    filter_failures=True,  # Only keep results where ALL models succeeded
)

# Access batch results
df = batch_result.to_pandas()  # Convert to DataFrame
for model_name in ["model-a", "model-b"]:
    for result in batch_result[model_name]:
        print(result.outputs)

# Access run history
runs = model.runs  # Cached list of runs
specific_run = model.get_run(run_number=5)
logs = model.get_logs(run_number=5)

# Model metadata
print(model.metadata)
print(model.spec)
print(model.source)  # "native", "fal", "openai", etc.

# List all models
models = list_models()
models = list_models(provider="native")  # Filter by provider
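
After submit() returns, the run has to be checked again later. A generic polling loop for that is sketched below. It takes a fetch_status callable rather than a run object because the shape of the value returned by get_run is not documented here. The terminal status strings are assumptions, and wait_for_status is a hypothetical helper.

```python
import time
from typing import Callable


def wait_for_status(
    fetch_status: Callable[[], str],
    terminal: frozenset = frozenset({"completed", "failed", "cancelled"}),
    interval: float = 2.0,
    max_wait: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status() until it returns a terminal status or max_wait elapses."""
    waited = 0.0
    while True:
        status = fetch_status()
        if status in terminal:
            return status
        if waited >= max_wait:
            raise TimeoutError(f"still {status!r} after {waited:.0f}s")
        sleep(interval)
        waited += interval
```

The sleep parameter exists so the loop can be tested without real delays. In practice fetch_status would wrap a call such as model.get_run(run_number) and extract the status from its result.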

Running Workflows Programmatically

from mixtrain import Workflow

# Get workflow (lazy reference)
workflow = Workflow("my-pipeline")

# Synchronous run (blocks until complete)
# Can pass inputs as dict or kwargs, and optional sandbox config
result = workflow.run(epochs=10, learning_rate=0.001)
result = workflow.run(inputs={"epochs": 10}, sandbox={"gpu": "A100", "timeout": 3600})
print(result["outputs"])
print(result["status"])

# Async run (returns immediately)
run_info = workflow.submit(epochs=10, learning_rate=0.001)
run_info = workflow.submit(inputs={"epochs": 10}, sandbox={"gpu": "A100"})
print(run_info["run_number"])

# Check run status
run = workflow.get_run(run_info["run_number"])

# Access run history
runs = workflow.runs

# List all workflows
workflows = Workflow.list()

Helper Functions

from mixtrain import (
    generate_name,
    validate_resource_name,
)

# Generate unique names with timestamps
name = generate_name("t2i", "eval")  # "t2i_eval_jan_17_2026"
name = generate_name("image-caption", "results")  # "image_caption_results_jan_17_2026"

# Validate resource names (raises ValueError if invalid)
validate_resource_name("my-model", "model")  # OK
validate_resource_name("My Model", "model")  # Raises ValueError
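
The example outputs above suggest a naming format: hyphens become underscores, and a lowercase month abbreviation, day, and year are appended. The sketch below reproduces that format for illustration only. dated_name is a hypothetical function, not the SDK's implementation, and details such as zero-padding of single-digit days are assumptions.

```python
from datetime import date
from typing import Optional

_MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
           "jul", "aug", "sep", "oct", "nov", "dec"]


def dated_name(prefix: str, suffix: str, today: Optional[date] = None) -> str:
    """Build '<prefix>_<suffix>_<mon>_<day>_<year>' with hyphens turned into underscores."""
    d = today or date.today()
    stem = f"{prefix}_{suffix}".replace("-", "_")
    return f"{stem}_{_MONTHS[d.month - 1]}_{d.day}_{d.year}"
```

Because the suffix is a date rather than a time, two names generated on the same day from the same parts are identical under this sketch.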

CLI Reference

Authentication

mixtrain login                    # Browser-based OAuth login
mixtrain config                   # Show current config

Models

mixtrain model list
mixtrain model create ./model-dir/ --name my-model    # From directory
mixtrain model create model.py utils.py --name my-model  # Multiple files
mixtrain model create "*.py" --name my-model          # Glob pattern
mixtrain model run NAME --input '{"key": "value"}'
mixtrain model run NAME -i input.json --detach  # Start and exit
mixtrain model run NAME -i '{"prompt": "test"}' --gpu A100 --timeout 3600  # With sandbox
mixtrain model logs NAME RUN_NUMBER
mixtrain model update NAME ./model-dir/               # Update from directory
mixtrain model update NAME model.py utils.py          # Update specific files
mixtrain model delete NAME
mixtrain model cancel NAME RUN_NUMBER
mixtrain model code NAME              # List all files
mixtrain model code NAME -f model.py  # View specific file
mixtrain model code NAME --edit       # Edit in $EDITOR
mixtrain model get NAME           # Get model details
mixtrain model runs NAME          # List runs
mixtrain model catalog            # Browse external models

Workflows

mixtrain workflow list
mixtrain workflow create ./workflow-dir/ --name my-workflow  # From directory
mixtrain workflow create workflow.py utils.py --name my-workflow  # Multiple files
mixtrain workflow create "*.py" --name my-workflow         # Glob pattern
mixtrain workflow run NAME --input '{"param": "value"}'
mixtrain workflow run NAME -i input.json --detach
mixtrain workflow run NAME -i '{"epochs": 10}' --gpu A100 --timeout 3600  # With sandbox
mixtrain workflow logs NAME RUN_NUMBER
mixtrain workflow update NAME ./workflow-dir/              # Update from directory
mixtrain workflow update NAME workflow.py utils.py         # Update specific files
mixtrain workflow delete NAME
mixtrain workflow cancel NAME RUN_NUMBER
mixtrain workflow code NAME              # List all files
mixtrain workflow code NAME -f main.py   # View specific file
mixtrain workflow code NAME --edit       # Edit in $EDITOR
mixtrain workflow get NAME           # Get workflow details
mixtrain workflow runs NAME          # List runs

Datasets

mixtrain dataset list
mixtrain dataset create NAME file.csv
mixtrain dataset browse NAME      # Interactive browser
mixtrain dataset delete NAME

Secrets

mixtrain secret list
mixtrain secret set NAME          # Create or update secret
mixtrain secret get NAME          # View secret details
mixtrain secret delete NAME

Environment Variables

MIXTRAIN_API_KEY=mix-...          # API key for authentication

Common Patterns

Access secrets in models/workflows

from mixtrain import MixModel
from openai import OpenAI

class MyModel(MixModel):
    def setup(self):
        # MixClient is available as self.mix
        api_key = self.mix.get_secret("OPENAI_API_KEY")
        self.openai = OpenAI(api_key=api_key)

Chain models in a workflow

from mixtrain import MixFlow, Model

class ChainedPipeline(MixFlow):
    def run(
        self,
        document: str,
        extractor: Model,
        summarizer: Model,
        translator: Model,
    ):
        """Process a document through multiple models.

        Args:
            document: Document to process
            extractor: Model for text extraction
            summarizer: Model for summarization
            translator: Model for translation
        """
        # Step 1: Extract
        extracted = extractor.run(inputs={"document": document})

        # Step 2: Summarize
        summary = summarizer.run(inputs={"text": extracted.outputs["text"]})

        # Step 3: Translate
        translated = translator.run(inputs={
            "text": summary.outputs["summary"],
            "target_lang": "es"
        })

        return {"result": translated.outputs["translation"]}

Error handling

from mixtrain import Model

model = Model("my-model")

try:
    result = model.run(inputs={"text": "Hello"}, timeout=300)
    if result.status == "completed":
        print(result.outputs)
    else:
        print(f"Run failed: {result.error}")
except TimeoutError:
    print("Model inference timed out")
except Exception as e:
    print(f"Error: {e}")
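
Timeouts like the one handled above are often transient, so retrying with a growing delay is a common follow-up. The sketch below is a generic retry wrapper with exponential backoff. call_with_retries is a hypothetical helper, and treating TimeoutError as retryable is an assumption that should be adjusted to the errors actually observed.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on a retryable error wait base_delay * 2**n and try again."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

A model call can be wrapped as call_with_retries(lambda: model.run(inputs={"text": "Hello"}, timeout=300)).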

Workflow dependencies

Add a pyproject.toml, requirements.txt, or Dockerfile in your workflow folder:

# pyproject.toml
[project]
name = "my-workflow"
version = "0.1.0"
dependencies = [
    "pandas",
    "scikit-learn",
    "numpy",
]

Or use requirements.txt:

pandas
scikit-learn
numpy

Or use Dockerfile for full environment control:

FROM python:3.11-slim
RUN pip install pandas scikit-learn numpy

The platform automatically installs dependencies before running your workflow.

Using batch inference with progress

from mixtrain import Model

def progress_callback(completed, total):
    print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")

result = Model.batch(
    models=["flux", "stable-diffusion"],
    inputs_list=[{"prompt": f"Image {i}"} for i in range(100)],
    max_in_flight=20,
    progress_callback=progress_callback,
    filter_failures=True,  # Skip inputs where any model failed
)

# Convert to DataFrame for analysis
df = result.to_pandas()
print(df.head())
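
Printing on every completed item can flood the logs for large batches. The sketch below builds a callback with the same (completed, total) signature as the one above, but reports only when progress crosses a new percentage step. make_progress_printer is a hypothetical helper, and it also avoids dividing by zero when total is 0.

```python
def make_progress_printer(step_percent: float = 10.0, emit=print):
    """Return a (completed, total) callback that reports at most once per step_percent."""
    last_bucket = -1

    def callback(completed: int, total: int) -> None:
        nonlocal last_bucket
        percent = 100.0 * completed / total if total else 100.0
        bucket = int(percent // step_percent)
        if bucket > last_bucket:
            last_bucket = bucket
            emit(f"Progress: {completed}/{total} ({percent:.1f}%)")

    return callback
```

The result can be passed as progress_callback=make_progress_printer(step_percent=10.0).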
