diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java index d45bbfb4a667..47fb537b7f5d 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java @@ -19,6 +19,7 @@ package org.apache.iceberg.actions; import java.io.Closeable; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -57,6 +58,7 @@ abstract class BaseCommitService implements Closeable { private final ConcurrentLinkedQueue completedRewrites; private final ConcurrentLinkedQueue inProgressCommits; private final ConcurrentLinkedQueue committedRewrites; + private final List exceptionsOfFailedCommits; private final int rewritesPerCommit; private final AtomicBoolean running = new AtomicBoolean(false); private final long timeoutInMS; @@ -94,6 +96,7 @@ abstract class BaseCommitService implements Closeable { completedRewrites = Queues.newConcurrentLinkedQueue(); committedRewrites = Queues.newConcurrentLinkedQueue(); inProgressCommits = Queues.newConcurrentLinkedQueue(); + exceptionsOfFailedCommits = Collections.synchronizedList(Lists.newArrayList()); } /** @@ -231,6 +234,7 @@ private void commitReadyCommitGroups() { succeededCommits++; } catch (Exception e) { LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e); + exceptionsOfFailedCommits.add(e); } inProgressCommits.remove(inProgressCommitToken); } @@ -240,6 +244,14 @@ public int succeededCommits() { return succeededCommits; } + public int failedCommits() { + return exceptionsOfFailedCommits.size(); + } + + public List exceptionsOfFailedCommits() { + return Lists.newArrayList(exceptionsOfFailedCommits); + } + @VisibleForTesting boolean canCreateCommitGroup() { // Either we have a full commit group, or we have completed writing and need to commit