Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion install/requirements_py3.11.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ scikit-learn
pymilvus
clickhouse_connect
pyvespa
mysql-connector-python
mysql-connector-python
packaging
49 changes: 48 additions & 1 deletion vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import Any, Final

from opensearchpy import OpenSearch
from packaging.version import Version
from packaging.version import parse as parse_version

from vectordb_bench.backend.filter import Filter, FilterOp

Expand All @@ -17,6 +19,16 @@
WAITING_FOR_FORCE_MERGE_SEC: Final[int] = 30
SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC: Final[int] = 30

# Central registry for version-dependent OpenSearch index settings.
# Add new rules here to automatically support future versions.
VERSION_SPECIFIC_SETTING_RULES = [
{
"name": "knn.advanced.approximate_threshold",
"applies": lambda version, _: version >= Version("3.0"),
"value": lambda _: "-1",
},
]


class OpenSearchError(Exception):
"""Custom exception for OpenSearch operations."""
Expand Down Expand Up @@ -216,10 +228,39 @@ def need_normalize_cosine(self) -> bool:
"""Whether this database needs to normalize dataset to support COSINE metric."""
return True

def _get_cluster_version(self, client: OpenSearch) -> Version:
"""
Return the OpenSearch cluster version as a comparable Version object.
Raises an exception if the version cannot be determined.
"""
try:
info = client.info()
raw_version_str = info.get("version", {}).get("number", "")
if not raw_version_str:
raise ValueError("Received empty version string from OpenSearch") # noqa: TRY301
cluster_version = parse_version(raw_version_str)
log.debug(f"Detected OpenSearch version: {cluster_version}")
return cluster_version # noqa: TRY300
except Exception:
log.exception("Failed to determine OpenSearch version")
raise

def _get_settings_manager(self, client: OpenSearch) -> OpenSearchSettingsManager:
"""Get settings manager for the given client."""
return OpenSearchSettingsManager(client, self.index_name)

def _get_version_specific_settings(self, cluster_version: Version) -> dict:
"""
Builds and returns a dictionary of applicable version-specific settings.
"""
version_specific_settings = {}
for setting in VERSION_SPECIFIC_SETTING_RULES:
if setting["applies"](cluster_version, self.case_config):
name = setting["name"]
value = setting["value"](self.case_config)
version_specific_settings[name] = value
return version_specific_settings

def _get_bulk_manager(self, client: OpenSearch) -> BulkInsertManager:
"""Get bulk insert manager for the given client."""
return BulkInsertManager(client, self.index_name, self.case_config)
Expand All @@ -241,18 +282,24 @@ def _create_index(self, client: OpenSearch) -> None:
settings_manager.apply_cluster_settings(
cluster_settings, "Successfully updated cluster settings for index creation"
)
# Base settings that are safe for all versions
settings = {
"index": {
"knn": True,
"number_of_shards": self.case_config.number_of_shards,
"number_of_replicas": self.case_config.number_of_replicas,
"translog.flush_threshold_size": self.case_config.flush_threshold_size,
"knn.advanced.approximate_threshold": "-1",
"replication.type": self.case_config.replication_type,
},
"refresh_interval": self.case_config.refresh_interval,
}
settings["index"]["knn.algo_param.ef_search"] = ef_search_value

version_specific_settings = self._get_version_specific_settings(self._get_cluster_version(client))
if version_specific_settings:
log.info(f"Applying version-dependent settings: {version_specific_settings}")
settings["index"].update(version_specific_settings)

# Build properties mapping, excluding _id which is automatically handled by OpenSearch
properties = {}

Expand Down