4848import org .slf4j .LoggerFactory ;
4949
5050import java .io .IOException ;
51+ import java .util .ArrayList ;
5152import java .util .Collections ;
53+ import java .util .List ;
5254import java .util .Map ;
55+ import java .util .function .Function ;
56+ import java .util .stream .Stream ;
5357import javax .annotation .Nullable ;
5458
5559/**
@@ -100,9 +104,6 @@ public void run(ActionContext context) throws Exception {
100104 // Enable legacy SQL
101105 builder .setUseLegacySql (config .isLegacySQL ());
102106
103- // Location must match that of the dataset(s) referenced in the query.
104- JobId jobId = JobId .newBuilder ().setRandomJob ().setLocation (config .getLocation ()).build ();
105-
106107 // API request - starts the query.
107108 Credentials credentials = config .getServiceAccount () == null ?
108109 null : GCPUtils .loadServiceAccountCredentials (config .getServiceAccount (),
@@ -126,13 +127,17 @@ public void run(ActionContext context) throws Exception {
126127
127128 QueryJobConfiguration queryConfig = builder .build ();
128129
129- Job queryJob = bigQuery .create (JobInfo .newBuilder (queryConfig ).setJobId (jobId ).build ());
130130
131- LOG . info ( "Executing SQL as job {}." , jobId . getJob ());
132- LOG . debug ( "The BigQuery SQL is {}" , config . getSql ());
131+ // Setting external retry strategy for BigQuery client due to BigQuery Client not retrying when a job clashes
132+ // with another job, due to error being 400.
133133
134- // Wait for the query to complete
135- queryJob .waitFor ();
134+ final String retryableStringPattern = "Retrying the job with back-off" ;
135+ List <Function <BigQueryException , Boolean >> retryRules = new ArrayList <>();
136+ retryRules .add (
137+ (BigQueryException e ) -> !((e .getCode () == 400 )
138+ && (e .getMessage ().contains (retryableStringPattern ) || e .getReason ().contains (retryableStringPattern )))
139+ );
140+ Job queryJob = executeQueryJobWithCustomRetry (bigQuery , queryConfig , retryRules );
136141
137142 // Check for errors
138143 if (queryJob .getStatus ().getError () != null ) {
@@ -169,6 +174,47 @@ public void run(ActionContext context) throws Exception {
169174 context .getMetrics ().gauge (RECORDS_PROCESSED , rows );
170175 }
171176
177+ /**
178+ * Executes Query with added retry rules following:
179+ * https://cloud.google.com/bigquery/sla
180+ */
181+ private Job executeQueryJobWithCustomRetry (BigQuery bigQuery , QueryJobConfiguration queryConfig ,
182+ List <Function <BigQueryException , Boolean >> retryRules ) throws Exception {
183+ // The longest amount of time to wait in-between retries.
184+ final int maximum_backoff = 32 ;
185+
186+ // The maximum number of retries.
187+ final int max_retries = 20 ;
188+
189+ int retries = 0 ;
190+
191+ while (true ) {
192+ try {
193+ // Location must match that of the dataset(s) referenced in the query.
194+ JobId jobId = JobId .newBuilder ().setRandomJob ().setLocation (config .getLocation ()).build ();
195+ Job queryJob = bigQuery .create (JobInfo .newBuilder (queryConfig ).setJobId (jobId ).build ());
196+ LOG .info ("Executing SQL as job {}." , jobId .getJob ());
197+ LOG .debug ("The BigQuery SQL is {}" , config .getSql ());
198+
199+ // Wait for the query to complete
200+ queryJob .waitFor ();
201+ return queryJob ;
202+ } catch (BigQueryException bigQueryException ) {
203+ if (retries >= max_retries ) {
204+ LOG .error ("Run out of retries while executing query with backoff." );
205+ throw bigQueryException ;
206+ }
207+ if (retryRules .stream ().noneMatch ((f -> f .apply (bigQueryException )))) {
208+ throw bigQueryException ;
209+ }
210+ LOG .warn ("Received {} error from BigQuery, retrying..." , bigQueryException .getMessage ());
211+ long sleep_time = Math .round ((Math .min (Math .pow (2 , retries ), maximum_backoff ) + Math .random ()) * 1000 );
212+ Thread .sleep (sleep_time );
213+ retries += 1 ;
214+ }
215+ }
216+ }
217+
172218 @ Override
173219 public AbstractBigQueryActionConfig getConfig () {
174220 return config ;
0 commit comments