Skip to content

Commit 28a109a

Browse files
author
Devdutt Shenoi
authored
feat: parallelized actions run immediately (#239)
* feat: parallelizable actions are run immediately Specifically related to tunshell action(`launch_shell`) - Don't care about `current_action` check or set. - Allow running multiple tunshell instances * fix: update logcat.rs * fix: hardcode parallelizability to only tunshell
1 parent 79c3c41 commit 28a109a

File tree

2 files changed

+42
-11
lines changed

2 files changed

+42
-11
lines changed

uplink/src/base/bridge/mod.rs

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use serde_json::Value;
55
use tokio::select;
66
use tokio::time::{self, interval, Instant, Sleep};
77

8+
use std::collections::HashSet;
89
use std::fs;
910
use std::path::PathBuf;
1011
use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration};
@@ -14,13 +15,14 @@ mod metrics;
1415
pub(crate) mod stream;
1516
mod streams;
1617

18+
use super::Compression;
1719
use crate::base::ActionRoute;
1820
use crate::{Action, ActionResponse, Config};
1921
pub use metrics::StreamMetrics;
2022
use stream::Stream;
2123
use streams::Streams;
2224

23-
use super::Compression;
25+
const TUNSHELL_ACTION: &str = "launch_shell";
2426

2527
#[derive(thiserror::Error, Debug)]
2628
pub enum Error {
@@ -123,6 +125,7 @@ pub struct Bridge {
123125
action_redirections: HashMap<String, String>,
124126
/// Current action that is being processed
125127
current_action: Option<CurrentAction>,
128+
parallel_actions: HashSet<String>,
126129
ctrl_rx: Receiver<BridgeCtrl>,
127130
ctrl_tx: Sender<BridgeCtrl>,
128131
shutdown_handle: Sender<()>,
@@ -152,6 +155,7 @@ impl Bridge {
152155
action_routes: HashMap::with_capacity(10),
153156
action_redirections,
154157
current_action: None,
158+
parallel_actions: HashSet::new(),
155159
shutdown_handle,
156160
ctrl_rx,
157161
ctrl_tx,
@@ -183,9 +187,11 @@ impl Bridge {
183187
info!("Received action: {:?}", action);
184188

185189
if let Some(current_action) = &self.current_action {
186-
warn!("Another action is currently occupying uplink; action_id = {}", current_action.id);
187-
self.forward_action_error(action, Error::Busy).await;
188-
continue
190+
if action.name != TUNSHELL_ACTION {
191+
warn!("Another action is currently occupying uplink; action_id = {}", current_action.id);
192+
self.forward_action_error(action, Error::Busy).await;
193+
continue
194+
}
189195
}
190196

191197
// NOTE: Don't do any blocking operations here
@@ -286,9 +292,15 @@ impl Bridge {
286292
/// Handle received actions
287293
fn try_route_action(&mut self, action: Action) -> Result<(), Error> {
288294
match self.action_routes.get(&action.name) {
289-
Some(app_tx) => {
295+
Some(route) => {
290296
let duration =
291-
app_tx.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
297+
route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
298+
// current action left unchanged in case of forwarded action bein
299+
if action.name == TUNSHELL_ACTION {
300+
self.parallel_actions.insert(action.action_id);
301+
return Ok(());
302+
}
303+
292304
self.current_action = Some(CurrentAction::new(action, duration));
293305

294306
Ok(())
@@ -298,6 +310,12 @@ impl Bridge {
298310
}
299311

300312
async fn forward_action_response(&mut self, response: ActionResponse) {
313+
if self.parallel_actions.contains(&response.action_id) {
314+
self.forward_parallel_action_response(response).await;
315+
316+
return;
317+
}
318+
301319
let inflight_action = match &mut self.current_action {
302320
Some(v) => v,
303321
None => {
@@ -353,6 +371,17 @@ impl Bridge {
353371
}
354372
}
355373

374+
async fn forward_parallel_action_response(&mut self, response: ActionResponse) {
375+
info!("Action response = {:?}", response);
376+
if let Err(e) = self.action_status.fill(response.clone()).await {
377+
error!("Failed to fill. Error = {:?}", e);
378+
}
379+
380+
if response.is_completed() || response.is_failed() {
381+
self.parallel_actions.remove(&response.action_id);
382+
}
383+
}
384+
356385
async fn forward_action_error(&mut self, action: Action, error: Error) {
357386
let status = ActionResponse::failure(&action.action_id, error.to_string());
358387

@@ -437,9 +466,10 @@ pub struct BridgeTx {
437466
impl BridgeTx {
438467
pub async fn register_action_route(&self, route: ActionRoute) -> Receiver<Action> {
439468
let (actions_tx, actions_rx) = bounded(1);
440-
let duration = Duration::from_secs(route.timeout);
469+
let ActionRoute { name, timeout } = route;
470+
let duration = Duration::from_secs(timeout);
441471
let action_router = ActionRouter { actions_tx, duration };
442-
let event = Event::RegisterActionRoute(route.name, action_router);
472+
let event = Event::RegisterActionRoute(name, action_router);
443473

444474
// Bridge should always be up and hence unwrap is ok
445475
self.events_tx.send_async(event).await.unwrap();
@@ -458,9 +488,10 @@ impl BridgeTx {
458488
let (actions_tx, actions_rx) = bounded(1);
459489

460490
for route in routes {
461-
let duration = Duration::from_secs(route.timeout);
491+
let ActionRoute { name, timeout } = route;
492+
let duration = Duration::from_secs(timeout);
462493
let action_router = ActionRouter { actions_tx: actions_tx.clone(), duration };
463-
let event = Event::RegisterActionRoute(route.name, action_router);
494+
let event = Event::RegisterActionRoute(name, action_router);
464495
// Bridge should always be up and hence unwrap is ok
465496
self.events_tx.send_async(event).await.unwrap();
466497
}

uplink/src/collector/downloader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ pub struct DownloadFile {
279279

280280
#[cfg(test)]
281281
mod test {
282-
use flume::{TrySendError, bounded};
282+
use flume::{bounded, TrySendError};
283283
use serde_json::json;
284284

285285
use std::{collections::HashMap, time::Duration};

0 commit comments

Comments
 (0)