Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Commit fdda468

Browse files
Fix
1 parent dbd7ad7 commit fdda468

File tree

2 files changed

+56
-46
lines changed

2 files changed

+56
-46
lines changed

mars/oscar/backends/communication/ucx.py

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -403,7 +403,7 @@ async def serve_forever(client_ucp_endpoint: "ucp.Endpoint"):
403403
client_ucp_endpoint, local_address=server.address
404404
)
405405
except ChannelClosed: # pragma: no cover
406-
logger.debug("Connection closed before handshake completed")
406+
logger.exception("Connection closed before handshake completed")
407407
return
408408

409409
ucp_listener = ucp.create_listener(serve_forever, port=port)
@@ -492,8 +492,11 @@ async def connect(
492492

493493
try:
494494
ucp_endpoint = await ucp.create_endpoint(host, port)
495-
except ucp.exceptions.UCXBaseException: # pragma: no cover
496-
raise ChannelClosed("Connection closed before handshake completed")
495+
except ucp.exceptions.UCXBaseException as e: # pragma: no cover
496+
raise ChannelClosed(
497+
f"Connection closed before handshake completed, "
498+
f"local address: {local_address}, dest address: {dest_address}"
499+
) from e
497500
channel = UCXChannel(
498501
ucp_endpoint, local_address=local_address, dest_address=dest_address
499502
)

mars/oscar/backends/router.py

Lines changed: 50 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import asyncio
1516
import threading
1617
from typing import Dict, List, Tuple, Type, Any, Optional, Union
1718

@@ -29,6 +30,7 @@ class Router:
2930
"_mapping",
3031
"_comm_config",
3132
"_cache_local",
33+
"_lock",
3234
)
3335

3436
_instance: "Router" = None
@@ -62,6 +64,7 @@ def __init__(
6264
self._mapping = mapping
6365
self._comm_config = comm_config or dict()
6466
self._cache_local = threading.local()
67+
self._lock = asyncio.Lock()
6568

6669
@property
6770
def _cache(self) -> Dict[Tuple[str, Any, Optional[Type[Client]]], Client]:
@@ -115,29 +118,30 @@ async def get_client(
115118
return_from_cache=False,
116119
**kw,
117120
) -> Union[Client, Tuple[Client, bool]]:
118-
if cached and (external_address, from_who, None) in self._cache:
119-
cached_client = self._cache[external_address, from_who, None]
120-
if cached_client.closed:
121-
# closed before, ignore it
122-
del self._cache[external_address, from_who, None]
123-
else:
124-
if return_from_cache:
125-
return cached_client, True
121+
async with self._lock:
122+
if cached and (external_address, from_who, None) in self._cache:
123+
cached_client = self._cache[external_address, from_who, None]
124+
if cached_client.closed:
125+
# closed before, ignore it
126+
del self._cache[external_address, from_who, None]
126127
else:
127-
return cached_client
128-
129-
address = self.get_internal_address(external_address)
130-
if address is None:
131-
# no inner address, just use external address
132-
address = external_address
133-
client_type: Type[Client] = get_client_type(address)
134-
client = await self._create_client(client_type, address, **kw)
135-
if cached:
136-
self._cache[external_address, from_who, None] = client
137-
if return_from_cache:
138-
return client, False
139-
else:
140-
return client
128+
if return_from_cache:
129+
return cached_client, True
130+
else:
131+
return cached_client
132+
133+
address = self.get_internal_address(external_address)
134+
if address is None:
135+
# no inner address, just use external address
136+
address = external_address
137+
client_type: Type[Client] = get_client_type(address)
138+
client = await self._create_client(client_type, address, **kw)
139+
if cached:
140+
self._cache[external_address, from_who, None] = client
141+
if return_from_cache:
142+
return client, False
143+
else:
144+
return client
141145

142146
async def _create_client(
143147
self, client_type: Type[Client], address: str, **kw
@@ -179,27 +183,30 @@ async def get_client_via_type(
179183
return_from_cache=False,
180184
**kw,
181185
) -> Union[Client, Tuple[Client, bool]]:
182-
if cached and (external_address, from_who, client_type) in self._cache:
183-
cached_client = self._cache[external_address, from_who, client_type]
184-
if cached_client.closed:
185-
# closed before, ignore it
186-
del self._cache[external_address, from_who, client_type]
187-
else:
188-
if return_from_cache:
189-
return cached_client, True
186+
async with self._lock:
187+
if cached and (external_address, from_who, client_type) in self._cache:
188+
cached_client = self._cache[external_address, from_who, client_type]
189+
if cached_client.closed:
190+
# closed before, ignore it
191+
del self._cache[external_address, from_who, client_type]
190192
else:
191-
return cached_client
193+
if return_from_cache:
194+
return cached_client, True
195+
else:
196+
return cached_client
192197

193-
client_type_to_addresses = self._get_client_type_to_addresses(external_address)
194-
if client_type not in client_type_to_addresses: # pragma: no cover
195-
raise ValueError(
196-
f"Client type({client_type}) is not supported for {external_address}"
198+
client_type_to_addresses = self._get_client_type_to_addresses(
199+
external_address
197200
)
198-
address = client_type_to_addresses[client_type]
199-
client = await self._create_client(client_type, address, **kw)
200-
if cached:
201-
self._cache[external_address, from_who, client_type] = client
202-
if return_from_cache:
203-
return client, False
204-
else:
205-
return client
201+
if client_type not in client_type_to_addresses: # pragma: no cover
202+
raise ValueError(
203+
f"Client type({client_type}) is not supported for {external_address}"
204+
)
205+
address = client_type_to_addresses[client_type]
206+
client = await self._create_client(client_type, address, **kw)
207+
if cached:
208+
self._cache[external_address, from_who, client_type] = client
209+
if return_from_cache:
210+
return client, False
211+
else:
212+
return client

0 commit comments

Comments
 (0)