This guide shows Claude Code how to write code for the Mixtrain ML platform, covering models, workflows, evaluations, and datasets.
Platform Overview
Mixtrain is an ML platform with these core resources:
- Models - ML inference endpoints (custom or provider-hosted)
- Workflows - Multi-step orchestration pipelines
- Datasets - Data management with pandas/arrow support
- Evaluations - Model assessment configurations
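Each resource has a corresponding SDK class that can be referenced lazily by name; later sections cover each in detail. A quick sketch (the resource names below are placeholders):
from mixtrain import Model, Workflow, Dataset, Eval
model = Model("my-model")            # ML inference endpoint
workflow = Workflow("my-pipeline")   # multi-step orchestration pipeline
dataset = Dataset("my-dataset")      # data management
evaluation = Eval("my-eval")         # model assessment / comparison view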
Installation & Authentication
pip install mixtrain
from mixtrain import MixClient
# Option 1: Environment variable (recommended)
# export MIXTRAIN_API_KEY=mix-...
client = MixClient()
# Option 2: Explicit API key
client = MixClient(api_key="mix-...")
Writing Models
Models are Python classes that define inference logic. Extend MixModel and implement run().
Basic Model Structure
from mixtrain import MixModel
class MyModel(MixModel):
"""Description of what this model does."""
def setup(self):
"""Called once when model is loaded. Initialize resources here."""
# Load weights, initialize clients, etc.
pass
def run(
self,
prompt: str,
temperature: float = 0.7,
max_tokens: int = 100,
) -> dict:
"""Run inference.
Args:
prompt: Text prompt to process (required)
temperature: Sampling temperature
max_tokens: Maximum output tokens
Returns:
Dictionary with outputs
"""
return {"output": result}
def cleanup(self):
"""Optional: Called when model is unloaded."""
pass
Sandbox Configuration
Configure compute resources using the sandbox() function:
from mixtrain import MixModel, sandbox
class GPUModel(MixModel):
"""Model that runs on GPU."""
_sandbox = sandbox(
image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime",
gpu="A100",
memory=32768, # 32GB
timeout=3600, # Timeout after 1 hour
mixtrain_version="0.1.23",
)
def setup(self):
import torch
self.device = "cuda" if torch.cuda.is_available() else "cpu"
def run(self, prompt: str) -> dict:
"""Process the prompt.
Args:
prompt: Input prompt
"""
return {"result": process(prompt)}Sandbox options:
- image: Docker image (e.g., "pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime")
- gpu: GPU type ("T4", "L4", "A10", "A100", "A100-80GB", "L40S", "H100", "H200", "B200")
- cpu: Number of CPU cores
- memory: Memory in MB
- timeout: Max execution time in seconds
- cloud: Cloud provider ("gcp", "aws")
- region: Cloud region
- num_nodes: Number of nodes for distributed training
- mixtrain_version: SDK version to install
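The GPU example above covers image, gpu, memory, timeout, and mixtrain_version; the remaining options follow the same pattern. A minimal CPU-only sketch, assuming illustrative values for cloud, region, and num_nodes:
from mixtrain import MixModel, sandbox
class CPUModel(MixModel):
    """Model that runs on CPU in a specific cloud and region."""
    _sandbox = sandbox(
        image="python:3.11-slim",
        cpu=4,                 # number of CPU cores
        memory=8192,           # 8GB
        timeout=1800,          # 30 minutes
        cloud="gcp",           # "gcp" or "aws"
        region="us-central1",  # illustrative region name
        num_nodes=1,           # increase for distributed training
    )
    def run(self, text: str) -> dict:
        """Uppercase the input (placeholder logic)."""
        return {"result": text.upper()}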
Model Examples
Text Classification Model
from mixtrain import MixModel
from transformers import pipeline
class TextClassifier(MixModel):
"""Classify text into sentiment categories."""
def setup(self, model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
"""Initialize the classifier.
Args:
model_name: HuggingFace model to use
"""
self.classifier = pipeline("sentiment-analysis", model=model_name)
def run(self, text: str) -> dict:
"""Classify text sentiment.
Args:
text: Text to classify
"""
result = self.classifier(text)[0]
return {
"label": result["label"],
"confidence": result["score"]
}
Image Generation Model
from mixtrain import MixModel
from mixtrain.types import Image
import replicate
class ImageGenerator(MixModel):
"""Generate images from text prompts."""
def setup(self):
self.client = replicate.Client()
def run(
self,
prompt: str,
negative_prompt: str = "",
width: int = 512,
height: int = 512,
) -> dict:
"""Generate an image from a text prompt.
Args:
prompt: Text prompt for image generation
negative_prompt: What to avoid in the image
width: Image width in pixels
height: Image height in pixels
"""
output = self.client.run(
"stability-ai/sdxl",
input={
"prompt": prompt,
"negative_prompt": negative_prompt,
"width": width,
"height": height
}
)
return {
"image": Image(url=output[0], width=width, height=height)
}
Deploy Model via CLI
mixtrain model create ./my-model/ --name my-model # From directory
mixtrain model create model.py --name my-model # Single file
mixtrain model run my-model --input '{"text": "Hello world"}'
Writing Workflows
Workflows orchestrate multi-step pipelines. Extend MixFlow and implement run().
Basic Workflow Structure
from mixtrain import MixFlow, Dataset
class MyWorkflow(MixFlow):
"""Description of what this workflow does."""
def setup(self):
"""Called before run. Initialize resources."""
pass
def run(self, input_dataset: Dataset, batch_size: int = 10) -> dict:
"""Execute the workflow.
Args:
input_dataset: Dataset to process (required)
batch_size: Batch size for processing
Returns:
Dictionary with outputs
"""
df = input_dataset.to_pandas()
return {"result": output}
def cleanup(self):
"""Optional: Called after run completes."""
pass
Workflow Examples
Data Processing Pipeline
from mixtrain import MixFlow, Dataset
import pandas as pd
class DataProcessingPipeline(MixFlow):
"""Clean and transform a dataset."""
def run(
self,
input_dataset: Dataset,
output_dataset_name: str,
drop_nulls: bool = True,
normalize: bool = False,
) -> dict:
"""Clean and transform a dataset.
Args:
input_dataset: Source dataset
output_dataset_name: Destination dataset name
drop_nulls: Whether to drop null values
normalize: Whether to normalize numeric columns
"""
df = input_dataset.to_pandas()
original_rows = len(df)
# Clean data
if drop_nulls:
df = df.dropna()
# Normalize numeric columns
if normalize:
numeric_cols = df.select_dtypes(include=['number']).columns
df[numeric_cols] = (df[numeric_cols] - df[numeric_cols].mean()) / df[numeric_cols].std()
# Save to new dataset
output = Dataset.from_pandas(df).save(output_dataset_name)
return {
"original_rows": original_rows,
"processed_rows": len(df),
"output_dataset": output
}
Model Training Pipeline
from mixtrain import MixFlow, Dataset
from mixtrain.types import JSON, Markdown
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib
class TrainingPipeline(MixFlow):
"""Train a classification model on a dataset."""
def run(
self,
input_dataset: Dataset,
target_column: str,
test_size: float = 0.2,
n_estimators: int = 100,
) -> dict:
"""Train a classification model.
Args:
input_dataset: Training dataset
target_column: Column to predict
test_size: Fraction of data for testing
n_estimators: Number of trees in the forest
"""
df = input_dataset.to_pandas()
# Split features and target
X = df.drop(columns=[target_column])
y = df[target_column]
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=42
)
# Train model
model = RandomForestClassifier(n_estimators=n_estimators)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
# Save model
model_path = "/tmp/model.joblib"
joblib.dump(model, model_path)
return {
"accuracy": report["accuracy"],
"metrics": JSON(data=report),
"report": Markdown(content=f"# Training Complete\n\nAccuracy: {report['accuracy']:.2%}"),
"model_path": model_path
}
Batch Inference Pipeline
from mixtrain import MixFlow, Dataset, Model
import pandas as pd
class BatchInferencePipeline(MixFlow):
"""Run batch inference on a dataset."""
def run(
self,
model: Model,
input_dataset: Dataset,
input_column: str,
output_dataset_name: str,
batch_size: int = 10,
) -> dict:
"""Run batch inference.
Args:
model: Model to use for inference
input_dataset: Dataset with inputs
input_column: Column containing model inputs
output_dataset_name: Dataset name to save results
batch_size: Number of items to process at once
"""
df = input_dataset.to_pandas()
results = []
for _, row in df.iterrows():
result = model.run(inputs={input_column: row[input_column]})
results.append(result.outputs)
# Add results to dataframe
df["prediction"] = results
# Save results
output = Dataset.from_pandas(df).save(output_dataset_name)
return {
"total": len(results),
"output_dataset": output
}
Multi-Model Comparison Pipeline
from mixtrain import MixFlow, Dataset, Model
from mixtrain.types import JSON, Markdown
import pandas as pd
class ModelComparisonPipeline(MixFlow):
"""Compare multiple models on a test dataset."""
def run(
self,
models: list[Model],
test_dataset: Dataset,
input_column: str,
label_column: str,
) -> dict:
"""Compare multiple models.
Args:
models: List of models to compare
test_dataset: Dataset for evaluation
input_column: Input column name
label_column: Ground truth column
"""
df = test_dataset.to_pandas()
model_names = [m.name for m in models]
inputs_list = [{input_column: row[input_column]} for _, row in df.iterrows()]
labels = df[label_column].tolist()
# Run batch comparison across all models
batch_result = Model.batch(
models=model_names,
inputs_list=inputs_list,
max_in_flight=50,
)
# Calculate accuracy for each model
results = {}
for model_name in model_names:
predictions = [r.outputs.get("label") for r in batch_result[model_name]]
correct = sum(1 for p, l in zip(predictions, labels) if p == l)
results[model_name] = {
"accuracy": correct / len(labels),
"total": len(labels),
"correct": correct
}
# Find best model
best_model = max(results, key=lambda k: results[k]["accuracy"])
return {
"results": JSON(data=results),
"best_model": best_model,
"best_accuracy": results[best_model]["accuracy"],
"report": Markdown(content=self._generate_report(results))
}
def _generate_report(self, results: dict) -> str:
lines = ["# Model Comparison Results\n"]
for model, metrics in sorted(results.items(), key=lambda x: -x[1]["accuracy"]):
lines.append(f"- **{model}**: {metrics['accuracy']:.2%} ({metrics['correct']}/{metrics['total']})")
return "\n".join(lines)Deploy Workflow via CLI
mixtrain workflow create ./my-pipeline/ --name my-pipeline # From directory
mixtrain workflow create workflow.py --name my-pipeline # Single file
mixtrain workflow run my-pipeline --input '{"input_dataset": "training-data", "epochs": 10}'
Working with Datasets
SDK Methods
from mixtrain import Dataset, Image, Text, list_datasets
# Get existing dataset (lazy reference)
dataset = Dataset("my-dataset")
# Create from file
dataset = Dataset.from_file("data.csv").save("new-dataset")
dataset = Dataset.from_file("data.parquet").save("new-dataset")
# Create from pandas DataFrame
dataset = Dataset.from_pandas(df).save("new-dataset")
# Create with column types for rich rendering
dataset = Dataset.from_pandas(df).save(
"new-dataset",
description="My dataset",
column_types={"image_url": Image, "caption": Text}
)
# Load as pandas DataFrame
df = dataset.to_pandas()
# Load as PyArrow Table
table = dataset.to_table()
# Get metadata
print(dataset.metadata) # row count, description, etc.
print(dataset.column_types) # {"col": "image", "text_col": "text"}
print(dataset.detailed_metadata) # Full schema info
# Update column types (for rich rendering in UI)
dataset.set_column_types({"url_column": Image, "text_column": Text})
# Update metadata
dataset.update_metadata(
description="My training dataset",
column_types={"image_url": Image}
)
# List all datasets
datasets = list_datasets()
# Delete dataset
dataset.delete()
Using Datasets in Workflows
from mixtrain import MixFlow, Dataset
class DataAnalysis(MixFlow):
def run(self, input_dataset: Dataset):
"""Analyze a dataset.
Args:
input_dataset: Dataset to analyze
"""
df = input_dataset.to_pandas()
return {
"rows": len(df),
"columns": list(df.columns),
"summary": df.describe().to_dict()
}
CLI Commands
mixtrain dataset list
mixtrain dataset create my-data data.csv
mixtrain dataset browse my-data # Interactive TUI browser
mixtrain dataset delete my-data
Working with Evaluations
Evaluations provide side-by-side comparison views for datasets. They auto-generate visualization config from column types.
SDK Methods
from mixtrain import Eval, Dataset, Image, Text
# Get existing evaluation (lazy reference)
eval = Eval("my-eval")
# Create evaluation from dataset (recommended)
# Auto-generates viz_config from column types
ds = Dataset.from_pandas(df).save(
"results",
column_types={"prompt": Text, "model_a": Image, "model_b": Image}
)
eval = Eval.from_dataset(ds) # Creates "results-eval"
eval = Eval.from_dataset(ds, name="my-eval") # Custom name
eval = Eval.from_dataset(ds, columns=["prompt", "model_a"]) # Specific columns
# Create evaluation linked to existing dataset
eval = Eval.create("my-eval", dataset="results")
# Check if evaluation exists
if not Eval.exists("my-eval"):
Eval.from_dataset(ds, name="my-eval")
# Access properties
print(eval.dataset)
print(eval.description)
# List all evaluations
evals = Eval.list()
# Delete
eval.delete()
Evaluation Workflow Pattern
from mixtrain import MixFlow, Dataset, Eval, Model, Image, Text, generate_name
class ModelComparisonEval(MixFlow):
"""Compare models and create evaluation view."""
def run(
self,
input_dataset: Dataset,
models: list[Model],
prompt_column: str = "prompt",
):
model_names = [m.name for m in models]
output_name = generate_name("comparison", "results")
eval_name = generate_name("comparison", "eval")
# Run models and collect results
prompts = input_dataset.to_pandas()[prompt_column].tolist()
inputs_list = [{"prompt": p} for p in prompts]
result = Model.batch(model_names, inputs_list, filter_failures=True)
df = result.to_pandas()
# Save with column types
output_dataset = Dataset.from_pandas(df).save(
output_name,
column_types={
prompt_column: Text,
**dict.fromkeys(model_names, Image),
},
)
# Create evaluation (auto-generates viz from column types)
Eval.from_dataset(output_dataset, name=eval_name)
return [Eval(eval_name), Dataset(output_name)]
Types
Rich types for inputs and outputs that render well in the UI.
from mixtrain import Image, Video, Audio, Markdown, JSON, Text, Embedding, Model3D
# Media types
Image(url="https://...", width=512, height=512)
Video(url="https://...", duration_seconds=30)
Audio(url="https://...", duration_seconds=60)
Model3D(url="https://...model.glb")
# Content types
Text(content="Plain text")
Markdown(content="# Heading\n\n- Item 1")
JSON(data={"key": "value"})
Embedding(values=[0.1, 0.2, 0.3])
Returning Results
Return media types, resource links, and structured data from workflows and models:
from mixtrain import MixFlow, Dataset, Model, Eval, Image, Video, Markdown
class MyWorkflow(MixFlow):
def run(self) -> dict:
ds = Dataset.from_file("/tmp/data.csv").save("output-data")
return {
# Media types (render inline)
"image": Image(url="https://...", width=512, height=512),
"video": Video(url="https://...", duration_seconds=30),
# Resource links (clickable in UI)
"dataset": ds,
"model": Model("my-model"),
"evaluation": Eval("my-eval"),
# Formatted content
"report": Markdown(content="# Results\n\n- Success"),
}
Running Models Programmatically
from mixtrain import get_model, list_models, Model
# Get model (lazy reference)
model = get_model("my-model")
# Synchronous run (blocks until complete)
# Can pass inputs as dict or kwargs, and optional sandbox config
result = model.run(text="Hello", timeout=300)
result = model.run(inputs={"text": "Hello"}, sandbox={"gpu": "A100", "timeout": 3600})
print(result.outputs) # Raw outputs dict
print(result.status) # RunStatus.COMPLETED
print(result.video) # Typed accessor for video output (if present)
print(result.image) # Typed accessor for image output (if present)
# Async run (returns immediately)
run_info = model.submit(text="Hello")
run_info = model.submit(inputs={"text": "Hello"}, sandbox={"gpu": "A100"})
print(run_info["run_number"]) # Check status later
run = model.get_run(run_info["run_number"])
# Batch inference across multiple models
batch_result = Model.batch(
models=["model-a", "model-b", "model-c"],
inputs_list=[{"text": "test1"}, {"text": "test2"}],
max_in_flight=50, # Control concurrency
timeout=600,
filter_failures=True, # Only keep results where ALL models succeeded
)
# Access batch results
df = batch_result.to_pandas() # Convert to DataFrame
for model_name in ["model-a", "model-b"]:
for result in batch_result[model_name]:
print(result.outputs)
# Access run history
runs = model.runs # Cached list of runs
specific_run = model.get_run(run_number=5)
logs = model.get_logs(run_number=5)
# Model metadata
print(model.metadata)
print(model.spec)
print(model.source) # "native", "fal", "openai", etc.
# List all models
models = list_models()
models = list_models(provider="native")  # Filter by provider
Running Workflows Programmatically
from mixtrain import Workflow
# Get workflow (lazy reference)
workflow = Workflow("my-pipeline")
# Synchronous run (blocks until complete)
# Can pass inputs as dict or kwargs, and optional sandbox config
result = workflow.run(epochs=10, learning_rate=0.001)
result = workflow.run(inputs={"epochs": 10}, sandbox={"gpu": "A100", "timeout": 3600})
print(result["outputs"])
print(result["status"])
# Async run (returns immediately)
run_info = workflow.submit(epochs=10, learning_rate=0.001)
run_info = workflow.submit(inputs={"epochs": 10}, sandbox={"gpu": "A100"})
print(run_info["run_number"])
# Check run status
run = workflow.get_run(run_info["run_number"])
# Access run history
runs = workflow.runs
# List all workflows
workflows = Workflow.list()
Helper Functions
from mixtrain import (
generate_name,
validate_resource_name,
)
# Generate unique names with timestamps
name = generate_name("t2i", "eval") # "t2i_eval_jan_17_2026"
name = generate_name("image-caption", "results") # "image_caption_results_jan_17_2026"
# Validate resource names (raises ValueError if invalid)
validate_resource_name("my-model", "model") # OK
validate_resource_name("My Model", "model") # Raises ValueErrorCLI Reference
Authentication
mixtrain login # Browser-based OAuth login
mixtrain config # Show current config
Models
mixtrain model list
mixtrain model create ./model-dir/ --name my-model # From directory
mixtrain model create model.py utils.py --name my-model # Multiple files
mixtrain model create "*.py" --name my-model # Glob pattern
mixtrain model run NAME --input '{"key": "value"}'
mixtrain model run NAME -i input.json --detach # Start and exit
mixtrain model run NAME -i '{"prompt": "test"}' --gpu A100 --timeout 3600 # With sandbox
mixtrain model logs NAME RUN_NUMBER
mixtrain model update NAME ./model-dir/ # Update from directory
mixtrain model update NAME model.py utils.py # Update specific files
mixtrain model delete NAME
mixtrain model cancel NAME RUN_NUMBER
mixtrain model code NAME # List all files
mixtrain model code NAME -f model.py # View specific file
mixtrain model code NAME --edit # Edit in $EDITOR
mixtrain model get NAME # Get model details
mixtrain model runs NAME # List runs
mixtrain model catalog # Browse external models
Workflows
mixtrain workflow list
mixtrain workflow create ./workflow-dir/ --name my-workflow # From directory
mixtrain workflow create workflow.py utils.py --name my-workflow # Multiple files
mixtrain workflow create "*.py" --name my-workflow # Glob pattern
mixtrain workflow run NAME --input '{"param": "value"}'
mixtrain workflow run NAME -i input.json --detach
mixtrain workflow run NAME -i '{"epochs": 10}' --gpu A100 --timeout 3600 # With sandbox
mixtrain workflow logs NAME RUN_NUMBER
mixtrain workflow update NAME ./workflow-dir/ # Update from directory
mixtrain workflow update NAME workflow.py utils.py # Update specific files
mixtrain workflow delete NAME
mixtrain workflow cancel NAME RUN_NUMBER
mixtrain workflow code NAME # List all files
mixtrain workflow code NAME -f main.py # View specific file
mixtrain workflow code NAME --edit # Edit in $EDITOR
mixtrain workflow get NAME # Get workflow details
mixtrain workflow runs NAME # List runs
Datasets
mixtrain dataset list
mixtrain dataset create NAME file.csv
mixtrain dataset browse NAME # Interactive browser
mixtrain dataset delete NAME
Secrets
mixtrain secret list
mixtrain secret set NAME # Create or update secret
mixtrain secret get NAME # View secret details
mixtrain secret delete NAME
Environment Variables
MIXTRAIN_API_KEY=mix-... # API key for authentication
Common Patterns
Access secrets in models/workflows
from mixtrain import MixModel
from openai import OpenAI
class MyModel(MixModel):
def setup(self):
# MixClient is available as self.mix
api_key = self.mix.get_secret("OPENAI_API_KEY")
self.openai = OpenAI(api_key=api_key)
Chain models in a workflow
from mixtrain import MixFlow, Model
class ChainedPipeline(MixFlow):
def run(
self,
document: str,
extractor: Model,
summarizer: Model,
translator: Model,
):
"""Process a document through multiple models.
Args:
document: Document to process
extractor: Model for text extraction
summarizer: Model for summarization
translator: Model for translation
"""
# Step 1: Extract
extracted = extractor.run(inputs={"document": document})
# Step 2: Summarize
summary = summarizer.run(inputs={"text": extracted.outputs["text"]})
# Step 3: Translate
translated = translator.run(inputs={
"text": summary.outputs["summary"],
"target_lang": "es"
})
return {"result": translated.outputs["translation"]}Error handling
from mixtrain import Model
model = Model("my-model")
try:
result = model.run(inputs={"text": "Hello"}, timeout=300)
if result.status == "completed":
print(result.outputs)
else:
print(f"Run failed: {result.error}")
except TimeoutError:
print("Model inference timed out")
except Exception as e:
print(f"Error: {e}")Workflow dependencies
Add a pyproject.toml, requirements.txt, or Dockerfile in your workflow folder:
# pyproject.toml
[project]
name = "my-workflow"
version = "0.1.0"
dependencies = [
"pandas",
"scikit-learn",
"numpy",
]
Or use requirements.txt:
pandas
scikit-learn
numpy
Or use Dockerfile for full environment control:
FROM python:3.11-slim
RUN pip install pandas scikit-learn numpy
The platform automatically installs dependencies before running your workflow.
Using batch inference with progress
from mixtrain import Model
def progress_callback(completed, total):
print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
result = Model.batch(
models=["flux", "stable-diffusion"],
inputs_list=[{"prompt": f"Image {i}"} for i in range(100)],
max_in_flight=20,
progress_callback=progress_callback,
filter_failures=True, # Skip inputs where any model failed
)
# Convert to DataFrame for analysis
df = result.to_pandas()
print(df.head())