From e5f8f70c794d41f3e12da5df6fe4ad508ac093b6 Mon Sep 17 00:00:00 2001 From: Sagar Tiwari Date: Wed, 24 Sep 2025 18:14:25 +0530 Subject: [PATCH 1/4] chore: remove an unnecessary field --- uplink/src/collector/downloader.rs | 35 +++++++++++------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 41ee996a..e38c1ede 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -65,7 +65,6 @@ enum DownloadResult { pub struct FileDownloader { config: DownloaderConfig, actions_rx: Receiver, - action_id: String, bridge_tx: BridgeTx, client: Client, shutdown_rx: Receiver, @@ -102,7 +101,6 @@ impl FileDownloader { actions_rx, client, bridge_tx, - action_id: String::default(), shutdown_rx, disabled, }) @@ -116,28 +114,28 @@ impl FileDownloader { info!("Downloader thread is ready to receive download actions"); while let Ok(action) = self.actions_rx.recv_async().await { - action.action_id.clone_into(&mut self.action_id); + let action_id = action.action_id.clone(); let mut state = match DownloadState::new(action, &self.config) { Ok(s) => s, Err(e) => { - self.forward_error(e).await; + self.bridge_tx.send_action_response(ActionResponse::failure(&action_id, e.to_string())).await; continue; } }; // Update action status for process initiated - let status = ActionResponse::progress(&self.action_id, "Downloading", 0); + let status = ActionResponse::progress(&state.current.action.action_id, "Downloading", 0); self.bridge_tx.send_action_response(status).await; match self.download(&mut state).await { DownloadResult::Ok => { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); + let status = ActionResponse::done(&action.action_id, "Downloaded", Some(action)); self.bridge_tx.send_action_response(status).await; } DownloadResult::Err(e) => { - self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await; + self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await; } DownloadResult::Suspended => { break @@ -158,17 +156,16 @@ impl FileDownloader { return; } }; - state.current.action.action_id.clone_into(&mut self.action_id); match self.download(&mut state).await { DownloadResult::Ok => { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); + let status = ActionResponse::done(&action.action_id, "Downloaded", Some(action)); self.bridge_tx.send_action_response(status).await; } DownloadResult::Err(e) => { - self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await; + self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await; } DownloadResult::Suspended => {} } @@ -187,7 +184,7 @@ impl FileDownloader { } }, Ok(action) = self.actions_rx.recv_async() => { - if action.action_id == self.action_id { + if action.action_id == &state.current.action.action_id { // This handles the edge case when the device is able to receive actions // from the broker but for something goes wrong when pushing action statuses back to the backend // In this case the backend will try sending the same action again @@ -202,10 +199,10 @@ impl FileDownloader { match serde_json::from_str::(&action.payload) .context("Invalid cancel action payload") .and_then(|cancellation| { - if cancellation.action_id == self.action_id { + if cancellation.action_id == state.current.action.action_id { Ok(()) } else { - Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, self.action_id))) + Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id))) } }) .and_then(|_| { @@ -233,7 +230,7 @@ impl FileDownloader { } } - self.bridge_tx.send_action_response(ActionResponse::progress(self.action_id.as_str(), "VerifyingChecksum", 99)).await; + self.bridge_tx.send_action_response(ActionResponse::progress(&state.current.action.action_id, "VerifyingChecksum", 99)).await; if let Err(e) = state.current.meta.verify_checksum() { return DownloadResult::Err(e.to_string()); } @@ -284,7 +281,7 @@ impl FileDownloader { // Retry non-status errors Err(e) if !e.is_status() => { let status = - ActionResponse::progress(&self.action_id, "Download Failed", 0) + ActionResponse::progress(&state.current.action.action_id, "Download Failed", 0) .add_error(e.to_string()); self.bridge_tx.send_action_response(status).await; error!("Download failed: {e:?}"); @@ -296,7 +293,7 @@ impl FileDownloader { }; if let Some(percentage) = state.write_bytes(&chunk)? { let status = - ActionResponse::progress(&self.action_id, "Downloading", percentage); + ActionResponse::progress(&state.current.action.action_id, "Downloading", percentage); self.bridge_tx.send_action_response(status).await; } } @@ -307,12 +304,6 @@ impl FileDownloader { Ok(()) } - - // Forward errors as action response to bridge - async fn forward_error(&mut self, err: Error) { - let status = ActionResponse::failure(&self.action_id, err.to_string()); - self.bridge_tx.send_action_response(status).await; - } } #[cfg(unix)] From ae0356f390e899b3615e73d7575a1c940dbf2f0d Mon Sep 17 00:00:00 2001 From: Sagar Tiwari Date: Thu, 25 Sep 2025 16:24:52 +0530 Subject: [PATCH 2/4] chore: skip download if file has already been downloaded --- rust-toolchain.toml | 2 +- uplink/Cargo.toml | 2 +- uplink/src/base/actions.rs | 2 +- uplink/src/collector/downloader.rs | 141 ++++++++++++++++------------- 4 files changed, 81 insertions(+), 66 deletions(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 4c7b0f1b..5da38ab2 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.80.0" +channel = "1.85.0" targets = [ "x86_64-linux-android", "i686-linux-android", diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 6ab8f49c..92c8068f 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -2,7 +2,7 @@ name = "uplink" version = "2.18.4" authors = ["tekjar "] -edition = "2021" +edition = "2024" [dependencies] bytes = { workspace = true } diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 2b556799..20370533 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -34,7 +34,7 @@ pub struct ActionResponse { } impl ActionResponse { - fn new(id: &str, state: &str, progress: u8, errors: Vec) -> Self { + pub fn new(id: &str, state: &str, progress: u8, errors: Vec) -> Self { let timestamp = clock() as u64; ActionResponse { diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index e38c1ede..52730dcd 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -131,7 +131,8 @@ impl FileDownloader { DownloadResult::Ok => { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&action.action_id, "Downloaded", Some(action)); + let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]); + status.done_response = Some(action); self.bridge_tx.send_action_response(status).await; } DownloadResult::Err(e) => { @@ -161,7 +162,8 @@ impl FileDownloader { DownloadResult::Ok => { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&action.action_id, "Downloaded", Some(action)); + let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]); + status.done_response = Some(action); self.bridge_tx.send_action_response(status).await; } DownloadResult::Err(e) => { @@ -173,66 +175,70 @@ impl FileDownloader { // Accepts `DownloadState`, sets a timeout for the action async fn download(&mut self, state: &mut DownloadState) -> DownloadResult { - let shutdown_rx = self.shutdown_rx.clone(); - loop { - select! { - o = self.continuous_retry(state) => { - if let Err(e) = o { - return DownloadResult::Err(e.to_string()); - } else { - break; - } - }, - Ok(action) = self.actions_rx.recv_async() => { - if action.action_id == &state.current.action.action_id { - // This handles the edge case when the device is able to receive actions - // from the broker but for something goes wrong when pushing action statuses back to the backend - // In this case the backend will try sending the same action again - // - // TODO: Right now we use the action status pushed by device as confirmation that it - // has received the action. It is not very reliable because as of now the action status pipeline can drop messages. - // Would it be better if the backend used MQTT Ack of the action message instead? - log::error!("Backend tried sending the same action again!"); - } else if action.name != "cancel_action" { - self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await; - } else { - match serde_json::from_str::(&action.payload) - .context("Invalid cancel action payload") - .and_then(|cancellation| { - if cancellation.action_id == state.current.action.action_id { - Ok(()) - } else { - Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id))) - } - }) - .and_then(|_| { - state.clean() - .context("Couldn't couldn't perform cleanup") - }) { - Ok(_) => { - self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await; - return DownloadResult::Err("action has been cancelled!".to_string()); - }, - Err(e) => { - self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await; - }, + if state.bytes_written < state.current.meta.content_length { + let shutdown_rx = self.shutdown_rx.clone(); + loop { + select! { + o = self.continuous_retry(state) => { + if let Err(e) = o { + return DownloadResult::Err(e.to_string()); + } else { + break; } - } - }, + }, + Ok(action) = self.actions_rx.recv_async() => { + if &action.action_id == &state.current.action.action_id { + // This handles the edge case when the device is able to receive actions + // from the broker but for something goes wrong when pushing action statuses back to the backend + // In this case the backend will try sending the same action again + // + // TODO: Right now we use the action status pushed by device as confirmation that it + // has received the action. It is not very reliable because as of now the action status pipeline can drop messages. + // Would it be better if the backend used MQTT Ack of the action message instead? + log::error!("Backend tried sending the same action again!"); + } else if action.name != "cancel_action" { + self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await; + } else { + match serde_json::from_str::(&action.payload) + .context("Invalid cancel action payload") + .and_then(|cancellation| { + if cancellation.action_id == state.current.action.action_id { + Ok(()) + } else { + Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id))) + } + }) + .and_then(|_| { + state.clean() + .context("Couldn't couldn't perform cleanup") + }) { + Ok(_) => { + self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await; + return DownloadResult::Err("action has been cancelled!".to_string()); + }, + Err(e) => { + self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await; + }, + } + } + }, - Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => { - if let Err(e) = state.save(&self.config) { - error!("Error saving current_download: {e:?}"); - } + Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => { + if let Err(e) = state.save(&self.config) { + error!("Error saving current_download: {e:?}"); + } - return DownloadResult::Suspended; - }, + return DownloadResult::Suspended; + }, + } } } self.bridge_tx.send_action_response(ActionResponse::progress(&state.current.action.action_id, "VerifyingChecksum", 99)).await; - if let Err(e) = state.current.meta.verify_checksum() { - return DownloadResult::Err(e.to_string()); + if state.current.meta.checksum.is_some() { + if let Err(e) = state.current.meta.verify_checksum() { + return DownloadResult::Err(e.to_string()); + } } // Update Action payload with `download_path`, i.e. downloaded file's location in fs state.current.action.payload = match serde_json::to_string(&state.current.meta) { @@ -324,9 +330,7 @@ fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<() } /// Creates file to download into -fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> { - let mut file_path = download_path.to_owned(); - file_path.push(file_name); +fn create_file(file_path: &Path) -> Result { // NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it if let Ok(f) = metadata(&file_path) { if f.is_dir() { @@ -337,7 +341,7 @@ fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBu #[cfg(unix)] file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?; - Ok((file, file_path)) + Ok(file) } fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> { @@ -372,14 +376,13 @@ pub struct DownloadFile { impl DownloadFile { fn verify_checksum(&self) -> Result<(), Error> { - let Some(checksum) = &self.checksum else { return Ok(()) }; let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\""); let mut file = File::open(path)?; let mut hasher = Sha256::new(); io::copy(&mut file, &mut hasher)?; let hash = hasher.finalize(); - if checksum != &hex::encode(hash) { + if self.checksum.as_ref().unwrap() != &hex::encode(hash) { return Err(Error::BadChecksum); } @@ -429,6 +432,19 @@ impl DownloadState { }; let file_path = path.join(&meta.file_name); + meta.download_path = Some(file_path.clone()); + if meta.checksum.is_some() && meta.verify_checksum().is_ok() { + // TODO: verify that range of size zero works as expected with consoled, platform, + info!("file has already been downloaded and its checksum matches, skipping download..."); + return Ok(Self { + bytes_written: meta.content_length, + current: CurrentDownload { action, meta }, + file: File::open("/dev/null")?, + percentage_downloaded: 100, + start: Instant::now(), + }); + } + let _ = remove_file(&file_path); let _ = remove_dir_all(&file_path); @@ -437,7 +453,7 @@ impl DownloadState { let url = meta.url.clone(); // Create file to actually download into - let (file, file_path) = create_file(&path, &meta.file_name)?; + let file = create_file(&file_path)?; // Retry downloading upto 3 times in case of connectivity issues // TODO: Error out for 1XX/3XX responses info!( @@ -445,7 +461,6 @@ impl DownloadState { file_path.display(), human_bytes(meta.content_length as f64) ); - meta.download_path = Some(file_path); let current = CurrentDownload { action, meta }; Ok(Self { From cc93a82d1f8818e36c7602185f44400d961e9f16 Mon Sep 17 00:00:00 2001 From: Sagar Tiwari Date: Tue, 30 Sep 2025 16:26:24 +0530 Subject: [PATCH 3/4] chore: some cleanup --- Cargo.lock | 4 +- uplink/Cargo.toml | 2 +- uplink/src/collector/downloader.rs | 112 +++++++++++++++-------------- 3 files changed, 62 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4fe5f469..1814343b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -3492,7 +3492,7 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "uplink" -version = "2.18.4" +version = "2.18.5" dependencies = [ "anyhow", "async-trait", diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 92c8068f..076ba0d6 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uplink" -version = "2.18.4" +version = "2.18.5" authors = ["tekjar "] edition = "2024" diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 52730dcd..ef516c56 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -175,62 +175,64 @@ impl FileDownloader { // Accepts `DownloadState`, sets a timeout for the action async fn download(&mut self, state: &mut DownloadState) -> DownloadResult { - if state.bytes_written < state.current.meta.content_length { - let shutdown_rx = self.shutdown_rx.clone(); - loop { - select! { - o = self.continuous_retry(state) => { - if let Err(e) = o { - return DownloadResult::Err(e.to_string()); - } else { - break; - } - }, - Ok(action) = self.actions_rx.recv_async() => { - if &action.action_id == &state.current.action.action_id { - // This handles the edge case when the device is able to receive actions - // from the broker but for something goes wrong when pushing action statuses back to the backend - // In this case the backend will try sending the same action again - // - // TODO: Right now we use the action status pushed by device as confirmation that it - // has received the action. It is not very reliable because as of now the action status pipeline can drop messages. - // Would it be better if the backend used MQTT Ack of the action message instead? - log::error!("Backend tried sending the same action again!"); - } else if action.name != "cancel_action" { - self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await; - } else { - match serde_json::from_str::(&action.payload) - .context("Invalid cancel action payload") - .and_then(|cancellation| { - if cancellation.action_id == state.current.action.action_id { - Ok(()) - } else { - Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id))) - } - }) - .and_then(|_| { - state.clean() - .context("Couldn't couldn't perform cleanup") - }) { - Ok(_) => { - self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await; - return DownloadResult::Err("action has been cancelled!".to_string()); - }, - Err(e) => { - self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await; - }, - } - } - }, + if state.already_downloaded { + return DownloadResult::Ok; + } - Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => { - if let Err(e) = state.save(&self.config) { - error!("Error saving current_download: {e:?}"); + let shutdown_rx = self.shutdown_rx.clone(); + loop { + select! { + o = self.continuous_retry(state) => { + if let Err(e) = o { + return DownloadResult::Err(e.to_string()); + } else { + break; + } + }, + Ok(action) = self.actions_rx.recv_async() => { + if &action.action_id == &state.current.action.action_id { + // This handles the edge case when the device is able to receive actions + // from the broker but for something goes wrong when pushing action statuses back to the backend + // In this case the backend will try sending the same action again + // + // TODO: Right now we use the action status pushed by device as confirmation that it + // has received the action. It is not very reliable because as of now the action status pipeline can drop messages. + // Would it be better if the backend used MQTT Ack of the action message instead? + log::error!("Backend tried sending the same action again!"); + } else if action.name != "cancel_action" { + self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), "Downloader is already occupied")).await; + } else { + match serde_json::from_str::(&action.payload) + .context("Invalid cancel action payload") + .and_then(|cancellation| { + if cancellation.action_id == state.current.action.action_id { + Ok(()) + } else { + Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id))) + } + }) + .and_then(|_| { + state.clean() + .context("Couldn't couldn't perform cleanup") + }) { + Ok(_) => { + self.bridge_tx.send_action_response(ActionResponse::success(action.action_id.as_str())).await; + return DownloadResult::Err("action has been cancelled!".to_string()); + }, + Err(e) => { + self.bridge_tx.send_action_response(ActionResponse::failure(action.action_id.as_str(), format!("Could not stop download: {e:?}"))).await; + }, } + } + }, - return DownloadResult::Suspended; - }, - } + Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => { + if let Err(e) = state.save(&self.config) { + error!("Error saving current_download: {e:?}"); + } + + return DownloadResult::Suspended; + }, } } @@ -404,6 +406,7 @@ struct DownloadState { file: File, bytes_written: usize, percentage_downloaded: u8, + already_downloaded: bool, start: Instant, } @@ -441,6 +444,7 @@ impl DownloadState { current: CurrentDownload { action, meta }, file: File::open("/dev/null")?, percentage_downloaded: 100, + already_downloaded: true, start: Instant::now(), }); } @@ -468,6 +472,7 @@ impl DownloadState { file, bytes_written: 0, percentage_downloaded: 0, + already_downloaded: false, start: Instant::now(), }) } @@ -495,6 +500,7 @@ impl DownloadState { file, bytes_written, percentage_downloaded: 0, + already_downloaded: false, start: Instant::now(), }) } From acfb395d293ffbed17683d5cc7766d40c5df26cc Mon Sep 17 00:00:00 2001 From: Sagar Tiwari Date: Wed, 1 Oct 2025 13:25:16 +0530 Subject: [PATCH 4/4] chore: some cleanup --- uplink/src/collector/downloader.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index ef516c56..6f301f95 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -437,7 +437,6 @@ impl DownloadState { let file_path = path.join(&meta.file_name); meta.download_path = Some(file_path.clone()); if meta.checksum.is_some() && meta.verify_checksum().is_ok() { - // TODO: verify that range of size zero works as expected with consoled, platform, info!("file has already been downloaded and its checksum matches, skipping download..."); return Ok(Self { bytes_written: meta.content_length,