diff --git a/servicex_app/servicex_app/models.py b/servicex_app/servicex_app/models.py
index 6e78c4827..ad2d79bb3 100644
--- a/servicex_app/servicex_app/models.py
+++ b/servicex_app/servicex_app/models.py
@@ -157,6 +157,7 @@ class TransformStatus(Enum):
     complete = ("Complete", True)
     fatal = ("Fatal", True)
     canceled = ("Canceled", True)
+    bad_dataset = ("Bad Dataset", True)
 
     def __init__(self, string_name, is_complete):
         self.string_name = string_name
@@ -404,6 +405,9 @@ class DatasetStatus(str, Enum):
     created = "created"
     looking = "looking"
     complete = "complete"
+    does_not_exist = "does_not_exist"
+    bad_name = "bad_name"
+    internal_failure = "internal_failure"
 
 
 class Dataset(db.Model):
diff --git a/servicex_app/servicex_app/resources/internal/fileset_error.py b/servicex_app/servicex_app/resources/internal/fileset_error.py
new file mode 100644
index 000000000..ee16a30c5
--- /dev/null
+++ b/servicex_app/servicex_app/resources/internal/fileset_error.py
@@ -0,0 +1,142 @@
+# Copyright (c) 2025, IRIS-HEP
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+#   list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+#
+# * Neither the name of the copyright holder nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from datetime import datetime, timezone
+
+from flask import current_app, request
+
+from servicex_app.models import (
+    Dataset,
+    DatasetStatus,
+    TransformRequest,
+    TransformStatus,
+    db,
+)
+from servicex_app.resources.servicex_resource import ServiceXResource
+
+
+class FilesetError(ServiceXResource):
+    """Internal resource for reporting that a dataset file lookup failed."""
+
+    # Statuses that describe a failed lookup. Other DatasetStatus values
+    # (created, looking, complete) are not errors and are rejected by put().
+    LOOKUP_ERROR_STATUSES = frozenset(
+        {
+            DatasetStatus.does_not_exist,
+            DatasetStatus.bad_name,
+            DatasetStatus.internal_failure,
+        }
+    )
+
+    @classmethod
+    def make_api(cls, lookup_result_processor, transformer_manager):
+        """Store the collaborators on the class and return the class."""
+        cls.lookup_result_processor = lookup_result_processor
+        cls.transformer_manager = transformer_manager
+        return cls
+
+    def put(self, dataset_id):
+        """Record a failed lookup and stop transforms that depend on it.
+
+        The JSON body is expected to carry ``elapsed-time``, ``error-type``
+        and ``message``; ``error-type`` must be the value of one of
+        ``LOOKUP_ERROR_STATUSES``.
+
+        Returns 400 when the body is not a JSON object or ``error-type`` is
+        missing/unknown, and 422 when no dataset has the given id.
+        """
+        summary = request.get_json()
+        if not isinstance(summary, dict):
+            return {"message": "Request body must be a JSON object"}, 400
+
+        dataset_key = int(dataset_id)
+        dataset = Dataset.find_by_id(dataset_key)
+
+        # Structured-logging context shared by every record emitted below.
+        # .get() keeps a sparse payload from turning into a KeyError/500.
+        log_extra = {
+            "dataset_id": dataset_id,
+            "elapsed-time": summary.get("elapsed-time"),
+            "error-type": summary.get("error-type"),
+            "_message": summary.get("message"),
+        }
+
+        if dataset is None:
+            current_app.logger.info(
+                "Dataset lookup error received for unknown dataset",
+                extra=log_extra,
+            )
+            return "", 422
+
+        # Enum lookup raises ValueError for unknown values; treat that the
+        # same as a known-but-non-error status.
+        try:
+            error_status = DatasetStatus(summary.get("error-type"))
+        except ValueError:
+            error_status = None
+        if error_status not in self.LOOKUP_ERROR_STATUSES:
+            current_app.logger.warning(
+                "Dataset lookup error received with invalid error-type",
+                extra=log_extra,
+            )
+            return {"message": "Invalid or missing error-type"}, 400
+
+        current_app.logger.info("Error in file lookup", extra=log_extra)
+
+        dataset.lookup_status = error_status
+        dataset.stale = True  # Repeat lookup if we try again
+        db.session.commit()
+
+        # Shut down related transformations. Nothing good can come of letting
+        # them continue to run
+        namespace = current_app.config["TRANSFORMER_NAMESPACE"]
+        for running_request in TransformRequest.lookup_running_by_dataset_id(
+            dataset_key
+        ):
+            running_request.status = TransformStatus.bad_dataset
+            running_request.finish_time = datetime.now(tz=timezone.utc)
+            self.transformer_manager.shutdown_transformer_job(
+                running_request.request_id, namespace
+            )
+            current_app.logger.info(
+                "Shutting down transformer because of dataset lookup problem",
+                extra={**log_extra, "requestId": running_request.request_id},
+            )
+
+        # Tell any other transform that was waiting for the lookup to complete
+        # not to expect to run. No transformer job is shut down for these.
+        for pending_transform in TransformRequest.lookup_pending_on_dataset(
+            dataset_key
+        ):
+            pending_transform.status = TransformStatus.bad_dataset
+            pending_transform.finish_time = datetime.now(tz=timezone.utc)
+            current_app.logger.info(
+                "Canceling pending transform because of dataset lookup problem",
+                extra={**log_extra, "requestId": pending_transform.request_id},
+            )
+
+        db.session.commit()
diff --git a/servicex_app/servicex_app/routes.py b/servicex_app/servicex_app/routes.py
index 8fbeb1f10..148946dbd 100644
--- a/servicex_app/servicex_app/routes.py
+++ b/servicex_app/servicex_app/routes.py
@@ -49,6 +49,7 @@ def add_routes(
 
     from servicex_app.resources.internal.add_file_to_dataset import AddFileToDataset
     from servicex_app.resources.internal.fileset_complete import FilesetComplete
+    from servicex_app.resources.internal.fileset_error import FilesetError
     from servicex_app.resources.internal.transform_status import (
         TransformationStatusInternal,
     )
@@ -183,6 +184,12 @@ def add_routes(
         "/servicex/internal/transformation/<dataset_id>/complete",
     )
 
+    FilesetError.make_api(lookup_result_processor, transformer_manager)
+    api.add_resource(
+        FilesetError,
+        "/servicex/internal/transformation/<dataset_id>/error",
+    )
+
     TransformerFileComplete.make_api(transformer_manager)
     api.add_resource(
         TransformerFileComplete,
diff --git a/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py b/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py
new file mode 100644
index 000000000..679234eac
--- /dev/null
+++ b/servicex_app/servicex_app_test/resources/internal/test_fileset_error.py
@@ -0,0 +1,186 @@
+# Copyright (c) 2019, IRIS-HEP
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+#   list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+#
+# * Neither the name of the copyright holder nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from datetime import timezone, datetime
+
+from servicex_app import LookupResultProcessor, TransformerManager
+from servicex_app_test.resource_test_base import ResourceTestBase
+
+from servicex_app.models import (
+    DatasetStatus,
+    Dataset,
+    TransformRequest,
+    TransformStatus,
+)
+from pytest import fixture, mark
+
+
+class TestFilesetError(ResourceTestBase):
+    URL = "/servicex/internal/transformation/1234/error"
+
+    @fixture
+    def mock_find_dataset_by_id(self, mocker):
+        dm = mocker.Mock()
+        dm.dataset = Dataset(
+            name="rucio://my-did?files=1",
+            did_finder="rucio",
+            lookup_status=DatasetStatus.looking,
+            last_used=datetime.now(tz=timezone.utc),
+            last_updated=datetime.fromtimestamp(0),
+        )
+
+        dm.name = "rucio://my-did?files=1"
+        dm.id = 42
+
+        mock_find_by_id = mocker.patch.object(Dataset, "find_by_id", return_value=dm)
+        return mock_find_by_id
+
+    @fixture
+    def pending_request(self):
+        transform = TransformRequest()
+        transform.status = TransformStatus.pending_lookup
+        return transform
+
+    @fixture
+    def lookup_request(self):
+        transform = TransformRequest()
+        transform.status = TransformStatus.lookup
+        return transform
+
+    @fixture
+    def mock_lookup_pending(self, mocker, pending_request):
+        return mocker.patch.object(
+            TransformRequest,
+            "lookup_pending_on_dataset",
+            return_value=[pending_request],
+        )
+
+    @fixture
+    def mock_lookup_running(self, mocker, lookup_request):
+        return mocker.patch.object(
+            TransformRequest,
+            "lookup_running_by_dataset_id",
+            return_value=[lookup_request],
+        )
+
+    @fixture
+    def mock_transformer_manager(self, mocker):
+        manager = mocker.MagicMock(TransformerManager)
+        manager.shutdown_transformer_job = mocker.Mock()
+        return manager
+
+    @fixture
+    def fileset_error_client(self, mocker, mock_transformer_manager):
+        return self._test_client(
+            lookup_result_processor=mocker.MagicMock(LookupResultProcessor),
+            transformation_manager=mock_transformer_manager,
+        )
+
+    @mark.parametrize("error", ["does_not_exist", "bad_name", "internal_failure"])
+    def test_put_fileset_error(
+        self,
+        mocker,
+        mock_find_dataset_by_id,
+        pending_request,
+        lookup_request,
+        mock_lookup_pending,
+        mock_lookup_running,
+        mock_transformer_manager,
+        fileset_error_client,
+        error,
+    ):
+        dataset = mock_find_dataset_by_id.return_value
+
+        response = fileset_error_client.put(
+            self.URL,
+            json={"elapsed-time": 0, "error-type": error, "message": "honk"},
+        )
+        assert response.status_code == 200
+        mock_find_dataset_by_id.assert_called_once_with(1234)
+        assert dataset.lookup_status == DatasetStatus(error)
+        assert dataset.stale is True
+
+        mock_lookup_pending.assert_called_once_with(1234)
+        mock_lookup_running.assert_called_once_with(1234)
+        assert pending_request.status == TransformStatus.bad_dataset
+        assert lookup_request.status == TransformStatus.bad_dataset
+        assert pending_request.finish_time is not None
+        assert lookup_request.finish_time is not None
+
+        # Only the running transform has a transformer job to shut down
+        mock_transformer_manager.shutdown_transformer_job.assert_called_once_with(
+            lookup_request.request_id, mocker.ANY
+        )
+
+    def test_put_fileset_error_invalid_did(
+        self,
+        mocker,
+        pending_request,
+        lookup_request,
+        mock_lookup_pending,
+        mock_lookup_running,
+        mock_transformer_manager,
+        fileset_error_client,
+    ):
+        mock_find_dataset_by_id = mocker.patch.object(
+            Dataset, "find_by_id", return_value=None
+        )
+
+        response = fileset_error_client.put(
+            self.URL,
+            json={"elapsed-time": 0, "error-type": "bad_name", "message": "honk"},
+        )
+        assert response.status_code == 422
+        mock_find_dataset_by_id.assert_called_once_with(1234)
+
+        mock_lookup_pending.assert_not_called()
+        mock_lookup_running.assert_not_called()
+        mock_transformer_manager.shutdown_transformer_job.assert_not_called()
+        assert pending_request.status == TransformStatus.pending_lookup
+        assert lookup_request.status == TransformStatus.lookup
+
+    @mark.parametrize("error", ["complete", "bad_dataset", None])
+    def test_put_fileset_error_invalid_error_type(
+        self,
+        mock_find_dataset_by_id,
+        mock_lookup_pending,
+        mock_lookup_running,
+        mock_transformer_manager,
+        fileset_error_client,
+        error,
+    ):
+        response = fileset_error_client.put(
+            self.URL,
+            json={"elapsed-time": 0, "error-type": error, "message": "honk"},
+        )
+        assert response.status_code == 400
+        mock_find_dataset_by_id.assert_called_once_with(1234)
+
+        # A rejected report must not touch any transform
+        mock_lookup_pending.assert_not_called()
+        mock_lookup_running.assert_not_called()
+        mock_transformer_manager.shutdown_transformer_job.assert_not_called()