第04课:整合常用技术框架之

第04课:整合常用技术框架之 MongoDB 和 RabbitMQ

在第03课中,主要为各位展示了在工作中如何使用 JPA,Redis的相关知识。本课中主要为各位演示在工作中如何使用 MongoDB,RabbitMQ。

Spring Boot 整合 MongoDB

MongoDB 是一个基于分布式文件存储的数据库,它是一个介于关系数据库和非关系数据库之间的产品,其主要以 key、value 方式存储数据;其支持的数据结构非常松散,是类似 JSON 的 BJSON 格式,因此其存储数据非常灵活。随着近几年软件行业的蓬勃发展,用户的需求、业务的多样化,引发了软件系统自身数据的多样化,从而使 MongoDB 成为 NoSQL 数据库中的皎皎者。

传统关系数据库主要由数据库、表、记录三个层次概念组成,而 MongoDB 是由数据库、集合、文档三个层次组成。MongoDB 相对于关系型数据库里的表,但是集合中没有列、行和关系概念,这体现了其存储数据非常灵活的特点。

MongoDB 适合对大量或者无固定格式的数据进行存储,如日志、缓存等。对事物支持较弱,不适用于复杂的多集合的级联查询。

CentOS 7安装 MongoDB

1. 新建目录 /usr/local/software/,执行命令 curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.2.12.tgz 下载安装文件;

2. 执行命令解压:tar -zxvf mongodb-linux-x86_64-3.2.12.tgz

3. 将解压后文件移动到指定目录:mv mongodb-linux-x86_64-3.2.12/ /usr/local/mongodb

4. 接着,执行命令:mkdir -p /data/db /logs

5. 执行命令 cd bin,新建配置文件 vim mongodb.conf 并添加如下内容之后保存:

dbpath = /data/db #数据文件存放目录
logpath = /logs/mongodb.log #日志文件存放目录
port = 27017  #端口
fork = true  #以守护程序的方式启用,即在后台运行
nohttpinterface = true
auth=false
bind_ip=0.0.0.0

6. 修改环境变量:vi /etc/profile

export MONGODB_HOME=/usr/local/mongodb
export PATH=$PATH:$MONGODB_HOME/bin

保存后,重启系统配置:source /etc/profile

7. 进入目录 /usr/local/mongodb/bin,启动 mongod -f mongodb.conf

8. 执行命令./mongo

9. 执行命令创建数据库 use students,并添加用户:db.createUser({user:"root",pwd:"123456",roles:[{role:"readWrite",db:"students"}]})

下面接着演示在工作中如何将 Spring Boot 和 MongoDB 进行整合。

1. 在 pom 文件中添加依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>    
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

2. 在 application.yml 配置文件中添加 MongoDB 访问配置信息:

spring:
  data:
    mongodb:
      uri: mongodb://root:hcb13579@192.168.1.120:27017/students

3. 当前利用 Spring Boot 操作 MongoDB 比较常用的两种方式分别为:直接在 Dao 中注入 MongoTemplate 对象,及继承 MongoRepository。由于利用继承MongoRepository的方式和我们之前继承 JpaRepository 操作数据库非常类似。本课中将为大家演示如何利用 MongoTemplate 操作 MongoDB。

添加学生数据实体:

public class Student {
    private String id;
    private String studentName;
    private Integer age;

    public Student(){}

    public Student(String id, String studentName, Integer age) {
        this.id = id;
        this.studentName = studentName;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setSid(String id) {
        this.id = id;
    }

    public String getStudentName() {
        return studentName;
    }

    public void setStudentName(String studentName) {
        this.studentName = studentName;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

定义添加、删除、查询、修改学生信息接口:

public interface IStudentService {
    /**
     * 新增学生信息
     * @param student
     */
    void addStudent(Student student);

    /**
     * 删除学生信息
     * @param id
     */
    void deleteStudentByIs(String id);

    /**
     * 根据主键查询学生信息
     * @param id
     * @return
     */
    Student findById(String id);

    /**
     * 根据学生姓名查询学生信息
     * @param studentName
     * @return
     */
    Student findStudentByStudentName(String studentName);


    /**
     * 更新学生信息
     * @param student
     */
    void updateStudentInfo(Student student);
}

定义添加、删除、查询、修改学生信息接口实现类:

@Service("stdentService")
public class StudentServiceImpl implements IStudentService {
    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public void addStudent(Student student) {
        mongoTemplate.save(student);
    }

    @Override
    public void deleteStudentById(String id) {
        Query query = new Query(Criteria.where("id").is(id));

        mongoTemplate.remove(query, Student.class);
    }

    @Override
    public Student findById(String id) {
        return mongoTemplate.findById(id, Student.class);
    }

    @Override
    public Student findStudentByStudentName(String studentName) {
        Query query = new Query(Criteria.where("studentName").is(studentName));
        return mongoTemplate.findOne(query, Student.class);
    }

    @Override
    public void updateStudentInfo(Student student) {
        Query query = new Query(Criteria.where("id").is(student.getId()));

        Update update = new Update().set("userName", student.getStudentName()).
                set("age", student.getAge());

        UpdateResult updateResult = mongoTemplate.updateFirst(query, update, Student.class);
    }
}

添加 Controller 层,启动程序测试:

@RestController
@RequestMapping("/student")
public class StudentController {
    @Autowired
    private IStudentService studentService;

    @RequestMapping("/addStudent")
    public String addStudent(String studentName, Integer age) {
        String msg = "success";

        try {
            String id = UUID.randomUUID().toString();
            Student student = new Student(id, studentName, age);

            studentService.addStudent(student);
        } catch (Exception e) {
            msg = "fail";
        }

        return msg;
    }

    @RequestMapping("/getStudentWithId")
    public Student getStudentWithId(String id) {
        try {
            return studentService.findById(id);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        return null;
    }



    @RequestMapping("/deleteStudentById")
    public String deleteStudentById(String id) {
        try {
            studentService.deleteStudentById(id);
        } catch (Exception e) {
            System.out.println(e.getMessage());

            return "fail";
        }

        return "success";
    }

    @RequestMapping("/findStudentByStudentName")
    public Student findStudentByStudentName(String studentName) {
        try {
            return studentService.findStudentByStudentName(studentName);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        return null;
    }

    @RequestMapping("/updateStudentInfo")
    public String updateStudentInfo(String id,String studentName,Integer age) {
        try {
            Student student = new Student(id,studentName,age);

            studentService.updateStudentInfo(student);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            return "fail";
        }

        return "success";
    }
}

在实际开发工作中,除了上面的基本操作外,分页查询也很重要。

下面为各位演示下如何实现分页功能。

排序、分页接口定义:

 /**
     * 分页查询
     * @param page
     * @param size
     * @return
     */
List<Student> queryStudentList(String sortFiled,Integer page,Integer size);

排序、分页接口实现:

@Override
public List<Student> queryStudentList(String sortFiled,Integer page, Integer size) {
        Query query = new Query();

        query.with(new Sort(Sort.Direction.DESC,sortFiled));

        // 数量
        long total = mongoTemplate.count(query,Student.class);

        // 分页
        query.skip((page - 1) * size).limit(size);

        List<Student> students = mongoTemplate.find(query, Student.class);

        return students;
    }

控制层添加排序、分页查询接口:

 @RequestMapping("/queryStudentList")
 public List<Student> queryStudentList(String sortFiled,Integer page, Integer size){
        return studentService.queryStudentList(sortFiled,page,size);
    }

利用 Postman 进行测试:

enter image description here

Spring Boot 整合 RabbitMQ

RabbitMQ 是由 Erlang 语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为面向消息的中间件)。其支持 Windows、Linux/Unix、MAC OS 等操作系统和包括 Java 在内的多种编程语言。

AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计;基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品,不同的开发语言等条件的限制。

RabbitMQ 的重要概念有以下几个:

  • Broker:接收消息,分发消息应用;

  • Exchange:消息交换机;指定消息按照什么规则路由到哪个队列 Queue;

  • Queue:消息队列,存储消息的载体;

  • Binding:Exchange 和 Queue 之间的虚拟连接;Binding 中可以包含 RoutingKey,其信息被保存到 Exchange 中的查询表中,作为 Message 的分发依据;

  • RoutingKey:路由关键字,Exchange 根据 RoutingKey 将消息投递到对应的队列中;

  • Vhost:虚拟主机,一个 Broker 可以有多个虚拟主机,用作不同用户的权限分离;一个虚拟主机持有一组 Exchange、Queue 和 Binding;

  • Producer:消息生产者,主要将消息投递到对应的 Exchange 上面;

  • Consumer:消息消费者,消息的接收者,一般是独立的程序;

  • Channel:消息通道,也称信道。在客户端的每个连接里可以建立多个 Channel,每个 Channel 代表一个会话任务。

CentOS 7 安装 RabbitMQ

1. 添加 Erlang 源:vim /etc/yum.repos.d/rabbitmq-erlang.repo,文件中添加如下内容保存:

[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

2. 新建目录:mkdir /usr/local/software

3. 下载 RabbitMQ rpm 安装文件:wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm

4. 安装 RabbitMQ Server:yum install -y rabbitmq-server-3.7.4-1.el7.noarch.rpm

5. 安装 RabbitMQ Web 管理界面并启动 RabbitMQ Server:rabbitmq-plugins enable rabbitmq_management systemctl start rabbitmq-server

6. 由于 RabbitMQ 默认用户 Guest 只能访问安装在 RabbitMQ 本机上的 Web 管理页面,因此当 RabbitMQ 安装在 Linux 服务器上时,需要做如下操作才能在别的机器上访问其 Web管理页面:

添加用户:rabbitmqctl add_user root 123456,其中 root 表示新添加用户名,123456 表示登录密码;

赋予用户权限:rabbitmqctl set_permissions -p "/" root '.*' '.*' '.*'

赋予用户角色:rabbitmqctl set_user_tags root administrator

查看 RabbitMQ 用户:rabbitmqctl list_users

7. 访问:http://192.168.1.120:15672,得到 RabbitMQ Web 管理页面:

enter image description here

此时,RabbitMQ 已经安装成功。

下面将为各位读者演示如何利用 Spring Boot 整合 RabbitMQ。

1. 在 pom 文件中添加依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 在 application.yml 配置文件中添加如下配置:

spring:
  rabbitmq:
    host: 192.168.1.120
    port: 5672
    username: root
    password: hcb13579

3. 在 RabbitMQ 中新建一个名为“rabbitmqQueue”的队列:

@Configuration
public class RabbitmqConfig {
    @Bean
    public Queue queue(){
        return new Queue("rabbitmqQueue");
    }
}

4. 利用 AmqpTemplate 向队列中发送消息:

@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqController {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/sendMessage")
    public String sendMessageToMq(String msg) {
        amqpTemplate.convertAndSend(CommonConstants.RABBITMQ_QUEUE_NAME, msg);

        return "send message successfully";
    }
}

5. 消费消息:

@Component
@RabbitListener(queues = {CommonConstants.RABBITMQ_QUEUE_NAME})
public class RabbitmqConsumer {

    @RabbitHandler
    public void consumeMessage(String msg){
        System.out.println("接受到的消息:"+ msg);
    }
}

6. 启动程序,利用 Postman 测试:

enter image description here

enter image description here

上面为各位演示的是一个生产者,一个消费者的情况,而在工作中往往不只是这种场景,有可能是一个生产者,多个消费者;有可能是多个生产者,多个消费者的场景。

下面就为各位演示下一个生产者,多个消费者的场景。

为了演示的方便,我在本项目中再新建一个类用于监听和生产者对应的同一队列:

@Component
@RabbitListener(queues = {CommonConstants.RABBITMQ_QUEUE_NAME})
public class SecondRabbitConsumer {

    @RabbitHandler
    public void consumeRabbitmqMessage(String msg){
        System.out.println("消费者2" + msg);
    }
}

重新启动程序,利用 Postman 向 RabbitMQ 发送多条消息,观察两个消费者对消息的消费情况:

enter image description here

从上面的消费截图可以看出,当一个生产者,两个消费者时,消息会均匀的分配给两个消费者进行消费。

下面演示下两个生产者,两个消费者对消息的生产、消费情况。

为了演示这种情况,在 RabbitmqController 中再增加一个 RESTful 风格的接口:

@RequestMapping("/sendSecondMessage")
    public String sendSecondMessage(String msg) {
        amqpTemplate.convertAndSend(CommonConstants.RABBITMQ_QUEUE_NAME, "生产者2:" + msg);

        return "send message successfully";
    }

重新启动程序,利用 Postman 分别调用模拟生产者的两个接口对消息的生产、消费情况进行测试:

enter image description here

通过以上测试,两个生产者产生的消息也是均匀的分配给两个消费者消费。

在实际工作中除了像上面一样使用 Spring Boot 整合 RabbitMQ 之外,常常因为一些特殊应用场景,我们还需要指定 Exchange 的类型。

Exchange(交换机)分四种类型:

  1. Direct:只有绑定时的 routing_key 与发送消息的 routing_key 完全匹配时,消息才会被交换器投送到绑定的队列中去;
  2. Topic:按规则转发消息;
  3. Headers:设置 Header Attribute 参数类型的交换机;
  4. Fanout:转发消息到所有绑定队列。

Direct 是 RabbitMQ Broker 默认 Exchange,当使用这个类型的 Exchange 时,可以不指定 RoutingKey 的名字。在此类型下创建的 Queue 有一个默认的 RoutingKey,这个 RoutingKey 一般与 Queue 名称相同。

创建 Direct 类型的交换机,并将交换机、队列分别利用 RoutingKey 进行绑定,代码如下:

@Configuration
public class RabbitmqConfig {
    private static final String DIRECT_ROUTING_KEY_RED = "color_red";
    private static final String DIRECT_ROUTING_KEY_BLACK = "color_black";

    @Bean
    public Queue queue(){
        return new Queue(CommonConstants.RABBITMQ_QUEUE_NAME);
    }

    @Bean
    public Queue redQueue(){
        return new Queue("redQueue");
    }


    @Bean
    public Queue blackQueue(){
        return new Queue("blackQueue");
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    Binding bindingRedDirectExchange(Queue redQueue, DirectExchange exchange) {
        return BindingBuilder.bind(redQueue).to(exchange).with(this.DIRECT_ROUTING_KEY_RED);
    }

    @Bean
    Binding bindingBlackDirectExchange(Queue blackQueue, DirectExchange exchange) {
        return BindingBuilder.bind(blackQueue).to(exchange).with(this.DIRECT_ROUTING_KEY_BLACK);
    }
}

在 Controller 层中添加接口,分别向同一个交换机、不同的 RoutingKey 对应的队列发送消息:

 @RequestMapping("/sendRedColorMessage")
    public String sendRedColorMessage(String msg) {
        amqpTemplate.convertAndSend("directExchange","color_red",msg);
        return "send message successfully";
    }

    @RequestMapping("/sendBlackColorMessage")
    public String sendBlackColorMessage(String msg) {
        amqpTemplate.convertAndSend("directExchange","color_black",msg);
        return "send message successfully";
    }

启动程序、利用 Postman 分别调用上面两个接口进行发送消息测试:

enter image description here

enter image description here

enter image description here

enter image description here

Topic Exchange 是根据 RoutingKey 和 Exchange 的类型将 message 发送到一个或者多个 Queue 中,我们经常拿它来实现各种 Publish/Subscribe,即发布订阅。

创建 Topic 类型的交换机,并将交换机、队列分别利用 RoutingKey 进行绑定,代码如下:

@Bean
    public Queue queueMessageColor() {
        return new Queue("messageColor");
    }

    @Bean
    public Queue queueMessageColors() {
        return new Queue("messageColors");
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingTopicColorExchange(Queue queueMessageColor, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessageColor).to(topicExchange).with(TOPIC_ROUTING_COLOR);
    }

    @Bean
    Binding bindingTopicColorsExchange(Queue queueMessageColors, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessageColors).to(topicExchange).with("message.#");
    }

在 Controller 中添加向消息队列发送消息接口:

@RequestMapping("/sendTopicColorMessage")
    public String sendTopicColorMessage(String msg) {
        amqpTemplate.convertAndSend("topicExchange","message.color",msg);
        return "send message successfully";
    }

    @RequestMapping("/sendTopicColorsMessage")
    public String sendTopicColorsMessage(String msg) {
        amqpTemplate.convertAndSend("topicExchange","message.colors",msg);
        return "send message successfully";
    }

启动程序,利用 Postman 测试结果如下:

enter image description here

enter image description here

enter image description here

Fanout 类型交换机很简单,希望各位读者自行测试一下;而 Headers 类型交换机工作中用得不多,有兴趣的读者可以自己了解下。

RabbitMQ 之死信队列

在实际工作中,RabbitMQ 经常被用于多个系统间进行异步通信,以达到系统之间解耦的目的。但是在消息消费的过程中,不可避免会出现异常,如果此时不加任何处理的话,就可能一直对引起异常的消息一直消费,最终导致消息的大量堆积,使业务系统出现问题,死信队列是解决该问题的一种方法.

下面为各位演示在实际开发中,如何使用死信队列缓存帮助业务流程处理异常的消息。

1. 在配置文件中添加如下配置:

spring:
  rabbitmq:
    host: 192.168.1.120
    port: 5672
    username: root
    password: hcb13579
    listener:
      simple:
        #为了将引起异常的消息存入死信队列,该项必须配置为false
        default-requeue-rejected: false
        retry:
          #开启重试功能
          enabled: true
          #最大重试次数5次
          max-attempts: 5
          #重试间隔时间2秒
          initial-interval: 2000

2. 申明 mainQueue 及其对应的死信队列:

@Configuration
public class rabbitmqDeadLetter {
    //死信队列交换机名称
    final String DEAD_LETTER_EXCHANGE="dead_letter_exchange";
    //死信队列路由键
    final String DEAD_LETTER_ROUTING_KEY="dead_letter_routing_key";

    /**
     * 申明mainQueue队列并设置其死信队列相关参数
     * @return
     */
    @Bean
    public Queue maintainQueue() {
        Map<String,Object> args=new HashMap<>();
        // 设置该mainQueue的死信队列交换机
        args.put("x-dead-letter-exchange", this.DEAD_LETTER_EXCHANGE);
        // 设置mainQueue的死信队列路由键
        args.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);

        return new Queue("mainQueue",true,false,false,args);
    }

    /**
     * 申明mainQueue队列交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("mainDirectExchange");
    }

    /**
     * 利用路由键将mainQueue与交换机绑定在一起
     * @return
     */
    @Bean
    public Binding maintainBinding() {
        return BindingBuilder.bind(maintainQueue()).to(directExchange())
                .with("mainQueue_routing_key");
    }

    /**
     * 申明死信队列
     * @return
     */
    @Bean
    public Queue deadLetterQueue(){
        return new Queue("dead_letter_queue");
    }

    /**
     * 申明死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(this.DEAD_LETTER_EXCHANGE, true, false);
    }

    /**
     * 利用路由键将dead_letter_queue与交换机绑定在一起
     * @return
     */
    @Bean
    public Binding deadLetterBindding(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).
                with(this.DEAD_LETTER_ROUTING_KEY);
    }
}

3. 在 RabbitmqController 中添加接口向 mainQueue 中发送消息:

 @RequestMapping("/sendMessageToMainQueue")
 public String sendMessageToMainQueue(){
        amqpTemplate.convertAndSend("mainDirectExchange",
                "mainQueue_routing_key","testMessage");

        return "success";
    }

4. 对 mainQueue 队列中消息进行消费,为了演示消费消息出现异常重试5次后进行死信队列,我们将模拟程序出现异常:

@Component
@RabbitListener(queues = {"mainQueue"})
public class Receiver {
    @RabbitHandler
    public void consumeMessage(String msg){
        try{
            System.out.println(msg);
            Integer.valueOf(msg);
        }
        catch (Exception e){
            System.out.println("exception ------ " +e.getMessage());
            throw new RuntimeException(e.getMessage());
        }
    }
}

5. 启动程序,利用 Postman 向 mainQueue 队列发送消息,观察测试结果:

enter image description here

消息重试5次后抛出异常:

enter image description here

登录 RabbitMQ Web 控制台,死信队列中有一条消息:

enter image description here

由于篇幅问题,利用 Spring Boot 整合 MongoDB、RabbitMQ 就介绍到这里了。RabbitMQ 的消息发送确认,消息消费确认将在后面利用 MQ 实现分布式事务相关课程中为各位详细介绍。

上一篇
下一篇
目录