@@ -25,11 +25,11 @@ use datafusion::{
2525 listing:: { ListingOptions , ListingTable , ListingTableConfig , ListingTableUrl } ,
2626 } ,
2727 error:: DataFusionError ,
28- logical_expr:: { col, SortExpr } ,
28+ logical_expr:: col,
2929} ;
30- use futures_util:: { future , stream:: FuturesUnordered , Future , TryStreamExt } ;
30+ use futures_util:: { stream:: FuturesUnordered , Future , TryStreamExt } ;
3131use itertools:: Itertools ;
32- use object_store:: { ObjectMeta , ObjectStore } ;
32+ use object_store:: { path :: Path , ObjectMeta , ObjectStore } ;
3333
3434use crate :: {
3535 event:: DEFAULT_TIMESTAMP_KEY ,
@@ -60,25 +60,25 @@ impl ListingTableBuilder {
6060 client : Arc < dyn ObjectStore > ,
6161 time_filters : & [ PartialTimeFilter ] ,
6262 ) -> Result < Self , DataFusionError > {
63+ // Extract the minimum start time from the time filters.
6364 let start_time = time_filters
6465 . iter ( )
65- . filter_map ( |x | match x {
66- PartialTimeFilter :: Low ( Bound :: Excluded ( x) ) => Some ( x ) ,
67- PartialTimeFilter :: Low ( Bound :: Included ( x) ) => Some ( x) ,
66+ . filter_map ( |filter | match filter {
67+ PartialTimeFilter :: Low ( Bound :: Excluded ( x) )
68+ | PartialTimeFilter :: Low ( Bound :: Included ( x) ) => Some ( x) ,
6869 _ => None ,
6970 } )
70- . min ( )
71- . cloned ( ) ;
71+ . min ( ) ;
7272
73+ // Extract the maximum end time from the time filters.
7374 let end_time = time_filters
7475 . iter ( )
75- . filter_map ( |x | match x {
76- PartialTimeFilter :: High ( Bound :: Excluded ( x) ) => Some ( x ) ,
77- PartialTimeFilter :: High ( Bound :: Included ( x) ) => Some ( x) ,
76+ . filter_map ( |filter | match filter {
77+ PartialTimeFilter :: High ( Bound :: Excluded ( x) )
78+ | PartialTimeFilter :: High ( Bound :: Included ( x) ) => Some ( x) ,
7879 _ => None ,
7980 } )
80- . max ( )
81- . cloned ( ) ;
81+ . max ( ) ;
8282
8383 let Some ( ( start_time, end_time) ) = start_time. zip ( end_time) else {
8484 return Err ( DataFusionError :: NotImplemented (
@@ -87,62 +87,49 @@ impl ListingTableBuilder {
8787 ) ) ;
8888 } ;
8989
90+ // Generate prefixes for the given time range
9091 let prefixes = TimePeriod :: new (
9192 start_time. and_utc ( ) ,
9293 end_time. and_utc ( ) ,
9394 OBJECT_STORE_DATA_GRANULARITY ,
9495 )
9596 . generate_prefixes ( ) ;
9697
97- let prefixes = prefixes
98- . into_iter ( )
99- . map ( |entry| {
100- let path =
101- relative_path:: RelativePathBuf :: from ( format ! ( "{}/{}" , & self . stream, entry) ) ;
102- storage. absolute_url ( path. as_relative_path ( ) ) . to_string ( )
103- } )
104- . collect_vec ( ) ;
105-
106- let mut minute_resolve: HashMap < String , Vec < String > > = HashMap :: new ( ) ;
98+ // Categorizes prefixes into "minute" and general resolve lists.
99+ let mut minute_resolve = HashMap :: < String , Vec < String > > :: new ( ) ;
107100 let mut all_resolve = Vec :: new ( ) ;
108-
109101 for prefix in prefixes {
110- let components = prefix. split_terminator ( '/' ) ;
111- if components. last ( ) . is_some_and ( |x| x. starts_with ( "minute" ) ) {
112- let hour_prefix = & prefix[ 0 ..prefix. rfind ( "minute" ) . expect ( "minute exists" ) ] ;
102+ let path = relative_path:: RelativePathBuf :: from ( format ! ( "{}/{}" , & self . stream, prefix) ) ;
103+ storage. absolute_url ( path. as_relative_path ( ) ) . to_string ( ) ;
104+ if let Some ( pos) = prefix. rfind ( "minute" ) {
105+ let hour_prefix = & prefix[ ..pos] ;
113106 minute_resolve
114107 . entry ( hour_prefix. to_owned ( ) )
115- . and_modify ( |list| list . push ( prefix ) )
116- . or_default ( ) ;
108+ . or_default ( )
109+ . push ( prefix ) ;
117110 } else {
118- all_resolve. push ( prefix)
111+ all_resolve. push ( prefix) ;
119112 }
120113 }
121114
122- type ResolveFuture = Pin <
123- Box < dyn Future < Output = Result < Vec < ObjectMeta > , object_store:: Error > > + Send + ' static > ,
124- > ;
125- // Pin<Box<dyn Future<Output = Result<BoxStream<'_, Result<ObjectMeta>>>> + Send + 'async_trait>>
126- // BoxStream<'_, Result<ObjectMeta>>
115+ /// Resolve all prefixes asynchronously and collect the object metadata.
116+ type ResolveFuture =
117+ Pin < Box < dyn Future < Output = Result < Vec < ObjectMeta > , object_store:: Error > > + Send > > ;
127118 let tasks: FuturesUnordered < ResolveFuture > = FuturesUnordered :: new ( ) ;
128-
129- for ( listing_prefix, prefix) in minute_resolve {
119+ for ( listing_prefix, prefixes) in minute_resolve {
130120 let client = Arc :: clone ( & client) ;
131121 tasks. push ( Box :: pin ( async move {
132- let mut list = client
133- . list ( Some ( & object_store:: path:: Path :: from ( listing_prefix) ) )
134- . try_collect :: < Vec < _ > > ( )
135- . await ?;
122+ let path = Path :: from ( listing_prefix) ;
123+ let mut objects = client. list ( Some ( & path) ) . try_collect :: < Vec < _ > > ( ) . await ?;
136124
137- list. retain ( |object| {
138- prefix. iter ( ) . any ( |prefix| {
139- object
140- . location
125+ objects. retain ( |obj| {
126+ prefixes. iter ( ) . any ( |prefix| {
127+ obj. location
141128 . prefix_matches ( & object_store:: path:: Path :: from ( prefix. as_ref ( ) ) )
142129 } )
143130 } ) ;
144131
145- Ok ( list )
132+ Ok ( objects )
146133 } ) ) ;
147134 }
148135
@@ -157,25 +144,23 @@ impl ListingTableBuilder {
157144 } ) ) ;
158145 }
159146
160- let res: Vec < Vec < String > > = tasks
161- . and_then ( |res| {
162- future:: ok (
163- res. into_iter ( )
164- . map ( |res| res. location . to_string ( ) )
165- . collect_vec ( ) ,
166- )
167- } )
168- . try_collect ( )
147+ let listing = tasks
148+ . try_collect :: < Vec < Vec < ObjectMeta > > > ( )
169149 . await
170- . map_err ( |err| DataFusionError :: External ( Box :: new ( err) ) ) ?;
171-
172- let mut res = res. into_iter ( ) . flatten ( ) . collect_vec ( ) ;
173- res. sort ( ) ;
174- res. reverse ( ) ;
150+ . map_err ( |err| DataFusionError :: External ( Box :: new ( err) ) ) ?
151+ . into_iter ( )
152+ . flat_map ( |res| {
153+ res. into_iter ( )
154+ . map ( |obj| obj. location . to_string ( ) )
155+ . collect :: < Vec < String > > ( )
156+ } )
157+ . sorted ( )
158+ . rev ( )
159+ . collect_vec ( ) ;
175160
176161 Ok ( Self {
177162 stream : self . stream ,
178- listing : res ,
163+ listing,
179164 } )
180165 }
181166
@@ -188,25 +173,21 @@ impl ListingTableBuilder {
188173 if self . listing . is_empty ( ) {
189174 return Ok ( None ) ;
190175 }
191- let file_sort_order: Vec < Vec < SortExpr > > ;
192- let file_format = ParquetFormat :: default ( ) . with_enable_pruning ( true ) ;
193- if let Some ( time_partition) = time_partition {
194- file_sort_order = vec ! [ vec![ col( time_partition) . sort( true , false ) ] ] ;
195- } else {
196- file_sort_order = vec ! [ vec![ col( DEFAULT_TIMESTAMP_KEY ) . sort( true , false ) ] ] ;
197- }
198176
177+ let file_sort_order = vec ! [ vec![ time_partition
178+ . map_or_else( || col( DEFAULT_TIMESTAMP_KEY ) , col)
179+ . sort( true , false ) ] ] ;
180+ let file_format = ParquetFormat :: default ( ) . with_enable_pruning ( true ) ;
199181 let listing_options = ListingOptions :: new ( Arc :: new ( file_format) )
200182 . with_file_extension ( ".parquet" )
201183 . with_file_sort_order ( file_sort_order)
202184 . with_collect_stat ( true )
203185 . with_target_partitions ( 1 ) ;
204-
205186 let config = ListingTableConfig :: new_with_multi_paths ( map ( self . listing ) )
206187 . with_listing_options ( listing_options)
207188 . with_schema ( schema) ;
189+ let listing_table = ListingTable :: try_new ( config) ?;
208190
209- let listing_table = Arc :: new ( ListingTable :: try_new ( config) ?) ;
210- Ok ( Some ( listing_table) )
191+ Ok ( Some ( Arc :: new ( listing_table) ) )
211192 }
212193}
0 commit comments