Skip to content

Commit ae32eb8

Browse files
author
Vladyslav Moisieienkov
committed
job-status-consumer, log DB status of "not alive" workflows
addresses #437
1 parent 9a943cd commit ae32eb8

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

reana_workflow_controller/consumer.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2018, 2019, 2020, 2021 CERN.
4+
# Copyright (C) 2018, 2019, 2020, 2021, 2022 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -87,22 +87,19 @@ def on_message(self, body, message):
8787
Session.query(Workflow)
8888
.filter(
8989
Workflow.id_ == workflow_uuid,
90-
Workflow.status.in_(ALIVE_STATUSES),
9190
)
9291
.one_or_none()
9392
)
94-
if workflow:
93+
if workflow and workflow.status in ALIVE_STATUSES:
9594
next_status = body_dict.get("status")
9695
if next_status:
9796
next_status = RunStatus(next_status)
9897
logging.info(
99-
" [x] Received workflow_uuid: {0} status: {1}".format(
100-
workflow_uuid, next_status
101-
)
98+
f" [x] Received workflow_uuid: {workflow_uuid} status: {next_status}"
10299
)
103100

104-
logs = body_dict.get("logs") or ""
105101
if workflow.can_transition_to(next_status):
102+
logs = body_dict.get("logs") or ""
106103
_update_workflow_status(workflow, next_status, logs)
107104
if "message" in body_dict and body_dict.get("message"):
108105
msg = body_dict["message"]
@@ -119,17 +116,21 @@ def on_message(self, body, message):
119116
f" from status {workflow.status} to"
120117
f" {next_status}."
121118
)
122-
elif workflow_uuid:
119+
elif workflow and workflow.status not in ALIVE_STATUSES:
123120
logging.warning(
124-
"Event for not alive workflow {workflow_uuid} received:\n"
125-
"{body}\n"
126-
"Ignoring ...".format(workflow_uuid=workflow_uuid, body=body)
121+
f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n"
122+
f"{body}\nIgnoring..."
123+
)
124+
else:
125+
logging.warning(
126+
f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n"
127+
f"{body}\nIgnoring..."
127128
)
128129
except REANAWorkflowControllerError as rwce:
129130
logging.error(rwce, exc_info=True)
130131
except SQLAlchemyError as sae:
131132
logging.error(
132-
f"Something went wrong while querying the database for workflow: {workflow.id_}"
133+
f"Something went wrong while querying the database for workflow: {workflow_uuid}"
133134
)
134135
logging.error(sae, exc_info=True)
135136
except Exception as e:

0 commit comments

Comments
 (0)