Skip to content

Commit 355088f

Browse files
committed
Add meaningful exception handling, report errors
1 parent 0a9d96e commit 355088f

File tree

6 files changed

+210
-20
lines changed

6 files changed

+210
-20
lines changed

src/servicex_did_finder_lib/communication.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,37 @@
1+
# Copyright (c) 2021-2025, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
129
import argparse
230
from datetime import datetime
331
import json
432
import logging
533
import time
6-
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional
34+
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, Union
735
import sys
836

937
import pika
@@ -15,7 +43,8 @@
1543
from .servicex_adaptor import ServiceXAdapter
1644

1745
# The type for the callback method to handle DID's, supplied by the user.
18-
UserDIDHandler = Callable[[str, Dict[str, Any]], AsyncGenerator[Dict[str, Any], None]]
46+
UserDIDHandler = Callable[[str, Dict[str, Any]], AsyncGenerator[Union[Dict[str, Any],
47+
List[Dict[str, Any]]], None]]
1948

2049
# Given name, build the RabbitMQ queue name by appending this.
2150
# This is backed into how ServiceX works - do not change unless it

src/servicex_did_finder_lib/did_finder_app.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2022, IRIS-HEP
1+
# Copyright (c) 2022-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -36,6 +36,7 @@
3636
from servicex_did_finder_lib.did_summary import DIDSummary
3737
from servicex_did_finder_lib.servicex_adaptor import ServiceXAdapter
3838
from servicex_did_finder_lib.util_uri import parse_did_uri
39+
from servicex_did_finder_lib import exceptions
3940

4041
# The type for the callback method to handle DID's, supplied by the user.
4142
# Arguments are:
@@ -102,14 +103,7 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U
102103

103104
if did_info.file_count > 0: # otherwise wait until all files arrive then limit results
104105
acc.send_on(did_info.file_count)
105-
except Exception:
106-
# noinspection PyTypeChecker
107-
self.logger.error(
108-
f"Error processing DID {did}",
109-
extra={"dataset_id": dataset_id},
110-
exc_info=1
111-
)
112-
finally:
106+
113107
elapsed_time = int((datetime.now() - start_time).total_seconds())
114108
servicex.put_fileset_complete(
115109
{
@@ -120,6 +114,23 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U
120114
"elapsed-time": elapsed_time,
121115
}
122116
)
117+
except Exception as e:
118+
# noinspection PyTypeChecker
119+
self.logger.error(
120+
f"Error processing DID {did}",
121+
extra={"dataset_id": dataset_id},
122+
exc_info=1
123+
)
124+
elapsed_time = int((datetime.now() - start_time).total_seconds())
125+
error_dict: dict[str, Any] = {"elapsed-time": elapsed_time,
126+
"message": str(e)}
127+
if isinstance(e, exceptions.BaseDIDFinderException):
128+
error_dict["error-type"] = e.error_type
129+
else:
130+
error_dict["error-type"] = "internal_failure"
131+
servicex.put_fileset_error(
132+
error_dict
133+
)
123134

124135

125136
class DIDFinderApp(Celery):
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright (c) 2025, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
29+
# Define some exceptions to indicate various problems in DID finding
30+
31+
class BaseDIDFinderException(Exception):
32+
""" Base exception type """
33+
transient: bool = True
34+
error_type: str = ""
35+
36+
37+
class NoSuchDatasetException(BaseDIDFinderException):
38+
""" The dataset does not exist in the catalog """
39+
transient = True
40+
error_type = "does_not_exist"
41+
42+
43+
class BadDatasetNameException(BaseDIDFinderException):
44+
""" The specified dataset name is invalid """
45+
transient = False
46+
error_type = "bad_name"
47+
48+
49+
class LookupFailureException(BaseDIDFinderException):
50+
""" There was a failure when looking up the dataset in the catalog """
51+
transient = True
52+
error_type = "internal_failure"

src/servicex_did_finder_lib/servicex_adaptor.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2019, IRIS-HEP
1+
# Copyright (c) 2019-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -74,6 +74,10 @@ def put_file_add_bulk(self, file_list, chunk_length=300):
7474
self.logger.error(f'After {attempts} tries, failed to send ServiceX App '
7575
f'a put_file_bulk message: {mesg} - Ignoring error.')
7676

77+
def put_file_add(self, file):
78+
# add one file
79+
self.put_file_add_bulk([file])
80+
7781
def put_fileset_complete(self, summary):
7882
success = False
7983
attempts = 0
@@ -88,3 +92,18 @@ def put_fileset_complete(self, summary):
8892
if not success:
8993
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a put_file '
9094
f'message: {str(summary)} - Ignoring error.')
95+
96+
def put_fileset_error(self, summary):
97+
success = False
98+
attempts = 0
99+
while not success and attempts < MAX_RETRIES:
100+
try:
101+
requests.put(f"{self.endpoint}{self.dataset_id}/error", json=summary)
102+
success = True
103+
except requests.exceptions.ConnectionError:
104+
self.logger.exception(f'Connection error to ServiceX App. Will retry '
105+
f'(try {attempts} out of {MAX_RETRIES}')
106+
attempts += 1
107+
if not success:
108+
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a put_file '
109+
f'message: {str(summary)} - Ignoring error.')

tests/servicex_did_finder_lib_tests/test_did_finder_app.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2024, IRIS-HEP
1+
# Copyright (c) 2024-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -31,6 +31,7 @@
3131

3232
from servicex_did_finder_lib.accumulator import Accumulator
3333
from servicex_did_finder_lib.did_finder_app import DIDFinderTask, DIDFinderApp
34+
from servicex_did_finder_lib import exceptions
3435

3536

3637
@pytest.fixture()
@@ -74,11 +75,15 @@ def test_did_finder_task(mocker, servicex, single_file_info):
7475
)
7576

7677

77-
def test_did_finder_task_exception(mocker, servicex, single_file_info):
78+
@pytest.mark.parametrize("exc", [Exception("Boom"),
79+
exceptions.BadDatasetNameException("Bad name"),
80+
exceptions.LookupFailureException("Boom 2"),
81+
exceptions.NoSuchDatasetException("Not there")])
82+
def test_did_finder_task_exception(mocker, servicex, exc, single_file_info):
7883
did_finder_task = DIDFinderTask()
7984
# did_finder_task.app = mocker.Mock()
8085
did_finder_task.app.did_finder_args = {}
81-
mock_generator = mocker.Mock(side_effect=Exception("Boom"))
86+
mock_generator = mocker.Mock(side_effect=exc)
8287

8388
mock_accumulator = mocker.MagicMock(Accumulator)
8489
with patch(
@@ -92,13 +97,14 @@ def test_did_finder_task_exception(mocker, servicex, single_file_info):
9297
mock_accumulator.add.assert_not_called()
9398
mock_accumulator.send_on.assert_not_called()
9499

95-
servicex.return_value.put_fileset_complete.assert_called_with(
100+
error_type_str = (exc.error_type
101+
if isinstance(exc, exceptions.BaseDIDFinderException)
102+
else "internal_failure")
103+
servicex.return_value.put_fileset_error.assert_called_with(
96104
{
97-
"files": 0, # Aught to have a side effect in mock accumulator that updates this
98-
"files-skipped": 0,
99-
"total-events": 0,
100-
"total-bytes": 0,
101105
"elapsed-time": 0,
106+
"error-type": error_type_str,
107+
"message": str(exc),
102108
}
103109
)
104110

tests/servicex_did_finder_lib_tests/test_servicex_adaptor.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,30 @@
1+
# Copyright (c) 2024-2025, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
128
import json
229

330
import requests
@@ -142,3 +169,49 @@ def request_callback(request):
142169
"elapsed-time": 10
143170
})
144171
assert len(responses.calls) == 3 # Max retries
172+
173+
174+
@responses.activate
175+
def test_put_file_error():
176+
call_count = 0
177+
178+
def request_callback(request):
179+
nonlocal call_count
180+
call_count += 1
181+
182+
if call_count == 1:
183+
raise requests.exceptions.ConnectionError("Connection failed")
184+
else:
185+
return (206, {}, "")
186+
187+
responses.add_callback(responses.PUT,
188+
'http://servicex.org/12345/error',
189+
callback=request_callback)
190+
191+
sx = ServiceXAdapter("http://servicex.org/", '12345')
192+
sx.put_fileset_error({
193+
"error-type": "bad_name",
194+
"elapsed-time": 10
195+
})
196+
assert len(responses.calls) == 1 + 1 # 1 retry
197+
submitted = json.loads(responses.calls[0].request.body)
198+
assert submitted['error-type'] == "bad_name"
199+
assert submitted['elapsed-time'] == 10
200+
201+
202+
@responses.activate
203+
def test_put_file_error_failure():
204+
205+
def request_callback(request):
206+
raise requests.exceptions.ConnectionError("Connection failed")
207+
208+
responses.add_callback(responses.PUT,
209+
'http://servicex.org/12345/error',
210+
callback=request_callback)
211+
212+
sx = ServiceXAdapter("http://servicex.org/", '12345')
213+
sx.put_fileset_error({
214+
"error-type": "bad_name",
215+
"elapsed-time": 10
216+
})
217+
assert len(responses.calls) == 3 # Max retries

0 commit comments

Comments
 (0)