目标:了解实时计算需求及技术方案
路径
实施
实时计算需求
实时统计消息总量
select count(*) from tbname;
实时统计各个地区发送消息的总量
select sender_area,count(*) from tbname group by sender_area;
实时统计各个地区接收消息的总量
select receiver_area,count(*) from tbname group by receiver_area;
实时统计每个用户发送消息的总量
select sender_account,count(*) from tbname group by sender_account;
实时统计每个用户接收消息的总量
select receiver_account,count(*) from tbname group by receiver_account;
|
构建实时统计报表
技术方案
小结
目标:了解Flink的功能、特点及应用场景
路径
实施
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
小结
目标:实现开发环境的代码模块构建
实施
导入包中代码到IDEA中
flink包:应用类包,用于存放实际的应用类
pojo包:实体类包,用于存放所有实体类
MoMoCountBean:用于封装统计分析的结果
private Integer id ; //result id
private Long moMoTotalCount ; //total message count
private String moMoProvince ; //province
private String moMoUsername ; //user
private Long moMo_MsgCount ; //message count
//result type: 1 - total messages, 2 - messages sent per province, 3 - messages received per province, 4 - messages sent per user, 5 - messages received per user
private String groupType ;
utils包:工具类包,用于存放所有工具类
HttpClientUtils:用于实现将经纬度地址解析为省份的工具类
public static String findByLatAndLng(String lat , String lng)
小结
目标:了解省份解析的实现
路径
实施
注册百度开发者
百度地图开放平台:https://lbsyun.baidu.com/
逆地理编码:https://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding-abroad
https://api.map.baidu.com/reverse_geocoding/v3/?ak=您的ak&output=json&coordtype=wgs84ll&location=31.225696563611,121.49884033194 //GET请求
注册开放平台,获取AK码:参考《附录三》
测试省份解析
package bigdata.itcast.cn.momo.online.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import javax.swing.text.html.parser.Entity;
import java.io.IOException;
import java.util.Map;
public class HttpClientUtils {

    /**
     * Resolves a latitude/longitude pair to a province name via the Baidu
     * reverse-geocoding web service.
     *
     * @param lat latitude in WGS-84, e.g. "43.921297"
     * @param lng longitude in WGS-84, e.g. "124.655376"
     * @return the province name on success, or {@code null} when the HTTP call
     *         fails or the service returns a non-zero status
     */
    public static String findByLatAndLng(String lat , String lng){
        String url = "http://api.map.baidu.com/reverse_geocoding/v3/?ak=l8hKKRCuX2zrRa93jneDrPmc2UspGatO&output=json&coordtype=wgs84ll&location="+lat+","+lng;
        System.out.println(url);
        // try-with-resources closes both the client and the response; the
        // original version leaked both on every call.
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            //issue the GET request
            HttpGet httpGet = new HttpGet(url);
            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
                //read the response body
                HttpEntity httpEntity = response.getEntity();
                //convert to JSON text
                String json = EntityUtils.toString(httpEntity);
                //extract the province from the JSON payload
                Map<String,Object> result = JSONObject.parseObject(json, Map.class);
                // Baidu signals success with status == 0 (fastjson parses it as
                // Integer); null-safe comparison avoids an NPE when the field
                // is missing from an error payload.
                if (Integer.valueOf(0).equals(result.get("status"))) {
                    Map<String,Object> resultMap = (Map<String,Object>)result.get("result");
                    resultMap = (Map<String, Object>) resultMap.get("addressComponent");
                    String province = (String) resultMap.get("province");
                    return province;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        //manual smoke test: should print a province in north-east China
        String sf = findByLatAndLng("43.921297","124.655376");
        System.out.println(sf);
    }
}
小结
目标:了解Flink代码的基本实现
路径
实施
消费Kafka
//build the Kafka client configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "momo2");
//create a consumer that reads topic MOMO_MSG, deserializing records as plain strings
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(),props);
//register the Kafka consumer as the Flink source stream
DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
实时统计分析
//todo:3. run the transformation / aggregation steps:
//3.1: count total messages
countTotalMsg(streamSource);
//3.2: count messages sent per province, resolved from latitude/longitude
countProvinceSenderMsg(streamSource);
//3.3: count messages received per province, resolved from latitude/longitude
countProvinceReceiverMsg(streamSource);
//3.4: count messages sent per user
countUserNameSenderMsg(streamSource);
//3.5: count messages received per user
countUserNameReceiverMsg(streamSource);
//5. launch the Flink job
env.execute("momoFlinkCount");
实时更新结果到MySQL
//attach the MySQL sink for result type "2" (messages sent per province)
streamOperator.addSink(new MysqlSink("2"));
//upsert: update the row if this province already has a count, otherwise insert it
// NOTE(review): SQL is built by string concatenation — if the province value can
// ever contain a quote this is broken and injectable; prefer PreparedStatement
// with ? placeholders. TODO confirm and fix in the enclosing sink class.
if (status.equals("2")){
String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
ResultSet resultSet = stat.executeQuery(sql);
boolean flag = resultSet.next(); //true when a row for this province already exists
if(flag) {
sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
}else {
sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"+value.getMoMoProvince()+"',"+value.getMoMo_MsgCount()+",'2') ";
}
stat.executeUpdate(sql);
}
小结
目标:实现Flink实时分析测试
路径
实施
MySQL准备
运行测试
启动Flink程序:运行MoMoFlinkCount
启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
观察MySQL结果
小结