-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Expose failed commit count and exceptions in BaseCommitService #14872
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| package org.apache.iceberg.actions; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
|
|
@@ -57,6 +58,7 @@ abstract class BaseCommitService<T> implements Closeable { | |
| private final ConcurrentLinkedQueue<T> completedRewrites; | ||
| private final ConcurrentLinkedQueue<String> inProgressCommits; | ||
| private final ConcurrentLinkedQueue<T> committedRewrites; | ||
| private final List<Exception> exceptionsOfFailedCommits; | ||
| private final int rewritesPerCommit; | ||
| private final AtomicBoolean running = new AtomicBoolean(false); | ||
| private final long timeoutInMS; | ||
|
|
@@ -94,6 +96,7 @@ abstract class BaseCommitService<T> implements Closeable { | |
| completedRewrites = Queues.newConcurrentLinkedQueue(); | ||
| committedRewrites = Queues.newConcurrentLinkedQueue(); | ||
| inProgressCommits = Queues.newConcurrentLinkedQueue(); | ||
| exceptionsOfFailedCommits = Collections.synchronizedList(Lists.newArrayList()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -231,6 +234,7 @@ private void commitReadyCommitGroups() { | |
| succeededCommits++; | ||
| } catch (Exception e) { | ||
| LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e); | ||
| exceptionsOfFailedCommits.add(e); | ||
| } | ||
| inProgressCommits.remove(inProgressCommitToken); | ||
| } | ||
|
|
@@ -240,6 +244,14 @@ public int succeededCommits() { | |
| return succeededCommits; | ||
| } | ||
|
|
||
| public int failedCommits() { | ||
| return exceptionsOfFailedCommits.size(); | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think we should add some tests for this behaviour, but am happy to get buy-in first for this change before we do that.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked at the tests of TestCommitService. We test the main behaviour but tests don't test minor/simple details like succeededCommits so I think it's fine to leave it as it is because what we do is just incrementing another variable and adding exceptions into a new list. |
||
| public List<Exception> exceptionsOfFailedCommits() { | ||
| return Lists.newArrayList(exceptionsOfFailedCommits); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just return exceptionsOfFailedCommits?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a defensive copy which prevents callers from mutating internal state. |
||
| } | ||
|
|
||
| @VisibleForTesting | ||
| boolean canCreateCommitGroup() { | ||
| // Either we have a full commit group, or we have completed writing and need to commit | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might have to be a synchronizedList right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Committer service looks to be executed with a single thread through
Executors.newSingleThreadExecutorHowever; to be consistent and making it resistant to future changes, I'm making it
Collections.synchronizedList(Lists.newArrayList());