Skip to content

Commit f49d8ab

Browse files
authored
[API-2326] Initial Asyncio Module PR [1/6] (#741)
This is the initial asyncio support. * Added the `asyncio` module which contains public asyncio API * Added the `internal` module and `internal/asyncio_` modules, which contains the private asyncio API/implementation. * Added asyncio support for the Map proxy. Currently near cache, transactions, and lock releated-methods are not supported. The near cache support will come in another PR. Transactions will likely not be supported. Locks may need a different design. * I didn't include the API docs, in order to make the PR smaller. I'll add them in another PR. * The following tests are ported from the old API, and they work without changes (besides making them compatible with async/await syntax): - `tests/integration/asyncio/authentication_tests` - `tests/integration/asyncio/backup_acks_tests` - `tests/integration/asyncio/client_test` (*one test is not ported, due to its Topic DDS dependency*) - `tests/integration/asyncio/proxy/map_test` Most of the code in this PR was duplicated to the `internal` module by prefixing them with `asyncio_`. For example `ROOT/cluster.py` was duplicated/modifed as `internal/asyncio_cluster.py` Here is the diff between modules in this PR vs their counterparts in the old API: https://gist.github.com/yuce/56e79a29a1d4d1d996788381d489c0a4
1 parent 5c7a4d5 commit f49d8ab

File tree

22 files changed

+5697
-0
lines changed

22 files changed

+5697
-0
lines changed

hazelcast/asyncio/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import warnings
2+
3+
warnings.warn("Asyncio API for Hazelcast Python Client is in BETA. DO NOT use it in production.")
4+
del warnings
5+
6+
__all__ = ["HazelcastClient", "Map"]
7+
8+
from hazelcast.asyncio.client import HazelcastClient
9+
from hazelcast.internal.asyncio_proxy.map import Map

hazelcast/asyncio/client.py

Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
import asyncio
2+
import logging
3+
import sys
4+
import typing
5+
6+
from hazelcast.internal.asyncio_cluster import ClusterService, _InternalClusterService
7+
from hazelcast.internal.asyncio_compact import CompactSchemaService
8+
from hazelcast.config import Config
9+
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAddressProvider
10+
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
11+
from hazelcast.cp import CPSubsystem, ProxySessionManager
12+
from hazelcast.discovery import HazelcastCloudAddressProvider
13+
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
14+
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
15+
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
16+
from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService
17+
from hazelcast.near_cache import NearCacheManager
18+
from hazelcast.partition import PartitionService, _InternalPartitionService
19+
from hazelcast.protocol.codec import (
20+
client_add_distributed_object_listener_codec,
21+
client_get_distributed_objects_codec,
22+
client_remove_distributed_object_listener_codec,
23+
)
24+
from hazelcast.internal.asyncio_proxy.manager import (
25+
MAP_SERVICE,
26+
ProxyManager,
27+
)
28+
from hazelcast.internal.asyncio_proxy.base import Proxy
29+
from hazelcast.internal.asyncio_proxy.map import Map
30+
from hazelcast.internal.asyncio_reactor import AsyncioReactor
31+
from hazelcast.serialization import SerializationServiceV1
32+
from hazelcast.sql import SqlService, _InternalSqlService
33+
from hazelcast.statistics import Statistics
34+
from hazelcast.types import KeyType, ValueType, ItemType, MessageType
35+
from hazelcast.util import AtomicInteger, RoundRobinLB
36+
37+
__all__ = ("HazelcastClient",)
38+
39+
_logger = logging.getLogger(__name__)
40+
41+
42+
class HazelcastClient:
43+
44+
_CLIENT_ID = AtomicInteger()
45+
46+
@classmethod
47+
async def create_and_start(cls, config: Config | None = None, **kwargs) -> "HazelcastClient":
48+
client = HazelcastClient(config, **kwargs)
49+
await client._start()
50+
return client
51+
52+
def __init__(self, config: Config | None = None, **kwargs):
53+
if config:
54+
if kwargs:
55+
raise InvalidConfigurationError(
56+
"Ambiguous client configuration is found. Either provide "
57+
"the config object as the only parameter, or do not "
58+
"pass it and use keyword arguments to configure the "
59+
"client."
60+
)
61+
else:
62+
config = Config.from_dict(kwargs)
63+
64+
self._config = config
65+
self._context = _ClientContext()
66+
client_id = HazelcastClient._CLIENT_ID.get_and_increment()
67+
self._name = self._create_client_name(client_id)
68+
self._reactor = AsyncioReactor()
69+
self._serialization_service = SerializationServiceV1(config)
70+
self._near_cache_manager = NearCacheManager(config, self._serialization_service)
71+
self._internal_lifecycle_service = _InternalLifecycleService(config)
72+
self._lifecycle_service = LifecycleService(self._internal_lifecycle_service)
73+
self._internal_cluster_service = _InternalClusterService(self, config)
74+
self._cluster_service = ClusterService(self._internal_cluster_service)
75+
self._invocation_service = InvocationService(self, config, self._reactor)
76+
self._compact_schema_service = CompactSchemaService(
77+
self._serialization_service.compact_stream_serializer,
78+
self._invocation_service,
79+
self._cluster_service,
80+
self._reactor,
81+
self._config,
82+
)
83+
self._address_provider = self._create_address_provider()
84+
self._internal_partition_service = _InternalPartitionService(self)
85+
self._partition_service = PartitionService(
86+
self._internal_partition_service,
87+
self._serialization_service,
88+
self._compact_schema_service.send_schema_and_retry,
89+
)
90+
self._connection_manager = ConnectionManager(
91+
self,
92+
config,
93+
self._reactor,
94+
self._address_provider,
95+
self._internal_lifecycle_service,
96+
self._internal_partition_service,
97+
self._internal_cluster_service,
98+
self._invocation_service,
99+
self._near_cache_manager,
100+
self._send_state_to_cluster,
101+
)
102+
self._load_balancer = self._init_load_balancer(config)
103+
self._listener_service = ListenerService(
104+
self,
105+
config,
106+
self._connection_manager,
107+
self._invocation_service,
108+
self._compact_schema_service,
109+
)
110+
self._proxy_manager = ProxyManager(self._context)
111+
self._cp_subsystem = CPSubsystem(self._context)
112+
self._proxy_session_manager = ProxySessionManager(self._context)
113+
self._lock_reference_id_generator = AtomicInteger(1)
114+
self._statistics = Statistics(
115+
self,
116+
config,
117+
self._reactor,
118+
self._connection_manager,
119+
self._invocation_service,
120+
self._near_cache_manager,
121+
)
122+
self._cluster_view_listener = ClusterViewListenerService(
123+
self,
124+
self._connection_manager,
125+
self._internal_partition_service,
126+
self._internal_cluster_service,
127+
self._invocation_service,
128+
)
129+
self._shutdown_lock = asyncio.Lock()
130+
self._invocation_service.init(
131+
self._internal_partition_service,
132+
self._connection_manager,
133+
self._listener_service,
134+
self._compact_schema_service,
135+
)
136+
self._internal_sql_service = _InternalSqlService(
137+
self._connection_manager,
138+
self._serialization_service,
139+
self._invocation_service,
140+
self._compact_schema_service.send_schema_and_retry,
141+
)
142+
self._sql_service = SqlService(self._internal_sql_service)
143+
self._init_context()
144+
145+
def _init_context(self):
146+
self._context.init_context(
147+
self,
148+
self._config,
149+
self._invocation_service,
150+
self._internal_partition_service,
151+
self._internal_cluster_service,
152+
self._connection_manager,
153+
self._serialization_service,
154+
self._listener_service,
155+
self._proxy_manager,
156+
self._near_cache_manager,
157+
self._lock_reference_id_generator,
158+
self._name,
159+
self._proxy_session_manager,
160+
self._reactor,
161+
self._compact_schema_service,
162+
)
163+
164+
async def _start(self):
165+
self._reactor.start()
166+
try:
167+
self._internal_lifecycle_service.start()
168+
self._invocation_service.start()
169+
membership_listeners = self._config.membership_listeners
170+
self._internal_cluster_service.start(self._connection_manager, membership_listeners)
171+
self._cluster_view_listener.start()
172+
await self._connection_manager.start(self._load_balancer)
173+
sync_start = not self._config.async_start
174+
if sync_start:
175+
await self._internal_cluster_service.wait_initial_member_list_fetched()
176+
await self._connection_manager.connect_to_all_cluster_members(sync_start)
177+
self._listener_service.start()
178+
await self._invocation_service.add_backup_listener()
179+
self._load_balancer.init(self._cluster_service)
180+
self._statistics.start()
181+
except Exception:
182+
await self.shutdown()
183+
raise
184+
_logger.info("Client started")
185+
186+
async def get_map(self, name: str) -> Map[KeyType, ValueType]:
187+
return await self._proxy_manager.get_or_create(MAP_SERVICE, name)
188+
189+
async def add_distributed_object_listener(
190+
self, listener_func: typing.Callable[[DistributedObjectEvent], None]
191+
) -> str:
192+
is_smart = self._config.smart_routing
193+
codec = client_add_distributed_object_listener_codec
194+
request = codec.encode_request(is_smart)
195+
196+
def handle_distributed_object_event(name, service_name, event_type, source):
197+
event = DistributedObjectEvent(name, service_name, event_type, source)
198+
listener_func(event)
199+
200+
def event_handler(client_message):
201+
return codec.handle(client_message, handle_distributed_object_event)
202+
203+
return await self._listener_service.register_listener(
204+
request,
205+
codec.decode_response,
206+
client_remove_distributed_object_listener_codec.encode_request,
207+
event_handler,
208+
)
209+
210+
async def remove_distributed_object_listener(self, registration_id: str) -> bool:
211+
return await self._listener_service.deregister_listener(registration_id)
212+
213+
async def get_distributed_objects(self) -> typing.List[Proxy]:
214+
request = client_get_distributed_objects_codec.encode_request()
215+
invocation = Invocation(request, response_handler=lambda m: m)
216+
await self._invocation_service.ainvoke(invocation)
217+
218+
local_distributed_object_infos = {
219+
DistributedObjectInfo(dist_obj.service_name, dist_obj.name)
220+
for dist_obj in self._proxy_manager.get_distributed_objects()
221+
}
222+
223+
response = client_get_distributed_objects_codec.decode_response(invocation.future.result())
224+
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
225+
for dist_obj_info in response:
226+
local_distributed_object_infos.discard(dist_obj_info)
227+
tg.create_task(
228+
self._proxy_manager.get_or_create(
229+
dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False
230+
)
231+
)
232+
233+
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
234+
for dist_obj_info in local_distributed_object_infos:
235+
tg.create_task(
236+
self._proxy_manager.destroy_proxy(
237+
dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False
238+
)
239+
)
240+
241+
return self._proxy_manager.get_distributed_objects()
242+
243+
async def shutdown(self) -> None:
244+
async with self._shutdown_lock:
245+
if self._internal_lifecycle_service.running:
246+
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN)
247+
self._internal_lifecycle_service.shutdown()
248+
self._proxy_session_manager.shutdown().result()
249+
self._near_cache_manager.destroy_near_caches()
250+
await self._connection_manager.shutdown()
251+
self._invocation_service.shutdown()
252+
self._statistics.shutdown()
253+
self._reactor.shutdown()
254+
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTDOWN)
255+
256+
@property
257+
def name(self) -> str:
258+
return self._name
259+
260+
@property
261+
def lifecycle_service(self) -> LifecycleService:
262+
return self._lifecycle_service
263+
264+
@property
265+
def partition_service(self) -> PartitionService:
266+
return self._partition_service
267+
268+
@property
269+
def cluster_service(self) -> ClusterService:
270+
return self._cluster_service
271+
272+
@property
273+
def cp_subsystem(self) -> CPSubsystem:
274+
return self._cp_subsystem
275+
276+
def _create_address_provider(self):
277+
config = self._config
278+
cluster_members = config.cluster_members
279+
address_list_provided = len(cluster_members) > 0
280+
cloud_discovery_token = config.cloud_discovery_token
281+
cloud_enabled = cloud_discovery_token is not None
282+
if address_list_provided and cloud_enabled:
283+
raise IllegalStateError(
284+
"Only one discovery method can be enabled at a time. "
285+
"Cluster members given explicitly: %s, Hazelcast Cloud enabled: %s"
286+
% (address_list_provided, cloud_enabled)
287+
)
288+
289+
if cloud_enabled:
290+
connection_timeout = self._get_connection_timeout(config)
291+
return HazelcastCloudAddressProvider(cloud_discovery_token, connection_timeout)
292+
293+
return DefaultAddressProvider(cluster_members)
294+
295+
def _create_client_name(self, client_id):
296+
client_name = self._config.client_name
297+
if client_name:
298+
return client_name
299+
return "hz.client_%s" % client_id
300+
301+
async def _send_state_to_cluster(self):
302+
return await self._compact_schema_service.send_all_schemas()
303+
304+
@staticmethod
305+
def _get_connection_timeout(config):
306+
timeout = config.connection_timeout
307+
return sys.maxsize if timeout == 0 else timeout
308+
309+
@staticmethod
310+
def _init_load_balancer(config):
311+
load_balancer = config.load_balancer
312+
if not load_balancer:
313+
load_balancer = RoundRobinLB()
314+
return load_balancer
315+
316+
317+
class _ClientContext:
318+
def __init__(self):
319+
self.client = None
320+
self.config = None
321+
self.invocation_service = None
322+
self.partition_service = None
323+
self.cluster_service = None
324+
self.connection_manager = None
325+
self.serialization_service = None
326+
self.listener_service = None
327+
self.proxy_manager = None
328+
self.near_cache_manager = None
329+
self.lock_reference_id_generator = None
330+
self.name = None
331+
self.proxy_session_manager = None
332+
self.reactor = None
333+
self.compact_schema_service = None
334+
335+
def init_context(
336+
self,
337+
client,
338+
config,
339+
invocation_service,
340+
partition_service,
341+
cluster_service,
342+
connection_manager,
343+
serialization_service,
344+
listener_service,
345+
proxy_manager,
346+
near_cache_manager,
347+
lock_reference_id_generator,
348+
name,
349+
proxy_session_manager,
350+
reactor,
351+
compact_schema_service,
352+
):
353+
self.client = client
354+
self.config = config
355+
self.invocation_service = invocation_service
356+
self.partition_service = partition_service
357+
self.cluster_service = cluster_service
358+
self.connection_manager = connection_manager
359+
self.serialization_service = serialization_service
360+
self.listener_service = listener_service
361+
self.proxy_manager = proxy_manager
362+
self.near_cache_manager = near_cache_manager
363+
self.lock_reference_id_generator = lock_reference_id_generator
364+
self.name = name
365+
self.proxy_session_manager = proxy_session_manager
366+
self.reactor = reactor
367+
self.compact_schema_service = compact_schema_service

hazelcast/internal/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)