diff --git a/README.md b/README.md index b109d416..0dcd7731 100644 --- a/README.md +++ b/README.md @@ -457,6 +457,8 @@ To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which function on all qualifying topics. It will inventory all topic configs found at `--path-prefix` for a cluster specified by `--cluster-config`. +To rebalance topics in a cluster that exist without topic configuration files, use the `rebalance` subcommand with the `--bootstrap-missing-configs` flag. This will temporarily bootstrap any missing topic configs at `--path-prefix`. This can also be used to use topicctl as a topic rebalancing tool, without using its topic configuration management features + This subcommand will not rebalance a topic if: 1. the topic config is inconsistent with the cluster config (name, region, environment etc...) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 2654c0d1..c704d575 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -32,6 +32,7 @@ type rebalanceCmdConfig struct { brokersToRemove []int brokerThrottleMBsOverride int dryRun bool + bootstrapMissingConfigs bool partitionBatchSizeOverride int pathPrefix string sleepLoopDuration time.Duration @@ -85,6 +86,12 @@ func init() { 0*time.Second, "Interval of time to show progress during rebalance", ) + rebalanceCmd.Flags().BoolVar( + &rebalanceConfig.bootstrapMissingConfigs, + "bootstrap-missing-configs", + false, + "Bootstrap temporary topic config(s) for the rebalance of configless topic(s)", + ) addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) RootCmd.AddCommand(rebalanceCmd) @@ -151,55 +158,80 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { return err } - // iterate through each topic config and initiate rebalance - topicConfigs := []config.TopicConfig{} - topicErrorDict := make(map[string]error) - for _, topicFile := range topicFiles { - // do not consider invalid topic yaml files for rebalance - topicConfigs, err = config.LoadTopicsFile(topicFile) + existingConfigFiles := make(map[string]struct{}) + if rebalanceConfig.bootstrapMissingConfigs { + // make set of existing files + err := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { + _, topicFilename := filepath.Split(topicFile) + existingConfigFiles[topicFilename] = struct{}{} + return nil + }) if err != nil { - log.Errorf("Invalid topic yaml file: %s", topicFile) - continue + return err } - for _, topicConfig := range topicConfigs { - // topic config should be consistent with the cluster config - if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil { - log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) - continue - } + // bootstrap missing config files + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) + cliRunner.BootstrapTopics( + ctx, + []string{}, + clusterConfig, + ".*", + ".^", + rebalanceConfig.pathPrefix, + false, + false, + ) - log.Infof( - "Rebalancing topic %s from config file %s with cluster config %s", - topicConfig.Meta.Name, - topicFile, - clusterConfigPath, - ) - - topicErrorDict[topicConfig.Meta.Name] = nil - rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ - TopicName: topicConfig.Meta.Name, - ClusterName: clusterConfig.Meta.Name, - ClusterEnvironment: clusterConfig.Meta.Environment, - ToRemove: rebalanceConfig.brokersToRemove, - RebalanceError: false, - } - if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { - topicErrorDict[topicConfig.Meta.Name] = err - rebalanceTopicProgressConfig.RebalanceError = true - log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) - } + // re-inventory topic configs to take into account bootstrapped ones + topicFiles, err = getAllFiles(topicConfigDir) + if err != nil { + return err + } + } + + // iterate through each topic config and initiate rebalance + topicErrorDict := make(map[string]error) + processed := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error { + // topic config should be consistent with the cluster config + if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil { + log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) + return nil + } + + log.Infof( + "Rebalancing topic %s from config file %s with cluster config %s", + topicConfig.Meta.Name, + topicFile, + clusterConfigPath, + ) + topicErrorDict[topicConfig.Meta.Name] = nil + rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ + TopicName: topicConfig.Meta.Name, + ClusterName: clusterConfig.Meta.Name, + ClusterEnvironment: clusterConfig.Meta.Environment, + ToRemove: rebalanceConfig.brokersToRemove, + RebalanceError: false, + } + if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { + topicErrorDict[topicConfig.Meta.Name] = err + rebalanceTopicProgressConfig.RebalanceError = true + log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) + } - // show topic final progress - if rebalanceCtxStruct.Enabled { - progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) - if err != nil { - log.Errorf("progress struct to string error: %+v", err) - } else { - log.Infof("Rebalance Progress: %s", progressStr) - } + // show topic final progress + if rebalanceCtxStruct.Enabled { + progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) + if err != nil { + log.Errorf("progress struct to string error: %+v", err) + } else { + log.Infof("Rebalance Progress: %s", progressStr) } } + return nil + }) + if processed != nil { + return processed } // audit at the end of all topic rebalances @@ -231,6 +263,20 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } } + // clean up any bootstrapped topic configs + if rebalanceConfig.bootstrapMissingConfigs { + for _, topicFile := range topicFiles { + _, topicFilename := filepath.Split(topicFile) + if _, found := existingConfigFiles[topicFilename]; found { + continue + } + err := os.Remove(topicFile) + if err != nil { + log.Errorf("error deleting temporary file %s: %v", topicFile, err) + } + } + } + log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) return nil } @@ -366,3 +412,22 @@ func getAllFiles(dir string) ([]string, error) { return files, err } + +func processTopicFiles(topicFiles []string, operation func(topicConfig config.TopicConfig, topicFile string) error) error { + for _, topicFile := range topicFiles { + // do not consider invalid topic yaml files for rebalance + topicConfigs, err := config.LoadTopicsFile(topicFile) + if err != nil { + log.Errorf("Invalid topic yaml file: %s", topicFile) + continue + } + + for _, topicConfig := range topicConfigs { + err := operation(topicConfig, topicFile) + if err != nil { + return fmt.Errorf("error during operation on config %d (%s): %w", 0, topicConfig.Meta.Name, err) + } + } + } + return nil +}