Skip to content

Commit 41d65f9

Browse files
authored
Asyncio Module Cloud Support (#754)
Improved the stability of the asyncio client in this PR. * Added support for non-blocking Hazelcast Cloud address resolution * Ported `connection_manager_test.py` to asyncio * Refactored `AsyncioConnection` to be more reliable by porting more logic from `AsyncoreConnection`. * Run all VectorCollection tests for 5.6.0
1 parent 80119dd commit 41d65f9

File tree

14 files changed

+511
-78
lines changed

14 files changed

+511
-78
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,8 @@ test:
1010
test-enterprise:
1111
pytest --verbose
1212

13+
test-asyncio:
14+
pytest --verbose -k asyncio
15+
1316
test-cover:
1417
pytest --cov=hazelcast --cov-report=xml
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import asyncio
2+
3+
from hazelcast.asyncio import HazelcastClient
4+
5+
6+
async def amain():
7+
client = await HazelcastClient.create_and_start(
8+
# Set up cluster name for authentication
9+
cluster_name="asyncio",
10+
# Set the token of your cloud cluster
11+
cloud_discovery_token="wE1w1USF6zOnaLVjLZwbZHxEoZJhw43yyViTbe6UBTvz4tZniA",
12+
ssl_enabled=True,
13+
ssl_cafile="/path/to/ca.pem",
14+
ssl_certfile="/path/to/cert.pem",
15+
ssl_keyfile="/path/to/key.pem",
16+
ssl_password="05dd4498c3f",
17+
)
18+
my_map = await client.get_map("map-on-the-cloud")
19+
await my_map.put("key", "value")
20+
21+
value = await my_map.get("key")
22+
print(value)
23+
24+
await client.shutdown()
25+
26+
27+
if __name__ == "__main__":
28+
asyncio.run(amain())

examples/cloud-discovery/hazelcast_cloud_discovery_example.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
cluster_name="YOUR_CLUSTER_NAME",
66
# Set the token of your cloud cluster
77
cloud_discovery_token="YOUR_CLUSTER_DISCOVERY_TOKEN",
8-
# If you have enabled encryption for your cluster, also configure TLS/SSL for the client.
9-
# Otherwise, skip options below.
108
ssl_enabled=True,
119
ssl_cafile="/path/to/ca.pem",
1210
ssl_certfile="/path/to/cert.pem",

hazelcast/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "6.0.0"
1+
__version__ = "5.6.0"
22

33
# Set the default handler to "hazelcast" loggers
44
# to avoid "No handlers could be found" warnings.

hazelcast/asyncio/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from hazelcast.internal.asyncio_cluster import ClusterService, _InternalClusterService
77
from hazelcast.internal.asyncio_compact import CompactSchemaService
88
from hazelcast.config import Config, IndexConfig
9-
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAddressProvider
9+
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAsyncioAddressProvider
1010
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
1111
from hazelcast.discovery import HazelcastCloudAddressProvider
1212
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
@@ -313,7 +313,7 @@ def _create_address_provider(self):
313313
connection_timeout = self._get_connection_timeout(config)
314314
return HazelcastCloudAddressProvider(cloud_discovery_token, connection_timeout)
315315

316-
return DefaultAddressProvider(cluster_members)
316+
return DefaultAsyncioAddressProvider(cluster_members)
317317

318318
def _create_client_name(self, client_id):
319319
client_name = self._config.client_name

hazelcast/internal/asyncio_connection.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def __init__(
183183
self._cluster_id = None
184184
self._load_balancer = None
185185
self._use_public_ip = (
186-
isinstance(address_provider, DefaultAddressProvider) and config.use_public_ip
186+
isinstance(address_provider, DefaultAsyncioAddressProvider) and config.use_public_ip
187187
)
188188
# asyncio tasks are weakly referenced
189189
# storing tasks here in order not to lose them midway
@@ -385,9 +385,10 @@ async def _get_or_connect_to_address(self, address):
385385
for connection in list(self.active_connections.values()):
386386
if connection.remote_address == address:
387387
return connection
388-
translated = self._translate(address)
389-
connection = await self._create_connection(translated)
390-
response = await self._authenticate(connection)
388+
translated = await self._translate(address)
389+
connection = self._create_connection(translated)
390+
await connection._create_task
391+
response = self._authenticate(connection)
391392
await self._on_auth(response, connection)
392393
return connection
393394

@@ -396,23 +397,24 @@ async def _get_or_connect_to_member(self, member):
396397
if connection:
397398
return connection
398399

399-
translated = self._translate_member_address(member)
400-
connection = await self._create_connection(translated)
401-
response = await self._authenticate(connection)
400+
translated = await self._translate_member_address(member)
401+
connection = self._create_connection(translated)
402+
await connection._create_task
403+
response = self._authenticate(connection)
402404
await self._on_auth(response, connection)
403405
return connection
404406

405-
async def _create_connection(self, address):
406-
return await self._reactor.connection_factory(
407+
def _create_connection(self, address):
408+
return self._reactor.connection_factory(
407409
self,
408410
self._connection_id_generator.get_and_increment(),
409411
address,
410412
self._config,
411413
self._invocation_service.handle_client_message,
412414
)
413415

414-
def _translate(self, address):
415-
translated = self._address_provider.translate(address)
416+
async def _translate(self, address):
417+
translated = await self._address_provider.translate(address)
416418
if not translated:
417419
raise ValueError(
418420
"Address provider %s could not translate address %s"
@@ -421,15 +423,15 @@ def _translate(self, address):
421423

422424
return translated
423425

424-
def _translate_member_address(self, member):
426+
async def _translate_member_address(self, member):
425427
if self._use_public_ip:
426428
public_address = member.address_map.get(_CLIENT_PUBLIC_ENDPOINT_QUALIFIER, None)
427429
if public_address:
428430
return public_address
429431

430432
return member.address
431433

432-
return self._translate(member.address)
434+
return await self._translate(member.address)
433435

434436
async def _trigger_cluster_reconnection(self):
435437
if self._reconnect_mode == ReconnectMode.OFF:
@@ -529,7 +531,8 @@ async def _sync_connect_to_cluster(self):
529531
if connection:
530532
return
531533

532-
for address in self._get_possible_addresses():
534+
addresses = await self._get_possible_addresses()
535+
for address in addresses:
533536
self._check_client_active()
534537
if address in tried_addresses_per_attempt:
535538
# We already tried this address on from the member list
@@ -614,6 +617,7 @@ def _authenticate(self, connection) -> asyncio.Future:
614617

615618
async def _on_auth(self, response, connection):
616619
try:
620+
response = await response
617621
response = client_authentication_codec.decode_response(response)
618622
except Exception as e:
619623
await connection.close_connection("Failed to authenticate connection", e)
@@ -790,8 +794,8 @@ def _check_client_active(self):
790794
if not self._lifecycle_service.running:
791795
raise HazelcastClientNotActiveError()
792796

793-
def _get_possible_addresses(self):
794-
primaries, secondaries = self._address_provider.load_addresses()
797+
async def _get_possible_addresses(self):
798+
primaries, secondaries = await self._address_provider.load_addresses()
795799
if self._shuffle_member_list:
796800
# The relative order between primary and secondary addresses should
797801
# not be changed. So we shuffle the lists separately and then add
@@ -1028,17 +1032,13 @@ def __hash__(self):
10281032
return self._id
10291033

10301034

1031-
class DefaultAddressProvider:
1032-
"""Provides initial addresses for client to find and connect to a node.
1033-
1034-
It also provides a no-op translator.
1035-
"""
1036-
1035+
class DefaultAsyncioAddressProvider:
10371036
def __init__(self, addresses):
10381037
self._addresses = addresses
10391038

1040-
def load_addresses(self):
1039+
async def load_addresses(self):
10411040
"""Returns the possible primary and secondary member addresses to connect to."""
1041+
# NOTE: This method is marked with async since the caller assumes that.
10421042
configured_addresses = self._addresses
10431043

10441044
if not configured_addresses:
@@ -1053,9 +1053,10 @@ def load_addresses(self):
10531053

10541054
return primaries, secondaries
10551055

1056-
def translate(self, address):
1056+
async def translate(self, address):
10571057
"""No-op address translator.
10581058
10591059
It is there to provide the same API with other address providers.
10601060
"""
1061+
# NOTE: This method is marked with async since the caller assumes that.
10611062
return address
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
import logging
3+
4+
from hazelcast.discovery import HazelcastCloudDiscovery
5+
6+
_logger = logging.getLogger(__name__)
7+
8+
9+
class HazelcastCloudAddressProvider:
10+
"""Provides initial addresses for client to find and connect to a node
11+
and resolves private IP addresses of Hazelcast Cloud service.
12+
"""
13+
14+
def __init__(self, token, connection_timeout):
15+
self.cloud_discovery = HazelcastCloudDiscovery(token, connection_timeout)
16+
self._private_to_public = dict()
17+
18+
async def load_addresses(self):
19+
"""Loads member addresses from Hazelcast Cloud endpoint.
20+
21+
Returns:
22+
tuple[list[hazelcast.core.Address], list[hazelcast.core.Address]]: The possible member addresses
23+
as primary addresses to connect to.
24+
"""
25+
try:
26+
nodes = await asyncio.to_thread(self.cloud_discovery.discover_nodes)
27+
# Every private address is primary
28+
return list(nodes.keys()), []
29+
except Exception as e:
30+
_logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e)
31+
return [], []
32+
33+
async def translate(self, address):
34+
"""Translates the given address to another address specific to network or service.
35+
36+
Args:
37+
address (hazelcast.core.Address): Private address to be translated
38+
39+
Returns:
40+
hazelcast.core.Address: New address if given address is known, otherwise returns None
41+
"""
42+
if address is None:
43+
return None
44+
45+
public_address = self._private_to_public.get(address, None)
46+
if public_address:
47+
return public_address
48+
49+
await self.refresh()
50+
51+
return self._private_to_public.get(address, None)
52+
53+
async def refresh(self):
54+
"""Refreshes the internal lookup table if necessary."""
55+
try:
56+
self._private_to_public = self.cloud_discovery.discover_nodes()
57+
except Exception as e:
58+
_logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e)

0 commit comments

Comments
 (0)