首页
学习
活动
专区
圈层
工具
发布
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    Kafka 消费者旧版低级 API

    Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 API,但由于旧版低级 API 可以对消息进行更加灵活的控制...; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping...(),获取最开始的消费偏移量,不一定是0,因为segment会删除 * kafka.api.OffsetRequest.LatestTime(),获取最新的消费偏移量...OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion...配置获取offset的策略为,获取分区最开始的消费偏移量 long offset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime

    1.7K30

    大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

    zk                      zk kafka                        kafka                   kafka 2.1.2 jar包下载 http...100000, 64 * 1024, clientName);         long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime...last element to reset                     readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime... request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName...2)案例实操 (1)创建一个工程,并添加jar包或在pom文件中添加依赖     <!

    1.4K20

    Kafka源码系列之实现自己的kafka监控

    读过前面的文章,Kafka源码系列之源码解析SimpleConsumer的消费过程>和Kafka源码系列之Consumer高级API性能分析>这两篇文章的兄弟姐妹应该看本篇文章会很简单。...4,OffsetRequest 消费者去获取分区数据偏移的请求类,对应的请求key是:RequestKeys.OffsetsKey。...在kafka的服务端kafkaApis的处理函数是:handleOffsetRequest(request) 5,OffsetFetchRequest 这个是请求某个topic的某个消费组的消费偏移,对应的请求...解决获取topic的分区的最大偏移,实际思路是构建simpleConsumer,然后由其 去请求偏移,再跟获取的消费者偏移做差就得到消费者最大偏移。...(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val logSize

    2.1K50

    大数据实时处理实战

    d)Kafka+Storm+Hdfs+Hbase拓扑开发 我们使用Eclipse创建MAVEN工程,在pom.xml配置文件中添加Storm及Hdfs的相关依赖,本例是Storm从Kafka中消费数据,...经过ETL处理后存储到Hdfs和Hbase中,因此需要添加Storm-Kafka、Storm-Hdfs、Storm-Hbase等依赖,注意依赖包版本要与集群一致。...spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); spoutConf.useStartOffsetTimeIfOffsetOutOfRange...编译后的jar包上传到集群,使用storm命令行提交Topology: storm jar ....究其原因是在某个Hdfs节点上,Yarn任务正在进行Reduce操作,用iostat -x 1 10命令查看,Yarn的中间盘I/O长时间被100%占用,同时Yarn的中间盘也是Hdfs的数据盘,导致写入请求无法响应

    2.6K100

    Flume、Kafka、Storm如何结合使用

    flume和kafka的整合 复制flume要用到的kafka相关jar到flume目录下的lib里面。...demo(java api)测试》),然后在s1机器上再启动一个消息消费者consumer 在m1启动flume 在m1上再打开一个窗口,测试向flume中发送syslog m1打开的flume窗口中看最后一行的信息...flume、kafka、storm的整合 从上面两个例子我们可以看到,flume和kafka之前已经完成了通讯和部署,kafka和storm之间可以正常通讯,只差把storm的相关文件打包成jar部署到...Storm的安装、配置、部署,如果不了解,可以参考这篇文章《ubuntu12.04+storm0.9.2分布式集群的搭建》 复制kafka相关的jar包到storm的lib里面。...(因为在上面我们已经说过,kafka和storm的整合,主要是重写storm的spout,调用kafka的Consumer来接收消息并打印,所在需要用到这些jar包) 在m1上启动storm nimbus

    1.1K20

    kafka的JavaAPI操作

    一、创建maven工程并添加jar包 创建maven工程并添加以下依赖jar包的坐标到pom.xml org.apache.kafka...如果在处理代码中正常处理了,但是在提交offset请求的时候,没有连接到kafka或者出现了故障,那么该次修 改offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的offset...高阶API(High Level API) kafka消费者高阶API简单;隐藏Consumer与Broker细节;相关信息保存在zookeeper中。...(Low Level API) kafka消费者低级API非常灵活;需要自己负责维护连接Controller Broker。...四、kafka Streams API开发 需求:使用StreamAPI获取test这个topic当中的数据,然后将数据全部转为大写,写入到test2这个topic当中去 第一步:创建一个topic node01

    62730
    领券