Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which
function on all qualifying topics. It will inventory all topic configs found at `--path-prefix` for a cluster
specified by `--cluster-config`.

To rebalance topics in a cluster that exist without topic configuration files, use the `rebalance` subcommand with the `--bootstrap-missing-configs` flag. This will temporarily bootstrap any missing topic configs at `--path-prefix`. This can also be used to use topicctl as a topic rebalancing tool, without using its topic configuration management features

This subcommand will not rebalance a topic if:

1. the topic config is inconsistent with the cluster config (name, region, environment etc...)
Expand Down
149 changes: 107 additions & 42 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type rebalanceCmdConfig struct {
brokersToRemove []int
brokerThrottleMBsOverride int
dryRun bool
bootstrapMissingConfigs bool
partitionBatchSizeOverride int
pathPrefix string
sleepLoopDuration time.Duration
Expand Down Expand Up @@ -85,6 +86,12 @@ func init() {
0*time.Second,
"Interval of time to show progress during rebalance",
)
rebalanceCmd.Flags().BoolVar(
&rebalanceConfig.bootstrapMissingConfigs,
"bootstrap-missing-configs",
false,
"Bootstrap temporary topic config(s) for the rebalance of configless topic(s)",
)

addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared)
RootCmd.AddCommand(rebalanceCmd)
Expand Down Expand Up @@ -151,55 +158,80 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
return err
}

// iterate through each topic config and initiate rebalance
topicConfigs := []config.TopicConfig{}
topicErrorDict := make(map[string]error)
for _, topicFile := range topicFiles {
// do not consider invalid topic yaml files for rebalance
topicConfigs, err = config.LoadTopicsFile(topicFile)
existingConfigFiles := make(map[string]struct{})
if rebalanceConfig.bootstrapMissingConfigs {
// make set of existing files
err := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error {
_, topicFilename := filepath.Split(topicFile)
existingConfigFiles[topicFilename] = struct{}{}
return nil
})
if err != nil {
log.Errorf("Invalid topic yaml file: %s", topicFile)
continue
return err
}

for _, topicConfig := range topicConfigs {
// topic config should be consistent with the cluster config
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
continue
}
// bootstrap missing config files
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false)
cliRunner.BootstrapTopics(
ctx,
[]string{},
clusterConfig,
".*",
".^",
rebalanceConfig.pathPrefix,
false,
false,
)

log.Infof(
"Rebalancing topic %s from config file %s with cluster config %s",
topicConfig.Meta.Name,
topicFile,
clusterConfigPath,
)

topicErrorDict[topicConfig.Meta.Name] = nil
rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{
TopicName: topicConfig.Meta.Name,
ClusterName: clusterConfig.Meta.Name,
ClusterEnvironment: clusterConfig.Meta.Environment,
ToRemove: rebalanceConfig.brokersToRemove,
RebalanceError: false,
}
if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil {
topicErrorDict[topicConfig.Meta.Name] = err
rebalanceTopicProgressConfig.RebalanceError = true
log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err)
}
// re-inventory topic configs to take into account bootstrapped ones
topicFiles, err = getAllFiles(topicConfigDir)
if err != nil {
return err
}
}

// iterate through each topic config and initiate rebalance
topicErrorDict := make(map[string]error)
processed := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error {
// topic config should be consistent with the cluster config
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
return nil
}

log.Infof(
"Rebalancing topic %s from config file %s with cluster config %s",
topicConfig.Meta.Name,
topicFile,
clusterConfigPath,
)
topicErrorDict[topicConfig.Meta.Name] = nil
rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{
TopicName: topicConfig.Meta.Name,
ClusterName: clusterConfig.Meta.Name,
ClusterEnvironment: clusterConfig.Meta.Environment,
ToRemove: rebalanceConfig.brokersToRemove,
RebalanceError: false,
}
if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil {
topicErrorDict[topicConfig.Meta.Name] = err
rebalanceTopicProgressConfig.RebalanceError = true
log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err)
}

// show topic final progress
if rebalanceCtxStruct.Enabled {
progressStr, err := util.StructToStr(rebalanceTopicProgressConfig)
if err != nil {
log.Errorf("progress struct to string error: %+v", err)
} else {
log.Infof("Rebalance Progress: %s", progressStr)
}
// show topic final progress
if rebalanceCtxStruct.Enabled {
progressStr, err := util.StructToStr(rebalanceTopicProgressConfig)
if err != nil {
log.Errorf("progress struct to string error: %+v", err)
} else {
log.Infof("Rebalance Progress: %s", progressStr)
}
}
return nil
})
if processed != nil {
return processed
}

// audit at the end of all topic rebalances
Expand Down Expand Up @@ -231,6 +263,20 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
}
}

// clean up any bootstrapped topic configs
if rebalanceConfig.bootstrapMissingConfigs {
for _, topicFile := range topicFiles {
_, topicFilename := filepath.Split(topicFile)
if _, found := existingConfigFiles[topicFilename]; found {
continue
}
err := os.Remove(topicFile)
if err != nil {
log.Errorf("error deleting temporary file %s: %v", topicFile, err)
}
}
}

log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics)
return nil
}
Expand Down Expand Up @@ -366,3 +412,22 @@ func getAllFiles(dir string) ([]string, error) {

return files, err
}

func processTopicFiles(topicFiles []string, operation func(topicConfig config.TopicConfig, topicFile string) error) error {
for _, topicFile := range topicFiles {
// do not consider invalid topic yaml files for rebalance
topicConfigs, err := config.LoadTopicsFile(topicFile)
if err != nil {
log.Errorf("Invalid topic yaml file: %s", topicFile)
continue
}

for _, topicConfig := range topicConfigs {
err := operation(topicConfig, topicFile)
if err != nil {
return fmt.Errorf("error during operation on config %d (%s): %w", 0, topicConfig.Meta.Name, err)
}
}
}
return nil
}