Skip to content

Commit

Permalink
Fixed lint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Redna committed Jun 10, 2024
1 parent c9ed795 commit 6f190d1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ class LangfuseConnector:
In addition, you need to set the `HAYSTACK_CONTENT_TRACING_ENABLED` environment variable to `true` in order to
enable Haystack tracing in your pipeline.
Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH` environent
variable to `false`. By default, the data is flushed after each component and blocks the thread until the data is sent to
Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse.
Make sure you will call langfuse.flush() explicitly before the program exits. e.g. by using tracer.actual_tracer.flush():
Lastly, you may disable flushing the data after each component by setting the `HAYSTACK_LANGFUSE_ENFORCE_FLUSH`
environent variable to `false`. By default, the data is flushed after each component and blocks the thread until
the data is sent to Langfuse. **Caution**: Disabling this feature may result in data loss if the program crashes
before the data is sent to Langfuse. Make sure you will call langfuse.flush() explicitly before the program exits.
E.g. by using tracer.actual_tracer.flush():
```python
from haystack.tracing import tracer
Expand All @@ -38,7 +39,7 @@ class LangfuseConnector:
from haystack.tracing import tracer
# ...
@app.on_event("shutdown")
async def shutdown_event():
tracer.actual_tracer.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

import langfuse

from threading import Thread

HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR = "HAYSTACK_LANGFUSE_ENFORCE_FLUSH"

class LangfuseSpan(Span):
Expand Down Expand Up @@ -132,7 +130,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
replies = span._data.get("haystack.component.output", {}).get("replies")
if replies:
meta = replies[0].meta
usage = meta.get("usage") if meta.get("usage") else None # empty dict will cause langfuse to throw an error - happens when streaming
usage = meta.get("usage") if meta.get("usage") else None
span._span.update(usage=usage, model=meta.get("model"))

pipeline_input = tags.get("haystack.pipeline.input_data", None)
Expand All @@ -148,7 +146,7 @@ def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> I
if len(self._context) == 1:
# The root span has to be a trace, which need to be removed from the context after the pipeline run
self._context.pop()

if self.enforce_flush:
self.flush()

Expand Down

0 comments on commit 6f190d1

Please sign in to comment.