在基于Disruptor开发新的性能测试QPS模型时候,中间遇到了很多问题,踩了很多坑。今天就分享一个比较典型的问题:shutdown失效。
问题在于这么优秀的一个框架,怎么可能会存在这么明显的BUG?
经过查阅资料,还真特么存在,只不过在极少数使用场景下会发生,刚好FunTester性能测试框架设计中就属于这个场景。下面听说娓娓道来。
首先我是把每一个消费者线程都当做性能测试线程使用,此为前提。下面是两个因此带来的设定:
以上是四个因为Disruptor框架特性和FunTester框架设计带来的难以避免,然后就会在线程数远超(难以量化界定)需求的时候,会导致性能测试结束之后,Disruptor执行shutdown方法后,Disruptor所有线程并没有全部结束,导致程序无法正常结束且CPU使用率飙升(线程数设定较多)。具体原因大家可以自行搜索,有大佬做了非常优秀的分析、分享和演示。总结起来就是两点:
但是这两种情况其实除非特意构造,否则极难发生,重点还是了解一点点Disruptor源码的结构和运行逻辑。经过一阵子摸索和学习,我发现了问题所在,消费者线程太多了。
在我初步的测试中,有以下几条经验:
PS:以上数据在QPS:5w,平均响应时间10ms设定下完成测试。
使用Disruptor做性能测试坑还是挺多的,可能之前也没人这么用过,还有几个大坑我后面会继续分享,目前总体来说,性能测试最好的模型还是线程模型,当QPS在万级别上时,QPS模型的精确很难控制。
关于较多消费者时,Disruptor框架shutdown失效的问题已经反馈给了开发者。下面是我的测试脚本,为了更容易验证,我特意写了Java版本的。
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class DisJava {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
return thread;
}
};
Disruptor<Event> disruptor = new Disruptor<Event>(
Event::new,
256 * 256,
threadFactory,
ProducerType.MULTI,
new TimeoutBlockingWaitStrategy(1000, TimeUnit.MILLISECONDS)
);
RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
int num = 3000;
EventFun[] consumers = new EventFun[num];
for (int i = 0; i < num; i++) {
consumers[i] = new EventFun();
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.start();
for (int i = 0; i < 10; i++) {
ringBuffer.publishEvent((e, s) -> {
e.setEvent("123");
System.out.println(System.currentTimeMillis());
});
}
disruptor.shutdown();
System.out.println("结束了");
}
private static class EventFun implements EventHandler<Event>, WorkHandler<Event> {
public EventFun() {
}
/**
* 多消费者
*
* @param event
* @throws Exception
*/
@Override
public void onEvent(Event event) throws Exception {
sleep(10);
}
/**
* 单消费者
*
* @param event
* @param sequence
* @param endOfBatch
* @throws Exception
*/
@Override
public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
sleep(10);
}
}
/**
* 消息体
*/
private static class Event {
public String getEvent() {
return event;
}
public void setEvent(String event) {
this.event = event;
}
String event;
}
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
}
}
}