Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Commit dbd7ad7

Browse files
Increase coverage
1 parent 35405bf commit dbd7ad7

File tree

5 files changed

+12
-10
lines changed

5 files changed

+12
-10
lines changed

mars/oscar/backends/communication/socket.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ async def stop(self):
159159
await asyncio.gather(
160160
*(channel.close() for channel in self._channels if not channel.closed)
161161
)
162+
self._channels = []
162163

163164
@property
164165
@implements(Server.stopped)

mars/oscar/backends/communication/ucx.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ async def recv_buffers(self, buffers: list):
308308
try:
309309
for buffer in buffers:
310310
await self.ucp_endpoint.recv(buffer)
311-
except BaseException as e:
311+
except BaseException as e: # pragma: no cover
312312
if not self._closed:
313313
# In addition to UCX exceptions, may be CancelledError or another
314314
# "low-level" exception. The only safe thing to do is to abort.
@@ -455,6 +455,7 @@ async def stop(self):
455455
await asyncio.gather(
456456
*(channel.close() for channel in self._channels if not channel.closed)
457457
)
458+
self._channels = []
458459
self._ucp_listener = None
459460
self._closed.set()
460461

mars/oscar/backends/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def __init__(self):
3939
self._clients: Dict[Client, asyncio.Task] = dict()
4040

4141
async def get_client(self, router: Router, dest_address: str) -> Client:
42-
client = await router.get_client(dest_address, from_who=self)
42+
client = await router.get_client(dest_address, from_who=type(self))
4343
if client not in self._clients:
4444
self._clients[client] = asyncio.create_task(self._listen(client))
4545
self._client_to_message_futures[client] = dict()

mars/oscar/backends/router.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async def get_client_via_type(
191191
return cached_client
192192

193193
client_type_to_addresses = self._get_client_type_to_addresses(external_address)
194-
if client_type not in client_type_to_addresses:
194+
if client_type not in client_type_to_addresses: # pragma: no cover
195195
raise ValueError(
196196
f"Client type({client_type}) is not supported for {external_address}"
197197
)

mars/oscar/backends/transfer.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ def __init__(
7373
@staticmethod
7474
@abstractmethod
7575
def _new_message(ref: Union[BufferRef, FileObjectRef], buf: Any):
76-
pass
76+
"""new message"""
7777

7878
@abstractmethod
7979
async def _read(self, index: int, buffer_or_fileobj: Any):
80-
pass
80+
"""read data"""
8181

8282
async def _send_one(
8383
self,
@@ -142,14 +142,14 @@ def __init__(
142142

143143
@abstractmethod
144144
async def _write(self, index: int, buffer_or_fileobj: Any):
145-
pass
145+
"""write data"""
146146

147147
@staticmethod
148148
@staticmethod
149149
def _get_ref_from_message(
150150
message: Union[CopytoBuffersMessage, CopytoFileObjectsMessage]
151151
):
152-
pass
152+
"""get ref according to message"""
153153

154154
async def _recv_part(self, buf: Any, index: int, message_id: bytes):
155155
async with _catch_error(self.channel, message_id):
@@ -285,7 +285,7 @@ async def copyto_via_buffers(
285285
# do not support buffer copy
286286
# send data in batches
287287
client, is_cached = await router.get_client(
288-
address, from_who=self, return_from_cache=True
288+
address, from_who=type(self), return_from_cache=True
289289
)
290290
if not is_cached:
291291
# tell server to switch to transfer dedicated channel
@@ -299,7 +299,7 @@ async def copyto_via_buffers(
299299
)
300300
else:
301301
client, is_cached = await router.get_client_via_type(
302-
address, client_type, from_who=self, return_from_cache=True
302+
address, client_type, from_who=type(self), return_from_cache=True
303303
)
304304
if not is_cached:
305305
# tell server to switch to transfer dedicated channel
@@ -339,7 +339,7 @@ async def copyto_via_file_objects(
339339
), "`copyto_via_file_objects` can only be used inside pools"
340340
address = remote_file_object_refs[0].address
341341
client, is_cached = await router.get_client(
342-
address, from_who=self, return_from_cache=True
342+
address, from_who=type(self), return_from_cache=True
343343
)
344344
if not is_cached:
345345
# tell server to switch to transfer dedicated channel

0 commit comments

Comments
 (0)