Skip to content

Commit 3f1e8fa

Browse files
feat: Support S3 virtual-hosted–style URI (#24405)
1 parent 869fe5d commit 3f1e8fa

File tree

7 files changed

+85
-40
lines changed

7 files changed

+85
-40
lines changed

crates/polars-io/src/path_utils/hugging_face.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ impl GetPages<'_> {
215215
pub(super) async fn expand_paths_hf(
216216
paths: &[PlPath],
217217
check_directory_level: bool,
218-
cloud_options: Option<&CloudOptions>,
218+
cloud_options: &Option<CloudOptions>,
219219
glob: bool,
220220
) -> PolarsResult<(usize, Vec<PlPath>)> {
221221
assert!(!paths.is_empty());

crates/polars-io/src/path_utils/mod.rs

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ pub fn expanded_from_single_directory(addrs: &[PlPath], expanded_addrs: &[PlPath
163163
pub fn expand_paths(
164164
paths: &[PlPath],
165165
glob: bool,
166-
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
166+
#[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
167167
) -> PolarsResult<Arc<[PlPath]>> {
168168
expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
169169
}
@@ -204,7 +204,7 @@ impl HiveIdxTracker<'_> {
204204
pub fn expand_paths_hive(
205205
paths: &[PlPath],
206206
glob: bool,
207-
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
207+
#[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
208208
check_directory_level: bool,
209209
) -> PolarsResult<(Arc<[PlPath]>, usize)> {
210210
let Some(first_path) = paths.first() else {
@@ -398,13 +398,65 @@ pub fn expand_paths_hive(
398398
};
399399

400400
for (path_idx, path) in paths.iter().enumerate() {
401+
use std::borrow::Cow;
402+
403+
let mut path = Cow::Borrowed(path);
404+
401405
if matches!(
402406
path.cloud_scheme(),
403407
Some(CloudScheme::Http | CloudScheme::Https)
404408
) {
405-
out_paths.push(path.clone());
406-
hive_idx_tracker.update(0, path_idx)?;
407-
continue;
409+
let mut rewrite_aws = false;
410+
411+
#[cfg(feature = "aws")]
412+
{
413+
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
414+
// Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
415+
let p = path.as_ref().as_ref();
416+
let after_scheme = p.strip_scheme();
417+
if let Some(bucket_end) = after_scheme.find(".s3.") {
418+
if let Some(region_end) = after_scheme.find(".amazonaws.com/") {
419+
if bucket_end < region_end
420+
&& region_end < after_scheme.find("/").unwrap()
421+
{
422+
use crate::cloud::CloudConfig;
423+
424+
rewrite_aws = true;
425+
426+
let bucket = &after_scheme[..bucket_end];
427+
let region = &after_scheme[bucket_end + 4..region_end];
428+
let key = &after_scheme[region_end + 15..];
429+
430+
if let CloudConfig::Aws(configs) = cloud_options
431+
.get_or_insert_default()
432+
.config
433+
.get_or_insert_with(|| {
434+
CloudConfig::Aws(Vec::with_capacity(1))
435+
})
436+
{
437+
use object_store::aws::AmazonS3ConfigKey;
438+
439+
if !matches!(
440+
configs.last(),
441+
Some((AmazonS3ConfigKey::Region, _))
442+
) {
443+
configs.push((AmazonS3ConfigKey::Region, region.into()))
444+
}
445+
}
446+
447+
path = Cow::Owned(PlPath::from_string(format!(
448+
"s3://{bucket}/{key}"
449+
)))
450+
}
451+
}
452+
}
453+
}
454+
455+
if !rewrite_aws {
456+
out_paths.push(path.into_owned());
457+
hive_idx_tracker.update(0, path_idx)?;
458+
continue;
459+
}
408460
}
409461

410462
let glob_start_idx = get_glob_start_idx(path.to_str().as_bytes());
@@ -413,7 +465,7 @@ pub fn expand_paths_hive(
413465
path.clone()
414466
} else {
415467
let (expand_start_idx, paths) =
416-
expand_path_cloud(path.to_str(), cloud_options)?;
468+
expand_path_cloud(path.to_str(), cloud_options.as_ref())?;
417469
out_paths.extend_from_slice(&paths);
418470
hive_idx_tracker.update(expand_start_idx, path_idx)?;
419471
continue;
@@ -422,7 +474,7 @@ pub fn expand_paths_hive(
422474
hive_idx_tracker.update(0, path_idx)?;
423475

424476
let iter = crate::pl_async::get_runtime()
425-
.block_in_place_on(crate::async_glob(path.to_str(), cloud_options))?;
477+
.block_in_place_on(crate::async_glob(path.to_str(), cloud_options.as_ref()))?;
426478

427479
if is_cloud {
428480
out_paths.extend(iter.into_iter().map(PlPath::from_string));
@@ -577,7 +629,7 @@ mod tests {
577629

578630
let path = "https://pola.rs/test.csv?token=bear";
579631
let paths = &[PlPath::new(path)];
580-
let out = expand_paths(paths, true, None).unwrap();
632+
let out = expand_paths(paths, true, &mut None).unwrap();
581633
assert_eq!(out.as_ref(), paths);
582634
}
583635
}

crates/polars-lazy/src/scan/csv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ impl LazyCsvReader {
275275
ScanSources::Paths(paths) => {
276276
// TODO: Path expansion should happen when converting to the IR
277277
// https://github.com/pola-rs/polars/issues/17634
278-
let paths = expand_paths(&paths[..], self.glob(), self.cloud_options())?;
278+
let paths = expand_paths(&paths[..], self.glob(), &mut self.cloud_options)?;
279279

280280
let Some(path) = paths.first() else {
281281
polars_bail!(ComputeError: "no paths specified for this reader");

crates/polars-plan/src/dsl/scan_sources.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,12 @@ impl PartialEq for ScanSources {
159159
impl Eq for ScanSources {}
160160

161161
impl ScanSources {
162-
pub fn expand_paths(
163-
&self,
164-
scan_args: &UnifiedScanArgs,
165-
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
166-
) -> PolarsResult<Self> {
162+
pub fn expand_paths(&self, scan_args: &mut UnifiedScanArgs) -> PolarsResult<Self> {
167163
match self {
168164
Self::Paths(paths) => Ok(Self::Paths(expand_paths(
169165
paths,
170166
scan_args.glob,
171-
cloud_options,
167+
&mut scan_args.cloud_options,
172168
)?)),
173169
v => Ok(v.clone()),
174170
}
@@ -180,14 +176,13 @@ impl ScanSources {
180176
pub fn expand_paths_with_hive_update(
181177
&self,
182178
scan_args: &mut UnifiedScanArgs,
183-
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
184179
) -> PolarsResult<Self> {
185180
match self {
186181
Self::Paths(paths) => {
187182
let (expanded_paths, hive_start_idx) = expand_paths_hive(
188183
paths,
189184
scan_args.glob,
190-
cloud_options,
185+
&mut scan_args.cloud_options,
191186
scan_args.hive_options.enabled.unwrap_or(false),
192187
)?;
193188

crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use either::Either;
22
use polars_io::RowIndex;
3-
use polars_io::cloud::CloudOptions;
43
#[cfg(feature = "cloud")]
54
use polars_io::pl_async::get_runtime;
65
use polars_io::prelude::*;
@@ -20,9 +19,6 @@ pub(super) fn dsl_to_ir(
2019
let mut cached_ir = cached_ir.lock().unwrap();
2120

2221
if cached_ir.is_none() {
23-
let cloud_options = unified_scan_args_box.cloud_options.clone();
24-
let cloud_options = cloud_options.as_ref();
25-
2622
let unified_scan_args = unified_scan_args_box.as_mut();
2723

2824
if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
@@ -45,16 +41,14 @@ pub(super) fn dsl_to_ir(
4541
let sources = match &*scan_type {
4642
#[cfg(feature = "parquet")]
4743
FileScanDsl::Parquet { .. } => {
48-
sources.expand_paths_with_hive_update(unified_scan_args, cloud_options)?
44+
sources.expand_paths_with_hive_update(unified_scan_args)?
4945
},
5046
#[cfg(feature = "ipc")]
51-
FileScanDsl::Ipc { .. } => {
52-
sources.expand_paths_with_hive_update(unified_scan_args, cloud_options)?
53-
},
47+
FileScanDsl::Ipc { .. } => sources.expand_paths_with_hive_update(unified_scan_args)?,
5448
#[cfg(feature = "csv")]
55-
FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args, cloud_options)?,
49+
FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args)?,
5650
#[cfg(feature = "json")]
57-
FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args, cloud_options)?,
51+
FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args)?,
5852
#[cfg(feature = "python")]
5953
FileScanDsl::PythonDataset { .. } => {
6054
// There are a lot of places that short-circuit if the paths is empty,
@@ -71,7 +65,6 @@ pub(super) fn dsl_to_ir(
7165
&sources,
7266
sources_before_expansion,
7367
unified_scan_args,
74-
cloud_options,
7568
ctxt.verbose,
7669
)?;
7770

@@ -522,7 +515,6 @@ impl SourcesToFileInfo {
522515
sources: &ScanSources,
523516
sources_before_expansion: &ScanSources,
524517
unified_scan_args: &mut UnifiedScanArgs,
525-
cloud_options: Option<&CloudOptions>,
526518
) -> PolarsResult<(FileInfo, FileScanIR)> {
527519
let require_first_source = |failed_operation_name: &'static str, hint: &'static str| {
528520
sources.first_or_empty_expand_err(
@@ -533,6 +525,8 @@ impl SourcesToFileInfo {
533525
)
534526
};
535527

528+
let cloud_options = unified_scan_args.cloud_options.as_ref();
529+
536530
Ok(match scan_type {
537531
#[cfg(feature = "parquet")]
538532
FileScanDsl::Parquet { options } => {
@@ -666,7 +660,6 @@ this scan to succeed with an empty DataFrame.",
666660
sources: &ScanSources,
667661
sources_before_expansion: &ScanSources,
668662
unified_scan_args: &mut UnifiedScanArgs,
669-
cloud_options: Option<&CloudOptions>,
670663
verbose: bool,
671664
) -> PolarsResult<(FileInfo, FileScanIR)> {
672665
// Only cache non-empty paths. Others are directly parsed.
@@ -679,7 +672,6 @@ this scan to succeed with an empty DataFrame.",
679672
sources,
680673
sources_before_expansion,
681674
unified_scan_args,
682-
cloud_options,
683675
);
684676
},
685677
};
@@ -731,7 +723,6 @@ this scan to succeed with an empty DataFrame.",
731723
sources,
732724
sources_before_expansion,
733725
unified_scan_args,
734-
cloud_options,
735726
);
736727
},
737728
};
@@ -747,7 +738,6 @@ this scan to succeed with an empty DataFrame.",
747738
sources,
748739
sources_before_expansion,
749740
unified_scan_args,
750-
cloud_options,
751741
)?;
752742
self.inner.insert(k, v.clone());
753743
Ok(v)

py-polars/polars/io/cloud/_utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,14 @@ def _get_path_scheme(path: str | Path) -> str | None:
5353
return path_str[:i] if i >= 0 else None
5454

5555

56-
def _is_aws_cloud(scheme: str) -> bool:
57-
return any(scheme == x for x in ["s3", "s3a"])
56+
def _is_aws_cloud(*, scheme: str, first_scan_path: str) -> bool:
57+
return any(scheme == x for x in ["s3", "s3a"]) or (
58+
(scheme == "http" or scheme == "https")
59+
and 0
60+
< first_scan_path.find(".s3.")
61+
< first_scan_path.find(".amazonaws.com/")
62+
< first_scan_path[len(scheme) + 3 :].find("/")
63+
)
5864

5965

6066
def _is_azure_cloud(scheme: str) -> bool:

py-polars/polars/io/cloud/credential_provider/_builder.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,10 @@ def f() -> CredentialProviderBuilder | None:
337337
credential_provider
338338
)
339339

340-
if (path := _first_scan_path(source)) is None:
340+
if (first_scan_path := _first_scan_path(source)) is None:
341341
return None
342342

343-
if (scheme := _get_path_scheme(path)) is None:
343+
if (scheme := _get_path_scheme(first_scan_path)) is None:
344344
return None
345345

346346
if _is_azure_cloud(scheme):
@@ -374,7 +374,9 @@ def f() -> CredentialProviderBuilder | None:
374374

375375
storage_account = (
376376
# Prefer the one embedded in the path
377-
CredentialProviderAzure._extract_adls_uri_storage_account(str(path))
377+
CredentialProviderAzure._extract_adls_uri_storage_account(
378+
str(first_scan_path)
379+
)
378380
or storage_account
379381
)
380382

@@ -386,7 +388,7 @@ def f() -> CredentialProviderBuilder | None:
386388
)
387389
)
388390

389-
elif _is_aws_cloud(scheme):
391+
elif _is_aws_cloud(scheme=scheme, first_scan_path=str(first_scan_path)):
390392
region = None
391393
profile = None
392394
default_region = None

0 commit comments

Comments
 (0)