从上个星期开始,我用scala中的Flink构建了一个DataStream程序。但我的行为很奇怪,弗林克比我想象的要多。
我的processFunction中有一个4 processFunction的元组( INT,long),我用它在不同的时间范围内得到不同的唯一计数器,并且我预计大部分内存都被这个列表使用了。
但事实并非如此。所以我打印了JVM的他的生活。我很惊讶这么多的记忆被使用了。
num #instances #bytes class name
----------------------------------------------
1: 138920685 6668192880 java.util.HashMap$Node
2: 138893041 5555721640 org.apache.flink.streaming.api.operators.InternalTimer
3: 149680624 3592334976 java.lang.Integer
4: 48313229 3092046656 org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
5: 14042723 2579684280 [Ljava.lang.Object;
6: 4492 2047983264 [Ljava.util.HashMap$Node;
7: 41686732 1333975424 com.myJob.flink.tupleState
8: 201 784339688 [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
9: 17230300 689212000 com.myJob.flink.uniqStruct
10: 14025040 561001600 java.util.ArrayList
11: 8615581 413547888 com.myJob.flink.Data$FingerprintCnt
12: 6142006 393088384 com.myJob.flink.ProcessCountStruct
13: 4307549 172301960 com.myJob.flink.uniqresult
14: 4307841 137850912 com.myJob.flink.Data$FingerprintUniq
15: 2153904 137849856 com.myJob.flink.Data$StreamData
16: 1984742 79389680 scala.collection.mutable.ListBuffer
17: 1909472 61103104 scala.collection.immutable.$colon$colon
18: 22200 21844392 [B
19: 282624 9043968 org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
20: 59045 6552856 [C
21: 33194 2655520 java.nio.DirectByteBuffer
22: 32804 2361888 sun.misc.Cleaner
23: 35 2294600 [Lscala.concurrent.forkjoin.ForkJoinTask;
24: 640 2276352 [Lorg.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
25: 32768 2097152 org.apache.flink.core.memory.HybridMemorySegment
26: 12291 2082448 java.lang.Class
27: 58591 1874912 java.lang.String
28: 8581 1372960 java.lang.reflect.Method
29: 32790 1311600 java.nio.DirectByteBuffer$Deallocator
30: 18537 889776 java.util.concurrent.ConcurrentHashMap$Node
31: 4239 508680 java.lang.reflect.Field
32: 8810 493360 java.nio.HeapByteBuffer
33: 7389 472896 java.util.HashMap
34: 5208 400336 [I
com.myJob.flink.tupleState排在第7位。我看到元组使用的内存少于2G。
我不明白为什么flink在这些类中使用这么多的内存。
有人能给我介绍一下这种行为吗?谢谢。
更新:
我在一个独立集群(1 jobManager,3 taskManager)上运行我的工作
flink版本为1.5-快照提交: e4486ae
我让他住在一个taskManager节点上。
更新2:
在我的processFunction中,我使用:
ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)
在使用onTimer
函数之后,我处理我的listState
以检查所有旧数据。因此,它为processFunction上的每个调用创建一个计时器。
但是为什么在onTimer
函数触发后定时器在内存上是钢的?
发布于 2018-02-13 08:41:54
你最后有多少扇窗户?基于前两个条目,现在看到的是Flink用来跟踪何时清理窗口的“计时器”。对于窗口中的每个键,您将在计时器状态下有效地得到( key,endTimestamp)。如果您有非常多的窗口(可能是无序时间或延迟水印),或者每个窗口中有大量的密钥,那么每个窗口都会占用内存。
请注意,即使您使用的是RocksDB状态,TimerService也使用堆内存,因此您必须注意这一点。
https://stackoverflow.com/questions/48753108
复制相似问题