Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Commit 35405bf

Browse files
Parallelize send&recv in batches
1 parent 4c0be13 commit 35405bf

File tree

3 files changed

+245
-70
lines changed

3 files changed

+245
-70
lines changed

mars/oscar/backends/message.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,13 @@ class CopytoBuffersMessage(_MessageBase):
219219
message_type = MessageType.copyto_buffers
220220

221221
buffer_refs: List[BufferRef]
222+
content: object
222223

223224
def __int__(
224225
self,
225226
message_id: bytes = None,
226227
buffer_refs: List[BufferRef] = None,
228+
content: object = None,
227229
protocol: int = DEFAULT_PROTOCOL,
228230
message_trace: list = None,
229231
): ...
@@ -232,11 +234,13 @@ class CopytoFileObjectsMessage(_MessageBase):
232234
message_type = MessageType.copyto_file_objects
233235

234236
fileobj_refs: List[FileObjectRef]
237+
content: object
235238

236239
def __int__(
237240
self,
238241
message_id: bytes = None,
239242
fileobj_refs: List[FileObjectRef] = None,
243+
content: object = None,
240244
protocol: int = DEFAULT_PROTOCOL,
241245
message_trace: list = None,
242246
): ...

mars/oscar/backends/message.pyx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,11 +493,13 @@ cdef class CopytoBuffersMessage(_MessageBase):
493493

494494
cdef:
495495
public list buffer_refs
496+
public object content
496497

497498
def __init__(
498499
self,
499500
bytes message_id = None,
500501
list buffer_refs = None,
502+
object content = None,
501503
int protocol = _DEFAULT_PROTOCOL,
502504
list message_trace = None,
503505
):
@@ -508,6 +510,7 @@ cdef class CopytoBuffersMessage(_MessageBase):
508510
message_trace=message_trace
509511
)
510512
self.buffer_refs = buffer_refs
513+
self.content = content
511514

512515
cdef _MessageSerialItem serial(self):
513516
cdef _MessageSerialItem item = _MessageBase.serial(self)
@@ -516,6 +519,7 @@ cdef class CopytoBuffersMessage(_MessageBase):
516519
buffer_ref.address, buffer_ref.uid
517520
)
518521
item.serialized += (len(self.buffer_refs),)
522+
item.subs = [self.content]
519523
return item
520524

521525
cdef deserial_members(self, tuple serialized, list subs):
@@ -526,17 +530,20 @@ cdef class CopytoBuffersMessage(_MessageBase):
526530
refs.append(BufferRef(address, uid))
527531
assert len(refs) == size
528532
self.buffer_refs = refs
533+
self.content = subs[0]
529534

530535
cdef class CopytoFileObjectsMessage(_MessageBase):
531536
message_type = MessageType.copyto_fileobjects
532537

533538
cdef:
534539
public list fileobj_refs
540+
public object content
535541

536542
def __init__(
537543
self,
538544
bytes message_id = None,
539545
list fileobj_refs = None,
546+
object content = None,
540547
int protocol = _DEFAULT_PROTOCOL,
541548
list message_trace = None,
542549
):
@@ -547,6 +554,7 @@ cdef class CopytoFileObjectsMessage(_MessageBase):
547554
message_trace=message_trace
548555
)
549556
self.fileobj_refs = fileobj_refs
557+
self.content = content
550558

551559
cdef _MessageSerialItem serial(self):
552560
cdef _MessageSerialItem item = _MessageBase.serial(self)
@@ -555,6 +563,7 @@ cdef class CopytoFileObjectsMessage(_MessageBase):
555563
fileobj_ref.address, fileobj_ref.uid
556564
)
557565
item.serialized += (len(self.fileobj_refs),)
566+
item.subs = [self.content]
558567
return item
559568

560569
cdef deserial_members(self, tuple serialized, list subs):
@@ -565,6 +574,7 @@ cdef class CopytoFileObjectsMessage(_MessageBase):
565574
refs.append(FileObjectRef(address, uid))
566575
assert len(refs) == size
567576
self.fileobj_refs = refs
577+
self.content = subs[0]
568578

569579

570580
cdef dict _message_type_to_message_cls = {

0 commit comments

Comments
 (0)