Restore ability to mark task groups as success/failed in UI #60161
Conversation
@arjav1528 Thank you for working on that! 👍
Force-pushed from 5db3cd8 to c9d177e.
Is the UI OK, or do you want me to refactor it?
bbovenzi left a comment:
Thanks for taking this on! Do you mind doing more manual testing to find where MarkAs isn't quite working? I think there are some issues with the API that we need to tackle before we can merge any UI changes.
Force-pushed from b068b63 to da03292.
bbovenzi left a comment:
I tried using this locally and still had issues. Could you both manually test this out and also add a FastAPI test for dry runs?
Sorry about the trouble. At first I couldn't reproduce the issue locally, but I have since reproduced and fixed it; you can run it locally now.
Force-pushed from d24fdae to 34153d8.
@bbovenzi I am not sure why this test is failing, since I haven't made any changes related to it.
bbovenzi left a comment:
Looking much better! Just a few more comments.
…alog components for task instance state management
…task instance handling
…ces in MarkGroupTaskInstanceAsDialog
…rs to preview changes without affecting state
…oks to ensure correct functionality
…dering and improve dialog behavior
…ateTaskInstancesDryRun for better debugging and monitoring
…updates, covering various scenarios including authentication, request validation, and state changes
…ect running task state
Thank you for that pull request. People have been asking for this feature back a lot! 🎉
For the two reasons mentioned below (crafting the payload requires all the TIs of the group, which isn't easy to get on the client side because of pagination, and the return type of the API will create confusion), I think the best approach is to create a couple of endpoints specifically for patching a TI group (a dry_run and a non-dry_run version), refactoring and reusing the code of the patch-TI endpoint, since it already returns a TaskInstanceCollectionResponse (for the scenario of patching all mapped indexes at once).
This should make this PR much simpler: basically just a single call to patch with the group id plus payload, applying the change to all tasks within the group.
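The suggested consolidation can be sketched as follows. This is a minimal, illustrative outline, not the actual Airflow implementation: `patch_one` stands in for the existing per-TI patch logic, and all names here are hypothetical.

```python
# Illustrative sketch of a dedicated group-patch flow (hypothetical names).
# patch_one stands in for the existing per-TI patch logic; dry_run mirrors
# the proposed dry_run / non-dry_run endpoint pair.
def patch_task_group(group_tis, new_state, patch_one, *, dry_run=False):
    """Apply (or simulate) a state change for every TI in a task group."""
    results = []
    for ti in group_tis:
        if dry_run:
            # Report what would change without committing anything.
            results.append({"task_id": ti["task_id"], "would_set": new_state})
        else:
            results.append(patch_one(ti, new_state))
    return results
```

With the group resolved server-side, the client only needs the group id and payload, and pagination stops being the client's problem.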
```tsx
const { data: groupTaskInstances } = useTaskInstanceServiceGetTaskInstances(
  {
    dagId,
    dagRunId: runId,
    taskGroupId: groupId,
  },
  undefined,
  {
    enabled: open,
  },
);
```
For instance, by doing this you only retrieve the first page, i.e. the first 50 TIs of the group, not all of them, so you could be missing some.
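If the client did need every TI, it would have to loop over pages. A sketch of exhaustive pagination, assuming a `fetch_page` callable that takes `limit`/`offset` and returns a dict with `"task_instances"` and `"total_entries"` (the general shape of Airflow's REST list endpoints; the function name is hypothetical):

```python
# Sketch of client-side exhaustive pagination (hypothetical helper).
def fetch_all_task_instances(fetch_page, page_size=50):
    items = []
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        items.extend(page["task_instances"])
        offset += page_size
        # Stop once we've walked past the server-reported total.
        if offset >= page["total_entries"]:
            return items
```

This is exactly the extra client-side complexity the dedicated group endpoint would avoid.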
…or task group updates and dry run functionality
…ooks and update request body validation
@bbovenzi are there any changes left? I would love to implement them.
jason810496 left a comment:
Nice! Thanks for the PR!
```python
if not group_tis:
    raise HTTPException(
        status.HTTP_404_NOT_FOUND,
        f"No task instances found for task group '{task_group_id}' in DAG '{dag_id}' and run '{dag_run_id}'",
    )
```
Would it be better to move the `if not group_tis: raise ...` case handling into `_get_task_group_task_instances` as well?
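The idea is that the lookup helper owns the empty-group check so callers never repeat it. A minimal sketch with hypothetical names (the real helper queries the DB and raises `HTTPException`; here a plain exception and an in-memory list stand in):

```python
# Sketch (hypothetical names): the lookup helper raises when the group is
# empty, so every caller gets the not-found handling for free.
class TaskGroupNotFound(Exception):
    pass

def get_task_group_task_instances(task_instances, task_group_id):
    group_tis = [ti for ti in task_instances if ti.get("group") == task_group_id]
    if not group_tis:
        raise TaskGroupNotFound(
            f"No task instances found for task group {task_group_id!r}"
        )
    return group_tis
```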
```python
# Collect all affected task instances (including upstream/downstream/future/past)
all_affected_tis: list[TI] = []
seen_ti_keys: set[tuple[str, str, str, int]] = set()
```
It seems we could consolidate `all_affected_tis` (a list) and `seen_ti_keys` (a set) into a single dict.
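A sketch of that consolidation: since Python dicts preserve insertion order, one dict keyed by the TI identity tuple replaces the list/set pair (dict-based TI objects here are stand-ins for the real ORM instances):

```python
# Sketch: one insertion-ordered dict replaces the list + set pair.
# Keys are the TI identity tuple; the first occurrence wins.
def dedupe_tis(updated_tis):
    affected = {}
    for ti in updated_tis:
        key = (
            ti["dag_id"],
            ti["run_id"],
            ti["task_id"],
            ti["map_index"] if ti["map_index"] is not None else -1,
        )
        affected.setdefault(key, ti)  # duplicates are silently skipped
    return list(affected.values())  # plays the role of all_affected_tis
```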
```python
# Update state and get all affected TIs (including upstream/downstream/future/past)
updated_tis = dag.set_task_instance_state(
    task_id=ti.task_id,
    run_id=dag_run_id,
    map_indexes=map_indexes,
    state=data["new_state"],
    upstream=body.include_upstream or False,
    downstream=body.include_downstream or False,
    future=body.include_future or False,
    past=body.include_past or False,
    commit=True,
    session=session,
)

if not updated_tis:
    raise HTTPException(
        status.HTTP_409_CONFLICT,
        f"Task id {ti.task_id} is already in {data['new_state']} state",
    )

# Track unique affected TIs and trigger listeners
for updated_ti in updated_tis:
    ti_key = (
        updated_ti.dag_id,
        updated_ti.run_id,
        updated_ti.task_id,
        updated_ti.map_index if updated_ti.map_index is not None else -1,
    )
    if ti_key not in seen_ti_keys:
        seen_ti_keys.add(ti_key)
        all_affected_tis.append(updated_ti)

    # Trigger listeners
    try:
        if data["new_state"] == TaskInstanceState.SUCCESS:
            get_listener_manager().hook.on_task_instance_success(
                previous_state=None, task_instance=updated_ti
            )
        elif data["new_state"] == TaskInstanceState.FAILED:
            get_listener_manager().hook.on_task_instance_failed(
                previous_state=None,
                task_instance=updated_ti,
                error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
            )
    except Exception:
```
We could use the `_patch_task_instance_state` helper here, I think.
airflow/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py, lines 96 to 97 in a51ba2f:

```python
def _patch_task_instance_state(
    task_id: str,
```
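The point of the suggestion is to centralize the listener dispatch that the loop currently performs inline. The real `_patch_task_instance_state` lives in Airflow's service layer and has a different signature; this is only an illustrative stand-in for the idea of routing both states through one helper:

```python
# Illustrative stand-in (hypothetical names): one helper owns the
# success/failed listener dispatch instead of each call site repeating it.
def notify_listeners(new_state, task_instance, hooks):
    if new_state == "success":
        hooks["on_success"](previous_state=None, task_instance=task_instance)
    elif new_state == "failed":
        hooks["on_failed"](
            previous_state=None,
            task_instance=task_instance,
            error="TaskInstance's state was manually set to `failed`.",
        )
```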
```python
# Process each task in the group
for ti in group_tis:
    # Update state if requested
    if data.get("new_state"):
```
Should the check be

```python
if data.get("new_state"):
    # For each task in the group, simulate state change
    for ti in group_tis:
```

here? So that we can skip the whole loop if we don't update the state.
```python
# Refresh the affected TIs from the database to get the latest state and notes
ti_keys_list = list(seen_ti_keys)
refreshed_tis = session.scalars(
    select(TI)
    .where(tuple_(TI.dag_id, TI.run_id, TI.task_id, TI.map_index).in_(ti_keys_list))
    .options(joinedload(TI.rendered_task_instance_fields))
).all()
all_affected_tis = list(refreshed_tis)

# Also include group TIs that had notes updated but weren't in the state change list
# (to avoid duplicates, only add TIs not already in seen_ti_keys)
if data.get("note") is not None:
    for ti in group_tis:
        ti_key = (
            ti.dag_id,
            ti.run_id,
            ti.task_id,
            ti.map_index if ti.map_index is not None else -1,
        )
        if ti_key not in seen_ti_keys:
            seen_ti_keys.add(ti_key)
            all_affected_tis.append(ti)
```
Same as above: we could consolidate these into one dict instead of keeping a separate list and set.
@jason810496 I have applied all the changes. Please review them and merge at your convenience once the CI checks pass.
…o the service layer
…ate update logic and removing listener triggers
…ion for improved clarity and reuse in task instance patching
Description
This PR adds two new React components that restore the task group marking functionality:
- `MarkGroupTaskInstanceAsButton`: a dropdown button component that allows users to select a state (success/failed) and opens a dialog for confirmation
- `MarkGroupTaskInstanceAsDialog`: a dialog component that provides:

The button is integrated into the Group Task Instance page header, alongside the existing Clear button.
Screenshots
Solves #60121