Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/admin/src/rest_api/cluster_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axum::Json;
use http::StatusCode;
use okapi_operation::openapi;

use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_core::protobuf::node_ctl_svc::new_node_ctl_client;
use restate_core::{Metadata, my_node_id};
use restate_types::config::Configuration;
Expand Down Expand Up @@ -43,6 +43,7 @@ pub async fn cluster_health() -> Result<Json<ClusterHealthResponse>, GenericRest
let mut node_ctl_svc_client = new_node_ctl_client(create_tonic_channel(
node_config.address.clone(),
&Configuration::pinned().networking,
DNSResolution::Gai,
));
let cluster_health = node_ctl_svc_client
.cluster_health(())
Expand Down
94 changes: 93 additions & 1 deletion crates/core/src/network/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use http::Uri;
Expand All @@ -19,6 +21,7 @@ use hyper_util::rt::TokioIo;
use hyper_util::server::graceful::GracefulShutdown;
use tokio::io;
use tokio::net::UnixStream;
use tokio::task::JoinHandle;
use tokio_util::either::Either;
use tonic::transport::{Channel, Endpoint};
use tracing::{Instrument, Span, debug, error_span, info, instrument, trace};
Expand All @@ -32,12 +35,20 @@ use restate_types::net::listener::Listeners;

use crate::{ShutdownError, TaskCenter, TaskKind, cancellation_watcher};

pub enum DNSResolution {
// use whatever order getaddressinfo returns (http connector will use the first v4 and v6 ips it finds)
Gai,
// pick a single random v4 and v6 ip; useful where the record points to multiple distinct nodes
Headless,
}

pub fn create_tonic_channel<
T: CommonClientConnectionOptions + Send + Sync + ?Sized,
P: ListenerPort + GrpcPort,
>(
address: AdvertisedAddress<P>,
options: &T,
dns_resolution: DNSResolution,
) -> Channel {
let address = address.into_address().expect("valid address");
let endpoint = match &address {
Expand All @@ -59,7 +70,25 @@ pub fn create_tonic_channel<
}
}))
}
PeerNetAddress::Http(_) => endpoint.connect_lazy()
PeerNetAddress::Http(_) => {
match dns_resolution {
DNSResolution::Gai => endpoint.connect_lazy(),
DNSResolution::Headless => {
// headless dns names need special consideration:
// 1. We need to ensure all ips are used across retries
// 2. The http connector will split the conn timeout between all resolved addresses, so we don't want too many
let mut http = hyper_util::client::legacy::connect::HttpConnector::new_with_resolver(RandomAddressResolver);
http.enforce_http(false);
http.set_nodelay(endpoint.get_tcp_nodelay());
http.set_keepalive(endpoint.get_tcp_keepalive());
http.set_keepalive_interval(endpoint.get_tcp_keepalive_interval());
http.set_keepalive_retries(endpoint.get_tcp_keepalive_retries());
http.set_connect_timeout(endpoint.get_connect_timeout());

endpoint.connect_with_connector_lazy(http)
},
}
}
}
}

Expand Down Expand Up @@ -253,3 +282,66 @@ where
});
}
}

/// RandomAddressResolver is adapted from the default GaiResolver used in hyper_util:
/// https://github.com/hyperium/hyper-util/blob/v0.1.18/src/client/legacy/connect/dns.rs#L44
/// But instead of returning the full list of resolved ips in the order from getaddressinfo,
/// we choose a single random ipv4 and ipv6.
/// This allows us to handle headless initial addresses much better, as even if the dns server returns a random
/// address order, gai tends to reorder based on some proximity heuristics which mean we will never
/// hit all the IPs; resolving 100 times may have the same ip as the first returned every time.
#[derive(Clone)]
struct RandomAddressResolver;

pub struct RandomAddressResolverFuture<R>(JoinHandle<Result<R, io::Error>>);

impl tower::Service<hyper_util::client::legacy::connect::dns::Name> for RandomAddressResolver {
type Response = std::iter::Chain<
std::option::IntoIter<std::net::SocketAddr>,
std::option::IntoIter<std::net::SocketAddr>,
>;
type Error = io::Error;
type Future = RandomAddressResolverFuture<Self::Response>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, name: hyper_util::client::legacy::connect::dns::Name) -> Self::Future {
RandomAddressResolverFuture(tokio::task::spawn_blocking(move || {
use rand::seq::IteratorRandom;

let addrs: Vec<_> =
std::net::ToSocketAddrs::to_socket_addrs(&(name.as_str(), 0))?.collect();

// the http connector cares about whether the first ip is ipv4 or ipv6 for the purposes of happy eyeballs
// ie, if the first ip is v6, it will prefer v6
let first_ipv4 = addrs.first().map(|addr| addr.is_ipv4()).unwrap_or(true);

let ipv4s = addrs.iter().filter(|addr| addr.is_ipv4());
let ipv6s = addrs.iter().filter(|addr| addr.is_ipv6());

let rand = &mut rand::rng();
let random_ipv4 = ipv4s.choose(rand).cloned();
let random_ipv6 = ipv6s.choose(rand).cloned();

if first_ipv4 {
Ok(random_ipv4.into_iter().chain(random_ipv6))
} else {
Ok(random_ipv6.into_iter().chain(random_ipv4))
}
}))
}
}

impl<R> Future for RandomAddressResolverFuture<R> {
type Output = Result<R, io::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx).map(|res| match res {
Ok(Ok(addrs)) => Ok(addrs),
Ok(Err(err)) => Err(err),
Err(join_err) => Err(io::Error::new(io::ErrorKind::Interrupted, join_err)),
})
}
}
7 changes: 6 additions & 1 deletion crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tonic::Code;
use tracing::{error, info, warn};
use typed_builder::TypedBuilder;

use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_core::protobuf::node_ctl_svc::{
ProvisionClusterRequest as ProtoProvisionClusterRequest, new_node_ctl_client,
};
Expand Down Expand Up @@ -817,6 +817,7 @@ impl StartedNode {
let mut metadata_server_client = new_metadata_server_client(create_tonic_channel(
self.fabric_advertised_address.clone(),
&self.config().networking,
DNSResolution::Gai,
));

let Ok(response) = metadata_server_client
Expand All @@ -841,6 +842,7 @@ impl StartedNode {
let channel = create_tonic_channel(
self.advertised_address().clone(),
&Configuration::default().networking,
DNSResolution::Gai,
);

let request = ProtoProvisionClusterRequest {
Expand Down Expand Up @@ -901,6 +903,7 @@ impl StartedNode {
let mut client = new_metadata_server_client(create_tonic_channel(
self.advertised_address().clone(),
&self.config().networking,
DNSResolution::Gai,
));

client.add_node(()).await?;
Expand All @@ -912,6 +915,7 @@ impl StartedNode {
let mut client = new_metadata_server_client(create_tonic_channel(
self.advertised_address().clone(),
&self.config().networking,
DNSResolution::Gai,
));

client
Expand All @@ -928,6 +932,7 @@ impl StartedNode {
let mut client = new_metadata_server_client(create_tonic_channel(
self.advertised_address().clone(),
&self.config().networking,
DNSResolution::Gai,
));
let response = client.status(()).await?.into_inner();
Ok(response)
Expand Down
18 changes: 14 additions & 4 deletions crates/metadata-providers/src/replicated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tonic::transport::Channel;
use tonic::{Code, Status};
use tracing::{debug, instrument, trace};

use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_core::{Metadata, TaskCenter, TaskKind, cancellation_watcher};
use restate_metadata_server_grpc::grpc::metadata_server_svc_client::MetadataServerSvcClient;
use restate_metadata_server_grpc::grpc::new_metadata_server_client;
Expand Down Expand Up @@ -370,7 +370,11 @@ impl MetadataStore for GrpcMetadataServerClient {

let mut client = MetadataServerSvcClientWithAddress::new(ChannelWithAddress::new(
advertised_address.clone(),
create_tonic_channel(advertised_address.clone(), &config.networking),
create_tonic_channel(
advertised_address.clone(),
&config.networking,
DNSResolution::Gai,
),
));

let mut buffer = BytesMut::new();
Expand Down Expand Up @@ -476,7 +480,7 @@ impl ChannelManager {
) -> ChannelWithAddress {
let channel = ChannelWithAddress::new(
address.clone(),
create_tonic_channel(address, self.connection_options.deref()),
create_tonic_channel(address, self.connection_options.deref(), DNSResolution::Gai),
);
self.channels
.lock()
Expand Down Expand Up @@ -533,6 +537,7 @@ impl ChannelManager {
create_tonic_channel(
node_config.address.clone(),
self.connection_options.deref(),
DNSResolution::Gai,
),
),
))
Expand Down Expand Up @@ -584,7 +589,12 @@ impl ChannelOrInitialAddress {
match self {
ChannelOrInitialAddress::InitialAddress(address) => ChannelWithAddress::new(
address.clone(),
create_tonic_channel(address, connection_options),
create_tonic_channel(
address,
connection_options,
// initial addresses may be headless, ie they point to multiple nodes
DNSResolution::Headless,
),
),
ChannelOrInitialAddress::Channel(channel) => channel,
}
Expand Down
6 changes: 5 additions & 1 deletion crates/metadata-server/src/raft/network/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ where
"metadata-store-network-connection-attempt",
{
trace!(%target, "Try connecting to metadata store peer");
let channel = net_util::create_tonic_channel(address.clone(), networking_options);
let channel = net_util::create_tonic_channel(
address.clone(),
networking_options,
net_util::DNSResolution::Gai,
);

async move {
let mut network_client = new_metadata_server_network_client(channel);
Expand Down
8 changes: 6 additions & 2 deletions crates/metadata-server/src/raft/server/standby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use rand::prelude::IteratorRandom;
use rand::rng;
use tracing::{Span, debug, instrument, trace};

use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_core::{Metadata, MetadataWriter};
use restate_metadata_providers::replicated::KnownLeader;
use restate_time_util::DurationExt;
Expand Down Expand Up @@ -257,7 +257,11 @@ impl Standby {
.clone()
};

let channel = create_tonic_channel(address, &Configuration::pinned().networking);
let channel = create_tonic_channel(
address,
&Configuration::pinned().networking,
DNSResolution::Gai,
);

match new_metadata_server_network_client(channel)
.join_cluster(network::grpc_svc::JoinClusterRequest {
Expand Down
3 changes: 2 additions & 1 deletion crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_metadata_store::protobuf::metadata_proxy_svc::{
DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest,
};

use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::{NodeCtlSvc, NodeCtlSvcServer};
use restate_core::protobuf::node_ctl_svc::{
ClusterHealthResponse, EmbeddedMetadataClusterHealth, GetMetadataRequest, GetMetadataResponse,
Expand Down Expand Up @@ -212,6 +212,7 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
let mut metadata_server_client = new_metadata_server_client(create_tonic_channel(
node_config.address.clone(),
&Configuration::pinned().networking,
DNSResolution::Gai,
));

let response = match metadata_server_client.status(()).await {
Expand Down
3 changes: 2 additions & 1 deletion server/tests/trim_gap_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tonic::transport::Channel;
use tracing::info;
use url::Url;

use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_core::protobuf::cluster_ctrl_svc::{
ClusterStateRequest, CreatePartitionSnapshotRequest,
cluster_ctrl_svc_client::ClusterCtrlSvcClient, new_cluster_ctrl_client,
Expand Down Expand Up @@ -110,6 +110,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
let mut client = new_cluster_ctrl_client(create_tonic_channel(
cluster.nodes[0].advertised_address().clone(),
&NetworkingOptions::default(),
DNSResolution::Gai,
));

info!("Waiting until the partition processor has become the leader");
Expand Down
4 changes: 2 additions & 2 deletions tools/restatectl/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use cling::{Collect, prelude::Parser};
use tonic::transport::Channel;

use restate_cli_util::CliContext;
use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
use restate_types::{
logs::metadata::ProviderConfiguration,
net::address::{AdvertisedAddress, GrpcPort, ListenerPort},
};

pub fn grpc_channel<P: ListenerPort + GrpcPort>(address: AdvertisedAddress<P>) -> Channel {
let ctx = CliContext::get();
create_tonic_channel(address, &ctx.network)
create_tonic_channel(address, &ctx.network, DNSResolution::Gai)
}

pub fn write_default_provider<W: fmt::Write>(
Expand Down
Loading