Spring Boot关闭时,如何确保内存里面的mq消息被消费完?

一、背景

在 《Spring Boot集成Disruptor快速入门Demo》 的讨论中,有网友提出两个核心问题:

  1. Spring Boot应用停机时,如何保证内存中的消息全部处理完成?
  2. Disruptor与Guava EventBus的差异(本文主要聚焦第一个问题)。

二、解决方案

(一)禁用强制终止命令 kill -9

原因

  • 数据丢失:强制终止进程会导致未处理的内存消息丢失。
  • 资源泄漏:无法释放内存、文件句柄或网络连接等资源。
  • 跳过清理逻辑:应用无法执行关闭前的清理操作(如关闭数据库连接、写入日志)。

替代方案:使用优雅停机机制。

(二)Spring Boot优雅停机方案

1. 引入Actuator依赖

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-actuator</artifactId>  
</dependency>  

2. 配置Shutdown端点

management:  
  endpoints:  
    web:  
      exposure:  
        include: "*"  # 暴露所有端点  
  endpoint:  
    shutdown:  
      enabled: true  # 启用shutdown端点  
server:  
  port: 8088  # 自定义端口  

3. 禁止停机后API继续对外服务(Spring Boot 2.3以下适用)

通过过滤器返回 503 Service Unavailable 状态码:

package com.et.disruptor.config;  

import org.springframework.stereotype.Component;  
import javax.servlet.*;  
import javax.servlet.http.HttpServletResponse;  
import java.io.IOException;  
import java.util.concurrent.atomic.AtomicBoolean;  

@Component  
public class GracefulShutdownFilter implements Filter {  
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);  

    @Override  
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)  
            throws IOException, ServletException {  
        if (shuttingDown.get()) {  
            ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);  
            return;  
        }  
        chain.doFilter(request, response);  
    }  

    public void startShutdown() {  
        shuttingDown.set(true);  
    }  
}  

4. 实现 DisposableBean 接口(自定义清理逻辑)

package com.et.disruptor.config;  

import org.springframework.beans.factory.DisposableBean;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  

@Component  
public class GracefulShutdownManager implements DisposableBean {  
    @Autowired  
    private GracefulShutdownFilter shutdownFilter;  
    @Autowired  
    private MqManager mqManager;  

    @Override  
    public void destroy() throws Exception {  
        // 1. 拒绝新请求  
        shutdownFilter.startShutdown();  
        // 2. 优雅关闭Disruptor(等待消息处理完成)  
        mqManager.shutdownDisruptor();  
        // 3. 等待自定义任务完成(如CountDownLatch)  
        waitForTasksToComplete();  
    }  

    private void waitForTasksToComplete() throws InterruptedException {  
        System.out.println("Waiting for tasks to complete...");  
        // 示例:模拟任务等待(实际使用线程同步工具)  
        Thread.sleep(100000);  
    }  
}  

(三)Disruptor优雅关闭机制

1. 手动调用 shutdown() 方法

package com.et.disruptor.config;  

import com.lmax.disruptor.Disruptor;  
import org.springframework.context.annotation.Configuration;  

@Configuration  
public class MqManager {  
    private static Disruptor<MessageModel> disruptor;  

    // 关闭逻辑  
    public void shutdownDisruptor() {  
        if (disruptor != null) {  
            System.out.println("Closing Disruptor...");  
            disruptor.shutdown(); // 等待所有消息处理完成  
        }  
    }  
}  

2. 使用 @PreDestroy 注解(自动触发关闭)

@PreDestroy  
public void shutdownDisruptor() {  
    // 同上关闭逻辑  
}  

3. shutdown() 源码解析

public void shutdown(long timeout, TimeUnit timeUnit) throws TimeoutException {  
    long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);  
    do {  
        if (!hasBacklog()) { // 检查是否有未处理消息  
            halt(); // 无消息则终止  
            return;  
        }  
    } while (timeout < 0 || System.currentTimeMillis() <= timeOutAt);  
    throw new TimeoutException("Disruptor shutdown timed out");  
}  

private boolean hasBacklog() {  
    long cursor = ringBuffer.getCursor();  
    // 检查消费者序列是否追上生产者序列  
    for (Sequence consumer : consumerRepository.getLastSequenceInChain(false)) {  
        if (cursor > consumer.get()) return true; // 存在未处理消息  
    }  
    return false;  
}  


核心逻辑:通过循环检查生产者游标与消费者序列,确保所有内存消息处理完成后再关闭。

三、测试流程

  1. 启动应用:访问 http://127.0.0.1:8088/hello 验证服务正常。
  2. 触发优雅停机
   curl -X POST http://127.0.0.1:8088/actuator/shutdown  
  1. 验证结果
  • 再次访问 http://127.0.0.1:8088/hello 应返回 503 错误。
  • 控制台输出显示Disruptor正在处理剩余消息,完成后应用自动关闭。

留下评论

您的邮箱地址不会被公开。 必填项已用 * 标注