[API-2326] Initial Asyncio Module PR [1/6] #741
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## master #741 +/- ##
==========================================
- Coverage 95.33% 93.48% -1.86%
==========================================
Files 378 389 +11
Lines 21992 24404 +2412
==========================================
+ Hits 20967 22815 +1848
- Misses 1025 1589 +564
I've added the diff between modules in this PR vs their counterparts in the old API to the PR description.
Had a quick look.
For others: it's probably easier to review this with https://difftastic.wilfred.me.uk/introduction.html.
I haven't studied the reactor and its associated logic in detail. From the diff, the changes looked relatively minor, albeit numerous.
I can review those in depth if needed (or if you cannot find people).
hazelcast/asyncio/client.py
Outdated
_CLIENT_ID = AtomicInteger()

@classmethod
async def create_and_start(cls, config: Config = None, **kwargs) -> "HazelcastClient":
General note -- I'm sure there are others.
If we want this API to be nice w.r.t. typing as well, then this will probably be flagged as an error by default, since None is not a valid value for Config. It might be more prominent here, as I think this is the entry point into client creation.
Same for __init__ -- it should be config: Config | None.
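To illustrate the typing point, here is a minimal sketch with hypothetical stand-in classes (Config and Client below are not the real hazelcast classes). It assumes a type checker that disallows implicit Optional, which would reject config: Config = None but accept the explicit form:

```python
from __future__ import annotations

import asyncio


class Config:
    """Hypothetical stand-in for the client's Config class."""

    def __init__(self, cluster_name: str = "dev") -> None:
        self.cluster_name = cluster_name


class Client:
    """Hypothetical stand-in for the asyncio HazelcastClient."""

    def __init__(self, config: Config | None = None) -> None:
        # Fall back to a default Config when none is supplied.
        self.config = config if config is not None else Config()

    @classmethod
    async def create_and_start(cls, config: Config | None = None) -> "Client":
        # The explicit "| None" states that omitting the argument is allowed;
        # "config: Config = None" relies on implicit Optional instead.
        return cls(config)


client = asyncio.run(Client.create_and_start())
```

Calling create_and_start() without arguments then type-checks and yields a client with the default configuration.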
Might be missing something in the diff: in client.py there's a load of documentation -- is it elsewhere, or why omit it for this one? (most likely doc applicable from __init__)
re: Docs, I mentioned that in the PR description:
I didn't include the API docs, in order to make the PR smaller. I'll add them in another PR.
If you use a better diff tool, it doesn't make a difference. Give the one I mentioned a go.
I can review those in-depth if needed (or you cannot find people).
Yes, please.
At least with PyCharm, the user gets the correct type annotation.
There is this project in case you'd like to try the asyncio module:
https://github.com/yuce/hazelcast-asyncio-sample
In any case, I pushed a PR that adds explicit |None:
baa3bc1
PyCharm may be more forgiving -- I've never used it. pyright or pyrefly are good tools for checking compliance with the type invariants specified through type annotations.
We check the typings with mypy.
raise
_logger.info("Client started")

async def get_map(self, name: str) -> Map[KeyType, ValueType]:
Is the intention to support only VC in addition to this in the near (immediate) term?
Map is the only proxy included, in order to make the PR small. Other proxies, except VC, may be excluded from the beta release.
}


class ProxyManager:
Note to others: this is in proxy/__init__.py.
In the diff that I provided in the description I've shown that, but maybe it wasn't easy to notice.
The diff was not good.
from hazelcast.internal.asyncio_connection import Connection
from hazelcast.core import Address

_BUFFER_SIZE = 128000
Best to define centrally if possible given it's used across reactors.
I think they should be independent of each other.
The other constant hasn't been changed for ~4 years, same value.
It's unlikely to change.
But I would either have to import it from hazelcast.reactor, or refactor the code so that both the asyncore and the asyncio reactors import it from a common module.
That either introduces a dependency between those reactor modules, or requires changes in the "old" Python code, which I tried to avoid.
def shutdown(self):
    if not self._is_live:
        return
    # TODO: cancel tasks
compared to reactor.py is this correct?
start, shutdown are not necessary for the AsyncioReactor.
Removed them in the 3rd PR with this commit:
58783dc
_CLIENT_ID = AtomicInteger()

@classmethod
async def create_and_start(cls, config: Config | None = None, **kwargs) -> "HazelcastClient":
It's also StartNewClientAsync in .NET. Also, it's initialized through a factory class.
I would name it new_hazelcast_client, to be more consistent and easily understandable for existing Hazelcast users.
self._cluster_connect_timeout_text,
self._max_backoff,
)
time.sleep(sleep_time)
asyncio.sleep ? called on line 550
I've fixed the WaitStrategy in 91bf1d1
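For context on why time.sleep matters here: it blocks the whole event loop, whereas await asyncio.sleep lets other tasks run in the meantime. A small self-contained sketch (ticker and wait_then_count are hypothetical names, not part of the client):

```python
import asyncio
import time


async def ticker(ticks: list) -> None:
    # Records a timestamp each time the event loop gives this task a turn.
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def wait_then_count(blocking: bool) -> int:
    ticks: list = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # let the ticker record its first tick
    if blocking:
        time.sleep(0.2)  # blocks the event loop; the ticker cannot run
    else:
        await asyncio.sleep(0.2)  # yields; the ticker keeps running
    seen = len(ticks)  # ticks recorded while we were waiting
    await task
    return seen
```

With blocking=True the ticker makes no progress during the wait; with blocking=False it finishes all of its ticks.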
def callback(future):
    try:
        schema = future.result()
Does .result() block if the future is not ready? Shouldn't it be something like await future followed by future.result(), or schema = await future?
No, it doesn't block.
Trying to get the result when !future.done() raises an exception.
The fetch_schema_future.add_done_callback(callback) call a few lines below makes sure that callback receives a done future.
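A small sketch of that behaviour, assuming the future is a plain asyncio.Future (the "schema" value and the names below are illustrative only):

```python
import asyncio


async def main() -> list:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    events = []

    # result() on a pending asyncio future raises instead of blocking.
    try:
        future.result()
    except asyncio.InvalidStateError:
        events.append("pending: InvalidStateError")

    def callback(done_future: asyncio.Future) -> None:
        # Done callbacks only run once the future is done, so result() is safe.
        events.append(f"done: {done_future.result()}")

    future.add_done_callback(callback)
    future.set_result("schema")
    await asyncio.sleep(0)  # give the loop a turn to run the callback
    return events


events = asyncio.run(main())
```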
ihsandemir
left a comment
will continue to review.
@ihsandemir #741 (comment) Let's discuss that in the TDD.
@property
def name(self) -> str:
    return self._name
If I remember correctly, the missing docs will be part of the next PRs, to reduce the number of changed lines here.
)
return await self._invoke(request, handler)

async def flush(self) -> None:
Why are force_unlock and lock removed?
ihsandemir
left a comment
Looking at code coverage report #741 (comment), new code is around 76% covered. Will you complete it to 100%?
There are 5 more PRs that port more tests.
emreyigit
left a comment
I don't have anything left for the first PR, thanks.
gbarnett-hz
left a comment
Had a few scans -- PR is too large to be that confident. I had a few notes though that I forgot from last time.
except KeyError:
    partition_map[partition_id] = [entry]

async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
How are we handling < 3.11? https://github.com/hazelcast/hazelcast-python-client/blob/master/setup.py
TaskGroup added in 3.11 https://docs.python.org/3.11/library/asyncio-task.html#asyncio.TaskGroup
Is it possible to use gather? Add the created tasks to a list, then gather the list and handle cancellation and exceptions ourselves? If we catch an exception, we manually cancel the tasks and then gather again with return_exceptions=True?
Or, use TaskGroup only when running on >= 3.11. Otherwise, we need to assess the impact of removing 3.7 support.
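A rough sketch of the gather-based fallback described above, for interpreters without TaskGroup. run_all is a hypothetical helper, and it only approximates TaskGroup (for instance, it re-raises the first error rather than an exception group):

```python
import asyncio


async def run_all(coros):
    """Run coroutines concurrently; on the first failure, cancel the rest."""
    tasks = [asyncio.ensure_future(coro) for coro in coros]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # gather() does not cancel sibling tasks when one fails, so do it here.
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to finish; collect errors, don't raise.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def ok(n: int) -> int:
    await asyncio.sleep(0.01)
    return n


results = asyncio.run(run_all([ok(1), ok(2)]))
```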
3.7 was left unintentionally.
3.11 is the minimum we'll support.
It's possible to approximate TaskGroup with gather, but I think it's best to use TaskGroup directly to make our code a bit more robust.
What's more difficult -- the analysis to determine EOL'ing 3.7 or the technical compromise?
def close(self):
    self._transport.close()

def write(self, buf):
Can we not just call _transport.write(buf)? I think this already does what you do in _write_loop, without polling.
No, that causes buf to be written immediately, which is not ideal.
_write_loop tries to collect more data before flushing it.
This is one of the implementations:
https://github.com/python/cpython/blob/9c4ff8a615af3f1e746fcfe5cdb6af5ddfb3b6a7/Lib/asyncio/selector_events.py#L1071
That looks like it is doing what we want for a non-blocking socket unless I misread something: the send will complete later if the call send on the socket would block.
the send will complete later if the call send on the socket would block.
The problem is that it tries to write the data it gets immediately, no matter how small the data is.
Waiting a bit for more data to arrive is more efficient.
In my first try, I used _transport.write directly, but then switched to the write loop, since it was more efficient.
To confirm that, I ran a simple benchmark again today, and in all tries the throughput when writing directly was worse than with the write loop.
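To make the batching idea concrete, here is a minimal sketch. It is not the PR's _write_loop; BatchingWriter and RecordingTransport are hypothetical, and the sketch simply defers the flush by one event loop iteration so that several small buffers reach the transport as a single write:

```python
import asyncio


class RecordingTransport:
    """Hypothetical stand-in for an asyncio transport; records each write."""

    def __init__(self) -> None:
        self.writes = []

    def write(self, data: bytes) -> None:
        self.writes.append(data)


class BatchingWriter:
    """Collects small buffers and flushes them to the transport together."""

    def __init__(self, transport) -> None:
        self._transport = transport
        self._pending = []
        self._flush_scheduled = False

    def write(self, buf: bytes) -> None:
        self._pending.append(buf)
        if not self._flush_scheduled:
            # Defer the flush by one loop iteration so later writes can join.
            self._flush_scheduled = True
            asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self) -> None:
        self._flush_scheduled = False
        data, self._pending = b"".join(self._pending), []
        if data:
            self._transport.write(data)


async def demo() -> list:
    transport = RecordingTransport()
    writer = BatchingWriter(transport)
    for chunk in (b"a", b"b", b"c"):
        writer.write(chunk)
    await asyncio.sleep(0)  # let the scheduled flush run
    return transport.writes


writes = asyncio.run(demo())
```

Three small write calls end up as one transport write, which is the effect the write loop is after; whether that pays off in practice is what the benchmark numbers in this thread are about.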
What are the performance benefits? IMO it's better to integrate as-is rather than invent what looks to be similar to what they are doing unless the improvements are compelling.
The numbers I got were 15% to 70% worse when I used _transport.write directly.
What they do is not similar.
Note that the asyncore API variant of the client also tries to batch socket writes.
Do you have the results + test you can share? Personally I would go with the _transport.write(...) as it seems to be the idiomatic way unless the difference between approaches when you consider avg + tail latencies is significant.
This is the initial asyncio support.
asyncio module, which contains the public asyncio API
internal module and internal/asyncio_ modules, which contain the private asyncio API/implementation
tests/integration/asyncio/authentication_tests
tests/integration/asyncio/backup_acks_tests
tests/integration/asyncio/client_test (one test is not ported, due to its Topic DDS dependency)
tests/integration/asyncio/proxy/map_test
Most of the code in this PR was duplicated into the internal module, with the module names prefixed with asyncio_. For example, ROOT/cluster.py was duplicated/modified as internal/asyncio_cluster.py.
Here is the diff between modules in this PR vs their counterparts in the old API:
https://gist.github.com/yuce/56e79a29a1d4d1d996788381d489c0a4