Skip to content

Commit

Permalink
fix(langchain): support LCEL (#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
nirga authored Feb 22, 2024
1 parent de3618f commit c18fff2
Show file tree
Hide file tree
Showing 12 changed files with 1,260 additions and 820 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

from opentelemetry.instrumentation.langchain.task_wrapper import task_wrapper
from opentelemetry.instrumentation.langchain.workflow_wrapper import workflow_wrapper
from opentelemetry.instrumentation.langchain.task_wrapper import (
task_wrapper,
atask_wrapper,
)
from opentelemetry.instrumentation.langchain.workflow_wrapper import (
workflow_wrapper,
aworkflow_wrapper,
)
from opentelemetry.instrumentation.langchain.version import __version__

from opentelemetry.semconv.ai import TraceloopSpanKindValues
Expand All @@ -30,7 +36,7 @@
"package": "langchain.chains.base",
"object": "Chain",
"method": "acall",
"wrapper": task_wrapper,
"wrapper": atask_wrapper,
},
{
"package": "langchain.chains",
Expand All @@ -44,7 +50,7 @@
"object": "SequentialChain",
"method": "acall",
"span_name": "langchain.workflow",
"wrapper": workflow_wrapper,
"wrapper": aworkflow_wrapper,
},
{
"package": "langchain.agents",
Expand Down Expand Up @@ -74,8 +80,58 @@
"object": "RetrievalQA",
"method": "acall",
"span_name": "retrieval_qa.workflow",
"wrapper": aworkflow_wrapper,
},
{
"package": "langchain.prompts.base",
"object": "BasePromptTemplate",
"method": "invoke",
"wrapper": task_wrapper,
},
{
"package": "langchain.prompts.base",
"object": "BasePromptTemplate",
"method": "ainvoke",
"wrapper": atask_wrapper,
},
{
"package": "langchain.chat_models.base",
"object": "BaseChatModel",
"method": "invoke",
"wrapper": task_wrapper,
},
{
"package": "langchain.chat_models.base",
"object": "BaseChatModel",
"method": "ainvoke",
"wrapper": atask_wrapper,
},
{
"package": "langchain.schema",
"object": "BaseOutputParser",
"method": "invoke",
"wrapper": task_wrapper,
},
{
"package": "langchain.schema",
"object": "BaseOutputParser",
"method": "ainvoke",
"wrapper": atask_wrapper,
},
{
"package": "langchain.schema.runnable",
"object": "RunnableSequence",
"method": "invoke",
"span_name": "langchain.workflow",
"wrapper": workflow_wrapper,
},
{
"package": "langchain.schema.runnable",
"object": "RunnableSequence",
"method": "ainvoke",
"span_name": "langchain.workflow",
"wrapper": aworkflow_wrapper,
},
]


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from opentelemetry import context as context_api

from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY

from opentelemetry.semconv.ai import SpanAttributes, TraceloopSpanKindValues
Expand Down Expand Up @@ -34,3 +33,32 @@ def task_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
return_value = wrapped(*args, **kwargs)

return return_value


@_with_tracer_wrapper
async def atask_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)

# Some Langchain objects are wrapped elsewhere, so we ignore them here
if instance.__class__.__name__ in ("AgentExecutor"):
return wrapped(*args, **kwargs)

if hasattr(instance, "name") and instance.name:
name = f"{to_wrap.get('span_name')}.{instance.name.lower()}"
elif to_wrap.get("span_name"):
name = to_wrap.get("span_name")
else:
name = f"langchain.task.{instance.__class__.__name__}"
kind = to_wrap.get("kind") or TraceloopSpanKindValues.TASK.value
with tracer.start_as_current_span(name) as span:
span.set_attribute(
SpanAttributes.TRACELOOP_SPAN_KIND,
kind,
)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name)

return_value = await wrapped(*args, **kwargs)

return return_value
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,26 @@ def workflow_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
return_value = wrapped(*args, **kwargs)

return return_value


@_with_tracer_wrapper
async def aworkflow_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
kind = to_wrap.get("kind") or TraceloopSpanKindValues.WORKFLOW.value

attach(set_value("workflow_name", name))

with tracer.start_as_current_span(name) as span:
span.set_attribute(
SpanAttributes.TRACELOOP_SPAN_KIND,
kind,
)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name)

return_value = await wrapped(*args, **kwargs)

return return_value
Loading

0 comments on commit c18fff2

Please sign in to comment.