Send the data in sample.log to Kafka, process it with Spark Streaming, and reformat each record as:
commandid | houseid | gathertime | srcip | destip | srcport | destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid
then send the result to another Kafka topic.
Requirements:
1. sample.log => read the file and send its lines to a Kafka topic
2. Consume the data from that Kafka topic (the 0.10 integration does not manage offsets itself) and change the record format
3. Send the transformed data to another Kafka topic
Analysis
1. Use the Redis utility class from the course to manage offsets
2. Read the log data and send it to topic1
3. Consume the topic, change the field delimiter to a pipe, and send the result to topic2
1. OffsetsWithRedisUtils
package home.one
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import scala.collection.mutable
object OffsetsWithRedisUtils {
// Redis connection parameters
private val redisHost = "linux123"
private val redisPort = 6379
// Connection pool configuration
private val config = new JedisPoolConfig
// Maximum number of idle connections
config.setMaxIdle(5)
// Maximum number of total connections
config.setMaxTotal(10)
private val pool = new JedisPool(config, redisHost, redisPort, 10000)
private def getRedisConnection: Jedis = pool.getResource
private val topicPrefix = "kafka:topic"
// Key format: kafka:topic:TopicName:groupid
private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"
// Fetch the offsets stored under each topic's key
def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
val jedis: Jedis = getRedisConnection
val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
val key = getKey(topic, groupId)
import scala.collection.JavaConverters._
// Convert the Java map returned by Redis into a Scala map; the stored format is {key: {partition: offset}}
jedis.hgetAll(key)
.asScala
.map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
}
// Return the connection to the pool
jedis.close()
offsets.flatten.toMap
}
// Save the offsets to Redis
def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
// Get a connection
val jedis: Jedis = getRedisConnection
// Organize the data as (topic, (partition, untilOffset))
offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
.groupBy(_._1)
.foreach{case (topic, buffer) =>
val key: String = getKey(topic, groupId)
import scala.collection.JavaConverters._
// Convert the Scala map back to a Java map before writing it to Redis
val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava
// Save the data
jedis.hmset(key, maps)
}
jedis.close()
}
}
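The utility above can be smoke-tested on its own before wiring it into the streaming job. A minimal sketch, assuming a Redis instance is reachable at linux123:6379; the group id and offsets below are made up purely for this check:
package home.one
import org.apache.spark.streaming.kafka010.OffsetRange
object OffsetsWithRedisUtilsTest {
  def main(args: Array[String]): Unit = {
    val topic = "mytopic1"
    val groupId = "testgroup" // hypothetical group id, used only for this check
    // Pretend a batch consumed partition 0 up to offset 100 and partition 1 up to offset 200
    val ranges = Array(
      OffsetRange(topic, 0, 0L, 100L),
      OffsetRange(topic, 1, 0L, 200L)
    )
    OffsetsWithRedisUtils.saveOffsetsToRedis(ranges, groupId)
    // Read them back; expect mytopic1-0 -> 100 and mytopic1-1 -> 200
    OffsetsWithRedisUtils.getOffsetsFromRedis(Array(topic), groupId)
      .foreach { case (tp, offset) => println(s"$tp -> $offset") }
  }
}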
2. KafkaProducer
package home.one
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object KafkaProducer {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
// Read the sample.log file
val lines: RDD[String] = sc.textFile("data/sample.log")
// Kafka producer configuration
val prop = new Properties()
// Kafka broker address
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
// Serializers for keys and values
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// Send every line to mytopic1
lines.foreachPartition{iter =>
// Create one KafkaProducer per partition
val producer = new KafkaProducer[String, String](prop)
iter.foreach{line =>
// Wrap the line in a ProducerRecord
val record = new ProducerRecord[String, String]("mytopic1", line)
// Send the record
producer.send(record)
}
producer.close()
}
}
}
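Before running the streaming job, it helps to confirm that mytopic1 actually received the log lines. A throwaway consumer sketch, assuming the same broker address linux121:9092 used above; the group id is hypothetical:
package home.one
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
object CheckTopicOne {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "checkgroup") // hypothetical group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("mytopic1"))
    // Poll a few times and print whatever arrives
    for (_ <- 1 to 5) {
      consumer.poll(1000L).asScala.foreach(r => println(r.value()))
    }
    consumer.close()
  }
}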
3. HomeOne
package home.one
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object HomeOne {
val log = Logger.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
// Topic(s) to consume
val topics: Array[String] = Array("mytopic1")
val groupid = "mygroup1"
// Kafka consumer configuration
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
// Load the saved offsets from Redis
val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)
// Create the direct DStream
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
// Subscribe to Kafka, starting from the offsets loaded above
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)
// Send the transformed data to another topic
dstream.foreachRDD { rdd =>
if (!rdd.isEmpty) {
// Capture the offset ranges consumed in this batch
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Transform the data and send it to topic2
rdd.foreachPartition(process)
// Persist the offsets to Redis
OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
}
}
// Start the job
ssc.start()
// Keep running until terminated
ssc.awaitTermination()
}
// Transform the records of one partition and send them to topic2;
// a single producer is reused for the whole partition instead of creating one per record
def process(iter: Iterator[ConsumerRecord[String, String]]): Unit = {
val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
iter.map(record => parse(record.value))
.filter(_.nonEmpty)
.foreach(line => sendMsg2Topic(producer, line, "mytopic2"))
producer.close()
}
// Send one message to the given topic with the supplied producer
def sendMsg2Topic(producer: KafkaProducer[String, String], msg: String, topic: String): Unit = {
val record = new ProducerRecord[String, String](topic, msg)
producer.send(record)
}
// Reformat a record: strip the <<<!>>> markers and turn the comma-separated fields into pipe-separated fields
def parse(text: String): String = {
try {
val arr = text.replace("<<<!>>>", "").split(",")
if (arr.length != 15) return ""
arr.mkString("|")
} catch {
case e: Exception =>
log.error("解析数据出错!", e)
""
}
}
// Build the Kafka consumer configuration
def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
// Build the Kafka producer configuration
def getKafkaProducerParameters(): Properties = {
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop
}
}
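The parse method can also be checked in isolation. The field values below are placeholders rather than real sample.log content; they only illustrate the comma-to-pipe conversion and the 15-field length check:
package home.one
object ParseCheck {
  def main(args: Array[String]): Unit = {
    // 15 placeholder fields, comma separated (hypothetical data)
    val ok = (1 to 15).map(i => s"field$i").mkString(",")
    // Wrong number of fields, should be rejected
    val bad = "a,b,c"
    println(HomeOne.parse(ok)) // field1|field2|...|field15
    println(HomeOne.parse(bad).isEmpty) // true
  }
}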
/*
Suppose the airport data is:
1, "SFO"
2, "ORD"
3, "DFW"
and the routes between airports, with their distances, are:
1, 2, 1800
2, 3, 800
3, 1, 1400
Use GraphX to:
list all vertices
list all edges
list all triplets
count the vertices
count the edges
find how many routes are longer than 1000, and which ones
sort all routes by distance (descending) and print the result
*/
Code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
object TwoHome {
def main(args: Array[String]): Unit = {
// Initialization
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
// Raw data
val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))
val edgeArray: Array[Edge[Int]] = Array(
Edge(1L, 2L, 1800),
Edge(2L, 3L, 800),
Edge(3L, 1L, 1400)
)
// Build the vertex RDD and the edge RDD
val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)
// Build the graph
val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)
// All vertices
println("All vertices:")
graph.vertices.foreach(println)
// All edges
println("All edges:")
graph.edges.foreach(println)
// All triplets
println("All triplets:")
graph.triplets.foreach(println)
// Vertex count
val vertexCnt = graph.vertices.count()
println(s"Vertex count: $vertexCnt")
// Edge count
val edgeCnt = graph.edges.count()
println(s"Edge count: $edgeCnt")
// Routes longer than 1000: how many, and which ones
val longEdges = graph.edges.filter(_.attr > 1000)
println(s"Number of routes longer than 1000: ${longEdges.count()}")
println("Routes longer than 1000:")
longEdges.foreach(println)
// Sort all routes by distance, descending
println("All routes sorted by distance (descending):")
graph.edges.sortBy(-_.attr).collect().foreach(println)
}
}
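The filter above only prints vertex ids; going through triplets also shows the airport names. A small sketch that could be appended inside main, assuming the graph value built above:
// Show airport names for the routes longer than 1000 (uses the graph defined above)
graph.triplets
  .filter(_.attr > 1000)
  .map(t => s"${t.srcAttr} -> ${t.dstAttr}: ${t.attr}")
  .collect()
  .foreach(println)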
Execution results