Skip to content

Commit

Permalink
Add support for aliases in elasticsearch document store (#2448)
Browse files Browse the repository at this point in the history
* Add support for aliases in elasticsearch document store

* Add alias support for OpenSearch

* Missing variable index

* Update Documentation & Code Style

* Add unit test for elasticsearch alias support

* Fix unit test when index is not compatible with haystack

* Fix auto format conflict

* Add comment explaining for loop for alias

* Update Documentation & Code Style

Co-authored-by: Jonathan Gallon <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Julian Risch <[email protected]>
  • Loading branch information
4 people authored Apr 28, 2022
1 parent 2a44840 commit 25b87e8
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 80 deletions.
176 changes: 96 additions & 80 deletions haystack/document_stores/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,31 +311,41 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
Create a new index for storing documents. In case if an index with the name already exists, it ensures that
the embedding_field is present.
"""
# Check if index_name refers to an alias
if self.client.indices.exists_alias(name=index_name):
logger.debug(f"Index name {index_name} is an alias.")

# check if the existing index has the embedding field; if not create it
if self.client.indices.exists(index=index_name, headers=headers):
mapping = self.client.indices.get(index_name, headers=headers)[index_name]["mappings"]
if self.search_fields:
for search_field in self.search_fields:
if search_field in mapping["properties"] and mapping["properties"][search_field]["type"] != "text":
indices = self.client.indices.get(index_name, headers=headers)
# If the index name is an alias that groups multiple existing indices, each of them must have an embedding_field.
for index_id, index_info in indices.items():
mapping = index_info["mappings"]
if self.search_fields:
for search_field in self.search_fields:
if (
search_field in mapping["properties"]
and mapping["properties"][search_field]["type"] != "text"
):
raise Exception(
f"The search_field '{search_field}' of index '{index_id}' with type '{mapping['properties'][search_field]['type']}' "
f"does not have the right type 'text' to be queried in fulltext search. Please use only 'text' type properties as search_fields or use another index. "
f"This error might occur if you are trying to use haystack 1.0 and above with an existing elasticsearch index created with a previous version of haystack. "
f'In this case deleting the index with `delete_index(index="{index_id}")` will fix your environment. '
f"Note, that all data stored in the index will be lost!"
)
if self.embedding_field:
if (
self.embedding_field in mapping["properties"]
and mapping["properties"][self.embedding_field]["type"] != "dense_vector"
):
raise Exception(
f"The search_field '{search_field}' of index '{index_name}' with type '{mapping['properties'][search_field]['type']}' "
f"does not have the right type 'text' to be queried in fulltext search. Please use only 'text' type properties as search_fields or use another index. "
f"This error might occur if you are trying to use haystack 1.0 and above with an existing elasticsearch index created with a previous version of haystack. "
f'In this case deleting the index with `delete_index(index="{index_name}")` will fix your environment. '
f"Note, that all data stored in the index will be lost!"
f"The '{index_id}' index in Elasticsearch already has a field called '{self.embedding_field}'"
f" with the type '{mapping['properties'][self.embedding_field]['type']}'. Please update the "
f"document_store to use a different name for the embedding_field parameter."
)
if self.embedding_field:
if (
self.embedding_field in mapping["properties"]
and mapping["properties"][self.embedding_field]["type"] != "dense_vector"
):
raise Exception(
f"The '{index_name}' index in Elasticsearch already has a field called '{self.embedding_field}'"
f" with the type '{mapping['properties'][self.embedding_field]['type']}'. Please update the "
f"document_store to use a different name for the embedding_field parameter."
)
mapping["properties"][self.embedding_field] = {"type": "dense_vector", "dims": self.embedding_dim}
self.client.indices.put_mapping(index=index_name, body=mapping, headers=headers)
mapping["properties"][self.embedding_field] = {"type": "dense_vector", "dims": self.embedding_dim}
self.client.indices.put_mapping(index=index_id, body=mapping, headers=headers)
return

if self.custom_mapping:
Expand Down Expand Up @@ -1864,74 +1874,80 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
"""
Create a new index for storing documents.
"""
# Check if index_name refers to an alias
if self.client.indices.exists_alias(name=index_name):
logger.debug(f"Index name {index_name} is an alias.")

# check if the existing index has the embedding field; if not create it
if self.client.indices.exists(index=index_name, headers=headers):
index_info = self.client.indices.get(index_name, headers=headers)[index_name]
mappings = index_info["mappings"]
index_settings = index_info["settings"]["index"]
if self.search_fields:
for search_field in self.search_fields:
if (
search_field in mappings["properties"]
and mappings["properties"][search_field]["type"] != "text"
):
raise Exception(
f"The search_field '{search_field}' of index '{index_name}' with type '{mappings['properties'][search_field]['type']}' "
f"does not have the right type 'text' to be queried in fulltext search. Please use only 'text' type properties as search_fields or use another index. "
f"This error might occur if you are trying to use haystack 1.0 and above with an existing elasticsearch index created with a previous version of haystack. "
f'In this case deleting the index with `delete_index(index="{index_name}")` will fix your environment. '
f"Note, that all data stored in the index will be lost!"
)

# embedding field will be created
if self.embedding_field not in mappings["properties"]:
mappings["properties"][self.embedding_field] = self._get_embedding_field_mapping(
similarity=self.similarity
)
self.client.indices.put_mapping(index=self.index, body=mappings, headers=headers)
self.embeddings_field_supports_similarity = True
else:
# bad embedding field
if mappings["properties"][self.embedding_field]["type"] != "knn_vector":
raise Exception(
f"The '{index_name}' index in OpenSearch already has a field called '{self.embedding_field}'"
f" with the type '{mappings['properties'][self.embedding_field]['type']}'. Please update the "
f"document_store to use a different name for the embedding_field parameter."
indices = self.client.indices.get(index_name, headers=headers)
# If the index name is an alias that groups multiple existing indices, each of them must have an embedding_field.
for index_id, index_info in indices.items():
mappings = index_info["mappings"]
index_settings = index_info["settings"]["index"]
if self.search_fields:
for search_field in self.search_fields:
if (
search_field in mappings["properties"]
and mappings["properties"][search_field]["type"] != "text"
):
raise Exception(
f"The search_field '{search_field}' of index '{index_id}' with type '{mappings['properties'][search_field]['type']}' "
f"does not have the right type 'text' to be queried in fulltext search. Please use only 'text' type properties as search_fields or use another index. "
f"This error might occur if you are trying to use haystack 1.0 and above with an existing elasticsearch index created with a previous version of haystack. "
f'In this case deleting the index with `delete_index(index="{index_id}")` will fix your environment. '
f"Note, that all data stored in the index will be lost!"
)

# embedding field will be created
if self.embedding_field not in mappings["properties"]:
mappings["properties"][self.embedding_field] = self._get_embedding_field_mapping(
similarity=self.similarity
)
# embedding field with global space_type setting
if "method" not in mappings["properties"][self.embedding_field]:
embedding_field_space_type = index_settings["knn.space_type"]
# embedding field with local space_type setting
self.client.indices.put_mapping(index=index_id, body=mappings, headers=headers)
self.embeddings_field_supports_similarity = True
else:
# bad embedding field
if mappings["properties"][self.embedding_field]["type"] != "knn_vector":
raise Exception(
f"The '{index_id}' index in OpenSearch already has a field called '{self.embedding_field}'"
f" with the type '{mappings['properties'][self.embedding_field]['type']}'. Please update the "
f"document_store to use a different name for the embedding_field parameter."
)
# embedding field with global space_type setting
if "method" not in mappings["properties"][self.embedding_field]:
embedding_field_space_type = index_settings["knn.space_type"]
# embedding field with local space_type setting
else:
embedding_field_space_type = mappings["properties"][self.embedding_field]["method"][
"space_type"
]

embedding_field_similarity = self.space_type_to_similarity[embedding_field_space_type]
if embedding_field_similarity == self.similarity:
self.embeddings_field_supports_similarity = True
else:
logger.warning(
f"Embedding field '{self.embedding_field}' is optimized for similarity '{embedding_field_similarity}'. "
f"Falling back to slow exact vector calculation. "
f"Consider cloning the embedding field optimized for '{embedding_field_similarity}' by calling clone_embedding_field(similarity='{embedding_field_similarity}', ...) "
f"or creating a new index optimized for '{self.similarity}' by setting `similarity='{self.similarity}'` the first time you instantiate OpenSearchDocumentStore for the new index, "
f"e.g. `OpenSearchDocumentStore(index='my_new_{self.similarity}_index', similarity='{self.similarity}')`."
)

# Adjust global ef_search setting. If not set, default is 512.
ef_search = index_settings.get("knn.algo_param", {"ef_search": 512}).get("ef_search", 512)
if self.index_type == "hnsw" and ef_search != 20:
body = {"knn.algo_param.ef_search": 20}
self.client.indices.put_settings(index=self.index, body=body, headers=headers)
elif self.index_type == "flat" and ef_search != 512:
body = {"knn.algo_param.ef_search": 512}
self.client.indices.put_settings(index=self.index, body=body, headers=headers)
# embedding field with global space_type setting
if "method" not in mappings["properties"][self.embedding_field]:
embedding_field_space_type = index_settings["knn.space_type"]
# embedding field with local space_type setting
else:
embedding_field_space_type = mappings["properties"][self.embedding_field]["method"][
"space_type"
]

embedding_field_similarity = self.space_type_to_similarity[embedding_field_space_type]
if embedding_field_similarity == self.similarity:
self.embeddings_field_supports_similarity = True
else:
logger.warning(
f"Embedding field '{self.embedding_field}' is optimized for similarity '{embedding_field_similarity}'. "
f"Falling back to slow exact vector calculation. "
f"Consider cloning the embedding field optimized for '{embedding_field_similarity}' by calling clone_embedding_field(similarity='{embedding_field_similarity}', ...) "
f"or creating a new index optimized for '{self.similarity}' by setting `similarity='{self.similarity}'` the first time you instantiate OpenSearchDocumentStore for the new index, "
f"e.g. `OpenSearchDocumentStore(index='my_new_{self.similarity}_index', similarity='{self.similarity}')`."
)

# Adjust global ef_search setting. If not set, default is 512.
ef_search = index_settings.get("knn.algo_param", {"ef_search": 512}).get("ef_search", 512)
if self.index_type == "hnsw" and ef_search != 20:
body = {"knn.algo_param.ef_search": 20}
self.client.indices.put_settings(index=index_id, body=body, headers=headers)
elif self.index_type == "flat" and ef_search != 512:
body = {"knn.algo_param.ef_search": 512}
self.client.indices.put_settings(index=index_id, body=body, headers=headers)

return

Expand Down
47 changes: 47 additions & 0 deletions test/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,53 @@ def test_elasticsearch_search_field_mapping():
assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["sub_content"]["type"] == "text"


@pytest.mark.elasticsearch
def test_elasticsearch_existing_alias():
    """Check that a document store can be created on an alias spanning two compatible indices."""
    alias_name = "haystack_existing_alias"
    member_indices = ("haystack_existing_alias_1", "haystack_existing_alias_2")
    text_mapping = {"mappings": {"properties": {"content": {"type": "text"}}}}

    es = Elasticsearch()

    # Remove leftovers from earlier runs; 404 responses are ignored so a fresh cluster does not fail here.
    for member in member_indices:
        es.indices.delete(index=member, ignore=[404])
    es.indices.delete_alias(index="_all", name=alias_name, ignore=[404])

    # Both indices get the same mapping, with "content" declared as text.
    for member in member_indices:
        es.indices.create(index=member, body=text_mapping)

    es.indices.put_alias(index="haystack_existing_alias_1,haystack_existing_alias_2", name=alias_name)

    # To be valid, all indices related to the alias must have content field of type text.
    # The test passes as long as constructing the store does not raise.
    _ = ElasticsearchDocumentStore(index=alias_name, search_fields=["content"])


@pytest.mark.elasticsearch
def test_elasticsearch_existing_alias_missing_fields():
    """
    Check that creating a document store on an alias fails when one of the aliased indices
    maps a search field with a type other than "text".

    The alias groups two indices: the first maps "content" as text, the second maps it as
    histogram. The raised error is expected to name the offending index.
    """
    client = Elasticsearch()

    # Remove leftovers from earlier runs; 404 responses are ignored so a fresh cluster does not fail here.
    client.indices.delete(index="haystack_existing_alias_1", ignore=[404])
    client.indices.delete(index="haystack_existing_alias_2", ignore=[404])
    client.indices.delete_alias(index="_all", name="haystack_existing_alias", ignore=[404])

    right_settings = {"mappings": {"properties": {"content": {"type": "text"}}}}
    wrong_settings = {"mappings": {"properties": {"content": {"type": "histogram"}}}}

    client.indices.create(index="haystack_existing_alias_1", body=right_settings)
    client.indices.create(index="haystack_existing_alias_2", body=wrong_settings)

    client.indices.put_alias(
        index="haystack_existing_alias_1,haystack_existing_alias_2", name="haystack_existing_alias"
    )

    # Match on the offending index name so that an unrelated failure (e.g. a connection error)
    # cannot make this test pass by accident.
    with pytest.raises(Exception, match="haystack_existing_alias_2"):
        # wrong field type for "content" in index "haystack_existing_alias_2"
        # NOTE(review): content_field is set to "title", presumably so that "content" is only
        # validated as a search field -- confirm against ElasticsearchDocumentStore.__init__.
        _ = ElasticsearchDocumentStore(
            index="haystack_existing_alias", search_fields=["content"], content_field="title"
        )


@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)
def test_elasticsearch_brownfield_support(document_store_with_docs):
new_document_store = InMemoryDocumentStore()
Expand Down

0 comments on commit 25b87e8

Please sign in to comment.