Skip to content

Commit

Permalink
feat(pinecone): metrics support (#1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
nirga authored May 14, 2024
1 parent b1370fc commit 2b6fa3b
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""OpenTelemetry Pinecone instrumentation"""

import logging
import json
from opentelemetry.instrumentation.pinecone.config import Config
from opentelemetry.instrumentation.pinecone.utils import dont_throw
import time
import pinecone
from typing import Collection
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.metrics import get_meter
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.status import Status, StatusCode

Expand All @@ -17,14 +16,23 @@
_SUPPRESS_INSTRUMENTATION_KEY,
unwrap,
)
from opentelemetry.semconv.ai import EventAttributes, Events
from opentelemetry.instrumentation.pinecone.config import Config
from opentelemetry.instrumentation.pinecone.utils import (
dont_throw,
is_metrics_enabled,
set_span_attribute,
)
from opentelemetry.instrumentation.pinecone.version import __version__

from opentelemetry.semconv.ai import SpanAttributes
from opentelemetry.instrumentation.pinecone.query_handlers import (
set_query_input_attributes,
set_query_response,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv.ai import SpanAttributes as AISpanAttributes

logger = logging.getLogger(__name__)

_instruments = ("pinecone-client >= 2.2.2, <4",)
_instruments = ("pinecone-client >= 2.2.2, <5",)


WRAPPED_METHODS = [
Expand Down Expand Up @@ -61,118 +69,69 @@
]


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


@dont_throw
def _set_query_input_attributes(span, kwargs):
# Pinecone-client 2.2.2 query kwargs
# vector: Optional[List[float]] = None,
# id: Optional[str] = None,
# queries: Optional[Union[List[QueryVector], List[Tuple]]] = None,
# top_k: Optional[int] = None,
# namespace: Optional[str] = None,
# filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
# include_values: Optional[bool] = None,
# include_metadata: Optional[bool] = None,
# sparse_vector: Optional[Union[SparseValues, Dict[str, Union[List[float], List[int]]]]] = None,
# **kwargs) -> QueryResponse:

_set_span_attribute(span, "pinecone.query.id", kwargs.get("id"))
_set_span_attribute(span, "pinecone.query.queries", kwargs.get("queries"))
_set_span_attribute(span, "pinecone.query.top_k", kwargs.get("top_k"))
_set_span_attribute(span, "pinecone.query.namespace", kwargs.get("namespace"))
if isinstance(kwargs.get("filter"), dict):
_set_span_attribute(
span, "pinecone.query.filter", json.dumps(kwargs.get("filter"))
)
else:
_set_span_attribute(span, "pinecone.query.filter", kwargs.get("filter"))
_set_span_attribute(
span, "pinecone.query.include_values", kwargs.get("include_values")
)
_set_span_attribute(
span, "pinecone.query.include_metadata", kwargs.get("include_metadata")
)

# Log query embeddings
# We assume user will pass either vector, sparse_vector or queries
# But not two or more simultaneously
# When defining conflicting sources of embeddings, the trace result is undefined

vector = kwargs.get("vector")
if vector:
span.add_event(
name="db.query.embeddings",
attributes={"db.query.embeddings.vector": vector},
)

sparse_vector = kwargs.get("sparse_vector")
if sparse_vector:
span.add_event(
name="db.query.embeddings",
attributes={"db.query.embeddings.vector": sparse_vector},
)

queries = kwargs.get("queries")
if queries:
for vector in queries:
span.add_event(
name=Events.DB_QUERY_EMBEDDINGS.value,
attributes={EventAttributes.DB_QUERY_EMBEDDINGS_VECTOR.value: vector},
)
def _set_input_attributes(span, instance, kwargs):
set_span_attribute(span, SpanAttributes.SERVER_ADDRESS, instance._config.host)


@dont_throw
def _set_query_response(span, response):
matches = response.get("matches")

for match in matches:
span.add_event(
name=Events.DB_QUERY_RESULT.value,
attributes={
EventAttributes.DB_QUERY_RESULT_ID.value: match.get("id"),
EventAttributes.DB_QUERY_RESULT_SCORE.value: match.get("score"),
EventAttributes.DB_QUERY_RESULT_METADATA.value: str(
match.get("metadata")
),
EventAttributes.DB_QUERY_RESULT_VECTOR.value: match.get("values"),
},
)

def _set_response_attributes(
span, read_units_metric, write_units_metric, shared_attributes, response
):
if response.get("usage"):
read_units = response.get("usage").get("read_units") or 0
write_units = response.get("usage").get("write_units") or 0

def _set_input_attributes(span, kwargs):
pass
read_units_metric.add(read_units, shared_attributes)
span.set_attribute("pinecone.usage.read_units", read_units)

write_units_metric.add(write_units, shared_attributes)
span.set_attribute("pinecone.usage.write_units", write_units)

def _set_response_attributes(span, response):
if response.get("usage"):
span.set_attribute(
"pinecone.usage.read_units", response.get("usage").get("read_units") or 0
)
span.set_attribute(
"pinecone.usage.write_units", response.get("usage").get("write_units") or 0
)


def _with_tracer_wrapper(func):
def _with_wrapper(func):
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, to_wrap):
def _with_tracer(
tracer,
query_duration_metric,
read_units_metric,
write_units_metric,
scores_metric,
to_wrap,
):
def wrapper(wrapped, instance, args, kwargs):
return func(tracer, to_wrap, wrapped, instance, args, kwargs)
return func(
tracer,
query_duration_metric,
read_units_metric,
write_units_metric,
scores_metric,
to_wrap,
wrapped,
instance,
args,
kwargs,
)

return wrapper

return _with_tracer


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
@_with_wrapper
def _wrap(
tracer,
query_duration_metric,
read_units_metric,
write_units_metric,
scores_metric,
to_wrap,
wrapped,
instance,
args,
kwargs,
):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)
Expand All @@ -182,23 +141,38 @@ def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.VECTOR_DB_VENDOR: "Pinecone",
AISpanAttributes.VECTOR_DB_VENDOR: "Pinecone",
},
) as span:
if span.is_recording():
_set_input_attributes(span, instance, kwargs)
if to_wrap.get("method") == "query":
_set_query_input_attributes(span, kwargs)
else:
_set_input_attributes(span, kwargs)
set_query_input_attributes(span, kwargs)

shared_attributes = {
"server.address": instance._config.host,
}

start_time = time.time()
response = wrapped(*args, **kwargs)
end_time = time.time()

duration = end_time - start_time
if duration > 0 and query_duration_metric and to_wrap.get("method") == "query":
query_duration_metric.record(duration, shared_attributes)

if response:
if span.is_recording():
if to_wrap.get("method") == "query":
_set_query_response(span, response)

_set_response_attributes(span, response)
set_query_response(span, scores_metric, shared_attributes, response)

_set_response_attributes(
span,
read_units_metric,
write_units_metric,
shared_attributes,
response,
)

span.set_status(Status(StatusCode.OK))

Expand All @@ -216,6 +190,31 @@ def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
if is_metrics_enabled():
meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)

query_duration_metric = meter.create_histogram(
"db.pinecone.query.duration",
"s",
"Duration of query operations to Pinecone",
)
read_units_metric = meter.create_counter(
"db.pinecone.usage.read_units",
"unit",
"Number of read units consumed in serverless calls",
)
write_units_metric = meter.create_counter(
"db.pinecone.usage.write_units",
"unit",
"Number of write units consumed in serverless calls",
)
scores_metric = meter.create_histogram(
"db.pinecone.query.scores",
"score",
"Scores returned from Pinecone calls",
)

tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
for wrapped_method in WRAPPED_METHODS:
Expand All @@ -225,7 +224,14 @@ def _instrument(self, **kwargs):
wrap_function_wrapper(
"pinecone",
f"{wrap_object}.{wrap_method}",
_wrap(tracer, wrapped_method),
_wrap(
tracer,
query_duration_metric,
read_units_metric,
write_units_metric,
scores_metric,
wrapped_method,
),
)

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import json

from opentelemetry.semconv.ai import EventAttributes, Events
from opentelemetry.instrumentation.pinecone.utils import dont_throw, set_span_attribute


@dont_throw
def set_query_input_attributes(span, kwargs):
# Pinecone-client 2.2.2 query kwargs
# vector: Optional[List[float]] = None,
# id: Optional[str] = None,
# queries: Optional[Union[List[QueryVector], List[Tuple]]] = None,
# top_k: Optional[int] = None,
# namespace: Optional[str] = None,
# filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
# include_values: Optional[bool] = None,
# include_metadata: Optional[bool] = None,
# sparse_vector: Optional[Union[SparseValues, Dict[str, Union[List[float], List[int]]]]] = None,
# **kwargs) -> QueryResponse:

set_span_attribute(span, "pinecone.query.id", kwargs.get("id"))
set_span_attribute(span, "pinecone.query.queries", kwargs.get("queries"))
set_span_attribute(span, "pinecone.query.top_k", kwargs.get("top_k"))
set_span_attribute(span, "pinecone.query.namespace", kwargs.get("namespace"))
if isinstance(kwargs.get("filter"), dict):
set_span_attribute(
span, "pinecone.query.filter", json.dumps(kwargs.get("filter"))
)
else:
set_span_attribute(span, "pinecone.query.filter", kwargs.get("filter"))
set_span_attribute(
span, "pinecone.query.include_values", kwargs.get("include_values")
)
set_span_attribute(
span, "pinecone.query.include_metadata", kwargs.get("include_metadata")
)

# Log query embeddings
# We assume user will pass either vector, sparse_vector or queries
# But not two or more simultaneously
# When defining conflicting sources of embeddings, the trace result is undefined

vector = kwargs.get("vector")
if vector:
span.add_event(
name="db.query.embeddings",
attributes={"db.query.embeddings.vector": vector},
)

sparse_vector = kwargs.get("sparse_vector")
if sparse_vector:
span.add_event(
name="db.query.embeddings",
attributes={"db.query.embeddings.vector": sparse_vector},
)

queries = kwargs.get("queries")
if queries:
for vector in queries:
span.add_event(
name=Events.DB_QUERY_EMBEDDINGS.value,
attributes={EventAttributes.DB_QUERY_EMBEDDINGS_VECTOR.value: vector},
)


@dont_throw
def set_query_response(span, scores_metric, shared_attributes, response):
matches = response.get("matches")

for match in matches:
if scores_metric and match.get("score"):
scores_metric.record(match.get("score"), shared_attributes)

span.add_event(
name=Events.DB_QUERY_RESULT.value,
attributes={
EventAttributes.DB_QUERY_RESULT_ID.value: match.get("id"),
EventAttributes.DB_QUERY_RESULT_SCORE.value: match.get("score"),
EventAttributes.DB_QUERY_RESULT_METADATA.value: str(
match.get("metadata")
),
EventAttributes.DB_QUERY_RESULT_VECTOR.value: match.get("values"),
},
)
Loading

0 comments on commit 2b6fa3b

Please sign in to comment.