@@ -69,6 +69,41 @@ func isConnectionError(err error) bool {
6969 strings .Contains (errStr , "sql: database is closed" )
7070}
7171
72+ // isRetryableError determines if an error should be retried
73+ func isRetryableError (err error ) bool {
74+ if err == nil {
75+ return false
76+ }
77+
78+ // Context deadline exceeded is retryable
79+ if errors .Is (err , context .DeadlineExceeded ) {
80+ return true
81+ }
82+
83+ // Connection errors are retryable
84+ if isConnectionError (err ) {
85+ return true
86+ }
87+
88+ // PostgreSQL specific retryable errors
89+ errStr := strings .ToLower (err .Error ())
90+ retryablePatterns := []string {
91+ "timeout" ,
92+ "canceling statement due to statement timeout" ,
93+ "deadline exceeded" ,
94+ "connection reset" ,
95+ "broken pipe" ,
96+ }
97+
98+ for _ , pattern := range retryablePatterns {
99+ if strings .Contains (errStr , pattern ) {
100+ return true
101+ }
102+ }
103+
104+ return false
105+ }
106+
72107type Archiver struct {
73108 config * Config
74109 db * sql.DB
@@ -377,6 +412,8 @@ func (a *Archiver) connect(ctx context.Context) error {
377412 if sslMode == "" {
378413 sslMode = "disable"
379414 }
415+
416+ // Build connection string with optional statement timeout
380417 connStr := fmt .Sprintf ("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s" ,
381418 a .config .Database .Host ,
382419 a .config .Database .Port ,
@@ -386,6 +423,14 @@ func (a *Archiver) connect(ctx context.Context) error {
386423 sslMode ,
387424 )
388425
426+ // Add statement timeout if configured (convert seconds to milliseconds for PostgreSQL)
427+ if a .config .Database .StatementTimeout > 0 {
428+ timeoutMs := a .config .Database .StatementTimeout * 1000
429+ connStr += fmt .Sprintf (" statement_timeout=%d" , timeoutMs )
430+ a .logger .Debug (fmt .Sprintf (" 📝 Configured statement timeout: %d seconds (%d ms)" ,
431+ a .config .Database .StatementTimeout , timeoutMs ))
432+ }
433+
389434 db , err := sql .Open ("postgres" , connStr )
390435 if err != nil {
391436 return err
@@ -416,6 +461,50 @@ func (a *Archiver) connect(ctx context.Context) error {
416461 return nil
417462}
418463
464+ // queryWithRetry executes a query with retry logic for transient failures
465+ func (a * Archiver ) queryWithRetry (ctx context.Context , query string , args ... interface {}) (* sql.Rows , error ) {
466+ maxRetries := a .config .Database .MaxRetries
467+ if maxRetries <= 0 {
468+ maxRetries = 3 // Default to 3 retries
469+ }
470+
471+ retryDelay := time .Duration (a .config .Database .RetryDelay ) * time .Second
472+ if retryDelay <= 0 {
473+ retryDelay = 5 * time .Second // Default to 5 seconds
474+ }
475+
476+ var lastErr error
477+ for attempt := 0 ; attempt <= maxRetries ; attempt ++ {
478+ rows , err := a .db .QueryContext (ctx , query , args ... )
479+ if err == nil {
480+ return rows , nil
481+ }
482+
483+ lastErr = err
484+
485+ // Check if error is retryable
486+ if ! isRetryableError (err ) {
487+ return nil , err
488+ }
489+
490+ // Don't retry on the last attempt
491+ if attempt < maxRetries {
492+ a .logger .Warn (fmt .Sprintf (" ⚠️ Query failed (attempt %d/%d): %v. Retrying in %v..." ,
493+ attempt + 1 , maxRetries + 1 , err , retryDelay ))
494+
495+ // Wait before retrying, respecting context cancellation
496+ select {
497+ case <- time .After (retryDelay ):
498+ continue
499+ case <- ctx .Done ():
500+ return nil , ctx .Err ()
501+ }
502+ }
503+ }
504+
505+ return nil , fmt .Errorf ("query failed after %d attempts: %w" , maxRetries + 1 , lastErr )
506+ }
507+
419508func (a * Archiver ) checkTablePermissions (ctx context.Context ) error {
420509 // Use PostgreSQL's has_table_privilege function which is much faster
421510 // This checks SELECT permission without actually running a query
@@ -1593,9 +1682,10 @@ func (a *Archiver) printSummary(results []ProcessResult) {
15931682//nolint:gocognit // complex row extraction with progress tracking
15941683func (a * Archiver ) extractRowsWithProgress (partition PartitionInfo , program * tea.Program ) ([]map [string ]interface {}, error ) {
15951684 quotedTable := pq .QuoteIdentifier (partition .TableName )
1596- query := fmt .Sprintf ("SELECT row_to_json(t) FROM %s t" , quotedTable ) //nolint:gosec // Table name is quoted with pq.QuoteIdentifier
1685+ query := fmt .Sprintf ("SELECT row_to_json(t) FROM %s t" , quotedTable )
15971686
1598- rows , err := a .db .QueryContext (a .ctx , query )
1687+ // Use queryWithRetry for automatic retry on timeout/connection errors
1688+ rows , err := a .queryWithRetry (a .ctx , query )
15991689 if err != nil {
16001690 // Check if error is due to cancellation or closed connection
16011691 if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) || isConnectionError (err ) {
@@ -1697,15 +1787,15 @@ func (a *Archiver) extractRowsWithDateFilter(partition PartitionInfo, startTime,
16971787 quotedDateColumn := pq .QuoteIdentifier (a .config .DateColumn )
16981788
16991789 // Build query with date range filter
1700- //nolint:gosec // Table and column names are quoted with pq.QuoteIdentifier
17011790 query := fmt .Sprintf (
17021791 "SELECT row_to_json(t) FROM %s t WHERE %s >= $1 AND %s < $2" ,
17031792 quotedTable ,
17041793 quotedDateColumn ,
17051794 quotedDateColumn ,
17061795 )
17071796
1708- rows , err := a .db .QueryContext (a .ctx , query , startTime , endTime )
1797+ // Use queryWithRetry for automatic retry on timeout/connection errors
1798+ rows , err := a .queryWithRetry (a .ctx , query , startTime , endTime )
17091799 if err != nil {
17101800 // Check if error is due to cancellation or closed connection
17111801 if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) || isConnectionError (err ) {
0 commit comments