Skip to content

Commit bf7903d

Browse files
committed
reorganize channels & comms module
1 parent 09ddc27 commit bf7903d

File tree

9 files changed

+48
-59
lines changed

9 files changed

+48
-59
lines changed

crates/funcgg-runtime/src/http.rs renamed to crates/funcgg-runtime/src/comms.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use anyhow::Result;
22
use http::StatusCode;
33
use serde::{Deserialize, Serialize};
44
use std::collections::HashMap;
5+
use tokio::sync::{mpsc, oneshot};
56
use uuid::Uuid;
67

78
// TODO: see if we can get rid of these "in the middle" types
@@ -34,3 +35,10 @@ impl Response {
3435
.insert("X-FUNC-GG-REQUEST-ID".into(), request_id.to_string());
3536
}
3637
}
38+
39+
#[derive(Debug)]
40+
pub struct Channels {
41+
pub incoming_body_rx: mpsc::Receiver<Result<bytes::Bytes, String>>,
42+
pub outgoing_body_tx: mpsc::Sender<bytes::Bytes>,
43+
pub response_tx: oneshot::Sender<Response>,
44+
}

crates/funcgg-runtime/src/ext/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use std::cell::RefCell;
1010
use std::rc::Rc;
1111
use std::vec;
1212

13-
use super::http;
14-
use super::sandbox::State;
13+
use super::comms;
14+
use super::runtime::State;
1515

1616
mod permissions;
1717
use permissions::Permissions;
@@ -74,18 +74,18 @@ pub fn op_tls_peer_certificate(#[smi] _: u32, _: bool) -> Option<deno_core::serd
7474

7575
#[op2]
7676
#[serde]
77-
fn op_get_request(state: &mut OpState) -> Option<http::Request> {
77+
fn op_get_request(state: &mut OpState) -> Option<comms::Request> {
7878
state.borrow::<Rc<RefCell<State>>>().borrow().req.clone()
7979
}
8080

8181
#[op2(async)]
8282
async fn op_set_response(
8383
state: Rc<RefCell<OpState>>,
84-
#[serde] mut res: http::Response,
84+
#[serde] mut res: comms::Response,
8585
) -> Result<(), JsError> {
8686
let (sender, request_id) = {
8787
let state_borrow = state.borrow();
88-
let sandbox_state = state_borrow.borrow::<Rc<RefCell<super::sandbox::State>>>();
88+
let sandbox_state = state_borrow.borrow::<Rc<RefCell<super::runtime::State>>>();
8989
let mut borrowed = sandbox_state.borrow_mut();
9090
(borrowed.response_tx.take(), borrowed.request_id)
9191
};
@@ -121,7 +121,7 @@ fn op_get_request_id(state: &mut OpState) -> String {
121121
async fn op_read_request_chunk(state: Rc<RefCell<OpState>>) -> Result<Vec<u8>, JsError> {
122122
let receiver = {
123123
let state_borrow = state.borrow();
124-
let sandbox_state = state_borrow.borrow::<Rc<RefCell<super::sandbox::State>>>();
124+
let sandbox_state = state_borrow.borrow::<Rc<RefCell<super::runtime::State>>>();
125125
sandbox_state.borrow().incoming_body_rx.clone()
126126
};
127127

@@ -141,7 +141,7 @@ async fn op_write_response_chunk(
141141
) -> Result<(), JsError> {
142142
let sender = {
143143
let state_borrow = state.borrow();
144-
let sandbox_state = state_borrow.borrow::<Rc<RefCell<super::sandbox::State>>>();
144+
let sandbox_state = state_borrow.borrow::<Rc<RefCell<super::runtime::State>>>();
145145
sandbox_state.borrow().outgoing_body_tx.clone()
146146
};
147147

crates/funcgg-runtime/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod ext;
22
mod loader;
3-
mod sandbox;
3+
mod runtime;
44

5-
pub mod http;
5+
pub mod comms;
66
pub mod snapshot;
7-
pub use sandbox::Sandbox;
7+
pub use runtime::Sandbox;

crates/funcgg-runtime/src/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
mod ext;
21
mod loader;
32
mod sandbox;
43
mod snapshot;

crates/funcgg-runtime/src/sandbox.rs renamed to crates/funcgg-runtime/src/runtime.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use std::time::Duration;
99
use tokio::sync::{Mutex, mpsc, oneshot};
1010
use uuid::Uuid;
1111

12+
use super::comms;
1213
use super::ext;
13-
use super::http;
1414
use super::loader;
1515

1616
static HEAP_LIMIT: usize = 64 * 1024 * 1024; // 64MB
@@ -23,12 +23,12 @@ static USER_MOD_SPECIFIER: LazyLock<Url> =
2323
LazyLock::new(|| "func:user-code".parse().expect("bad module specifier"));
2424

2525
pub struct State {
26-
pub req: Option<http::Request>,
27-
pub res: Option<http::Response>,
26+
pub req: Option<comms::Request>,
27+
pub res: Option<comms::Response>,
2828
pub request_id: Uuid,
2929
pub incoming_body_rx: Rc<Mutex<mpsc::Receiver<Result<bytes::Bytes, String>>>>,
3030
pub outgoing_body_tx: mpsc::Sender<bytes::Bytes>,
31-
pub response_tx: Option<oneshot::Sender<http::Response>>,
31+
pub response_tx: Option<oneshot::Sender<comms::Response>>,
3232
}
3333

3434
pub struct Sandbox {
@@ -40,19 +40,17 @@ impl Sandbox {
4040
pub fn new(
4141
request_id: Uuid,
4242
startup_snapshot: Option<&'static [u8]>,
43-
incoming_body_rx: mpsc::Receiver<Result<bytes::Bytes, String>>,
44-
outgoing_body_tx: mpsc::Sender<bytes::Bytes>,
45-
response_tx: oneshot::Sender<http::Response>,
43+
channels: comms::Channels,
4644
) -> Result<Self> {
4745
_ = CryptoProvider::install_default(aws_lc_rs::default_provider());
4846

4947
let state = Rc::new(RefCell::new(State {
5048
request_id,
5149
req: None,
5250
res: None,
53-
incoming_body_rx: Rc::new(Mutex::new(incoming_body_rx)),
54-
outgoing_body_tx,
55-
response_tx: Some(response_tx),
51+
incoming_body_rx: Rc::new(Mutex::new(channels.incoming_body_rx)),
52+
outgoing_body_tx: channels.outgoing_body_tx,
53+
response_tx: Some(channels.response_tx),
5654
}));
5755

5856
let extension_transpiler = Rc::new(loader::transpile);
@@ -86,7 +84,7 @@ impl Sandbox {
8684
pub async fn execute(
8785
&mut self,
8886
user_code: String,
89-
request: http::Request,
87+
request: comms::Request,
9088
timeout_duration: Duration,
9189
) -> Result<()> {
9290
let execution_result = tokio::time::timeout(timeout_duration, async {

crates/funcgg-runtime/src/snapshot.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use deno_core::snapshot::CreateSnapshotOutput;
22
use std::env;
33
use std::rc::Rc;
44

5-
use super::ext;
6-
use crate::loader;
5+
use super::{ext, loader};
76

87
pub const FILE_NAME: &str = "FUNCGG_RUNTIME_SNAPSHOT.bin";
98

crates/funcgg-worker/src/pool.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ use rand::seq::SliceRandom;
55
use std::collections::HashMap;
66
use std::sync::Arc;
77
use std::time::Duration;
8-
use tokio::sync::{Mutex, mpsc, oneshot};
8+
use tokio::sync::{Mutex, mpsc};
99
use tokio::time::{Instant, sleep};
1010
use uuid::Uuid;
1111

1212
use crate::worker::{Worker, WorkerRequest};
13-
use funcgg_runtime::http;
13+
use funcgg_runtime::comms;
1414

1515
#[derive(Debug)]
1616
pub enum StateChange {
@@ -79,10 +79,8 @@ impl Pool {
7979
pub async fn send_work(
8080
&self,
8181
js_code: String,
82-
http_request: http::Request,
83-
incoming_body_rx: mpsc::Receiver<Result<bytes::Bytes, String>>,
84-
outgoing_body_tx: mpsc::Sender<bytes::Bytes>,
85-
response_tx: oneshot::Sender<http::Response>,
82+
http_request: comms::Request,
83+
channels: comms::Channels,
8684
) -> Result<(), String> {
8785
let request_id = Uuid::now_v7();
8886

@@ -91,9 +89,7 @@ impl Pool {
9189
id: request_id,
9290
js_code,
9391
http_request,
94-
incoming_body_rx,
95-
outgoing_body_tx,
96-
response_tx,
92+
channels,
9793
};
9894

9995
tracing::debug!(

crates/funcgg-worker/src/routes.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use axum::{
66
response::{IntoResponse, Response},
77
};
88
use deno_core::futures::StreamExt;
9-
use funcgg_runtime::http;
9+
use funcgg_runtime::comms;
1010
use std::sync::Arc;
1111
use std::{collections::HashMap, convert::Infallible};
1212
use tokio::sync::{mpsc, oneshot};
@@ -52,25 +52,22 @@ pub async fn invoke(State(pool): State<Arc<Pool>>, request: Request) -> Response
5252
}
5353
});
5454

55-
let req = funcgg_runtime::http::Request {
55+
let req = funcgg_runtime::comms::Request {
5656
method,
5757
uri,
5858
headers,
5959
};
6060

6161
let (response_body_tx, response_body_rx) = mpsc::channel::<bytes::Bytes>(1);
62-
let (response_tx, response_rx) = oneshot::channel::<http::Response>();
62+
let (response_tx, response_rx) = oneshot::channel::<comms::Response>();
6363

64-
if let Err(err) = pool
65-
.send_work(
66-
js_code.to_string(),
67-
req,
68-
body_rx,
69-
response_body_tx,
70-
response_tx,
71-
)
72-
.await
73-
{
64+
let channels = comms::Channels {
65+
incoming_body_rx: body_rx,
66+
outgoing_body_tx: response_body_tx,
67+
response_tx,
68+
};
69+
70+
if let Err(err) = pool.send_work(js_code.to_string(), req, channels).await {
7471
tracing::error!("Handler invocation failed: {}", err);
7572
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error").into_response();
7673
};

crates/funcgg-worker/src/worker.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
use std::time::Duration;
2-
use tokio::sync::{mpsc, oneshot};
2+
use tokio::sync::mpsc;
33
use uuid::Uuid;
44

55
use super::pool::StateChange;
6-
use funcgg_runtime::{Sandbox, http};
6+
use funcgg_runtime::{Sandbox, comms};
77

88
static STARTUP_SNAPSHOT: &[u8] = include_bytes!(env!("SNAPSHOT_PATH"));
99

1010
#[derive(Debug)]
1111
pub struct WorkerRequest {
1212
pub id: Uuid,
1313
pub js_code: String,
14-
pub http_request: http::Request,
15-
pub incoming_body_rx: mpsc::Receiver<Result<bytes::Bytes, String>>,
16-
pub outgoing_body_tx: mpsc::Sender<bytes::Bytes>,
17-
pub response_tx: oneshot::Sender<http::Response>,
14+
pub http_request: comms::Request,
15+
pub channels: comms::Channels,
1816
}
1917

2018
pub struct Worker {
@@ -58,13 +56,7 @@ impl Worker {
5856

5957
// TODO: make this state change to failure on failure
6058
async fn process_request(&self, request: WorkerRequest) -> Result<(), String> {
61-
let mut sandbox = match Sandbox::new(
62-
request.id,
63-
Some(STARTUP_SNAPSHOT),
64-
request.incoming_body_rx,
65-
request.outgoing_body_tx,
66-
request.response_tx,
67-
) {
59+
let mut sandbox = match Sandbox::new(request.id, Some(STARTUP_SNAPSHOT), request.channels) {
6860
Ok(rt) => rt,
6961
Err(e) => {
7062
tracing::error!(

0 commit comments

Comments
 (0)