报错信息:
… [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 965
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 975
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 985
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 995
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 1006
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
at java.lang.Thread.run(Thread.java:748)
--------------------------------------------
配置文件:
kafkaLogger.sources = kaSource
kafkaLogger.channels = memoryChannel
kafkaLogger.sinks = kaSink
# The Kafka source and its channel binding are defined as follows.
kafkaLogger.sources.kaSource.channels = memoryChannel
kafkaLogger.sources.kaSource.type= org.apache.flume.source.kafka.KafkaSource
kafkaLogger.sources.kaSource.zookeeperConnect=192.168.130.4:2181,192.168.130.5:2181,192.168.130.6:2181
kafkaLogger.sources.kaSource.topic=dwd-topic
kafkaLogger.sources.kaSource.groupId = 0
kafkaLogger.channels.memoryChannel.type=memory
kafkaLogger.channels.memoryChannel.capacity = 1000
kafkaLogger.channels.memoryChannel.keep-alive = 60
kafkaLogger.sinks.kaSink.type = elasticsearch
kafkaLogger.sinks.kaSink.hostNames = 192.168.130.6:9300
kafkaLogger.sinks.kaSink.indexName = flume_mq_es_d
kafkaLogger.sinks.kaSink.indexType = flume_mq_es
kafkaLogger.sinks.kaSink.clusterName = zyuc-elasticsearch
kafkaLogger.sinks.kaSink.batchSize = 100
kafkaLogger.sinks.kaSink.client = transport
kafkaLogger.sinks.kaSink.serializer = com.commons.flume.sink.elasticsearch.CommonElasticSearchIndexRequestBuilderFactory
kafkaLogger.sinks.kaSink.serializer.parse = com.commons.log.parser.LogTextParse
kafkaLogger.sinks.kaSink.serializer.formatPattern = yyyyMMdd
kafkaLogger.sinks.kaSink.serializer.dateFieldName = time
kafkaLogger.sinks.kaSink.channel = memoryChannel
Flume 版本:1.6.0.1;Kafka 版本:kafka_2.11-2.0.1
相似问题