diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py
index 0d55e8ad..ebd5958a 100644
--- a/reana_workflow_controller/consumer.py
+++ b/reana_workflow_controller/consumer.py
@@ -31,7 +31,7 @@
     build_unique_component_name,
 )
 from reana_db.database import Session
-from reana_db.models import Job, JobCache, Workflow, RunStatus
+from reana_db.models import Job, JobCache, Workflow, RunStatus, JobStatus
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm.attributes import flag_modified
 
@@ -118,14 +118,25 @@ def on_message(self, body, message):
                     )
             elif workflow and workflow.status not in ALIVE_STATUSES:
                 logging.warning(
-                    f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n"
-                    f"{body}\nIgnoring..."
+                    f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received."
                 )
+                try:
+                    _delete_workflow_engine_batch_job(workflow)
+                    logging.info(
+                        f"Removed batch job of not alive workflow {workflow.id_}."
+                    )
+                except REANAWorkflowControllerError as exception:
+                    logging.error(
+                        f"Could not clean up batch job of not alive workflow {workflow.id_}."
+                        f" Error: {exception}"
+                    )
+
+                _delete_workflow_step_jobs(workflow)
             else:
-                logging.warning(
+                logging.error(
                     f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n"
                     f"{body}\nIgnoring..."
                 )
         except REANAWorkflowControllerError as rwce:
             logging.error(rwce, exc_info=True)
         except SQLAlchemyError as sae:
@@ -162,7 +173,7 @@ def _update_workflow_status(workflow, status, logs):
 
         if RunStatus.should_cleanup_job(status):
             try:
-                _delete_workflow_job(workflow)
+                _delete_workflow_engine_batch_job(workflow)
             except REANAWorkflowControllerError as exception:
                 logging.error(
                     f"Could not clean up workflow job for workflow {workflow.id_}."
@@ -281,7 +292,7 @@ def _update_job_cache(msg):
     Session.add(cached_job)
 
 
-def _delete_workflow_job(workflow: Workflow) -> None:
+def _delete_workflow_engine_batch_job(workflow: Workflow) -> None:
     job_name = build_unique_component_name("run-batch", workflow.id_)
     try:
         current_k8s_batchv1_api_client.delete_namespaced_job(
@@ -295,6 +306,36 @@
         )
 
 
+def _delete_workflow_step_jobs(workflow: Workflow) -> None:
+    """Delete the step jobs of the given workflow.
+
+    Ignore Kubernetes API errors; mark successfully deleted jobs as JobStatus.stopped.
+    """
+    jobs = Session.query(Job).filter(
+        Job.workflow_uuid == workflow.id_,
+    )
+    for job in jobs:
+        job_name = build_unique_component_name("run-job", job.id_)
+        if job.status in [
+            JobStatus.running,
+            JobStatus.queued,
+            JobStatus.created,
+        ]:
+            try:
+                current_k8s_batchv1_api_client.delete_namespaced_job(
+                    name=job_name,
+                    namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                    propagation_policy="Background",
+                )
+                job.status = JobStatus.stopped
+                Session.add(job)
+            except ApiException as e:
+                logging.error(
+                    f"Job {job_name} of workflow {workflow.id_} could not be deleted. Error: {e}"
+                )
+    Session.commit()
+
+
 def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
     try:
         pods = current_k8s_corev1_api_client.list_namespaced_pod(
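
For context, here is a minimal, self-contained sketch (not part of this diff) of the "delete the Kubernetes Job, ignore API errors" pattern that _delete_workflow_step_jobs applies to each step job, written directly against the official kubernetes Python client that the current_k8s_batchv1_api_client proxy wraps. The job name and namespace below are illustrative placeholders, not values used by REANA.

import logging

from kubernetes import client, config
from kubernetes.client.rest import ApiException


def delete_job_ignoring_errors(job_name: str, namespace: str) -> bool:
    """Request deletion of a Kubernetes Job; return True if the request was accepted."""
    batch_api = client.BatchV1Api()
    try:
        batch_api.delete_namespaced_job(
            name=job_name,
            namespace=namespace,
            propagation_policy="Background",  # also delete the pods owned by the Job
        )
        return True
    except ApiException as error:
        # Same behaviour as the new helper: log the failure and continue.
        logging.error(f"Job {job_name} could not be deleted. Error: {error}")
        return False


if __name__ == "__main__":
    config.load_kube_config()  # use config.load_incluster_config() when running in a pod
    delete_job_ignoring_errors("reana-run-job-example", "default")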