Open WebUI integration with Openlayer

Open WebUI is a self-hosted WebUI that operates offline and supports various LLM runners, including Ollama and OpenAI-compatible APIs. Openlayer integrates with Open WebUI through its Pipelines framework, enabling you to:
  • Capture application traces with full execution hierarchies
  • Track usage patterns and user interactions
  • Monitor LLM performance, latency, and token usage
  • Run automated quality evaluations on your data

Prerequisites

Before you begin, ensure you have:
  1. Open WebUI running - Follow the Open WebUI documentation to set up your instance
  2. Docker - Required to run the Pipelines container
  3. Openlayer account - Sign up at app.openlayer.com
  4. Openlayer API key - Find your API key in the Openlayer dashboard
  5. Data source - Create a data source in your Openlayer project to receive traces

Setup guide

This guide walks you through setting up the Openlayer filter pipeline, which intercepts requests and responses to capture trace data. This is the recommended approach for most users.

Step 1: Start the Pipelines service

Run the Pipelines container using Docker:
docker run -p 9099:9099 --add-host=host.docker.internal:host-gateway \
  -v pipelines:/app/pipelines --name pipelines --restart always \
  ghcr.io/open-webui/pipelines:main
This command exposes the Pipelines service on port 9099 and ensures it restarts automatically.
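
To confirm the service is up before wiring it into Open WebUI, you can query its models endpoint with the default Pipelines API key (the exact path is an assumption based on the Pipelines API being OpenAI-compatible; adjust if your deployment differs):
curl -s http://localhost:9099/models -H "Authorization: Bearer 0p3n-w3bu!"
A JSON response listing installed pipelines (empty until Step 3) indicates the service is reachable.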

Step 2: Connect Open WebUI to Pipelines

In the Open WebUI interface, navigate to Admin Panel → Settings → Connections:
  1. Click the + button to add a new connection
  2. Select OpenAI API as the connection type
  3. Configure the connection:
    • API URL: http://localhost:9099/
    • API Key: 0p3n-w3bu! (the default Pipelines API key)
  4. Save the connection - you should see a Pipelines icon appear when hovering over the API Base URL field
If Open WebUI runs in a Docker container, use http://host.docker.internal:9099/ as the API URL.
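
If the connection fails, you can verify that the Open WebUI container can reach the Pipelines service (this assumes your container is named open-webui and has curl available; adjust both to your setup):
docker exec open-webui curl -s http://host.docker.internal:9099/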

Step 3: Install the Openlayer filter pipeline

In the Open WebUI interface, navigate to Admin Panel → Settings → Pipelines:
  1. Click Add a new pipeline
  2. Copy and paste the Openlayer filter pipeline code below into the editor
  3. Click Save to install
"""
title: Openlayer Filter Pipeline
author: Openlayer
date: 2025-01-17
version: 1.0.0
license: MIT
description: A filter pipeline that uses Openlayer for LLM observability and monitoring.
requirements: openlayer>=0.12.1
"""

import logging
import os
import time
import uuid
from typing import Any, Dict, List, Optional

from pydantic import BaseModel

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("openlayer-filter")


def get_last_assistant_message(messages: List[dict]) -> str:
    """Retrieve the last assistant message content from the message list."""
    for message in reversed(messages):
        if message["role"] == "assistant":
            content = message.get("content", "")
            if isinstance(content, str):
                return content
            elif isinstance(content, list):
                text_parts = [
                    part.get("text", "")
                    for part in content
                    if isinstance(part, dict) and part.get("type") == "text"
                ]
                return "".join(text_parts)
    return ""


def get_last_assistant_message_obj(messages: List[dict]) -> dict:
    """Retrieve the last assistant message object from the message list."""
    for message in reversed(messages):
        if message["role"] == "assistant":
            return message
    return {}


class Pipeline:
    """Openlayer Filter Pipeline for LLM observability."""

    class Valves(BaseModel):
        """Configuration parameters for the Openlayer filter pipeline."""
        pipelines: List[str] = []
        priority: int = 0
        api_key: str = ""
        base_url: str = "https://api.openlayer.com/v1"
        inference_pipeline_id: str = ""
        insert_tags: bool = True
        skip_internal_tasks: bool = False
        debug: bool = False

    def __init__(self):
        """Initialize the Openlayer filter pipeline."""
        self.type = "filter"
        self.name = "Openlayer Filter"

        self.valves = self.Valves(
            **{
                "pipelines": ["*"],
                "api_key": os.getenv("OPENLAYER_API_KEY", ""),
                "base_url": os.getenv("OPENLAYER_BASE_URL", "https://api.openlayer.com/v1"),
                "inference_pipeline_id": os.getenv("OPENLAYER_INFERENCE_PIPELINE_ID", ""),
                "skip_internal_tasks": os.getenv("SKIP_INTERNAL_TASKS", "false").lower() == "true",
                "debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
            }
        )

        self.tracer_configured = False
        self.chat_traces: Dict[str, Dict[str, Any]] = {}

        logger.info("Openlayer Filter Pipeline initialized")

    def log_debug(self, message: str, *args):
        """Log debug messages if debugging is enabled."""
        if self.valves.debug:
            logger.info(message, *args)

    async def on_startup(self):
        """Lifecycle hook called when the pipeline starts."""
        self._configure_tracer()

    async def on_shutdown(self):
        """Lifecycle hook called when the pipeline shuts down."""
        self.chat_traces.clear()

    async def on_valves_updated(self):
        """Lifecycle hook called when configuration is updated."""
        self._configure_tracer()

    def _configure_tracer(self):
        """Configure the Openlayer tracer with current settings."""
        try:
            from openlayer.lib.tracing import tracer

            if not self.valves.api_key:
                logger.error("OPENLAYER_API_KEY not configured - tracing disabled")
                self.tracer_configured = False
                return

            if not self.valves.inference_pipeline_id:
                logger.error("OPENLAYER_INFERENCE_PIPELINE_ID not configured - tracing disabled")
                self.tracer_configured = False
                return

            tracer.configure(
                api_key=self.valves.api_key,
                inference_pipeline_id=self.valves.inference_pipeline_id,
                base_url=self.valves.base_url if self.valves.base_url else None,
            )

            self.tracer_configured = True
            self.log_debug("Openlayer tracer configured successfully")

        except ImportError as e:
            logger.error("Failed to import Openlayer SDK: %s", e)
            self.tracer_configured = False
        except Exception as e:
            logger.error("Failed to configure Openlayer tracer: %s", e)
            self.tracer_configured = False

    def _build_tags(self, task_name: str) -> List[str]:
        """Build a list of tags based on valve settings."""
        tags = []
        if self.valves.insert_tags:
            tags.append("open-webui")
            if task_name and task_name not in ["user_response", "llm_response"]:
                tags.append(task_name)
        return tags

    def _extract_messages(self, messages: List[dict]) -> List[dict]:
        """Extract and clean messages for logging to Openlayer."""
        cleaned = []
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, list):
                text_parts = []
                for part in content:
                    if isinstance(part, dict):
                        if part.get("type") == "text":
                            text_parts.append(part.get("text", ""))
                        elif part.get("type") == "image_url":
                            text_parts.append("[Image]")
                content = " ".join(text_parts) if text_parts else ""
            cleaned.append({"role": msg.get("role", "user"), "content": content})
        return cleaned

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Handle incoming requests (user messages)."""
        request_id = str(uuid.uuid4())[:8]

        if not self.tracer_configured:
            return body

        metadata = body.get("metadata", {})
        task = metadata.get("task", "")

        internal_tasks = ["title_generation", "tags_generation", "query_generation", "follow_up_generation"]
        is_internal_task = task in internal_tasks

        if is_internal_task and self.valves.skip_internal_tasks:
            return body

        message_id = metadata.get("message_id", str(uuid.uuid4()))
        chat_id = metadata.get("chat_id", str(uuid.uuid4()))

        if chat_id == "local":
            session_id = metadata.get("session_id", str(uuid.uuid4()))
            chat_id = f"temporary-{session_id}"
            metadata["chat_id"] = chat_id
            body["metadata"] = metadata

        trace_key = f"{message_id}::{task}" if is_internal_task else message_id

        user_id = "anonymous"
        user_email = None
        user_name = None
        if user:
            user_email = user.get("email")
            user_name = user.get("name")
            user_id = user_email or user.get("id") or user_name or "anonymous"
            if not isinstance(user_id, str):
                user_id = str(user_id)

        model_id = body.get("model", "unknown")
        model_info = metadata.get("model", {})
        model_name = model_info.get("name", model_id) if isinstance(model_info, dict) else model_id

        provider = None
        if isinstance(model_info, dict):
            provider = model_info.get("owned_by")
            if not provider:
                if model_id.startswith("gpt-") or model_id.startswith("o1"):
                    provider = "openai"
                elif model_id.startswith("claude-"):
                    provider = "anthropic"
                elif model_id.startswith("gemini-"):
                    provider = "google"

        messages = body.get("messages", [])
        cleaned_messages = self._extract_messages(messages)

        files_metadata = []
        for file_entry in body.get("files", []):
            if isinstance(file_entry, dict):
                file_info = file_entry.get("file", file_entry)
                if isinstance(file_info, dict):
                    files_metadata.append({
                        "filename": file_info.get("filename") or file_info.get("meta", {}).get("name"),
                        "content_type": file_info.get("meta", {}).get("content_type"),
                        "size": file_info.get("meta", {}).get("size"),
                    })

        self.chat_traces[trace_key] = {
            "request_id": request_id,
            "message_id": message_id,
            "start_time": time.time(),
            "user_id": user_id,
            "user_email": user_email,
            "user_name": user_name,
            "session_id": chat_id,
            "model_id": model_id,
            "model_name": model_name,
            "provider": provider,
            "messages": cleaned_messages,
            "message_count": len(messages),
            "tags": self._build_tags(task or "user_response"),
            "task": task or "user_response",
            "is_internal_task": is_internal_task,
            "files": files_metadata,
        }

        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Handle outgoing responses (assistant messages)."""
        if not self.tracer_configured:
            return body

        message_id = body.get("id")
        if not message_id:
            return body

        trace_data = None
        trace_key = None

        if message_id in self.chat_traces:
            trace_key = message_id
            trace_data = self.chat_traces.pop(message_id)
        else:
            for key in list(self.chat_traces.keys()):
                if key.startswith(f"{message_id}::"):
                    trace_key = key
                    trace_data = self.chat_traces.pop(key)
                    break

        if not trace_data:
            return body

        messages = body.get("messages", [])
        assistant_message = get_last_assistant_message(messages)
        assistant_message_obj = get_last_assistant_message_obj(messages)

        prompt_tokens = None
        completion_tokens = None
        total_tokens = None

        if assistant_message_obj:
            usage_info = assistant_message_obj.get("usage", {})
            if isinstance(usage_info, dict):
                prompt_tokens = (
                    usage_info.get("prompt_tokens")
                    or usage_info.get("prompt_eval_count")
                    or usage_info.get("input_tokens")
                )
                completion_tokens = (
                    usage_info.get("completion_tokens")
                    or usage_info.get("eval_count")
                    or usage_info.get("output_tokens")
                )
                if prompt_tokens is not None and completion_tokens is not None:
                    total_tokens = int(prompt_tokens) + int(completion_tokens)

        latency_ms = (time.time() - trace_data["start_time"]) * 1000

        try:
            self._create_trace_with_steps(
                trace_data=trace_data,
                output=assistant_message,
                latency_ms=latency_ms,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=total_tokens,
            )
        except Exception as e:
            logger.error("Failed to create trace: %s", e)

        return body

    def _create_trace_with_steps(
        self,
        trace_data: Dict[str, Any],
        output: str,
        latency_ms: float,
        prompt_tokens: Optional[int] = None,
        completion_tokens: Optional[int] = None,
        total_tokens: Optional[int] = None,
    ):
        """Create a trace with nested steps using openlayer.lib.tracing.tracer."""
        from openlayer.lib.tracing import tracer
        from openlayer.lib.tracing.enums import StepType
        from openlayer.lib import update_trace_user_session

        step_inputs = {"messages": trace_data["messages"], "model": trace_data["model_name"]}
        if trace_data.get("files"):
            step_inputs["files"] = trace_data["files"]

        step_metadata = {
            "request_id": trace_data.get("request_id"),
            "task": trace_data.get("task"),
            "tags": trace_data["tags"],
            "interface": "open-webui",
        }

        with tracer.create_step(
            name="open-webui-request",
            step_type=StepType.USER_CALL,
            inputs=step_inputs,
            metadata=step_metadata,
        ) as parent_step:
            parent_step.start_time = trace_data["start_time"]
            parent_step.end_time = time.time()
            parent_step.latency = latency_ms

            try:
                update_trace_user_session(
                    user_id=trace_data["user_id"],
                    session_id=trace_data["session_id"],
                )
            except Exception as e:
                logger.error("Failed to set user/session context: %s", e)

            provider = trace_data.get("provider") or "unknown"

            with tracer.create_step(
                name="LLM Chat Completion",
                step_type=StepType.CHAT_COMPLETION,
                inputs={"messages": trace_data["messages"], "model": trace_data["model_id"]},
                metadata={"model_id": trace_data["model_id"], "model_name": trace_data["model_name"]},
            ) as llm_step:
                llm_step.provider = provider
                llm_step.model = trace_data["model_id"]
                llm_step.start_time = trace_data["start_time"]
                llm_step.end_time = time.time()
                llm_step.latency = latency_ms

                if prompt_tokens is not None:
                    llm_step.prompt_tokens = prompt_tokens
                if completion_tokens is not None:
                    llm_step.completion_tokens = completion_tokens
                if total_tokens is not None:
                    llm_step.tokens = total_tokens

                llm_step.log(
                    output=output,
                    tokens=total_tokens,
                    metadata={
                        "prompt_tokens": prompt_tokens,
                        "completion_tokens": completion_tokens,
                    },
                )

            parent_step.log(
                output=output,
                metadata={
                    "user_id": trace_data["user_id"],
                    "session_id": trace_data["session_id"],
                    "message_count": trace_data.get("message_count", 0),
                },
            )

Security Notice: Pipelines execute arbitrary code. Only install pipelines from sources you trust.

Step 4: Configure the pipeline

After installing the pipeline, click on it in the Open WebUI Pipelines list to configure the Valves (settings):
Setting                         | Required | Description
OPENLAYER_API_KEY               | Yes      | Your Openlayer API key
OPENLAYER_INFERENCE_PIPELINE_ID | Yes      | The data source ID from your Openlayer project
OPENLAYER_BASE_URL              | No       | API base URL (default: https://api.openlayer.com/v1). Set this for self-hosted Openlayer instances
To find your data source ID in Openlayer, navigate to your project and copy the ID from the data source settings.
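
Instead of entering these values in the UI, you can also pass them as environment variables when starting the Pipelines container, since the filter pipeline reads OPENLAYER_API_KEY, OPENLAYER_INFERENCE_PIPELINE_ID, and OPENLAYER_BASE_URL from the environment at startup (replace the placeholder values with your own):
docker run -p 9099:9099 --add-host=host.docker.internal:host-gateway \
  -e OPENLAYER_API_KEY="your-api-key" \
  -e OPENLAYER_INFERENCE_PIPELINE_ID="your-data-source-id" \
  -v pipelines:/app/pipelines --name pipelines --restart always \
  ghcr.io/open-webui/pipelines:main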

Step 5: Enable token usage tracking (optional)

To capture token usage metrics, navigate to model settings in Open WebUI and enable the “Usage” capability. This ensures token counts are included in your traces.

View traces in Openlayer

Once configured, interact with your Open WebUI chat and traces will appear in Openlayer in real-time:
  1. Open your project in the Openlayer dashboard
  2. Navigate to your data source
  3. View traces including:
    • User prompts and LLM responses
    • Latency and performance metrics
    • Token usage and cost estimates
    • Full conversation context

Advanced: Manifold pipelines

For advanced users who need custom processing logic, nested trace hierarchies, or integration with specific LLM providers, you can use manifold pipelines instead of the filter pipeline. Manifold pipelines use Openlayer’s auto-instrumentation functions (trace_openai(), trace_litellm()) for richer tracing with automatic step hierarchies.
Use the filter pipeline (above) unless you need custom request processing or provider-specific features.
"""
title: OpenAI Pipeline with Openlayer Tracing
author: Openlayer
version: 1.0.0
license: MIT
description: OpenAI pipeline with Openlayer tracing for monitoring and observability.
requirements: openlayer>=0.12.1, openai>=1.0.0
"""

import os
from typing import Generator, Iterator, List, Union
import openai
from openlayer.lib import trace_openai, update_trace_user_session
from openlayer.lib.tracing import tracer
from pydantic import BaseModel


class Pipeline:
    """OpenAI Pipeline with Openlayer tracing."""

    class Valves(BaseModel):
        """Configuration options for the pipeline."""
        OPENAI_API_KEY: str = ""
        OPENAI_API_BASE: str = "https://api.openai.com/v1"
        OPENAI_MODEL: str = "gpt-4o-mini"
        OPENLAYER_API_KEY: str = ""
        OPENLAYER_INFERENCE_PIPELINE_ID: str = ""
        OPENLAYER_BASE_URL: str = ""

    def __init__(self):
        """Initialize the pipeline with Openlayer tracing."""
        self.type = "manifold"
        self.name = "OpenAI + Openlayer"

        self.valves = self.Valves(
            **{
                "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""),
                "OPENAI_API_BASE": os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1"),
                "OPENAI_MODEL": os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
                "OPENLAYER_API_KEY": os.getenv("OPENLAYER_API_KEY", ""),
                "OPENLAYER_INFERENCE_PIPELINE_ID": os.getenv("OPENLAYER_INFERENCE_PIPELINE_ID", ""),
                "OPENLAYER_BASE_URL": os.getenv("OPENLAYER_BASE_URL", ""),
            }
        )

        self._configure_openlayer()
        trace_openai()

    def _configure_openlayer(self) -> None:
        """Configure Openlayer tracer with API credentials."""
        if not self.valves.OPENLAYER_API_KEY or not self.valves.OPENLAYER_INFERENCE_PIPELINE_ID:
            return

        os.environ["OPENLAYER_API_KEY"] = self.valves.OPENLAYER_API_KEY
        os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = self.valves.OPENLAYER_INFERENCE_PIPELINE_ID
        if self.valves.OPENLAYER_BASE_URL:
            os.environ["OPENLAYER_BASE_URL"] = self.valves.OPENLAYER_BASE_URL

        tracer.configure(
            api_key=self.valves.OPENLAYER_API_KEY,
            inference_pipeline_id=self.valves.OPENLAYER_INFERENCE_PIPELINE_ID,
            base_url=self.valves.OPENLAYER_BASE_URL if self.valves.OPENLAYER_BASE_URL else None,
        )

    def pipelines(self) -> List[dict]:
        """Return available models for this pipeline."""
        return [{"id": self.valves.OPENAI_MODEL, "name": self.valves.OPENAI_MODEL}]

    async def on_valves_updated(self) -> None:
        """Called when valve settings are updated."""
        self._configure_openlayer()

    @tracer.trace()
    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """Process a chat completion request through OpenAI."""
        user = body.get("user", {})
        # Fall back through email -> id -> "anonymous", tolerating keys
        # that are present but set to None.
        user_id = (user.get("email") or user.get("id") or "anonymous") if user else "anonymous"
        session_id = body.get("chat_id", "anonymous")

        update_trace_user_session(user_id=user_id, session_id=session_id)

        client = openai.OpenAI(
            api_key=self.valves.OPENAI_API_KEY,
            base_url=self.valves.OPENAI_API_BASE,
        )

        stream = body.get("stream", True)
        response = client.chat.completions.create(
            model=model_id,
            messages=messages,
            stream=stream,
        )

        if stream:
            # Return a generator rather than yielding from `pipe` itself:
            # mixing `yield` and `return <value>` in one function turns the
            # whole method into a generator and silently breaks the
            # non-streaming branch below.
            def stream_content():
                for chunk in response:
                    if chunk.choices and chunk.choices[0].delta.content:
                        yield chunk.choices[0].delta.content

            return stream_content()

        return response.choices[0].message.content
"""
title: LiteLLM Pipeline with Openlayer Tracing
author: Openlayer
version: 1.0.0
license: MIT
description: LiteLLM pipeline with Openlayer tracing for multi-provider observability.
requirements: openlayer>=0.12.1, litellm>=1.0.0
"""

import os
from typing import Generator, Iterator, List, Union
import litellm
from openlayer.lib import trace_litellm, update_trace_user_session
from openlayer.lib.tracing import tracer
from pydantic import BaseModel


class Pipeline:
    """LiteLLM Pipeline with Openlayer tracing."""

    class Valves(BaseModel):
        """Configuration options for the pipeline."""
        LITELLM_BASE_URL: str = ""
        LITELLM_API_KEY: str = ""
        OPENLAYER_API_KEY: str = ""
        OPENLAYER_INFERENCE_PIPELINE_ID: str = ""
        OPENLAYER_BASE_URL: str = "https://api.openlayer.com/v1"

    def __init__(self):
        """Initialize the LiteLLM pipeline with Openlayer tracing."""
        self.type = "manifold"
        self.name = "LiteLLM + Openlayer"

        self.valves = self.Valves(
            **{
                "LITELLM_BASE_URL": os.getenv("LITELLM_BASE_URL", ""),
                "LITELLM_API_KEY": os.getenv("LITELLM_API_KEY", ""),
                "OPENLAYER_API_KEY": os.getenv("OPENLAYER_API_KEY", ""),
                "OPENLAYER_INFERENCE_PIPELINE_ID": os.getenv("OPENLAYER_INFERENCE_PIPELINE_ID", ""),
                "OPENLAYER_BASE_URL": os.getenv("OPENLAYER_BASE_URL", "https://api.openlayer.com/v1"),
            }
        )

        self._configure_openlayer()
        trace_litellm()

    def _configure_openlayer(self) -> None:
        """Configure Openlayer tracer with API credentials."""
        if not self.valves.OPENLAYER_API_KEY or not self.valves.OPENLAYER_INFERENCE_PIPELINE_ID:
            return

        os.environ["OPENLAYER_API_KEY"] = self.valves.OPENLAYER_API_KEY
        os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = self.valves.OPENLAYER_INFERENCE_PIPELINE_ID
        if self.valves.OPENLAYER_BASE_URL:
            os.environ["OPENLAYER_BASE_URL"] = self.valves.OPENLAYER_BASE_URL

        tracer.configure(
            api_key=self.valves.OPENLAYER_API_KEY,
            inference_pipeline_id=self.valves.OPENLAYER_INFERENCE_PIPELINE_ID,
            base_url=self.valves.OPENLAYER_BASE_URL if self.valves.OPENLAYER_BASE_URL else None,
        )

    async def on_valves_updated(self) -> None:
        """Called when valve settings are updated."""
        self._configure_openlayer()

    @tracer.trace()
    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """Process a chat completion request through LiteLLM."""
        user = body.get("user", {})
        # Same fallback chain as the OpenAI pipeline, tolerating None values.
        user_id = (user.get("email") or user.get("id") or "anonymous") if user else "anonymous"
        session_id = body.get("chat_id", "anonymous")

        update_trace_user_session(user_id=user_id, session_id=session_id)

        stream = body.get("stream", True)
        response = litellm.completion(
            model=model_id,
            messages=messages,
            api_base=self.valves.LITELLM_BASE_URL,
            api_key=self.valves.LITELLM_API_KEY,
            stream=stream,
        )

        if stream:
            # Same pattern as the OpenAI pipeline: return a dedicated
            # generator instead of yielding from `pipe` itself, so the
            # non-streaming branch can still return a plain string.
            def stream_content():
                for chunk in response:
                    if hasattr(chunk, "choices") and chunk.choices:
                        delta = getattr(chunk.choices[0], "delta", None)
                        if delta and hasattr(delta, "content") and delta.content:
                            yield delta.content

            return stream_content()

        return response.choices[0].message.content
To install a manifold pipeline, follow the same steps as the filter pipeline: navigate to Open WebUI → Admin Panel → Settings → Pipelines, add a new pipeline, paste the code, and configure the valves.

Troubleshooting

Traces not appearing in Openlayer

  1. Verify your API key and data source ID are correct in the pipeline valves
  2. Ensure the Pipelines container can reach api.openlayer.com
  3. Check the Pipelines logs: docker logs pipelines
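
For step 2, you can test connectivity from inside the container (assuming curl is available in the Pipelines image; if not, substitute any HTTP client):
docker exec pipelines curl -s -o /dev/null -w "%{http_code}\n" https://api.openlayer.com/v1
Any HTTP status code (even 401) means the host is reachable; a connection error points to a network or DNS problem.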

Token usage not captured

Enable the “Usage” capability in model settings within Open WebUI (Admin Panel → Settings → Models).

Pipeline not loading

  1. Ensure the Pipelines container is running: docker ps | grep pipelines
  2. Verify the connection URL in Open WebUI matches where Pipelines is running
