3636import co .elastic .clients .elasticsearch .core .BulkRequest ;
3737import co .elastic .clients .elasticsearch .core .BulkResponse ;
3838import co .elastic .clients .elasticsearch .core .bulk .BulkOperation ;
39+ import co .elastic .clients .elasticsearch .core .bulk .BulkResponseItem ;
3940import org .slf4j .Logger ;
4041import org .slf4j .LoggerFactory ;
4142
4243import java .net .ConnectException ;
4344import java .net .NoRouteToHostException ;
4445import java .util .ArrayList ;
46+ import java .util .Arrays ;
4547import java .util .Collection ;
48+ import java .util .HashSet ;
4649import java .util .List ;
50+ import java .util .Set ;
4751
4852import static org .apache .flink .util .Preconditions .checkNotNull ;
4953
@@ -80,6 +84,9 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O
8084 "Could not connect to Elasticsearch cluster using the provided hosts" ,
8185 err )));
8286
87+ private static final Set <Integer > ELASTICSEARCH_NON_RETRYABLE_STATUS =
88+ new HashSet <>(Arrays .asList (400 , 404 ));
89+
8390 public Elasticsearch8AsyncWriter (
8491 ElementConverter <InputT , Operation > elementConverter ,
8592 WriterInitContext context ,
@@ -160,21 +167,45 @@ private void handlePartiallyFailedRequest(
160167 ResultHandler <Operation > resultHandler ,
161168 BulkResponse response ) {
162169 LOG .debug ("The BulkRequest has failed partially. Response: {}" , response );
163- ArrayList <Operation > failedItems = new ArrayList <>();
170+
171+ ArrayList <Operation > failedItemsToRetry = new ArrayList <>();
172+ int totalFailedItems = 0 ;
173+ FlinkRuntimeException nonRetryableException = null ;
174+
164175 for (int i = 0 ; i < response .items ().size (); i ++) {
165- if (response .items ().get (i ).error () != null ) {
166- failedItems .add (requestEntries .get (i ));
176+ BulkResponseItem responseItem = response .items ().get (i );
177+ if (responseItem .error () != null ) {
178+ totalFailedItems ++;
179+ if (isOperationRetryable (responseItem .status ())) {
180+ failedItemsToRetry .add (requestEntries .get (i ));
181+ } else {
182+ LOG .error (
183+ "Failed to process non-retryable operation: {}, response: {}" ,
184+ requestEntries .get (i ),
185+ responseItem );
186+ nonRetryableException =
187+ new FlinkRuntimeException (
188+ "Failed to process non-retryable operation, reason=%s"
189+ + responseItem .error ().reason ());
190+ break ;
191+ }
167192 }
168193 }
169194
170- numRecordsOutErrorsCounter .inc (failedItems .size ());
171- numRecordsSendPartialFailureCounter .inc (failedItems .size ());
195+ numRecordsOutErrorsCounter .inc (totalFailedItems );
172196 LOG .info (
173- "The BulkRequest with {} operation(s) has {} failure(s). It took {}ms" ,
197+ "The BulkRequest with {} operation(s) has {} failure(s), {} retryable . It took {}ms" ,
174198 requestEntries .size (),
175- failedItems .size (),
199+ totalFailedItems ,
200+ failedItemsToRetry .size (),
176201 response .took ());
177- resultHandler .retryForEntries (failedItems );
202+
203+ if (nonRetryableException != null ) {
204+ resultHandler .completeExceptionally (nonRetryableException );
205+ } else {
206+ numRecordsSendPartialFailureCounter .inc (failedItemsToRetry .size ());
207+ resultHandler .retryForEntries (failedItemsToRetry );
208+ }
178209 }
179210
180211 private void handleSuccessfulRequest (
@@ -190,6 +221,11 @@ private boolean isRetryable(Throwable error) {
190221 return !ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER .isFatal (error , getFatalExceptionCons ());
191222 }
192223
224+ /** Given the response status, check if an operation is retryable. */
225+ private static boolean isOperationRetryable (int status ) {
226+ return !ELASTICSEARCH_NON_RETRYABLE_STATUS .contains (status );
227+ }
228+
193229 @ Override
194230 protected long getSizeInBytes (Operation requestEntry ) {
195231 return new OperationSerializer ().size (requestEntry );
0 commit comments