Skip to content

Commit c5cf1b3

Browse files
author
bay
committed
Fix onion service restarting issues. Callback for onion service status
1 parent b738b42 commit c5cf1b3

File tree

3 files changed

+58
-8
lines changed

3 files changed

+58
-8
lines changed

p2p/src/listen.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,22 @@ use std::sync::Arc;
2121
use std::time::Duration;
2222
use std::{io, thread};
2323

24-
pub fn listen<F, G, H>(
24+
pub fn listen<F, G, H, K>(
2525
context_id: u32,
2626
stop_state: Arc<StopState>,
2727
tor_config: Option<TorConfig>,
2828
listen_addr: Option<SocketAddr>, // Port in needed in not internal tor will be used
2929
onion_expanded_key: Option<[u8; 64]>,
3030
started_service_callback: Option<F>,
3131
failed_service_callback: Option<G>,
32+
service_status_callback: Option<K>,
3233
handle_new_peer_callback: H,
3334
) -> Result<(), Error>
3435
where
3536
F: Fn(Option<String>),
3637
G: Fn(&Error) -> bool, // return true if want to exit on falure
3738
H: Fn(TcpDataStream, Option<PeerAddr>),
39+
K: Fn(bool) + Send + 'static + std::marker::Sync, // return true if want to exit on falure
3840
{
3941
let tor_config = tor_config.unwrap_or(TorConfig::no_tor_config());
4042

@@ -49,10 +51,11 @@ where
4951
onion_expanded_key,
5052
started_service_callback,
5153
failed_service_callback,
54+
service_status_callback,
5255
handle_new_peer_callback,
5356
)
5457
} else {
55-
// Listening on extended Tor, listening on sockets
58+
// Listening on external Tor, listening on sockets
5659
let onion_address = tor_config.onion_address.clone().ok_or(Error::ConfigError(
5760
"For tor external config, internal onion address is not specified.".into(),
5861
))?;
@@ -67,6 +70,7 @@ where
6770
Some(onion_address),
6871
started_service_callback,
6972
failed_service_callback,
73+
service_status_callback,
7074
handle_new_peer_callback,
7175
)
7276
}
@@ -79,23 +83,26 @@ where
7983
None,
8084
started_service_callback,
8185
failed_service_callback,
86+
service_status_callback,
8287
handle_new_peer_callback,
8388
)
8489
}
8590
}
8691

87-
fn listen_onion_service<F, G, H>(
92+
fn listen_onion_service<F, G, H, K>(
8893
context_id: u32,
8994
stop_state: Arc<StopState>,
9095
onion_expanded_key: Option<[u8; 64]>,
9196
started_service_callback: Option<F>,
9297
failed_service_callback: Option<G>,
98+
service_status_callback: Option<K>,
9399
handle_new_peer_callback: H,
94100
) -> Result<(), Error>
95101
where
96102
F: Fn(Option<String>),
97103
G: Fn(&Error) -> bool, // return true if want to exit on falure
98104
H: Fn(TcpDataStream, Option<PeerAddr>),
105+
K: Fn(bool) + Send + 'static + std::marker::Sync, // return true if want to exit on falure
99106
{
100107
info!("Starting TOR (Arti) service...");
101108

@@ -110,36 +117,45 @@ where
110117
"mwc-node",
111118
started_service_callback,
112119
failed_service_callback,
120+
service_status_callback,
113121
handle_new_peer_callback,
114122
)
115123
}
116124

117125
/// Starts a new TCP server and listen to incoming connections. This is a
118126
/// blocking call until the TCP server stops.
119-
fn listen_socket<F, G, H>(
127+
fn listen_socket<F, G, H, K>(
120128
stop_state: Arc<StopState>,
121129
host: IpAddr,
122130
port: u16,
123131
onion_service_address: Option<String>,
124132
started_service_callback: Option<F>,
125133
failed_service_callback: Option<G>,
134+
service_status_callback: Option<K>,
126135
handle_new_peer_callback: H,
127136
) -> Result<(), Error>
128137
where
129138
F: Fn(Option<String>),
130139
G: Fn(&Error) -> bool, // return true if want to exit on falure
131140
H: Fn(TcpDataStream, Option<PeerAddr>),
141+
K: Fn(bool) + Send + 'static + std::marker::Sync, // return true if want to exit on falure
132142
{
133143
// start TCP listener and handle incoming connections
134144
let addr = SocketAddr::new(host, port);
135145
let listener = match run_global_async_block(async { TcpListener::bind(addr).await }) {
136146
Ok(listener) => {
147+
if let Some(f) = service_status_callback.as_ref() {
148+
f(true)
149+
}
137150
if let Some(started_service_callback) = started_service_callback {
138151
started_service_callback(onion_service_address);
139152
}
140153
listener
141154
}
142155
Err(e) => {
156+
if let Some(f) = service_status_callback.as_ref() {
157+
f(false)
158+
}
143159
let err = Error::TorProcess(format!(
144160
"Unable to start listening on {}:{}, {}",
145161
host, port, e
@@ -215,5 +231,9 @@ where
215231
}
216232
thread::sleep(Duration::from_millis(5));
217233
}
234+
if let Some(f) = service_status_callback.as_ref() {
235+
f(false)
236+
}
237+
218238
Ok(())
219239
}

p2p/src/serv.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ impl Server {
193193
onion_expanded_key,
194194
Some(service_started_callback),
195195
Some(service_failed_callback),
196+
None::<Box<dyn Fn(bool) + Send + Sync + 'static>>,
196197
handle_new_connection_callback,
197198
)
198199
}

p2p/src/tor/onion_service.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ use crate::{Error, PeerAddr};
1919
use async_std::stream::StreamExt;
2020
use mwc_util::StopState;
2121
use std::pin::Pin;
22-
use std::sync::atomic::{AtomicI64, Ordering};
22+
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
2323
use std::sync::Arc;
2424
use std::thread;
2525
use std::time::{Duration, Instant};
2626
use tor_cell::relaycell::msg::Connected;
2727
use tor_hsservice::status::State;
2828
use tor_proto::client::stream::IncomingStreamRequest;
2929

30+
static SERVICE_COUNTER: AtomicU32 = AtomicU32::new(0);
31+
3032
// started_service_callback accepting onion address
3133
fn start_arti<F>(
3234
context_id: u32,
@@ -51,7 +53,12 @@ where
5153
) = arti::access_arti(|tor_client| {
5254
let (onion_service, onion_address, incoming_requests) = ArtiCore::start_onion_service(
5355
&tor_client,
54-
format!("onion-service-{}-{}", service_name, context_id),
56+
format!(
57+
"onion-service-{}-{}-{}",
58+
service_name,
59+
context_id,
60+
SERVICE_COUNTER.fetch_add(1, Ordering::Relaxed)
61+
),
5562
onion_expanded_key.clone(),
5663
)?;
5764
Ok((
@@ -77,24 +84,31 @@ where
7784
/// Listening on Onion service.
7885
/// Here is we have a full workflow that monitors the onion service, restarting on network failre. It is
7986
/// Used at both mwc-node and mwc-wallet.
80-
pub fn listen_onion_service<F, G, H>(
87+
pub fn listen_onion_service<F, G, H, K>(
8188
context_id: u32,
8289
stop_state: Arc<StopState>,
8390
onion_expanded_key: [u8; 64],
8491
service_name: &str,
8592
started_service_callback: Option<F>,
8693
failed_service_callback: Option<G>,
94+
service_status_callback: Option<K>,
8795
handle_new_peer_callback: H,
8896
) -> Result<(), Error>
8997
where
9098
F: Fn(Option<String>),
9199
G: Fn(&Error) -> bool, // return true if want to exit on falure
92100
H: Fn(TcpDataStream, Option<PeerAddr>),
101+
K: Fn(bool) + Send + 'static + std::marker::Sync, // return true if want to exit on falure
93102
{
94103
//
95104
//let handle_new_peer_callback = Arc::new(handle_new_peer_callback);
96105
let arti_streams = AtomicI64::new(0);
97106

107+
let service_status_callback = Arc::new(service_status_callback);
108+
if let Some(f) = &(*service_status_callback) {
109+
f(false);
110+
};
111+
98112
loop {
99113
if stop_state.is_stopped() {
100114
break;
@@ -119,6 +133,7 @@ where
119133
let stop_state2 = stop_state.clone();
120134
let context_id2 = context_id;
121135
let service_name2 = String::from(service_name);
136+
let service_status_callback2 = service_status_callback.clone();
122137

123138
let monitoring = thread::Builder::new()
124139
.name(format!(
@@ -127,8 +142,13 @@ where
127142
))
128143
.spawn(move || {
129144
let mut last_running_time = Instant::now();
145+
if let Some(f) = &(*service_status_callback2) {
146+
f(false);
147+
};
130148
loop {
131149
if stop_state2.is_stopped() {
150+
arti::unregister_arti_active_object(&onion_service_object);
151+
drop(onion_service);
132152
break;
133153
}
134154
let need_arti_restart = {
@@ -152,10 +172,16 @@ where
152172
| State::DegradedUnreachable
153173
| State::Running => {
154174
last_running_time = Instant::now();
175+
if let Some(f) = &(*service_status_callback2) {
176+
f(true);
177+
};
155178
false
156179
}
157180
State::Broken => true,
158181
_ => {
182+
if let Some(f) = &(*service_status_callback2) {
183+
f(false);
184+
};
159185
let elapsed =
160186
Instant::now().duration_since(last_running_time);
161187
// Giving 3 minutes to arti to restore
@@ -172,8 +198,8 @@ where
172198

173199
if need_arti_restart || arti::is_arti_restarting() {
174200
drop(onion_service);
175-
arti::request_arti_restart();
176201
arti::unregister_arti_active_object(&onion_service_object);
202+
arti::request_arti_restart();
177203
break;
178204
}
179205
for _ in 0..30 {
@@ -183,6 +209,9 @@ where
183209
thread::sleep(Duration::from_secs(1));
184210
}
185211
}
212+
if let Some(f) = &(*service_status_callback2) {
213+
f(false);
214+
};
186215
})
187216
.expect(&format!(
188217
"Unable to start {} onion_service_checker thread",

0 commit comments

Comments
 (0)