盒子
盒子

一次RabbitMQ问题的解决

公司使用的Java技术栈,MQ中间件是RabbitMQ,RabbitMQ的客户端使用的是Spring-AMQP。临近元旦几天却频繁宕机,影响到维护的一个老程序。经过排查发现可能是由于RabbitMQ 2.x版本的一个Bug,Queue超过一定的数量,两个节点复制Queue时候致使无法响应外界请求。Queue数量突然增多,其实是最近两个项目迁移合并导致。

整理一下所有的Queue,发现两个不同的Vhost中存在相同Queue、Exchange、Binding,但其中有一个Vhost使用到。也就是说存在某种原因导致Queue在两个Vhost都创建了一次。查看我维护的那个项目发现两个不同的业务都各自使用自己Vhost,到底是什么原因导致其中一个Vhost中的队列再另一个中也创建了?

查看Spring-AMQP的官方文档,跟踪它的源码,这个答案终于揭晓了。且看一步一步来分析。在官方文档中看到这么一段:

By default, all queues, exchanges, and bindings are declared by all RabbitAdmin instances (that have auto-startup="true") in the application context.

也就是说queues、exchanges、bindings都是通过RabbitAdmin来声明创建的。客户端的代码中配置MQ的地方确实有RabbitAdmin对象的实例化,但没发现任何地方去调用它。既然没地方使用,那么肯定又是Spring在管理。打开RabbitAdmin的源码,看到RabbitAdmin实现了接口InitializingBean。这个接口只有一个方法afterPropertiesSet(),实现了这个接口的类在初始化设置完熟悉之后会调用这个方法。RabbitAdmin实现了这个方法,方法的部分实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
connectionFactory.addConnectionListener(new ConnectionListener() {
// Prevent stack overflow...
private final AtomicBoolean initializing = new AtomicBoolean(false);
public void onCreate(Connection connection) {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...
return;
}
try {
/*
* ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
* declared for every connection. If anyone has a problem with it: use auto-startup="false".
*/
initialize();
} finally {
initializing.compareAndSet(true, false);
}
}
public void onClose(Connection connection) {}
});

这里添加一个Connection的监听,监听在Connection被创建的时候触发,并调用了initialize()方法。关于initialize()方法中有下面关键的几行代码:

1
2
3
final Collection<Exchange> exchanges = filterDeclarables(applicationContext.getBeansOfType(Exchange.class).values());
final Collection<Queue> queues = filterDeclarables(applicationContext.getBeansOfType(Queue.class).values());
final Collection<Binding> bindings = filterDeclarables(applicationContext.getBeansOfType(Binding.class).values());

监听在Connection第一次初始化的时候被触发,并执行了initialize()。在上面的代码中看到所有定义的Exchange、Queue、Binding对象由Spring来管理,这里使用getBeanOfType从Spring的applicationContext中获取对象。也就是说两个业务的客户端各自的RabbitAdmin在执行的时候都会拿到对方定义的Exchange、Queue、Binding,接下来filterDeclarables方法对这些对象做了过滤。看一看filterDeclarables方法的部分实现,就能解答需要探讨的问题了。代码如下:

1
2
3
4
5
6
7
for (T declarable : declarables) {
Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins();
if (declarable.shouldDeclare() &&
(adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
filtered.add(declarable);
}
}

如上所示,declarables就是上面从context中拿到的Exchange、Queue、Binding的对象集合。这里遍历集合,调用集合元素的getDeclaringAdmins方法获取RabbitAdmin的集合,getDeclaringAdmins是Declarable接口的一个方法,Queue、Exchange、Binding都实现了这个接口。并且它们各自都有一个方法叫setAdminsThatShouldDeclare,用于设置它们应该被托管的RabbitAdmin的集合。

当getDeclaringAdmins拿到的RabbitAdmin集合为空或者包含当前正在执行的RabbitAdmin时都不会过滤掉。原因就在于我们的客户端代码当中都没有调用setAdminsThatShouldDeclare,执行到这里时获取到的RabbitAdmin集合为空,从applicationContext中拿到的所有Exchanges、Queues、Bindings在两个RabbitAdmin中都各自声明了一遍,于是也就有了两个Vhost有相同的Exchange、Queue、Binding。

记下这个问题,也是因为它出现在元旦,整个元旦假期也因为此问题没有过好。虽然代码不是我写的,但让我感觉到写代码的人在使用Spring-Amqp的时候并没有认真的了解过,属于半生不熟的状态。当我们使用自己不熟悉的东西时,不应该马上拿起就用,而是应该先尽可能的先去了解一下要用的东西,要学会思考。

支持一下
扫一扫,支持forsigner