Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
build_unique_component_name,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus
from reana_db.models import Job, JobCache, Workflow, RunStatus, JobStatus
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.attributes import flag_modified

Expand Down Expand Up @@ -118,14 +118,27 @@ def on_message(self, body, message):
)
elif workflow and workflow.status not in ALIVE_STATUSES:
logging.warning(
f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n"
f"{body}\nIgnoring..."
f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received."
)
try:
_delete_workflow_engine_batch_job(workflow)
logging.info(
f"Removed batch-pod for not alive {workflow.id_} workflow."
)
except REANAWorkflowControllerError as exception:
logging.error(
f"Could not clean up not alive workflow {workflow.id_} batch pod for workflow."
f" Error: {exception}"
)

_delete_workflow_step_jobs(workflow)
else:
logging.warning(
logging.error(
f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n"
f"{body}\nIgnoring..."
)
import time
time.sleep(5)
except REANAWorkflowControllerError as rwce:
logging.error(rwce, exc_info=True)
except SQLAlchemyError as sae:
Expand Down Expand Up @@ -162,7 +175,7 @@ def _update_workflow_status(workflow, status, logs):

if RunStatus.should_cleanup_job(status):
try:
_delete_workflow_job(workflow)
_delete_workflow_engine_batch_job(workflow)
except REANAWorkflowControllerError as exception:
logging.error(
f"Could not clean up workflow job for workflow {workflow.id_}."
Expand Down Expand Up @@ -281,7 +294,7 @@ def _update_job_cache(msg):
Session.add(cached_job)


def _delete_workflow_job(workflow: Workflow) -> None:
def _delete_workflow_engine_batch_job(workflow: Workflow) -> None:
job_name = build_unique_component_name("run-batch", workflow.id_)
try:
current_k8s_batchv1_api_client.delete_namespaced_job(
Expand All @@ -295,6 +308,36 @@ def _delete_workflow_job(workflow: Workflow) -> None:
)


def _delete_workflow_step_jobs(workflow: Workflow) -> None:
"""
Delete step jobs that belong to workflow. Ignore Kubernetes API errors if occur.
If job deletion request is successfull, update its status to JobStatus.stopped.
"""
jobs = Session.query(Job).filter(
Job.workflow_uuid == workflow.id_,
)
for job in jobs:
job_name = build_unique_component_name("run-job", job.id_)
if job.status in [
JobStatus.running,
JobStatus.queued,
JobStatus.created,
]:
try:
current_k8s_batchv1_api_client.delete_namespaced_job(
name=job_name,
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
propagation_policy="Background",
)
job.status = JobStatus.stopped
Session.add(job)
except ApiException as e:
logging.error(
f"run-job pod {job_name} for {workflow.id_} could not be deleted. Error: {e}"
)
Session.commit()


def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
try:
pods = current_k8s_corev1_api_client.list_namespaced_pod(
Expand Down