
feat: Azure converter updates #7409

Merged
merged 13 commits on Apr 9, 2024
Initial commit
vblagoje committed Mar 27, 2024
commit 2169f82eab1b7393ed2ff293f30a474f1e2a2dd2
301 changes: 290 additions & 11 deletions haystack/components/converters/azure.py
@@ -1,5 +1,10 @@
import copy
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Union

import networkx as nx
import pandas as pd

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
@@ -10,7 +15,7 @@
logger = logging.getLogger(__name__)

with LazyImport(message="Run 'pip install \"azure-ai-formrecognizer>=3.2.0b2\"'") as azure_import:
from azure.ai.formrecognizer import AnalyzeResult, DocumentAnalysisClient
from azure.ai.formrecognizer import AnalyzeResult, DocumentAnalysisClient, DocumentLine, DocumentParagraph
from azure.core.credentials import AzureKeyCredential


@@ -39,7 +44,16 @@ class AzureOCRDocumentConverter:
"""

def __init__(
self, endpoint: str, api_key: Secret = Secret.from_env_var("AZURE_AI_API_KEY"), model_id: str = "prebuilt-read"
self,
endpoint: str,
api_key: Secret = Secret.from_env_var("AZURE_AI_API_KEY"),
model_id: str = "prebuilt-read",
save_json: bool = False,
preceding_context_len: int = 3,
following_context_len: int = 3,
merge_multiple_column_headers: bool = True,
page_layout: Literal["natural", "single_column"] = "natural",
threshold_y: Optional[float] = 0.05,
):
"""
Create an AzureOCRDocumentConverter component.
@@ -58,6 +72,14 @@ def __init__(
self.endpoint = endpoint
self.model_id = model_id
self.api_key = api_key
self.save_json = save_json
self.preceding_context_len = preceding_context_len
self.following_context_len = following_context_len
self.merge_multiple_column_headers = merge_multiple_column_headers
self.page_layout = page_layout
self.threshold_y = threshold_y
if self.page_layout == "single_column" and self.threshold_y is None:
self.threshold_y = 0.05

@component.output_types(documents=List[Document], raw_azure_response=List[Dict])
def run(self, sources: List[Union[str, Path, ByteStream]], meta: Optional[List[Dict[str, Any]]] = None):
@@ -95,14 +117,8 @@ def run(self, sources: List[Union[str, Path, ByteStream]], meta: Optional[List[Dict[str, Any]]] = None):
result = poller.result()
azure_output.append(result.to_dict())

file_suffix = None
if "file_path" in bytestream.meta:
file_suffix = Path(bytestream.meta["file_path"]).suffix

document = AzureOCRDocumentConverter._convert_azure_result_to_document(result, file_suffix)
merged_metadata = {**bytestream.meta, **metadata}
document.meta = merged_metadata
documents.append(document)
docs = self._convert_tables_and_text(result=result, meta=meta)
documents.extend(docs)

return {"documents": documents, "raw_azure_response": azure_output}
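For orientation, a minimal usage sketch of the converter after this change. The endpoint URL and file path are placeholders, the import path is assumed from the file location, and AZURE_AI_API_KEY is expected to be set in the environment; this is not part of the diff:

from haystack.components.converters import AzureOCRDocumentConverter

converter = AzureOCRDocumentConverter(
    endpoint="https://<your-resource>.cognitiveservices.azure.com/",  # placeholder endpoint
    model_id="prebuilt-read",
    page_layout="single_column",  # group lines into visual rows by y-coordinate
    threshold_y=0.05,  # inches for PDFs, pixels for images
)
result = converter.run(sources=["sample.pdf"])  # hypothetical input file
documents = result["documents"]  # table Documents plus one text Document per source
raw_responses = result["raw_azure_response"]  # raw AnalyzeResult dicts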

@@ -147,3 +163,266 @@ def _convert_azure_result_to_document(result: "AnalyzeResult", file_suffix: Optional[str]) -> Document:
document = Document(content=text)

return document

# pylint: disable=line-too-long
def _convert_tables_and_text(self, result: "AnalyzeResult", meta: Optional[Dict[str, Any]]) -> List[Document]:
"""
:param result: The AnalyzeResult object returned by the `begin_analyze_document` method. Docs on Analyze result
can be found [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-ai-formrecognizer/3.3.0/azure.ai.formrecognizer.html?highlight=read#azure.ai.formrecognizer.AnalyzeResult).
:param meta: Optional dictionary with metadata that shall be attached to all resulting documents.
Can be any custom keys and values.
"""
tables = self._convert_tables(result=result, meta=meta)
if self.page_layout == "natural":
text = self._convert_to_natural_text(result=result, meta=meta)
else:
assert isinstance(self.threshold_y, float)
text = self._convert_to_single_column_text(result=result, meta=meta, threshold_y=self.threshold_y)
docs = [*tables, text]
return docs

def _convert_tables(self, result: "AnalyzeResult", meta: Optional[Dict[str, Any]]) -> List[Document]:
converted_tables: List[Document] = []

if not result.tables:
return converted_tables

for table in result.tables:
# Initialize table with empty cells
table_list = [[""] * table.column_count for _ in range(table.row_count)]
additional_column_header_rows = set()
caption = ""
row_idx_start = 0

for idx, cell in enumerate(table.cells):
# Remove ':selected:'/':unselected:' tags from cell's content
cell.content = cell.content.replace(":selected:", "")
cell.content = cell.content.replace(":unselected:", "")

# Check if first row is a merged cell spanning whole table
# -> exclude this row and use as a caption
if idx == 0 and cell.column_span == table.column_count:
caption = cell.content
row_idx_start = 1
table_list.pop(0)
continue

column_span = cell.column_span if cell.column_span else 0
for c in range(column_span): # pylint: disable=invalid-name
row_span = cell.row_span if cell.row_span else 0
for r in range(row_span): # pylint: disable=invalid-name
if (
self.merge_multiple_column_headers
and cell.kind == "columnHeader"
and cell.row_index > row_idx_start
):
# More than one row serves as column header
table_list[0][cell.column_index + c] += f"\n{cell.content}"
additional_column_header_rows.add(cell.row_index - row_idx_start)
else:
table_list[cell.row_index + r - row_idx_start][cell.column_index + c] = cell.content

# Remove additional column header rows, as these got attached to the first row
for row_idx in sorted(additional_column_header_rows, reverse=True):
del table_list[row_idx]

# Get preceding context of table
if table.bounding_regions:
table_beginning_page = next(
page for page in result.pages if page.page_number == table.bounding_regions[0].page_number
)
else:
table_beginning_page = None
table_start_offset = table.spans[0].offset
if table_beginning_page and table_beginning_page.lines:
preceding_lines = [
line.content for line in table_beginning_page.lines if line.spans[0].offset < table_start_offset
]
else:
preceding_lines = []
preceding_context = "\n".join(preceding_lines[-self.preceding_context_len :]) + f"\n{caption}"
preceding_context = preceding_context.strip()

# Get following context
if table.bounding_regions and len(table.bounding_regions) == 1:
table_end_page = table_beginning_page
elif table.bounding_regions:
table_end_page = next(
page for page in result.pages if page.page_number == table.bounding_regions[-1].page_number
)
else:
table_end_page = None

table_end_offset = table_start_offset + table.spans[0].length
if table_end_page and table_end_page.lines:
following_lines = [
line.content for line in table_end_page.lines if line.spans[0].offset > table_end_offset
]
else:
following_lines = []
following_context = "\n".join(following_lines[: self.following_context_len])

table_meta = copy.deepcopy(meta)

if isinstance(table_meta, dict):
table_meta["preceding_context"] = preceding_context
table_meta["following_context"] = following_context
else:
table_meta = {"preceding_context": preceding_context, "following_context": following_context}

if table.bounding_regions:
table_meta["page"] = table.bounding_regions[0].page_number

table_df = pd.DataFrame(columns=table_list[0], data=table_list[1:])
converted_tables.append(Document(dataframe=table_df, meta=table_meta))

return converted_tables
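As a toy illustration of what merge_multiple_column_headers does, here is the header-folding step in isolation (invented data, standalone pandas; not a call into the converter):

import pandas as pd

# Rows 0 and 1 both act as column headers; fold row 1 into row 0.
table_list = [["Year", "Revenue"], ["", "(EUR)"], ["2023", "1000"]]
for col, extra in enumerate(table_list[1]):
    if extra:
        table_list[0][col] += f"\n{extra}"
del table_list[1]
# table_list == [["Year", "Revenue\n(EUR)"], ["2023", "1000"]]
table_df = pd.DataFrame(columns=table_list[0], data=table_list[1:])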

def _convert_to_natural_text(self, result: "AnalyzeResult", meta: Optional[Dict[str, str]]) -> Document:
"""
This converts the `AnalyzeResult` object into a single Haystack Document. We add "\f" separators between
pages to differentiate between the text on separate pages. This is the expected format for the PreProcessor.

:param result: The AnalyzeResult object returned by the `begin_analyze_document` method. Docs on Analyze result
can be found [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-ai-formrecognizer/3.3.0/azure.ai.formrecognizer.html?highlight=read#azure.ai.formrecognizer.AnalyzeResult).
:param meta: Optional dictionary with metadata that shall be attached to all resulting documents.
Can be any custom keys and values.
"""
table_spans_by_page = self._collect_table_spans(result=result)

texts = []
if result.paragraphs:
paragraphs_to_pages: Dict = defaultdict(str)
for paragraph in result.paragraphs:
if paragraph.bounding_regions:
# If paragraph spans multiple pages we group it with the first page number
page_numbers = [b.page_number for b in paragraph.bounding_regions]
else:
# If page_number is not available we put the paragraph onto the last page seen so far
# (or onto page 1 if no page has been recorded yet, to avoid indexing into an empty dict)
current_last_page_number = sorted(paragraphs_to_pages.keys())[-1] if paragraphs_to_pages else 1
page_numbers = [current_last_page_number]
tables_on_page = table_spans_by_page[page_numbers[0]]
# Check if paragraph is part of a table and if so skip
if self._check_if_in_table(tables_on_page, line_or_paragraph=paragraph):
continue
paragraphs_to_pages[page_numbers[0]] += paragraph.content + "\n"

max_page_number = max(n for n in paragraphs_to_pages)
for page_idx in range(1, max_page_number + 1):
# We add empty strings for missing pages so the preprocessor can still extract the correct page number
# from the original PDF.
page_text = paragraphs_to_pages.get(page_idx, "")
texts.append(page_text)
else:
logger.warning("No text paragraphs were detected by the OCR conversion.")

all_text = "\f".join(texts)
return Document(content=all_text, meta=meta)
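Because pages are joined with "\f" (including empty strings for pages without text), a downstream component can recover page numbers by splitting on the separator. A small sketch with invented content:

content = "Text on page 1.\f\fText on page 3."  # page 2 was empty
for page_number, page_text in enumerate(content.split("\f"), start=1):
    print(page_number, repr(page_text))
# 1 'Text on page 1.'
# 2 ''
# 3 'Text on page 3.'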

def _convert_to_single_column_text(
self, result: "AnalyzeResult", meta: Optional[Dict[str, str]], threshold_y: float = 0.05
) -> Document:
"""
This converts the `AnalyzeResult` object into a single Haystack Document. We add "\f" separators between
pages to differentiate between the text on separate pages. This is the expected format for the PreProcessor.

:param result: The AnalyzeResult object returned by the `begin_analyze_document` method. Docs on Analyze result
can be found [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-ai-formrecognizer/3.3.0/azure.ai.formrecognizer.html?highlight=read#azure.ai.formrecognizer.AnalyzeResult).
:param meta: Optional dictionary with metadata that shall be attached to all resulting documents.
Can be any custom keys and values.
:param threshold_y: height threshold in inches for PDF and pixels for images
"""
table_spans_by_page = self._collect_table_spans(result=result)

# Find all pairs of lines that should be grouped together based on the y-value of the upper left coordinate
# of their bounding box
pairs_by_page = defaultdict(list)
for page_idx, page in enumerate(result.pages):
lines = page.lines if page.lines else []
# Only works if polygon information is available
if all(line.polygon is not None for line in lines):
for i in range(len(lines)): # pylint: disable=consider-using-enumerate
# left_upi, right_upi, right_lowi, left_lowi = lines[i].polygon
left_upi, _, _, _ = lines[i].polygon # type: ignore
pairs_by_page[page_idx].append([i, i])
for j in range(i + 1, len(lines)): # pylint: disable=invalid-name
left_upj, _, _, _ = lines[j].polygon # type: ignore
close_on_y_axis = abs(left_upi[1] - left_upj[1]) < threshold_y
if close_on_y_axis:
pairs_by_page[page_idx].append([i, j])
# Default if polygon is not available
else:
logger.info(
"Polygon information for lines on page %s is not available so it is not possible to enforce a "
"single column page layout.",
page_idx,
)
for i in range(len(lines)):
pairs_by_page[page_idx].append([i, i])

# Merge the line pairs that are transitively connected, per page
merged_pairs_by_page = {}
for page_idx in pairs_by_page:
graph = nx.Graph()
graph.add_edges_from(pairs_by_page[page_idx])
merged_pairs_by_page[page_idx] = [list(a) for a in list(nx.connected_components(graph))]

# Convert line indices to the DocumentLine objects
merged_lines_by_page = {}
for page_idx, page in enumerate(result.pages):
rows = []
lines = page.lines if page.lines else []
# We use .get(page_idx, []) since the page could be empty
for row_of_lines in merged_pairs_by_page.get(page_idx, []):
lines_in_row = [lines[line_idx] for line_idx in row_of_lines]
rows.append(lines_in_row)
merged_lines_by_page[page_idx] = rows

# Sort the merged pairs in each row by the x-value of the upper left bounding box coordinate
x_sorted_lines_by_page = {}
for page_idx, _ in enumerate(result.pages):
sorted_rows = []
for row_of_lines in merged_lines_by_page[page_idx]:
sorted_rows.append(sorted(row_of_lines, key=lambda x: x.polygon[0][0])) # type: ignore
x_sorted_lines_by_page[page_idx] = sorted_rows

# Sort each row within the page by the y-value of the upper left bounding box coordinate
y_sorted_lines_by_page = {}
for page_idx, _ in enumerate(result.pages):
sorted_rows = sorted(x_sorted_lines_by_page[page_idx], key=lambda x: x[0].polygon[0][1]) # type: ignore
y_sorted_lines_by_page[page_idx] = sorted_rows

# Construct the text to write
texts = []
for page_idx, page in enumerate(result.pages):
tables_on_page = table_spans_by_page[page.page_number]
page_text = ""
for row_of_lines in y_sorted_lines_by_page[page_idx]:
# Check if line is part of a table and if so skip
if any(self._check_if_in_table(tables_on_page, line_or_paragraph=line) for line in row_of_lines):
continue
page_text += " ".join(line.content for line in row_of_lines)
page_text += "\n"
texts.append(page_text)
all_text = "\f".join(texts)
return Document(content=all_text, meta=meta)
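The y-grouping above can be seen in isolation: self-pairs keep every line in the graph, near-equal upper-left y-values add edges, and connected components become rows. A standalone sketch with invented coordinates:

import networkx as nx

upper_left_y = [1.00, 1.02, 2.50, 2.52, 2.54]  # invented y-values, one per line
threshold_y = 0.05
pairs = [[i, i] for i in range(len(upper_left_y))]  # self-pairs keep singleton lines
for i in range(len(upper_left_y)):
    for j in range(i + 1, len(upper_left_y)):
        if abs(upper_left_y[i] - upper_left_y[j]) < threshold_y:
            pairs.append([i, j])
graph = nx.Graph()
graph.add_edges_from(pairs)
rows = sorted(sorted(c) for c in nx.connected_components(graph))
# rows == [[0, 1], [2, 3, 4]]: lines 0 and 1 form one row, lines 2-4 another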

def _collect_table_spans(self, result: "AnalyzeResult") -> Dict:
table_spans_by_page = defaultdict(list)
tables = result.tables if result.tables else []
for table in tables:
if not table.bounding_regions:
continue
table_spans_by_page[table.bounding_regions[0].page_number].append(table.spans[0])
return table_spans_by_page

def _check_if_in_table(
self, tables_on_page: dict, line_or_paragraph: Union["DocumentLine", "DocumentParagraph"]
) -> bool:
in_table = False
# Check if line is part of a table
for table in tables_on_page:
if table.offset <= line_or_paragraph.spans[0].offset <= table.offset + table.length:
in_table = True
break
return in_table
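The containment test above reduces to a simple offset range check; with invented numbers:

# A table spanning offsets [100, 150] contains a line starting at offset 120.
table_offset, table_length = 100, 50
line_offset = 120
assert table_offset <= line_offset <= table_offset + table_length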