Skip to content

Commit 5dfac26

Browse files
committed
more test fix
1 parent 1db0b7d commit 5dfac26

File tree

12 files changed

+579
-166
lines changed

12 files changed

+579
-166
lines changed

salt/channel/client.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,10 @@ async def _send_with_retry(self, load, tries, timeout):
197197
)
198198
break
199199
except Exception as exc: # pylint: disable=broad-except
200-
log.trace("Failed to send msg %r", exc)
201-
if _try >= tries:
200+
log.trace(
201+
"Failed to send msg attempt=%s tries=%s exc=%r", _try, tries, exc
202+
)
203+
if tries is not None and tries > 0 and _try >= tries:
202204
raise
203205
else:
204206
_try += 1
@@ -513,6 +515,12 @@ async def connect(self):
513515
publish_port = self.auth.creds["publish_port"]
514516
# TODO: The zeromq transport does not use connect_callback and
515517
# disconnect_callback.
518+
log.debug(
519+
"AsyncPubChannel connecting to %s:%s (authenticated=%s)",
520+
self.opts.get("master_ip", "127.0.0.1"),
521+
publish_port,
522+
self.auth.authenticated,
523+
)
516524
await self.transport.connect(
517525
publish_port, self.connect_callback, self.disconnect_callback
518526
)
@@ -598,6 +606,17 @@ async def connect_callback(self, result):
598606
try:
599607
# Force re-auth on reconnect since the master
600608
# may have been restarted
609+
try:
610+
self.token = self.auth.gen_token(b"salt")
611+
except Exception: # pylint: disable=broad-except
612+
log.exception(
613+
"Failed to generate authentication token for publish channel"
614+
)
615+
log.debug(
616+
"AsyncPubChannel connect_callback (reconnected=%s, authenticated=%s)",
617+
self._reconnected,
618+
self.auth.authenticated,
619+
)
601620
await self.send_id(self.token, self._reconnected)
602621
self.connected = True
603622
self.event.fire_event({"master": self.opts["master"]}, "__master_connected")
@@ -643,6 +662,15 @@ def disconnect_callback(self):
643662
if self._closing:
644663
return
645664
self.connected = False
665+
if getattr(self, "auth", None) is not None:
666+
try:
667+
self.auth.invalidate()
668+
except Exception: # pylint: disable=broad-except
669+
log.exception("Failed to invalidate authentication after disconnect")
670+
log.debug(
671+
"AsyncPubChannel disconnect_callback invoked (reconnected=%s)",
672+
self._reconnected,
673+
)
646674
self.event.fire_event({"master": self.opts["master"]}, "__master_disconnected")
647675

648676
def _verify_master_signature(self, payload):
@@ -672,6 +700,10 @@ async def _decode_payload(self, payload):
672700
try:
673701
payload["load"] = self.auth.crypticle.loads(payload["load"])
674702
except salt.crypt.AuthenticationError:
703+
log.debug(
704+
"AsyncPubChannel payload decrypt failed, attempting re-auth (authenticated=%s)",
705+
self.auth.authenticated,
706+
)
675707
reauth = True
676708
if reauth:
677709
try:

salt/crypt.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,14 @@ def authenticate(self, callback=None):
821821
This function will de-dupe all calls here and return a *single* future
822822
for the sign-in-- whis way callers can all assume there aren't others
823823
"""
824+
log.debug(
825+
"%s authenticate() invoked (future_exists=%s, future_done=%s)",
826+
self,
827+
hasattr(self, "_authenticate_future"),
828+
getattr(
829+
getattr(self, "_authenticate_future", None), "done", lambda: False
830+
)(),
831+
)
824832
# if an auth is in flight-- and not done-- just pass that back as the future to wait on
825833
if (
826834
hasattr(self, "_authenticate_future")
@@ -856,20 +864,58 @@ async def _authenticate(self):
856864
:rtype: Crypticle
857865
:returns: A crypticle used for encryption operations
858866
"""
867+
log.debug("%s starting _authenticate()", self)
859868
acceptance_wait_time = self.opts["acceptance_wait_time"]
860869
acceptance_wait_time_max = self.opts["acceptance_wait_time_max"]
861870
if not acceptance_wait_time_max:
862871
acceptance_wait_time_max = acceptance_wait_time
863872
creds = None
873+
log.debug(
874+
"%s auth config timeout=%s tries=%s safemode=%s",
875+
self,
876+
self.opts.get("auth_timeout"),
877+
self.opts.get("auth_tries"),
878+
self.opts.get("auth_safemode"),
879+
)
864880

865881
with salt.channel.client.AsyncReqChannel.factory(
866882
self.opts, crypt="clear", io_loop=self._loop_raw
867883
) as channel:
868884
error = None
885+
attempt = 0
886+
max_attempts = self.opts.get("auth_tries", 0) or 0
869887
while True:
888+
attempt += 1
870889
try:
871-
creds = await self.sign_in(channel=channel)
890+
creds = await self.sign_in(channel=channel, tries=1)
891+
log.debug("%s sign_in returned %r", self, creds)
872892
except SaltClientError as exc:
893+
message = str(exc)
894+
if any(
895+
pattern in message
896+
for pattern in (
897+
"Attempt to authenticate with the salt master failed",
898+
"-|RETRY|-",
899+
)
900+
):
901+
log.warning(
902+
"%s sign_in transient failure: %s (will retry)",
903+
self,
904+
message,
905+
)
906+
if acceptance_wait_time:
907+
await asyncio.sleep(acceptance_wait_time)
908+
if acceptance_wait_time < acceptance_wait_time_max:
909+
acceptance_wait_time += acceptance_wait_time
910+
log.debug(
911+
"Authentication wait time is %s",
912+
acceptance_wait_time,
913+
)
914+
if max_attempts > 0 and attempt >= max_attempts:
915+
error = exc
916+
break
917+
continue
918+
log.warning("%s sign_in raised fatal error: %s", self, exc)
873919
error = exc
874920
break
875921
if creds == "retry":
@@ -902,6 +948,13 @@ async def _authenticate(self):
902948
log.debug(
903949
"Authentication wait time is %s", acceptance_wait_time
904950
)
951+
if max_attempts > 0 and attempt >= max_attempts:
952+
error = SaltClientError(
953+
"Authentication retries exhausted for {}".format(
954+
self.opts["id"]
955+
)
956+
)
957+
break
905958
continue
906959
elif creds == "bad enc algo":
907960
log.error(
@@ -929,6 +982,11 @@ async def _authenticate(self):
929982
)
930983
self._authenticate_future.set_exception(error)
931984
else:
985+
log.debug(
986+
"%s received credentials (publish_port=%s)",
987+
self,
988+
creds.get("publish_port"),
989+
)
932990
key = self.__key(self.opts)
933991
new_aes, changed_aes, changed_session = False, False, False
934992
if key not in AsyncAuth.creds_map:
@@ -947,6 +1005,13 @@ async def _authenticate(self):
9471005
self._crypticle = Crypticle(self.opts, creds["aes"])
9481006
self._session_crypticle = Crypticle(self.opts, creds["session"])
9491007

1008+
log.debug(
1009+
"%s authenticated with master %s (publish_port=%s)",
1010+
self,
1011+
self.opts.get("master"),
1012+
creds.get("publish_port"),
1013+
)
1014+
9501015
self._authenticate_future.set_result(
9511016
True
9521017
) # mark the sign-in as complete

salt/metaproxy/deltaproxy.py

Lines changed: 73 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,25 @@ async def _sync_all_task():
228228
salt.engines.start_engines, self.opts, self.process_manager, proxy=self.proxy
229229
)
230230

231+
# At this point the control proxy finished its own initialization, allow
232+
# requests targeting the control proxy to flow even if sub proxies are
233+
# still spinning up (which can take a while when managing dozens of ids).
234+
setup_future = getattr(self, "_setup_future", None)
235+
self.ready = True
236+
if self.opts.get("return_retry_timer", 0) < 20:
237+
new_retry = 20
238+
self.opts["return_retry_timer"] = new_retry
239+
current_max = self.opts.get("return_retry_timer_max", new_retry)
240+
if current_max < new_retry:
241+
current_max = new_retry
242+
self.opts["return_retry_timer_max"] = max(current_max, new_retry + 10)
243+
log.debug(
244+
"Control proxy %s adjusted return retry timers to %s-%s seconds",
245+
self.opts["id"],
246+
self.opts["return_retry_timer"],
247+
self.opts["return_retry_timer_max"],
248+
)
249+
231250
proxy_init_func_name = f"{fq_proxyname}.init"
232251
proxy_shutdown_func_name = f"{fq_proxyname}.shutdown"
233252
if (
@@ -425,16 +444,22 @@ async def _sync_grains_task():
425444
if self.opts["proxy"].get("parallel_startup"):
426445
log.warning("Initiating parallel startup for proxies")
427446
waitfor = []
447+
max_concurrency = max(1, self.opts["proxy"].get("startup_concurrency", 4))
448+
semaphore = asyncio.Semaphore(max_concurrency)
449+
428450
for _id in self.opts["proxy"].get("ids", []):
429-
waitfor.append(
430-
subproxy_post_master_init(
431-
_id,
432-
uid,
433-
self.opts,
434-
self.proxy,
435-
self.utils,
436-
)
437-
)
451+
452+
async def _initialise_subproxy(sub_id, *, _sem=semaphore):
453+
async with _sem:
454+
return await subproxy_post_master_init(
455+
sub_id,
456+
uid,
457+
self.opts,
458+
self.proxy,
459+
self.utils,
460+
)
461+
462+
waitfor.append(_initialise_subproxy(_id))
438463

439464
try:
440465
results = await asyncio.gather(*waitfor)
@@ -504,28 +529,41 @@ async def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils
504529
proxy_grains = {}
505530
proxy_pillar = {}
506531

507-
proxyopts = opts.copy()
508-
proxyopts["id"] = minion_id
532+
loop = asyncio.get_running_loop()
509533

510-
proxyopts = salt.config.proxy_config(
511-
opts["conf_file"], defaults=proxyopts, minion_id=minion_id
512-
)
513-
proxyopts.update({"id": minion_id, "proxyid": minion_id, "subproxy": True})
534+
def _load_proxyopts_and_grains():
535+
proxyopts_local = opts.copy()
536+
proxyopts_local["id"] = minion_id
514537

515-
proxy_context = {"proxy_id": minion_id}
538+
proxyopts_local = salt.config.proxy_config(
539+
opts["conf_file"], defaults=proxyopts_local, minion_id=minion_id
540+
)
541+
proxyopts_local.update(
542+
{"id": minion_id, "proxyid": minion_id, "subproxy": True}
543+
)
516544

517-
# We need grains first to be able to load pillar, which is where we keep the proxy
518-
# configurations
519-
proxy_grains = salt.loader.grains(
520-
proxyopts, proxy=main_proxy, context=proxy_context
545+
proxy_context_local = {"proxy_id": minion_id}
546+
547+
proxy_grains_local = salt.loader.grains(
548+
proxyopts_local, proxy=main_proxy, context=proxy_context_local
549+
)
550+
return proxyopts_local, proxy_context_local, proxy_grains_local
551+
552+
proxyopts, proxy_context, proxy_grains = await loop.run_in_executor(
553+
None, _load_proxyopts_and_grains
521554
)
522-
proxy_pillar = await salt.pillar.get_async_pillar(
523-
proxyopts,
524-
proxy_grains,
525-
minion_id,
526-
saltenv=proxyopts["saltenv"],
527-
pillarenv=proxyopts.get("pillarenv"),
528-
).compile_pillar()
555+
556+
def _compile_proxy_pillar():
557+
pillar = salt.pillar.get_pillar(
558+
proxyopts,
559+
proxy_grains,
560+
minion_id,
561+
saltenv=proxyopts["saltenv"],
562+
pillarenv=proxyopts.get("pillarenv"),
563+
)
564+
return pillar.compile_pillar()
565+
566+
proxy_pillar = await loop.run_in_executor(None, _compile_proxy_pillar)
529567

530568
proxyopts["proxy"] = proxy_pillar.get("proxy", {})
531569
if not proxyopts["proxy"]:
@@ -534,8 +572,6 @@ async def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils
534572
)
535573
return {"proxy_minion": None, "proxy_opts": {}}
536574

537-
loop = asyncio.get_running_loop()
538-
539575
def _finish_subproxy_setup():
540576
proxyopts["proxy"].pop("ids", None)
541577

@@ -1049,12 +1085,18 @@ async def handle_payload(self, payload):
10491085
Verify the publication and then pass
10501086
the payload along to _handle_decoded_payload.
10511087
"""
1088+
log.debug(
1089+
"Control proxy %s received payload enc=%s keys=%s",
1090+
self.opts["id"],
1091+
payload.get("enc") if payload else None,
1092+
sorted(payload["load"].keys()) if payload and "load" in payload else None,
1093+
)
10521094
setup_future = getattr(self, "_setup_future", None)
10531095
if setup_future is not None and not setup_future.done():
1054-
log.warning("Control proxy waiting for setup to finish before handling payload")
1096+
log.debug("Control proxy waiting for setup to finish before handling payload")
10551097
await asyncio.shield(setup_future)
10561098
else:
1057-
log.warning("Control proxy setup already complete; handling payload")
1099+
log.debug("Control proxy setup already complete; handling payload")
10581100
if payload is not None and payload["enc"] == "aes":
10591101
# First handle payload for the "control" proxy
10601102
if self._target_load(payload["load"]):

0 commit comments

Comments
 (0)