2 changes: 2 additions & 0 deletions servicex/models.py
@@ -94,6 +94,7 @@ class Status(str, Enum):
looking = ("Lookup",)
pending = "Pending Lookup"
running = "Running"
bad_dataset = "Bad Dataset"


class TransformRequest(DocStringBaseModel):
@@ -145,6 +146,7 @@ class TransformStatus(DocStringBaseModel):

request_id: str
did: str
did_id: int
title: Optional[str] = None
selection: str
tree_name: Optional[str] = Field(validation_alias="tree-name")
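For orientation, an abbreviated sketch of how the two model additions fit together: the new enum value marks a terminal state for a failed dataset lookup, and the new did_id field gives the client a numeric handle it can pass to get_dataset() for details. Most existing members and fields are elided here, and the real model derives from DocStringBaseModel rather than plain pydantic BaseModel.

from enum import Enum
from typing import Optional

from pydantic import BaseModel


class Status(str, Enum):
    # ... existing members (complete, fatal, canceled, submitted, looking, ...) elided ...
    pending = "Pending Lookup"
    running = "Running"
    bad_dataset = "Bad Dataset"  # new: terminal state when the dataset lookup fails


class TransformStatus(BaseModel):  # the real class derives from DocStringBaseModel
    request_id: str
    did: str
    did_id: int  # new: numeric dataset id, used to fetch lookup details via get_dataset()
    title: Optional[str] = None
    # ... remaining fields elided ...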
65 changes: 64 additions & 1 deletion servicex/query_core.py
@@ -55,7 +55,7 @@

from make_it_sync import make_sync

DONE_STATUS = (Status.complete, Status.canceled, Status.fatal)
DONE_STATUS = (Status.complete, Status.canceled, Status.fatal, Status.bad_dataset)
ProgressIndicators = Union[Progress, ExpandableProgress]
logger = logging.getLogger(__name__)
shell_handler = RichHandler(markup=True)
@@ -452,6 +452,25 @@ async def transform_status_listener(
else ""
)
if self.current_status.status == Status.complete:
if self.current_status.files == 0:
err_str = (
f"Transform {titlestr}completed with 0 files.\n "
"This indicates there were no files in the input dataset "
f"{self.current_status.did}."
"Is this intentional? "
)
logger.warning(err_str)
if self.current_status.log_url is not None:
kibana_link = create_kibana_link_parameters(
self.current_status.log_url,
self.current_status.request_id,
LogLevel.error,
TimeFrame.month,
)
logger.warning(
f"More logfiles of '{self.title}' [bold red on white]"
f"[link={kibana_link}]HERE[/link][/bold red on white]"
)
if self.files_failed:
bar = "failure"
else:
@@ -482,7 +501,51 @@ async def transform_status_listener(
f"{err_str}\nMore logfiles of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]" # NOQA: E501
)
raise ServiceXException(err_str)
elif self.current_status.status == Status.bad_dataset:
msg_map = {
"does_not_exist": "Dataset does not exist",
"bad_name": "Bad dataset specification",
"internal_failure": "Internal failure during lookup",
}
# get dataset info
try:
dataset_info = await self.servicex.get_dataset(
self.current_status.did_id
)
except Exception as e:
logger.error(
f"Request {titlestr} failed due to a dataset problem, and "
f"an error ({e}) was encountered while trying to get more details. "
f"{self.current_status.files_completed}/{self.current_status.files} "
f"files completed"
)
err_str = (
f"Request {titlestr} failed due to a dataset problem. "
"Further information not available"
)
else:
logger.error(
f"Request {titlestr} failed due to a dataset problem: "
f"{msg_map[dataset_info.lookup_status]}. "
f"{self.current_status.files_completed}/{self.current_status.files} "
f"files completed"
)

err_str = (
f"Request {titlestr} failed due to a dataset problem: "
f"{msg_map[dataset_info.lookup_status]}"
)
if self.current_status.log_url is not None:
kibana_link = create_kibana_link_parameters(
self.current_status.log_url,
self.current_status.request_id,
LogLevel.error,
TimeFrame.month,
)
logger.error(
f"More logfiles of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]" # NOQA: E501
)
raise ServiceXException(err_str)
else:
err_str = f"Fatal issue in ServiceX server for request {titlestr}"
if self.current_status.log_url is not None:
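One detail worth flagging in the new bad_dataset branch: msg_map[dataset_info.lookup_status] raises KeyError if the server ever reports a lookup status the client does not know about, which would mask the real error. A defensive variant (purely illustrative, not part of this PR) could fall back to a generic message:

# Illustrative sketch only, not part of this PR: tolerate unknown
# lookup_status values instead of raising KeyError inside the error path.
_LOOKUP_MESSAGES = {
    "does_not_exist": "Dataset does not exist",
    "bad_name": "Bad dataset specification",
    "internal_failure": "Internal failure during lookup",
}


def describe_lookup_failure(lookup_status: str) -> str:
    """Return a human-readable message for a failed dataset lookup."""
    return _LOOKUP_MESSAGES.get(
        lookup_status, f"Unknown dataset lookup status '{lookup_status}'"
    )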
1 change: 1 addition & 0 deletions tests/app/test_transforms.py
@@ -48,6 +48,7 @@ def transform_status_record() -> TransformStatus:
base_data = {
"request_id": "test-request-123",
"did": "test-did-456",
"did_id": 1,
"title": "Test Transform Job",
"selection": "(muon_pt > 20)",
"tree-name": "mytree",
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -105,6 +105,7 @@ def transform_status_response() -> dict:
{
"request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
"did": "File List Provided in Request",
"did_id": 1,
"columns": None,
"selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))", # NOQA
"tree-name": None,
@@ -135,6 +136,7 @@ def completed_status() -> TransformStatus:
**{
"request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
"did": "File List Provided in Request",
"did_id": 1,
"columns": None,
"selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))", # NOQA
"tree-name": None,
115 changes: 115 additions & 0 deletions tests/test_dataset.py
@@ -38,6 +38,7 @@
from servicex.query_cache import QueryCache
from servicex.expandable_progress import ExpandableProgress
from servicex.query_core import ServiceXException
from servicex.models import CachedDataset

from pathlib import Path
from servicex.models import (
@@ -250,6 +251,120 @@ async def test_transform_status_listener_cancelled(python_dataset):
assert python_dataset.files_failed == 1


@pytest.mark.asyncio
async def test_transform_status_listener_bad_dataset(python_dataset):
progress = Mock(spec=Progress)
progress_task = Mock()
download_task = Mock()
status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
python_dataset.current_status = status
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
bad_dataset = CachedDataset(
id=42,
name="test_dataset",
did_finder="some_finder",
n_files=2,
size=3072,
events=300,
last_used=datetime.datetime.now(),
last_updated=datetime.datetime.now(),
lookup_status="does_not_exist",
is_stale=True,
files=[],
)
python_dataset.servicex = Mock()
python_dataset.servicex.get_dataset = AsyncMock(return_value=bad_dataset)
with pytest.raises(ServiceXException, match=r"dataset problem"):
with patch(
"servicex.app.transforms.create_kibana_link_parameters"
) as mock_link:
await python_dataset.transform_status_listener(
progress, progress_task, "mock_title", download_task, "mock_title"
)
mock_link.assert_called_once()
python_dataset.retrieve_current_transform_status.assert_awaited_once()
assert python_dataset.files_completed == 0
assert python_dataset.files_failed == 0


@pytest.mark.asyncio
async def test_transform_status_listener_bad_dataset_bad_lookup(python_dataset):
progress = Mock(spec=Progress)
progress_task = Mock()
download_task = Mock()
status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
python_dataset.current_status = status
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
python_dataset.servicex = Mock()
python_dataset.servicex.get_dataset = AsyncMock(side_effect=Exception)
with pytest.raises(ServiceXException, match=r"Further information not available"):
with patch(
"servicex.app.transforms.create_kibana_link_parameters"
) as mock_link:
await python_dataset.transform_status_listener(
progress, progress_task, "mock_title", download_task, "mock_title"
)
mock_link.assert_called_once()
python_dataset.retrieve_current_transform_status.assert_awaited_once()
assert python_dataset.files_completed == 0
assert python_dataset.files_failed == 0


@pytest.mark.asyncio
async def test_transform_status_listener_zero_files_no_log_url(python_dataset):
progress = Mock(spec=Progress)
progress_task = Mock()
download_task = Mock()
status = Mock(
files=0,
files_completed=0,
files_failed=0,
status=Status.complete,
log_url=None,
title="Test Transform",
)
python_dataset.current_status = status
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
with patch("servicex.query_core.logger") as mock_logger:

await python_dataset.transform_status_listener(
progress, progress_task, "mock_title", download_task, "mock_title"
)
mock_logger.warning.assert_called_once()

python_dataset.retrieve_current_transform_status.assert_awaited_once()


@pytest.mark.asyncio
async def test_transform_status_listener_zero_files_with_log_url(python_dataset):
progress = Mock(spec=Progress)
progress_task = Mock()
download_task = Mock()
status = Mock(
files=0,
files_completed=0,
files_failed=0,
status=Status.complete,
log_url="http://example.com/logs",
request_id="test-request-123",
title="Test Transform",
)
python_dataset.current_status = status
python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)

with patch("servicex.app.transforms.create_kibana_link_parameters") as mock_link:
with patch("servicex.query_core.logger") as mock_logger:
mock_link.return_value = "http://kibana.example.com/link"
await python_dataset.transform_status_listener(
progress, progress_task, "mock_title", download_task, "mock_title"
)

mock_link.assert_called_once()
mock_logger.warning.assert_called()

python_dataset.retrieve_current_transform_status.assert_awaited_once()


@pytest.mark.asyncio
async def test_retrieve_current_transform_status_status_none(
python_dataset, completed_status
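Since the new branch distinguishes three lookup_status values but the tests above only exercise does_not_exist, a parametrized variant (a sketch only, not part of this PR; it reuses the python_dataset fixture and mirrors the test above) could cover all three messages in one test body:

import datetime
from unittest.mock import AsyncMock, Mock, patch

import pytest
from rich.progress import Progress

from servicex.models import CachedDataset, Status
from servicex.query_core import ServiceXException


# Sketch only: exercises each message in msg_map via the lookup_status field.
@pytest.mark.parametrize(
    "lookup_status, expected_msg",
    [
        ("does_not_exist", "Dataset does not exist"),
        ("bad_name", "Bad dataset specification"),
        ("internal_failure", "Internal failure during lookup"),
    ],
)
@pytest.mark.asyncio
async def test_bad_dataset_messages(python_dataset, lookup_status, expected_msg):
    status = Mock(files=0, files_completed=0, files_failed=0, status=Status.bad_dataset)
    python_dataset.current_status = status
    python_dataset.retrieve_current_transform_status = AsyncMock(return_value=status)
    bad_dataset = CachedDataset(
        id=42,
        name="test_dataset",
        did_finder="some_finder",
        n_files=2,
        size=3072,
        events=300,
        last_used=datetime.datetime.now(),
        last_updated=datetime.datetime.now(),
        lookup_status=lookup_status,
        is_stale=True,
        files=[],
    )
    python_dataset.servicex = Mock()
    python_dataset.servicex.get_dataset = AsyncMock(return_value=bad_dataset)
    with pytest.raises(ServiceXException, match=expected_msg):
        with patch("servicex.app.transforms.create_kibana_link_parameters"):
            await python_dataset.transform_status_listener(
                Mock(spec=Progress), Mock(), "mock_title", Mock(), "mock_title"
            )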
1 change: 1 addition & 0 deletions tests/test_servicex_dataset.py
@@ -55,6 +55,7 @@
**{
"request_id": "b8c508d0-ccf2-4deb-a1f7-65c839eebabf",
"did": "File List Provided in Request",
"did_id": 1,
"columns": None,
"selection": "(Where (SelectMany (call EventDataset) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (and (> (/ (call (attr j 'pt')) 1000) 20) (< (call abs (/ (call (attr j 'eta')) 1000)) 4.5))))", # NOQA 501
"tree-name": None,