从kafka的topic-car中读取卡口数据,将超速车辆写入mysql的select * from t_speeding_info表,当通过卡口的车速超过60就认定为超速
`action_time` long --摄像头拍摄时间戳,精确到秒,
`monitor_id` string --卡口号,
`camera_id` string --摄像头编号,
`car` string --车牌号码,
`speed` double --通过卡口的速度,
`road_id` string --道路id,
`area_id` string --区域id,
其中每个字段之间使用逗号隔开。 例如:1682219447,0001,1,豫DF09991,34.5,01,20 区域ID代表:一个城市的行政区域。 摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。 道路ID:城市中每一条道路都有名字,比如:航海路。交通部门会给航海路一个唯一编号。
注意这个t_monitor_info是限速信息表
CREATE TABLE `t_monitor_info` (
`monitor_id` varchar(255) NOT NULL,
`road_id` varchar(255) NOT NULL,
`speed_limit` int(11) DEFAULT NULL,
`area_id` varchar(255) DEFAULT NULL,
PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
限速信息:
超速表:
DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`car` varchar(255) NOT NULL,
`monitor_id` varchar(255) DEFAULT NULL,
`road_id` varchar(255) DEFAULT NULL,
`real_speed` double DEFAULT NULL,
`limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
该需求使用两个.java文件编写,分为项目代码和javaBean代码。
Test1_OutSpeedMonitor.java:
package day110612;
import bean.MonitorInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import util.JdbcUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;
public class Test1_OutSpeedMonitor {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop10:9092");
properties.setProperty("group.id", "car-group1");
DataStream<String> ds1 = env.addSource(
new FlinkKafkaConsumer<>("topic-car",
new SimpleStringSchema(),
properties)
);
SingleOutputStreamOperator<MonitorInfo> ds2 = ds1.map(new MapFunction<String, MonitorInfo>() {
@Override
public MonitorInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new MonitorInfo(
Long.parseLong(arr[0]),
arr[1],
arr[2],
arr[3],
Double.parseDouble(arr[4]), arr[5], arr[6]);
}
});
ds2.filter(new RichFilterFunction<MonitorInfo>() {
Connection connection;
PreparedStatement ps;
ResultSet rs;
@Override
public void open(Configuration parameters) throws Exception {
connection = JdbcUtils.getconnection();
ps = connection.prepareStatement(
"select speed_limit from t_monitor_info where monitor_id = ?");
}
@Override
public boolean filter(MonitorInfo value) throws Exception {
ps.setString(1, value.getMonitorId());
rs = ps.executeQuery();
//如果t_monitor_info无法查询出该卡口的编号,则给定一个60的限速
int speed_limit = 60;
if (rs.next()) {
speed_limit = rs.getInt("speed_limit");
}
value.setSpeedLimit(speed_limit);
return value.getSpeed() > speed_limit * 1.1; //超速10%,判定为超速
}
@Override
public void close() throws Exception {
JdbcUtils.release(rs, ps, connection);
}
}).addSink(JdbcSink.sink(
"insert into t_speeding_info values(null,?,?,?,?,?,?)",
(PreparedStatement ps, MonitorInfo monitorInfo) -> {
ps.setString(1, monitorInfo.getCar());
ps.setString(2, monitorInfo.getMonitorId());
ps.setString(3, monitorInfo.getRoadId());
ps.setDouble(4, monitorInfo.getSpeed());
ps.setInt(5, monitorInfo.getSpeedLimit());
ps.setLong(6, monitorInfo.getActionTime());
},
JdbcExecutionOptions.builder().withBatchSize(1).withBatchIntervalMs(5000).build()
,
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("0000")
.build()));
env.execute();
}
}
MonitorInfo.java:
package bean;
//这是郭亚超的java豆
public class MonitorInfo {
private Long actionTime;
private String monitorId;
private String cameraId;
private String car;
private Double speed; //车辆通过卡口的实际车速
private String roadId;
private String areaId;
private Integer speedLimit; //卡口的限速
public MonitorInfo() {
}
public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId) {
this.actionTime = actionTime;
this.monitorId = monitorId;
this.cameraId = cameraId;
this.car = car;
this.speed = speed;
this.roadId = roadId;
this.areaId = areaId;
}
public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId, Integer speedLimit) {
this.actionTime = actionTime;
this.monitorId = monitorId;
this.cameraId = cameraId;
this.car = car;
this.speed = speed;
this.roadId = roadId;
this.areaId = areaId;
this.speedLimit = speedLimit;
}
/**
* 获取
*
* @return actionTime
*/
public Long getActionTime() {
return actionTime;
}
/**
* 设置
*
* @param actionTime
*/
public void setActionTime(Long actionTime) {
this.actionTime = actionTime;
}
/**
* 获取
*
* @return monitorId
*/
public String getMonitorId() {
return monitorId;
}
/**
* 设置
*
* @param monitorId
*/
public void setMonitorId(String monitorId) {
this.monitorId = monitorId;
}
/**
* 获取
*
* @return cameraId
*/
public String getCameraId() {
return cameraId;
}
/**
* 设置
*
* @param cameraId
*/
public void setCameraId(String cameraId) {
this.cameraId = cameraId;
}
/**
* 获取
*
* @return car
*/
public String getCar() {
return car;
}
/**
* 设置
*
* @param car
*/
public void setCar(String car) {
this.car = car;
}
/**
* 获取
*
* @return speed
*/
public Double getSpeed() {
return speed;
}
/**
* 设置
*
* @param speed
*/
public void setSpeed(Double speed) {
this.speed = speed;
}
/**
* 获取
*
* @return roadId
*/
public String getRoadId() {
return roadId;
}
/**
* 设置
*
* @param roadId
*/
public void setRoadId(String roadId) {
this.roadId = roadId;
}
/**
* 获取
*
* @return areaId
*/
public String getAreaId() {
return areaId;
}
/**
* 设置
*
* @param areaId
*/
public void setAreaId(String areaId) {
this.areaId = areaId;
}
/**
* 获取
*
* @return speedLimit
*/
public Integer getSpeedLimit() {
return speedLimit;
}
/**
* 设置
*
* @param speedLimit
*/
public void setSpeedLimit(Integer speedLimit) {
this.speedLimit = speedLimit;
}
public String toString() {
return "MonitorInfo{actionTime = " + actionTime + ", monitorId = " + monitorId + ", cameraId = " + cameraId + ", car = " + car + ", speed = " + speed + ", roadId = " + roadId + ", areaId = " + areaId + ", speedLimit = " + speedLimit + "}";
}
}
java bean的生成使用了Idea插件。
这段代码实现了对车辆超速信息的实时监控和存储。
代码解释如下:
1. 导入所需的类和包。
2. 创建`StreamExecutionEnvironment`实例。
3. 创建Kafka的连接配置,并设置相关属性。
4. 创建一个`FlinkKafkaConsumer`,用于从Kafka主题中接收数据流。
5. 使用`map`函数将接收到的文本数据转换为`MonitorInfo`对象。
6. 使用`filter`函数对超速的车辆进行过滤。
- 在`open`方法中,建立与数据库的连接,并准备查询语句。
- 在`filter`方法中,根据卡口ID查询对应的限速值,并将查询结果设置到`MonitorInfo`对象中。
- 如果无法查询到限速值,则将限速值设置为默认值60。
- 判断车辆的实际速度是否超过限速值的10%(超速10%判定为超速),返回布尔值。
7. 在`addSink`中使用`JdbcSink.sink()`方法将超速的车辆信息写入到MySQL数据库。
- 设置插入数据的SQL语句,使用占位符表示待填充的参数。
- 使用lambda表达式定义参数填充逻辑,将`MonitorInfo`对象中的字段值设置到预编译语句中的对应位置。
- 使用`JdbcExecutionOptions`设置批处理大小和间隔时间。
- 使用`JdbcConnectionOptions`设置数据库连接信息。
8. 调用`env.execute()`方法启动Flink程序的执行。
总体来说,该代码通过从Kafka接收车辆数据流,并对超速的车辆进行监控和存储。在过滤阶段,根据卡口ID查询对应的限速值,并判断车辆的实际速度是否超过限速值的10%。超速的车辆信息将被写入到MySQL数据库中。
以上分析来自ChatGPT3.5,由我整理完善。
1686647522,0002,1,豫A99999,100.5,01,20
1686647522,0002,1,豫A99999,80.8,02,20
1686647522,0002,1,豫A99999,90.5,03,20
1686647522,0002,1,豫A99999,90.4,03,20
这里我写错了,当时road_id和area_id没有分清楚,应该是按照道路ID匹配限速,我当成了区域ID,但是不影响需求的实现流程。 超速车辆已经写入MySQL: