The data publishing methods fall into two categories: streamlined approaches and the manual approach.
Streamlined approaches exist for common AI patterns and frameworks. To use one, you wrap or decorate your code in a specific way, and Openlayer automatically captures relevant data and metadata, such as the number of tokens, cost, and latency, and publishes it to the Openlayer platform.
There is a streamlined approach for each of the frameworks below:
OpenAI
To monitor chat completions and completion calls to OpenAI LLMs, you need to:
# 1. Set the environment variables
import os

import openai

os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY_HERE"
os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE"
os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE"

# 2. Import the `trace_openai` function and wrap the OpenAI client with it
from openlayer.lib import trace_openai

openai_client = trace_openai(openai.OpenAI())

# 3. From now on, every chat completion/completion call with
# the `openai_client` is traced and published to Openlayer. E.g.,
completion = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "How are you doing today?"},
    ],
)
That’s it! Now, your calls are being published to Openlayer, along with
metadata, such as latency, number of tokens, cost estimate, and more.
LangChain
To monitor chat models and chains built with LangChain, you need to:
# 1. Set the environment variables
import os

os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY_HERE"
os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE"
os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE"

# 2. Instantiate the `OpenlayerHandler`
from openlayer.lib.integrations import langchain_callback

openlayer_handler = langchain_callback.OpenlayerHandler()

# 3. Pass the handler to your LLM/chain invocations
from langchain_openai import ChatOpenAI

chat = ChatOpenAI(max_tokens=25, callbacks=[openlayer_handler])
chat.invoke("What's the meaning of life?")
That’s it! Now, your calls are being published to Openlayer, along with
metadata, such as latency, number of tokens, cost estimate, and more.
The code snippet above uses LangChain's ChatOpenAI. However, the Openlayer Callback Handler works for all LangChain chat models and LLMs.
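For instance, here is a minimal sketch (assuming the openlayer_handler and environment variables from the snippet above) that passes the same handler to a chain built with LangChain's LCEL syntax, supplying callbacks per invocation via the config argument:

# A minimal sketch: the handler from the snippet above can also be passed to
# chain invocations. Assumes `openlayer_handler` and the environment variables
# are already set up as shown earlier.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me a short fact about {topic}.")
chain = prompt | ChatOpenAI(max_tokens=25)

# Callbacks can be supplied per invocation via the `config` argument
chain.invoke({"topic": "the ocean"}, config={"callbacks": [openlayer_handler]})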
Tracing multi-step LLM systems (e.g., RAG, LLM chains)
To trace a multi-step LLM system (such as a RAG pipeline or an LLM chain), decorate every function you want included in the trace with Openlayer's trace decorator.
For example:
import os

from openlayer.lib import trace

# Set the environment variables
os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE"
os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE"

# Decorate all the functions you want to trace
@trace()
def main(user_query: str) -> str:
    context = retrieve_context(user_query)
    answer = generate_answer(user_query, context)
    return answer

@trace()
def retrieve_context(user_query: str) -> str:
    return "Some context"

@trace()
def generate_answer(user_query: str, context: str) -> str:
    return "Some answer"

# Every time the main function is called, the data is automatically
# streamed to your Openlayer project. E.g.:
main("What is the meaning of life?")
You can use the decorator together with the other streamlined methods. For example, if your generate_answer function uses a wrapped version of the OpenAI client, the chat completion calls are added to the trace as nested steps under the generate_answer step.
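Here is a minimal sketch of what that combination might look like, reusing the wrapped OpenAI client from the earlier snippet and assuming the same environment variables are set:

# A minimal sketch combining the decorator with a wrapped OpenAI client.
# Assumes the OPENAI_API_KEY, OPENLAYER_API_KEY, and
# OPENLAYER_INFERENCE_PIPELINE_ID environment variables are set.
import openai

from openlayer.lib import trace, trace_openai

openai_client = trace_openai(openai.OpenAI())

@trace()
def generate_answer(user_query: str, context: str) -> str:
    # Because the client is wrapped with `trace_openai`, this chat completion
    # call is nested under the `generate_answer` step in the trace
    completion = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {user_query}"},
        ],
    )
    return completion.choices[0].message.content

@trace()
def main(user_query: str) -> str:
    return generate_answer(user_query, "Some context")

main("What is the meaning of life?")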
Anthropic
To monitor message creation calls to Anthropic LLMs, you need to:
# 1. Set the environment variables
import os

import anthropic

os.environ["ANTHROPIC_API_KEY"] = "YOUR_ANTHROPIC_API_KEY_HERE"
os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE"
os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE"

# 2. Import the `trace_anthropic` function and wrap the Anthropic client with it
from openlayer.lib import trace_anthropic

anthropic_client = trace_anthropic(anthropic.Anthropic())

# 3. From now on, every message creation call with
# the `anthropic_client` is traced by Openlayer. E.g.,
completion = anthropic_client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "How are you doing today?"}
    ],
)
That’s it! Now, your calls are being published to Openlayer, along with
metadata, such as latency, number of tokens, cost estimate, and more.
Mistral AI
To monitor chat completion and streaming calls to Mistral AI LLMs, you need to:
# 1. Set the environment variables
import os

os.environ["MISTRAL_API_KEY"] = "YOUR_MISTRAL_API_KEY_HERE"
os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE"
os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE"

# 2. Import the `trace_mistral` function and wrap the Mistral client
from mistralai import Mistral

from openlayer.lib import trace_mistral

mistral_client = trace_mistral(Mistral(api_key=os.environ["MISTRAL_API_KEY"]))

# 3. From now on, every chat completion or streaming call with
# the `mistral_client` is traced by Openlayer. E.g.,
completion = mistral_client.chat.complete(
    model="mistral-large-latest",
    messages=[
        {"role": "user", "content": "What is the best French cheese?"},
    ],
)
That’s it! Now, your calls are being published to Openlayer, along with
metadata, such as latency, number of tokens, cost estimate, and more.
Groq
To monitor chat completion calls to Groq LLMs, you need to:
# 1. Set the environment variables
import os

os.environ["GROQ_API_KEY"] = "YOUR_GROQ_API_KEY_HERE"
os.environ["OPENLAYER_API_KEY"] = "YOUR_OPENLAYER_API_KEY_HERE"
os.environ["OPENLAYER_INFERENCE_PIPELINE_ID"] = "YOUR_OPENLAYER_INFERENCE_PIPELINE_ID_HERE"

# 2. Import the `trace_groq` function and wrap the Groq client
import groq

from openlayer.lib import trace_groq

groq_client = trace_groq(groq.Groq())

# 3. From now on, every chat completion call with
# the `groq_client` is traced by Openlayer. E.g.,
completion = groq_client.chat.completions.create(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "Explain the importance of fast language models",
        },
    ],
    model="llama3-8b-8192",
)
That’s it! Now, your calls are being published to Openlayer, along with
metadata, such as latency, number of tokens, cost estimate, and more.
OpenAI Assistants
To monitor OpenAI Assistant runs, you need to poll the run status and pass each retrieved run to Openlayer's trace_openai_assistant_thread_run function:
# This example assumes the OPENAI_API_KEY, OPENLAYER_API_KEY, and
# OPENLAYER_INFERENCE_PIPELINE_ID environment variables are set
import time

import openai

from openlayer.lib import trace_openai_assistant_thread_run

openai_client = openai.OpenAI()

# Create the assistant
assistant = openai_client.beta.assistants.create(
    name="Data visualizer",
    description="You are great at creating and explaining beautiful data visualizations.",
    model="gpt-4",
    tools=[{"type": "code_interpreter"}],
)

# Create a thread
thread = openai_client.beta.threads.create(
    messages=[
        {
            "role": "user",
            "content": "Create a data visualization of the american GDP.",
        }
    ]
)

# Run assistant on thread
run = openai_client.beta.threads.runs.create(
    thread_id=thread.id, assistant_id=assistant.id
)

# Keep polling the run results
while run.status != "completed":
    run = openai_client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

    # Trace the run with the Openlayer `trace_openai_assistant_thread_run`.
    # Once the run is complete, the thread is sent to Openlayer
    trace_openai_assistant_thread_run(openai_client, run)

    time.sleep(5)
That’s it! Now, your calls are being published to Openlayer, along with
metadata, such as latency, number of tokens, cost estimate, and more.
To manually stream data to Openlayer, you can use the stream method, which hits the /data-stream endpoint of the Openlayer REST API.
# Let's say we want to stream the following row, which represents a model prediction:
rows = [
    {
        "user_query": "what's the meaning of life?",
        "output": "42",
        "tokens": 7,
        "cost": 0.02,
        "timestamp": 1620000000,
    }
]

# Instantiate the Openlayer client
import os

from openlayer import Openlayer

client = Openlayer(
    # This is the default and can be omitted
    api_key=os.environ.get("OPENLAYER_API_KEY"),
)

# Prepare the config for the data, which depends on your project's task type. In this
# case, we have an LLM project:
from openlayer.types.inference_pipelines import data_stream_params

config = data_stream_params.ConfigLlmData(
    input_variable_names=["user_query"],
    output_column_name="output",
    num_of_token_column_name="tokens",
    cost_column_name="cost",
    timestamp_column_name="timestamp",
    prompt=[{"role": "user", "content": "{{ user_query }}"}],
)

# Use the `stream` method
data_stream_response = client.inference_pipelines.data.stream(
    id="YOUR_INFERENCE_PIPELINE_ID",
    rows=rows,
    config=config,
)
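As a usage sketch, you could wrap the stream call in a small helper and call it after each model prediction. The publish_prediction helper below is hypothetical (not part of the Openlayer SDK) and reuses the client and config objects defined above:

# Hypothetical helper (not part of the Openlayer SDK) that streams one
# prediction at a time, reusing the `client` and `config` defined above
import time

def publish_prediction(user_query: str, output: str, tokens: int, cost: float) -> None:
    client.inference_pipelines.data.stream(
        id="YOUR_INFERENCE_PIPELINE_ID",
        rows=[
            {
                "user_query": user_query,
                "output": output,
                "tokens": tokens,
                "cost": cost,
                "timestamp": int(time.time()),  # Unix timestamp, in seconds
            }
        ],
        config=config,
    )

# E.g., after your model produces an answer:
publish_prediction("what's the meaning of life?", "42", tokens=7, cost=0.02)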