Refactor DAG file queuing and fix redundant processing #60124
Merged
Renamed `add_files_to_queue` to `_add_new_files_to_queue` in `DagFileProcessorManager` to reduce confusion with `_add_files_to_queue` and to better reflect its internal usage for newly discovered files.

We also call the method only after a bundle has refreshed. We can't find new files without a refresh, so calling it in every loop is wasteful.

The method now checks `_processors` in addition to `_file_stats` before adding files. This prevents a race condition where files currently being processed (which don't yet have stats) were erroneously re-added to the parsing queue.

The method also now results in the `dag_processing.file_path_queue_size` gauge being emitted after adding new files to the queue, and reduces log noise by emitting a single log line instead of one per file.
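
For reviewers who want the gist without reading the diff, the check can be sketched roughly as follows. This is a simplified, hypothetical stand-in and not the actual `DagFileProcessorManager` code: the class `QueueSketch`, its public attribute names, and the `gauges` dict (standing in for a real metrics client) are all assumptions made for illustration.

```python
import logging
from collections import deque

log = logging.getLogger(__name__)


class QueueSketch:
    """Hypothetical, simplified model of the queuing check (not Airflow's class)."""

    def __init__(self):
        self.file_stats = {}       # files that already have parse stats
        self.processors = {}       # files currently being parsed (no stats yet)
        self.file_queue = deque()  # files waiting to be parsed
        self.gauges = {}           # assumption: stands in for a metrics client

    def add_new_files_to_queue(self, known_files):
        # A file is "new" only if it has no stats AND is not being parsed now.
        # Checking file_stats alone would re-queue files that are mid-parse.
        new_files = [
            f for f in known_files
            if f not in self.file_stats and f not in self.processors
        ]
        if new_files:
            self.file_queue.extend(new_files)
            # One summary log line instead of one line per file.
            log.info("Adding %d new files to the queue", len(new_files))
        # Record the queue size after the update.
        self.gauges["dag_processing.file_path_queue_size"] = len(self.file_queue)
        return new_files
```

In this sketch, a file that is present in `processors` but absent from `file_stats` is skipped, which is the case the race-condition fix targets.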