Skip to content

Commit caf1a4a

Browse files
authored
Updates to handle bad dataset exceptions on the server side (#674)
* Updates to handle bad dataset exceptions on the server side; give DID in warning message
1 parent a51d825 commit caf1a4a

File tree

6 files changed

+185
-1
lines changed

6 files changed

+185
-1
lines changed

servicex/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class Status(str, Enum):
9494
looking = ("Lookup",)
9595
pending = "Pending Lookup"
9696
running = "Running"
97+
bad_dataset = "Bad Dataset"
9798

9899

99100
class TransformRequest(DocStringBaseModel):
@@ -145,6 +146,7 @@ class TransformStatus(DocStringBaseModel):
145146

146147
request_id: str
147148
did: str
149+
did_id: int
148150
title: Optional[str] = None
149151
selection: str
150152
tree_name: Optional[str] = Field(validation_alias="tree-name")

servicex/query_core.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555

5656
from make_it_sync import make_sync
5757

58-
DONE_STATUS = (Status.complete, Status.canceled, Status.fatal)
58+
DONE_STATUS = (Status.complete, Status.canceled, Status.fatal, Status.bad_dataset)
5959
ProgressIndicators = Union[Progress, ExpandableProgress]
6060
logger = logging.getLogger(__name__)
6161
shell_handler = RichHandler(markup=True)
@@ -464,6 +464,25 @@ async def transform_status_listener(
464464
else ""
465465
)
466466
if self.current_status.status == Status.complete:
467+
if self.current_status.files == 0:
468+
err_str = (
469+
f"Transform {titlestr}completed with 0 files.\n "
470+
"This indicates there were no files in the input dataset "
471+
f"{self.current_status.did}."
472+
"Is this intentional? "
473+
)
474+
logger.warning(err_str)
475+
if self.current_status.log_url is not None:
476+
kibana_link = create_kibana_link_parameters(
477+
self.current_status.log_url,
478+
self.current_status.request_id,
479+
LogLevel.error,
480+
TimeFrame.month,
481+
)
482+
logger.warning(
483+
f"More logfiles of '{self.title}' [bold red on white]"
484+
f"[link={kibana_link}]HERE[/link][/bold red on white]"
485+
)
467486
if self.files_failed:
468487
bar = "failure"
469488
else:
@@ -494,7 +513,51 @@ async def transform_status_listener(
494513
f"{err_str}\nMore logfiles of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]" # NOQA: E501
495514
)
496515
raise ServiceXException(err_str)
516+
elif self.current_status.status == Status.bad_dataset:
517+
msg_map = {
518+
"does_not_exist": "Dataset does not exist",
519+
"bad_name": "Bad dataset specification",
520+
"internal_failure": "Internal failure during lookup",
521+
}
522+
# get dataset info
523+
try:
524+
dataset_info = await self.servicex.get_dataset(
525+
self.current_status.did_id
526+
)
527+
except Exception as e:
528+
logger.error(
529+
f"Request {titlestr} failed due to a dataset problem, and "
530+
f"an error ({e}) was encountered while trying to get more details. "
531+
f"{self.current_status.files_completed}/{self.current_status.files} "
532+
f"files completed"
533+
)
534+
err_str = (
535+
f"Request {titlestr} failed due to a dataset problem. "
536+
"Further information not available"
537+
)
538+
else:
539+
logger.error(
540+
f"Request {titlestr} failed due to a dataset problem: "
541+
f"{msg_map[dataset_info.lookup_status]}. "
542+
f"{self.current_status.files_completed}/{self.current_status.files} "
543+
f"files completed"
544+
)
497545

546+
err_str = (
547+
f"Request {titlestr} failed due to a dataset problem: "
548+
f"{msg_map[dataset_info.lookup_status]}"
549+
)
550+
if self.current_status.log_url is not None:
551+
kibana_link = create_kibana_link_parameters(
552+
self.current_status.log_url,
553+
self.current_status.request_id,
554+
LogLevel.error,
555+
TimeFrame.month,
556+
)
557+
logger.error(
558+
f"More logfiles of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]" # NOQA: E501
559+
)
560+
raise ServiceXException(err_str)
498561
else:
499562
err_str = f"Fatal issue in ServiceX server for request {titlestr}"
500563
if self.current_status.log_url is not None:

tests/app/test_transforms.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def transform_status_record() -> TransformStatus:
4848
base_data = {
4949
"request_id": "test-request-123",
5050
"did": "test-did-456",
51+
"did_id": 1,
5152
"title": "Test Transform Job",
5253
"selection": "(muon_pt > 20)",
5354
"tree-name": "mytree",

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def transform_status_response() -> dict:
105105
{
106106
"request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
107107
"did": "File List Provided in Request",
108+
"did_id": 1,
108109
"columns": None,
109110
"selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))", # NOQA
110111
"tree-name": None,
@@ -135,6 +136,7 @@ def completed_status() -> TransformStatus:
135136
**{
136137
"request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
137138
"did": "File List Provided in Request",
139+
"did_id": 1,
138140
"columns": None,
139141
"selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))", # NOQA
140142
"tree-name": None,

tests/test_dataset.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from servicex.query_cache import QueryCache
3939
from servicex.expandable_progress import ExpandableProgress
4040
from servicex.query_core import ServiceXException
41+
from servicex.models import CachedDataset
4142

4243
from pathlib import Path
4344
from servicex.models import (
@@ -257,6 +258,120 @@ async def test_transform_status_listener_cancelled(python_dataset):
257258
assert python_dataset.files_failed == 1
258259

259260

261+
@pytest.mark.asyncio
262+
async def test_transform_status_listener_bad_dataset(python_dataset):
263+
progress = Mock(spec=Progress)
264+
progress_task = Mock()
265+
download_task = Mock()
266+
status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
267+
python_dataset.current_status = status
268+
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
269+
bad_dataset = CachedDataset(
270+
id=42,
271+
name="test_dataset",
272+
did_finder="some_finder",
273+
n_files=2,
274+
size=3072,
275+
events=300,
276+
last_used=datetime.datetime.now(),
277+
last_updated=datetime.datetime.now(),
278+
lookup_status="does_not_exist",
279+
is_stale=True,
280+
files=[],
281+
)
282+
python_dataset.servicex = Mock()
283+
python_dataset.servicex.get_dataset = AsyncMock(return_value=bad_dataset)
284+
with pytest.raises(ServiceXException, match=r"dataset problem"):
285+
with patch(
286+
"servicex.app.transforms.create_kibana_link_parameters"
287+
) as mock_link:
288+
await python_dataset.transform_status_listener(
289+
progress, progress_task, "mock_title", download_task, "mock_title"
290+
)
291+
mock_link.assert_called_once()
292+
python_dataset.retrieve_current_transform_status.assert_awaited_once()
293+
assert python_dataset.files_completed == 0
294+
assert python_dataset.files_failed == 0
295+
296+
297+
@pytest.mark.asyncio
298+
async def test_transform_status_listener_bad_dataset_bad_lookup(python_dataset):
299+
progress = Mock(spec=Progress)
300+
progress_task = Mock()
301+
download_task = Mock()
302+
status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
303+
python_dataset.current_status = status
304+
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
305+
python_dataset.servicex = Mock()
306+
python_dataset.servicex.get_dataset = AsyncMock(side_effect=Exception)
307+
with pytest.raises(ServiceXException, match=r"Further information not available"):
308+
with patch(
309+
"servicex.app.transforms.create_kibana_link_parameters"
310+
) as mock_link:
311+
await python_dataset.transform_status_listener(
312+
progress, progress_task, "mock_title", download_task, "mock_title"
313+
)
314+
mock_link.assert_called_once()
315+
python_dataset.retrieve_current_transform_status.assert_awaited_once()
316+
assert python_dataset.files_completed == 0
317+
assert python_dataset.files_failed == 0
318+
319+
320+
@pytest.mark.asyncio
321+
async def test_transform_status_listener_zero_files_no_log_url(python_dataset):
322+
progress = Mock(spec=Progress)
323+
progress_task = Mock()
324+
download_task = Mock()
325+
status = Mock(
326+
files=0,
327+
files_completed=0,
328+
files_failed=0,
329+
status=Status.complete,
330+
log_url=None,
331+
title="Test Transform",
332+
)
333+
python_dataset.current_status = status
334+
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
335+
with patch("servicex.query_core.logger") as mock_logger:
336+
337+
await python_dataset.transform_status_listener(
338+
progress, progress_task, "mock_title", download_task, "mock_title"
339+
)
340+
mock_logger.warning.assert_called_once()
341+
342+
python_dataset.retrieve_current_transform_status.assert_awaited_once()
343+
344+
345+
@pytest.mark.asyncio
346+
async def test_transform_status_listener_zero_files_with_log_url(python_dataset):
347+
progress = Mock(spec=Progress)
348+
progress_task = Mock()
349+
download_task = Mock()
350+
status = Mock(
351+
files=0,
352+
files_completed=0,
353+
files_failed=0,
354+
status=Status.complete,
355+
log_url="http://example.com/logs",
356+
request_id="test-request-123",
357+
title="Test Transform",
358+
)
359+
python_dataset.current_status = status
360+
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
361+
362+
with patch("servicex.app.transforms.create_kibana_link_parameters") as mock_link:
363+
with patch("servicex.query_core.logger") as mock_logger:
364+
mock_link.return_value = "http://kibana.example.com/link"
365+
await python_dataset.transform_status_listener(
366+
progress, progress_task, "mock_title", download_task, "mock_title"
367+
)
368+
369+
mock_link.assert_called_once()
370+
mock_logger.warning.assert_called()
371+
372+
python_dataset.retrieve_current_transform_status.assert_awaited_once()
373+
374+
260375
@pytest.mark.asyncio
261376
async def test_retrieve_current_transform_status_status_none(
262377
python_dataset, completed_status

tests/test_servicex_dataset.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def _sx_mock() -> AsyncMock:
6565
**{
6666
"request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
6767
"did": "File List Provided in Request",
68+
"did_id": 1,
6869
"columns": None,
6970
"selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))", # NOQA 501
7071
"tree-name": None,

0 commit comments

Comments
 (0)