This repository was archived by the owner on Feb 8, 2023. It is now read-only.
File tree Expand file tree Collapse file tree 3 files changed +21
-4
lines changed Expand file tree Collapse file tree 3 files changed +21
-4
lines changed Original file line number Diff line number Diff line change 3939)
4040from .backends import allocate_strategy
4141from .backends .pool import MainActorPoolType
42+ from .backends .transfer import temp_transfer_block_size
4243from .batch import extensible
4344from .core import (
4445 ActorRef ,
Original file line number Diff line number Diff line change 4040DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024 ** 2
4141
4242
43+ @contextlib .contextmanager
44+ def temp_transfer_block_size (size : int ):
45+ global DEFAULT_TRANSFER_BLOCK_SIZE
46+
47+ if size == DEFAULT_TRANSFER_BLOCK_SIZE :
48+ yield
49+ else :
50+ default_size = DEFAULT_TRANSFER_BLOCK_SIZE
51+ DEFAULT_TRANSFER_BLOCK_SIZE = size
52+ try :
53+ yield
54+ finally :
55+ DEFAULT_TRANSFER_BLOCK_SIZE = default_size
56+
57+
4358def _get_buffer_size (buf ) -> int :
4459 try :
4560 return buf .nbytes
Original file line number Diff line number Diff line change @@ -207,10 +207,11 @@ async def send_batch_data(
207207 rest_keys .append (data_key )
208208
209209 if local_buffers :
210- # for data that supports buffer protocol on both sides
211- # hand over to oscar to transfer data
212- await mo .copyto_via_buffers (local_buffers , remote_buffer_refs )
213- await receiver_ref .close_writers (session_id , copied_keys )
210+ with mo .temp_transfer_block_size (block_size ):
211+ # for data that supports buffer protocol on both sides
212+ # hand over to oscar to transfer data
213+ await mo .copyto_via_buffers (local_buffers , remote_buffer_refs )
214+ await receiver_ref .close_writers (session_id , copied_keys )
214215 else :
215216 rest_keys = to_send_keys
216217 rest_readers = readers
You can’t perform that action at this time.
0 commit comments