diff --git a/servicex/models.py b/servicex/models.py
index 23b9bd27..3bad43e1 100644
--- a/servicex/models.py
+++ b/servicex/models.py
@@ -94,6 +94,7 @@ class Status(str, Enum):
     looking = ("Lookup",)
     pending = "Pending Lookup"
     running = "Running"
+    bad_dataset = "Bad Dataset"
 
 
 class TransformRequest(DocStringBaseModel):
@@ -145,6 +146,7 @@ class TransformStatus(DocStringBaseModel):
 
     request_id: str
     did: str
+    did_id: int
     title: Optional[str] = None
     selection: str
     tree_name: Optional[str] = Field(validation_alias="tree-name")
diff --git a/servicex/query_core.py b/servicex/query_core.py
index ad2341ef..2bbd29d6 100644
--- a/servicex/query_core.py
+++ b/servicex/query_core.py
@@ -55,7 +55,7 @@
 from make_it_sync import make_sync
 
 
-DONE_STATUS = (Status.complete, Status.canceled, Status.fatal)
+DONE_STATUS = (Status.complete, Status.canceled, Status.fatal, Status.bad_dataset)
 ProgressIndicators = Union[Progress, ExpandableProgress]
 logger = logging.getLogger(__name__)
 shell_handler = RichHandler(markup=True)
@@ -452,6 +452,25 @@ async def transform_status_listener(
                     else ""
                 )
                 if self.current_status.status == Status.complete:
+                    if self.current_status.files == 0:
+                        err_str = (
+                            f"Transform {titlestr}completed with 0 files.\n "
+                            "This indicates there were no files in the input dataset "
+                            f"{self.current_status.did}. "
+                            "Is this intentional? "
+                        )
+                        logger.warning(err_str)
+                        if self.current_status.log_url is not None:
+                            kibana_link = create_kibana_link_parameters(
+                                self.current_status.log_url,
+                                self.current_status.request_id,
+                                LogLevel.error,
+                                TimeFrame.month,
+                            )
+                            logger.warning(
+                                f"More logfiles of '{self.title}' [bold red on white]"
+                                f"[link={kibana_link}]HERE[/link][/bold red on white]"
+                            )
                     if self.files_failed:
                         bar = "failure"
                     else:
@@ -482,7 +501,51 @@ async def transform_status_listener(
                             f"{err_str}\nMore logfiles of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]"  # NOQA: E501
                         )
                     raise ServiceXException(err_str)
+                elif self.current_status.status == Status.bad_dataset:
+                    msg_map = {
+                        "does_not_exist": "Dataset does not exist",
+                        "bad_name": "Bad dataset specification",
+                        "internal_failure": "Internal failure during lookup",
+                    }
+                    # get dataset info
+                    try:
+                        dataset_info = await self.servicex.get_dataset(
+                            self.current_status.did_id
+                        )
+                    except Exception as e:
+                        logger.error(
+                            f"Request {titlestr} failed due to a dataset problem, and "
+                            f"an error ({e}) was encountered while trying to get more details. "
+                            f"{self.current_status.files_completed}/{self.current_status.files} "
+                            f"files completed"
+                        )
+                        err_str = (
+                            f"Request {titlestr} failed due to a dataset problem. "
+                            "Further information not available"
+                        )
+                    else:
+                        logger.error(
+                            f"Request {titlestr} failed due to a dataset problem: "
+                            f"{msg_map[dataset_info.lookup_status]}. "
+                            f"{self.current_status.files_completed}/{self.current_status.files} "
+                            f"files completed"
+                        )
+                        err_str = (
+                            f"Request {titlestr} failed due to a dataset problem: "
+                            f"{msg_map[dataset_info.lookup_status]}"
+                        )
+                    if self.current_status.log_url is not None:
+                        kibana_link = create_kibana_link_parameters(
+                            self.current_status.log_url,
+                            self.current_status.request_id,
+                            LogLevel.error,
+                            TimeFrame.month,
+                        )
+                        logger.error(
+                            f"More logfiles of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]"  # NOQA: E501
+                        )
+                    raise ServiceXException(err_str)
                 else:
                     err_str = f"Fatal issue in ServiceX server for request {titlestr}"
                     if self.current_status.log_url is not None:
diff --git a/tests/app/test_transforms.py b/tests/app/test_transforms.py
index eb4e3910..fd77c2fa 100644
--- a/tests/app/test_transforms.py
+++ b/tests/app/test_transforms.py
@@ -48,6 +48,7 @@ def transform_status_record() -> TransformStatus:
     base_data = {
         "request_id": "test-request-123",
         "did": "test-did-456",
+        "did_id": 1,
         "title": "Test Transform Job",
         "selection": "(muon_pt > 20)",
         "tree-name": "mytree",
diff --git a/tests/conftest.py b/tests/conftest.py
index c152b6cf..b535e233 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -105,6 +105,7 @@ def transform_status_response() -> dict:
             {
                 "request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
                 "did": "File List Provided in Request",
+                "did_id": 1,
                 "columns": None,
                 "selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))",  # NOQA
                 "tree-name": None,
@@ -135,6 +136,7 @@ def completed_status() -> TransformStatus:
         **{
             "request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
             "did": "File List Provided in Request",
+            "did_id": 1,
             "columns": None,
             "selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))",  # NOQA
             "tree-name": None,
diff --git a/tests/test_dataset.py b/tests/test_dataset.py
index 1bf2446f..1e17017b 100644
--- a/tests/test_dataset.py
+++ b/tests/test_dataset.py
@@ -38,6 +38,7 @@
 from servicex.query_cache import QueryCache
 from servicex.expandable_progress import ExpandableProgress
 from servicex.query_core import ServiceXException
+from servicex.models import CachedDataset
 from pathlib import Path
 
 from servicex.models import (
@@ -250,6 +251,120 @@ async def test_transform_status_listener_cancelled(python_dataset):
     assert python_dataset.files_failed == 1
 
 
+@pytest.mark.asyncio
+async def test_transform_status_listener_bad_dataset(python_dataset):
+    progress = Mock(spec=Progress)
+    progress_task = Mock()
+    download_task = Mock()
+    status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
+    python_dataset.current_status = status
+    python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
+    bad_dataset = CachedDataset(
+        id=42,
+        name="test_dataset",
+        did_finder="some_finder",
+        n_files=2,
+        size=3072,
+        events=300,
+        last_used=datetime.datetime.now(),
+        last_updated=datetime.datetime.now(),
+        lookup_status="does_not_exist",
+        is_stale=True,
+        files=[],
+    )
+    python_dataset.servicex = Mock()
+    python_dataset.servicex.get_dataset = AsyncMock(return_value=bad_dataset)
+    with pytest.raises(ServiceXException, match=r"dataset problem"):
+        with patch(
+            "servicex.app.transforms.create_kibana_link_parameters"
+        ) as mock_link:
+            await python_dataset.transform_status_listener(
+                progress, progress_task, "mock_title", download_task, "mock_title"
+            )
+            mock_link.assert_called_once()
+    python_dataset.retrieve_current_transform_status.assert_awaited_once()
+    assert python_dataset.files_completed == 0
+    assert python_dataset.files_failed == 0
+
+
+@pytest.mark.asyncio
+async def test_transform_status_listener_bad_dataset_bad_lookup(python_dataset):
+    progress = Mock(spec=Progress)
+    progress_task = Mock()
+    download_task = Mock()
+    status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
+    python_dataset.current_status = status
+    python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
+    python_dataset.servicex = Mock()
+    python_dataset.servicex.get_dataset = AsyncMock(side_effect=Exception)
+    with pytest.raises(ServiceXException, match=r"Further information not available"):
+        with patch(
+            "servicex.app.transforms.create_kibana_link_parameters"
+        ) as mock_link:
+            await python_dataset.transform_status_listener(
+                progress, progress_task, "mock_title", download_task, "mock_title"
+            )
+            mock_link.assert_called_once()
+    python_dataset.retrieve_current_transform_status.assert_awaited_once()
+    assert python_dataset.files_completed == 0
+    assert python_dataset.files_failed == 0
+
+
+@pytest.mark.asyncio
+async def test_transform_status_listener_zero_files_no_log_url(python_dataset):
+    progress = Mock(spec=Progress)
+    progress_task = Mock()
+    download_task = Mock()
+    status = Mock(
+        files=0,
+        files_completed=0,
+        files_failed=0,
+        status=Status.complete,
+        log_url=None,
+        title="Test Transform",
+    )
+    python_dataset.current_status = status
+    python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
+    with patch("servicex.query_core.logger") as mock_logger:
+
+        await python_dataset.transform_status_listener(
+            progress, progress_task, "mock_title", download_task, "mock_title"
+        )
+        mock_logger.warning.assert_called_once()
+
+    python_dataset.retrieve_current_transform_status.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+async def test_transform_status_listener_zero_files_with_log_url(python_dataset):
+    progress = Mock(spec=Progress)
+    progress_task = Mock()
+    download_task = Mock()
+    status = Mock(
+        files=0,
+        files_completed=0,
+        files_failed=0,
+        status=Status.complete,
+        log_url="http://example.com/logs",
+        request_id="test-request-123",
+        title="Test Transform",
+    )
+    python_dataset.current_status = status
+    python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
+
+    with patch("servicex.app.transforms.create_kibana_link_parameters") as mock_link:
+        with patch("servicex.query_core.logger") as mock_logger:
+            mock_link.return_value = "http://kibana.example.com/link"
+            await python_dataset.transform_status_listener(
+                progress, progress_task, "mock_title", download_task, "mock_title"
+            )
+
+    mock_link.assert_called_once()
+    mock_logger.warning.assert_called()
+
+    python_dataset.retrieve_current_transform_status.assert_awaited_once()
+
+
 @pytest.mark.asyncio
 async def test_retrieve_current_transform_status_status_none(
     python_dataset, completed_status
diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py
index 4299a3c2..fc852c17 100644
--- a/tests/test_servicex_dataset.py
+++ b/tests/test_servicex_dataset.py
@@ -55,6 +55,7 @@
     **{
         "request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
         "did": "File List Provided in Request",
+        "did_id": 1,
         "columns": None,
         "selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))",  # NOQA 501
         "tree-name": None,