Skip to content

Commit f6bdda8

Browse files
committed
reorganize ipc
1 parent 2844727 commit f6bdda8

File tree

1 file changed

+29
-10
lines changed

1 file changed

+29
-10
lines changed

crates/funcd/src/ipc.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ use anyhow::Result;
22
use serde::{Deserialize, Serialize};
33
use std::fs;
44
use std::path::{Path, PathBuf};
5+
use std::sync::{Arc, Mutex};
56
use tokio::io::{AsyncBufReadExt, BufReader};
67
use tokio::net::UnixListener;
78
use tokio::sync::oneshot;
8-
use tracing::info;
9+
use tracing::{error, info};
910

1011
#[derive(Debug, Clone, Serialize, Deserialize)]
1112
#[serde(tag = "kind", content = "payload")]
@@ -18,7 +19,7 @@ pub enum Message {
1819
pub struct Socket {
1920
path: PathBuf,
2021
listener: UnixListener,
21-
port_tx: Option<oneshot::Sender<u16>>,
22+
port_tx: Arc<Mutex<Option<oneshot::Sender<u16>>>>,
2223
}
2324

2425
impl Socket {
@@ -36,11 +37,11 @@ impl Socket {
3637
Ok(Self {
3738
path,
3839
listener,
39-
port_tx: Some(port_tx),
40+
port_tx: Arc::new(Mutex::new(Some(port_tx))),
4041
})
4142
}
4243

43-
pub async fn listen(mut self) -> Result<()> {
44+
pub async fn listen(self) -> Result<()> {
4445
info!(
4546
component = "socket",
4647
socket = %self.path.display(),
@@ -52,20 +53,18 @@ impl Socket {
5253
Ok((stream, _)) => {
5354
info!("new connection on unix socket");
5455

55-
let port_tx = self.port_tx.take();
56+
let port_tx = Arc::clone(&self.port_tx);
5657
tokio::spawn(async move {
5758
let reader = BufReader::new(stream);
5859
let mut lines = reader.lines();
59-
let mut port_tx = port_tx;
6060

6161
while let Ok(Some(line)) = lines.next_line().await {
6262
match serde_json::from_str::<Message>(&line) {
6363
Ok(msg) => {
64-
info!(message = ?msg, "received message");
65-
if let Message::Ready { port } = msg
66-
&& let Some(tx) = port_tx.take()
64+
if let Err(e) =
65+
Self::handle_message(Arc::clone(&port_tx), msg).await
6766
{
68-
let _ = tx.send(port);
67+
error!(error = %e, "failed to handle message");
6968
}
7069
}
7170
Err(e) => info!(error = %e, "failed to parse message"),
@@ -81,4 +80,24 @@ impl Socket {
8180
}
8281
}
8382
}
83+
84+
pub async fn handle_message(
85+
port_tx: Arc<Mutex<Option<oneshot::Sender<u16>>>>,
86+
message: Message,
87+
) -> Result<()> {
88+
info!(message = ?message, "received message");
89+
90+
match message {
91+
Message::Ping => {}
92+
Message::Ready { port } => {
93+
if let Ok(mut guard) = port_tx.lock()
94+
&& let Some(tx) = guard.take()
95+
{
96+
let _ = tx.send(port);
97+
}
98+
}
99+
}
100+
101+
Ok(())
102+
}
84103
}

0 commit comments

Comments
 (0)