diff --git a/Cargo.lock b/Cargo.lock index 4fe5f469..1814343b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -3492,7 +3492,7 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "uplink" -version = "2.18.4" +version = "2.18.5" dependencies = [ "anyhow", "async-trait", diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 4c7b0f1b..5da38ab2 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.80.0" +channel = "1.85.0" targets = [ "x86_64-linux-android", "i686-linux-android", diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 6ab8f49c..076ba0d6 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "uplink" -version = "2.18.4" +version = "2.18.5" authors = ["tekjar "] -edition = "2021" +edition = "2024" [dependencies] bytes = { workspace = true } diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 2b556799..20370533 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -34,7 +34,7 @@ pub struct ActionResponse { } impl ActionResponse { - fn new(id: &str, state: &str, progress: u8, errors: Vec) -> Self { + pub fn new(id: &str, state: &str, progress: u8, errors: Vec) -> Self { let timestamp = clock() as u64; ActionResponse { diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 41ee996a..6f301f95 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -65,7 +65,6 @@ enum DownloadResult { pub struct FileDownloader { config: DownloaderConfig, actions_rx: Receiver, - action_id: String, bridge_tx: BridgeTx, client: Client, shutdown_rx: Receiver, @@ -102,7 +101,6 @@ impl FileDownloader { actions_rx, client, bridge_tx, - action_id: String::default(), shutdown_rx, disabled, }) @@ -116,28 +114,29 @@ impl FileDownloader { info!("Downloader thread is ready to receive download actions"); while let Ok(action) = self.actions_rx.recv_async().await { - action.action_id.clone_into(&mut self.action_id); + let action_id = action.action_id.clone(); let mut state = match DownloadState::new(action, &self.config) { Ok(s) => s, Err(e) => { - self.forward_error(e).await; + self.bridge_tx.send_action_response(ActionResponse::failure(&action_id, e.to_string())).await; continue; } }; // Update action status for process initiated - let status = ActionResponse::progress(&self.action_id, "Downloading", 0); + let status = ActionResponse::progress(&state.current.action.action_id, "Downloading", 0); self.bridge_tx.send_action_response(status).await; match self.download(&mut state).await { DownloadResult::Ok => { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); + let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]); + status.done_response = Some(action); self.bridge_tx.send_action_response(status).await; } DownloadResult::Err(e) => { - self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await; + self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await; } DownloadResult::Suspended => { break @@ -158,17 +157,17 @@ impl FileDownloader { return; } }; - state.current.action.action_id.clone_into(&mut self.action_id); match self.download(&mut state).await { DownloadResult::Ok => { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); + let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]); + status.done_response = Some(action); self.bridge_tx.send_action_response(status).await; } DownloadResult::Err(e) => { - self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await; + self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await; } DownloadResult::Suspended => {} } @@ -176,6 +175,10 @@ impl FileDownloader { // Accepts `DownloadState`, sets a timeout for the action async fn download(&mut self, state: &mut DownloadState) -> DownloadResult { + if state.already_downloaded { + return DownloadResult::Ok; + } + let shutdown_rx = self.shutdown_rx.clone(); loop { select! { @@ -187,7 +190,7 @@ impl FileDownloader { } }, Ok(action) = self.actions_rx.recv_async() => { - if action.action_id == self.action_id { + if &action.action_id == &state.current.action.action_id { // This handles the edge case when the device is able to receive actions // from the broker but for something goes wrong when pushing action statuses back to the backend // In this case the backend will try sending the same action again @@ -202,10 +205,10 @@ impl FileDownloader { match serde_json::from_str::(&action.payload) .context("Invalid cancel action payload") .and_then(|cancellation| { - if cancellation.action_id == self.action_id { + if cancellation.action_id == state.current.action.action_id { Ok(()) } else { - Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, self.action_id))) + Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id))) } }) .and_then(|_| { @@ -233,9 +236,11 @@ impl FileDownloader { } } - self.bridge_tx.send_action_response(ActionResponse::progress(self.action_id.as_str(), "VerifyingChecksum", 99)).await; - if let Err(e) = state.current.meta.verify_checksum() { - return DownloadResult::Err(e.to_string()); + self.bridge_tx.send_action_response(ActionResponse::progress(&state.current.action.action_id, "VerifyingChecksum", 99)).await; + if state.current.meta.checksum.is_some() { + if let Err(e) = state.current.meta.verify_checksum() { + return DownloadResult::Err(e.to_string()); + } } // Update Action payload with `download_path`, i.e. downloaded file's location in fs state.current.action.payload = match serde_json::to_string(&state.current.meta) { @@ -284,7 +289,7 @@ impl FileDownloader { // Retry non-status errors Err(e) if !e.is_status() => { let status = - ActionResponse::progress(&self.action_id, "Download Failed", 0) + ActionResponse::progress(&state.current.action.action_id, "Download Failed", 0) .add_error(e.to_string()); self.bridge_tx.send_action_response(status).await; error!("Download failed: {e:?}"); @@ -296,7 +301,7 @@ impl FileDownloader { }; if let Some(percentage) = state.write_bytes(&chunk)? { let status = - ActionResponse::progress(&self.action_id, "Downloading", percentage); + ActionResponse::progress(&state.current.action.action_id, "Downloading", percentage); self.bridge_tx.send_action_response(status).await; } } @@ -307,12 +312,6 @@ impl FileDownloader { Ok(()) } - - // Forward errors as action response to bridge - async fn forward_error(&mut self, err: Error) { - let status = ActionResponse::failure(&self.action_id, err.to_string()); - self.bridge_tx.send_action_response(status).await; - } } #[cfg(unix)] @@ -333,9 +332,7 @@ fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<() } /// Creates file to download into -fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> { - let mut file_path = download_path.to_owned(); - file_path.push(file_name); +fn create_file(file_path: &Path) -> Result { // NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it if let Ok(f) = metadata(&file_path) { if f.is_dir() { @@ -346,7 +343,7 @@ fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBu #[cfg(unix)] file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?; - Ok((file, file_path)) + Ok(file) } fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> { @@ -381,14 +378,13 @@ pub struct DownloadFile { impl DownloadFile { fn verify_checksum(&self) -> Result<(), Error> { - let Some(checksum) = &self.checksum else { return Ok(()) }; let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\""); let mut file = File::open(path)?; let mut hasher = Sha256::new(); io::copy(&mut file, &mut hasher)?; let hash = hasher.finalize(); - if checksum != &hex::encode(hash) { + if self.checksum.as_ref().unwrap() != &hex::encode(hash) { return Err(Error::BadChecksum); } @@ -410,6 +406,7 @@ struct DownloadState { file: File, bytes_written: usize, percentage_downloaded: u8, + already_downloaded: bool, start: Instant, } @@ -438,6 +435,19 @@ impl DownloadState { }; let file_path = path.join(&meta.file_name); + meta.download_path = Some(file_path.clone()); + if meta.checksum.is_some() && meta.verify_checksum().is_ok() { + info!("file has already been downloaded and its checksum matches, skipping download..."); + return Ok(Self { + bytes_written: meta.content_length, + current: CurrentDownload { action, meta }, + file: File::open("/dev/null")?, + percentage_downloaded: 100, + already_downloaded: true, + start: Instant::now(), + }); + } + let _ = remove_file(&file_path); let _ = remove_dir_all(&file_path); @@ -446,7 +456,7 @@ impl DownloadState { let url = meta.url.clone(); // Create file to actually download into - let (file, file_path) = create_file(&path, &meta.file_name)?; + let file = create_file(&file_path)?; // Retry downloading upto 3 times in case of connectivity issues // TODO: Error out for 1XX/3XX responses info!( @@ -454,7 +464,6 @@ impl DownloadState { file_path.display(), human_bytes(meta.content_length as f64) ); - meta.download_path = Some(file_path); let current = CurrentDownload { action, meta }; Ok(Self { @@ -462,6 +471,7 @@ impl DownloadState { file, bytes_written: 0, percentage_downloaded: 0, + already_downloaded: false, start: Instant::now(), }) } @@ -489,6 +499,7 @@ impl DownloadState { file, bytes_written, percentage_downloaded: 0, + already_downloaded: false, start: Instant::now(), }) }