 //! The historical batch download API.
 
-use core::fmt;
 use std::{
+    cmp::Ordering,
     collections::HashMap,
+    fmt,
     fmt::Write,
     num::NonZeroU64,
     path::{Path, PathBuf},
@@ -11,11 +12,13 @@ use std::{
 
 use dbn::{Compression, Encoding, SType, Schema};
 use futures::StreamExt;
+use hex::ToHex;
 use reqwest::RequestBuilder;
 use serde::{de, Deserialize, Deserializer};
+use sha2::{Digest, Sha256};
 use time::OffsetDateTime;
 use tokio::io::BufWriter;
-use tracing::info;
+use tracing::{debug, error, info, info_span, warn, Instrument};
 use typed_builder::TypedBuilder;
 
 use crate::{historical::check_http_error, Error, Symbols};
@@ -144,10 +147,11 @@ impl BatchClient<'_> {
                 .urls
                 .get("https")
                 .ok_or_else(|| Error::internal("Missing https URL for batch file"))?;
-            self.download_file(https_url, &output_path).await?;
+            self.download_file(https_url, &output_path, &file_desc.hash, file_desc.size)
+                .await?;
             Ok(vec![output_path])
         } else {
-            let mut paths = Vec::new();
+            let mut paths = Vec::with_capacity(job_files.len());
             for file_desc in job_files.iter() {
                 let output_path = params
                     .output_dir
@@ -157,31 +161,136 @@ impl BatchClient<'_> {
                     .urls
                     .get("https")
                     .ok_or_else(|| Error::internal("Missing https URL for batch file"))?;
-                self.download_file(https_url, &output_path).await?;
+                self.download_file(https_url, &output_path, &file_desc.hash, file_desc.size)
+                    .await?;
                 paths.push(output_path);
             }
             Ok(paths)
         }
     }
 
-    async fn download_file(&mut self, url: &str, path: impl AsRef<Path>) -> crate::Result<()> {
+    async fn download_file(
+        &mut self,
+        url: &str,
+        path: &Path,
+        hash: &str,
+        exp_size: u64,
+    ) -> crate::Result<()> {
+        const MAX_RETRIES: usize = 5;
         let url = reqwest::Url::parse(url)
             .map_err(|e| Error::internal(format!("Unable to parse URL: {e:?}")))?;
-        let resp = self.inner.get_with_path(url.path())?.send().await?;
-        let mut stream = check_http_error(resp).await?.bytes_stream();
-        info!(%url, path=%path.as_ref().display(), "Downloading file");
-        let mut output = BufWriter::new(
-            tokio::fs::OpenOptions::new()
-                .create(true)
-                .truncate(true)
-                .write(true)
-                .open(path)
-                .await?,
-        );
-        while let Some(chunk) = stream.next().await {
-            tokio::io::copy(&mut chunk?.as_ref(), &mut output).await?;
+
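+        // The hash string takes the form "<algorithm>:<hex digest>"; only SHA-256 is
+        // verified here, other algorithms are skipped with a warning.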
+        let Some((hash_algo, exp_hash_hex)) = hash.split_once(':') else {
+            return Err(Error::internal(format!("Unexpected hash string format {hash:?}")));
+        };
+        let mut hasher = if hash_algo == "sha256" {
+            Some(Sha256::new())
+        } else {
+            warn!(
+                hash_algo,
+                "Skipping checksum with unsupported hash algorithm"
+            );
+            None
+        };
+
+        let span = info_span!("BatchDownload", %url, path=%path.display());
+        async move {
+            let mut retries = 0;
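+            // Each pass through the loop issues a fresh request; after a stream error the
+            // loop restarts and resumes from the bytes already written to disk.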
+            'retry: loop {
+                let mut req = self.inner.get_with_path(url.path())?;
+                match Self::check_if_exists(path, exp_size).await? {
+                    Header::Skip => {
+                        return Ok(());
+                    }
+                    Header::Range(Some((key, val))) => {
+                        req = req.header(key, val);
+                    }
+                    Header::Range(None) => {}
+                }
+                let resp = req.send().await?;
+                let mut stream = check_http_error(resp).await?.bytes_stream();
+                info!("Downloading file");
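+                // Open in append mode so a resumed download continues where the partial
+                // file left off rather than truncating it.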
+                let mut output = BufWriter::new(
+                    tokio::fs::OpenOptions::new()
+                        .create(true)
+                        .append(true)
+                        .write(true)
+                        .open(path)
+                        .await?,
+                );
+                while let Some(chunk) = stream.next().await {
+                    let chunk = match chunk {
+                        Ok(chunk) => chunk,
+                        Err(err) if retries < MAX_RETRIES => {
+                            retries += 1;
+                            error!(?err, retries, "Retrying download");
+                            continue 'retry;
+                        }
+                        Err(err) => {
+                            return Err(crate::Error::from(err));
+                        }
+                    };
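+                    // A successfully received chunk resets the retry budget.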
+                    if retries > 0 {
+                        retries = 0;
+                        info!("Resumed download");
+                    }
+                    if let Some(hasher) = hasher.as_mut() {
+                        hasher.update(&chunk)
+                    }
+                    tokio::io::copy(&mut chunk.as_ref(), &mut output).await?;
+                }
+                debug!("Completed download");
+                Self::verify_hash(hasher, exp_hash_hex).await;
+                return Ok(());
+            }
+        }
+        .instrument(span)
+        .await
+    }
+
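+    /// Checks for a partially downloaded file at `path`. Returns `Header::Skip` if the file
+    /// already matches `exp_size`, a `Range` header to resume if it's smaller, and an error
+    /// if it's larger than expected.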
+    async fn check_if_exists(path: &Path, exp_size: u64) -> crate::Result<Header> {
+        let Ok(metadata) = tokio::fs::metadata(path).await else {
+            return Ok(Header::Range(None));
+        };
+        let actual_size = metadata.len();
+        match actual_size.cmp(&exp_size) {
+            Ordering::Less => {
+                debug!(
+                    prev_downloaded_bytes = actual_size,
+                    total_bytes = exp_size,
+                    "Found existing file, resuming download"
+                );
+            }
+            Ordering::Equal => {
+                debug!("Skipping download as file already exists and matches expected size");
+                return Ok(Header::Skip);
+            }
+            Ordering::Greater => {
+                return Err(crate::Error::Io(std::io::Error::other(format!(
+                    "Batch file {} already exists with size {actual_size} which is larger than expected size {exp_size}",
+                    path.file_name().unwrap().display(),
+                ))));
+            }
+        }
+        Ok(Header::Range(Some((
+            "Range",
+            format!("bytes={}-", metadata.len()),
+        ))))
+    }
+
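+    /// Compares the computed digest against the expected hex digest, logging a warning on a
+    /// mismatch rather than returning an error.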
+    async fn verify_hash(hasher: Option<Sha256>, exp_hash_hex: &str) {
+        let Some(hasher) = hasher else {
+            return;
+        };
+        let hash_hex = hasher.finalize().encode_hex::<String>();
+        if hash_hex != exp_hash_hex {
+            warn!(
+                hash_hex,
+                exp_hash_hex, "Downloaded file failed checksum validation"
+            );
+        } else {
+            debug!("Successfully verified checksum");
         }
-        Ok(())
     }
 
     const PATH_PREFIX: &'static str = "batch";
@@ -403,7 +512,7 @@ pub struct DownloadParams {
     #[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
     pub job_id: String,
     /// `None` means all files associated with the job will be downloaded.
-    #[builder(default, setter(strip_option))]
+    #[builder(default, setter(transform = |filename: impl ToString| Some(filename.to_string())))]
     pub filename_to_download: Option<String>,
 }
 
@@ -542,6 +651,11 @@ fn deserialize_compression<'de, D: serde::Deserializer<'de>>(
     Ok(opt.unwrap_or(Compression::None))
 }
 
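+/// Outcome of checking for an existing partial download: either skip the request entirely
+/// or send it with an optional `Range` header to resume.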
+enum Header {
+    Skip,
+    Range(Option<(&'static str, String)>),
+}
+
 #[cfg(test)]
 mod tests {
     use reqwest::StatusCode;