Integrations

An LLM-optimized bundle of this entire section is available at section.md. This single file contains all pages in this section, optimized for AI coding agent context.

Flyte 2 is designed to be extensible by default. While the core platform covers the most common orchestration needs, many production workloads require specialized infrastructure, external services or execution semantics that go beyond the core runtime.

Flyte 2 exposes these capabilities through integrations.

Under the hood, integrations are implemented using Flyte 2’s plugin system, which provides a consistent way to extend the platform without modifying core execution logic.

An integration allows you to declaratively enable new capabilities such as distributed compute frameworks or third-party services without manually managing infrastructure. You specify what you need, and Flyte takes care of how it is provisioned, used and cleaned up.

This page covers:

  • The types of integrations Flyte 2 supports today
  • How integrations fit into Flyte 2’s execution model
  • How to use integrations in your tasks
  • The integrations available out of the box

If you need functionality that doesn’t exist yet, Flyte 2’s plugin system is intentionally open-ended. You can build and register your own integrations using the same architecture described here.

Integration categories

Flyte 2 integrations fall into the following categories:

  1. Distributed compute: Provision transient compute clusters to run tasks across multiple nodes, with automatic lifecycle management.
  2. Agentic AI: Use Flyte tasks as tools in agent loops, with drop-in replacements for LLM provider SDKs.
  3. Configuration: Compose and pass hierarchical configuration objects between tasks, with type-safe schemas and CLI/YAML composition.
  4. Experiment tracking: Integrate with experiment tracking platforms for logging metrics, parameters, and artifacts.
  5. Data validation: Enforce schema contracts on dataframes flowing between tasks, with automatic validation reports.
  6. Connectors: Stateless, long-running services that receive execution requests via gRPC and then submit work to external (or internal) systems.
  7. LLM serving: Deploy and serve large language models with an OpenAI-compatible API.
  8. Notebook execution: Run parameterized Jupyter notebooks as typed Flyte tasks with cell-level reports.

Distributed compute

Distributed compute integrations allow tasks to run on dynamically provisioned clusters. These clusters are created just-in-time, scoped to the task execution and torn down automatically when the task completes.

This enables large-scale parallelism without requiring users to operate or maintain long-running infrastructure.

Supported distributed compute integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| Ray | Provisions Ray clusters via KubeRay | Distributed Python, ML training, hyperparameter tuning |
| Spark | Provisions Spark clusters via Spark Operator | Large-scale data processing, ETL pipelines |
| Dask | Provisions Dask clusters via Dask Operator | Parallel Python workloads, dataframe operations |
| PyTorch | Distributed PyTorch training with elastic launch | Single-node and multi-node training |

Each plugin encapsulates:

  • Cluster provisioning
  • Resource configuration
  • Networking and service discovery
  • Lifecycle management and teardown

From the task author’s perspective, these details are abstracted away.

How the plugin system works

At a high level, Flyte 2’s distributed compute plugin architecture follows a simple and consistent pattern.

1. Registration

Each plugin registers itself with Flyte 2’s core plugin registry:

  • TaskPluginRegistry: The central registry for all distributed compute plugins
  • Each plugin declares:
    • Its configuration schema
    • How that configuration maps to execution behavior

This registration step makes the plugin discoverable by the runtime.

2. Task environments and plugin configuration

Integrations are activated through a TaskEnvironment.

A TaskEnvironment bundles:

  • A container image
  • Execution settings
  • A plugin configuration object supplied via plugin_config

The plugin configuration describes what infrastructure or integration the task requires.

3. Automatic provisioning and execution

When a task associated with a TaskEnvironment runs:

  1. Flyte inspects the environment’s plugin configuration
  2. The plugin provisions the required infrastructure or integration
  3. The task executes with access to that capability
  4. Flyte cleans up all transient resources after completion

Example: Using the Dask plugin

Below is a complete example showing how a task gains access to a Dask cluster simply by running inside an environment configured with the Dask plugin.

from flyteplugins.dask import Dask, WorkerGroup
import flyte

# Define the Dask cluster configuration
dask_config = Dask(
    workers=WorkerGroup(number_of_workers=4)
)

# Create a task environment that enables Dask
env = flyte.TaskEnvironment(
    name="dask_env",
    plugin_config=dask_config,
    image=image,  # a container image with dask installed, defined elsewhere
)

# A simple element-wise transformation to distribute across the cluster
def transform(x: int) -> int:
    return x * 2

# Any task in this environment has access to the Dask cluster
@env.task
async def process_data(data: list) -> list:
    from distributed import Client

    client = Client()  # Automatically connects to the provisioned cluster
    futures = client.map(transform, data)
    return client.gather(futures)

When process_data executes, Flyte performs the following steps:

  1. Provisions a Dask cluster with 4 workers
  2. Executes the task with network access to the cluster
  3. Tears down the cluster once the task completes

No cluster management logic appears in the task code. The task only expresses intent.

Key design principle

All distributed compute integrations follow the same mental model:

  • You declare the required capability via configuration
  • You attach that configuration to a task environment
  • Tasks decorated with that environment automatically gain access to the capability

This makes it easy to swap execution backends or introduce distributed compute incrementally without rewriting workflows.

Agentic AI

Agentic AI integrations provide drop-in replacements for LLM provider SDKs. They let you use Flyte tasks as agent tools so that tool calls run with full Flyte observability, retries, and caching.

Supported agentic AI integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| OpenAI | Drop-in replacement for OpenAI Agents SDK function_tool | Agentic workflows with OpenAI models |
| Anthropic | Agent loop and function_tool for the Anthropic Claude SDK | Agentic workflows with Claude |
| Gemini | Agent loop and function_tool for the Google Gemini SDK | Agentic workflows with Gemini |
| Code generation | LLM-driven code generation with automatic testing in sandboxes | Data processing, ETL, analysis pipelines |
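
To illustrate the pattern with the OpenAI plugin: instead of registering a plain Python function as a tool, you register a Flyte task, so every tool call the agent makes runs as a tracked task execution. The sketch below is illustrative only; the flyteplugins.openai import path, the decorator stacking order, and the TaskEnvironment settings are assumptions, not confirmed plugin API.

import flyte
from agents import Agent, Runner  # OpenAI Agents SDK
from flyteplugins.openai import function_tool  # assumed import path

env = flyte.TaskEnvironment(name="agent_tools")

# A Flyte task exposed to the agent as a tool; each call gains Flyte
# observability, retries, and caching (decorator order is an assumption)
@function_tool
@env.task
async def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny."

@env.task
async def run_agent(question: str) -> str:
    agent = Agent(
        name="assistant",
        instructions="Answer the question, using tools when helpful.",
        tools=[get_weather],
    )
    result = await Runner.run(agent, question)
    return result.final_output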

Experiment tracking

Experiment tracking integrations let you log metrics, parameters, and artifacts to external tracking platforms during Flyte task execution.

Supported experiment tracking integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| MLflow | MLflow experiment tracking | Experiment tracking, autologging, model registry |
| Weights & Biases | Weights & Biases integration | Experiment tracking and hyperparameter tuning |
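
As a point of reference, the sketch below logs metrics from inside a Flyte task using the plain mlflow client; the MLflow plugin layers autologging and run management on top of this, and its plugin-specific API is not shown here. The tracking URI and experiment name are placeholders.

import flyte
import mlflow

env = flyte.TaskEnvironment(name="training")

@env.task
async def train(lr: float, epochs: int) -> float:
    mlflow.set_tracking_uri("http://mlflow.example.com")  # placeholder URI
    mlflow.set_experiment("flyte-demo")

    with mlflow.start_run():
        mlflow.log_params({"lr": lr, "epochs": epochs})
        loss = 1.0
        for epoch in range(epochs):
            loss *= 0.9  # stand-in for a real training step
            mlflow.log_metric("loss", loss, step=epoch)
    return loss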

Configuration

Configuration integrations let you compose and pass hierarchical configuration objects between Flyte tasks, with type-safe schemas and CLI/YAML composition.

Supported configuration integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| OmegaConf | DictConfig / ListConfig as native task input and output types | Passing composed configs between tasks, structured configs, YAML-driven pipelines |
| Hydra | Hydra config composition and sweep submission for Flyte tasks | YAML-driven experiment composition, grid and Bayesian sweeps, hardware presets |
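
For example, with the OmegaConf plugin a DictConfig can flow between tasks like any other typed value. A minimal sketch, assuming the plugin's type support is activated by importing flyteplugins.omegaconf (that import path is an assumption):

import flyte
import flyteplugins.omegaconf  # noqa: F401  (assumed: import registers DictConfig support)
from omegaconf import DictConfig, OmegaConf

env = flyte.TaskEnvironment(name="config_demo")

@env.task
async def make_config(lr: float) -> DictConfig:
    # Compose a hierarchical config and return it as a native task output
    return OmegaConf.create({"optimizer": {"name": "adam", "lr": lr}})

@env.task
async def train(cfg: DictConfig) -> str:
    return f"training with {cfg.optimizer.name} at lr={cfg.optimizer.lr}"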

Data validation

Data validation integrations enforce schema contracts on the dataframes flowing between tasks. They validate data at task boundaries, catch type and constraint violations early, and produce HTML reports visible in the Flyte UI.

Supported data validation integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| Pandera | Validates dataframes with pandera DataFrameModel schemas | Schema enforcement, data quality checks, validation reports |
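
For instance, with the Pandera plugin you can annotate dataframe inputs and outputs with a pandera DataFrameModel so that data is checked at the task boundary. A minimal sketch; the exact annotation style the plugin expects may differ:

import flyte
import pandas as pd
import pandera as pa
from pandera.typing import DataFrame, Series

env = flyte.TaskEnvironment(name="validated_data")

# Schema contract for dataframes passed between tasks
class Sales(pa.DataFrameModel):
    region: Series[str]
    revenue: Series[float] = pa.Field(ge=0)

@env.task
async def load_sales() -> DataFrame[Sales]:
    return pd.DataFrame({"region": ["us", "eu"], "revenue": [100.0, 80.0]})

@env.task
async def total_revenue(df: DataFrame[Sales]) -> float:
    # df has already been validated against the Sales schema at this point
    return float(df["revenue"].sum())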

Connectors

Connectors are stateless, long‑running services that receive execution requests via gRPC and then submit work to external (or internal) systems. Each connector runs as its own Kubernetes deployment, and is triggered when a Flyte task of the matching type is executed.

Although connectors normally run inside the data plane, they are just Python services, so you can also spawn them in-process and run them locally, provided the required secrets and credentials are available on your machine. This makes it possible to test a connector without modifying backend configuration, which reduces friction during development.

Because connectors execute outside the core system, they scale horizontally and reduce load on the Flyte backend. This decoupling makes connectors efficient, resilient, and easy to iterate on.

Supported connectors

| Connector | Description | Common use cases |
| --- | --- | --- |
| Snowflake | Run SQL queries on Snowflake asynchronously | Data warehousing, ETL, analytics queries |
| BigQuery | Run SQL queries on Google BigQuery | Data warehousing, ETL, analytics queries |
| Databricks | Run PySpark jobs on Databricks clusters | Large-scale data processing, Spark ETL |

Creating a new connector

If none of the existing connectors meet your needs, you can build your own.

Connectors communicate via Protobuf, so in theory they can be implemented in any language. Today, only Python connectors are supported.

Async connector interface

To implement a new async connector, extend AsyncConnector and implement the following methods, all of which must be idempotent:

| Method | Purpose |
| --- | --- |
| create | Launch the external job (via REST, gRPC, SDK, or other API) |
| get | Fetch the current job state (return job status or output) |
| delete | Delete or cancel the external job |
| get_logs | Stream paginated log lines to the Flyte UI |

To test the connector locally, the connector task should inherit from AsyncConnectorExecutorMixin. This mixin simulates how the Flyte 2 system executes asynchronous connector tasks, making it easier to validate your connector implementation before deploying it.

Example: Batch job connector

The following example implements a connector that simulates submitting and polling an external batch job. Replace the mock logic with real API calls for your use case.

Connector (my_connector/connector.py):

import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional

from flyteidl2.connector.connector_pb2 import (
    GetTaskLogsResponse,
    GetTaskLogsResponseBody,
    GetTaskLogsResponseHeader,
)
from flyteidl2.core.execution_pb2 import TaskExecution
from flyteidl2.logs.dataplane.payload_pb2 import LogLine, LogLineOriginator
from google.protobuf.timestamp_pb2 import Timestamp

from flyte import logger
from flyte.connectors import AsyncConnector, ConnectorRegistry, Resource, ResourceMeta


@dataclass
class BatchJobMetadata(ResourceMeta):
    job_id: str
    created_at: float


class BatchJobConnector(AsyncConnector):
    name = "Batch Job Connector"
    task_type_name = "batch_job"
    metadata_type = BatchJobMetadata

    async def create(self, task_template, inputs: Optional[Dict[str, Any]] = None, **kwargs) -> BatchJobMetadata:
        # In a real connector, submit the job to the external system here and
        # return whatever metadata is needed to poll it later
        job_id = str(uuid.uuid4())[:8]
        logger.info(f"Submitted batch job {job_id}")
        return BatchJobMetadata(job_id=job_id, created_at=time.time())

    async def get(self, resource_meta: BatchJobMetadata, **kwargs) -> Resource:
        # Simulate a job that runs for about 5 seconds before succeeding
        elapsed = time.time() - resource_meta.created_at
        if elapsed < 5:
            return Resource(phase=TaskExecution.RUNNING, message="Job in progress")
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            message="Job completed",
            outputs={"result": f"output-from-{resource_meta.job_id}"},
        )

    async def delete(self, resource_meta: BatchJobMetadata, **kwargs):
        logger.info(f"Cancelled job {resource_meta.job_id}")

    async def get_logs(self, resource_meta: BatchJobMetadata, token: str = "", **kwargs):
        def line(message: str, ts: float) -> LogLine:
            t = Timestamp()
            t.FromSeconds(int(ts))
            return LogLine(timestamp=t, message=message, originator=LogLineOriginator.USER)

        # Two pages of mock logs; an empty token requests the first page
        start = resource_meta.created_at
        job_id = resource_meta.job_id
        pages = {
            "": GetTaskLogsResponseBody(lines=[
                line(f"[INFO] Job {job_id} submitted", start),
                line(f"[INFO] Job {job_id} started", start + 1),
            ]),
            "page-2": GetTaskLogsResponseBody(lines=[
                line(f"[INFO] Job {job_id} finished", start + 5),
            ]),
        }
        next_tokens = {"": "page-2", "page-2": ""}
        yield GetTaskLogsResponse(body=pages.get(token, GetTaskLogsResponseBody(lines=[])))
        next_token = next_tokens.get(token, "")
        if next_token:
            yield GetTaskLogsResponse(header=GetTaskLogsResponseHeader(token=next_token))


ConnectorRegistry.register(BatchJobConnector())

Task plugin (my_connector/task.py):

from dataclasses import dataclass
from typing import Any, Dict, Optional, Type

from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.extend import TaskTemplate
from flyte.models import NativeInterface, SerializationContext


@dataclass
class BatchJobConfig:
    timeout_seconds: int = 300


class BatchJobTask(AsyncConnectorExecutorMixin, TaskTemplate):
    # Must match the connector's task_type_name so executions are routed to it
    _TASK_TYPE = "batch_job"

    def __init__(self, name: str, plugin_config: BatchJobConfig,
                 inputs: Optional[Dict[str, Type]] = None,
                 outputs: Optional[Dict[str, Type]] = None, **kwargs):
        super().__init__(
            name=name,
            interface=NativeInterface(
                {k: (v, None) for k, v in inputs.items()} if inputs else {},
                outputs or {},
            ),
            task_type=self._TASK_TYPE,
            image=None,
            **kwargs,
        )
        self.plugin_config = plugin_config

    def custom_config(self, sctx: SerializationContext) -> Optional[Dict[str, Any]]:
        # Plugin settings serialized with the task and made available to the connector
        return {"timeout_seconds": self.plugin_config.timeout_seconds}

Usage:

import flyte
from my_connector.task import BatchJobConfig, BatchJobTask

batch_job = BatchJobTask(
    name="my_batch_job",
    plugin_config=BatchJobConfig(timeout_seconds=60),
    inputs={"name": str},
    outputs={"result": str},
)

flyte.TaskEnvironment.from_task("batch-job-env", batch_job)

Connector-level secrets

If your connector needs credentials (API keys, tokens) shared across all tasks, pass them as environment variables into the connector process.

Add secrets to ConnectorEnvironment:

connector = flyte.app.ConnectorEnvironment(
    name="batch-job-connector",
    image=image,  # container image for the connector service, defined elsewhere
    include=["my_connector"],
    secrets=[flyte.Secret(key="MY_API_KEY")],
)

Inside the connector, read the secret from the environment:

import os

api_key = os.environ["MY_API_KEY"]

See Secrets for how to store and manage secrets.

Deploy a custom connector

Deploy your connector as a long-running service using flyte.app.ConnectorEnvironment. Union handles building the image, pushing it, and keeping the service running — no manual Kubernetes configuration required.

See the Connector app guide (user-guide/build-apps/connector-app) for a complete walkthrough.

LLM serving

LLM serving integrations let you deploy and serve large language models as Flyte apps with an OpenAI-compatible API. They handle model loading, GPU management, and autoscaling.

Supported LLM serving integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| SGLang | Deploy models with SGLang’s high-throughput runtime | LLM inference, model serving |
| vLLM | Deploy models with vLLM’s PagedAttention engine | LLM inference, model serving |

For full setup instructions including multi-GPU deployment, model prefetching, and autoscaling, see the SGLang app and vLLM app pages.

Notebook execution

Notebook execution integrations let you run Jupyter notebooks as first-class Flyte tasks with typed inputs and outputs, HTML reports surfaced in the Flyte UI, and the ability to call other Flyte tasks from within the notebook.

Supported notebook execution integrations

| Plugin | Description | Common use cases |
| --- | --- | --- |
| Papermill | Parameterize and execute .ipynb files via papermill | Productionizing exploratory notebooks, cell-by-cell HTML reports, notebook-driven analysis pipelines |
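
For context, the underlying papermill call that runs a parameterized notebook looks like the sketch below; the plugin wraps this execution in a typed Flyte task and surfaces the executed cells as an HTML report (the plugin's own task API is not shown here). The notebook paths and parameters are placeholders.

import papermill as pm

# Execute the notebook with injected parameters and write the executed copy
pm.execute_notebook(
    "analysis.ipynb",          # source notebook (placeholder path)
    "analysis_output.ipynb",   # executed notebook with cell outputs
    parameters={"n_samples": 1000, "threshold": 0.5},
)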