Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于腾讯云tdmq消息队列封装SpringBootStarter(二)

基于腾讯云tdmq消息队列封装SpringBootStarter(二)

原创
作者头像
JulyWhj
发布于 2022-01-03 06:44:59
发布于 2022-01-03 06:44:59
3.5K03
代码可运行
举报
运行总次数:3
代码可运行

关于腾讯云tdmq的基本使用参见《基于腾讯云tdmq消息队列封装SpringBootStarter(一)》,这里我们基于之前的内容在次进行优化封装。

一、创建消费者注解(TdmqConsumer)和生产者注解(TdmqProducer)

1.1、基础工程回顾

首先我们回顾下上一章完成的基础功能。

工程目录

上一章我们创建了配置目录config、生产者和消费者目录,以及META-INF目录和spring.factories配置文件。

在此基础上我们继续完善我们的工程。

1.2、创建注解

在该工程上新建annotation包,并在annotation包下创建TdmqProducerTdmqConsumer注解。并且在消费者注解TdmqConsumer注解中新增一下属性:topicclazzSubscriptionTypeconsumerNamesubscriptionName

最终消费者注解内容如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 消费者注解
 *
 * @author wanghongjie
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface TdmqConsumer {
    /**
     * 订阅主题
     *
     * @return
     */
    String topic();

    /**
     * 序列化类
     *
     * @return
     */
    Class<?> clazz() default byte[].class;

    /**
     * 消费者类型
     *
     * @return
     */
    SubscriptionType[] subscriptionType() default {};

    /**
     * 消费者名称
     *
     * @return
     */
    String consumerName() default "";

    /**
     * 订阅对象名称
     *
     * @return
     */
    String subscriptionName() default "";
}

生产者注解:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 生产者注解
 *
 * @author wanghongjie
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface TdmqProducer {
}

1.3、创建收集器

我们在工程中创建生产者收集器(ProducerCollector)和消费者收集器(ConsumerCollector),创建收集器的目的是在springBoot项目启动中,扫描所有带有TdmqProducerTdmqConsumer注解的Bean对象,并将其统一管理。

在工程中创建collector包,并在该包下创建ProducerCollectorConsumerCollector

在创建消费者收集器前我们需要创建个对象类ConsumerHolderProducerCollector,用来绑定注解和实现类的绑定关系。

1.3.1、创建消费者绑定对象
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description $
 **/
public class ConsumerHolder {
    private final TdmqConsumer annotation;
    private final Method handler;
    private final Object bean;
    private final Class<?> type;

    ConsumerHolder(TdmqConsumer annotation, Method handler, Object bean, Class<?> type) {
        this.annotation = annotation;
        this.handler = handler;
        this.bean = bean;
        this.type = type;
    }

    public TdmqConsumer getAnnotation() {
        return annotation;
    }

    public Method getHandler() {
        return handler;
    }

    public Object getBean() {
        return bean;
    }

    public Class<?> getType() {
        return type;
    }

    public boolean isWrapped() {
        return type.isAssignableFrom(Object.class);
    }
}
1.3.2 生产者绑定对象
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 生产者绑定关系$
 * @Date 2022/1/3 1:26 下午
 **/
public class ProducerHolder {
    private final String topic;
    private final Class<?> clazz;
    private final String serialization;

    public ProducerHolder(String topic, Class<?> clazz, String serialization) {
        this.topic = topic;
        this.clazz = clazz;
        this.serialization = serialization;
    }

    public String getTopic() {
        return topic;
    }

    public Class<?> getClazz() {
        return clazz;
    }

    public String getSerialization() {
        return serialization;
    }

}
1.3.3、创建消费者收集器`ConsumerCollector`.
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 消费者收集器$
 * @Date 2022/1/3 10:34 上午
 **/
@Configuration
public class ConsumerCollector implements BeanPostProcessor {
    /**
     * 维护SpringBoot所有bean对象中包含TdmqConsumer注解的实例对象
     */
    private Map<String, ConsumerHolder> consumers = new ConcurrentHashMap<>();

    /**
     * SpringBoot 启动过程中,Bean实例化后加载postProcessBeforeInitialization方法
     *
     * @param bean     bean对象
     * @param beanName 注解所在的方法名称
     * @return
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        final Class<?> beanClass = bean.getClass();
        // 过滤所有的Bean对象,如果包含TdmqConsumer注解的加入到consumers中
        consumers.putAll(Arrays.stream(beanClass.getDeclaredMethods())
                .filter($ -> $.isAnnotationPresent(TdmqConsumer.class))
                .collect(Collectors.toMap(
                        method -> buildConsumerName(beanClass, method),
                        method -> new ConsumerHolder(method.getAnnotation(TdmqConsumer.class), method, bean,
                                getParameterType(method)))));

        return bean;
    }


    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        return bean;
    }

    public Map<String, ConsumerHolder> getConsumers() {
        return consumers;
    }

    public Optional<ConsumerHolder> getConsumer(String methodDescriptor) {
        return Optional.ofNullable(consumers.get(methodDescriptor));
    }

    public static Class<?> getParameterType(Method method) {
        return method.getParameterTypes()[0];
    }

    /**
     * 构建消费者名称
     *
     * @param clazz  对象
     * @param method 方法
     * @return 消费者名称
     */
    public String buildConsumerName(Class<?> clazz, Method method) {
        return clazz.getName() + method.getName() + Arrays
                .stream(method.getGenericParameterTypes())
                .map(Type::getTypeName)
                .collect(Collectors.joining());
    }
}
1.3.4、创建生产者收集器
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 生产者收集器$
 * @Date 2022/1/3 1:25 下午
 **/
@Component
public class ProducerCollector implements BeanPostProcessor, EmbeddedValueResolverAware {

    private final PulsarClient pulsarClient;
    private final TdmqProperties tdmqProperties;

    private final Map<String, Producer> producers = new ConcurrentHashMap<>();

    private StringValueResolver stringValueResolver;

    public ProducerCollector(PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
        this.pulsarClient = pulsarClient;
        this.tdmqProperties = tdmqProperties;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        final Class<?> beanClass = bean.getClass();
        if (beanClass.isAnnotationPresent(TdmqProducer.class) && bean instanceof IProducerFactory) {
            producers.putAll(((IProducerFactory) bean).getTopics().entrySet().stream()
                    .map($ -> new ProducerHolder(
                            stringValueResolver.resolveStringValue($.getKey()),
                            $.getValue().left,
                            $.getValue().right))
                    .collect(Collectors.toMap(ProducerHolder::getTopic, this::buildProducer)));
        }

        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        return bean;
    }

    private Producer<?> buildProducer(ProducerHolder holder) {
        try {
            return pulsarClient.newProducer(getSchema(holder))
                    .topic(buildTopicUrl(holder.getTopic()))
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    private <T> Schema<?> getSchema(ProducerHolder holder) throws RuntimeException {
        return getGenericSchema(holder.getSerialization(), holder.getClazz());
    }

    public Producer getProducer(String topic) {
        return producers.get(stringValueResolver.resolveStringValue(topic));
    }

    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
        this.stringValueResolver = stringValueResolver;
    }

    public String buildTopicUrl(String topic) {
        return tdmqProperties.getClusterId() + "/" + tdmqProperties.getEnvironmentId() +
                "/" + topic;
    }


    private static <T> Schema<?> getGenericSchema(String type, Class<T> clazz) throws RuntimeException {
        switch (type) {
            case "JSON": {
                return Schema.JSON(clazz);
            }
            case "AVRO": {
                return Schema.AVRO(clazz);
            }
            case "STRING": {
                return Schema.STRING;
            }
            default: {
                throw new RuntimeException("Unknown producer schema.");
            }
        }
    }
}

1.4、创建消费者消息处理聚合器

我们通过postProcessBeforeInitialization方法以及将全部带有TdmqConsumer注解的对象收集起来,接下来我们定义个消费者消息处理器,来出来这些Bean对象,这里也是本篇文章的核心内容。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 消息处理聚合器$
 * @Date 2022/1/3 11:09 上午
 **/
@Component
@DependsOn({"pulsarClient"})
public class ConsumerAggregator implements EmbeddedValueResolverAware {
    private final ConsumerCollector consumerCollector;
    private final PulsarClient pulsarClient;
    private final static SubscriptionType DEFAULT_SUBSCRIPTION_TYPE = SubscriptionType.Exclusive;
    private final TdmqProperties tdmqProperties;
    private StringValueResolver stringValueResolver;
    private List<Consumer> consumers;

    public ConsumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
        this.consumerCollector = consumerCollector;
        this.pulsarClient = pulsarClient;
        this.tdmqProperties = tdmqProperties;
    }

    /**
     * 待spring上下文启动完毕后,加载注解init()方法
     */
    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        //获取收集器中所有的消费者对象
        consumers = consumerCollector.getConsumers().entrySet().stream()
                .map(holder -> subscribe(holder.getKey(), holder.getValue()))
                .collect(Collectors.toList());
    }

    /**
     * 消费者消息监听处理类
     *
     * @param generatedConsumerName 消费者名称
     * @param holder                绑定关系
     * @return
     */
    private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holder) {
        try {
            //从注解中获取消费名称
            final String consumerName = stringValueResolver.resolveStringValue(holder.getAnnotation().consumerName());
            //从注解中获取订阅名称
            final String subscriptionName = stringValueResolver.resolveStringValue(holder.getAnnotation().subscriptionName());
            //从注解中获取队列topic名称
            final String topicName = stringValueResolver.resolveStringValue(holder.getAnnotation().topic());
            //获取消费者类型--参考官方文档类型说明
            final SubscriptionType subscriptionType = getSubscriptionType(holder);
            //通过pulsarClient构建consumerBuilder
            final ConsumerBuilder<?> consumerBuilder = pulsarClient
                    .newConsumer()
                    .consumerName(consumerName)
                    .subscriptionName(subscriptionName)
                    .topic(buildTopicUrl(topicName))
                    .subscriptionType(subscriptionType)
                    .messageListener((consumer, msg) -> {
                        try {
                            //从绑定关系中获取需执行的方法
                            final Method method = holder.getHandler();
                            method.setAccessible(true);
                            //通过反射执行注解所在的方法,并将监听到的消息作为参数进行传递
                            method.invoke(holder.getBean(), msg.getValue());
                            //消息执行后手动ack消息
                            consumer.acknowledge(msg);
                        } catch (Exception e) {
                            //消息处理执行异常,进行negativeAcknowledge操作
                            consumer.negativeAcknowledge(msg);
                        }
                    });
            buildDeadLetterPolicy(holder, consumerBuilder);
            return consumerBuilder.subscribe();
        } catch (PulsarClientException e) {
            //应该自定义异常,这里暂时不做处理
            throw new RuntimeException(e);
        }
    }

    private SubscriptionType getSubscriptionType(ConsumerHolder holder) {
        SubscriptionType subscriptionType = Arrays.stream(holder.getAnnotation().subscriptionType())
                .findFirst().orElse(null);
        if (subscriptionType == null) {
            subscriptionType = DEFAULT_SUBSCRIPTION_TYPE;
        }
        return subscriptionType;
    }

    public void buildDeadLetterPolicy(ConsumerHolder holder, ConsumerBuilder<?> consumerBuilder) {
        DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder =
                DeadLetterPolicy.builder().maxRedeliverCount(-1);
    }


    public List<Consumer> getConsumers() {
        return consumers;
    }


    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
        this.stringValueResolver = stringValueResolver;
    }

    public String buildTopicUrl(String topic) {
        return tdmqProperties.getClusterId() + "/" + tdmqProperties.getEnvironmentId() +
                "/" + topic;
    }
}

1.5、创建生产者工厂和模版处理类

1.5.1、创建生产者工厂接口`IProducerFactory`
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 生产者工程接口$
 * @Date 2021/12/30 7:55 下午
 **/
public interface IProducerFactory {
    Map<String, ImmutablePair<Class<?>, String>> getTopics();
}
1.5.2、创建工程对象
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description $
 * @Date 2022/1/3 1:27 下午
 **/
@TdmqProducer
public class ProducerFactory implements IProducerFactory {

    private final Map<String, ImmutablePair<Class<?>, String>> topics = new HashMap<>();

    public ProducerFactory addProducer(String topic) {
        return addProducer(topic, byte[].class, "JSON");
    }

    public ProducerFactory addProducer(String topic, Class<?> clazz) {
        topics.put(topic, new ImmutablePair<>(clazz, "JSON"));
        return this;
    }

    public ProducerFactory addProducer(String topic, Class<?> clazz, String serialization) {
        topics.put(topic, new ImmutablePair<>(clazz, serialization));
        return this;
    }

    @Override
    public Map<String, ImmutablePair<Class<?>, String>> getTopics() {
        return topics;
    }
}
1.5.3、构建TdmqTemplate
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 模版工具类$
 * @Date 2022/1/3 1:42 下午
 **/
public class TdmqTemplate<T> {
    private final ProducerCollector producerCollector;

    public TdmqTemplate(ProducerCollector producerCollector) {
        this.producerCollector = producerCollector;
    }

    /**
     * 发送消息接口
     *
     * @param topic 队列
     * @param msg   消息内容
     * @return
     * @throws PulsarClientException
     */
    public MessageId send(String topic, T msg) throws PulsarClientException {
        return producerCollector.getProducer(topic).send(msg);
    }

    /**
     * 异步发送消息接口
     *
     * @param topic   队列
     * @param message 消息内容
     * @return
     */
    public CompletableFuture<MessageId> sendAsync(String topic, T message) {
        return producerCollector.getProducer(topic).sendAsync(message);
    }

    /**
     * 构建消息
     *
     * @param topic   队列
     * @param message 消息内容
     * @return
     */
    public TypedMessageBuilder<T> createMessage(String topic, T message) {
        return producerCollector.getProducer(topic).newMessage().value(message);
    }
}

1.6、整合生产消息者配置

将生产者和消费者配置到TdmqAutoConfiguration文件中,完整的TdmqAutoConfiguration内容如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description Mq自动装配类$
 * @Date 2022/1/2 9:59 上午
 **/
@Slf4j
@Data
@EnableConfigurationProperties({TdmqProperties.class})
public class TdmqAutoConfiguration {
    /**
     * Pulsar 客户端
     * 推荐一个进程一个实例
     *
     * @return {@link TdmqAutoConfiguration}
     */
    @Bean
    @ConditionalOnMissingBean(PulsarClient.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public PulsarClient pulsarClient(TdmqProperties mqProperties) throws PulsarClientException {
        log.info("-----------------");
        return PulsarClient.builder()
                .serviceUrl(mqProperties.getServiceUrl())
                .authentication(AuthenticationFactory.token(mqProperties.getToken()))
                .build();
    }

    /**
     * 配置消费者收集器
     *
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(ConsumerCollector.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public ConsumerCollector consumerCollector() {
        return new ConsumerCollector();
    }

    /**
     * 配置消费者消费者消息处理聚合器
     *
     * @param consumerCollector 配置消费者收集器
     * @param pulsarClient      pulsarClient 客户端
     * @param tdmqProperties    配置信息
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(ConsumerAggregator.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public ConsumerAggregator consumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
        return new ConsumerAggregator(consumerCollector, pulsarClient, tdmqProperties);
    }

    /**
     * 配置生产者收集器
     *
     * @param pulsarClient   pulsarClient 客户端
     * @param tdmqProperties 配置信息
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(ProducerCollector.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public ProducerCollector producerCollector(PulsarClient pulsarClient,
                                               TdmqProperties tdmqProperties) {
        return new ProducerCollector(pulsarClient, tdmqProperties);
    }

    /**
     * 生产者消息模版
     *
     * @param producerCollector 生产者收集器
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(TdmqTemplate.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public TdmqTemplate pulsarTemplate(ProducerCollector producerCollector) {
        return new TdmqTemplate(producerCollector);
    }

}

二、使用案例

我们这里使用自定义的TdmqConsumerTdmqTemplate来完成一个生产消费者的案例。

2.1、创建生产者配置类

创建生产者配置类ProducerConfiguration,该配置类,主要将消息队列队列名称绑定到ProducerFactory上下文中,我们可以通过TdmqTemplate去直接使用。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description $
 * @Date 2022/1/3 1:58 下午
 **/
@Configuration
public class ProducerConfiguration {
    /**
     * 队列名称
     */
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @Bean
    public ProducerFactory producerFactory() {
        //将队列添加到ProducerFactory上下文中
        return new ProducerFactory()
                .addProducer(MESSAGE_LOGGING_TOPIC, String.class);
    }
}

创建消费者监听

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 消息队列消费者$
 * @Date 2022/1/3 2:01 下午
 **/
@Slf4j
@Service
public class MessageLoggingListener {
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @TdmqConsumer(topic = MESSAGE_LOGGING_TOPIC, consumerName = "message_logging", clazz = String.class, subscriptionName = "message_logging_es")
    void consume(String msg) {
        log.info("------------{}", msg);
    }
}

去除之前消费者TdmqConsumer.class.

修改单元测试SpringBootStarterTdmqApplicationTests

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
@SpringBootTest
class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqTemplate proucer;

    @Test
    public void producer() throws PulsarClientException {
        MessageId messageId = proucer.send("message_logging", "发送消息测试");
        log.info("send msg is success Id = {}", messageId);
    }

}

将之前的TdmqProucer改为TdmqTemplate;

启动单元测试:

查看测试结果:

测试结果

三、说明:

1、配置生产者工厂

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description $
 * @Date 2022/1/3 1:58 下午
 **/
@Configuration
public class ProducerConfiguration {
    /**
     * 队列名称
     */
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @Bean
    public ProducerFactory producerFactory() {
        //将队列添加到ProducerFactory上下文中
        return new ProducerFactory()
                .addProducer(MESSAGE_LOGGING_TOPIC, String.class);
    }
}

2、创建消费者实现类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @Author julyWhj
 * @Description 消息队列消费者$
 * @Date 2022/1/3 2:01 下午
 **/
@Slf4j
@Service
public class MessageLoggingListener {
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @TdmqConsumer(topic = MESSAGE_LOGGING_TOPIC, consumerName = "message_logging", clazz = String.class, subscriptionName = "message_logging_es")
    void consume(String msg) {
        log.info("------------{}", msg);
    }
}

主要在方法上增加TdmqConsumer注解。

3、`生产者TdmqTemplate`模版使用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
@SpringBootTest
class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqTemplate proucer;

    @Test
    public void producer() throws PulsarClientException {
        MessageId messageId = proucer.send("message_logging", "发送消息测试");
        log.info("send msg is success Id = {}", messageId);
    }

}

4、使用配置文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
tdmq:
  enable: true
  serviceUrl: serviceUrl
  token: token
  clusterId: clusterId
  environmentId: environmentId

源码地址:

hongjieWang/spring-boot-starter-tdmq: spring-boot-starter-tdmq (github.com)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【最佳实践】巡检项:TDMQ备份消费者检查
在 TDMQ Pulsar 版控制台中,订阅代表一个具体的消费者以及其对某个 Topic 的订阅关系。当一个消费者订阅了某个 Topic 之后,则该 Topic 下的消息均可以被其消费。一个订阅可以订阅多个 Topic ,例如用户在一个 Topic 下创建了一个订阅后,其不仅会订阅当前的 Topic,还会订阅系统自动创建的重试队列 Topic。
邓愉悦
2022/03/29
4510
200 行代码告诉你 TDMQ 中 Pulsar 广播如何实现
导读 Pulsar 作为 Apache 社区的相对新的成员,在业界受到非常大量的关注。新产品的文档相对不齐全也是非常能够理解的。今天客户问过来广播怎么实现的,我解释了半天,又找了很多介绍产品的 PPT,最终也没有找到“官方”的文档说明这个事情。于是我就写了这篇文章,方便大家 copy/paste 。 作者介绍 徐为 腾讯云微服务团队高级解决方案构架师 毕业于欧盟 Erasmus Mundus IMMIT,获得经济和IT管理硕士学位 自2006年以来,曾就职于SonyEricsson、SAP等
腾讯云中间件团队
2021/04/01
2K0
基于腾讯云tdmq消息队列封装SpringBootStarter(一)
创建好集群后,在命名空间中新建命名空间,命名空间名称可以根据实际业务场景进行区分,比如这里创建可以根据测试环境、预发布环境、生产环境等进行区分创建。
JulyWhj
2022/01/03
3K0
究极缝合怪 | Pulsar核心概念和特性解读
Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
王知无-import_bigdata
2022/03/11
2.1K0
究极缝合怪 | Pulsar核心概念和特性解读
【最佳实践】巡检项:TDMQ死信队列检查
死信队列是一种特殊的消息队列,用于集中处理无法被正常消费的消息的队列。当消息在重试队列中达到一定重试次数后仍未能被正常消费,TDMQ Pulsar 版会判定这条消息在当前情况下无法被消费,将其投递至死信队列。
邓愉悦
2022/03/29
9410
SpringBoot整合分布式消息平台Pulsar
作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。
jinjunzhu
2022/09/23
7470
SpringBoot整合分布式消息平台Pulsar
分布式消息中间件TDMQ架构及使用案例最佳实践
TDMQ是基于pulsar的金融级分布式消息中间件,是一个具备跨域、高可用、高并发的MQ。拥有原生的java、C++,Python,Go API,同时支持多种协议的接入(kafka、AMQP等)。同时支持 Kafka 协议以及 HTTP Proxy 方式接入,可为分布式应用系统提供异步解耦和削峰填谷的能力,具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。目前TDMQ已逐步成为新一代分布式云上消息中间件。能够很好的兼容和满足客户丰富的业务场景。
邓愉悦
2020/10/26
2K0
搭建单体SpringBoot项目 集成Kafka消息队列
通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。
郭顺发
2023/07/07
6020
手把手教学--从Pulsar到TDMQ
导语:介于TDMQ还没有公网的访问功能,不可能买台CVM安装windows吧,VPN又只能支持协议类型: IKE/IPsec,意思是企业用户才能用,对于个人就只能再想办法了,但办法总比问题多。本地开发测试环境使用pulsar的单机版,生产使用TDMQ,这样怎么样,一起来看看怎么配置。
沐榕樰
2020/12/03
1.9K0
博文推荐|整合 Spring 与 Pulsar,在 Java 中构建微服务
本文我们来探讨如何在 Java 框架——Spring 中整合 Apache Pulsar。文章阐述如何在 Java 中构建基于 Spring 的微服务。在正文内容开始前,我们先介绍 Spring。Spring 是 Java 生态中鼎鼎有名的技术框架,自诞生已有近 20 年历史。Spring 提供了极为方便的装配与控制机制,极大地降低了构建应用的难度。有了 Spring,开发者无需堆砌非业务相关的重复模板代码。基于 Spring,开发者可以如鱼得水般快速开发微服务应用,包括各类 REST API、Web 应用程序、控制台应用程序等。推荐大家深入研究 Spring。
从大数据到人工智能
2022/08/30
1.4K0
博文推荐|整合 Spring 与 Pulsar,在 Java 中构建微服务
Spring Boot 整合 Kafka 详解
本文将详细介绍如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。
九转成圣
2024/08/09
7700
Pulsar中间件入门学习
Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。最初是由 Yahoo 开发,目前由 Apache 软件基金会 管理。是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
java技术爱好者
2022/04/18
7290
Pulsar中间件入门学习
号称下一代消息中间件!来看看它有多牛逼
最近这个 Apache Pulsar 消息中间件非常的火,号称下一代消息中件,今天,就一起来看看它到底有多牛逼?
民工哥
2021/07/16
5560
号称下一代消息中间件!来看看它有多牛逼
Pulsar入门学习手册
Pulsar入门学习手册 https://cloud.tencent.com/developer/article/2276612?shareByChannel=link 前言 Apache Pulsa
疯狂的KK
2023/07/14
1.4K0
Pulsar入门学习手册
Flink 基于 TDMQ for Apache Pulsar 的离线场景使用实践
Apache Flink 是一个开源的流处理和批处理框架,具有高吞吐量、低延迟的流式引擎,支持事件时间处理和状态管理,以及确保在机器故障时的容错性和一次性语义。Flink 的核心是一个分布式流数据处理引擎,支持 Java、Scala、Python 和 SQL 编程语言,可以在集群或云环境中执行数据流程序。它提供了 DataStream API 用于处理有界或无界数据流,DataSet API 用于处理有界数据集,以及 Table API 和 SQL 接口用于关系型流和批处理。目前 Flink 最新已经迭代至 1.20 版本,在此过程中不光是 Flink 框架,插件本身也有部分 API 以及配置存在变更,本文主要针对较高版本的 1.17 Flink Pulsar 插件进行测试验证,目前 Flink 版本如下:https://nightlies.apache.org/flink/
腾讯云中间件团队
2024/05/31
4000
Flink 基于 TDMQ for Apache Pulsar 的离线场景使用实践
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
通过简单代码demo进行讲解,pulsar在java中如何使用?如何通过pulsar进行异步解耦?......等
JavaDog程序狗
2024/09/19
3170
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
高性能消息队列中间件MQ_part2
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。
天天Lotay
2023/02/16
4720
高性能消息队列中间件MQ_part2
消息队列中间件(二)使用 ActiveMQ
Active MQ 是由 Apache 出品的一款流行的功能强大的开源消息中间件,它速度快,支持跨语言的客户端,具有易于使用的企业集成模式和许多的高级功能,同时完全支持 JSM1.1 和 J2EE1.4 。
未读代码
2019/11/04
1.8K0
消息队列中间件(二)使用 ActiveMQ
Apache Pulsar 技术系列 - Pulsar事务实现原理
导语 Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。腾讯云MQ Oteam Pulsar工作组对 Pulsar 做了深入调研以及大量的性能和稳定性方面优化,目前已经在TDBank、腾讯云TDMQ落地上线。本篇将简单介绍Pulsar服务端消息确认的一些概念和原理,欢迎大家阅读。 作者简介  林琳                                                           
腾讯云中间件团队
2022/03/03
2.1K0
Flink 基于 TDMQ for Apache Pulsar 的离线场景使用实践
Apache Flink 是一个开源的流处理和批处理框架,具有高吞吐量、低延迟的流式引擎,支持事件时间处理和状态管理,以及确保在机器故障时的容错性和一次性语义。Flink 的核心是一个分布式流数据处理引擎,支持 Java、Scala、Python 和 SQL 编程语言,可以在集群或云环境中执行数据流程序。它提供了 DataStream API 用于处理有界或无界数据流,DataSet API 用于处理有界数据集,以及 Table API 和 SQL 接口用于关系型流和批处理。目前 Flink 最新已经迭代至 1.20 版本,在此过程中不光是 Flink 框架,插件本身也有部分 API 以及配置存在变更,本文主要针对较高版本的 1.17 Flink Pulsar 插件进行测试验证,目前 Flink 版本如下:https://nightlies.apache.org/flink/
腾讯云开发者
2024/07/02
3390
Flink 基于 TDMQ for Apache Pulsar 的离线场景使用实践
相关推荐
【最佳实践】巡检项:TDMQ备份消费者检查
更多 >
LV.1
北京聚点点科技有限公司架构师
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验