diff --git a/Makefile b/Makefile index d440d03503..e1032e5525 100644 --- a/Makefile +++ b/Makefile @@ -5,10 +5,10 @@ check: black --check --config black.toml . test: - pytest -m "not enterprise" + pytest --verbose -m "not enterprise" test-enterprise: - pytest + pytest --verbose test-cover: pytest --cov=hazelcast --cov-report=xml diff --git a/hazelcast/asyncio/client.py b/hazelcast/asyncio/client.py index 0f6db252fe..08e3a7aeb4 100644 --- a/hazelcast/asyncio/client.py +++ b/hazelcast/asyncio/client.py @@ -162,7 +162,6 @@ def _init_context(self): ) async def _start(self): - self._reactor.start() try: self._internal_lifecycle_service.start() self._invocation_service.start() @@ -250,7 +249,6 @@ async def shutdown(self) -> None: await self._connection_manager.shutdown() self._invocation_service.shutdown() self._statistics.shutdown() - self._reactor.shutdown() self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTDOWN) @property diff --git a/hazelcast/internal/asyncio_connection.py b/hazelcast/internal/asyncio_connection.py index f747a53110..562f19afeb 100644 --- a/hazelcast/internal/asyncio_connection.py +++ b/hazelcast/internal/asyncio_connection.py @@ -185,6 +185,10 @@ def __init__( self._use_public_ip = ( isinstance(address_provider, DefaultAddressProvider) and config.use_public_ip ) + # asyncio tasks are weakly referenced + # storing tasks here in order not to lose them midway + # see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks + self._tasks = set() def add_listener(self, on_connection_opened=None, on_connection_closed=None): """Registers a ConnectionListener. @@ -809,6 +813,10 @@ def __init__(self, connection_manager, client, config, reactor, invocation_servi self._heartbeat_timeout = config.heartbeat_timeout self._heartbeat_interval = config.heartbeat_interval self._heartbeat_task: asyncio.Task | None = None + # asyncio tasks are weakly referenced + # storing tasks here in order not to lose them midway + # see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks + self._tasks = set() def start(self): """Starts sending periodic HeartBeat operations.""" @@ -848,7 +856,9 @@ async def _check_connection(self, now, connection): if (now - connection.last_write_time) > self._heartbeat_interval: request = client_ping_codec.encode_request() invocation = Invocation(request, connection=connection, urgent=True) - asyncio.create_task(self._invocation_service.ainvoke(invocation)) + task = asyncio.create_task(self._invocation_service.ainvoke(invocation)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) _frame_header = struct.Struct(" + hot-restart-test + + %s + + + %s + + """ % ( + port, + self.tmp_dir, + ) + + +_SERVER_PORT = 5701 +_CLIENT_PORT = 5702 +_SERVER_WITH_CLIENT_ENDPOINT = """ + + + + %s + + + %s + + + +""" % ( + _SERVER_PORT, + _CLIENT_PORT, +) + + +@unittest.skipIf( + compare_client_version("5.0") < 0, "Tests the features added in 5.0 version of the client" +) +class AdvancedNetworkConfigTest(SingleMemberTestCase): + @classmethod + def configure_cluster(cls): + return _SERVER_WITH_CLIENT_ENDPOINT + + @classmethod + def configure_client(cls, config): + config["cluster_members"] = ["localhost:%s" % _CLIENT_PORT] + config["cluster_name"] = cls.cluster.id + return config + + def test_member_list(self): + members = self.client.cluster_service.get_members() + self.assertEqual(1, len(members)) + member = members[0] + # Make sure member address is assigned to client endpoint port +
self.assertEqual(_CLIENT_PORT, member.address.port) + + # Make sure there are mappings for CLIENT and MEMBER endpoints + self.assertEqual(2, len(member.address_map)) + self.assertEqual( + _SERVER_PORT, member.address_map.get(EndpointQualifier(ProtocolType.MEMBER, None)).port + ) + self.assertEqual( + _CLIENT_PORT, + member.address_map.get(EndpointQualifier(ProtocolType.CLIENT, None)).port, + ) diff --git a/tests/integration/asyncio/connection_strategy_test.py b/tests/integration/asyncio/connection_strategy_test.py new file mode 100644 index 0000000000..ccd51d267d --- /dev/null +++ b/tests/integration/asyncio/connection_strategy_test.py @@ -0,0 +1,101 @@ +import unittest + +from hazelcast.asyncio import HazelcastClient +from hazelcast.config import ReconnectMode +from hazelcast.errors import ClientOfflineError, HazelcastClientNotActiveError +from hazelcast.lifecycle import LifecycleState +from tests.integration.asyncio.base import HazelcastTestCase +from tests.util import random_string + + +class ConnectionStrategyTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + self.client = None + self.cluster = None + + async def asyncTearDown(self): + if self.client: + await self.client.shutdown() + self.client = None + if self.cluster: + self.rc.terminateCluster(self.cluster.id) + self.cluster = None + + async def test_off_reconnect_mode(self): + self.cluster = self.rc.createCluster(None, None) + member = self.rc.startMember(self.cluster.id) + + def collector(): + events = [] + + def on_state_change(event): + if event == LifecycleState.SHUTDOWN: + events.append(event) + + on_state_change.events = events + return on_state_change + + event_collector = collector() + + self.client = await HazelcastClient.create_and_start( + cluster_members=["localhost:5701"], + cluster_name=self.cluster.id, + reconnect_mode=ReconnectMode.OFF, + lifecycle_listeners=[event_collector], + ) + m = await self.client.get_map(random_string()) + # no exception at this point + await m.put(1, 1) + self.rc.shutdownMember(self.cluster.id, member.uuid) + await self.assertTrueEventually(lambda: self.assertEqual(1, len(event_collector.events))) + with self.assertRaises(HazelcastClientNotActiveError): + await m.put(1, 1) + + async def test_async_reconnect_mode(self): + import logging + + logging.basicConfig(level=logging.DEBUG) + self.cluster = self.rc.createCluster(None, None) + member = self.rc.startMember(self.cluster.id) + + def collector(event_type): + events = [] + + def on_state_change(event): + if event == event_type: + events.append(event) + + on_state_change.events = events + return on_state_change + + disconnected_collector = collector(LifecycleState.DISCONNECTED) + self.client = await HazelcastClient.create_and_start( + cluster_members=["localhost:5701"], + cluster_name=self.cluster.id, + reconnect_mode=ReconnectMode.ASYNC, + lifecycle_listeners=[disconnected_collector], + ) + m = await self.client.get_map(random_string()) + # no exception at this point + await m.put(1, 1) + self.rc.shutdownMember(self.cluster.id, member.uuid) + await self.assertTrueEventually( + lambda: self.assertEqual(1, len(disconnected_collector.events)) + ) + with self.assertRaises(ClientOfflineError): + await m.put(1, 1) + connected_collector = collector(LifecycleState.CONNECTED) + self.client.lifecycle_service.add_listener(connected_collector) + self.rc.startMember(self.cluster.id) + await 
self.assertTrueEventually( + lambda: self.assertEqual(1, len(connected_collector.events)) + ) + await m.put(1, 1) diff --git a/tests/integration/asyncio/hazelcast_json_value_test.py b/tests/integration/asyncio/hazelcast_json_value_test.py new file mode 100644 index 0000000000..2193d4d2b3 --- /dev/null +++ b/tests/integration/asyncio/hazelcast_json_value_test.py @@ -0,0 +1,74 @@ +from hazelcast.core import HazelcastJsonValue +from hazelcast.predicate import greater, equal +from tests.integration.asyncio.base import SingleMemberTestCase + + +class HazelcastJsonValueWithMapTest(SingleMemberTestCase): + @classmethod + def setUpClass(cls): + super(HazelcastJsonValueWithMapTest, cls).setUpClass() + cls.json_str = '{"key": "value"}' + cls.json_obj = {"key": "value"} + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.map = await self.client.get_map("json-test") + + async def asyncTearDown(self): + await self.map.destroy() + + async def test_storing_hazelcast_json_value_as_key(self): + json_value = HazelcastJsonValue(self.json_str) + await self.map.put(json_value, 0) + self.assertEqual(0, await self.map.get(json_value)) + + async def test_storing_hazelcast_json_value_as_value(self): + json_value = HazelcastJsonValue(self.json_str) + await self.map.put(0, json_value) + self.assertEqual(json_value.to_string(), (await self.map.get(0)).to_string()) + + async def test_storing_hazelcast_json_value_with_invalid_str(self): + json_value = HazelcastJsonValue('{"a') + await self.map.put(0, json_value) + self.assertEqual(json_value.to_string(), (await self.map.get(0)).to_string()) + + async def test_querying_over_keys_with_hazelcast_json_value(self): + json_value = HazelcastJsonValue({"a": 1}) + json_value2 = HazelcastJsonValue({"a": 3}) + await self.map.put(json_value, 1) + await self.map.put(json_value2, 2) + results = await self.map.key_set(greater("__key.a", 2)) + self.assertEqual(1, len(results)) + self.assertEqual(json_value2.to_string(), results[0].to_string()) + + async def test_querying_nested_attr_over_keys_with_hazelcast_json_value(self): + json_value = HazelcastJsonValue({"a": 1, "b": {"c": "d"}}) + json_value2 = HazelcastJsonValue({"a": 2, "b": {"c": "e"}}) + await self.map.put(json_value, 1) + await self.map.put(json_value2, 2) + results = await self.map.key_set(equal("__key.b.c", "d")) + self.assertEqual(1, len(results)) + self.assertEqual(json_value.to_string(), results[0].to_string()) + + async def test_querying_over_values_with_hazelcast_json_value(self): + json_value = HazelcastJsonValue({"a": 1}) + json_value2 = HazelcastJsonValue({"a": 3}) + await self.map.put(1, json_value) + await self.map.put(2, json_value2) + results = await self.map.values(greater("a", 2)) + self.assertEqual(1, len(results)) + self.assertEqual(json_value2.to_string(), results[0].to_string()) + + async def test_querying_nested_attr_over_values_with_hazelcast_json_value(self): + json_value = HazelcastJsonValue({"a": 1, "b": {"c": "d"}}) + json_value2 = HazelcastJsonValue({"a": 2, "b": {"c": "e"}}) + await self.map.put(1, json_value) + await self.map.put(2, json_value2) + results = await self.map.values(equal("b.c", "d")) + self.assertEqual(1, len(results)) + self.assertEqual(json_value.to_string(), results[0].to_string()) diff --git a/tests/integration/asyncio/heartbeat_test.py b/tests/integration/asyncio/heartbeat_test.py new file mode 100644 index 0000000000..5fbdd72b3f --- /dev/null +++ 
b/tests/integration/asyncio/heartbeat_test.py @@ -0,0 +1,97 @@ +import asyncio +import threading +import unittest + +from hazelcast.asyncio import HazelcastClient +from hazelcast.core import Address +from tests.integration.asyncio.base import HazelcastTestCase +from tests.integration.asyncio.util import open_connection_to_address, wait_for_partition_table + + +class HeartbeatTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + rc = None + + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + async def asyncSetUp(self): + self.cluster = self.create_cluster(self.rc) + self.member = self.rc.startMember(self.cluster.id) + self.client = await HazelcastClient.create_and_start( + cluster_name=self.cluster.id, + heartbeat_interval=0.5, + heartbeat_timeout=2, + ) + + async def asyncTearDown(self): + await self.client.shutdown() + self.rc.shutdownCluster(self.cluster.id) + + async def test_heartbeat_stopped_and_restored(self): + member2 = await asyncio.to_thread(self.rc.startMember, self.cluster.id) + addr = Address(member2.host, member2.port) + await wait_for_partition_table(self.client) + await open_connection_to_address(self.client, member2.uuid) + + def connection_collector(): + connections = [] + + def collector(c, *_): + connections.append(c) + + collector.connections = connections + return collector + + connection_added_collector = connection_collector() + connection_removed_collector = connection_collector() + self.client._connection_manager.add_listener( + connection_added_collector, connection_removed_collector + ) + assertion_succeeded = False + + async def run(): + nonlocal assertion_succeeded + # It is possible for client to override the set last_read_time + # of the connection, in case of the periodically sent heartbeat + # requests getting responses, right after we try to set a new + # value to it, before the next iteration of the heartbeat manager. + # In this case, the connection won't be closed, and the test would + # fail. To avoid it, we will try multiple times. 
+ for i in range(10): + if assertion_succeeded: + # We have successfully simulated heartbeat loss + return + + for connection in list(self.client._connection_manager.active_connections.values()): + if connection.remote_address == addr: + connection.last_read_time -= 2 + break + + await asyncio.sleep((i + 1) * 0.1) + + asyncio.create_task(run()) + + async def assert_heartbeat_stopped_and_restored(): + nonlocal assertion_succeeded + self.assertGreaterEqual(len(connection_added_collector.connections), 1) + self.assertGreaterEqual(len(connection_removed_collector.connections), 1) + + stopped_connection = connection_removed_collector.connections[0] + restored_connection = connection_added_collector.connections[0] + + self.assertEqual( + stopped_connection.connected_address, + Address(member2.host, member2.port), + ) + self.assertEqual( + restored_connection.connected_address, + Address(member2.host, member2.port), + ) + assertion_succeeded = True + + await self.assertTrueEventually(assert_heartbeat_stopped_and_restored) diff --git a/tests/integration/asyncio/invocation_test.py b/tests/integration/asyncio/invocation_test.py new file mode 100644 index 0000000000..97c95b96fb --- /dev/null +++ b/tests/integration/asyncio/invocation_test.py @@ -0,0 +1,64 @@ +import asyncio +import time +import unittest + +from mock import MagicMock + +from hazelcast.asyncio import HazelcastClient +from hazelcast.errors import OperationTimeoutError +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.protocol.client_message import OutboundMessage +from hazelcast.serialization import LE_INT +from tests.integration.asyncio.base import HazelcastTestCase + + +class InvocationTimeoutTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, None) + cls.member = cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + async def asyncSetUp(self): + self.client = await HazelcastClient.create_and_start( + cluster_name=self.cluster.id, invocation_timeout=1 + ) + + async def asyncTearDown(self): + await self.client.shutdown() + + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + config["invocation_timeout"] = 1 + return config + + async def test_invocation_timeout(self): + request = OutboundMessage(bytearray(22), True) + invocation_service = self.client._invocation_service + invocation = Invocation(request, partition_id=1) + + def mock(*_): + time.sleep(2) + return False + + invocation_service._invoke_on_partition_owner = MagicMock(side_effect=mock) + invocation_service._invoke_on_random_connection = MagicMock(return_value=False) + invocation_service.invoke(invocation) + with self.assertRaises(OperationTimeoutError): + await invocation.future + + async def test_invocation_not_timed_out_when_there_is_no_exception(self): + buf = bytearray(22) + LE_INT.pack_into(buf, 0, 22) + request = OutboundMessage(buf, True) + invocation_service = self.client._invocation_service + invocation = Invocation(request) + invocation_service.invoke(invocation) + await asyncio.sleep(2) + self.assertFalse(invocation.future.done()) + self.assertEqual(1, len(invocation_service._pending)) diff --git a/tests/integration/asyncio/lifecycle_test.py b/tests/integration/asyncio/lifecycle_test.py new file mode 100644 index 0000000000..4efa014053 --- /dev/null +++ b/tests/integration/asyncio/lifecycle_test.py @@ -0,0 +1,101 @@ 
+import unittest + +from hazelcast.lifecycle import LifecycleState +from tests.integration.asyncio.base import HazelcastTestCase +from tests.util import event_collector + + +class LifecycleTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + rc = None + + def setUp(self): + self.rc = self.create_rc() + self.cluster = self.create_cluster(self.rc) + + async def asyncTearDown(self): + await self.shutdown_all_clients() + self.rc.exit() + + async def test_lifecycle_listener_receives_events_in_order(self): + collector = event_collector() + self.cluster.start_member() + client = await self.create_client( + { + "cluster_name": self.cluster.id, + "lifecycle_listeners": [ + collector, + ], + } + ) + await client.shutdown() + self.assertEqual( + collector.events, + [ + LifecycleState.STARTING, + LifecycleState.STARTED, + LifecycleState.CONNECTED, + LifecycleState.SHUTTING_DOWN, + LifecycleState.DISCONNECTED, + LifecycleState.SHUTDOWN, + ], + ) + + async def test_lifecycle_listener_receives_events_in_order_after_startup(self): + self.cluster.start_member() + collector = event_collector() + client = await self.create_client( + { + "cluster_name": self.cluster.id, + } + ) + client.lifecycle_service.add_listener(collector) + await client.shutdown() + self.assertEqual( + collector.events, + [LifecycleState.SHUTTING_DOWN, LifecycleState.DISCONNECTED, LifecycleState.SHUTDOWN], + ) + + async def test_lifecycle_listener_receives_disconnected_event(self): + member = self.cluster.start_member() + collector = event_collector() + client = await self.create_client( + { + "cluster_name": self.cluster.id, + } + ) + client.lifecycle_service.add_listener(collector) + member.shutdown() + + def assertion(): + self.assertEqual(collector.events, [LifecycleState.DISCONNECTED]) + + await self.assertTrueEventually(assertion) + + await client.shutdown() + + async def test_remove_lifecycle_listener(self): + collector = event_collector() + self.cluster.start_member() + client = await self.create_client( + { + "cluster_name": self.cluster.id, + } + ) + registration_id = client.lifecycle_service.add_listener(collector) + client.lifecycle_service.remove_listener(registration_id) + await client.shutdown() + self.assertEqual(collector.events, []) + + async def test_exception_in_listener(self): + def listener(_): + raise RuntimeError("error") + + self.cluster.start_member() + await self.create_client( + { + "cluster_name": self.cluster.id, + "lifecycle_listeners": [ + listener, + ], + } + ) diff --git a/tests/integration/asyncio/proxy/map_test.py b/tests/integration/asyncio/proxy/map_test.py index b63ae9e0fe..c51adfbeb3 100644 --- a/tests/integration/asyncio/proxy/map_test.py +++ b/tests/integration/asyncio/proxy/map_test.py @@ -630,7 +630,7 @@ async def test_add_entry_listener_item_loaded(self): collector = event_collector() await self.map.add_entry_listener(include_value=True, loaded_func=collector) await self.map.put("key", "value", ttl=0.1) - time.sleep(2) + await asyncio.sleep(2) await self.map.get("key") def assert_event(): diff --git a/tests/integration/asyncio/util.py b/tests/integration/asyncio/util.py index e101a58103..6a15d9c8ec 100644 --- a/tests/integration/asyncio/util.py +++ b/tests/integration/asyncio/util.py @@ -1,6 +1,34 @@ +from uuid import uuid4 + +import asyncio + + async def fill_map(map, size=10, key_prefix="key", value_prefix="val"): entries = dict() for i in range(size): entries[key_prefix + str(i)] = value_prefix + str(i) await map.put_all(entries) return entries + + +async def 
open_connection_to_address(client, uuid): + key = generate_key_owned_by_instance(client, uuid) + m = await client.get_map(str(uuid4())) + await m.put(key, 0) + await m.destroy() + + +def generate_key_owned_by_instance(client, uuid): + while True: + key = str(uuid4()) + partition_id = client.partition_service.get_partition_id(key) + owner = str(client.partition_service.get_partition_owner(partition_id)) + if owner == uuid: + return key + + +async def wait_for_partition_table(client): + m = await client.get_map(str(uuid4())) + while not client.partition_service.get_partition_owner(0): + await m.put(str(uuid4()), 0) + await asyncio.sleep(0.1)
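The connection manager and heartbeat manager hunks above stash every asyncio.create_task() result in a self._tasks set because the event loop only keeps weak references to running tasks; without a strong reference, a fire-and-forget task can be garbage collected before it finishes. A minimal, self-contained sketch of that pattern follows (the PingScheduler class and _ping coroutine are illustrative stand-ins, not part of the client):

import asyncio


class PingScheduler:
    """Illustrative sketch of the keep-a-strong-reference pattern used above."""

    def __init__(self):
        # asyncio only holds weak references to scheduled tasks, so keep them
        # here until they complete (see the asyncio docs on creating tasks).
        self._tasks: set[asyncio.Task] = set()

    def schedule(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task is done so the set does not grow.
        task.add_done_callback(self._tasks.discard)
        return task


async def _ping() -> None:
    # Stand-in for a real heartbeat invocation.
    await asyncio.sleep(0.1)


async def main() -> None:
    scheduler = PingScheduler()
    scheduler.schedule(_ping())
    await asyncio.sleep(0.2)  # give the fire-and-forget task time to finish


if __name__ == "__main__":
    asyncio.run(main())

The add_done_callback(self._tasks.discard) line is what keeps the set from growing without bound; shutdown code could additionally iterate over any remaining tasks and cancel them.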
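AdvancedNetworkConfigTest above expects a member whose advanced-network setup exposes a MEMBER server socket on 5701 and a CLIENT server socket on 5702. A member configuration along the following lines would satisfy those assertions; this is a sketch based on Hazelcast's documented advanced-network elements, not the exact XML used in this change:

# Hypothetical member config for a MEMBER endpoint on 5701 and a CLIENT
# endpoint on 5702; element names follow Hazelcast's advanced-network schema.
_EXAMPLE_SERVER_PORT = 5701
_EXAMPLE_CLIENT_PORT = 5702
_EXAMPLE_SERVER_WITH_CLIENT_ENDPOINT = """
<hazelcast xmlns="http://www.hazelcast.com/schema/config">
    <advanced-network enabled="true">
        <member-server-socket-endpoint-config>
            <port>%s</port>
        </member-server-socket-endpoint-config>
        <client-server-socket-endpoint-config>
            <port>%s</port>
        </client-server-socket-endpoint-config>
    </advanced-network>
</hazelcast>
""" % (
    _EXAMPLE_SERVER_PORT,
    _EXAMPLE_CLIENT_PORT,
)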