Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
cbe98dc
Initial asyncio commit
yuce Sep 19, 2025
162fd17
Updates
yuce Sep 19, 2025
9931956
Updates
yuce Sep 19, 2025
856e3df
Merge branch 'master' into asyncio-module
yuce Sep 19, 2025
35384bf
Black
yuce Sep 19, 2025
fdda120
Updates
yuce Sep 19, 2025
fee5b45
Updates
yuce Sep 19, 2025
fc2c38b
Updates
yuce Sep 19, 2025
1772031
Removed docs, include HazelcastClient/Map in public API
yuce Sep 19, 2025
170cf89
Updates
yuce Sep 19, 2025
539c904
Merge branch 'master' into asyncio-module
yuce Sep 22, 2025
22449a8
black
yuce Sep 22, 2025
5406bc6
Ignore incorrect mypy errors
yuce Sep 22, 2025
a417a4a
Updates
yuce Sep 24, 2025
d00c480
Updates
yuce Sep 25, 2025
baa3bc1
Annotate optional params
yuce Sep 29, 2025
ebfc9e2
black
yuce Sep 29, 2025
6928837
Remove update to test util
yuce Sep 30, 2025
3e03cbf
black
yuce Sep 30, 2025
51ced7a
black
yuce Sep 30, 2025
e635b94
update
yuce Sep 30, 2025
4f103f6
Added support for SSL
yuce Sep 30, 2025
042cc58
Added SSL tests
yuce Sep 30, 2025
265a2b4
Added mutual authentication test
yuce Sep 30, 2025
293975d
Added hostname verification tests
yuce Oct 1, 2025
2718478
black
yuce Oct 1, 2025
6a558e8
Merge branch 'master' into asyncio-module-ssl
yuce Oct 2, 2025
a630706
Merge branch 'master' into asyncio-module
yuce Oct 2, 2025
6ced889
Merge branch 'master' into asyncio-module
yuce Oct 20, 2025
c313bfa
Merge branch 'master' into asyncio-module-ssl
yuce Oct 20, 2025
91bf1d1
Addressed review comment
yuce Nov 21, 2025
2128f5e
Removed unnecessary code
yuce Nov 21, 2025
62697e3
Add BETA warning
yuce Nov 21, 2025
a87a5c6
Black
yuce Nov 21, 2025
8d7eede
Merge branch 'asyncio-module' into asyncio-module-ssl
yuce Nov 24, 2025
1ca7fd6
Addressed review comment
yuce Nov 26, 2025
d9acede
updates
yuce Nov 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions hazelcast/internal/asyncio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ async def on_connection_close(self, closed_connection):

if removed:
async with asyncio.TaskGroup() as tg:
# TODO: see on_connection_open
for _, on_connection_closed in self._connection_listeners:
if on_connection_closed:
try:
Expand Down Expand Up @@ -395,13 +394,12 @@ async def _get_or_connect_to_member(self, member):

translated = self._translate_member_address(member)
connection = await self._create_connection(translated)
response = await self._authenticate(connection) # .continue_with(self._on_auth, connection)
response = await self._authenticate(connection)
await self._on_auth(response, connection)
return connection

async def _create_connection(self, address):
factory = self._reactor.connection_factory
return await factory(
return await self._reactor.connection_factory(
self,
self._connection_id_generator.get_and_increment(),
address,
Expand Down Expand Up @@ -473,7 +471,6 @@ async def run():
connecting_uuids.add(member_uuid)
if not self._lifecycle_service.running:
break
# TODO: ERROR:asyncio:Task was destroyed but it is pending!
tg.create_task(self._get_or_connect_to_member(member))
member_uuids.append(member_uuid)

Expand Down Expand Up @@ -706,8 +703,6 @@ async def _handle_successful_auth(self, response, connection):
for on_connection_opened, _ in self._connection_listeners:
if on_connection_opened:
try:
# TODO: creating the task may not throw the exception
# TODO: protect the loop against exceptions, so all handlers run
maybe_coro = on_connection_opened(connection)
if isinstance(maybe_coro, Coroutine):
tg.create_task(maybe_coro)
Expand Down
63 changes: 52 additions & 11 deletions hazelcast/internal/asyncio_reactor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import io
import logging
import ssl
import time
from asyncio import AbstractEventLoop, transports

from hazelcast.config import Config, SSLProtocol
from hazelcast.internal.asyncio_connection import Connection
from hazelcast.core import Address

Expand Down Expand Up @@ -83,25 +85,28 @@ async def create_and_connect(
this = cls(
loop, reactor, connection_manager, connection_id, address, config, message_callback
)
if this._config.ssl_enabled:
await this._create_ssl_connection()
else:
await this._create_connection()
await this._create_connection(config, address)
return this

def _create_protocol(self):
return HazelcastProtocol(self)

async def _create_connection(self):
loop = self._loop
res = await loop.create_connection(
self._create_protocol, host=self._address.host, port=self._address.port
async def _create_connection(self, config, address):
ssl_context = None
if config.ssl_enabled:
ssl_context = self._create_ssl_context(config)
server_hostname = None
if config.ssl_check_hostname:
server_hostname = address.host
res = await self._loop.create_connection(
self._create_protocol,
host=self._address.host,
port=self._address.port,
ssl=ssl_context,
server_hostname=server_hostname,
)
_sock, self._proto = res

async def _create_ssl_connection(self):
raise NotImplementedError

def _write(self, buf):
self._proto.write(buf)

Expand All @@ -120,6 +125,42 @@ def _update_sent(self, sent):
def _update_received(self, received):
self._reactor.update_bytes_received(received)

def _create_ssl_context(self, config: Config):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
protocol = config.ssl_protocol
# Use only the configured protocol
try:
if protocol != SSLProtocol.SSLv2:
ssl_context.options |= ssl.OP_NO_SSLv2
if protocol != SSLProtocol.SSLv3:
ssl_context.options |= ssl.OP_NO_SSLv3
if protocol != SSLProtocol.TLSv1:
ssl_context.options |= ssl.OP_NO_TLSv1
if protocol != SSLProtocol.TLSv1_1:
ssl_context.options |= ssl.OP_NO_TLSv1_1
if protocol != SSLProtocol.TLSv1_2:
ssl_context.options |= ssl.OP_NO_TLSv1_2
if protocol != SSLProtocol.TLSv1_3:
ssl_context.options |= ssl.OP_NO_TLSv1_3
except AttributeError:
pass

ssl_context.verify_mode = ssl.CERT_REQUIRED
if config.ssl_cafile:
ssl_context.load_verify_locations(config.ssl_cafile)
else:
ssl_context.load_default_certs()
if config.ssl_certfile:
ssl_context.load_cert_chain(
config.ssl_certfile, config.ssl_keyfile, config.ssl_password
)
if config.ssl_ciphers:
ssl_context.set_ciphers(config.ssl_ciphers)
if config.ssl_check_hostname:
ssl_context.check_hostname = True

return ssl_context


class HazelcastProtocol(asyncio.BufferedProtocol):

Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import os
import sys
import unittest

import pytest

from hazelcast.asyncio.client import HazelcastClient
from hazelcast.config import SSLProtocol
from hazelcast.errors import IllegalStateError
from tests.integration.asyncio.base import HazelcastTestCase
from tests.util import compare_client_version, get_abs_path

current_directory = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "../../../backward_compatible/ssl_tests/hostname_verification"
)
)

MEMBER_CONFIG = """
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-5.0.xsd">
<network>
<ssl enabled="true">
<factory-class-name>
com.hazelcast.nio.ssl.BasicSSLContextFactory
</factory-class-name>
<properties>
<property name="keyStore">%s</property>
<property name="keyStorePassword">123456</property>
<property name="keyStoreType">PKCS12</property>
<property name="protocol">TLSv1.2</property>
</properties>
</ssl>
</network>
</hazelcast>
"""


@unittest.skipIf(
sys.version_info < (3, 7),
"Hostname verification feature requires Python 3.7+",
)
@unittest.skipIf(
compare_client_version("5.1") < 0,
"Tests the features added in 5.1 version of the client",
)
@pytest.mark.enterprise
class SslHostnameVerificationTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase):
def setUp(self):
self.rc = self.create_rc()
self.cluster = None

async def asyncTearDown(self):
await self.shutdown_all_clients()
self.rc.terminateCluster(self.cluster.id)
self.rc.exit()

async def test_hostname_verification_with_loopback_san(self):
# SAN entry is present with different possible values
file_name = "tls-host-loopback-san"
self.start_member_with(f"{file_name}.p12")
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")
await self.start_client_with(f"{file_name}.pem", "localhost:5701")

async def test_hostname_verification_with_loopback_dns_san(self):
# SAN entry is present, but only with `dns:localhost`
file_name = "tls-host-loopback-san-dns"
self.start_member_with(f"{file_name}.p12")
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")

async def test_hostname_verification_with_different_san(self):
# There is a valid entry, but it does not match with the address of the member.
file_name = "tls-host-not-our-san"
self.start_member_with(f"{file_name}.p12")
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")

async def test_hostname_verification_with_loopback_cn(self):
# No entry in SAN but an entry in CN which checked as a fallback
# when no entry in SAN is present.
file_name = "tls-host-loopback-cn"
self.start_member_with(f"{file_name}.p12")
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
# See https://stackoverflow.com/a/8444863/12394291. IP addresses in CN
# are not supported. So, we don't have a test for it.
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")

async def test_hostname_verification_with_no_entry(self):
# No entry either in the SAN or CN. No way to verify hostname.
file_name = "tls-host-no-entry"
self.start_member_with(f"{file_name}.p12")
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
await self.start_client_with(f"{file_name}.pem", "localhost:5701")
with self.assertRaisesRegex(IllegalStateError, "Unable to connect to any cluster"):
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701")

async def test_hostname_verification_disabled(self):
# When hostname verification is disabled, the scenarious that
# would fail in `test_hostname_verification_with_no_entry` will
# no longer fail, showing that it is working as expected.
file_name = "tls-host-no-entry"
self.start_member_with(f"{file_name}.p12")
await self.start_client_with(f"{file_name}.pem", "localhost:5701", check_hostname=False)
await self.start_client_with(f"{file_name}.pem", "127.0.0.1:5701", check_hostname=False)

async def start_client_with(
self,
truststore_name: str,
member_address: str,
*,
check_hostname=True,
) -> HazelcastClient:
return await self.create_client(
{
"cluster_name": self.cluster.id,
"cluster_members": [member_address],
"ssl_enabled": True,
"ssl_protocol": SSLProtocol.TLSv1_2,
"ssl_cafile": get_abs_path(current_directory, truststore_name),
"ssl_check_hostname": check_hostname,
"cluster_connect_timeout": 0,
}
)

def start_member_with(self, keystore_name: str) -> None:
config = MEMBER_CONFIG % get_abs_path(current_directory, keystore_name)
self.cluster = self.create_cluster(self.rc, config)
self.cluster.start_member()
Loading