3131 build_unique_component_name ,
3232)
3333from reana_db .database import Session
34- from reana_db .models import Job , JobCache , Workflow , RunStatus
34+ from reana_db .models import Job , JobCache , Workflow , RunStatus , JobStatus
3535from sqlalchemy .exc import SQLAlchemyError
3636from sqlalchemy .orm .attributes import flag_modified
3737
@@ -118,14 +118,27 @@ def on_message(self, body, message):
118118 )
119119 elif workflow and workflow .status not in ALIVE_STATUSES :
120120 logging .warning (
121- f"Event for not alive workflow { workflow .id_ } with DB status { workflow .status } received:\n "
122- f"{ body } \n Ignoring..."
121+ f"Event for not alive workflow { workflow .id_ } with DB status { workflow .status } received."
123122 )
123+ try :
124+ _delete_workflow_engine_batch_job (workflow )
125+ logging .info (
126+ f"Removed batch-pod for not alive { workflow .id_ } workflow."
127+ )
128+ except REANAWorkflowControllerError as exception :
129+ logging .error (
130+ f"Could not clean up not alive workflow { workflow .id_ } batch pod for workflow."
131+ f" Error: { exception } "
132+ )
133+
134+ _delete_workflow_step_jobs (workflow )
124135 else :
125- logging .warning (
136+ logging .error (
126137 f"Event for workflow { workflow_uuid } that doesn't exist in DB received:\n "
127138 f"{ body } \n Ignoring..."
128139 )
140+ import time
141+ time .sleep (5 )
129142 except REANAWorkflowControllerError as rwce :
130143 logging .error (rwce , exc_info = True )
131144 except SQLAlchemyError as sae :
@@ -162,7 +175,7 @@ def _update_workflow_status(workflow, status, logs):
162175
163176 if RunStatus .should_cleanup_job (status ):
164177 try :
165- _delete_workflow_job (workflow )
178+ _delete_workflow_engine_batch_job (workflow )
166179 except REANAWorkflowControllerError as exception :
167180 logging .error (
168181 f"Could not clean up workflow job for workflow { workflow .id_ } ."
@@ -281,7 +294,7 @@ def _update_job_cache(msg):
281294 Session .add (cached_job )
282295
283296
284- def _delete_workflow_job (workflow : Workflow ) -> None :
297+ def _delete_workflow_engine_batch_job (workflow : Workflow ) -> None :
285298 job_name = build_unique_component_name ("run-batch" , workflow .id_ )
286299 try :
287300 current_k8s_batchv1_api_client .delete_namespaced_job (
@@ -295,6 +308,36 @@ def _delete_workflow_job(workflow: Workflow) -> None:
295308 )
296309
297310
def _delete_workflow_step_jobs(workflow: Workflow) -> None:
    """Delete the Kubernetes jobs of the workflow steps that are still active.

    Only step jobs whose DB status is ``created``, ``queued`` or ``running``
    are considered. When the deletion request is accepted by the Kubernetes
    API, the job status is set to ``JobStatus.stopped``. Kubernetes API
    errors are logged and otherwise ignored, so a single failing job does
    not prevent the clean-up of the remaining ones.

    :param workflow: Workflow whose step jobs should be removed.
    """
    active_statuses = (JobStatus.running, JobStatus.queued, JobStatus.created)
    # Let the database do the status filtering instead of loading every job
    # of the workflow and discarding the finished ones in Python.
    active_jobs = Session.query(Job).filter(
        Job.workflow_uuid == workflow.id_,
        Job.status.in_(active_statuses),
    )
    for job in active_jobs:
        job_name = build_unique_component_name("run-job", job.id_)
        try:
            current_k8s_batchv1_api_client.delete_namespaced_job(
                name=job_name,
                namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                propagation_policy="Background",
            )
        except ApiException as e:
            # Best effort: report the failure and keep the DB status as is.
            logging.error(
                f"run-job pod {job_name} for {workflow.id_} could not be deleted. Error: {e}"
            )
        else:
            # Only mark the job as stopped once Kubernetes accepted the request.
            job.status = JobStatus.stopped
            Session.add(job)
    # Persist all status changes in a single transaction.
    Session.commit()
339+
340+
298341def _get_workflow_engine_pod_logs (workflow : Workflow ) -> str :
299342 try :
300343 pods = current_k8s_corev1_api_client .list_namespaced_pod (
0 commit comments