Skip to content

Commit f165915

Browse files
danielmulvadalesharik
authored andcommitted
rebased PR streamnative#267
1 parent a4012e8 commit f165915

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

src/client.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,25 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
511511
self
512512
}
513513

514+
/// add a certificate and private key to authenticate the client in TLS connections
515+
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
516+
pub fn with_identity(mut self, certificate: Vec<u8>, private_key: Vec<u8>) -> Self {
517+
match &mut self.tls_options {
518+
Some(tls) => {
519+
tls.certificate = Some(certificate);
520+
tls.private_key = Some(private_key);
521+
}
522+
None => {
523+
self.tls_options = Some(TlsOptions {
524+
certificate: Some(certificate),
525+
private_key: Some(private_key),
526+
..Default::default()
527+
})
528+
}
529+
}
530+
self
531+
}
532+
514533
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
515534
pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
516535
match &mut self.tls_options {
@@ -560,6 +579,26 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
560579
self
561580
}
562581

582+
/// add a certificate and private key to authenticate the client in TLS connections
583+
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
584+
pub fn with_identity_files<P: AsRef<std::path::Path>>(
585+
self,
586+
certificate_path: P,
587+
private_key_path: P,
588+
) -> Result<Self, std::io::Error> {
589+
use std::io::Read;
590+
591+
let mut file = std::fs::File::open(certificate_path)?;
592+
let mut certificate = vec![];
593+
file.read_to_end(&mut certificate)?;
594+
595+
let mut file = std::fs::File::open(private_key_path)?;
596+
let mut private_key = vec![];
597+
file.read_to_end(&mut private_key)?;
598+
599+
Ok(self.with_identity(certificate, private_key))
600+
}
601+
563602
/// creates the Pulsar client and connects it
564603
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
565604
pub async fn build(self) -> Result<Pulsar<Exe>, Error> {

src/connection.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use futures::{
2020
task::{Context, Poll},
2121
Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
2222
};
23+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
24+
use native_tls::{Certificate, Identity};
2325
use proto::MessageIdData;
2426
use rand::{seq::SliceRandom, thread_rng};
2527
use url::Url;
@@ -781,6 +783,9 @@ impl<Exe: Executor> Connection<Exe> {
781783
auth_data: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
782784
proxy_to_broker_url: Option<String>,
783785
certificate_chain: &[Certificate],
786+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] identity: &Option<
787+
Identity,
788+
>,
784789
allow_insecure_connection: bool,
785790
tls_hostname_verification_enabled: bool,
786791
connection_timeout: Duration,
@@ -840,6 +845,8 @@ impl<Exe: Executor> Connection<Exe> {
840845
auth_data.clone(),
841846
proxy_to_broker_url.clone(),
842847
certificate_chain,
848+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
849+
identity.clone(),
843850
allow_insecure_connection,
844851
tls_hostname_verification_enabled,
845852
executor.clone(),
@@ -918,6 +925,9 @@ impl<Exe: Executor> Connection<Exe> {
918925
auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
919926
proxy_to_broker_url: Option<String>,
920927
certificate_chain: &[Certificate],
928+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] identity: Option<
929+
Identity,
930+
>,
921931
allow_insecure_connection: bool,
922932
tls_hostname_verification_enabled: bool,
923933
executor: Arc<Exe>,
@@ -934,6 +944,9 @@ impl<Exe: Executor> Connection<Exe> {
934944
for certificate in certificate_chain {
935945
builder.add_root_certificate(certificate.clone());
936946
}
947+
if let Some(identity) = identity {
948+
builder.identity(identity);
949+
}
937950
builder.danger_accept_invalid_hostnames(
938951
allow_insecure_connection && !tls_hostname_verification_enabled,
939952
);
@@ -1035,6 +1048,9 @@ impl<Exe: Executor> Connection<Exe> {
10351048
for certificate in certificate_chain {
10361049
connector = connector.add_root_certificate(certificate.clone());
10371050
}
1051+
if let Some(identity) = identity {
1052+
connector = connector.identity(identity);
1053+
}
10381054
connector = connector.danger_accept_invalid_hostnames(
10391055
allow_insecure_connection && !tls_hostname_verification_enabled,
10401056
);

src/connection_manager.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use futures::{channel::oneshot, lock::Mutex};
4+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
5+
use native_tls::{Certificate, Identity};
46
use rand::Rng;
57
use url::Url;
68

@@ -74,6 +76,12 @@ pub struct TlsOptions {
7476
/// contains a list of PEM encoded certificates
7577
pub certificate_chain: Option<Vec<u8>>,
7678

79+
/// PEM encoded X509 certificates
80+
pub certificate: Option<Vec<u8>>,
81+
82+
/// is a PEM encoded PKCS #8 formatted private key for the leaf certificate
83+
pub private_key: Option<Vec<u8>>,
84+
7785
/// allow insecure TLS connection if set to true
7886
///
7987
/// defaults to *false*
@@ -90,6 +98,8 @@ impl Default for TlsOptions {
9098
fn default() -> Self {
9199
Self {
92100
certificate_chain: None,
101+
certificate: None,
102+
private_key: None,
93103
allow_insecure_connection: false,
94104
tls_hostname_verification_enabled: true,
95105
}
@@ -116,6 +126,8 @@ pub struct ConnectionManager<Exe: Executor> {
116126
pub(crate) operation_retry_options: OperationRetryOptions,
117127
tls_options: TlsOptions,
118128
certificate_chain: Vec<Certificate>,
129+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
130+
identity: Option<Identity>,
119131
outbound_channel_size: usize,
120132
}
121133

@@ -172,6 +184,17 @@ impl<Exe: Executor> ConnectionManager<Exe> {
172184
}
173185
};
174186

187+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
188+
let identity = match (
189+
tls_options.certificate.as_ref(),
190+
tls_options.private_key.as_ref(),
191+
) {
192+
(None, _) | (_, None) => None,
193+
(Some(certificate), Some(privatekey)) => {
194+
Some(native_tls::Identity::from_pkcs8(&certificate, &privatekey)?)
195+
}
196+
};
197+
175198
if let Some(auth) = auth.clone() {
176199
auth.lock().await.initialize().await?;
177200
}
@@ -185,6 +208,8 @@ impl<Exe: Executor> ConnectionManager<Exe> {
185208
operation_retry_options,
186209
tls_options,
187210
certificate_chain,
211+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
212+
identity,
188213
outbound_channel_size,
189214
};
190215
let broker_address = BrokerAddress {
@@ -303,6 +328,8 @@ impl<Exe: Executor> ConnectionManager<Exe> {
303328
self.auth.clone(),
304329
proxy_url.clone(),
305330
&self.certificate_chain,
331+
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
332+
&self.identity,
306333
self.tls_options.allow_insecure_connection,
307334
self.tls_options.tls_hostname_verification_enabled,
308335
self.connection_retry_options.connection_timeout,

0 commit comments

Comments
 (0)