4848import org .slf4j .LoggerFactory ;
4949
5050import java .io .IOException ;
51+ import java .util .ArrayList ;
5152import java .util .Collections ;
53+ import java .util .List ;
5254import java .util .Map ;
55+ import java .util .function .Function ;
5356import javax .annotation .Nullable ;
5457
5558/**
@@ -100,9 +103,6 @@ public void run(ActionContext context) throws Exception {
100103 // Enable legacy SQL
101104 builder .setUseLegacySql (config .isLegacySQL ());
102105
103- // Location must match that of the dataset(s) referenced in the query.
104- JobId jobId = JobId .newBuilder ().setRandomJob ().setLocation (config .getLocation ()).build ();
105-
106106 // API request - starts the query.
107107 Credentials credentials = config .getServiceAccount () == null ?
108108 null : GCPUtils .loadServiceAccountCredentials (config .getServiceAccount (),
@@ -126,13 +126,17 @@ public void run(ActionContext context) throws Exception {
126126
127127 QueryJobConfiguration queryConfig = builder .build ();
128128
129- Job queryJob = bigQuery .create (JobInfo .newBuilder (queryConfig ).setJobId (jobId ).build ());
130129
131- LOG . info ( "Executing SQL as job {}." , jobId . getJob ());
132- LOG . debug ( "The BigQuery SQL is {}" , config . getSql ());
130+ // Setting external retry strategy for BigQuery client due to BigQuery Client not retrying when a job clashes
131+ // with another job, due to error being 400.
133132
134- // Wait for the query to complete
135- queryJob .waitFor ();
133+ final String retryableStringPattern = "Retrying the job with back-off" ;
134+ List <Function <BigQueryException , Boolean >> retryRules = new ArrayList <>();
135+ retryRules .add (
136+ (BigQueryException e ) -> e .getCode () == 400
137+ && (e .getMessage ().contains (retryableStringPattern ) || e .getReason ().contains (retryableStringPattern ))
138+ );
139+ Job queryJob = executeQueryJobWithCustomRetry (bigQuery , queryConfig , retryRules );
136140
137141 // Check for errors
138142 if (queryJob .getStatus ().getError () != null ) {
@@ -169,6 +173,46 @@ public void run(ActionContext context) throws Exception {
169173 context .getMetrics ().gauge (RECORDS_PROCESSED , rows );
170174 }
171175
176+ /**
177+ * Executes Query with added retry rules following:
178+ * https://cloud.google.com/bigquery/sla
179+ */
180+ private Job executeQueryJobWithCustomRetry (BigQuery bigQuery , QueryJobConfiguration queryConfig ,
181+ List <Function <BigQueryException , Boolean >> retryRules ) throws Exception {
182+ // The longest amount of time to wait in-between retries.
183+ final int maximumBackoff = 32 ;
184+
185+ // The maximum number of retries.
186+ final int maxRetries = 20 ;
187+
188+ int retries = 0 ;
189+
190+ while (true ) {
191+ try {
192+ // Location must match that of the dataset(s) referenced in the query.
193+ JobId jobId = JobId .newBuilder ().setRandomJob ().setLocation (config .getLocation ()).build ();
194+ Job queryJob = bigQuery .create (JobInfo .newBuilder (queryConfig ).setJobId (jobId ).build ());
195+ LOG .info ("Executing SQL as job {}." , jobId .getJob ());
196+ LOG .debug ("The BigQuery SQL is {}" , config .getSql ());
197+
198+ // Wait for the query to complete
199+ queryJob .waitFor ();
200+ return queryJob ;
201+ } catch (BigQueryException bigQueryException ) {
202+ if (retries >= maxRetries ) {
203+ LOG .error ("Run out of retries while executing query with backoff." );
204+ throw bigQueryException ;
205+ }
206+ if (retryRules .stream ().noneMatch ((f -> f .apply (bigQueryException )))) {
207+ throw bigQueryException ;
208+ }
209+ LOG .warn ("Received {} error from BigQuery, retrying..." , bigQueryException .getMessage ());
210+ Thread .sleep (Math .round ((Math .min (Math .pow (2 , retries ), maximumBackoff ) + Math .random ()) * 1000 ));
211+ retries += 1 ;
212+ }
213+ }
214+ }
215+
172216 @ Override
173217 public AbstractBigQueryActionConfig getConfig () {
174218 return config ;
0 commit comments