diff --git a/install/requirements_py3.11.txt b/install/requirements_py3.11.txt index 0ae328a6f..be01230e7 100644 --- a/install/requirements_py3.11.txt +++ b/install/requirements_py3.11.txt @@ -24,4 +24,5 @@ scikit-learn pymilvus clickhouse_connect pyvespa -mysql-connector-python \ No newline at end of file +mysql-connector-python +packaging \ No newline at end of file diff --git a/vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py b/vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py index 091b8ff45..d0ad3a61f 100644 --- a/vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py +++ b/vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py @@ -5,6 +5,8 @@ from typing import Any, Final from opensearchpy import OpenSearch +from packaging.version import Version +from packaging.version import parse as parse_version from vectordb_bench.backend.filter import Filter, FilterOp @@ -17,6 +19,16 @@ WAITING_FOR_FORCE_MERGE_SEC: Final[int] = 30 SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC: Final[int] = 30 +# Central registry for version-dependent OpenSearch index settings. +# Add new rules here to automatically support future versions. +VERSION_SPECIFIC_SETTING_RULES = [ + { + "name": "knn.advanced.approximate_threshold", + "applies": lambda version, _: version >= Version("3.0"), + "value": lambda _: "-1", + }, +] + class OpenSearchError(Exception): """Custom exception for OpenSearch operations.""" @@ -216,10 +228,39 @@ def need_normalize_cosine(self) -> bool: """Whether this database needs to normalize dataset to support COSINE metric.""" return True + def _get_cluster_version(self, client: OpenSearch) -> Version: + """ + Return the OpenSearch cluster version as a comparable Version object. + Raises an exception if the version cannot be determined. + """ + try: + info = client.info() + raw_version_str = info.get("version", {}).get("number", "") + if not raw_version_str: + raise ValueError("Received empty version string from OpenSearch") # noqa: TRY301 + cluster_version = parse_version(raw_version_str) + log.debug(f"Detected OpenSearch version: {cluster_version}") + return cluster_version # noqa: TRY300 + except Exception: + log.exception("Failed to determine OpenSearch version") + raise + def _get_settings_manager(self, client: OpenSearch) -> OpenSearchSettingsManager: """Get settings manager for the given client.""" return OpenSearchSettingsManager(client, self.index_name) + def _get_version_specific_settings(self, cluster_version: Version) -> dict: + """ + Builds and returns a dictionary of applicable version-specific settings. + """ + version_specific_settings = {} + for setting in VERSION_SPECIFIC_SETTING_RULES: + if setting["applies"](cluster_version, self.case_config): + name = setting["name"] + value = setting["value"](self.case_config) + version_specific_settings[name] = value + return version_specific_settings + def _get_bulk_manager(self, client: OpenSearch) -> BulkInsertManager: """Get bulk insert manager for the given client.""" return BulkInsertManager(client, self.index_name, self.case_config) @@ -241,18 +282,24 @@ def _create_index(self, client: OpenSearch) -> None: settings_manager.apply_cluster_settings( cluster_settings, "Successfully updated cluster settings for index creation" ) + # Base settings that are safe for all versions settings = { "index": { "knn": True, "number_of_shards": self.case_config.number_of_shards, "number_of_replicas": self.case_config.number_of_replicas, "translog.flush_threshold_size": self.case_config.flush_threshold_size, - "knn.advanced.approximate_threshold": "-1", "replication.type": self.case_config.replication_type, }, "refresh_interval": self.case_config.refresh_interval, } settings["index"]["knn.algo_param.ef_search"] = ef_search_value + + version_specific_settings = self._get_version_specific_settings(self._get_cluster_version(client)) + if version_specific_settings: + log.info(f"Applying version-dependent settings: {version_specific_settings}") + settings["index"].update(version_specific_settings) + # Build properties mapping, excluding _id which is automatically handled by OpenSearch properties = {}