Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.80.0"
channel = "1.85.0"
targets = [
"x86_64-linux-android",
"i686-linux-android",
Expand Down
4 changes: 2 additions & 2 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "uplink"
version = "2.18.4"
version = "2.18.5"
authors = ["tekjar <[email protected]>"]
edition = "2021"
edition = "2024"

[dependencies]
bytes = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct ActionResponse {
}

impl ActionResponse {
fn new(id: &str, state: &str, progress: u8, errors: Vec<String>) -> Self {
pub fn new(id: &str, state: &str, progress: u8, errors: Vec<String>) -> Self {
let timestamp = clock() as u64;

ActionResponse {
Expand Down
75 changes: 43 additions & 32 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ enum DownloadResult {
pub struct FileDownloader {
config: DownloaderConfig,
actions_rx: Receiver<Action>,
action_id: String,
bridge_tx: BridgeTx,
client: Client,
shutdown_rx: Receiver<DownloaderShutdown>,
Expand Down Expand Up @@ -102,7 +101,6 @@ impl FileDownloader {
actions_rx,
client,
bridge_tx,
action_id: String::default(),
shutdown_rx,
disabled,
})
Expand All @@ -116,28 +114,29 @@ impl FileDownloader {

info!("Downloader thread is ready to receive download actions");
while let Ok(action) = self.actions_rx.recv_async().await {
action.action_id.clone_into(&mut self.action_id);
let action_id = action.action_id.clone();
let mut state = match DownloadState::new(action, &self.config) {
Ok(s) => s,
Err(e) => {
self.forward_error(e).await;
self.bridge_tx.send_action_response(ActionResponse::failure(&action_id, e.to_string())).await;
continue;
}
};

// Update action status for process initiated
let status = ActionResponse::progress(&self.action_id, "Downloading", 0);
let status = ActionResponse::progress(&state.current.action.action_id, "Downloading", 0);
self.bridge_tx.send_action_response(status).await;

match self.download(&mut state).await {
DownloadResult::Ok => {
// Forward updated action as part of response
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]);
status.done_response = Some(action);
self.bridge_tx.send_action_response(status).await;
}
DownloadResult::Err(e) => {
self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await;
self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await;
}
DownloadResult::Suspended => {
break
Expand All @@ -158,24 +157,28 @@ impl FileDownloader {
return;
}
};
state.current.action.action_id.clone_into(&mut self.action_id);

match self.download(&mut state).await {
DownloadResult::Ok => {
// Forward updated action as part of response
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
let mut status = ActionResponse::new(&action.action_id, "Downloaded", 100, vec![]);
status.done_response = Some(action);
self.bridge_tx.send_action_response(status).await;
}
DownloadResult::Err(e) => {
self.bridge_tx.send_action_response(ActionResponse::failure(&self.action_id, e)).await;
self.bridge_tx.send_action_response(ActionResponse::failure(&state.current.action.action_id, e)).await;
}
DownloadResult::Suspended => {}
}
}

// Accepts `DownloadState`, sets a timeout for the action
async fn download(&mut self, state: &mut DownloadState) -> DownloadResult {
if state.already_downloaded {
return DownloadResult::Ok;
}

let shutdown_rx = self.shutdown_rx.clone();
loop {
select! {
Expand All @@ -187,7 +190,7 @@ impl FileDownloader {
}
},
Ok(action) = self.actions_rx.recv_async() => {
if action.action_id == self.action_id {
if &action.action_id == &state.current.action.action_id {
// This handles the edge case when the device is able to receive actions
// from the broker but for something goes wrong when pushing action statuses back to the backend
// In this case the backend will try sending the same action again
Expand All @@ -202,10 +205,10 @@ impl FileDownloader {
match serde_json::from_str::<Cancellation>(&action.payload)
.context("Invalid cancel action payload")
.and_then(|cancellation| {
if cancellation.action_id == self.action_id {
if cancellation.action_id == state.current.action.action_id {
Ok(())
} else {
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, self.action_id)))
Err(anyhow::Error::msg(format!("Cancel action target ({}) doesn't match active download action id ({})", cancellation.action_id, &state.current.action.action_id)))
}
})
.and_then(|_| {
Expand Down Expand Up @@ -233,9 +236,11 @@ impl FileDownloader {
}
}

self.bridge_tx.send_action_response(ActionResponse::progress(self.action_id.as_str(), "VerifyingChecksum", 99)).await;
if let Err(e) = state.current.meta.verify_checksum() {
return DownloadResult::Err(e.to_string());
self.bridge_tx.send_action_response(ActionResponse::progress(&state.current.action.action_id, "VerifyingChecksum", 99)).await;
if state.current.meta.checksum.is_some() {
if let Err(e) = state.current.meta.verify_checksum() {
return DownloadResult::Err(e.to_string());
}
}
// Update Action payload with `download_path`, i.e. downloaded file's location in fs
state.current.action.payload = match serde_json::to_string(&state.current.meta) {
Expand Down Expand Up @@ -284,7 +289,7 @@ impl FileDownloader {
// Retry non-status errors
Err(e) if !e.is_status() => {
let status =
ActionResponse::progress(&self.action_id, "Download Failed", 0)
ActionResponse::progress(&state.current.action.action_id, "Download Failed", 0)
.add_error(e.to_string());
self.bridge_tx.send_action_response(status).await;
error!("Download failed: {e:?}");
Expand All @@ -296,7 +301,7 @@ impl FileDownloader {
};
if let Some(percentage) = state.write_bytes(&chunk)? {
let status =
ActionResponse::progress(&self.action_id, "Downloading", percentage);
ActionResponse::progress(&state.current.action.action_id, "Downloading", percentage);
self.bridge_tx.send_action_response(status).await;
}
}
Expand All @@ -307,12 +312,6 @@ impl FileDownloader {

Ok(())
}

// Forward errors as action response to bridge
async fn forward_error(&mut self, err: Error) {
let status = ActionResponse::failure(&self.action_id, err.to_string());
self.bridge_tx.send_action_response(status).await;
}
}

#[cfg(unix)]
Expand All @@ -333,9 +332,7 @@ fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<()
}

/// Creates file to download into
fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> {
let mut file_path = download_path.to_owned();
file_path.push(file_name);
fn create_file(file_path: &Path) -> Result<File, Error> {
// NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it
if let Ok(f) = metadata(&file_path) {
if f.is_dir() {
Expand All @@ -346,7 +343,7 @@ fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBu
#[cfg(unix)]
file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?;

Ok((file, file_path))
Ok(file)
}

fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> {
Expand Down Expand Up @@ -381,14 +378,13 @@ pub struct DownloadFile {

impl DownloadFile {
fn verify_checksum(&self) -> Result<(), Error> {
let Some(checksum) = &self.checksum else { return Ok(()) };
let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\"");
let mut file = File::open(path)?;
let mut hasher = Sha256::new();
io::copy(&mut file, &mut hasher)?;
let hash = hasher.finalize();

if checksum != &hex::encode(hash) {
if self.checksum.as_ref().unwrap() != &hex::encode(hash) {
return Err(Error::BadChecksum);
}

Expand All @@ -410,6 +406,7 @@ struct DownloadState {
file: File,
bytes_written: usize,
percentage_downloaded: u8,
already_downloaded: bool,
start: Instant,
}

Expand Down Expand Up @@ -438,6 +435,19 @@ impl DownloadState {
};

let file_path = path.join(&meta.file_name);
meta.download_path = Some(file_path.clone());
if meta.checksum.is_some() && meta.verify_checksum().is_ok() {
info!("file has already been downloaded and its checksum matches, skipping download...");
return Ok(Self {
bytes_written: meta.content_length,
current: CurrentDownload { action, meta },
file: File::open("/dev/null")?,
percentage_downloaded: 100,
already_downloaded: true,
start: Instant::now(),
});
}

let _ = remove_file(&file_path);
let _ = remove_dir_all(&file_path);

Expand All @@ -446,22 +456,22 @@ impl DownloadState {
let url = meta.url.clone();

// Create file to actually download into
let (file, file_path) = create_file(&path, &meta.file_name)?;
let file = create_file(&file_path)?;
// Retry downloading upto 3 times in case of connectivity issues
// TODO: Error out for 1XX/3XX responses
info!(
"Downloading from {url} into {}; size = {}",
file_path.display(),
human_bytes(meta.content_length as f64)
);
meta.download_path = Some(file_path);
let current = CurrentDownload { action, meta };

Ok(Self {
current,
file,
bytes_written: 0,
percentage_downloaded: 0,
already_downloaded: false,
start: Instant::now(),
})
}
Expand Down Expand Up @@ -489,6 +499,7 @@ impl DownloadState {
file,
bytes_written,
percentage_downloaded: 0,
already_downloaded: false,
start: Instant::now(),
})
}
Expand Down
Loading