Merged

Changes from all commits (66 commits)
cbe98dc  Initial asyncio commit (yuce, Sep 19, 2025)
162fd17  Updates (yuce, Sep 19, 2025)
9931956  Updates (yuce, Sep 19, 2025)
856e3df  Merge branch 'master' into asyncio-module (yuce, Sep 19, 2025)
35384bf  Black (yuce, Sep 19, 2025)
fdda120  Updates (yuce, Sep 19, 2025)
fee5b45  Updates (yuce, Sep 19, 2025)
fc2c38b  Updates (yuce, Sep 19, 2025)
1772031  Removed docs, include HazelcastClient/Map in public API (yuce, Sep 19, 2025)
170cf89  Updates (yuce, Sep 19, 2025)
539c904  Merge branch 'master' into asyncio-module (yuce, Sep 22, 2025)
22449a8  black (yuce, Sep 22, 2025)
5406bc6  Ignore incorrect mypy errors (yuce, Sep 22, 2025)
a417a4a  Updates (yuce, Sep 24, 2025)
d00c480  Updates (yuce, Sep 25, 2025)
baa3bc1  Annotate optional params (yuce, Sep 29, 2025)
ebfc9e2  black (yuce, Sep 29, 2025)
6928837  Remove update to test util (yuce, Sep 30, 2025)
3e03cbf  black (yuce, Sep 30, 2025)
51ced7a  black (yuce, Sep 30, 2025)
e635b94  update (yuce, Sep 30, 2025)
4f103f6  Added support for SSL (yuce, Sep 30, 2025)
042cc58  Added SSL tests (yuce, Sep 30, 2025)
265a2b4  Added mutual authentication test (yuce, Sep 30, 2025)
293975d  Added hostname verification tests (yuce, Oct 1, 2025)
2718478  black (yuce, Oct 1, 2025)
58783dc  Ported more integration tests (yuce, Oct 1, 2025)
3cf9982  Ported hazelcast json value test (yuce, Oct 2, 2025)
7e97ec7  Merge branch 'master' into asyncio-module-integration-tests1 (yuce, Oct 2, 2025)
6a558e8  Merge branch 'master' into asyncio-module-ssl (yuce, Oct 2, 2025)
a630706  Merge branch 'master' into asyncio-module (yuce, Oct 2, 2025)
c1798ea  Ported heart beat test (yuce, Oct 2, 2025)
e92936a  Ported more tests (yuce, Oct 20, 2025)
6ced889  Merge branch 'master' into asyncio-module (yuce, Oct 20, 2025)
c313bfa  Merge branch 'master' into asyncio-module-ssl (yuce, Oct 20, 2025)
6222c6b  Merge branch 'master' into asyncio-module-integration-tests1 (yuce, Oct 20, 2025)
120a58a  black (yuce, Oct 22, 2025)
80880b8  Fixed type hints (yuce, Oct 30, 2025)
6431acc  type hints (yuce, Nov 14, 2025)
e9a9b5e  Ported more tests (yuce, Nov 17, 2025)
5334cd1  Added near cache, statistics, statistics tests (yuce, Nov 17, 2025)
a14290a  Black (yuce, Nov 17, 2025)
492ccc1  Fixed getting local address (yuce, Nov 18, 2025)
e8a2600  Fixed getting local address, take 2 (yuce, Nov 18, 2025)
24eb6bf  Added nearcache tests (yuce, Nov 18, 2025)
6ab9365  Ported missing nearcache test (yuce, Nov 18, 2025)
91bf1d1  Addressed review comment (yuce, Nov 21, 2025)
2128f5e  Removed unnecessary code (yuce, Nov 21, 2025)
62697e3  Add BETA warning (yuce, Nov 21, 2025)
a87a5c6  Black (yuce, Nov 21, 2025)
8d7eede  Merge branch 'asyncio-module' into asyncio-module-ssl (yuce, Nov 24, 2025)
00a2d12  Merge branch 'asyncio-module-ssl' into asyncio-module-integration-tests1 (yuce, Nov 24, 2025)
539466b  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-… (yuce, Nov 24, 2025)
e673679  Updated test_heartbeat_stopped_and_restored (yuce, Nov 25, 2025)
ab4a746  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-… (yuce, Nov 25, 2025)
319bb35  Fixed tests (yuce, Nov 25, 2025)
8e325ea  Linter (yuce, Nov 25, 2025)
eed53b3  Test updates (yuce, Nov 25, 2025)
1ca7fd6  Addressed review comment (yuce, Nov 26, 2025)
d9acede  updates (yuce, Nov 28, 2025)
bd23f41  Merge branch 'asyncio-module-ssl' into asyncio-module-integration-tests1 (yuce, Nov 28, 2025)
76759ec  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-… (yuce, Nov 28, 2025)
2a2d6e8  Merge branch 'master' into asyncio-module-integration-tests1 (yuce, Dec 1, 2025)
6da4226  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-… (yuce, Dec 1, 2025)
550a006  Merge branch 'master' into asyncio-module-integration-tests2 (yuce, Dec 4, 2025)
2c8f10e  Prevent deadlock (yuce, Dec 4, 2025)
4 changes: 2 additions & 2 deletions hazelcast/asyncio/client.py
@@ -30,7 +30,7 @@
from hazelcast.internal.asyncio_reactor import AsyncioReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.sql import SqlService, _InternalSqlService
from hazelcast.statistics import Statistics
from hazelcast.internal.asyncio_statistics import Statistics
from hazelcast.types import KeyType, ValueType, ItemType, MessageType
from hazelcast.util import AtomicInteger, RoundRobinLB

@@ -176,7 +176,7 @@ async def _start(self):
self._listener_service.start()
await self._invocation_service.add_backup_listener()
self._load_balancer.init(self._cluster_service)
self._statistics.start()
await self._statistics.start()
except Exception:
await self.shutdown()
raise
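The change in this file swaps the thread-based `Statistics` service for `hazelcast.internal.asyncio_statistics.Statistics`, whose `start()` is a coroutine, so `_start` now has to await it. A minimal sketch of that pattern follows; `PeriodicStatistics` and its methods are illustrative stand-ins, not the client's actual API:

```python
import asyncio
import typing


class PeriodicStatistics:
    """Illustrative stand-in for an asyncio statistics service."""

    def __init__(self, period: float = 5.0) -> None:
        self._period = period
        self._task: typing.Optional["asyncio.Task[None]"] = None

    async def start(self) -> None:
        # A coroutine so it can only run inside a live event loop,
        # which is what lets it schedule the periodic collection task.
        self._task = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(self._period)
            self._collect()

    def _collect(self) -> None:
        print("collecting stats")  # placeholder for real metric collection

    async def shutdown(self) -> None:
        if self._task is not None:
            self._task.cancel()
```

Making `start()` a coroutine forces callers into an async context, which is presumably why the hunk above adds the `await`.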
77 changes: 40 additions & 37 deletions hazelcast/internal/asyncio_connection.py
@@ -650,47 +650,50 @@ async def _handle_successful_auth(self, response, connection):
connection.remote_uuid = remote_uuid

existing = self.active_connections.get(remote_uuid, None)
if existing:
await connection.close_connection(
"Duplicate connection to same member with UUID: %s" % remote_uuid, None
)
return existing

new_cluster_id = response["cluster_id"]
changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
if changed_cluster:
await self._check_client_state_on_cluster_change(connection)
_logger.warning(
"Switching from current cluster: %s to new cluster: %s",
self._cluster_id,
new_cluster_id,
)
self._on_cluster_restart()

if existing:
await connection.close_connection(
"Duplicate connection to same member with UUID: %s" % remote_uuid, None
)
return existing

new_cluster_id = response["cluster_id"]
changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
if changed_cluster:
await self._check_client_state_on_cluster_change(connection)
_logger.warning(
"Switching from current cluster: %s to new cluster: %s",
self._cluster_id,
new_cluster_id,
)
self._on_cluster_restart()

async with self._lock:
is_initial_connection = not self.active_connections
self.active_connections[remote_uuid] = connection
fire_connected_lifecycle_event = False
if is_initial_connection:
self._cluster_id = new_cluster_id
# In split brain, the client might connect to the one half
# of the cluster, and then later might reconnect to the
# other half, after the half it was connected to is
# completely dead. Since the cluster id is preserved in
# split brain scenarios, it is impossible to distinguish
# reconnection to the same cluster vs reconnection to the
# other half of the split brain. However, in the latter,
# we might need to send some state to the other half of
# the split brain (like Compact schemas). That forces us
# to send the client state to the cluster after the first
# cluster connection, regardless the cluster id is
# changed or not.
if self._established_initial_cluster_connection:
self._client_state = ClientState.CONNECTED_TO_CLUSTER
await self._initialize_on_cluster(new_cluster_id)
else:
fire_connected_lifecycle_event = True
self._established_initial_cluster_connection = True
self._client_state = ClientState.INITIALIZED_ON_CLUSTER

if is_initial_connection:
self._cluster_id = new_cluster_id
# In split brain, the client might connect to the one half
# of the cluster, and then later might reconnect to the
# other half, after the half it was connected to is
# completely dead. Since the cluster id is preserved in
# split brain scenarios, it is impossible to distinguish
# reconnection to the same cluster vs reconnection to the
# other half of the split brain. However, in the latter,
# we might need to send some state to the other half of
# the split brain (like Compact schemas). That forces us
# to send the client state to the cluster after the first
# cluster connection, regardless the cluster id is
# changed or not.
if self._established_initial_cluster_connection:
self._client_state = ClientState.CONNECTED_TO_CLUSTER
await self._initialize_on_cluster(new_cluster_id)
else:
fire_connected_lifecycle_event = True
self._established_initial_cluster_connection = True
self._client_state = ClientState.INITIALIZED_ON_CLUSTER

if fire_connected_lifecycle_event:
self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
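The re-indented hunk above moves the post-authentication bookkeeping under `async with self._lock:`. Even in single-threaded asyncio this matters: the critical section awaits (`_initialize_on_cluster`), so without the lock a second concurrently authenticating connection could interleave at that suspension point and re-run one-time initialization. A minimal sketch of the hazard and the fix, with illustrative names:

```python
import asyncio


class ConnectionRegistry:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.active: dict = {}

    async def register(self, uuid: str, conn: object) -> None:
        async with self._lock:
            is_initial_connection = not self.active
            self.active[uuid] = conn
            if is_initial_connection:
                # This await is a suspension point: without the lock, a
                # second concurrent register() could also observe an empty
                # registry and run the one-time initialization twice.
                await self._initialize_on_cluster()

    async def _initialize_on_cluster(self) -> None:
        await asyncio.sleep(0)  # stands in for sending client state


async def main() -> None:
    registry = ConnectionRegistry()
    await asyncio.gather(
        registry.register("a", object()),
        registry.register("b", object()),
    )
    print(sorted(registry.active))  # ['a', 'b']


if __name__ == "__main__":
    asyncio.run(main())
```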
36 changes: 14 additions & 22 deletions hazelcast/internal/asyncio_listener.py
@@ -175,10 +175,8 @@ async def _register_on_connection(
self, user_registration_id, listener_registration, connection
):
registration_map = listener_registration.connection_registrations

if connection in registration_map:
return

registration_request = listener_registration.registration_request.copy()
invocation = Invocation(
registration_request,
@@ -187,26 +185,20 @@
response_handler=lambda m: m,
urgent=True,
)
self._invocation_service.invoke(invocation)

def callback(f):
try:
response = f.result()
server_registration_id = listener_registration.decode_register_response(response)
correlation_id = registration_request.get_correlation_id()
registration = _EventRegistration(server_registration_id, correlation_id)
registration_map[connection] = registration
except Exception as e:
if connection.live:
_logger.exception(
"Listener %s can not be added to a new connection: %s",
user_registration_id,
connection,
)
raise e

invocation.future.add_done_callback(callback)
return await invocation.future
response = await self._invocation_service.ainvoke(invocation)
try:
server_registration_id = listener_registration.decode_register_response(response)
correlation_id = registration_request.get_correlation_id()
registration = _EventRegistration(server_registration_id, correlation_id)
registration_map[connection] = registration
except Exception as e:
if connection.live:
_logger.exception(
"Listener %s can not be added to a new connection: %s",
user_registration_id,
connection,
)
raise e

async def _connection_added(self, connection):
async with self._registration_lock:
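This hunk replaces `invocation.future.add_done_callback(callback)` with a direct `await` on the invocation service's `ainvoke`, so the decode-and-record logic runs inline and failures propagate as ordinary exceptions. A sketch of the two styles side by side, with a stand-in `ainvoke()` in place of the real invocation service:

```python
import asyncio


async def ainvoke(request: str) -> str:
    await asyncio.sleep(0)  # stands in for the network round trip
    return request.upper()


def register_with_callback(request: str) -> "asyncio.Task[str]":
    # Before: callback style -- results and errors are handled in a
    # nested function attached to the future.
    task = asyncio.create_task(ainvoke(request))

    def callback(f: "asyncio.Task[str]") -> None:
        try:
            print("registered:", f.result())
        except Exception:
            print("registration failed")
            raise

    task.add_done_callback(callback)
    return task


async def register_with_await(request: str) -> str:
    # After: await style -- the response is just a return value and
    # exceptions surface at the call site.
    try:
        response = await ainvoke(request)
    except Exception:
        print("registration failed")
        raise
    print("registered:", response)
    return response


if __name__ == "__main__":
    asyncio.run(register_with_await("add-listener"))
```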
9 changes: 6 additions & 3 deletions hazelcast/internal/asyncio_proxy/base.py
@@ -41,10 +41,13 @@ async def destroy(self) -> bool:
``True`` if this proxy is destroyed successfully, ``False``
otherwise.
"""
self._on_destroy()
return await self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
tg.create_task(self._on_destroy())
return await tg.create_task(
self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
)

Review thread on this hunk:

gbarnett-hz (Dec 4, 2025): Why not use the simpler form (do we really need concurrency here?):

    await self._on_destroy()
    return await self._context...

Reads a little odd with the return there -- is that even safe?

yuce (Contributor, Author): It's safe; the task group will not exit until all tasks are completed.

gbarnett-hz: It looks odd to me to have the return there. The following seems clearer IMO, as it doesn't mix the task group completion semantics with a nested return:

    async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
        tg.create_task(self._on_destroy())
        t = tg.create_task(...)
    return t.result()

def _on_destroy(self):
async def _on_destroy(self):
pass

def __repr__(self) -> str:
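The thread above turns on `asyncio.TaskGroup` semantics: a `return` inside the `async with` still waits for every sibling task, because the group's `__aexit__` runs before the function actually returns. A small demonstration of exactly the shape used in `destroy` (requires Python 3.11+; names here are illustrative):

```python
import asyncio

events: list[str] = []


async def side_task() -> None:
    await asyncio.sleep(0.05)
    events.append("side task finished")


async def destroy_like() -> str:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(side_task())
        # The return value is computed here, but the function does not
        # actually return until __aexit__ has awaited side_task too.
        return await tg.create_task(asyncio.sleep(0.01, result="destroyed"))


async def main() -> None:
    result = await destroy_like()
    print(result, events)  # destroyed ['side task finished']


if __name__ == "__main__":
    asyncio.run(main())
```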
7 changes: 5 additions & 2 deletions hazelcast/internal/asyncio_proxy/manager.py
@@ -8,7 +8,10 @@

MAP_SERVICE = "hz:impl:mapService"

_proxy_init: typing.Dict[str, typing.Callable[[str, str, typing.Any], Proxy]] = {
_proxy_init: typing.Dict[
str,
typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
] = {
MAP_SERVICE: create_map_proxy,
}

@@ -34,7 +37,7 @@ async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy:
invocation_service = self._context.invocation_service
await invocation_service.ainvoke(invocation)

return _proxy_init[service_name](service_name, name, self._context)
return await _proxy_init[service_name](service_name, name, self._context)

async def destroy_proxy(self, service_name, name, destroy_on_remote=True):
ns = (service_name, name)
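The `_proxy_init` registry now maps service names to coroutine factories, and `_create_proxy` awaits the selected factory, since proxy construction may itself need server round trips (registering the near-cache invalidation listener in the map proxy below). A self-contained sketch of the async-factory-registry pattern; the service name and `DummyProxy` type are made up for illustration:

```python
import asyncio
import typing


class DummyProxy:
    def __init__(self, service_name: str, name: str) -> None:
        self.service_name = service_name
        self.name = name


async def create_dummy_proxy(service_name: str, name: str, context: typing.Any) -> DummyProxy:
    await asyncio.sleep(0)  # factories may now do async setup work
    return DummyProxy(service_name, name)


ProxyFactory = typing.Callable[
    [str, str, typing.Any],
    typing.Coroutine[typing.Any, typing.Any, DummyProxy],
]

_proxy_init: typing.Dict[str, ProxyFactory] = {
    "hz:impl:dummyService": create_dummy_proxy,
}


async def create_proxy(service_name: str, name: str, context: typing.Any) -> DummyProxy:
    # The factory is awaited because building the proxy can itself
    # require talking to the cluster.
    return await _proxy_init[service_name](service_name, name, context)


if __name__ == "__main__":
    proxy = asyncio.run(create_proxy("hz:impl:dummyService", "my-proxy", None))
    print(proxy.name)
```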
174 changes: 172 additions & 2 deletions hazelcast/internal/asyncio_proxy/map.py
@@ -65,6 +65,7 @@
map_set_with_max_idle_codec,
map_remove_interceptor_codec,
map_remove_all_codec,
map_add_near_cache_invalidation_listener_codec,
)
from hazelcast.internal.asyncio_proxy.base import (
Proxy,
@@ -971,8 +972,177 @@ def handler(message):
return self._invoke_on_key(request, key_data, handler)


def create_map_proxy(service_name, name, context):
class MapFeatNearCache(Map[KeyType, ValueType]):
"""Map proxy implementation featuring Near Cache"""

def __init__(self, service_name, name, context):
super(MapFeatNearCache, self).__init__(service_name, name, context)
self._invalidation_listener_id = None
self._near_cache = context.near_cache_manager.get_or_create_near_cache(name)

async def clear(self):
self._near_cache._clear()
return await super(MapFeatNearCache, self).clear()

async def evict_all(self):
self._near_cache.clear()
return await super(MapFeatNearCache, self).evict_all()

async def load_all(self, keys=None, replace_existing_values=True):
if keys is None and replace_existing_values:
self._near_cache.clear()
return await super(MapFeatNearCache, self).load_all(keys, replace_existing_values)

async def _on_destroy(self):
await self._remove_near_cache_invalidation_listener()
self._near_cache.clear()
await super(MapFeatNearCache, self)._on_destroy()

async def _add_near_cache_invalidation_listener(self):
codec = map_add_near_cache_invalidation_listener_codec
request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart)
self._invalidation_listener_id = await self._register_listener(
request,
lambda r: codec.decode_response(r),
lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation),
)

async def _remove_near_cache_invalidation_listener(self):
if self._invalidation_listener_id:
await self.remove_entry_listener(self._invalidation_listener_id)

def _handle_invalidation(self, key, source_uuid, partition_uuid, sequence):
# key is always ``Data``
# null key means near cache has to remove all entries in it.
# see MapAddNearCacheEntryListenerMessageTask.
if key is None:
self._near_cache._clear()
else:
self._invalidate_cache(key)

def _handle_batch_invalidation(self, keys, source_uuids, partition_uuids, sequences):
# key_list is always list of ``Data``
for key_data in keys:
self._invalidate_cache(key_data)

def _invalidate_cache(self, key_data):
self._near_cache._invalidate(key_data)

def _invalidate_cache_batch(self, key_data_list):
for key_data in key_data_list:
self._near_cache._invalidate(key_data)

# internals
async def _contains_key_internal(self, key_data):
try:
return self._near_cache[key_data]
except KeyError:
return await super(MapFeatNearCache, self)._contains_key_internal(key_data)

async def _get_internal(self, key_data):
try:
return self._near_cache[key_data]
except KeyError:
value = await super(MapFeatNearCache, self)._get_internal(key_data)
self._near_cache.__setitem__(key_data, value)
return value

async def _get_all_internal(self, partition_to_keys, tasks=None):
tasks = tasks or []
for key_dic in partition_to_keys.values():
for key in list(key_dic.keys()):
try:
key_data = key_dic[key]
value = self._near_cache[key_data]
future = asyncio.Future()
future.set_result((key, value))
tasks.append(future)
del key_dic[key]
except KeyError:
pass
return await super(MapFeatNearCache, self)._get_all_internal(partition_to_keys, tasks)

def _try_remove_internal(self, key_data, timeout):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._try_remove_internal(key_data, timeout)

def _try_put_internal(self, key_data, value_data, timeout):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._try_put_internal(key_data, value_data, timeout)

def _set_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl, max_idle)

def _set_ttl_internal(self, key_data, ttl):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._set_ttl_internal(key_data, ttl)

def _replace_internal(self, key_data, value_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._replace_internal(key_data, value_data)

def _replace_if_same_internal(self, key_data, old_value_data, new_value_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._replace_if_same_internal(
key_data, old_value_data, new_value_data
)

def _remove_internal(self, key_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._remove_internal(key_data)

def _remove_all_internal(self, predicate_data):
self._near_cache.clear()
return super(MapFeatNearCache, self)._remove_all_internal(predicate_data)

def _remove_if_same_internal_(self, key_data, value_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._remove_if_same_internal_(key_data, value_data)

def _put_transient_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._put_transient_internal(
key_data, value_data, ttl, max_idle
)

def _put_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._put_internal(key_data, value_data, ttl, max_idle)

def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._put_if_absent_internal(
key_data, value_data, ttl, max_idle
)

def _load_all_internal(self, key_data_list, replace_existing_values):
self._invalidate_cache_batch(key_data_list)
return super(MapFeatNearCache, self)._load_all_internal(
key_data_list, replace_existing_values
)

def _execute_on_key_internal(self, key_data, entry_processor_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._execute_on_key_internal(
key_data, entry_processor_data
)

def _evict_internal(self, key_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._evict_internal(key_data)

def _delete_internal(self, key_data):
self._invalidate_cache(key_data)
return super(MapFeatNearCache, self)._delete_internal(key_data)


async def create_map_proxy(service_name, name, context):
near_cache_config = context.config.near_caches.get(name, None)
if near_cache_config is None:
return Map(service_name, name, context)
raise InvalidConfigurationError("near cache is not supported")
nc = MapFeatNearCache(service_name, name, context)
if nc._near_cache.invalidate_on_change:
await nc._add_near_cache_invalidation_listener()
return nc
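`MapFeatNearCache` follows the classic near-cache discipline: reads go through a local cache and fall back to the server on a miss, while every mutation invalidates the affected key first (or clears the cache for bulk operations) so a stale local entry never outlives the remote update. A stripped-down sketch of that read-through/invalidate-on-write behavior, with a `RemoteMap` stand-in for the server round trips:

```python
import asyncio


class RemoteMap:
    """Stand-in for the server side of the map."""

    def __init__(self) -> None:
        self._data: dict = {}

    async def get(self, key):
        await asyncio.sleep(0)  # stands in for the network hop
        return self._data.get(key)

    async def put(self, key, value) -> None:
        await asyncio.sleep(0)
        self._data[key] = value


class NearCachedMap:
    def __init__(self, remote: RemoteMap) -> None:
        self._remote = remote
        self._cache: dict = {}

    async def get(self, key):
        try:
            return self._cache[key]  # local hit, no round trip
        except KeyError:
            value = await self._remote.get(key)
            self._cache[key] = value
            return value

    async def put(self, key, value) -> None:
        # Invalidate before the write, mirroring _put_internal above:
        # a stale local entry must never outlive the remote update.
        self._cache.pop(key, None)
        await self._remote.put(key, value)


async def demo() -> None:
    m = NearCachedMap(RemoteMap())
    await m.put("k", 1)
    print(await m.get("k"))  # 1, fetched remotely then cached
    print(await m.get("k"))  # 1, served from the near cache


if __name__ == "__main__":
    asyncio.run(demo())
```

The invalidation listener registered in `_add_near_cache_invalidation_listener` covers the remaining case: writes made by other clients, which arrive as invalidation events rather than local method calls.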
9 changes: 8 additions & 1 deletion hazelcast/internal/asyncio_reactor.py
@@ -97,7 +97,14 @@ async def _create_connection(self, config, address):
ssl=ssl_context,
server_hostname=server_hostname,
)
_sock, self._proto = res
sock, self._proto = res
if hasattr(sock, "_ssl_protocol"):
sock = sock._ssl_protocol._transport._sock
else:
sock = sock._sock
sockname = sock.getsockname()
host, port = sockname[0], sockname[1]
self.local_address = Address(host, port)

def _write(self, buf):
self._proto.write(buf)
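This hunk recovers the local address by reaching into private attributes (`_ssl_protocol._transport._sock`), which is fragile across CPython versions. asyncio's documented route to the same information is `transport.get_extra_info("sockname")`, which works for plain and TLS transports alike; whether it fits this reactor's constraints is untested here, so treat the following as a sketch rather than a drop-in fix:

```python
import asyncio


async def local_address_of(host: str, port: int) -> tuple:
    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_connection(asyncio.Protocol, host, port)
    # Documented API: returns the local (host, port, ...) tuple without
    # touching _sock or _ssl_protocol.
    sockname = transport.get_extra_info("sockname")
    transport.close()
    return sockname[0], sockname[1]
```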