Skip to content

Conversation

@Joshuaalbert
Copy link
Owner

Integates #22 by allowing user to choose asyncio consistent behaviour (of task cancellation) or Trio compatible.

Two implementations:

  1. FairAsyncRLock -- asyncio version name persistent for compat
  2. AnyIOFairAsyncRLock -- anyio version

@Joshuaalbert
Copy link
Owner Author

@JaagupAverin can you review this?

@JaagupAverin
Copy link

Tests passing, works as a drop-in-replacement for my local project as well.
However some warnings by my type checker (basedpyright) that would be nice to get rid of. See PR :)

@x42005e1f
Copy link

It is not enough to use anyio.Event to make the cancellation semantics "different". Under the hood, anyio.Event is just asyncio.Event, with a checkpoint (await asyncio.sleep(0)) added to the event.wait() method for when the event is already set. However, your implementation does not have checkpoints for all cases, and therefore it does not conform to AnyIO semantics, where each asynchronous call checks for cancellation at the start and unconditionally switches to another task at least at the end. See the implementation of anyio.Lock for asyncio.

@x42005e1f
Copy link

The most important place to add await anyio.lowlevel.checkpoint() is "if the lock is free". Right now there is return without checking for cancellation and switching to another task, but after adding the checkpoint the lock will behave the same in any state.

The second place is "if the lock is reentrant", which handles the case of reacquiring the lock. Whether a checkpoint should be added here is debatable. The effect on performance will be purely negative (checkpoints are not about performance at all), but it may help in some complex cases.

In my library, which also provides reentrant locks (but thread-safe, see x42005e1f/aiologic#5 for performance comparisons), there is no checkpoint in the second case. But I think I will change this behavior for the next version.

@JaagupAverin
Copy link

JaagupAverin commented Mar 17, 2025

To be clear, this is a best practices issue and not a logical error you're speaking of (although of course as a library this should conform to best practices anyways)?
And also should this practice be also followed for just asyncio code?
Otherwise its nice to see that library of yours. Will keep it in mind for future reference :)

@x42005e1f
Copy link

There may be expectations from a Trio-compatible library that it will also inherit its semantics. AnyIO adheres to the same semantics. I think this is not a best practices issue, but a compatibility issue.

And no, the asyncio ecosystem has a different model, so this practice does not need to be followed there.

@x42005e1f
Copy link

For example, if you replace trio.Lock or anyio.Lock with AnyIOFairAsyncRLock, and do not use the reentrancy feature, the lock will behave differently from the original locks: if the lock is not held, there will be no checks for cancellation and no switching to another task. Because of this, AnyIOFairAsyncRLock will cause inconvenience where the concept of a checkpoint is strictly enforced, and raise the question of whether it can actually be considered a drop-in replacement.

@JaagupAverin
Copy link

Im thinking from the aspect of where best put these checkpoints in this code. It could be in the anyio specific code but from what I understand it's a good practice to have these checks in any async code anyways.

@x42005e1f
Copy link

x42005e1f commented Mar 17, 2025

I think you could add an additional method to BaseFairAsyncRLock, say _checkpoint(), which you could then override in subclasses: pass for FairAsyncRLock, await anyio.lowlevel.checkpoint() for AnyIOFairAsyncRLock. And place the calls themselves right before return. It remains to add a try-except block for BaseException to correctly handle cancellation: count -= 1 for the case "if the lock is reentrant", self._current_task_release() for the case "if the lock is free".

A combination of anyio.lowlevel.checkpoint_if_cancelled() and anyio.lowlevel.cancel_shielded_checkpoint() may be a better alternative due to clearer semantics, despite lower performance.

@Joshuaalbert
Copy link
Owner Author

Thank you both @JaagupAverin and @x42005e1f for your reviews and discussion. Let's take a staged approach. For 2.0.0 I will make the refactorisation into base class so that users can use AnyIO implementation or asyncio implementation.

To integrate @x42005e1f 's points I'd like to begin by finding a good regression test. If we can pinpoint the divergence of behaviour via a test then the checkpointing will make sense to add.

  1. Do I understand correctly that this semantic is two parts: 1) always check for cancellation at the start of any concurrent block, and 2) always yield control at the end of the block?

  2. @x42005e1f can you produce a single test that shows how AnyIOAsyncRLock diverges from anyio.Lock when not used reentrantly?

  3. Add discussion on any possible impact to reentrant path, and fairness.

@Joshuaalbert
Copy link
Owner Author

I'm also happy to hold-off on merging until we've sorted the above.

@JaagupAverin
Copy link

JaagupAverin commented Mar 18, 2025

I think the easiest and and most dangerous behaviour to test is the endless co-operative while-loop:

async def test_scheduling() -> None:
    lock = AnyIOFairAsyncRLock()

    async def while_loop() -> None:
        while True:
            await lock.acquire()
            #await anyio.sleep(0)

    async with anyio.create_task_group() as tg:
        tg.start_soon(while_loop)
        await anyio.lowlevel.checkpoint()
        tg.cancel_scope.cancel()

Applies to asyncio as well:

@pytest.mark.asyncio
async def test_scheduling():
    lock = FairAsyncRLock()

    async def while_loop():
        while True:
            await lock.acquire()
            #await asyncio.sleep(0)

    t = asyncio.create_task(while_loop())
    await asyncio.sleep(0)  # Give the inner task a chance to run
    t.cancel()
    await t

Both of these loops will spin forever since the cancellation is never checked. Adding a sleep of course fixes the issue.
Personally I was surprised that the await keyword doesn't invoke the scheduler on its own (TIL!). I suppose this is for performance reasons to not have any overhead on such a low level.

I don't personally have any feelings on which level this check should be enforced. @x42005e1f appears to be much more knowledgeable on async code so I'd go with their recommendations.

# Conflicts:
#	src/fair_async_rlock/tests/test_anyio_fair_async_rlock.py
#	src/fair_async_rlock/tests/test_fair_async_rlock.py
* add regression test for nested reentrant (still fails)
@x42005e1f
Copy link

1. Do I understand correctly that this semantic is two parts: 1) always check for cancellation at the start of any concurrent block, and 2) always yield control at the end of the block?

It actually does not matter so much where the cancellation checking is done and where the switching to other tasks takes place. "At the start and at the end" is what it usually looks like. What matters is that both of these actions are unconditional: a successful function call should always do both the check and the switch.

2. @x42005e1f can you produce a single test that shows how `AnyIOAsyncRLock` diverges from `anyio.Lock` when not used reentrantly?
import anyio
import pytest

from fair_async_rlock import AnyIOFairAsyncRLock


@pytest.mark.anyio
async def test_anyio_checkpoints():
    lock = AnyIOFairAsyncRLock()

    async def acquirer():
        async with lock:
            pass

    async def neighbor():
        if not lock.locked():
            await anyio.sleep(0)

        assert lock.locked()

    # check for scheduling
    async with anyio.create_task_group() as tg:
        tg.start_soon(acquirer)
        tg.start_soon(neighbor)

    # check for cancellation
    with anyio.move_on_after(0):
        with pytest.raises(anyio.get_cancelled_exc_class()):
            await acquirer()

@Joshuaalbert
Copy link
Owner Author

I have added 7 tests:

  1. asyncio non-reentrant path
  2. asyncio reentrent path
  3. asyncio reentrant path with nested reentrant path
  • same for anyio

  • test_anyio_checkpoints from @x42005e1f

Both nested reentrant path's are failing. Getting owner!=None and count!=0 after cancelling outer task.

@x42005e1f note, for reentrant path upon cancellation count -= 1 was not sufficient, as the the count after cancel for that owner would still be > 0. Currently, I'm doing this for the reentrant part which simulates cancelling all the reentrant acquires:

# If the lock is reentrant, acquire it immediately
if self.is_owner(task=me):
    self._count += 1
    try:
        await self._checkpoint()
    except self._get_cancelled_exc_class():
        # Cancelled, while reentrant, so release the lock
        self._owner_transfer = False
        self._owner = me
        self._count = 1
        self._current_task_release()
        raise
    return

But, this feel clumsy. There must be a better way. Note, I do a similar thing at line 95, when waiter event isn't found in the queue.

@x42005e1f
Copy link

x42005e1f commented Mar 18, 2025

Note that in this example:

... # 0
async with lock:
    ...  # 1
    async with lock:
        ...  # 2
        async with lock:
            ...  # 3, cancelled!

You should get a sequence of count changes like 0 -> 1 -> 2 -> 3 -> cancelled! -> 2 -> 1 -> 0. That is why I do not think that the lock should be completely released when cancelled: outer lock.release() calls will do that.

@x42005e1f
Copy link

x42005e1f commented Mar 18, 2025

Personally I was surprised that the await keyword doesn't invoke the scheduler on its own (TIL!). I suppose this is for performance reasons to not have any overhead on such a low level.

It is much more prosaic than that. An asynchronous function is a coroutine factory. A coroutine is a slightly extended generator. await is just a yield from with additional checks. And communication with the event loop happens via yield (not to be confused with asynchronous generators), under the hood of __await__() methods. If await something does not result in at least one yield (usually yield future), there is no communication with the event loop, and there is no check for cancellation or switching to other tasks.

example.py
import asyncio


class Checkpoint:
    def __await__(self):
        print("before")
        yield None  # asyncio-specific!
        print("after")


class NotCheckpoint:
    def __await__(self):
        print("before")
        yield from ()  # no yield
        print("after")


async def printer(string):
    print(string)


async def test():
    asyncio.create_task(printer("from task 1"))

    await Checkpoint()

    # before
    # from task 1
    # after

    asyncio.create_task(printer("from task 2"))

    await NotCheckpoint()

    # before
    # after

    await asyncio.sleep(0)

    # from task 2

    print(list(Checkpoint().__await__()))  # [None]
    print(list(NotCheckpoint().__await__()))  # []
    print(list(asyncio.sleep(0).__await__()))  # [None]

    future = asyncio.get_running_loop().create_future()
    future_it = future.__await__()

    print(next(future_it))  # <Future pending> -- yield future

    future.set_result("result")

    try:
        next(future_it)
    except StopIteration as exc:
        print(exc.value)  # result

    print(await future)  # result


asyncio.run(test())

@Joshuaalbert
Copy link
Owner Author

Joshuaalbert commented Mar 18, 2025

You should get a sequence of count changes like 0 -> 1 -> 2 -> 3 -> cancelled! -> 2 -> 1 -> 0

@x42005e1f this property is already met in this test (which I just added). But, this one is failing.

@pytest.mark.asyncio
async def test_chained_lock_count_reentrant():
    lock = FairAsyncRLock()

    async def c():
        async with lock:
            assert lock._count == 3
            await asyncio.get_running_loop().create_future()

    async def b():
        async with lock:
            assert lock._count == 2
            try:
                await c()
            except asyncio.CancelledError:
                assert lock._count == 2
                raise

    async def a():
        async with lock:
            assert lock._count == 1
            try:
                await b()
            except asyncio.CancelledError:
                assert lock._count == 1
                raise

    task = asyncio.create_task(a())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        assert lock._count == 0
        assert lock._owner == None

@x42005e1f
Copy link

x42005e1f commented Mar 18, 2025

No, that is not what I meant.

import asyncio

import pytest

from fair_async_rlock import FairAsyncRLock


@pytest.mark.asyncio
async def test_chained_lock_count_reentrant():
    lock = FairAsyncRLock()

    async def b():
        assert lock._count == 1
        assert lock._owner is not None

        asyncio.current_task().cancel()

        with pytest.raises(asyncio.CancelledError):
            await lock.acquire()

        assert lock._count == 1
        assert lock._owner is not None

    async def a():
        assert lock._count == 0
        assert lock._owner is None

        async with lock:
            await b()

        assert lock._count == 0
        assert lock._owner is None

    await a()

It fails with RuntimeError: Cannot release un-acquired lock.

@Joshuaalbert
Copy link
Owner Author

@x42005e1f I'm not getting the same error for your test_chained_lock_count_reentrant. It fails at assert lock._count == 1 after the cancel because the count is decremented to zero, which is your main point. It's a good test.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants