Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 174 additions & 174 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion stsync-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ log = "0.4.17"
tracing ={ version = "0.1.37", features = ["log-always"] }
socket2 = "0.4.7"
snowflaked = { version = "0.1.5", features = ["sync"] }
bytes = "1.2.1"
tokio-stream = { version = "0.1.11", features = ["sync"] }
hyper = { version = "0.14.20", features = ["http1", "http2", "server"] }
ahash = "0.8.0"
Expand All @@ -32,6 +31,8 @@ pin-project = "1.0.12"
serde_json = "1.0.87"
toml = "0.5.9"

polymock = { git = "https://github.com/MrGunflame/polymock-rs" }

[target.'cfg(unix)'.dependencies]
nix = { version = "0.26.1", features = ["signal"] }

Expand Down
3 changes: 2 additions & 1 deletion stsync-proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::Path;
use serde::{Deserialize, Serialize};

use crate::srt;
use crate::srt::config::Workers;

#[derive(Serialize, Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -37,7 +38,7 @@ pub struct Http {
pub struct Srt {
pub enabled: bool,
pub bind: SocketAddr,
pub workers: Option<usize>,
pub workers: Workers,

pub rcvbuf: usize,
pub sndbuf: usize,
Expand Down
4 changes: 4 additions & 0 deletions stsync-proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#![deny(unsafe_op_in_unsafe_fn)]
#![deny(unused_crate_dependencies)]

// Use the bytes drop-in replacement types until the bytes
// crate allows public vtable creation.
extern crate polymock as bytes;

// We only import log to remove trace and debug levels at compile time.
use log as _;

Expand Down
14 changes: 13 additions & 1 deletion stsync-proxy/src/srt/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::net::SocketAddr;
use std::thread::available_parallelism;

use serde::{Deserialize, Serialize};

Expand All @@ -7,7 +8,7 @@ pub struct Config {
/// The tuple to bind the server to.
pub bind: SocketAddr,
/// The number of workers or ``
pub workers: Option<usize>,
pub workers: Workers,

/// The size of `SO_RCVBUF` in bytes.
pub rcvbuf: usize,
Expand All @@ -21,3 +22,14 @@ pub struct Config {
/// Latency in millis
pub latency: u16,
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Workers(Option<usize>);

impl Workers {
pub fn get(self) -> usize {
self.0
.unwrap_or_else(|| available_parallelism().map(|n| n.get()).unwrap_or(1))
}
}
3 changes: 2 additions & 1 deletion stsync-proxy/src/srt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ impl Default for DataPacket {

Self {
header,
data: Bytes::new(),
// FIXME: Change back to `Bytes::new()` when possible.
data: Bytes::copy_from_slice(&[]),
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions stsync-proxy/src/srt/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bytes::BytesMut;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use std::future::Future;
Expand Down Expand Up @@ -45,11 +45,7 @@ where
tracing::info!("Srt socket listening on {}", socket.local_addr()?);
tracing::info!("Socket Recv-Q: {}, Send-Q: {}", rx, tx);

let num_workers = config.workers.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
});
let num_workers = config.workers.get();

let socket = Arc::new(socket);
let state = State::new(session_manager, config);
Expand Down Expand Up @@ -164,11 +160,14 @@ impl Worker {
);

loop {
let mut buf = BytesMut::zeroed(1500);
let mut buf = state.arena.alloc(1500);

let (len, addr) = socket.recv_from(&mut buf).await?;
tracing::trace!("[{}] Got {} bytes from {}", ident, len, addr);
buf.truncate(len);

let mut buf = buf.freeze();

let packet = match Packet::decode(&mut buf) {
Ok(packet) => packet,
Err(err) => {
Expand Down
5 changes: 5 additions & 0 deletions stsync-proxy/src/srt/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;

use ahash::{AHashMap, AHashSet};
use parking_lot::{Mutex, RwLock};
use polymock::Arena;
use rand::rngs::OsRng;
use rand::RngCore;

Expand All @@ -29,6 +30,8 @@ where
S: SessionManager,
{
pub fn new(session_manager: S, config: Config) -> Self {
let arena = Arena::new(config.mtu as usize * config.flow_window as usize);

Self {
inner: Arc::new(StateInner {
config: config,
Expand All @@ -37,6 +40,7 @@ where
session_manager,
conn_metrics: Mutex::new(AHashMap::new()),
metrics: ServerMetrics::new(),
arena,
}),
}
}
Expand Down Expand Up @@ -77,6 +81,7 @@ where
pub session_manager: S,
pub conn_metrics: Mutex<AHashMap<ConnectionId, Arc<ConnectionMetrics>>>,
pub metrics: ServerMetrics,
pub arena: Arena,
}

impl<S> StateInner<S>
Expand Down