fix(metadata_service): ensure database rollback on exceptions during metadata operations #27462
```diff
@@ -1,5 +1,8 @@
+import logging
 from enum import StrEnum, auto
 
+logger = logging.getLogger(__name__)
+
 
 class BuiltInField(StrEnum):
     document_name = auto()
@@ -15,3 +18,22 @@ class MetadataDataSource(StrEnum):
     notion_import = "notion"
     local_file = "file_upload"
     online_document = "online_document"
+    online_drive = "online_drive"
+
+
+def get_safe_data_source_value(data_source_type: str) -> str:
+    """
+    Safely get data source value for metadata.
+    Returns the mapped value if it exists in the enum, otherwise returns the original value.
+
+    Args:
+        data_source_type: The data source type string
+
+    Returns:
+        The mapped enum value if it exists, otherwise the original value
+    """
+    try:
+        return MetadataDataSource[data_source_type].value
+    except KeyError:
+        logger.warning("Unknown data source type: %s, using original value", data_source_type)
+        return data_source_type
```
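For reference, a quick illustration of how the new helper behaves. The expected values in the comments follow directly from the enum members shown above; `"some_new_type"` is a made-up input used only to show the fallback path:

```python
from core.rag.index_processor.constant.built_in_field import get_safe_data_source_value

get_safe_data_source_value("notion_import")  # -> "notion" (known enum name, mapped value returned)
get_safe_data_source_value("local_file")     # -> "file_upload"
get_safe_data_source_value("some_new_type")  # -> "some_new_type" (unknown name: warning logged, input returned as-is)
```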
```diff
@@ -19,7 +19,7 @@
 from core.helper.name_generator import generate_incremental_name
 from core.model_manager import ModelManager
 from core.model_runtime.entities.model_entities import ModelType
-from core.rag.index_processor.constant.built_in_field import BuiltInField
+from core.rag.index_processor.constant.built_in_field import BuiltInField, get_safe_data_source_value
 from core.rag.index_processor.constant.index_type import IndexType
 from core.rag.retrieval.retrieval_methods import RetrievalMethod
 from events.dataset_event import dataset_was_deleted
@@ -2010,10 +2010,10 @@ def build_document(
         if dataset.built_in_field_enabled:
             doc_metadata = {
                 BuiltInField.document_name: name,
-                BuiltInField.uploader: account.name,
+                BuiltInField.uploader: account.name or "Unknown",
                 BuiltInField.upload_date: datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d %H:%M:%S"),
                 BuiltInField.last_update_date: datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d %H:%M:%S"),
-                BuiltInField.source: data_source_type,
+                BuiltInField.source: get_safe_data_source_value(data_source_type),
             }
         if doc_metadata:
             document.doc_metadata = doc_metadata
```
```diff
@@ -1,7 +1,10 @@
 import copy
 import logging
 
-from core.rag.index_processor.constant.built_in_field import BuiltInField, MetadataDataSource
+from core.rag.index_processor.constant.built_in_field import (
+    BuiltInField,
+    get_safe_data_source_value,
+)
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
 from libs.datetime_utils import naive_utc_now
```
@@ -45,7 +48,7 @@ def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMeta | |
| return metadata | ||
|
|
||
| @staticmethod | ||
| def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata: # type: ignore | ||
| def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata | None: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The return types of |
||
| # check if metadata name is too long | ||
| if len(name) > 255: | ||
| raise ValueError("Metadata name cannot exceed 255 characters.") | ||
|
|
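To make that concrete, here is a minimal, hypothetical example of the caller-side handling the reviewer is asking for. The `rename_metadata` wrapper and the choice of Werkzeug's `NotFound` are mine, not something this PR prescribes, and the service import path is assumed:

```python
from werkzeug.exceptions import NotFound

from services.metadata_service import MetadataService  # assumed module path


def rename_metadata(dataset_id: str, metadata_id: str, name: str):
    metadata = MetadataService.update_metadata_name(dataset_id, metadata_id, name)
    if metadata is None:
        # The service no longer raises ValueError for a missing record,
        # so the caller must translate None into an error response itself.
        raise NotFound("Metadata not found.")
    return metadata
```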
```diff
@@ -66,7 +69,7 @@ def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata:
             MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
             metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
             if metadata is None:
-                raise ValueError("Metadata not found.")
+                return None
             old_name = metadata.name
             metadata.name = name
             metadata.updated_by = current_user.id
```
```diff
@@ -91,18 +94,20 @@ def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata:
             db.session.commit()
             return metadata
         except Exception:
+            db.session.rollback()
+            logger.exception("Update metadata name failed")
             raise
         finally:
             redis_client.delete(lock_key)
 
     @staticmethod
-    def delete_metadata(dataset_id: str, metadata_id: str):
+    def delete_metadata(dataset_id: str, metadata_id: str) -> DatasetMetadata | None:
         lock_key = f"dataset_metadata_lock_{dataset_id}"
         try:
             MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
             metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
             if metadata is None:
-                raise ValueError("Metadata not found.")
+                return None
             db.session.delete(metadata)
 
             # deal related documents
```

Review comment on lines 96 to 101: The `try`/`commit`, `except`/`rollback`/log/`raise`, `finally`/release-lock pattern is now repeated in several methods. To adhere to the DRY (Don't Repeat Yourself) principle, you could abstract this pattern into a reusable context manager. This would make the intent of the code clearer and reduce the chance of errors. Here's an example of how it could look:

```python
from contextlib import contextmanager


@contextmanager
def managed_transaction(lock_key: str, error_message: str):
    try:
        yield
        db.session.commit()
    except Exception:
        db.session.rollback()
        logger.exception(error_message)
        raise
    finally:
        if redis_client.get(lock_key):
            redis_client.delete(lock_key)
```

Using this, the `try`/`except`/`finally` scaffolding in each method collapses into a single `with` block.
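A hedged sketch (not part of the PR) of how `update_metadata_name` might look on top of that helper. The semantics line up because a `@contextmanager` resumes after `yield` and commits even when the `with` body exits via `return`:

```python
@staticmethod
def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata | None:
    if len(name) > 255:
        raise ValueError("Metadata name cannot exceed 255 characters.")
    lock_key = f"dataset_metadata_lock_{dataset_id}"
    with managed_transaction(lock_key, "Update metadata name failed"):
        MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
        metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
        if metadata is None:
            return None  # commit is a harmless no-op; the lock is still released
        metadata.name = name
        metadata.updated_by = current_user.id
        # ...related-document doc_metadata updates elided for brevity...
        return metadata  # commit happens inside managed_transaction
```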
```diff
@@ -123,7 +128,9 @@ def delete_metadata(dataset_id: str, metadata_id: str):
             db.session.commit()
             return metadata
         except Exception:
+            db.session.rollback()
+            logger.exception("Delete metadata failed")
             raise
         finally:
             redis_client.delete(lock_key)
```
```diff
@@ -153,16 +160,18 @@ def enable_built_in_field(dataset: Dataset):
                 else:
                     doc_metadata = copy.deepcopy(document.doc_metadata)
                 doc_metadata[BuiltInField.document_name] = document.name
-                doc_metadata[BuiltInField.uploader] = document.uploader
+                doc_metadata[BuiltInField.uploader] = document.uploader or "Unknown"
                 doc_metadata[BuiltInField.upload_date] = document.upload_date.timestamp()
                 doc_metadata[BuiltInField.last_update_date] = document.last_update_date.timestamp()
-                doc_metadata[BuiltInField.source] = MetadataDataSource[document.data_source_type]
+                doc_metadata[BuiltInField.source] = get_safe_data_source_value(document.data_source_type)
                 document.doc_metadata = doc_metadata
                 db.session.add(document)
             dataset.built_in_field_enabled = True
             db.session.commit()
         except Exception:
+            db.session.rollback()
+            logger.exception("Enable built-in field failed")
             raise
         finally:
             redis_client.delete(lock_key)
```
```diff
@@ -193,7 +202,9 @@ def disable_built_in_field(dataset: Dataset):
             dataset.built_in_field_enabled = False
             db.session.commit()
         except Exception:
+            db.session.rollback()
+            logger.exception("Disable built-in field failed")
             raise
         finally:
             redis_client.delete(lock_key)
```
```diff
@@ -203,21 +214,25 @@ def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):
             lock_key = f"document_metadata_lock_{operation.document_id}"
             try:
                 MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id)
-                document = DocumentService.get_document(dataset.id, operation.document_id)
+                try:
+                    document = DocumentService.get_document(dataset.id, operation.document_id)
+                except Exception as e:
+                    logger.warning("Failed to get document %s: %s", operation.document_id, str(e))
+                    continue
```

Review comment on lines +219 to +221: Catching a broad `Exception` here and silently skipping the document can mask unexpected programming errors. Consider catching only the specific exceptions `DocumentService.get_document` is expected to raise, so genuine bugs still surface.
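One hedged way to act on that advice, assuming the lookup's realistic failure mode is a database error (`SQLAlchemyError` is the base class of SQLAlchemy's runtime exceptions; the `operation_data` attribute name is an assumption about `MetadataOperationData`):

```python
from sqlalchemy.exc import SQLAlchemyError

for operation in metadata_args.operation_data:  # assumed iteration, mirroring the loop in this method
    try:
        document = DocumentService.get_document(dataset.id, operation.document_id)
    except SQLAlchemyError as e:
        # Skip only on database-level failures; programming errors still propagate.
        logger.warning("Failed to get document %s: %s", operation.document_id, str(e))
        continue
    ...  # rest of the per-document update
```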
The diff continues:

```diff
                 if document is None:
-                    raise ValueError("Document not found.")
+                    logger.warning("Document not found: %s", operation.document_id)
+                    continue
                 doc_metadata = {}
                 for metadata_value in operation.metadata_list:
                     doc_metadata[metadata_value.name] = metadata_value.value
                 if dataset.built_in_field_enabled:
                     doc_metadata[BuiltInField.document_name] = document.name
-                    doc_metadata[BuiltInField.uploader] = document.uploader
+                    doc_metadata[BuiltInField.uploader] = document.uploader or "Unknown"
                     doc_metadata[BuiltInField.upload_date] = document.upload_date.timestamp()
                     doc_metadata[BuiltInField.last_update_date] = document.last_update_date.timestamp()
-                    doc_metadata[BuiltInField.source] = MetadataDataSource[document.data_source_type]
+                    doc_metadata[BuiltInField.source] = get_safe_data_source_value(document.data_source_type)
                 document.doc_metadata = doc_metadata
                 db.session.add(document)
                 db.session.commit()
                 # deal metadata binding
                 db.session.query(DatasetMetadataBinding).filter_by(document_id=operation.document_id).delete()
                 current_user, current_tenant_id = current_account_with_tenant()
@@ -232,7 +247,9 @@ def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):
                     db.session.add(dataset_metadata_binding)
                 db.session.commit()
         except Exception:
+            db.session.rollback()
+            logger.exception("Update documents metadata failed")
             raise
         finally:
             redis_client.delete(lock_key)
```

Review comment on lines 234 to +252: The whole per-document loop still runs inside a single `try`/`except`, so an exception while processing one document aborts the remaining ones. For a more robust batch operation, you might consider a strategy where the failure of one document does not stop the processing of others. You could collect the errors for each failed document and report them all at the end, allowing successful updates to proceed.
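A hedged sketch of that collect-and-report strategy. The per-document helper `_update_single_document_metadata`, the aggregate exception, and the `operation_data` attribute are all hypothetical names for illustration:

```python
class BulkMetadataUpdateError(Exception):
    """Hypothetical aggregate error carrying per-document failures."""

    def __init__(self, errors: list[tuple[str, str]]):
        super().__init__(f"{len(errors)} document metadata update(s) failed")
        self.errors = errors


errors: list[tuple[str, str]] = []  # (document_id, error message)
for operation in metadata_args.operation_data:
    try:
        _update_single_document_metadata(dataset, operation)  # hypothetical per-document helper
        db.session.commit()
    except Exception as e:
        db.session.rollback()  # only this document's work is discarded
        errors.append((operation.document_id, str(e)))
        logger.exception("Metadata update failed for document %s", operation.document_id)
if errors:
    # Successful documents stay committed; report all failures together.
    raise BulkMetadataUpdateError(errors)
```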
Review comment: [nitpick] Using a hardcoded string 'Unknown' for missing uploader data may cause issues with data consistency and internationalization. Consider defining this as a constant (e.g., `UNKNOWN_UPLOADER = 'Unknown'`) at the module level or in a constants file to ensure consistency across the codebase and easier maintenance.
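A minimal sketch of that suggestion; the constant name is the reviewer's, and where it lives (the `built_in_field` constants module is one natural home) is a judgment call:

```python
# e.g. alongside BuiltInField in the built_in_field constants module
UNKNOWN_UPLOADER = "Unknown"

# usage at the call sites touched by this PR:
doc_metadata[BuiltInField.uploader] = document.uploader or UNKNOWN_UPLOADER
```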