在Java中,提到“优化”这两个字,很多人首先都会想到JVM优化。的确,JVM提供了很多参数,让优化工作看起来更为直观。例如我们通过Xms、Xmx就可以调整程序的启动内存,通过 -XX:+UseG1GC我们就可以使用G1垃圾收集器。
我在大数据开发中,遇到过大数据量的数据转换、接入,为了避免程序的OOM,除了在前期增加处理主机之外,后来更多的是在代码层面进行优化。所以今天就看看我在代码开发时,可以从代码层做哪些优化。
在Java中,list、set、map是我们使用比较多的,就拿list来说,常用的实现类有ArrayList、LinkedList,对于这两种list的选择,我们还是需要根据实际业务来。
当时在大学学习数据结构的时候,书中写到数组是访问快,但是删除和增加困难。链表是访问慢,但是删除和增加快。那时候对这句话真的是一点都不理解,后来在代码开发中就慢慢明白了这个道理。
ArrayList使用数组elementData来存储数据。
对于Array来说,使用下标index直接访问元素,不需要遍历整个数组,时间复杂度为0(1)。而如果删除或增加一个元素,后面元素的下标都会修改,例如删除index为7的元素,那么删除之后index为8的元素就要前移1,变成index为7的元素,后面是元素整体前移。
所以在ArrayList的remove中,使用arrayCopy直接将elementData后半部分数据,前移一位。
所以说每次删除或者增加数据,都要调用一次arrayCopy数组复制。
对于LinkedList,存储结构是Node,而且会有一个first头结点和一个last尾节点。
对于每个Node对象,都会有prev前置节点指向前一个Node,和next后继节点指向下一个Node。
链表的存储结构大概是这样的:头节点没有prev前置节点,为节点没有next后继节点,除此之外每个Node都有pre和next。
在LinkedList中,在指定的index插入时,如果index与size相等,则表示是在尾部插入,即插入尾结点。如果不是,则调用linkBefore插入节点。
在linkeBefore中,我们可以看到:当在链表中下标index处,增加一个节点时,会将index的Node的prev修改为新节点,index原本的前置节点的next指向新节点。对于新节点,next指向index节点,prev指向index节点原来的前置节点。
流程如图所示:
这样LinkedList在增加或删除元素时,就不需要做数据复制和index位移,只需要修改几个节点的prev和next节点即可。所以在选择List时,如果查询操作多,就选择ArrayList,删除或者增加元素多,就选择LinkedList。
对于Map的时候,主要是注意对Map容量大小的初始化,默认容量为16,且loadFactor为0.75。
我new一个HashMap,然后put插入数据,通过断点可以发现,当++size > threshold时,则调用resize扩容。
那么threshold是如何而来的?通过对map的容量capacity loadFactor*而来。
也就是说,容量为16的map只能存储12个元素,当存入第13个的时候,就会发生扩容,这时候threshold就变成了24,而容量就是24 / 0.75 = 32,直接扩容2倍。
在resize()中,newCap表示新容量,扩容运算规则是在oldCap基础上,<<右移一位,即*2.
而扩容之后,会为map新建一个容量为32的Node数据结构,然后将原来16的Node中的元素,复制到新Node中。
所以,在使用map时,要预估要存放元素的个数,然后指定初始化大小,避免扩容带来的性能问题。
遇到过这么一个场景:消费kafka中的数据,然后去做处理。但是kakfa中的一个分区只能被一个thread消费,所以thread的数量最大为分区数。但是这些线程满足不了我的处理性能需求。
所以将Kafka的消费与数据处理逻辑代码解耦,先利用少量线程消费kakfa,将数据放入queue中,然后数据处理模块读取queue消费。为了保证高并发下的线程数据安全,使用了ConcurrentLinkedQueue。
ConcurrentLinkedQueue优点就是无锁,在poll()中看不到一个关于锁(synchronized、lock等)。
那么,ConcurrentLinkedQueue是如何保证线程安全的呢?这就要提到上图中的CAS。
CAS,comprare and swap,第一次接触还是在java的Atomic类中。CAS不是锁,只是CPU提供的一个原子性操作指令,直接使用UnSafe类将原子操作实现在硬件级别实现,解决ABA问题。
poll()中调用了casItem方法,而casItem调用的是UNSAFE.compareAndSwapObject。
假如queue中的head节点元素(item)为1,将item设置为null,就表示这个节点被移除了,head就指向了下一个元素。而使用compareAndSwapObject,就是为了将item中的元素替换(swap)为null,但是在swap之前需要compare一下,这个item还是之前的item吗。只要当item还是之前的item,才能被swap。
可以看到在poll()中的最开始部分,有一个for(;;),这就是死循环的一个写法,类似于while true,但是在这里被称作自旋,如果多个线程都在调用poll(),那么每个线程都会陷入自旋,等到有一个线程获取到head节点的数据,并通过CompareAndSwap这个过程,将其修改为null为止。
所以,ConcurrentLinkedQueue使用CAS代替了锁保证线程安全,但是有一个问题就是如果queue中没有数据,调用poll()返回的是null,所以在数据处理时要增加非空判断。
如果想要进一步提升性能,推荐使用disruptor来替换ConcurrentLinkedQueue。
disruptor也是无锁的CAS设计,有着高并发高吞吐的性能,同时也有着类似于kafka中生产者和消费者的定义。
其底层是基于数组实现的缓冲区RingBuffer,生产者,消费者,都有各自独立的Sequence,在RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。
如果想要使用disruptor,首先要构建Event载体,也就是数据对象。
public class ByteArrayEvent {
private byte[] bytes;
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
}
public class ByteArrayEventFactory implements EventFactory<ByteArrayEvent> {
@Override
public ByteArrayEvent newInstance() {
return new ByteArrayEvent();
}
}
public class ByteArrayEventHandler implements EventHandler<ByteArrayEvent> {
@Override
public void onEvent(ByteArrayEvent byteArrayEvent, long sequence, boolean endOfBatch) throws Exception {
// 处理事件的逻辑
}
}
默认情况下,Disruptor会将生产者指定为多线程模式,ProducerType.SINGLE来设置生产者为单线程模式。
// 必须是2的幂
int bufferSize = 1024;
/**
DaemonThreadFactory:线程池,create threads for processors.
ProducerType#SINGLE:一个ringbuffer支持多个publisher; ProducerType#MULTI:支持多个publisher
BlockingWaitStrategy:消费者等待策略。SleepingWaitStrategy:对生产者影响小。BlockingWaitStrategy使用了lock,效率不太行。
YieldingWaitStrategy性能最好无锁策略,使用了 Thread.yield() 多线程交替执行
**/
disruptor = new Disruptor<>(new ByteArrayEventFactory(), bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
// handleEventsWith:消费数据,每一次绑定一个消费者,可以使用then进行链式处理
// 一个handler就是一个消费者
disruptor.handleEventsWith(new ByteArrayEventHandler());
// 启动
disruptor.start();
可以参考为了研究Java内存模型(JMM),我又学了一点汇编指中提到的volatile和synchronized部分
我最近遇到一个需求,将TLV格式的二进制数据解析为二进制明文。TLV是什么意思呢,就是每条数据的每个字段由TLV格式表示的,T代表tag,是一个字段的唯一id,L是length,表示后面V即value的长度。每条数据都是固定的字段个数,但是数据中有的字段为空,则这个字段就不用TLV表示,则直接跳到下一个字段。
这里字段存储使用的数据结构是数组,原因就是按照index查找速度快。这里先创建一个DataObject类。
private Object[] data;
private static final String KAFKA_SEPARATOR = "|";
public DataObject(Object[] data) {
this.data = data;
}
@Override
public void setData(int index, Object f) {
data[index] = f;
}
@Override
public void data2String(StringBuilder sb) {
sb.append(data[0]).append(KAFKA_SEPARATOR);
sb.append(data[1]).append(KAFKA_SEPARATOR);
// 省略其他字段...
}
解析每个字段时,都先解析出来tag,然后以tag为index,调用setData将value放到Array中。
int tag = buffer[off] & 0xFF;
int formatAndTagHigh = buffer[off + 1] & 0xFF;
int format = ((buffer[off + 1] & 0xFF) >> 4) & 0X0F;
off += 2;
int length = 0;
switch (format) {
case 1:
length = 1;
dataObject.setData(tag, buffer[off] & 0xff);
break;
case 4:
xdr.dataObject(tag, ConvToByte.byteToUnsignedInt(buffer, off));
length = 4;
break;
但在java中array的默认值为null,但是我在字段中默认值想要设置为空字符,所以当时思考了几个方案。
方案一就是,在将array中的数据转换成String的时候,使用replace替换字符串。但是这个方案直接被我否决了,对于数据量很大且实时性要求很高的解析程序来说,replace是个很耗费性能的事情。
方案二就是,array作为数据对象的私有属性,在构造函数constructer中初始化array,并循环array将每个值设置为""。但是这样带来的问题也显而易见,就是每创造一个数据对象时,都会循环array,而且每条数据有200个字段,就要循环200次,也会带来性能问题。
所以后来我就采用了方案三,就是在外部构造一个array,循环遍历将每个值设置为"",在每次创造数据对象之前,我调用array.clone()复制一个array,在new数据对象时,作为构造参数赋值给数据对象的私有变量。这样就完美解决了我的需求。
private Object[] data;
public DataDecode() {
int length = 200;
data = new Object[length];
data20 = new Object[length20];
for (int i = 0; i < length; i++) {
data[i] = "";
}
}
DataObject dataObject = new DataObject(data.clone());
// 解码处理逻辑,省略....
dataObject.data2String(sb);
clone()有哪些优势?
综合以上,在合适的场景选择clone()是一个不错的选择。
在java的开发中,很多时候都会以实现功能为最终目的,而往往会忽略相同功能的不同选择,会带给自己代码性能和技术层面的提升。
这篇文章只是整个java开发中可优化部分的缩影,尤其在高并发多线程、锁这一方面可优化的地方还有很多。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。