一、背景
在 《Spring Boot集成Disruptor快速入门Demo》 的讨论中,有网友提出两个核心问题:
- Spring Boot应用停机时,如何保证内存中的消息全部处理完成?
- Disruptor与Guava EventBus的差异(本文主要聚焦第一个问题)。
二、解决方案
(一)禁用强制终止命令 kill -9
原因:
- 数据丢失:强制终止进程会导致未处理的内存消息丢失。
- 资源泄漏:无法释放内存、文件句柄或网络连接等资源。
- 跳过清理逻辑:应用无法执行关闭前的清理操作(如关闭数据库连接、写入日志)。
替代方案:使用优雅停机机制。
(二)Spring Boot优雅停机方案
1. 引入Actuator依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
2. 配置Shutdown端点
management: endpoints: web: exposure: include: "*" # 暴露所有端点 endpoint: shutdown: enabled: true # 启用shutdown端点 server: port: 8088 # 自定义端口
3. 禁止停机后API继续对外服务(Spring Boot 2.3以下适用)
通过过滤器返回 503 Service Unavailable
状态码:
package com.et.disruptor.config; import org.springframework.stereotype.Component; import javax.servlet.*; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @Component public class GracefulShutdownFilter implements Filter { private final AtomicBoolean shuttingDown = new AtomicBoolean(false); @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { if (shuttingDown.get()) { ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } chain.doFilter(request, response); } public void startShutdown() { shuttingDown.set(true); } }
4. 实现 DisposableBean
接口(自定义清理逻辑)
package com.et.disruptor.config; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class GracefulShutdownManager implements DisposableBean { @Autowired private GracefulShutdownFilter shutdownFilter; @Autowired private MqManager mqManager; @Override public void destroy() throws Exception { // 1. 拒绝新请求 shutdownFilter.startShutdown(); // 2. 优雅关闭Disruptor(等待消息处理完成) mqManager.shutdownDisruptor(); // 3. 等待自定义任务完成(如CountDownLatch) waitForTasksToComplete(); } private void waitForTasksToComplete() throws InterruptedException { System.out.println("Waiting for tasks to complete..."); // 示例:模拟任务等待(实际使用线程同步工具) Thread.sleep(100000); } }
(三)Disruptor优雅关闭机制
1. 手动调用 shutdown()
方法
package com.et.disruptor.config; import com.lmax.disruptor.Disruptor; import org.springframework.context.annotation.Configuration; @Configuration public class MqManager { private static Disruptor<MessageModel> disruptor; // 关闭逻辑 public void shutdownDisruptor() { if (disruptor != null) { System.out.println("Closing Disruptor..."); disruptor.shutdown(); // 等待所有消息处理完成 } } }
2. 使用 @PreDestroy
注解(自动触发关闭)
@PreDestroy public void shutdownDisruptor() { // 同上关闭逻辑 }
3. shutdown()
源码解析
public void shutdown(long timeout, TimeUnit timeUnit) throws TimeoutException { long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); do { if (!hasBacklog()) { // 检查是否有未处理消息 halt(); // 无消息则终止 return; } } while (timeout < 0 || System.currentTimeMillis() <= timeOutAt); throw new TimeoutException("Disruptor shutdown timed out"); } private boolean hasBacklog() { long cursor = ringBuffer.getCursor(); // 检查消费者序列是否追上生产者序列 for (Sequence consumer : consumerRepository.getLastSequenceInChain(false)) { if (cursor > consumer.get()) return true; // 存在未处理消息 } return false; }
核心逻辑:通过循环检查生产者游标与消费者序列,确保所有内存消息处理完成后再关闭。
三、测试流程
- 启动应用:访问
http://127.0.0.1:8088/hello
验证服务正常。 - 触发优雅停机:
curl -X POST http://127.0.0.1:8088/actuator/shutdown
- 验证结果:
- 再次访问
http://127.0.0.1:8088/hello
应返回503
错误。 - 控制台输出显示Disruptor正在处理剩余消息,完成后应用自动关闭。