11package io .neonbee ;
22
3+ import static io .neonbee .config .NeonBeeConfig .BootDeploymentHandling .FAIL_ON_ERROR ;
4+ import static io .neonbee .config .NeonBeeConfig .BootDeploymentHandling .KEEP_PARTIAL ;
35import static io .neonbee .internal .deploy .DeployableModule .fromJar ;
46import static io .neonbee .internal .deploy .DeployableVerticle .fromClass ;
57import static io .neonbee .internal .deploy .DeployableVerticle .fromVerticle ;
4446import io .neonbee .cluster .ClusterManagerFactory ;
4547import io .neonbee .config .HealthConfig ;
4648import io .neonbee .config .NeonBeeConfig ;
49+ import io .neonbee .config .NeonBeeConfig .BootDeploymentHandling ;
4750import io .neonbee .config .ServerConfig ;
4851import io .neonbee .data .DataException ;
4952import io .neonbee .data .DataQuery ;
102105import io .vertx .core .eventbus .EventBusOptions ;
103106import io .vertx .core .eventbus .MessageCodec ;
104107import io .vertx .core .impl .ConcurrentHashSet ;
108+ import io .vertx .core .impl .VertxInternal ;
105109import io .vertx .core .json .Json ;
106110import io .vertx .core .json .JsonObject ;
107111import io .vertx .core .net .PfxOptions ;
@@ -566,21 +570,25 @@ private Future<Void> deploySystemVerticles() {
566570 List <Future <? extends Deployable >> requiredVerticles = new ArrayList <>();
567571 requiredVerticles .add (fromClass (vertx , ConsolidationVerticle .class , new JsonObject ().put ("instances" , 1 )));
568572 requiredVerticles .add (fromClass (vertx , LoggerManagerVerticle .class ));
569-
570- List <Future <Optional <? extends Deployable >>> optionalVerticles = new ArrayList <>();
571573 if (Optional .ofNullable (config .getHealthConfig ()).map (HealthConfig ::isEnabled ).orElse (true )) {
572574 requiredVerticles .add (fromClass (vertx , HealthCheckVerticle .class ));
573575 }
576+
577+ List <Future <Optional <? extends Deployable >>> optionalVerticles = new ArrayList <>();
574578 optionalVerticles .add (deployableWatchVerticle (options .getModelsDirectory (), ModelRefreshVerticle ::new ));
575579 optionalVerticles .add (deployableWatchVerticle (options .getModulesDirectory (), DeployerVerticle ::new ));
576580 optionalVerticles .add (deployableRedeployEntitiesJobVerticle (options ));
577581
578582 LOGGER .info ("Deploying system verticles ..." );
579- return all (List .of (fromDeployables (requiredVerticles ).compose (allTo (this )),
580- all (optionalVerticles ).map (CompositeFuture ::list ).map (optionals -> {
581- return optionals .stream ().map (Optional .class ::cast ).filter (Optional ::isPresent ).map (Optional ::get )
582- .map (Deployable .class ::cast ).toList ();
583- }).map (Deployables ::new ).compose (anyTo (this )))).mapEmpty ();
583+ return all (fromDeployables (requiredVerticles ).compose (allTo (this )).onFailure (throwable -> {
584+ LOGGER .error ("Failed to deploy (some / all) required system verticle(s)" , throwable );
585+ }), all (optionalVerticles ).map (CompositeFuture ::list ).map (optionals -> {
586+ return optionals .stream ().map (Optional .class ::cast ).filter (Optional ::isPresent ).map (Optional ::get )
587+ .map (Deployable .class ::cast ).toList ();
588+ }).map (Deployables ::new ).compose (anyTo (this )).onFailure (throwable -> {
589+ LOGGER .error ("Failed to deploy (some / all) optional system verticle(s), bootstrap will continue" ,
590+ throwable );
591+ }).otherwiseEmpty ()).mapEmpty ();
584592 }
585593
586594 private Future <Optional <? extends Deployable >> deployableWatchVerticle (
@@ -621,7 +629,9 @@ private Future<Optional<? extends Deployable>> deployableRedeployEntitiesJobVert
621629 private Future <Void > deployServerVerticle () {
622630 LOGGER .info ("Deploying server verticle ..." );
623631 return fromClass (vertx , ServerVerticle .class , new JsonObject ().put ("instances" , NUMBER_DEFAULT_INSTANCES ))
624- .compose (deployable -> deployable .deploy (this )).mapEmpty ();
632+ .compose (deployable -> deployable .deploy (this )).onFailure (throwable -> {
633+ LOGGER .error ("Failed to deploy server verticle" , throwable );
634+ }).mapEmpty ();
625635 }
626636
627637 /**
@@ -638,11 +648,7 @@ private Future<Void> deployClassPathVerticles() {
638648 return scanForDeployableClasses (vertx ).compose (deployableClasses -> fromDeployables (deployableClasses .stream ()
639649 .filter (verticleClass -> filterByAutoDeployAndProfiles (verticleClass , options .getActiveProfiles ()))
640650 .map (verticleClass -> fromClass (vertx , verticleClass )).collect (Collectors .toList ())))
641- .onSuccess (deployables -> {
642- if (LOGGER .isInfoEnabled ()) {
643- LOGGER .info ("Deploy class path verticle(s) {}." , deployables .getIdentifier ());
644- }
645- }).compose (allTo (this )).mapEmpty ();
651+ .compose (handleBootDeployment ("class path verticle(s)" ));
646652 }
647653
648654 @ VisibleForTesting
@@ -665,7 +671,39 @@ private Future<Void> deployModules() {
665671
666672 LOGGER .info ("Deploying module(s) ..." );
667673 return fromDeployables (moduleJarPaths .stream ().map (moduleJarPath -> fromJar (vertx , moduleJarPath ))
668- .collect (Collectors .toList ())).compose (allTo (this )).mapEmpty ();
674+ .collect (Collectors .toList ())).compose (handleBootDeployment ("module(s)" ));
675+ }
676+
677+ private Function <Deployables , Future <Void >> handleBootDeployment (String deploymentType ) {
678+ BootDeploymentHandling handling = config .getBootDeploymentHandling ();
679+ return deployables -> {
680+ // in case we should keep partial deployments, for every deployable that we are about to deploy
681+ // set the keep partial deployment flag, so that in case there is an error we don't undeploy
682+ if (handling == KEEP_PARTIAL ) {
683+ for (Deployable deployable : deployables .getDeployables ()) {
684+ if (deployable instanceof Deployables ) {
685+ ((Deployables ) deployable ).keepPartialDeployment ();
686+ }
687+ }
688+ }
689+
690+ return (handling == FAIL_ON_ERROR ? allTo (this ) : anyTo (this )).apply (deployables )
691+ .onSuccess (deployments -> {
692+ if (LOGGER .isInfoEnabled ()) {
693+ LOGGER .info ("Successfully deployed all {} {}" ,
694+ deploymentType , deployments .getDeploymentId ());
695+ }
696+ }).recover (throwable -> {
697+ if (LOGGER .isErrorEnabled ()) {
698+ LOGGER .error ("Failed to deploy (some / all) {}{}" ,
699+ deploymentType , handling == FAIL_ON_ERROR ? "" : ", bootstrap will continue" ,
700+ throwable );
701+ }
702+
703+ // abort the boot process if any class path verticle failed to deploy
704+ return handling == FAIL_ON_ERROR ? failedFuture (throwable ) : succeededFuture ();
705+ }).mapEmpty ();
706+ };
669707 }
670708
671709 @ VisibleForTesting
@@ -721,7 +759,7 @@ private void registerCloseHandler(Vertx vertx) {
721759 try {
722760 // unfortunately the addCloseHook method is public, but hidden in VertxImpl. As we need to know when the
723761 // instance shuts down, register a close hook using reflections (might fail due to a SecurityManager)
724- vertx . getClass () .getMethod ("addCloseHook" , Closeable .class ).invoke (vertx , (Closeable ) completion -> {
762+ VertxInternal . class .getMethod ("addCloseHook" , Closeable .class ).invoke (vertx , (Closeable ) completion -> {
725763 /*
726764 * Called when Vert.x instance is closed, perform shut-down operations here
727765 */
0 commit comments