Springboot Rabbitmq + 线程池技术控制指定数量task执行

news/2025/1/10 6:51:31 标签: java-rabbitmq, spring boot, rabbitmq

定义DataSyncTaskManager,作为线程池任务控制器

package org.demo.scheduletest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class DataSyncTaskManager {
    // 线程数
    private static final Integer threadNum = 5;

    private static DataSyncTaskManager taskManager = null;
    private static BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    private ThreadPoolExecutor taskExecutorPool;

    private DataSyncTaskManager() {
        taskExecutorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 构建唯一Manager对象单例
     *
     * @return
     */
    public static synchronized DataSyncTaskManager getManager() {
        if (null == taskManager) {
            taskManager = new DataSyncTaskManager();
        }

        return taskManager;
    }

    /**
     * 提交需要运行的任务
     *
     * @param task
     */
    public void submitTask(DataSyncTask task) {
        taskQueue.add(task);
        log.info("[DataSyncTaskManager] submitTask size={}", taskQueue.size());
    }

    public void runTaskDaemon() {
        log.info("[DataSyncTaskManager] runTaskDaemon start.");
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    Runnable task = taskQueue.take();
                    taskExecutorPool.submit(task);
                    // log.info("[DataSyncTaskManager] runTaskDaemon submit task={}", task);

                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("[startTaskRunningDaemon] task run InterruptedException", e);
                } catch (Exception e) {
                    log.error("[startTaskRunningDaemon] task run Exception", e);
                }
            }
        });

        thread.setName(this.getClass().getSimpleName());
        thread.start();
    }
}

定义DataSyncTask,作为具体任务执行方

package org.demo.scheduletest.service;

import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


@Data
@Slf4j
public class DataSyncTask implements Runnable {
    private String name;
    private Channel channel;
    private long deliveryTag;

    public DataSyncTask(String name, Channel channel, long deliveryTag) {
        this.name = name;
        this.channel = channel;
        this.deliveryTag = deliveryTag;
    }

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        log.info("[DataSyncTask] run task start, name = {}", name);

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        try {
            channel.basicAck(deliveryTag, true);
            log.info("[DataSyncTask] run task end, name = {}", name);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}

InitTask,服务启动执行Task管理器

package org.demo.scheduletest.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author zhe.xiao
 * @version 1.0
 * @since 2025/1/9 上午11:38
 */
@Slf4j
@Component
public class InitTask implements ApplicationRunner {
    /**
     * Callback used to run the bean.
     *
     * @param args incoming application arguments
     * @throws Exception on error
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        DataSyncTaskManager.getManager().runTaskDaemon();
    }
}

配置Rabbitmq

package org.demo.scheduletest.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 * @author zhe.xiao
 * @date 2022-07-06 17:27
 * @description
 **/
@SpringBootConfiguration
public class MyRabbitTemplateConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host:/}")
    private String virtualhost;

    /**
     * 连接工厂
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        //connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     *
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }
}

package org.demo.scheduletest.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author zhe.xiao
 * @date 2022-07-05 14:55
 * @description
 **/
@Configuration
public class MyRabbitExecutor {
    //正常队列
    public static final String QUEUE_1 = "my:queue:1";
    public static final String EXCHANGE_1 = "my:exchange:1";
    public static final String ROUTEING_1 = "data:route:1";

    //死信队列
    public static final String QUEUE_DEAD_LETTER = "my:queue:deadLetter";
    public static final String EXCHANGE_DEAD_LETTER = "my:exchange:deadLetter";
    public static final String ROUTING_DEAD_LETTER = "data:route:deadLetter";

    // 提供 Queue
    @Bean
    Queue myQueue1(){
        HashMap<String, Object> args = new HashMap<>();
        //绑定死信队列信息
        args.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER);
        args.put("x-dead-letter-routing-key", ROUTING_DEAD_LETTER);


//        args.put("x-max-length", 5); //队列最大长度,超过了会进入死信队列
//         args.put("x-message-ttl", 5000); //如果5秒没被消费,则进入死信队列

        return new Queue(QUEUE_1, true, false, false, args);
    }

    // 提供 Exchange
    @Bean
    DirectExchange myExchange1(){
        return new DirectExchange(EXCHANGE_1, true, false);
    }

    // 创建一个Binding对象,将Exchange和Queue绑定在一起
    @Bean
    Binding myBinding1(){
        return BindingBuilder.bind(myQueue1()).to(myExchange1()).with(ROUTEING_1);
        // return BindingBuilder.bind(myQueue1()).to(myExchange1());
    }

    // 死信队列配置 QUEUE, EXCHANGE, BINDING
    @Bean
    Queue myQueueDeadLetter(){
        return new Queue(QUEUE_DEAD_LETTER, true, false, false);
    }

    @Bean
    DirectExchange myExchangeDeadLetter(){
        return new DirectExchange(EXCHANGE_DEAD_LETTER, true, false);
    }

    @Bean
    Binding myBindingDeadLetter(){
        return BindingBuilder.bind(myQueueDeadLetter()).to(myExchangeDeadLetter()).with(ROUTING_DEAD_LETTER);
    }
}

Rabbitmq消费者通过task控制器提交执行任务

package org.demo.scheduletest.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.demo.scheduletest.service.DataSyncTask;
import org.demo.scheduletest.service.DataSyncTaskManager;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * 消费QUEUE
 *
 * @author zhe.xiao
 * @date 2022-07-05 14:57
 * @description
 **/
@Slf4j
@Component
public class MyReceiver {
    @RabbitListener(queues = MyRabbitExecutor.QUEUE_1)
    public void handler1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            log.info("handler1 process: {}", data);
            DataSyncTask dataSyncTask = new DataSyncTask(data, channel, deliveryTag);
            DataSyncTaskManager.getManager().submitTask(dataSyncTask);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}


http://www.niftyadmin.cn/n/5818305.html

相关文章

线性表的接口定义及使用

定义接口 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace _001_线性表 {interface IListDS<T>//定义接口{int GetLength();void Clear();bool IsEmpty();void Add(T item);void Inser…

Linux自定义分隔符

在 Shell 脚本中&#xff0c;delimiteraa 这种语法的意思是将字符串 aa 赋值给变量 delimiter。这里的 aa 只是一个普通的字符串&#xff0c;作为分隔符 (delimiter) 用来在脚本中作为自定义的分隔符。 解释 delimiter: 这是一个变量名。 aa: 这是一个字符串&#xff0c;包含…

详解Sonar与Jenkins 的集成使用!

本文阅读前提 本文假设读者熟悉Jenkins和SonarQube的基础操作。 核心实现功能 Jenkins中运行的job来调用SonarScanner&#xff0c;最后可实现测试结果与SonarQube中同步查看。 Jenkins中安装Sonar相关插件 配置Sonarqube Dashboard>Manage Jenkins>Systems 指定son…

基于mybatis-plus历史背景下的多租户平台改造

前言 别误会&#xff0c;本篇【并不是】 要用mybatis-plus自身的多租户方案&#xff1a;在表中加一个tenant_id字段来区分不同的租户数据。并不是的&#xff01; 而是在假设业务系统已经使用mybatis-plus多数据源的前提下&#xff0c;如何实现业务数据库隔开的多租户系统。 这…

有关Redis的相关概述

一、Redis概述 1.1 Redis简介 Redis是一个开源的高性能键值对数据库&#xff0c;使用C语言编写&#xff0c;支持多种数据结构&#xff0c;如字符串&#xff08;String&#xff09;、列表&#xff08;List&#xff09;、哈希&#xff08;Hash&#xff09;、集合&#xff08;Set…

一文讲清计算机中的镜像,以及其在计算机中的作用

一、什么是计算机中的镜像 在计算机中&#xff0c;镜像&#xff08;Computer Image&#xff09;是对系统、磁盘、光盘或应用程序的完整复制或备份&#xff0c;它包含了所有的数据、文件系统、配置和应用程序。镜像技术广泛应用于系统备份、恢复、数据迁移、虚拟化以及软件部署…

9.4 visualStudio 2022 配置 cuda 和 torch (c++)

一、配置torch 1.Libtorch下载 该内容看了【Libtorch 一】libtorchwin10环境配置_vsixtorch-CSDN博客的博客&#xff0c;作为笔记用。我自己搭建后可以正常运行。 下载地址为windows系统下各种LibTorch下载地址_libtorch 百度云-CSDN博客 下载解压后的目录为&#xff1a; 2.vs…

npm i 报错

nodejs中 使用npm install命令时报错 npm err! file C: \user\admin\package.json_package.json 里缺少 description 和 repository 两个n字段。-CSDN博客