Skip to content

Commit 5b07f5c

Browse files
authored
[API-2326] Ayncio SSL Support [2/6] (#743)
Diff between PR 1 and this one: https://gist.github.com/yuce/61099ece25c22d6569f6a61a9d3ddd4f Diff between asyncio and current API tests: https://gist.github.com/yuce/4d02eb455fc670e4122a1efec12870a3
1 parent f49d8ab commit 5b07f5c

File tree

7 files changed

+492
-18
lines changed

7 files changed

+492
-18
lines changed

hazelcast/internal/asyncio_connection.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,6 @@ async def on_connection_close(self, closed_connection):
340340

341341
if removed:
342342
async with asyncio.TaskGroup() as tg:
343-
# TODO: see on_connection_open
344343
for _, on_connection_closed in self._connection_listeners:
345344
if on_connection_closed:
346345
try:
@@ -395,13 +394,12 @@ async def _get_or_connect_to_member(self, member):
395394

396395
translated = self._translate_member_address(member)
397396
connection = await self._create_connection(translated)
398-
response = await self._authenticate(connection) # .continue_with(self._on_auth, connection)
397+
response = await self._authenticate(connection)
399398
await self._on_auth(response, connection)
400399
return connection
401400

402401
async def _create_connection(self, address):
403-
factory = self._reactor.connection_factory
404-
return await factory(
402+
return await self._reactor.connection_factory(
405403
self,
406404
self._connection_id_generator.get_and_increment(),
407405
address,
@@ -473,7 +471,6 @@ async def run():
473471
connecting_uuids.add(member_uuid)
474472
if not self._lifecycle_service.running:
475473
break
476-
# TODO: ERROR:asyncio:Task was destroyed but it is pending!
477474
tg.create_task(self._get_or_connect_to_member(member))
478475
member_uuids.append(member_uuid)
479476

@@ -706,8 +703,6 @@ async def _handle_successful_auth(self, response, connection):
706703
for on_connection_opened, _ in self._connection_listeners:
707704
if on_connection_opened:
708705
try:
709-
# TODO: creating the task may not throw the exception
710-
# TODO: protect the loop against exceptions, so all handlers run
711706
maybe_coro = on_connection_opened(connection)
712707
if isinstance(maybe_coro, Coroutine):
713708
tg.create_task(maybe_coro)

hazelcast/internal/asyncio_reactor.py

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import asyncio
22
import io
33
import logging
4+
import ssl
45
import time
56
from asyncio import AbstractEventLoop, transports
67

8+
from hazelcast.config import Config, SSLProtocol
79
from hazelcast.internal.asyncio_connection import Connection
810
from hazelcast.core import Address
911

@@ -83,25 +85,28 @@ async def create_and_connect(
8385
this = cls(
8486
loop, reactor, connection_manager, connection_id, address, config, message_callback
8587
)
86-
if this._config.ssl_enabled:
87-
await this._create_ssl_connection()
88-
else:
89-
await this._create_connection()
88+
await this._create_connection(config, address)
9089
return this
9190

9291
def _create_protocol(self):
9392
return HazelcastProtocol(self)
9493

95-
async def _create_connection(self):
96-
loop = self._loop
97-
res = await loop.create_connection(
98-
self._create_protocol, host=self._address.host, port=self._address.port
94+
async def _create_connection(self, config, address):
95+
ssl_context = None
96+
if config.ssl_enabled:
97+
ssl_context = self._create_ssl_context(config)
98+
server_hostname = None
99+
if config.ssl_check_hostname:
100+
server_hostname = address.host
101+
res = await self._loop.create_connection(
102+
self._create_protocol,
103+
host=self._address.host,
104+
port=self._address.port,
105+
ssl=ssl_context,
106+
server_hostname=server_hostname,
99107
)
100108
_sock, self._proto = res
101109

102-
async def _create_ssl_connection(self):
103-
raise NotImplementedError
104-
105110
def _write(self, buf):
106111
self._proto.write(buf)
107112

@@ -120,6 +125,42 @@ def _update_sent(self, sent):
120125
def _update_received(self, received):
121126
self._reactor.update_bytes_received(received)
122127

128+
def _create_ssl_context(self, config: Config):
129+
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
130+
protocol = config.ssl_protocol
131+
# Use only the configured protocol
132+
try:
133+
if protocol != SSLProtocol.SSLv2:
134+
ssl_context.options |= ssl.OP_NO_SSLv2
135+
if protocol != SSLProtocol.SSLv3:
136+
ssl_context.options |= ssl.OP_NO_SSLv3
137+
if protocol != SSLProtocol.TLSv1:
138+
ssl_context.options |= ssl.OP_NO_TLSv1
139+
if protocol != SSLProtocol.TLSv1_1:
140+
ssl_context.options |= ssl.OP_NO_TLSv1_1
141+
if protocol != SSLProtocol.TLSv1_2:
142+
ssl_context.options |= ssl.OP_NO_TLSv1_2
143+
if protocol != SSLProtocol.TLSv1_3:
144+
ssl_context.options |= ssl.OP_NO_TLSv1_3
145+
except AttributeError:
146+
pass
147+
148+
ssl_context.verify_mode = ssl.CERT_REQUIRED
149+
if config.ssl_cafile:
150+
ssl_context.load_verify_locations(config.ssl_cafile)
151+
else:
152+
ssl_context.load_default_certs()
153+
if config.ssl_certfile:
154+
ssl_context.load_cert_chain(
155+
config.ssl_certfile, config.ssl_keyfile, config.ssl_password
156+
)
157+
if config.ssl_ciphers:
158+
ssl_context.set_ciphers(config.ssl_ciphers)
159+
if config.ssl_check_hostname:
160+
ssl_context.check_hostname = True
161+
162+
return ssl_context
163+
123164

124165
class HazelcastProtocol(asyncio.BufferedProtocol):
125166

tests/integration/asyncio/ssl_tests/__init__.py

Whitespace-only changes.

tests/integration/asyncio/ssl_tests/hostname_verification/__init__.py

Whitespace-only changes.
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import os
2+
import sys
3+
import unittest
4+
5+
import pytest
6+
7+
from hazelcast.asyncio.client import HazelcastClient
8+
from hazelcast.config import SSLProtocol
9+
from hazelcast.errors import IllegalStateError
10+
from tests.integration.asyncio.base import HazelcastTestCase
11+
from tests.util import compare_client_version, get_abs_path
12+
13+
current_directory = os.path.abspath(
14+
os.path.join(
15+
os.path.dirname(__file__), "../../../backward_compatible/ssl_tests/hostname_verification"
16+
)
17+
)
18+
19+
MEMBER_CONFIG = """
20+
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://www.hazelcast.com/schema/config
23+
http://www.hazelcast.com/schema/config/hazelcast-config-5.0.xsd">
24+
<network>
25+
<ssl enabled="true">
26+
<factory-class-name>
27+
com.hazelcast.nio.ssl.BasicSSLContextFactory
28+
</factory-class-name>
29+
<properties>
30+
<property name="keyStore">%s</property>
31+
<property name="keyStorePassword">123456</property>
32+
<property name="keyStoreType">PKCS12</property>
33+
<property name="protocol">TLSv1.2</property>
34+
</properties>
35+
</ssl>
36+
</network>
37+
</hazelcast>
38+
"""
39+
40+
41+
@unittest.skipIf(
42+
sys.version_info < (3, 7),
43+
"Hostname verification feature requires Python 3.7+",
44+
)
45+
@unittest.skipIf(
46+
compare_client_version("5.1") < 0,
47+
"Tests the features added in 5.1 version of the client",
48+
)
49+
@pytest.mark.enterprise
50+
class SslHostnameVerificationTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase):
51+
def setUp(self):
52+
self.rc = self.create_rc()
53+
self.cluster = None
54+
55+
async def asyncTearDown(self):
56+
await self.shutdown_all_clients()
57+
self.rc.terminateCluster(self.cluster.id)
58+
self.rc.exit()
59+
60+
async def test_hostname_verification_with_loopback_san(self):
61+
# SAN entry is present with different possible values
62+
file_name = "tls-host-loopback-san"
63+
self.start_member_with(f"{file_name}.p12")
64+
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")
65+
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
66+
67+
async def test_hostname_verification_with_loopback_dns_san(self):
68+
# SAN entry is present, but only with `dns:localhost`
69+
file_name = "tls-host-loopback-san-dns"
70+
self.start_member_with(f"{file_name}.p12")
71+
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
72+
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
73+
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")
74+
75+
async def test_hostname_verification_with_different_san(self):
76+
# There is a valid entry, but it does not match with the address of the member.
77+
file_name = "tls-host-not-our-san"
78+
self.start_member_with(f"{file_name}.p12")
79+
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
80+
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
81+
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
82+
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")
83+
84+
async def test_hostname_verification_with_loopback_cn(self):
85+
# No entry in SAN but an entry in CN which checked as a fallback
86+
# when no entry in SAN is present.
87+
file_name = "tls-host-loopback-cn"
88+
self.start_member_with(f"{file_name}.p12")
89+
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
90+
# See https://stackoverflow.com/a/8444863/12394291. IP addresses in CN
91+
# are not supported. So, we don't have a test for it.
92+
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
93+
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")
94+
95+
async def test_hostname_verification_with_no_entry(self):
96+
# No entry either in the SAN or CN. No way to verify hostname.
97+
file_name = "tls-host-no-entry"
98+
self.start_member_with(f"{file_name}.p12")
99+
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
100+
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
101+
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
102+
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")
103+
104+
async def test_hostname_verification_disabled(self):
105+
# When hostname verification is disabled, the scenarious that
106+
# would fail in `test_hostname_verification_with_no_entry` will
107+
# no longer fail, showing that it is working as expected.
108+
file_name = "tls-host-no-entry"
109+
self.start_member_with(f"{file_name}.p12")
110+
await self.start_client_with(f"{file_name}.pem", "localhost:5701", check_hostname=False)
111+
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701", check_hostname=False)
112+
113+
async def start_client_with(
114+
self,
115+
truststore_name: str,
116+
member_address: str,
117+
*,
118+
check_hostname=True,
119+
) -> HazelcastClient:
120+
return await self.create_client(
121+
{
122+
"cluster_name": self.cluster.id,
123+
"cluster_members": [member_address],
124+
"ssl_enabled": True,
125+
"ssl_protocol": SSLProtocol.TLSv1_2,
126+
"ssl_cafile": get_abs_path(current_directory, truststore_name),
127+
"ssl_check_hostname": check_hostname,
128+
"cluster_connect_timeout": 0,
129+
}
130+
)
131+
132+
def start_member_with(self, keystore_name: str) -> None:
133+
config = MEMBER_CONFIG % get_abs_path(current_directory, keystore_name)
134+
self.cluster = self.create_cluster(self.rc, config)
135+
self.cluster.start_member()

0 commit comments

Comments
 (0)