This guide enables Claude Code to write code for the Mixtrain ML platform, including models, workflows, evaluations, and datasets.
Platform Overview
Mixtrain is an ML platform with these core resources:
- Models - ML inference endpoints (custom or provider-hosted)
- Workflows - Multi-step orchestration pipelines
- Datasets - Data management with pandas/arrow support
- Evaluations - Model assessment configurations
Installation & Authentication
pip install mixtrain
from mixtrain import MixClient
# Option 1: Environment variable (recommended)
# export MIXTRAIN_API_KEY=mix-...
client = MixClient()
# Option 2: Explicit API key
client = MixClient(api_key="mix-...")
Writing Models
Models are Python classes that define inference logic. Extend MixModel and implement run().
Basic Model Structure
from mixtrain import MixModel
class MyModel(MixModel):
    """Description of what this model does."""

    def setup(self):
        """Called once when model is loaded. Initialize resources here."""
        # Load weights, initialize clients, etc.
        pass

    def run(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 100,
    ) -> dict:
        """Run inference.

        Args:
            prompt: Text prompt to process (required)
            temperature: Sampling temperature
            max_tokens: Maximum output tokens

        Returns:
            Dictionary with outputs
        """
        # Placeholder so the template runs as written: echo the prompt.
        # Replace with real inference that uses temperature and max_tokens.
        result = prompt
        return {"output": result}

    def cleanup(self):
        """Optional: Called when model is unloaded."""
        pass


# Sandbox Configuration
Configure compute resources using the Sandbox class:
from mixtrain import MixModel, Sandbox
class GPUModel(MixModel):
    """Model that runs on GPU."""

    # Compute resources requested for this model's sandbox.
    _sandbox = Sandbox(
        image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime",
        gpu="A100",
        memory=32768,  # 32GB
        timeout=3600,  # Timeout after 1 hour
        mixtrain_version="0.1.23",
    )

    def setup(self):
        """Record which torch device is available."""
        import torch

        if torch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"

    def run(self, prompt: str) -> dict:
        """Process the prompt.

        Args:
            prompt: Input prompt
        """
        # `process` stands in for user code; it is not defined in this guide.
        processed = process(prompt)
        return {"result": processed}


# Sandbox options:
- `image`: Docker image (e.g., "pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime")
- `gpu`: GPU type ("T4", "L4", "A10", "A100", "A100-80GB", "L40S", "H100", "H200", "B200")
- `cpu`: Number of CPU cores
- `memory`: Memory in MB
- `timeout`: Max execution time in seconds
- `cloud`: Cloud provider ("gcp", "aws")
- `region`: Cloud region
- `num_nodes`: Number of nodes for distributed training
- `min_containers`: Minimum warm containers
- `mixtrain_version`: SDK version to install
Model Examples
Text Classification Model
from mixtrain import MixModel
from transformers import pipeline
class TextClassifier(MixModel):
    """Classify text into sentiment categories."""

    def setup(self, model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
        """Initialize the classifier.

        Args:
            model_name: HuggingFace model to use
        """
        self.classifier = pipeline("sentiment-analysis", model=model_name)

    def run(self, text: str) -> dict:
        """Classify text sentiment.

        Args:
            text: Text to classify
        """
        # The classifier returns a list; only its first entry is used.
        top = self.classifier(text)[0]
        label = top["label"]
        confidence = top["score"]
        return {"label": label, "confidence": confidence}


# Image Generation Model
from mixtrain import MixModel
from mixtrain.types import Image
import replicate
class ImageGenerator(MixModel):
    """Generate images from text prompts."""

    def setup(self):
        """Create the Replicate client used by run()."""
        self.client = replicate.Client()

    def run(
        self,
        prompt: str,
        negative_prompt: str = "",
        width: int = 512,
        height: int = 512,
    ) -> dict:
        """Generate an image from a text prompt.

        Args:
            prompt: Text prompt for image generation
            negative_prompt: What to avoid in the image
            width: Image width in pixels
            height: Image height in pixels
        """
        payload = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "width": width,
            "height": height,
        }
        generated = self.client.run("stability-ai/sdxl", input=payload)
        # Only the first returned item is wrapped and handed back.
        first = generated[0]
        return {"image": Image(first, width=width, height=height)}


# Deploy Model via CLI
mixtrain model create ./my-model/ --name my-model # From directory
mixtrain model create model.py --name my-model # Single file
mixtrain model run my-model --text "Hello world" # Spec inputs as flags (see --help)
Writing Workflows
Workflows orchestrate multi-step pipelines. Extend MixFlow and implement run().
Basic Workflow Structure
from mixtrain import MixFlow, Dataset
class MyWorkflow(MixFlow):
    """Description of what this workflow does."""

    def setup(self):
        """Called before run. Initialize resources."""
        pass

    def run(self, input_dataset: Dataset, batch_size: int = 10) -> dict:
        """Execute the workflow.

        Args:
            input_dataset: Dataset to process (required)
            batch_size: Batch size for processing

        Returns:
            Dictionary with outputs
        """
        df = input_dataset.to_pandas()
        # Placeholder so the template runs as written: report the row count.
        # Replace with real processing that walks df in batch_size chunks.
        output = len(df)
        return {"result": output}

    def cleanup(self):
        """Optional: Called after run completes."""
        pass


# Workflow Examples
Data Processing Pipeline
from mixtrain import MixFlow, Dataset
import pandas as pd
class DataProcessingPipeline(MixFlow):
    """Clean and transform a dataset."""

    def run(
        self,
        input_dataset: Dataset,
        output_dataset_name: str,
        drop_nulls: bool = True,
        normalize: bool = False,
    ) -> dict:
        """Clean and transform a dataset.

        Args:
            input_dataset: Source dataset
            output_dataset_name: Destination dataset name
            drop_nulls: Whether to drop null values
            normalize: Whether to normalize numeric columns
        """
        df = input_dataset.to_pandas()
        original_rows = len(df)

        # Clean data
        if drop_nulls:
            df = df.dropna()

        # Normalize numeric columns
        if normalize:
            numeric_cols = df.select_dtypes(include=["number"]).columns
            centered = df[numeric_cols] - df[numeric_cols].mean()
            # A constant column has std 0 and a single-row frame has std NaN;
            # dividing by either fills the column with NaN/inf. Fall back to
            # a divisor of 1 so such columns become all zeros instead.
            spread = df[numeric_cols].std().replace(0, 1).fillna(1)
            df[numeric_cols] = centered / spread

        # Save to new dataset
        output = Dataset.from_pandas(df).save(output_dataset_name)
        return {
            "original_rows": original_rows,
            "processed_rows": len(df),
            "output_dataset": output,
        }


# Model Training Pipeline
from mixtrain import MixFlow, Dataset
from mixtrain.types import JSON, Markdown
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib
class TrainingPipeline(MixFlow):
    """Train a classification model on a dataset."""

    def run(
        self,
        input_dataset: Dataset,
        target_column: str,
        test_size: float = 0.2,
        n_estimators: int = 100,
    ) -> dict:
        """Train a classification model.

        Args:
            input_dataset: Training dataset
            target_column: Column to predict
            test_size: Fraction of data for testing
            n_estimators: Number of trees in the forest
        """
        df = input_dataset.to_pandas()

        # Split features and target
        X = df.drop(columns=[target_column])
        y = df[target_column]

        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        # Train model. Seeded like the split above so that repeated runs on
        # the same data report the same metrics.
        model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True)

        # Save model
        model_path = "/tmp/model.joblib"
        joblib.dump(model, model_path)

        return {
            "accuracy": report["accuracy"],
            "metrics": JSON(data=report),
            "report": Markdown(content=f"# Training Complete\n\nAccuracy: {report['accuracy']:.2%}"),
            "model_path": model_path,
        }


# Batch Inference Pipeline
from mixtrain import MixFlow, Dataset, Model
import pandas as pd
class BatchInferencePipeline(MixFlow):
    """Run batch inference on a dataset."""

    def run(
        self,
        model: Model,
        input_dataset: Dataset,
        input_column: str,
        output_dataset_name: str,
        batch_size: int = 10,
    ) -> dict:
        """Run batch inference.

        Args:
            model: Model to use for inference
            input_dataset: Dataset with inputs
            input_column: Column containing model inputs
            output_dataset_name: Dataset name to save results
            batch_size: Number of items to process at once
        """
        df = input_dataset.to_pandas()
        inputs_list = [{input_column: value} for value in df[input_column].tolist()]

        # Submit everything through Model.batch so batch_size actually bounds
        # how many inputs are in flight, instead of one blocking run per row.
        results = []
        if inputs_list:
            batch_result = Model.batch(
                models=[model.name],
                inputs=inputs_list,
                max_in_flight=batch_size,
            )
            # NOTE(review): assumes results come back in input order, one per
            # input, as the comparison pipeline in this guide also assumes.
            results = [r.outputs for r in batch_result[model.name]]

        # Add results to dataframe
        df["prediction"] = results

        # Save results
        output = Dataset.from_pandas(df).save(output_dataset_name)
        return {
            "total": len(results),
            "output_dataset": output,
        }


# Multi-Model Comparison Pipeline
from mixtrain import MixFlow, Dataset, Model
from mixtrain.types import JSON, Markdown
import pandas as pd
class ModelComparisonPipeline(MixFlow):
    """Compare multiple models on a test dataset."""

    def run(
        self,
        models: list[Model],
        test_dataset: Dataset,
        input_column: str,
        label_column: str,
    ) -> dict:
        """Compare multiple models.

        Args:
            models: List of models to compare
            test_dataset: Dataset for evaluation
            input_column: Input column name
            label_column: Ground truth column

        Raises:
            ValueError: If no models are given.
        """
        if not models:
            # max() below would fail with an opaque message otherwise.
            raise ValueError("models must contain at least one model")

        df = test_dataset.to_pandas()
        model_names = [m.name for m in models]
        inputs_list = [{input_column: row[input_column]} for _, row in df.iterrows()]
        labels = df[label_column].tolist()
        total = len(labels)

        # Run batch comparison across all models
        batch_result = Model.batch(
            models=model_names,
            inputs=inputs_list,
            max_in_flight=50,
        )

        # Calculate accuracy for each model
        results = {}
        for model_name in model_names:
            predictions = [r.outputs.get("label") for r in batch_result[model_name]]
            correct = sum(
                1 for predicted, expected in zip(predictions, labels)
                if predicted == expected
            )
            results[model_name] = {
                # An empty test set scores 0.0 rather than dividing by zero.
                "accuracy": correct / total if total else 0.0,
                "total": total,
                "correct": correct,
            }

        # Find best model
        best_model = max(results, key=lambda name: results[name]["accuracy"])
        return {
            "results": JSON(data=results),
            "best_model": best_model,
            "best_accuracy": results[best_model]["accuracy"],
            "report": Markdown(content=self._generate_report(results)),
        }

    def _generate_report(self, results: dict) -> str:
        """Render per-model metrics as a Markdown list, best accuracy first."""
        lines = ["# Model Comparison Results\n"]
        ranked = sorted(results.items(), key=lambda item: item[1]["accuracy"], reverse=True)
        for model, metrics in ranked:
            lines.append(f"- **{model}**: {metrics['accuracy']:.2%} ({metrics['correct']}/{metrics['total']})")
        return "\n".join(lines)


# Deploy Workflow via CLI
mixtrain workflow create ./my-pipeline/ --name my-pipeline # From directory
mixtrain workflow create workflow.py --name my-pipeline # Single file
mixtrain workflow run my-pipeline --input-dataset training-data --epochs 10 # Spec inputs as flags
Writing Routines
Use MixRoutine when the user asks for work to run automatically from dataset appends, schedules, or completion of another workflow.
from mixtrain import MixRoutine, on_dataset_append
class EmbedNewRows(MixRoutine):
    """Embed rows appended to the photos dataset."""

    # The on_dataset_append(...) default on `rows` is the routine's trigger,
    # and batch_size carries a plain default; the signature is kept as written.
    def run(self, rows=on_dataset_append("photos", batch_rows=100), batch_size: int = 256):
        # `process` stands in for user code; it is not defined in this guide.
        for batch in rows:
            process(batch, batch_size=batch_size)


# Agent rules for routines:
- Exactly one `run()` parameter must default to an `on_*` trigger constructor.
- All other `run()` parameters must have defaults.
- Use `on_dataset_append("name")`, `on_schedule("0 3 * * *")`, `on_schedule(every="1h")`, `on_workflow_success("name")`, or `on_complete("name", kind="workflow")`.
- `tz` is optional for schedules and defaults to `UTC`.
- `every` accepts `s`, `m`, `h`, `d`, and `w` duration parts such as `30m`, `1h`, or `1h30m`.
- Create and run routines with `mixtrain routine`, not `mixtrain workflow`.
mixtrain routine create ./routine-dir/ --name embed-new-rows
mixtrain routine list
mixtrain routine get embed-new-rows
mixtrain routine run embed-new-rows --detach
Working with Datasets
SDK Methods
from mixtrain import Dataset, Image, Text, list_datasets
# Get existing dataset reference
dataset = Dataset("my-dataset")
# Create from file
dataset = Dataset.from_file("data.csv").save("new-dataset")
dataset = Dataset.from_file("data.parquet").save("new-dataset")
# Create from pandas DataFrame
dataset = Dataset.from_pandas(df).save("new-dataset")
# Create with column types for rich rendering
dataset = Dataset.from_pandas(df).save(
"new-dataset",
description="My dataset",
column_types={"image_url": Image, "caption": Text}
)
# Stream rows without loading the whole dataset
for row in dataset.rows():
process(row)
# Stream columnar batches (dicts by default, Arrow when requested)
for batch in dataset.batches(1024):
process_batch(batch)
for batch in dataset.batches(1024, format="arrow"):
process_arrow_batch(batch)
# Materialize only when intended
table = dataset.to_arrow() # PyArrow Table
df = dataset.to_pandas() # pandas DataFrame
# Limit large materializations
table = dataset.collect(max_bytes=512 * 1024 * 1024)
# File-backed columns: fetch referenced files as bytes or local paths
loaded = dataset.load_files({"image_url": "bytes"})
for row in loaded.head(10):
image_bytes = row["image_url"]
# Cache table data and referenced files locally for repeated training passes
cached = dataset.cache()
# Get metadata
print(dataset.metadata) # row count, description, etc.
print(dataset.column_types) # {"col": "image", "text_col": "text"}
print(dataset.detailed_metadata) # Full schema info
# Update column types (for rich rendering in UI)
dataset.set_column_types({"url_column": Image, "text_column": Text})
# Update metadata
dataset.update(
description="My training dataset",
column_types={"image_url": Image}
)
# List all datasets
datasets = list_datasets()
# Delete dataset
dataset.delete()
Notes:
- Prefer `rows()`, `batches()`, `sql()`, and `to_torch()` for large datasets.
- Use `to_pandas()`, `to_arrow()`, or `collect()` only when materialization is intentional.
- `column_types={...}` sets exactly the named columns. To pin some types and infer the rest, use `Dataset.from_file(...).with_column_types({...}).save(...)`.
Using Datasets in Workflows
from mixtrain import MixFlow, Dataset
class DataAnalysis(MixFlow):
    def run(self, input_dataset: Dataset):
        """Analyze a dataset.

        Args:
            input_dataset: Dataset to analyze
        """
        frame = input_dataset.to_pandas()
        row_count = len(frame)
        column_names = list(frame.columns)
        stats = frame.describe().to_dict()
        return {"rows": row_count, "columns": column_names, "summary": stats}


# CLI Commands
mixtrain dataset list
mixtrain dataset create my-data data.csv
mixtrain dataset query my-data
mixtrain dataset metadata my-data
mixtrain dataset delete my-data
Working with Evaluations
Evaluations provide side-by-side comparison views for datasets. They auto-generate visualization config from column types.
SDK Methods
from mixtrain import Eval, Dataset, Image, Text
# The variable is named `evaluation` so it does not shadow Python's builtin
# eval().

# Get existing evaluation (lazy reference)
evaluation = Eval("my-eval")

# Create evaluation from dataset (recommended)
# Auto-generates viz_config from column types
ds = Dataset.from_pandas(df).save(
    "results",
    column_types={"prompt": Text, "model_a": Image, "model_b": Image}
)
evaluation = Eval.from_dataset(ds)  # Creates "results-eval"
evaluation = Eval.from_dataset(ds, name="my-eval")  # Custom name
evaluation = Eval.from_dataset(ds, columns=["prompt", "model_a"])  # Specific columns

# Create evaluation linked to existing dataset
evaluation = Eval.create("my-eval", dataset="results")

# Check if evaluation exists
if not Eval.exists("my-eval"):
    Eval.from_dataset(ds, name="my-eval")

# Access properties
print(evaluation.dataset)
print(evaluation.description)

# List all evaluations
evals = Eval.list()

# Delete
evaluation.delete()


# Evaluation Workflow Pattern
from mixtrain import MixFlow, Dataset, Eval, Model, Image, Text, generate_name
class ModelComparisonEval(MixFlow):
    """Compare models and create evaluation view."""

    def run(
        self,
        input_dataset: Dataset,
        models: list[Model],
        prompt_column: str = "prompt",
    ):
        """Run every model on the dataset's prompts and publish an evaluation.

        Args:
            input_dataset: Dataset containing the prompts
            models: Models to compare
            prompt_column: Column in input_dataset that holds the prompts

        Returns:
            List holding the created evaluation and the results dataset
        """
        model_names = [m.name for m in models]
        output_name = generate_name("comparison", "results")
        eval_name = generate_name("comparison", "eval")

        # The batch only ever sees this input key; prompt_column is used
        # solely to read the source dataset.
        input_key = "prompt"

        # Run models and collect results
        prompts = input_dataset.to_pandas()[prompt_column].tolist()
        inputs_list = [{input_key: p} for p in prompts]
        result = Model.batch(model_names, inputs_list, filter_failures=True)
        df = result.to_pandas()

        # Save with column types. The text column is typed by input_key, not
        # prompt_column, because prompt_column never reaches the batch.
        # NOTE(review): presumably to_pandas() names the input column after
        # the input key and one column per model name - confirm.
        output_dataset = Dataset.from_pandas(df).save(
            output_name,
            column_types={
                input_key: Text,
                **dict.fromkeys(model_names, Image),
            },
        )

        # Create evaluation (auto-generates viz from column types)
        Eval.from_dataset(output_dataset, name=eval_name)
        return [Eval(eval_name), Dataset(output_name)]


# Types
Rich types for inputs and outputs that render well in the UI.
from mixtrain import Image, Video, Audio, Markdown, JSON, Text, Embedding, Tensor
# Workspace file paths
Image("outputs/generated/cat.png", width=512, height=512)
Video("runs/demo/preview.mp4", duration_seconds=30)
Audio("clips/intro.wav", duration_seconds=60)
# External URLs
Image("https://...", width=512, height=512)
Video("https://...", duration_seconds=30)
Audio("https://...", duration_seconds=60)
# Content types
Text(content="Plain text")
Markdown(content="# Heading\n\n- Item 1")
JSON(data={"key": "value"})
# Array data
Embedding(values=[0.1, 0.2, 0.3]) # 1-D vector
Tensor([[0.1, 0.2], [0.3, 0.4]], dim_names=["time", "joints"]) # N-dimensional array
Returning Results
Return media types, resource links, and structured data from workflows and models:
from mixtrain import MixFlow, Dataset, Model, Eval, Image, Video, Markdown
class MyWorkflow(MixFlow):
    def run(self) -> dict:
        """Return one example of each kind of result value."""
        ds = Dataset.from_file("/tmp/data.csv").save("output-data")

        outputs = {}
        # Media types (render inline)
        outputs["image"] = Image("outputs/generated/cat.png", width=512, height=512)
        outputs["video"] = Video("runs/demo/preview.mp4", duration_seconds=30)
        # Resource links (clickable in UI)
        outputs["dataset"] = ds
        outputs["model"] = Model("my-model")
        outputs["evaluation"] = Eval("my-eval")
        # Formatted content
        outputs["report"] = Markdown(content="# Results\n\n- Success")
        return outputs


# Running Models Programmatically
from mixtrain import get_model, list_models, Model
# Get model (lazy reference)
model = get_model("my-model")
# Synchronous run (blocks until complete)
# Can pass inputs as dict or kwargs, and optional sandbox config
result = model.run(text="Hello", timeout=300)
result = model.run(inputs={"text": "Hello"}, sandbox={"gpu": "A100", "timeout": 3600})
print(result.outputs) # Raw outputs dict
print(result.status) # RunStatus.COMPLETED
print(result.video) # Typed accessor for video output (if present)
print(result.image) # Typed accessor for image output (if present)
# Async run (returns immediately)
run_info = model.submit(text="Hello")
run_info = model.submit(inputs={"text": "Hello"}, sandbox={"gpu": "A100"})
print(run_info["run_number"]) # Check status later
run = model.get_run(run_info["run_number"])
# Batch inference across multiple models
batch_result = Model.batch(
models=["model-a", "model-b", "model-c"],
inputs=[{"text": "test1"}, {"text": "test2"}],
max_in_flight=50, # Control concurrency
timeout=600,
filter_failures=True, # Only keep results where ALL models succeeded
)
# Access batch results
df = batch_result.to_pandas() # Convert to DataFrame
for model_name in ["model-a", "model-b"]:
for result in batch_result[model_name]:
print(result.outputs)
# Access run history
runs = model.runs # Cached list of runs
specific_run = model.get_run(run_number=5)
logs = model.get_logs(run_number=5)
# Model metadata
print(model.metadata)
print(model.spec)
print(model.source) # "native", "fal", "openai", etc.
# List all models
models = list_models()
models = list_models(provider="native") # Filter by provider
Running Workflows Programmatically
from mixtrain import Workflow
# Get workflow (lazy reference)
workflow = Workflow("my-pipeline")
# Synchronous run (blocks until complete)
# Can pass inputs as dict or kwargs, and optional sandbox config
result = workflow.run(epochs=10, learning_rate=0.001)
result = workflow.run(inputs={"epochs": 10}, sandbox={"gpu": "A100", "timeout": 3600})
print(result["outputs"])
print(result["status"])
# Async run (returns immediately)
run_info = workflow.submit(epochs=10, learning_rate=0.001)
run_info = workflow.submit(inputs={"epochs": 10}, sandbox={"gpu": "A100"})
print(run_info["run_number"])
# Check run status
run = workflow.get_run(run_info["run_number"])
# Access run history
runs = workflow.runs
# List all workflows
workflows = Workflow.list()
Helper Functions
from mixtrain import (
generate_name,
validate_resource_name,
)
# Generate unique names with timestamps
name = generate_name("t2i", "eval") # "t2i_eval_jan_17_2026"
name = generate_name("image-caption", "results") # "image_caption_results_jan_17_2026"
# Validate resource names (raises ValueError if invalid)
validate_resource_name("my-model", "model") # OK
validate_resource_name("My Model", "model") # Raises ValueError
CLI Reference
Run mixtrain --help and subcommand help before using command options. CLI flags may change between releases.
Authentication
mixtrain login # Browser-based OAuth login
mixtrain config # Show current config
Models
mixtrain model list
mixtrain model create ./model-dir/ --name my-model # From directory
mixtrain model create model.py utils.py --name my-model # Multiple files
mixtrain model create "*.py" --name my-model # Glob pattern
mixtrain model run NAME --prompt "A sunset" # Spec inputs as flags (run --help to list them)
mixtrain model run NAME -i input.json --detach # All inputs as one JSON blob; start and exit
mixtrain model run NAME -i '{"prompt": "test"}' --gpu A100 --timeout 3600 # With sandbox
mixtrain model logs NAME RUN_NUMBER
mixtrain model edit NAME
mixtrain model delete NAME
mixtrain model cancel NAME RUN_NUMBER
mixtrain model get NAME # Get model details
mixtrain model runs NAME # List runs
mixtrain model catalog # Browse external models
mixtrain model download NAME --dir ./model-dir
mixtrain model upload NAME --dir ./model-dir
Workflows
mixtrain workflow list
mixtrain workflow create ./workflow-dir/ --name my-workflow # From directory
mixtrain workflow create workflow.py utils.py --name my-workflow # Multiple files
mixtrain workflow create "*.py" --name my-workflow # Glob pattern
mixtrain workflow run NAME --epochs 10 # Spec inputs as flags (run --help to list them)
mixtrain workflow run NAME -i input.json --detach # All inputs as one JSON blob
mixtrain workflow run NAME -i '{"epochs": 10}' --gpu A100 --timeout 3600 # With sandbox
mixtrain workflow logs NAME RUN_NUMBER
mixtrain workflow edit NAME
mixtrain workflow delete NAME
mixtrain workflow cancel NAME RUN_NUMBER
mixtrain workflow get NAME # Get workflow details
mixtrain workflow runs NAME # List runs
mixtrain workflow download NAME --dir ./workflow-dir
mixtrain workflow upload NAME --dir ./workflow-dir
Routines
mixtrain routine list
mixtrain routine create ./routine-dir/ --name my-routine
mixtrain routine run NAME --detach
mixtrain routine logs NAME RUN_NUMBER
mixtrain routine get NAME # Get routine details and trigger info
mixtrain routine runs NAME # List runs
mixtrain routine edit NAME
mixtrain routine delete NAME
Datasets
mixtrain dataset list
mixtrain dataset create NAME file.csv
mixtrain dataset query NAME
mixtrain dataset metadata NAME
mixtrain dataset delete NAME
Secrets
mixtrain secret list
mixtrain secret set NAME # Create or update secret
mixtrain secret get NAME # View secret details
mixtrain secret delete NAME
Environment Variables
MIXTRAIN_API_KEY=mix-... # API key for authentication
Common Patterns
Access secrets in models/workflows
from mixtrain import MixModel
class MyModel(MixModel):
    def setup(self):
        """Create an OpenAI client from a secret stored on the platform."""
        # Imported here because the listing otherwise uses OpenAI without
        # ever bringing it into scope.
        from openai import OpenAI

        # MixClient is available as self.mix
        api_key = self.mix.get_secret("OPENAI_API_KEY")
        self.openai = OpenAI(api_key=api_key)


# Chain models in a workflow
from mixtrain import MixFlow, Model
class ChainedPipeline(MixFlow):
    def run(
        self,
        document: str,
        extractor: Model,
        summarizer: Model,
        translator: Model,
    ):
        """Process a document through multiple models.

        Args:
            document: Document to process
            extractor: Model for text extraction
            summarizer: Model for summarization
            translator: Model for translation
        """
        # Step 1: Extract
        extraction = extractor.run(inputs={"document": document})
        extracted_text = extraction.outputs["text"]

        # Step 2: Summarize
        summarization = summarizer.run(inputs={"text": extracted_text})
        summary_text = summarization.outputs["summary"]

        # Step 3: Translate
        translation_inputs = {"text": summary_text, "target_lang": "es"}
        translation = translator.run(inputs=translation_inputs)

        return {"result": translation.outputs["translation"]}


# Error handling
from mixtrain import Model
# Run a model once and report success, a failed run, a timeout, or any other
# error.
model = Model("my-model")
try:
    result = model.run(inputs={"text": "Hello"}, timeout=300)
    # NOTE(review): elsewhere in this guide result.status is shown printing as
    # RunStatus.COMPLETED; confirm that comparing it to the string
    # "completed" is true for a successful run.
    if result.status == "completed":
        print(result.outputs)
    else:
        print(f"Run failed: {result.error}")
# TimeoutError is listed before the catch-all so it gets its own message.
except TimeoutError:
    print("Model inference timed out")
except Exception as e:
    print(f"Error: {e}")


# Workflow dependencies
Add a pyproject.toml, requirements.txt, or Dockerfile in your workflow folder:
# pyproject.toml
[project]
name = "my-workflow"
version = "0.1.0"
dependencies = [
"pandas",
"scikit-learn",
"numpy",
]
Or use requirements.txt:
pandas
scikit-learn
numpy
Or use Dockerfile for full environment control:
FROM python:3.11-slim
RUN pip install pandas scikit-learn numpy
The platform automatically installs dependencies before running your workflow.
Using batch inference with progress
from mixtrain import Model
def progress_callback(completed, total):
    """Print batch progress as ``completed/total`` with a percentage.

    Args:
        completed: Number of items finished so far
        total: Number of items in the batch

    A total of 0 is reported as 0.0% instead of raising ZeroDivisionError.
    """
    percent = completed / total * 100 if total else 0.0
    print(f"Progress: {completed}/{total} ({percent:.1f}%)")
result = Model.batch(
models=["flux", "stable-diffusion"],
inputs=[{"prompt": f"Image {i}"} for i in range(100)],
max_in_flight=20,
progress_callback=progress_callback,
filter_failures=True, # Skip inputs where any model failed
)
# Convert to DataFrame for analysis
df = result.to_pandas()
print(df.head())