Skip to content

Commit

Permalink
Fixed linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Redna committed Jun 10, 2024
1 parent c9ed795 commit 97b4c04
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ class LangfuseConnector:
In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to
enable Haystack tracing in your pipeline.
Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent
variable to `false`. By default, the data is flushed after each component and blocks the thread until the data is sent to
Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse.
Make sure you will call langfuse.flush() explicitly before the program exits. e.g. by using tracer.actual_tracer.flush():
Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH`
environent variable to `false`. By default, the data is flushed after each component and blocks the thread until
the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes
before the data is sent to Langfuse. Make sure you will call langfuse.flush() explicitly before the program exits.
E.g. by using tracer.actual_tracer.flush():
```python
from haystack.tracing import tracer
Expand All @@ -38,7 +39,7 @@ class LangfuseConnector:
from haystack.tracing import tracer
# ...
@app.on_event("shutdown")
async def shutdown_event():
tracer.actual_tracer.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@

import langfuse

from threading import Thread

HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH"


class LangfuseSpan(Span):
"""
Internal class representing a bridge between the Haystack span tracing API and Langfuse.
Expand Down Expand Up @@ -132,7 +131,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
replies = span._data.get("haystack.component.output", {}).get("replies")
if replies:
meta = replies[0].meta
usage = meta.get("usage") if meta.get("usage") else None # empty dict will cause langfuse to throw an error - happens when streaming
usage = meta.get("usage") if meta.get("usage") else None
span._span.update(usage=usage, model=meta.get("model"))

pipeline_input = tags.get("haystack.pipeline.input_data", None)
Expand All @@ -148,7 +147,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
if len(self._context) == 1:
# The root span has to be a trace, which need to be removed from the context after the pipeline run
self._context.pop()

if self.enforce_flush:
self.flush()

Expand Down
10 changes: 6 additions & 4 deletions integrations/langfuse/tests/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def test_create_new_span(self):
assert span.raw_span().operation_name == "operation_name"
assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"}

assert len(tracer._context) == 0, "The trace span should have been popped, and the root span is closed as well"
assert (
len(tracer._context) == 0
), "The trace span should have been popped, and the root span is closed as well"

# check that update method is called on the span instance with the provided key value pairs
def test_update_span_with_pipeline_input_output_data(self):
Expand Down Expand Up @@ -89,13 +91,13 @@ def test_update_span_gets_flushed_by_default(self):
pass

tracer_mock.flush.assert_called_once()

def test_update_span_flush_disable(self):
os.environ["HAYSTACK_LANGFUSE_ENFORCE_FLUSH"] = "false"
tracer_mock = Mock()

from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer

tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
pass
Expand All @@ -109,4 +111,4 @@ def test_context_is_empty_after_tracing(self):
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
pass

assert tracer._context == []
assert tracer._context == []

0 comments on commit 97b4c04

Please sign in to comment.