Skip to content

Commit 837205e

Browse files
committed
Allow S3 Re-initialization
This allows Arrow to coexist with other libraries that also use the AWS SDK. For roughly two years the AWS SDK has internally reference-counted `InitAPI` calls and has supported re-initialization after de-initialization. - Introduced a mutex for thread-safe re-initialization of the S3 client after finalization, replacing the previous std::call_once mechanism. - Added an Initialize method to reset the finalized state of the S3 client. - Updated EnsureInitialized to allow re-initialization while ensuring thread safety. This change improves the flexibility and safety of S3 client lifecycle management.
1 parent 19e3f90 commit 837205e

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

cpp/src/arrow/filesystem/s3fs.cc

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,11 @@ class S3ClientFinalizer : public std::enable_shared_from_this<S3ClientFinalizer>
881881

882882
auto LockShared() { return std::shared_lock(mutex_); }
883883

884+
void Initialize() {
885+
std::unique_lock lock(mutex_);
886+
finalized_ = false;
887+
}
888+
884889
protected:
885890
friend class S3ClientHolder;
886891

@@ -3419,29 +3424,33 @@ struct AwsInstance {
34193424

34203425
// Returns true iff the instance was newly initialized with `options`
34213426
Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
3422-
// NOTE: The individual accesses are atomic but the entire sequence below is not.
3423-
// The application should serialize calls to InitializeS3() and FinalizeS3()
3424-
// (see docstrings).
3425-
if (is_finalized_.load()) {
3426-
return Status::Invalid("Attempt to initialize S3 after it has been finalized");
3427-
}
3428-
bool newly_initialized = false;
34293427
// EnsureInitialized() can be called concurrently by FileSystemFromUri,
34303428
// therefore we need to serialize initialization (GH-39897).
3431-
std::call_once(initialize_flag_, [&]() {
3432-
bool was_initialized = is_initialized_.exchange(true);
3433-
DCHECK(!was_initialized);
3429+
// We use a mutex instead of std::call_once to allow re-initialization after
3430+
// finalization (as supported by the AWS SDK).
3431+
std::lock_guard<std::mutex> lock(init_mutex_);
3432+
3433+
if (!is_initialized_.load()) {
3434+
// Not already initialized, allow re-initialization after finalization
3435+
is_finalized_.store(false);
3436+
is_initialized_.store(true);
34343437
DoInitialize(options);
3435-
newly_initialized = true;
3436-
});
3437-
return newly_initialized;
3438+
return true; // newly initialized
3439+
}
3440+
return false;
34383441
}
34393442

34403443
bool IsInitialized() { return !is_finalized_ && is_initialized_; }
34413444

34423445
bool IsFinalized() { return is_finalized_; }
34433446

34443447
void Finalize(bool from_destructor = false) {
3448+
std::unique_lock<std::mutex> lock(init_mutex_, std::defer_lock);
3449+
// Don't try to acquire the lock from destructor to avoid potential deadlocks
3450+
if (!from_destructor) {
3451+
lock.lock();
3452+
}
3453+
34453454
if (is_finalized_.exchange(true)) {
34463455
// Already finalized
34473456
return;
@@ -3508,12 +3517,13 @@ struct AwsInstance {
35083517
aws_options_.httpOptions.compliantRfc3986Encoding = true;
35093518
aws_options_.httpOptions.installSigPipeHandler = options.install_sigpipe_handler;
35103519
Aws::InitAPI(aws_options_);
3520+
GetClientFinalizer()->Initialize();
35113521
}
35123522

35133523
Aws::SDKOptions aws_options_;
35143524
std::atomic<bool> is_initialized_;
35153525
std::atomic<bool> is_finalized_;
3516-
std::once_flag initialize_flag_;
3526+
std::mutex init_mutex_;
35173527
};
35183528

35193529
AwsInstance* GetAwsInstance() {

0 commit comments

Comments
 (0)