Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Commit 0eaea4f

Browse files
Refactor
1 parent 9905524 commit 0eaea4f

File tree

2 files changed

+40
-27
lines changed

2 files changed

+40
-27
lines changed

mars/oscar/backends/pool.py

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -494,36 +494,44 @@ async def stop(self):
494494
def stopped(self) -> bool:
495495
return self._stopped.is_set()
496496

497+
@classmethod
498+
async def _recv_message(cls, channel: Channel):
499+
try:
500+
return await channel.recv()
501+
except EOFError:
502+
# no data to read, check channel
503+
try:
504+
await channel.close()
505+
except (ConnectionError, EOFError):
506+
# close failed, ignore
507+
pass
508+
497509
async def on_new_channel(self, channel: Channel):
498-
is_transfer = False
510+
message = await self._recv_message(channel)
511+
if (
512+
message.message_type == MessageType.control
513+
and message.control_message_type == ControlMessageType.switch_to_transfer
514+
):
515+
# switch this channel to data transfer channel
516+
# the channel will be handed over to TransferServer
517+
# and this loop will exit
518+
return await TransferServer.handle_transfer_channel(channel, self._stopped)
519+
else:
520+
asyncio.create_task(self.process_message(message, channel))
521+
# delete to release the reference of message
522+
del message
523+
await asyncio.sleep(0)
524+
525+
# continue to keep processing messages
499526
while not self._stopped.is_set():
500-
try:
501-
message = await channel.recv()
502-
except EOFError:
503-
# no data to read, check channel
504-
try:
505-
await channel.close()
506-
except (ConnectionError, EOFError):
507-
# close failed, ignore
508-
pass
509-
return
510-
if (
511-
message.message_type == MessageType.control
512-
and message.control_message_type
513-
== ControlMessageType.switch_to_transfer
514-
):
515-
# switch this channel to data transfer channel
516-
# the channel will be handed over to TansferServer
517-
is_transfer = True
527+
message = await self._recv_message(channel)
528+
if message is None:
518529
break
519530
asyncio.create_task(self.process_message(message, channel))
520531
# delete to release the reference of message
521532
del message
522533
await asyncio.sleep(0)
523534

524-
if is_transfer:
525-
await TransferServer.handle_transfer_channel(channel)
526-
527535
async def __aenter__(self):
528536
await self.start()
529537
return self

mars/oscar/backends/transfer.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,18 @@ async def _catch_error(channel: Channel, message_id: bytes):
193193

194194
class TransferServer:
195195
@classmethod
196-
async def handle_transfer_channel(cls, channel: Channel):
197-
while True:
196+
async def handle_transfer_channel(cls, channel: Channel, stopped: asyncio.Event):
197+
while not stopped.is_set():
198198
try:
199199
message = await channel.recv()
200-
except EOFError:
201-
# closed
202-
break
200+
except EOFError: # pragma: no cover
201+
# no data to read, check channel
202+
try:
203+
await channel.close()
204+
except (ConnectionError, EOFError):
205+
# close failed, ignore
206+
pass
207+
return
203208
assert message.message_type in (
204209
MessageType.copyto_buffers,
205210
MessageType.copyto_fileobjects,

0 commit comments

Comments
 (0)