From 64bf9c346e4417fc0689f8107bb9c45591f91b02 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Fri, 30 May 2025 09:12:04 +0800 Subject: [PATCH 1/2] [#1442] Support rocketmq message gray --- pom.xml | 1 + spring-cloud-huawei-bom/pom.xml | 5 + spring-cloud-huawei-dependencies/pom.xml | 17 ++ spring-cloud-huawei-rocketmq/pom.xml | 59 ++++ .../ConditionalOnRocketMqMsgGrayEnabled.java | 31 ++ .../RocketMqMessageGrayConfiguration.java | 86 ++++++ .../grayscale/RocketMqMessageGrayUtils.java | 132 +++++++++ ...cketMqMessageGrayWebFluxConfiguration.java | 36 +++ .../grayscale/config/BaseProperties.java | 58 ++++ .../grayscale/config/ConsumeModeEnum.java | 30 ++ .../config/ConsumerClientConfig.java | 69 +++++ .../grayscale/config/GrayscaleProperties.java | 77 +++++ .../config/MessageGrayPropertiesManager.java | 82 ++++++ .../config/RocketMqMessageGrayProperties.java | 73 +++++ .../GrayMessageListenerConcurrently.java | 50 ++++ .../client/MessageGrayDefaultMQProducer.java | 69 +++++ .../MessageGrayDefaultMQPushConsumer.java | 123 ++++++++ .../filter/WebFluxGrayHeaderFilter.java | 62 ++++ .../filter/WebMvcGrayHeaderFilter.java | 54 ++++ .../holder/RequestGrayHeaderHolder.java | 36 +++ .../holder/RocketMqGraySendMessageHook.java | 90 ++++++ .../ConsumerGroupAutoCheckManager.java | 213 ++++++++++++++ .../manager/RocketMqConsumerImplManager.java | 83 ++++++ .../RocketMqSubscriptionDataManager.java | 275 ++++++++++++++++++ .../servicemeta/NacosServiceMeta.java | 33 +++ .../servicemeta/ServiceMetaManager.java | 43 +++ .../servicemeta/ServicecombServiceMeta.java | 48 +++ .../RocketMqListenerContainerAspect.java | 79 +++++ .../RocketMqMessageListenerAspect.java | 44 +++ .../RocketMqSendMessageHookManager.java | 39 +++ ...ot.autoconfigure.AutoConfiguration.imports | 20 ++ spring-cloud-starter-huawei/pom.xml | 1 + .../pom.xml | 38 +++ 33 files changed, 2156 insertions(+) create mode 100644 spring-cloud-huawei-rocketmq/pom.xml create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/ConditionalOnRocketMqMsgGrayEnabled.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/BaseProperties.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumeModeEnum.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumerClientConfig.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/MessageGrayPropertiesManager.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/GrayMessageListenerConcurrently.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQProducer.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQPushConsumer.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RocketMqGraySendMessageHook.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/NacosServiceMeta.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServicecombServiceMeta.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqListenerContainerAspect.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqMessageListenerAspect.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqSendMessageHookManager.java create mode 100644 spring-cloud-huawei-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 spring-cloud-starter-huawei/spring-cloud-starter-huawei-rocketmq/pom.xml diff --git a/pom.xml b/pom.xml index 844c95056..845a1cbb2 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,7 @@ spring-cloud-huawei-mesh spring-cloud-huawei-nacos spring-cloud-starter-huawei + spring-cloud-huawei-rocketmq diff --git a/spring-cloud-huawei-bom/pom.xml b/spring-cloud-huawei-bom/pom.xml index 496a962d8..36c22caa1 100644 --- a/spring-cloud-huawei-bom/pom.xml +++ b/spring-cloud-huawei-bom/pom.xml @@ -182,6 +182,11 @@ spring-cloud-huawei-mesh ${project.version} + + com.huaweicloud + spring-cloud-huawei-rocketmq + ${project.version} + \ No newline at end of file diff --git a/spring-cloud-huawei-dependencies/pom.xml b/spring-cloud-huawei-dependencies/pom.xml index 2b58398a5..56473030d 100644 --- a/spring-cloud-huawei-dependencies/pom.xml +++ b/spring-cloud-huawei-dependencies/pom.xml @@ -48,6 +48,8 @@ 2.7.0 2.3.0 1.0.11 + 5.3.1 + 2.3.3 @@ -175,6 +177,21 @@ spring-context-support ${alibaba.context.support.version} + + org.apache.rocketmq + rocketmq-common + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-spring-boot + ${rocketmq.boot.version} + \ No newline at end of file diff --git a/spring-cloud-huawei-rocketmq/pom.xml b/spring-cloud-huawei-rocketmq/pom.xml new file mode 100644 index 000000000..089333ee5 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/pom.xml @@ -0,0 +1,59 @@ + + + + + spring-cloud-huawei-parents + com.huaweicloud + 1.12.0-2024.0.x-SNAPSHOT + ../spring-cloud-huawei-parents + + 4.0.0 + + spring-cloud-huawei-rocketmq + Spring Cloud Huawei::Rocketmq + + + + com.huaweicloud + spring-cloud-huawei-common + + + org.springframework + spring-web + provided + + + org.apache.rocketmq + rocketmq-common + provided + + + org.apache.rocketmq + rocketmq-client + provided + + + org.apache.rocketmq + rocketmq-spring-boot + provided + + + + \ No newline at end of file diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/ConditionalOnRocketMqMsgGrayEnabled.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/ConditionalOnRocketMqMsgGrayEnabled.java new file mode 100644 index 000000000..0b427aa60 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/ConditionalOnRocketMqMsgGrayEnabled.java @@ -0,0 +1,31 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +@ConditionalOnProperty(value = "rocketmq.gray.enabled", havingValue = "true") +public @interface ConditionalOnRocketMqMsgGrayEnabled { +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java new file mode 100644 index 000000000..8f135d5d8 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java @@ -0,0 +1,86 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale; + +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import org.springframework.core.env.Environment; + +import com.huaweicloud.rocketmq.grayscale.config.MessageGrayPropertiesManager; +import com.huaweicloud.rocketmq.grayscale.servicemeta.ServiceMetaManager; +import com.huaweicloud.rocketmq.grayscale.springboot.RocketMqMessageListenerAspect; +import com.huaweicloud.rocketmq.grayscale.springboot.RocketMqListenerContainerAspect; +import com.huaweicloud.rocketmq.grayscale.filter.WebMvcGrayHeaderFilter; +import com.huaweicloud.rocketmq.grayscale.servicemeta.NacosServiceMeta; +import com.huaweicloud.rocketmq.grayscale.servicemeta.ServicecombServiceMeta; +import com.huaweicloud.rocketmq.grayscale.springboot.RocketMqSendMessageHookManager; + +@Configuration +@ConditionalOnRocketMqMsgGrayEnabled +@AutoConfigureAfter(MessageGrayPropertiesManager.class) +public class RocketMqMessageGrayConfiguration { + @Bean + @ConfigurationProperties("spring.cloud.nacos.discovery") + public NacosServiceMeta serviceMetaInfo() { + return new NacosServiceMeta(); + } + + @Bean + @ConfigurationProperties("spring.cloud.servicecomb.instance") + public ServicecombServiceMeta servicecombServiceMeta() { + return new ServicecombServiceMeta(); + } + + @Bean + public MessageGrayPropertiesManager messageGrayPropertiesManager(Environment env) { + return new MessageGrayPropertiesManager(env); + } + + @Bean + public ServiceMetaManager serviceMetaManager(NacosServiceMeta serviceMeta, ServicecombServiceMeta servicecombServiceMeta) { + return new ServiceMetaManager(serviceMeta, servicecombServiceMeta); + } + + @Bean + @ConditionalOnClass(name = "org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer") + @DependsOn(value = {"messageGrayPropertiesManager", "serviceMetaManager"}) + public RocketMqListenerContainerAspect rocketMQListenerContainerAspect() { + return new RocketMqListenerContainerAspect(); + } + + @Bean + @ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQListener") + public RocketMqMessageListenerAspect RocketMqListenerAspect() { + return new RocketMqMessageListenerAspect(); + } + + @Bean + public RocketMqSendMessageHookManager rocketMqMessageHookManager(ApplicationContext applicationContext) { + return new RocketMqSendMessageHookManager(applicationContext); + } + + @Bean + public WebMvcGrayHeaderFilter webMvcGrayHeaderFilter() { + return new WebMvcGrayHeaderFilter(); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java new file mode 100644 index 000000000..73dd155af --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java @@ -0,0 +1,132 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.util.CollectionUtils; + +import com.huaweicloud.common.context.InvocationContext; +import com.huaweicloud.common.context.InvocationContextHolder; +import com.huaweicloud.rocketmq.grayscale.config.ConsumeModeEnum; +import com.huaweicloud.rocketmq.grayscale.config.GrayscaleProperties; +import com.huaweicloud.rocketmq.grayscale.config.RocketMqMessageGrayProperties; + +public class RocketMqMessageGrayUtils { + private static final Map MICRO_SERVICE_PROPERTIES = new HashMap<>(); + + private static RocketMqMessageGrayProperties messageGrayProperties = new RocketMqMessageGrayProperties(); + + public static void setServiceMetaData(Map serviceMeta) { + MICRO_SERVICE_PROPERTIES.putAll(serviceMeta); + } + + public static void setMessageGrayProperties(RocketMqMessageGrayProperties grayProperties) { + messageGrayProperties = grayProperties; + } + + public static String getGrayGroupTagsByServiceMeta() { + if (MICRO_SERVICE_PROPERTIES.isEmpty()) { + return ""; + } + for (GrayscaleProperties item : messageGrayProperties.getGrayscale()) { + Map ruleServiceMeta = item.getServiceMeta(); + for (Map.Entry entry : MICRO_SERVICE_PROPERTIES.entrySet()) { + if (ruleServiceMeta.containsKey(entry.getKey()) + && StringUtils.equals(ruleServiceMeta.get(entry.getKey()), entry.getValue())) { + return item.getConsumerGroupTag(); + } + } + } + return ""; + } + + public static RocketMqMessageGrayProperties getMessageGrayProperties() { + return messageGrayProperties; + } + + public static long getAutoCheckDelayTime() { + return messageGrayProperties.getBase().getAutoCheckDelayTime(); + } + + public static ConsumeModeEnum getConsumeMode() { + return messageGrayProperties.getBase().getConsumeMode(); + } + + public static List getExcludeGroupTags() { + return messageGrayProperties.getBase().getExcludeGroupTags(); + } + + public static Map getGrayTagsByServiceMeta() { + return messageGrayProperties.getGrayTagsByServiceMeta(MICRO_SERVICE_PROPERTIES); + } + + public static Map getGrayTagsByGrayHeaders(Map trafficGrayHeaders) { + return messageGrayProperties.getGrayTagsByGrayHeaders(trafficGrayHeaders); + } + + public static Map> getAllTrafficTagMap() { + Map> trafficTags = new HashMap<>(); + for (GrayscaleProperties grayscale : messageGrayProperties.getGrayscale()) { + for (String key : grayscale.getTrafficTag().keySet()) { + if (trafficTags.get(key) != null) { + trafficTags.get(key).add(grayscale.getTrafficTag().get(key)); + } else { + HashSet values = new HashSet<>(); + values.add(grayscale.getTrafficTag().get(key)); + trafficTags.put(key, values); + } + } + } + return trafficTags; + } + + public static String getGrayConsumerGroup(String consumerGroup) { + String grayGroupTag = getGrayGroupTagsByServiceMeta(); + if (StringUtils.isEmpty(grayGroupTag)) { + return consumerGroup; + } + String grayGroupSuffix = "_" + grayGroupTag; + if (consumerGroup.endsWith(grayGroupSuffix)) { + return consumerGroup; + } + return consumerGroup + grayGroupSuffix; + } + + public static String buildCacheKey(String address, String topic, String consumerGroup) { + return address + "@" + topic + "@" + consumerGroup; + } + + public static void setInvocationContext(Map properties) { + Map> trafficTagMap = getAllTrafficTagMap(); + if (CollectionUtils.isEmpty(properties) || CollectionUtils.isEmpty(trafficTagMap)) { + return; + } + InvocationContext invocationContext = InvocationContextHolder.getOrCreateInvocationContext(); + for (String key : properties.keySet()) { + if (trafficTagMap.get(key) != null && properties.get(key) != null + && trafficTagMap.get(key).contains(properties.get(key))) { + invocationContext.putContext(key, properties.get(key)); + } + } + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java new file mode 100644 index 000000000..15d40492d --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java @@ -0,0 +1,36 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication.Type; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.huaweicloud.rocketmq.grayscale.ConditionalOnRocketMqMsgGrayEnabled; +import com.huaweicloud.rocketmq.grayscale.filter.WebFluxGrayHeaderFilter; + +@Configuration +@ConditionalOnRocketMqMsgGrayEnabled +@ConditionalOnWebApplication(type = Type.REACTIVE) +public class RocketMqMessageGrayWebFluxConfiguration { + @Bean + public WebFluxGrayHeaderFilter webFluxGrayHeaderFilter() { + return new WebFluxGrayHeaderFilter(); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/BaseProperties.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/BaseProperties.java new file mode 100644 index 000000000..341033c96 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/BaseProperties.java @@ -0,0 +1,58 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.config; + +import java.util.ArrayList; +import java.util.List; + +public class BaseProperties { + /** + * default value + */ + private static final long DEFAULT = 15L; + + private ConsumeModeEnum consumeMode = ConsumeModeEnum.AUTO; + + private long autoCheckDelayTime = DEFAULT; + + private List excludeGroupTags = new ArrayList<>(); + + public ConsumeModeEnum getConsumeMode() { + return consumeMode; + } + + public void setConsumeMode(ConsumeModeEnum consumeMode) { + this.consumeMode = consumeMode; + } + + public long getAutoCheckDelayTime() { + return autoCheckDelayTime; + } + + public void setAutoCheckDelayTime(long autoCheckDelayTime) { + this.autoCheckDelayTime = autoCheckDelayTime; + } + + public List getExcludeGroupTags() { + return excludeGroupTags; + } + + public void setExcludeGroupTags(List excludeGroupTags) { + this.excludeGroupTags = excludeGroupTags; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumeModeEnum.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumeModeEnum.java new file mode 100644 index 000000000..6aa5fbfcd --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumeModeEnum.java @@ -0,0 +1,30 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.config; + +public enum ConsumeModeEnum { + /** + * auto consume mode + */ + AUTO, + + /** + * base consume mode + */ + BASE +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumerClientConfig.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumerClientConfig.java new file mode 100644 index 000000000..46d8e61fb --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/ConsumerClientConfig.java @@ -0,0 +1,69 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.config; + +import org.apache.rocketmq.client.impl.factory.MQClientInstance; + +public class ConsumerClientConfig { + private String topic; + + private String address; + + private String consumerGroup; + + private MQClientInstance mqClientInstance; + + public ConsumerClientConfig(String topic, String address, String consumerGroup, MQClientInstance mqClientInstance) { + this.topic = topic; + this.address = address; + this.consumerGroup = consumerGroup; + this.mqClientInstance = mqClientInstance; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public MQClientInstance getMqClientInstance() { + return mqClientInstance; + } + + public void setMqClientInstance(MQClientInstance mqClientInstance) { + this.mqClientInstance = mqClientInstance; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java new file mode 100644 index 000000000..7f8517542 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java @@ -0,0 +1,77 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.config; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.cache.Cache; + +public class GrayscaleProperties { + private String consumerGroupTag; + + private Map serviceMeta = new HashMap<>(); + + private Map trafficTag = new HashMap<>(); + + public String getConsumerGroupTag() { + return consumerGroupTag; + } + + public void setConsumerGroupTag(String consumerGroupTag) { + this.consumerGroupTag = consumerGroupTag; + } + + public Map getServiceMeta() { + return serviceMeta; + } + + public void setServiceMeta(Map serviceMeta) { + this.serviceMeta = serviceMeta; + } + + public Map getTrafficTag() { + return trafficTag; + } + + public void setTrafficTag(Map trafficTag) { + this.trafficTag = trafficTag; + } + + public boolean isServiceMetaMatch(Map microServiceMeta) { + for (Map.Entry entry : microServiceMeta.entrySet()) { + if (serviceMeta.containsKey(entry.getKey()) + && StringUtils.equals(serviceMeta.get(entry.getKey()), entry.getValue())) { + return true; + } + } + return false; + } + + public Map.Entry getTrafficTagByGrayHeaders(Map trafficGrayHeaders) { + for (Map.Entry entry : trafficGrayHeaders.entrySet()) { + if (trafficTag.containsKey(entry.getKey()) + && trafficTag.get(entry.getKey()).equals(entry.getValue())) { + return entry; + } + } + return null; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/MessageGrayPropertiesManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/MessageGrayPropertiesManager.java new file mode 100644 index 000000000..0be647fba --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/MessageGrayPropertiesManager.java @@ -0,0 +1,82 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.governance.event.GovernanceConfigurationChangedEvent; +import org.apache.servicecomb.governance.event.GovernanceEventManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.env.Environment; +import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.representer.Representer; + +import com.google.common.eventbus.Subscribe; +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; +import com.huaweicloud.rocketmq.grayscale.manager.ConsumerGroupAutoCheckManager; +import com.huaweicloud.rocketmq.grayscale.manager.RocketMqConsumerImplManager; + +public class MessageGrayPropertiesManager { + public static final String MESSAGE_GRAY_PREFIX = "rocketmq.gray.config"; + + private static final Logger LOGGER = LoggerFactory.getLogger(MessageGrayPropertiesManager.class); + + private final Environment env; + + private final Yaml yaml; + + public MessageGrayPropertiesManager(Environment env) { + this.env = env; + GovernanceEventManager.register(this); + Representer representer = new Representer(new DumperOptions()); + representer.getPropertyUtils().setSkipMissingProperties(true); + yaml = new Yaml(representer); + initCacheMessageProperties(); + } + + private void initCacheMessageProperties() { + String properties = env.getProperty(MESSAGE_GRAY_PREFIX, ""); + if (StringUtils.isEmpty(properties)) { + return; + } + RocketMqMessageGrayProperties messageGrayProperties; + try { + messageGrayProperties = yaml.loadAs(properties, RocketMqMessageGrayProperties.class); + } catch (Exception e) { + LOGGER.error("Loaded message gray properties failed!", e); + return; + } + if (messageGrayProperties == null) { + LOGGER.warn("Loaded message gray properties is empty"); + return; + } + RocketMqMessageGrayUtils.setMessageGrayProperties(messageGrayProperties); + } + + @Subscribe + public void onConfigurationChangedEvent(GovernanceConfigurationChangedEvent event) { + for (String key : event.getChangedConfigurations()) { + if (key.startsWith(MESSAGE_GRAY_PREFIX)) { + initCacheMessageProperties(); + ConsumerGroupAutoCheckManager.checkAndStartAutoFindGrayGroup(); + RocketMqConsumerImplManager.updateConsumerSubscriptionData(); + } + } + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java new file mode 100644 index 000000000..c34b8898d --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java @@ -0,0 +1,73 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.config; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.cloud.context.config.annotation.RefreshScope; + +import com.huaweicloud.rocketmq.grayscale.config.BaseProperties; +import com.huaweicloud.rocketmq.grayscale.config.GrayscaleProperties; + +public class RocketMqMessageGrayProperties { + private List grayscale = new ArrayList<>(); + + private BaseProperties base = new BaseProperties(); + + public List getGrayscale() { + return grayscale; + } + + public void setGrayscale(List grayscale) { + this.grayscale = grayscale; + } + + public BaseProperties getBase() { + return base; + } + + public void setBase(BaseProperties base) { + this.base = base; + } + + public Map getGrayTagsByServiceMeta(Map microServiceMeta) { + Map map = new HashMap<>(); + for (GrayscaleProperties properties : grayscale) { + if (properties.isServiceMetaMatch(microServiceMeta) + && !properties.getTrafficTag().isEmpty()) { + // set item traffic tags when serviceMeta match, because all message tag using traffic tags. + map.putAll(properties.getTrafficTag()); + } + } + return map; + } + + public Map getGrayTagsByGrayHeaders(Map trafficGrayHeaders) { + Map map = new HashMap<>(); + for (GrayscaleProperties properties : grayscale) { + Map.Entry matchEntry = properties.getTrafficTagByGrayHeaders(trafficGrayHeaders); + if (matchEntry != null) { + map.put(matchEntry.getKey(), matchEntry.getValue()); + } + } + return map; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/GrayMessageListenerConcurrently.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/GrayMessageListenerConcurrently.java new file mode 100644 index 000000000..85766147f --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/GrayMessageListenerConcurrently.java @@ -0,0 +1,50 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.extend; + +import java.util.List; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.util.CollectionUtils; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; + +public interface GrayMessageListenerConcurrently extends MessageListenerConcurrently { + /** + * origin rocketmq client listen message method, if microservice calls need routing by message properties, + * this method cannot be implemented. Instead, the 'consumeMessageExtend' method needs to be implemented. + * + * @param messages messages + * @param context current consumer context + * @return ConsumeConcurrentlyStatus + */ + @Override + default ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) { + if (!CollectionUtils.isEmpty(messages)) { + RocketMqMessageGrayUtils.setInvocationContext(messages.get(0).getProperties()); + } + return this.consumeMessageExtend(messages, context); + } + + default ConsumeConcurrentlyStatus consumeMessageExtend(List messages, ConsumeConcurrentlyContext context) { + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }; +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQProducer.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQProducer.java new file mode 100644 index 000000000..a50029ed9 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQProducer.java @@ -0,0 +1,69 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.extend.client; + +import java.util.List; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; + +import com.huaweicloud.rocketmq.grayscale.holder.RocketMqGraySendMessageHook; + +public class MessageGrayDefaultMQProducer extends DefaultMQProducer { + public MessageGrayDefaultMQProducer() { + this(MixAll.DEFAULT_PRODUCER_GROUP); + } + + public MessageGrayDefaultMQProducer(RPCHook rpcHook) { + this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + } + + public MessageGrayDefaultMQProducer(final String producerGroup) { + this(producerGroup, (RPCHook) null); + } + + public MessageGrayDefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + this(producerGroup, rpcHook, null); + } + + public MessageGrayDefaultMQProducer(final String producerGroup, RPCHook rpcHook, + final List topics) { + this(producerGroup, rpcHook, topics, false, null); + } + + public MessageGrayDefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { + this(producerGroup, null, enableMsgTrace, customizedTraceTopic); + } + + public MessageGrayDefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, + final String customizedTraceTopic) { + this(producerGroup, rpcHook, null, enableMsgTrace, customizedTraceTopic); + } + + public MessageGrayDefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List topics, + boolean enableMsgTrace, final String customizedTraceTopic) { + super(producerGroup, rpcHook, topics, enableMsgTrace, customizedTraceTopic); + setMessageSendHook(); + } + + @SuppressWarnings({"deprecation"}) + private void setMessageSendHook() { + getDefaultMQProducerImpl().registerSendMessageHook(new RocketMqGraySendMessageHook()); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQPushConsumer.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQPushConsumer.java new file mode 100644 index 000000000..504362f36 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/extend/client/MessageGrayDefaultMQPushConsumer.java @@ -0,0 +1,123 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.extend.client; + +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; +import com.huaweicloud.rocketmq.grayscale.config.ConsumerClientConfig; +import com.huaweicloud.rocketmq.grayscale.manager.ConsumerGroupAutoCheckManager; +import com.huaweicloud.rocketmq.grayscale.manager.RocketMqConsumerImplManager; +import com.huaweicloud.rocketmq.grayscale.manager.RocketMqSubscriptionDataManager; + +public class MessageGrayDefaultMQPushConsumer extends DefaultMQPushConsumer { + private String topic; + + public MessageGrayDefaultMQPushConsumer(final String consumerGroup) { + this(consumerGroup, null, new AllocateMessageQueueAveragely()); + } + + public MessageGrayDefaultMQPushConsumer(RPCHook rpcHook) { + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); + } + + public MessageGrayDefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) { + this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely()); + } + + public MessageGrayDefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, + final String customizedTraceTopic) { + this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); + } + + public MessageGrayDefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this(consumerGroup, rpcHook, allocateMessageQueueStrategy, false, null); + } + + private MessageGrayDefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, + final String customizedTraceTopic) { + super(consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic); + checkAndSetConsumerGroup(consumerGroup); + } + + private void checkAndSetConsumerGroup(String consumerGroup) { + String grayConsumerGroup = RocketMqMessageGrayUtils.getGrayConsumerGroup(consumerGroup); + if (StringUtils.equals(consumerGroup, grayConsumerGroup)) { + return; + } + setConsumerGroup(grayConsumerGroup); + } + + @Override + public void subscribe(String topic, String subExpression) throws MQClientException { + this.topic = topic; + super.subscribe(topic, subExpression); + } + + @Override + public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { + this.topic = topic; + super.subscribe(topic, messageSelector); + } + + @Override + public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { + this.topic = topic; + super.subscribe(topic, fullClassName, filterClassSource); + } + + @Override + public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { + this.topic = topic; + return super.fetchSubscribeMessageQueues(topic); + } + + @Override + public void start() throws MQClientException { + super.start(); + rebuildSubscriptionData(); + } + + @SuppressWarnings({"deprecation"}) + private void rebuildSubscriptionData() { + String address = getNamesrvAddr(); + String consumerGroup = getConsumerGroup(); + String consumeScope = RocketMqMessageGrayUtils.buildCacheKey(address, topic, consumerGroup); + DefaultMQPushConsumerImpl pushConsumerImpl = getDefaultMQPushConsumerImpl(); + RocketMqConsumerImplManager.getInstance().addPushConsumerImpls(consumeScope, pushConsumerImpl); + ConsumerClientConfig clientConfig = new ConsumerClientConfig(topic, address, consumerGroup, + pushConsumerImpl.getmQClientFactory()); + ConsumerGroupAutoCheckManager.addConsumerClientConfig(consumeScope, clientConfig); + ConsumerGroupAutoCheckManager.checkAndStartAutoFindGrayGroup(); + RocketMqSubscriptionDataManager.updateSubscriptionData(pushConsumerImpl.getSubscriptionInner(), topic, + consumeScope); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java new file mode 100644 index 000000000..38f844498 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java @@ -0,0 +1,62 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.filter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.web.reactive.filter.OrderedWebFilter; +import org.springframework.core.Ordered; +import org.springframework.http.HttpHeaders; +import org.springframework.util.CollectionUtils; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.WebFilterChain; + +import com.huaweicloud.rocketmq.grayscale.holder.RequestGrayHeaderHolder; +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; + +import reactor.core.publisher.Mono; + +public class WebFluxGrayHeaderFilter implements OrderedWebFilter { + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; + } + + @Override + public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { + Map> trafficTags = RocketMqMessageGrayUtils.getAllTrafficTagMap(); + if (CollectionUtils.isEmpty(trafficTags) || exchange.getRequest().getHeaders().isEmpty()) { + return chain.filter(exchange); + } + Map matchHeaders = new HashMap<>(); + HttpHeaders requestHeaders = exchange.getRequest().getHeaders(); + for (String key : trafficTags.keySet()) { + String headerValue = requestHeaders.getFirst(key); + if (!StringUtils.isEmpty(headerValue) && trafficTags.get(key).contains(headerValue)) { + matchHeaders.put(key, headerValue); + } + } + if (!CollectionUtils.isEmpty(matchHeaders)) { + RequestGrayHeaderHolder.setRequestGrayHeader(matchHeaders); + } + return chain.filter(exchange); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java new file mode 100644 index 000000000..ff9ae88a3 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java @@ -0,0 +1,54 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.filter; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.springframework.util.CollectionUtils; + +import com.huaweicloud.rocketmq.grayscale.holder.RequestGrayHeaderHolder; +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; + +import jakarta.servlet.Filter; +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletException; +import jakarta.servlet.ServletRequest; +import jakarta.servlet.ServletResponse; +import jakarta.servlet.http.HttpServletRequest; + +public class WebMvcGrayHeaderFilter implements Filter { + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + HttpServletRequest servletRequest = (HttpServletRequest) request; + Map> trafficTags = RocketMqMessageGrayUtils.getAllTrafficTagMap(); + Map matchHeaders = new HashMap<>(); + for (String key : trafficTags.keySet()) { + if (servletRequest.getHeader(key) != null && trafficTags.get(key).contains(servletRequest.getHeader(key))) { + matchHeaders.put(key, servletRequest.getHeader(key)); + } + } + if (!CollectionUtils.isEmpty(matchHeaders)) { + RequestGrayHeaderHolder.setRequestGrayHeader(matchHeaders); + } + chain.doFilter(request, response); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java new file mode 100644 index 000000000..487ddaf7b --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java @@ -0,0 +1,36 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.holder; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +public final class RequestGrayHeaderHolder { + private static final ThreadLocal> INVOCATION_GRAY_HEADERS = new ThreadLocal<>(); + + public static void setRequestGrayHeader(Map headers) { + INVOCATION_GRAY_HEADERS.set(headers); + } + + public static Map getRequestGrayHeader() { + return INVOCATION_GRAY_HEADERS.get(); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RocketMqGraySendMessageHook.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RocketMqGraySendMessageHook.java new file mode 100644 index 000000000..578400946 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RocketMqGraySendMessageHook.java @@ -0,0 +1,90 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.holder; + +import java.util.Map; + +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.common.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; + +public class RocketMqGraySendMessageHook implements SendMessageHook { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqGraySendMessageHook.class); + + @Override + public String hookName() { + return "MessageGraySendMessageHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + Message message = context.getMessage(); + + // set traffic tags in message by matching serviceMeta + if (injectTrafficTagByServiceMeta(message)) { + return; + } + + // set traffic tags in message by matching trafficTags + injectTrafficTagByTrafficTag(message); + } + + private void injectTrafficTagByTrafficTag(Message message) { + Map trafficGrayHeaders = RequestGrayHeaderHolder.getRequestGrayHeader(); + if (CollectionUtils.isEmpty(trafficGrayHeaders)) { + return; + } + Map grayTags = RocketMqMessageGrayUtils.getGrayTagsByGrayHeaders(trafficGrayHeaders); + if (grayTags.isEmpty()) { + return; + } + for (Map.Entry entry : grayTags.entrySet()) { + message.putUserProperty(entry.getKey(), entry.getValue()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("set property for message by gray header, messageId: " + message.getProperty("UNIQ_KEY") + + ", traffic tag: " + entry.getValue()); + } + } + } + + private boolean injectTrafficTagByServiceMeta(Message message) { + Map grayTags = RocketMqMessageGrayUtils.getGrayTagsByServiceMeta(); + if (grayTags.isEmpty()) { + return false; + } + for (Map.Entry entry : grayTags.entrySet()) { + if (message.getProperties() == null || !message.getProperties().containsKey(entry.getKey())) { + message.putUserProperty(entry.getKey(), entry.getValue()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("set property for message by service meta, messageId: " + message.getProperty("UNIQ_KEY") + ", " + + "traffic tag: " + entry.getValue()); + } + } + } + return true; + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java new file mode 100644 index 000000000..e98dee479 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java @@ -0,0 +1,213 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.manager; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; +import com.huaweicloud.rocketmq.grayscale.config.ConsumeModeEnum; +import com.huaweicloud.rocketmq.grayscale.config.ConsumerClientConfig; + +public class ConsumerGroupAutoCheckManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupAutoCheckManager.class); + + private static final AtomicBoolean START_AUTO_CHECK = new AtomicBoolean(false); + + private static final Map CONSUMER_CLIENT_CONFIG_MAP = new HashMap<>(); + + private static final long ROCKET_MQ_READ_TIMEOUT = 5000L; + + private static final Map> CONSUMER_GROUP_GRAY_TAG_MAP = new ConcurrentHashMap<>(); + + private static ScheduledExecutorService EXECUTOR_SERVICE; + + private static Future future = null; + + public static void addConsumerClientConfig(String consumeScope, ConsumerClientConfig clientConfig) { + CONSUMER_CLIENT_CONFIG_MAP.putIfAbsent(consumeScope, clientConfig); + } + + public static ConsumerClientConfig getConsumerClientConfig(String consumeScope) { + return CONSUMER_CLIENT_CONFIG_MAP.get(consumeScope); + } + + public static void checkAndStartAutoFindGrayGroup() { + String grayGroupTags = RocketMqMessageGrayUtils.getGrayGroupTagsByServiceMeta(); + if (!StringUtils.isEmpty(grayGroupTags)) { + return; + } + if (RocketMqMessageGrayUtils.getConsumeMode() == ConsumeModeEnum.AUTO + && START_AUTO_CHECK.compareAndSet(false, true)) { + findGrayConsumerGroupAndUpdateGrayTags(); + startSchedulerCheckGroupTask(); + return; + } + if (RocketMqMessageGrayUtils.getConsumeMode() == ConsumeModeEnum.BASE) { + shutdownExecutor(); + START_AUTO_CHECK.compareAndSet(true, false); + } + } + + private static void shutdownExecutor() { + if (EXECUTOR_SERVICE != null && !EXECUTOR_SERVICE.isShutdown()) { + try { + EXECUTOR_SERVICE.shutdown(); + EXECUTOR_SERVICE.awaitTermination(15, TimeUnit.SECONDS); + future.cancel(true); + } catch (InterruptedException e) { + LOGGER.debug("interrupted scheduler task failed!", e); + } + } + } + + public static void startSchedulerCheckGroupTask() { + if (EXECUTOR_SERVICE == null || EXECUTOR_SERVICE.isShutdown()) { + EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); + future = EXECUTOR_SERVICE.scheduleWithFixedDelay( + ConsumerGroupAutoCheckManager::findGrayConsumerGroupAndUpdateGrayTags, 10, + RocketMqMessageGrayUtils.getAutoCheckDelayTime(), TimeUnit.SECONDS); + } + } + + public static void findGrayConsumerGroupAndUpdateGrayTags() { + if (CONSUMER_CLIENT_CONFIG_MAP.isEmpty()) { + return; + } + for (ConsumerClientConfig clientConfig : CONSUMER_CLIENT_CONFIG_MAP.values()) { + if (clientConfig.getMqClientInstance() == null) { + continue; + } + HashSet grayTags = findGrayConsumerGroupAndGetTags(clientConfig); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[auto-check] current find gray tags: {}.", grayTags); + } + resetAutoFindConsumerGrayTags(grayTags, clientConfig); + } + } + + private static HashSet findGrayConsumerGroupAndGetTags(ConsumerClientConfig clientConfig) { + try { + MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl(); + String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi); + GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(), + ROCKET_MQ_READ_TIMEOUT); + return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi, + clientConfig.getConsumerGroup()); + } catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException + | RemotingConnectException | MQBrokerException e) { + LOGGER.error("[auto-check] mqClientApi query consumer group error!", e); + } + return new HashSet<>(); + } + + private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi) + throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, + InterruptedException, MQClientException { + TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT, false); + List brokerList = new ArrayList<>(); + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + brokerList.addAll(brokerData.getBrokerAddrs().values()); + } + + // cluster mode has multiple addresses, just select one + return CollectionUtils.isEmpty(brokerList) ? "" : brokerList.get(0); + } + + private static HashSet getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, + MQClientAPIImpl mqClientApi, String consumerGroup) { + HashSet grayTags = new HashSet<>(); + for (String group : groupList.getGroupList()) { + if (group.equals(consumerGroup) || !group.contains(consumerGroup)) { + continue; + } + try { + List consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group, + ROCKET_MQ_READ_TIMEOUT); + if (consumerIds.isEmpty()) { + continue; + } + String grayTag = StringUtils.substringAfterLast(group, consumerGroup + "_"); + if (!StringUtils.isEmpty(grayTag)) { + grayTags.add(grayTag); + } + } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException + | MQBrokerException | InterruptedException e) { + LOGGER.warn("[auto-check] can not find ids in group: {}.", group); + } + } + return grayTags; + } + + private static void resetAutoFindConsumerGrayTags(HashSet grayTags, ConsumerClientConfig clientConfig) { + String consumeScope = RocketMqMessageGrayUtils.buildCacheKey(clientConfig.getAddress(), + clientConfig.getTopic(), clientConfig.getConsumerGroup()); + HashSet lastGrayTags = CONSUMER_GROUP_GRAY_TAG_MAP.get(consumeScope); + if (isConsumerGrayTagsChanged(grayTags, lastGrayTags)) { + LOGGER.info("consumeScope [{}] grayTags changed, current grayTags [{}], origin grayTags [{}].", consumeScope, + grayTags, lastGrayTags); + CONSUMER_GROUP_GRAY_TAG_MAP.put(consumeScope, grayTags); + Map subscriptionInner + = RocketMqConsumerImplManager.getInstance().getSubscriptionInner(consumeScope); + RocketMqSubscriptionDataManager + .updateSubscriptionData(subscriptionInner, clientConfig.getTopic(), consumeScope); + } + } + + private static boolean isConsumerGrayTagsChanged(Set grayTags, HashSet lastGrayTags) { + if (lastGrayTags == null) { + return !grayTags.isEmpty(); + } + if (grayTags.isEmpty()) { + return !lastGrayTags.isEmpty(); + } + HashSet tempGrayTags = new HashSet<>(grayTags); + tempGrayTags.removeAll(lastGrayTags); + return !tempGrayTags.isEmpty() || grayTags.size() != lastGrayTags.size(); + } + + public static List getAutoFindGrayTags(String consumeScope) { + HashSet consumerGrayTags = CONSUMER_GROUP_GRAY_TAG_MAP.get(consumeScope); + return consumerGrayTags == null ? new ArrayList<>() : new ArrayList<>(consumerGrayTags); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java new file mode 100644 index 000000000..d3e8309b2 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java @@ -0,0 +1,83 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.manager; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; + +public class RocketMqConsumerImplManager { + private final static RocketMqConsumerImplManager instance = new RocketMqConsumerImplManager(); + + private final static Map pushConsumerImpls = new ConcurrentHashMap<>(); + + private final static Map litePullRebalanceImpls = new ConcurrentHashMap<>(); + + private RocketMqConsumerImplManager() { + } + + public static RocketMqConsumerImplManager getInstance() { + return instance; + } + + public void addPushConsumerImpls(String consumeScope, DefaultMQPushConsumerImpl pushConsumer) { + pushConsumerImpls.putIfAbsent(consumeScope, pushConsumer); + } + + public void addLitePullRebalanceImpl(String consumeScope, RebalanceImpl rebalanceImpl) { + litePullRebalanceImpls.putIfAbsent(consumeScope, rebalanceImpl); + } + + public Map getSubscriptionInner(String consumeScope) { + if (pushConsumerImpls.get(consumeScope) != null) { + return pushConsumerImpls.get(consumeScope).getSubscriptionInner(); + } + if (litePullRebalanceImpls.get(consumeScope) != null) { + return litePullRebalanceImpls.get(consumeScope).getSubscriptionInner(); + } + return new HashMap<>(); + } + + public static void updateConsumerSubscriptionData() { + if (!pushConsumerImpls.isEmpty()) { + for (String consumeScope : pushConsumerImpls.keySet()) { + ConcurrentMap subscriptionInner = pushConsumerImpls.get(consumeScope) + .getSubscriptionInner(); + RocketMqSubscriptionDataManager.updateSubscriptionData(subscriptionInner, + getTopicFromConsumeScope(consumeScope), consumeScope); + } + } + if (!litePullRebalanceImpls.isEmpty()) { + for (String consumeScope : litePullRebalanceImpls.keySet()) { + ConcurrentMap subscriptionInner = litePullRebalanceImpls.get(consumeScope) + .getSubscriptionInner(); + RocketMqSubscriptionDataManager.updateSubscriptionData(subscriptionInner, + getTopicFromConsumeScope(consumeScope), consumeScope); + } + } + } + + private static String getTopicFromConsumeScope(String consumeScope) { + return consumeScope.split("@")[1]; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java new file mode 100644 index 000000000..2d46c40c0 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java @@ -0,0 +1,275 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.manager; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; +import com.huaweicloud.rocketmq.grayscale.config.ConsumeModeEnum; +import com.huaweicloud.rocketmq.grayscale.config.GrayscaleProperties; + +public class RocketMqSubscriptionDataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqSubscriptionDataManager.class); + + public static final String EXPRESSION_TYPE_TAG = "TAG"; + + public static final String EXPRESSION_TYPE_SQL92 = "SQL92"; + + public static final String SELECT_ALL_MESSAGE_SQL = "(_message_tag_ is null) or (_message_tag_ is not null)"; + + private static final Pattern PATTERN = Pattern.compile(" and | or ", Pattern.CASE_INSENSITIVE); + + private static final String AND_SPLICE_STR = " and "; + + public static void updateSubscriptionData(Map subscriptionInner, String topic, + String consumeScope) { + if (subscriptionInner == null || subscriptionInner.isEmpty()) { + return; + } + String originSubstring = subscriptionInner.get(topic).getSubString(); + String sql92Substring = buildSql92Substring(subscriptionInner.get(topic), consumeScope); + for (SubscriptionData subscriptionData : subscriptionInner.values()) { + subscriptionData.setSubString(sql92Substring); + if (EXPRESSION_TYPE_TAG.equals(subscriptionData.getExpressionType())) { + subscriptionData.setExpressionType(EXPRESSION_TYPE_SQL92); + subscriptionData.getCodeSet().clear(); + subscriptionData.getTagsSet().clear(); + } + subscriptionData.setSubVersion(System.currentTimeMillis()); + } + LOGGER.info("update TOPIC: {} SQL92 subscriptionData, originSubStr: {}, sql92SubStr: {}", topic, + originSubstring, sql92Substring); + } + + private static String buildSql92Substring(SubscriptionData subscriptionData, String consumeScope) { + String tempSubStr = subscriptionData.getSubString(); + if (EXPRESSION_TYPE_TAG.equals(subscriptionData.getExpressionType())) { + tempSubStr = buildTagsSetSql92Expression(subscriptionData.getTagsSet()); + } + String sql92Expression = addGrayTagsToSql92Expression(tempSubStr, consumeScope); + if (StringUtils.isEmpty(sql92Expression)) { + sql92Expression = SELECT_ALL_MESSAGE_SQL; + } + return sql92Expression; + } + + public static String addGrayTagsToSql92Expression(String subscriptionData, String consumeScope) { + String tempSubscriptionData = subscriptionData; + if (!StringUtils.isBlank(tempSubscriptionData)) { + tempSubscriptionData = rebuildSubDataWithoutGrayTag(tempSubscriptionData); + } + String sql92Expression = buildGrayTagsSql92Expression(StringUtils.isBlank(tempSubscriptionData), consumeScope); + if (StringUtils.isBlank(sql92Expression)) { + return tempSubscriptionData; + } + return StringUtils.isBlank(tempSubscriptionData) + ? sql92Expression : tempSubscriptionData + AND_SPLICE_STR + sql92Expression; + } + + private static String buildGrayTagsSql92Expression(boolean isOriginSubStrIsEmpty, String consumeScope) { + StringBuilder sb = new StringBuilder(); + String grayGroupTag = RocketMqMessageGrayUtils.getGrayGroupTagsByServiceMeta(); + if (StringUtils.isEmpty(grayGroupTag)) { + // base model return without exclude group message + if (RocketMqMessageGrayUtils.getConsumeMode() == ConsumeModeEnum.BASE) { + sb.append(buildBaseConsumerSql92Expression(getTrafficTagMapInBaseMode(), isOriginSubStrIsEmpty)); + return sb.toString(); + } + + // auto model return without exclude group and current consume message gray group message + sb.append(buildBaseConsumerSql92Expression(getTrafficTagMapInAutoMode(consumeScope), isOriginSubStrIsEmpty)); + } else { + Map> trafficTagMap = getTrafficTagMapByGroupTags(List.of(grayGroupTag)); + if (!CollectionUtils.isEmpty(trafficTagMap)) { + sb.append(buildGrayConsumerSql92Expression(trafficTagMap, isOriginSubStrIsEmpty)); + } else { + LOGGER.warn("current gray group {} had not set grayscale, set it and restart service.", grayGroupTag); + } + } + return sb.toString(); + } + + private static String buildGrayConsumerSql92Expression(Map> trafficTagMap, + boolean isOriginSubStrIsEmpty) { + StringBuilder builder = new StringBuilder(); + if (trafficTagMap.size() > 1 || !isOriginSubStrIsEmpty) { + builder.append("("); + } + for (Map.Entry> envEntry : trafficTagMap.entrySet()) { + if (builder.length() > 1) { + builder.append(" or "); + } + builder.append("(") + .append(envEntry.getKey()) + .append(" in ") + .append(getStrForSets(new HashSet<>(envEntry.getValue()))) + .append(")"); + } + if (trafficTagMap.size() > 1 || !isOriginSubStrIsEmpty) { + builder.append(")"); + } + return builder.toString(); + } + + private static Map> getTrafficTagMapInBaseMode() { + return getTrafficTagMapByGroupTags(RocketMqMessageGrayUtils.getExcludeGroupTags()); + } + + private static Map> getTrafficTagMapInAutoMode(String consumeScope) { + List groupTags = new ArrayList<>(); + List excludeGroupTags = RocketMqMessageGrayUtils.getExcludeGroupTags(); + if (!CollectionUtils.isEmpty(excludeGroupTags)) { + groupTags.addAll(excludeGroupTags); + } + List autoFindGroupTags = ConsumerGroupAutoCheckManager.getAutoFindGrayTags(consumeScope); + if (!CollectionUtils.isEmpty(autoFindGroupTags)) { + groupTags.addAll(autoFindGroupTags); + } + return getTrafficTagMapByGroupTags(groupTags); + } + + private static String buildBaseConsumerSql92Expression(Map> trafficTagMap, + boolean isOriginSubStrIsEmpty) { + if (CollectionUtils.isEmpty(trafficTagMap)) { + return ""; + } + StringBuilder builder = new StringBuilder(); + if (trafficTagMap.size() > 1 || !isOriginSubStrIsEmpty) { + builder.append("("); + } + for (Map.Entry> envEntry : trafficTagMap.entrySet()) { + if (builder.length() > 1) { + builder.append(AND_SPLICE_STR); + } + if (trafficTagMap.size() > 1) { + builder.append("("); + } + builder.append("(") + .append(envEntry.getKey()) + .append(" not in ") + .append(getStrForSets(new HashSet<>(envEntry.getValue()))) + .append(")") + .append(" or ") + .append("(") + .append(envEntry.getKey()) + .append(" is null") + .append(")"); + if (trafficTagMap.size() > 1) { + builder.append(")"); + } + } + if (trafficTagMap.size() > 1 || !isOriginSubStrIsEmpty) { + builder.append(")"); + } + return builder.toString(); + } + + private static Map> getTrafficTagMapByGroupTags(List groupTags) { + Map> trafficTagMap = new HashMap<>(); + if (CollectionUtils.isEmpty(groupTags)) { + return trafficTagMap; + } + List grayscale = RocketMqMessageGrayUtils.getMessageGrayProperties().getGrayscale(); + for (GrayscaleProperties properties : grayscale) { + if (groupTags.contains(properties.getConsumerGroupTag())) { + buildTrafficTagMap(trafficTagMap, properties.getTrafficTag()); + } + } + return trafficTagMap; + } + + private static void buildTrafficTagMap(Map> trafficTagMap, Map trafficTag) { + if (CollectionUtils.isEmpty(trafficTag)) { + return; + } + for (String key : trafficTag.keySet()) { + trafficTagMap.computeIfAbsent(key, k -> new ArrayList<>()).add(trafficTag.get(key)); + } + } + + private static String rebuildSubDataWithoutGrayTag(String originSubData) { + if (StringUtils.isBlank(originSubData)) { + return originSubData; + } + String[] originConditions = PATTERN.split(originSubData); + List refactorConditions = new ArrayList<>(); + for (String condition : originConditions) { + if (!containsGrayTags(condition) && !condition.contains("_message_tag_")) { + refactorConditions.add(condition); + } + } + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < refactorConditions.size(); i++) { + sb.append(refactorConditions.get(i)); + if (i != refactorConditions.size() - 1) { + sb.append(AND_SPLICE_STR); + } + } + return sb.toString(); + } + + private static boolean containsGrayTags(String condition) { + for (String key : getGrayTagsSet()) { + if (condition.contains(key)) { + return true; + } + } + return false; + } + + private static Set getGrayTagsSet() { + Set grayTags = new HashSet<>(); + for (GrayscaleProperties item : RocketMqMessageGrayUtils.getMessageGrayProperties().getGrayscale()) { + if (!item.getTrafficTag().isEmpty()) { + grayTags.addAll(item.getTrafficTag().keySet()); + } + } + return grayTags; + } + + public static String buildTagsSetSql92Expression(Set tagsSet) { + return tagsSet != null && !tagsSet.isEmpty() ? buildTagsExpression(tagsSet) : ""; + } + + private static String buildTagsExpression(Set tagsSet) { + return "(TAGS is not null and TAGS in " + getStrForSets(tagsSet) + ")"; + } + + private static String getStrForSets(Set tags) { + StringBuilder builder = new StringBuilder("("); + for (String tag : tags) { + builder.append("'").append(tag).append("'"); + builder.append(","); + } + builder.deleteCharAt(builder.length() - 1); + builder.append(")"); + return builder.toString(); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/NacosServiceMeta.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/NacosServiceMeta.java new file mode 100644 index 000000000..5ec07040d --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/NacosServiceMeta.java @@ -0,0 +1,33 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.servicemeta; + +import java.util.HashMap; +import java.util.Map; + +public class NacosServiceMeta { + private Map serviceMeta = new HashMap<>(); + + public Map getServiceMeta() { + return serviceMeta; + } + + public void setServiceMeta(Map serviceMeta) { + this.serviceMeta = serviceMeta; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java new file mode 100644 index 000000000..1d374f719 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java @@ -0,0 +1,43 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.servicemeta; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; + +public class ServiceMetaManager { + public ServiceMetaManager(NacosServiceMeta nacosServiceMeta, ServicecombServiceMeta servicecombServiceMeta) { + loadedServiceMetaData(nacosServiceMeta, servicecombServiceMeta); + } + private void loadedServiceMetaData(NacosServiceMeta nacosServiceMeta, + ServicecombServiceMeta servicecombServiceMeta) { + Map serviceMeta = new HashMap<>(); + if (!servicecombServiceMeta.getProperties().isEmpty()) { + serviceMeta.putAll(servicecombServiceMeta.getProperties()); + } + if (!nacosServiceMeta.getServiceMeta().isEmpty()) { + serviceMeta.putAll(nacosServiceMeta.getServiceMeta()); + } + RocketMqMessageGrayUtils.setServiceMetaData(serviceMeta); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServicecombServiceMeta.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServicecombServiceMeta.java new file mode 100644 index 000000000..30e11c3a4 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServicecombServiceMeta.java @@ -0,0 +1,48 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.servicemeta; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Value; + +public class ServicecombServiceMeta { + private Map properties = new HashMap<>(); + + @Value("${spring.cloud.servicecomb.service.version:" + + "${spring.cloud.servicecomb.discovery.version:0.0.1}}") + private String version; + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + this.properties.put("version", version); + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqListenerContainerAspect.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqListenerContainerAspect.java new file mode 100644 index 000000000..057a54ff5 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqListenerContainerAspect.java @@ -0,0 +1,79 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.springboot; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.After; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.aspectj.lang.annotation.Pointcut; + +import com.huaweicloud.rocketmq.grayscale.manager.ConsumerGroupAutoCheckManager; +import com.huaweicloud.rocketmq.grayscale.manager.RocketMqConsumerImplManager; +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; +import com.huaweicloud.rocketmq.grayscale.manager.RocketMqSubscriptionDataManager; +import com.huaweicloud.rocketmq.grayscale.config.ConsumerClientConfig; + +@Aspect +public class RocketMqListenerContainerAspect { + @Pointcut("execution(* org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start())") + public void pointcut() { + } + + @Before("pointcut()") + public void beforeStart(JoinPoint joinPoint) { + DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) joinPoint.getTarget(); + String consumerGroup = container.getConsumerGroup(); + String grayConsumerGroup = RocketMqMessageGrayUtils.getGrayConsumerGroup(consumerGroup); + if (StringUtils.equals(consumerGroup, grayConsumerGroup)) { + return; + } + container.setConsumerGroup(grayConsumerGroup); + container.getConsumer().setConsumerGroup(grayConsumerGroup); + } + + @After("pointcut()") + public void afterStart(JoinPoint joinPoint) { + DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) joinPoint.getTarget(); + String consumeScope = RocketMqMessageGrayUtils.buildCacheKey(container.getConsumer().getNamesrvAddr(), + container.getTopic(), container.getConsumer().getConsumerGroup()); + DefaultMQPushConsumerImpl pushConsumerImpl = getPushConsumerImpl(container); + RocketMqConsumerImplManager.getInstance().addPushConsumerImpls(consumeScope, pushConsumerImpl); + addConsumerClientConfig(container.getConsumer().getNamesrvAddr(), container.getTopic(), + container.getConsumer().getConsumerGroup(), pushConsumerImpl.getmQClientFactory()); + ConsumerGroupAutoCheckManager.checkAndStartAutoFindGrayGroup(); + RocketMqSubscriptionDataManager.updateSubscriptionData(pushConsumerImpl.getSubscriptionInner(), + container.getTopic(), consumeScope); + } + + private void addConsumerClientConfig(String namesrvAddr, String topic, String consumerGroup, + MQClientInstance mqClientInstance) { + ConsumerClientConfig clientConfig = new ConsumerClientConfig(topic, namesrvAddr, consumerGroup, mqClientInstance); + String consumeScope = RocketMqMessageGrayUtils.buildCacheKey(namesrvAddr, topic, consumerGroup); + ConsumerGroupAutoCheckManager.addConsumerClientConfig(consumeScope, clientConfig); + } + + @SuppressWarnings({"deprecation"}) + private DefaultMQPushConsumerImpl getPushConsumerImpl(DefaultRocketMQListenerContainer container) { + return container.getConsumer().getDefaultMQPushConsumerImpl(); + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqMessageListenerAspect.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqMessageListenerAspect.java new file mode 100644 index 000000000..4db4bd36b --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqMessageListenerAspect.java @@ -0,0 +1,44 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.springboot; + +import org.apache.rocketmq.common.message.Message; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.aspectj.lang.annotation.Pointcut; + +import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; + +@Aspect +public class RocketMqMessageListenerAspect { + @Pointcut("execution(* org.apache.rocketmq.spring.core.RocketMQListener.*(..))") + public void pointcut() { + } + + @Before("pointcut()") + public void beforeOnMessage(JoinPoint joinPoint) { + Object message = joinPoint.getArgs()[0]; + if (message == null) { + return; + } + if (message instanceof Message) { + RocketMqMessageGrayUtils.setInvocationContext(((Message) message).getProperties()); + } + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqSendMessageHookManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqSendMessageHookManager.java new file mode 100644 index 000000000..af67bd5b9 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/springboot/RocketMqSendMessageHookManager.java @@ -0,0 +1,39 @@ +/* + + * Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.rocketmq.grayscale.springboot; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.context.ApplicationContext; + +import com.huaweicloud.rocketmq.grayscale.holder.RocketMqGraySendMessageHook; + +public class RocketMqSendMessageHookManager { + public static final String PRODUCER_BEAN_NAME = "defaultMQProducer"; + + public RocketMqSendMessageHookManager(ApplicationContext applicationContext) { + registerMessageHook(applicationContext); + } + + @SuppressWarnings({"deprecation"}) + private void registerMessageHook(ApplicationContext applicationContext) { + if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) { + DefaultMQProducer defaultMQProducer = (DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME); + defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(new RocketMqGraySendMessageHook()); + } + } +} diff --git a/spring-cloud-huawei-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-huawei-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 000000000..718e30197 --- /dev/null +++ b/spring-cloud-huawei-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,20 @@ +# +## --------------------------------------------------------------------------- +## +## Copyright (C) 2020-2025 Huawei Technologies Co., Ltd. All rights reserved. +## +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayConfiguration +com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayWebFluxConfiguration diff --git a/spring-cloud-starter-huawei/pom.xml b/spring-cloud-starter-huawei/pom.xml index aa934bba8..f3277d97f 100644 --- a/spring-cloud-starter-huawei/pom.xml +++ b/spring-cloud-starter-huawei/pom.xml @@ -48,6 +48,7 @@ spring-cloud-starter-huawei-mesh spring-cloud-starter-huawei-mesh-gateway spring-cloud-starter-huawei-mesh-webflux + spring-cloud-starter-huawei-rocketmq \ No newline at end of file diff --git a/spring-cloud-starter-huawei/spring-cloud-starter-huawei-rocketmq/pom.xml b/spring-cloud-starter-huawei/spring-cloud-starter-huawei-rocketmq/pom.xml new file mode 100644 index 000000000..f59bf6997 --- /dev/null +++ b/spring-cloud-starter-huawei/spring-cloud-starter-huawei-rocketmq/pom.xml @@ -0,0 +1,38 @@ + + + + + spring-cloud-starter-huawei + com.huaweicloud + 1.12.0-2024.0.x-SNAPSHOT + + 4.0.0 + + spring-cloud-starter-huawei-rocketmq + Spring Cloud Huawei::Starter::Rocketmq + + + + com.huaweicloud + spring-cloud-huawei-rocketmq + + + + \ No newline at end of file From 3bfa6e5572b076bd677eb4568611359fbe89ed3c Mon Sep 17 00:00:00 2001 From: chengyouling Date: Fri, 30 May 2025 10:22:41 +0800 Subject: [PATCH 2/2] fixed checkstyle problem --- .../RocketMqMessageGrayConfiguration.java | 2 +- .../grayscale/RocketMqMessageGrayUtils.java | 8 +++---- ...cketMqMessageGrayWebFluxConfiguration.java | 1 - .../grayscale/config/GrayscaleProperties.java | 2 -- .../config/RocketMqMessageGrayProperties.java | 5 ---- .../filter/WebFluxGrayHeaderFilter.java | 8 +++---- .../filter/WebMvcGrayHeaderFilter.java | 8 ++++--- .../holder/RequestGrayHeaderHolder.java | 4 ---- .../ConsumerGroupAutoCheckManager.java | 16 ++++++------- .../manager/RocketMqConsumerImplManager.java | 24 +++---------------- .../RocketMqSubscriptionDataManager.java | 4 ++-- .../servicemeta/ServiceMetaManager.java | 3 --- 12 files changed, 27 insertions(+), 58 deletions(-) diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java index 8f135d5d8..54c95fef0 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayConfiguration.java @@ -70,7 +70,7 @@ public RocketMqListenerContainerAspect rocketMQListenerContainerAspect() { @Bean @ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQListener") - public RocketMqMessageListenerAspect RocketMqListenerAspect() { + public RocketMqMessageListenerAspect rocketMqListenerAspect() { return new RocketMqMessageListenerAspect(); } diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java index 73dd155af..8cd506a3b 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayUtils.java @@ -122,10 +122,10 @@ public static void setInvocationContext(Map properties) { return; } InvocationContext invocationContext = InvocationContextHolder.getOrCreateInvocationContext(); - for (String key : properties.keySet()) { - if (trafficTagMap.get(key) != null && properties.get(key) != null - && trafficTagMap.get(key).contains(properties.get(key))) { - invocationContext.putContext(key, properties.get(key)); + for (Map.Entry entry : properties.entrySet()) { + if (trafficTagMap.get(entry.getKey()) != null && entry.getValue() != null + && trafficTagMap.get(entry.getKey()).contains(entry.getValue())) { + invocationContext.putContext(entry.getKey(), entry.getValue()); } } } diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java index 15d40492d..737aee154 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/RocketMqMessageGrayWebFluxConfiguration.java @@ -22,7 +22,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.huaweicloud.rocketmq.grayscale.ConditionalOnRocketMqMsgGrayEnabled; import com.huaweicloud.rocketmq.grayscale.filter.WebFluxGrayHeaderFilter; @Configuration diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java index 7f8517542..6b8f84964 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/GrayscaleProperties.java @@ -22,8 +22,6 @@ import org.apache.commons.lang3.StringUtils; -import com.google.common.cache.Cache; - public class GrayscaleProperties { private String consumerGroupTag; diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java index c34b8898d..1d3a8c664 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/config/RocketMqMessageGrayProperties.java @@ -22,11 +22,6 @@ import java.util.List; import java.util.Map; -import org.springframework.cloud.context.config.annotation.RefreshScope; - -import com.huaweicloud.rocketmq.grayscale.config.BaseProperties; -import com.huaweicloud.rocketmq.grayscale.config.GrayscaleProperties; - public class RocketMqMessageGrayProperties { private List grayscale = new ArrayList<>(); diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java index 38f844498..8ed3630be 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebFluxGrayHeaderFilter.java @@ -48,10 +48,10 @@ public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { } Map matchHeaders = new HashMap<>(); HttpHeaders requestHeaders = exchange.getRequest().getHeaders(); - for (String key : trafficTags.keySet()) { - String headerValue = requestHeaders.getFirst(key); - if (!StringUtils.isEmpty(headerValue) && trafficTags.get(key).contains(headerValue)) { - matchHeaders.put(key, headerValue); + for (Map.Entry> entry : trafficTags.entrySet()) { + String headerValue = requestHeaders.getFirst(entry.getKey()); + if (!StringUtils.isEmpty(headerValue) && entry.getValue().contains(headerValue)) { + matchHeaders.put(entry.getKey(), headerValue); } } if (!CollectionUtils.isEmpty(matchHeaders)) { diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java index ff9ae88a3..09448bf03 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/filter/WebMvcGrayHeaderFilter.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; import com.huaweicloud.rocketmq.grayscale.holder.RequestGrayHeaderHolder; @@ -41,9 +42,10 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha HttpServletRequest servletRequest = (HttpServletRequest) request; Map> trafficTags = RocketMqMessageGrayUtils.getAllTrafficTagMap(); Map matchHeaders = new HashMap<>(); - for (String key : trafficTags.keySet()) { - if (servletRequest.getHeader(key) != null && trafficTags.get(key).contains(servletRequest.getHeader(key))) { - matchHeaders.put(key, servletRequest.getHeader(key)); + for (Map.Entry> entry : trafficTags.entrySet()) { + String headerValue = servletRequest.getHeader(entry.getKey()); + if (!StringUtils.isEmpty(headerValue) && entry.getValue().contains(headerValue)) { + matchHeaders.put(entry.getKey(), headerValue); } } if (!CollectionUtils.isEmpty(matchHeaders)) { diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java index 487ddaf7b..cef87bab8 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/holder/RequestGrayHeaderHolder.java @@ -18,10 +18,6 @@ package com.huaweicloud.rocketmq.grayscale.holder; import java.util.Map; -import java.util.concurrent.TimeUnit; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; public final class RequestGrayHeaderHolder { private static final ThreadLocal> INVOCATION_GRAY_HEADERS = new ThreadLocal<>(); diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java index e98dee479..957a2979b 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/ConsumerGroupAutoCheckManager.java @@ -89,7 +89,7 @@ public static void checkAndStartAutoFindGrayGroup() { } } - private static void shutdownExecutor() { + private static synchronized void shutdownExecutor() { if (EXECUTOR_SERVICE != null && !EXECUTOR_SERVICE.isShutdown()) { try { EXECUTOR_SERVICE.shutdown(); @@ -101,7 +101,7 @@ private static void shutdownExecutor() { } } - public static void startSchedulerCheckGroupTask() { + public static synchronized void startSchedulerCheckGroupTask() { if (EXECUTOR_SERVICE == null || EXECUTOR_SERVICE.isShutdown()) { EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); future = EXECUTOR_SERVICE.scheduleWithFixedDelay( @@ -118,7 +118,7 @@ public static void findGrayConsumerGroupAndUpdateGrayTags() { if (clientConfig.getMqClientInstance() == null) { continue; } - HashSet grayTags = findGrayConsumerGroupAndGetTags(clientConfig); + Set grayTags = findGrayConsumerGroupAndGetTags(clientConfig); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[auto-check] current find gray tags: {}.", grayTags); } @@ -126,7 +126,7 @@ public static void findGrayConsumerGroupAndUpdateGrayTags() { } } - private static HashSet findGrayConsumerGroupAndGetTags(ConsumerClientConfig clientConfig) { + private static Set findGrayConsumerGroupAndGetTags(ConsumerClientConfig clientConfig) { try { MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl(); String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi); @@ -154,7 +154,7 @@ private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi return CollectionUtils.isEmpty(brokerList) ? "" : brokerList.get(0); } - private static HashSet getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, + private static Set getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, MQClientAPIImpl mqClientApi, String consumerGroup) { HashSet grayTags = new HashSet<>(); for (String group : groupList.getGroupList()) { @@ -179,14 +179,14 @@ private static HashSet getGrayTagsByConsumerGroup(GroupList groupList, S return grayTags; } - private static void resetAutoFindConsumerGrayTags(HashSet grayTags, ConsumerClientConfig clientConfig) { + private static void resetAutoFindConsumerGrayTags(Set grayTags, ConsumerClientConfig clientConfig) { String consumeScope = RocketMqMessageGrayUtils.buildCacheKey(clientConfig.getAddress(), clientConfig.getTopic(), clientConfig.getConsumerGroup()); HashSet lastGrayTags = CONSUMER_GROUP_GRAY_TAG_MAP.get(consumeScope); if (isConsumerGrayTagsChanged(grayTags, lastGrayTags)) { LOGGER.info("consumeScope [{}] grayTags changed, current grayTags [{}], origin grayTags [{}].", consumeScope, grayTags, lastGrayTags); - CONSUMER_GROUP_GRAY_TAG_MAP.put(consumeScope, grayTags); + CONSUMER_GROUP_GRAY_TAG_MAP.put(consumeScope, new HashSet<>(grayTags)); Map subscriptionInner = RocketMqConsumerImplManager.getInstance().getSubscriptionInner(consumeScope); RocketMqSubscriptionDataManager @@ -194,7 +194,7 @@ private static void resetAutoFindConsumerGrayTags(HashSet grayTags, Cons } } - private static boolean isConsumerGrayTagsChanged(Set grayTags, HashSet lastGrayTags) { + private static boolean isConsumerGrayTagsChanged(Set grayTags, Set lastGrayTags) { if (lastGrayTags == null) { return !grayTags.isEmpty(); } diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java index d3e8309b2..c32569fb7 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqConsumerImplManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; -import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class RocketMqConsumerImplManager { @@ -31,8 +30,6 @@ public class RocketMqConsumerImplManager { private final static Map pushConsumerImpls = new ConcurrentHashMap<>(); - private final static Map litePullRebalanceImpls = new ConcurrentHashMap<>(); - private RocketMqConsumerImplManager() { } @@ -44,33 +41,18 @@ public void addPushConsumerImpls(String consumeScope, DefaultMQPushConsumerImpl pushConsumerImpls.putIfAbsent(consumeScope, pushConsumer); } - public void addLitePullRebalanceImpl(String consumeScope, RebalanceImpl rebalanceImpl) { - litePullRebalanceImpls.putIfAbsent(consumeScope, rebalanceImpl); - } - public Map getSubscriptionInner(String consumeScope) { if (pushConsumerImpls.get(consumeScope) != null) { return pushConsumerImpls.get(consumeScope).getSubscriptionInner(); } - if (litePullRebalanceImpls.get(consumeScope) != null) { - return litePullRebalanceImpls.get(consumeScope).getSubscriptionInner(); - } return new HashMap<>(); } public static void updateConsumerSubscriptionData() { if (!pushConsumerImpls.isEmpty()) { - for (String consumeScope : pushConsumerImpls.keySet()) { - ConcurrentMap subscriptionInner = pushConsumerImpls.get(consumeScope) - .getSubscriptionInner(); - RocketMqSubscriptionDataManager.updateSubscriptionData(subscriptionInner, - getTopicFromConsumeScope(consumeScope), consumeScope); - } - } - if (!litePullRebalanceImpls.isEmpty()) { - for (String consumeScope : litePullRebalanceImpls.keySet()) { - ConcurrentMap subscriptionInner = litePullRebalanceImpls.get(consumeScope) - .getSubscriptionInner(); + for (Map.Entry consumerImplEntry : pushConsumerImpls.entrySet()) { + String consumeScope = consumerImplEntry.getKey(); + ConcurrentMap subscriptionInner = consumerImplEntry.getValue().getSubscriptionInner(); RocketMqSubscriptionDataManager.updateSubscriptionData(subscriptionInner, getTopicFromConsumeScope(consumeScope), consumeScope); } diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java index 2d46c40c0..2f5b8e815 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/manager/RocketMqSubscriptionDataManager.java @@ -209,8 +209,8 @@ private static void buildTrafficTagMap(Map> trafficTagMap, if (CollectionUtils.isEmpty(trafficTag)) { return; } - for (String key : trafficTag.keySet()) { - trafficTagMap.computeIfAbsent(key, k -> new ArrayList<>()).add(trafficTag.get(key)); + for (Map.Entry entry : trafficTag.entrySet()) { + trafficTagMap.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue()); } } diff --git a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java index 1d374f719..d55cebc3f 100644 --- a/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java +++ b/spring-cloud-huawei-rocketmq/src/main/java/com/huaweicloud/rocketmq/grayscale/servicemeta/ServiceMetaManager.java @@ -20,9 +20,6 @@ import java.util.HashMap; import java.util.Map; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - import com.huaweicloud.rocketmq.grayscale.RocketMqMessageGrayUtils; public class ServiceMetaManager {