首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

注意:Fayson的github调整为:https://github.com/fayson/cdhproject,本文的代码在github中也能找到。

1.文档编写目的

在Kafka集群实际应用中,Kafka的消费者有很多种(如:应用程序、Flume、Spark Streaming、Storm等),本篇文章主要讲述如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS。关于Flume更多sink方式实现敬请关注Fayson后续的文章。本文的数据流图如下:

内容概述

1.Kafka集群启用Kerberos

2.环境准备及配置Flume Agent

3.java访问并测试

测试环境

1.CM和CDH版本为5.11.2

2.采用root用户操作

前置条件

1.集群已启用Kerberos

2.集群已安装Kafka

3.集群已安装Flume

2.Kafka集群启用Kerberos

保存配置并重启Kafka服务。

3.环境准备

由于Kafka集群已启用Kerberos认证,这里需要准备访问Kafka集群的环境,如Keytab、jaas.conf配置等

1.生成访问Kafka集群的keytab文件,在Kerberos所在服务上执行如下命令

2.创建jaas.conf文件,文件内容如下

3.将keytab文件和jaas.conf文件拷贝至所有Flume Agent运行的节点

这里我们将上面的配置文件拷贝放在Flume Agent节点的/flume-keytab目录下

4.修改目录文件属主,确保flume用户有权限访问

[ec2-user@ip-172-31-21-45flume-keytab]$sudo chown -R flume. /flume-keytab/

[ec2-user@ip-172-31-21-45flume-keytab]$sudo chmod-R755/flume-keytab/

4.配置Flume Agent

1.配置Flume Agent读取Kafka数据写入HDFS

kafka.channels= c1

kafka.sources= s1

kafka.sinks= k1

kafka.sources.s1.type=org.apache.flume.source.kafka.KafkaSource

kafka.sources.s1.kafka.bootstrap.servers=ip-172-31-26-80.ap-southeast-1.compute.internal:9092,ip-172-31-21-45.ap-southeast-1.compute.internal:9092, ip-172-31-26-102.ap-southeast-1.compute.internal:9092

kafka.sources.s1.kafka.topics= test4

kafka.sources.s1.kafka.consumer.group.id=flume-consumer

kafka.sources.s1.kafka.consumer.security.protocol= SASL_PLAINTEXT

kafka.sources.s1.kafka.consumer.sasl.mechanism= GSSAPI

kafka.sources.s1.kafka.consumer.sasl.kerberos.service.name= kafka

kafka.sources.s1.channels= c1

kafka.channels.c1.type= memory

kafka.sinks.k1.type= hdfs

kafka.sinks.k1.channel= c1

kafka.sinks.k1.hdfs.kerberosKeytab= /flume-keytab/fayson.keytab

kafka.sinks.k1.hdfs.kerberosPrincipal= fayson@CLOUDERA.COM

kafka.sinks.k1.hdfs.path=/tmp/kafka-test

kafka.sinks.k1.hdfs.filePrefix= events-

kafka.sinks.k1.hdfs.writeFormat= Text

2.增加Flume Agent启动参数

-Djava.security.auth.login.config=/flume-keytab/jaas.conf

配置完成后保存更改并重启FlumeAgent服务。

5.Java生产消息

1.编写jaas.conf文件

2.使用Java编写消息生产代码

packagecom.cloudera;

importorg.apache.kafka.clients.producer.KafkaProducer;

importorg.apache.kafka.clients.producer.Producer;

importorg.apache.kafka.clients.producer.ProducerConfig;

importorg.apache.kafka.clients.producer.ProducerRecord;

importjava.io.File;

importjava.util.Properties;

/**

* package: com.cloudera

* describe:TODO

* creat_user: Fayson

* email: htechinfo@163.com

* creat_date: 2017/12/12

* creat_time:下午3:35

*公众号:Hadoop实操

*/

public classProducerTest{

public staticStringTOPIC_NAME="test4";

public staticStringconfPath=System.getProperty("user.dir") +File.separator+"conf";

public static voidmain(String[]args) {

try{

Stringkrb5conf =confPath+File.separator+"krb5.conf";

Stringjaasconf =confPath+File.separator+"jaas.conf";

System.setProperty("java.security.krb5.conf", krb5conf);

System.setProperty("java.security.auth.login.config", jaasconf);

System.setProperty("javax.security.auth.useSubjectCredsOnly","false");

// System.setProperty("sun.security.krb5.debug", "true"); //Kerberos Debug模式

Propertiesprops =newProperties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

props.put(ProducerConfig.ACKS_CONFIG,"all");

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol","SASL_PLAINTEXT");

props.put("sasl.kerberos.service.name","kafka");

Producer producer =newKafkaProducer(props);

for(inti =; i

Stringmessage = i +"\t"+"fayson"+ i +"\t"+22+i;

ProducerRecordrecord =newProducerRecord(TOPIC_NAME, message);

producer.send(record);

System.out.println(message);

}

producer.flush();

producer.close();

}catch(Exceptione) {

e.printStackTrace();

}

}

}

3.将工程编译打包kafka-demo-1.0-SNAPSHOT.jar

mvnclean package

4.使用mvn命令将工程依赖库导出

mvndependency:copy-dependencies -DoutputDirectory=/Users/fayson/Desktop/lib

将导出的jar包放在run-kafka/lib目录下。

5.编写run.sh脚本,运行测试jar包

#!/bin/bash

JAVA_HOME=/usr/java/jdk1.8.0_131-cloudera

forfilein`ls lib/*jar`

do

CLASSPATH=$CLASSPATH:$file

done

exportCLASSPATH

$/bin/javacom.cloudera.ProducerTest

6.conf目录文件

fayson.keytab:fayson的keytab文件

jaas.conf:java访问Kerberos环境下的配置

krb5.conf:集群的krb5配置文件

6.Kafka->Flume->HDFS流程测试

1.将第5章开发好的示例放在集群的服务器上

2.执行run.sh

[ec2-user@ip-172-31-22-86run-kafka]$sh run.sh

3.查看HDFS的/extwarehouse/student目录下数据

这里可以看到数据已写入HDFS指定的目录。

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

您可能还想看

安装

安全

数据科学

其他

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20171215G00UHD00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券