背景

正在负责运维的jstorm集群因为CPU消耗过高,又没有机器投入, 只能想办法优化Jstorm的逻辑。 大家都说序列化\反序列化比较消耗资源, 就想着批量性的序列化。 不懂背景的胡乱修改前人的代码就是一个深不见底的坑。

表现

把代码修改完,小批量的数据测试功能基本正常,当小数据量数据消费的时候就出现结果有数据丢失。查询整个处理流程日志后发现,spout 有数据积压,导致消费异常。

原因

前文又说,本次修改主要是把emit单条数据修改为缓存后批量发送。 在 spout 的 nextTuple 方法中,代码如下:

1
2
3
4
5
6
7
8
9
@Override
    public void nextTuple() {
        try {
            HashMap<String, String> message = messageQueue.poll(1, TimeUnit.MILLISECONDS);
            if(message != null){
                spoutEmit(message);
            }
        }
    }

在 spoutEmit 方法里做的缓存,这里就是问题的原因了。在官方的代码注释中有这样的一段话

When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be non-blocking, so if the Spout has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much CPU.

重要的有一句, 当不存在数据项需要发送时,nextTuple()将会休眠一小段间隔,确保不会浪费过多的CPU资源。 就是这里,代码在 nextTupe方法里进行多次的缓存,没有消息发送有大量的时间进入休眠,使得吞吐量下降太多。

解决

找到原因了,就是 nextTuple 方法, 有大量的次数没有消息发送,导致主循环线程进入休眠状态的。修改nextTuple方法的处理逻辑, 在有大量消息的时候,保证每次都有消息发送即可。