前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >freeswitch笔记(8)-esl outbound 填坑笔记

freeswitch笔记(8)-esl outbound 填坑笔记

作者头像
菩提树下的杨过
修改2020-07-12 22:22:29
1.6K0
修改2020-07-12 22:22:29
举报
文章被收录于专栏:菩提树下的杨过

github上的esl-client已经N年未更新了,上面有一堆bug,记录一下:

一、内存泄露

org.freeswitch.esl.client.transport.message.EslFrameDecoder 这个类,使用了netty的ByteBuf,对netty有了解的同学应该知道,netty底层大量使用了堆外内存,建议开发人员及时手动释放。

https://github.com/esl-client/esl-client/issues/24 也有记载

参考下图,手动加上释放处理即可

二、线程池优化

org.freeswitch.esl.client.outbound.OutboundChannelInitializer 这个类,每次freeswitch有来电时,会以outbound外联模式,通过tcp连接到esl client,初始化channel。这里用Executors创建一个单线程,正常情况下问题倒不大,但是jdk源码:

代码语言:javascript
复制
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

LinkedBlockingQueue默认是一个无界队列:

代码语言:javascript
复制
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

有点风险,有点风险,改成下面这样更安全点:

代码语言:javascript
复制
    private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-pool-%d").build();

    private ExecutorService callbackExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());

这个单线程池的用法也顺带研究了下,它真正使用的地方在于org.freeswitch.esl.client.outbound.OutboundClientHandler,用于处理freeswitch发过来的事件

代码语言:javascript
复制
@Override
protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {
    callbackExecutor.execute(() -> clientHandler.onEslEvent(
            new Context(ctx.channel(), OutboundClientHandler.this), event));
}

大家知道Netty本身就有2个线程池:bossGroup,workerGroup,默认大小在io.netty.channel.MultithreadEventLoopGroup中

代码语言:javascript
复制
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
 
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

即:核数*2。 既然已经是线程池了,为啥这里esl的事件又单独交给1个单线程池来处理呢? 先来看OutboundChannelInitializer实例化的地方,在org.freeswitch.esl.client.outbound.SocketClient的doStart里

代码语言:javascript
复制
@Override
protected void doStart() {
    final ServerBootstrap bootstrap = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new OutboundChannelInitializer(clientHandlerFactory));
 
    serverChannel = bootstrap.bind(bindAddress).syncUninterruptibly().channel();
    notifyStarted();
    log.info("SocketClient waiting for connections on [{}] ...", bindAddress);
}

也就是说,只有outbound tcp server启用时,才会对OutboundChannelInitializer做1次初始化,言外之意,刚才的单线程池实例也只会实例化1次。

试想一下,如果在outbound的处理过程中,一通电话进来,我们订阅了一堆事件,这堆事件发过来后,如果让workerGroup并行处理,事件的处理顺序就得不到保证了,这在电话系统中是很重要的,比如:响铃->接听->挂断。肯定要有顺序的!所以为了保证事件处理的顺序性,强制让所有事件,都交给这个单线程池实例来处理,保证了顺序性。

其实不光是outbound,inbound也是类似机制,保证事件接收时按顺序处理。明白这个原理后,回过头来想想,这个单线程池的callbackExector实例,应该处理成static静态实例更稳妥,这样强制让jvm保证肯定只有一个实例,处理事件绝对有顺序。

另外,在outbound的onConnect事件里,如果尝试跟freeswitch发命令,会发现block住,后面的代码完全无法执行,这也是一个大坑。解决办法:

将onConnect的处理,放在另外1个专用线程池里

代码语言:javascript
复制
class OutboundClientHandler extends AbstractEslClientHandler {
 
    //这是保证事件接收顺序的单线程池
    private final ExecutorService onEslEventExecutor;
    //这是用于并发处理onConnect的多线程池
    private final ExecutorService onConnectExecutor;
 
 
    public OutboundClientHandler(IClientHandler clientHandler, ExecutorService onEslEventExecutor, ExecutorService onConnectExecutor) {
        this.clientHandler = clientHandler;
        //构造函数里允许传入
        this.onEslEventExecutor = onEslEventExecutor;
        this.onConnectExecutor = onConnectExecutor;
    }
 
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
 
        // Have received a connection from FreeSWITCH server, send connect response
        long threadId = Thread.currentThread().getId();
        log.debug("Received new connection from server, sending connect message,threadId:" + threadId);
 
        sendApiSingleLineCommand(ctx.channel(), "connect")
                .thenAccept(response ->
                        //这里改为线程池执行
                        onConnectExecutor.execute(() -> clientHandler.onConnect(
                                new Context(ctx.channel(), OutboundClientHandler.this),
                                new EslEvent(response, true)))
                )
                .exceptionally(throwable -> {
                    ctx.channel().close();
                    handleDisconnectionNotice();
                    return null;
                });
    }
 
    @Override
    protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {
        //这里仍然用单一线程池处理,保证顺序
        onEslEventExecutor.execute(() -> clientHandler.onEslEvent(
                new Context(ctx.channel(), OutboundClientHandler.this), event));
    }
 
    ...
}

然后

代码语言:javascript
复制
public class OutboundChannelInitializer extends ChannelInitializer<SocketChannel> {
 
    private final IClientHandlerFactory clientHandlerFactory;
 
    private static ThreadFactory onEslThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-onEsl-pool-%d").build();
 
    //专门接收订阅事件的单一线程池(保证顺序)
    private static ExecutorService onEslExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100000), onEslThreadFactory);
 
    private static ThreadFactory onConnectThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-onConnect-pool-%d").build();
 
    //专用于处理新来电onConnect的多线程池
    private static ExecutorService onConnectExecutor = new ThreadPoolExecutor(32, 512,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2048), onConnectThreadFactory);
 
 
    public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) {
        this.clientHandlerFactory = clientHandlerFactory;
    }
 
    /**
     * 重载版本,允许开发人员初始化时,传入自己的线程池
     * @param clientHandlerFactory
     * @param connExecutor
     * @param eslExecutor
     */
    public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, ExecutorService connExecutor, ExecutorService eslExecutor) {
        this.clientHandlerFactory = clientHandlerFactory;
        onEslExecutor = eslExecutor;
        onConnectExecutor = connExecutor;
    }
 
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add the text line codec combination first
        pipeline.addLast("encoder", new StringEncoder());
        // Note that outbound mode requires the decoder to treat many 'headers' as body lines
        pipeline.addLast("decoder", new EslFrameDecoder(8092, true));
 
        // now the outbound client logic
        //将2个线程池,传入实例
        pipeline.addLast("clientHandler",
                new OutboundClientHandler(clientHandlerFactory.createClientHandler(), onEslExecutor, onConnectExecutor));
 
    }
}

三、源码上的Test示例代码各种错误

https://github.com/esl-client/esl-client/blob/master/src/test/java/OutboundTest.java 这是示例源码

代码语言:javascript
复制
String uuid = eslEvent.getEventHeaders().get("unique-id");

45行,这里应该是"Unique-ID",小写取不到值。

另外82行,outbound的onEslEvent方法,其实永远也不会被触发,因为根本没订阅任何事件,inbound的示例部分也有同样问题。

56行,执行后,实测下来,后面的操作其实都是阻塞的,代码无法向下执行,建议改在新线程里执行。

上述这些问题,笔者已经fork了一份代码进行了修改,有兴趣的同学,欢迎fork,地址:https://github.com/yjmyzz/esl-client

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-07-05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云联络中心
腾讯云联络中心(Tencent Cloud Contact Center,TCCC)是帮助企业快速搭建集电话、在线交流、音视频通话为一体的客户联络平台。支持被集成于SaaS或业务系统,为企业客服、销售、线下门店沟通、混合办公等场景提供灵活稳定的一体化云联络中心。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档