前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【基于Flink的城市交通实时监控平台】需求一:卡口车辆超速情况检测

【基于Flink的城市交通实时监控平台】需求一:卡口车辆超速情况检测

作者头像
火之高兴
发布2024-07-25 15:49:34
880
发布2024-07-25 15:49:34
举报
文章被收录于专栏:大数据应用技术
案例需求:

从kafka的topic-car中读取卡口数据,将超速车辆写入mysql的select * from t_speeding_info表,当通过卡口的车速超过60就认定为超速

卡口数据格式:
代码语言:javascript
复制
 `action_time` long  --摄像头拍摄时间戳,精确到秒, 
 `monitor_id` string  --卡口号, 
 `camera_id` string   --摄像头编号, 
 `car` string  --车牌号码, 
 `speed` double  --通过卡口的速度, 
 `road_id` string  --道路id, 
 `area_id` string  --区域id, 

其中每个字段之间使用逗号隔开。 例如:1682219447,0001,1,豫DF09991,34.5,01,20 区域ID代表:一个城市的行政区域。 摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。 道路ID:城市中每一条道路都有名字,比如:航海路。交通部门会给航海路一个唯一编号。

MySQL建表语句

注意这个t_monitor_info是限速信息表

代码语言:javascript
复制
CREATE TABLE `t_monitor_info` (
  `monitor_id` varchar(255) NOT NULL,  
  `road_id` varchar(255) NOT NULL,
  `speed_limit` int(11) DEFAULT NULL,
  `area_id` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

限速信息:

在这里插入图片描述
在这里插入图片描述

超速表:

代码语言:javascript
复制
DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `car` varchar(255) NOT NULL,
  `monitor_id` varchar(255) DEFAULT NULL,
  `road_id` varchar(255) DEFAULT NULL,
  `real_speed` double DEFAULT NULL,
  `limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
项目代码
代码语言:javascript
复制
该需求使用两个.java文件编写,分为项目代码和javaBean代码。

Test1_OutSpeedMonitor.java:

代码语言:javascript
复制
package day110612;


import bean.MonitorInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;



import util.JdbcUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;


public class Test1_OutSpeedMonitor {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "car-group1");
        DataStream<String> ds1 = env.addSource(
                new FlinkKafkaConsumer<>("topic-car",
                        new SimpleStringSchema(),
                        properties)
        );


        SingleOutputStreamOperator<MonitorInfo> ds2 = ds1.map(new MapFunction<String, MonitorInfo>() {
            @Override
            public MonitorInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new MonitorInfo(
                        Long.parseLong(arr[0]),
                        arr[1],
                        arr[2],
                        arr[3],
                        Double.parseDouble(arr[4]), arr[5], arr[6]);
            }
        });

        ds2.filter(new RichFilterFunction<MonitorInfo>() {

            Connection connection;
            PreparedStatement ps;
            ResultSet rs;

            @Override
            public void open(Configuration parameters) throws Exception {
                connection = JdbcUtils.getconnection();
                ps = connection.prepareStatement(
                        "select speed_limit from t_monitor_info where monitor_id = ?");
            }

            @Override
            public boolean filter(MonitorInfo value) throws Exception {
                ps.setString(1, value.getMonitorId());
                rs = ps.executeQuery();
                //如果t_monitor_info无法查询出该卡口的编号,则给定一个60的限速
                int speed_limit = 60;
                if (rs.next()) {
                    speed_limit = rs.getInt("speed_limit");
                }
                value.setSpeedLimit(speed_limit);
                return value.getSpeed() > speed_limit * 1.1; //超速10%,判定为超速
            }

            @Override
            public void close() throws Exception {
                JdbcUtils.release(rs, ps, connection);
            }

        }).addSink(JdbcSink.sink(
                "insert into t_speeding_info values(null,?,?,?,?,?,?)",
                (PreparedStatement ps, MonitorInfo monitorInfo) -> {
                    ps.setString(1, monitorInfo.getCar());
                    ps.setString(2, monitorInfo.getMonitorId());
                    ps.setString(3, monitorInfo.getRoadId());
                    ps.setDouble(4, monitorInfo.getSpeed());
                    ps.setInt(5, monitorInfo.getSpeedLimit());
                    ps.setLong(6, monitorInfo.getActionTime());

                },
                JdbcExecutionOptions.builder().withBatchSize(1).withBatchIntervalMs(5000).build()
                ,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop10:3306/yangyulin?useSSL=false&useUnicode=true&characterEncoding=utf8")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("0000")
                        .build()));

        env.execute();

    }
}

MonitorInfo.java:

代码语言:javascript
复制
package bean;

//这是郭亚超的java豆
public class MonitorInfo {
    private Long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private Double speed;  //车辆通过卡口的实际车速
    private String roadId;
    private String areaId;

    private Integer speedLimit;  //卡口的限速


    public MonitorInfo() {
    }

    public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId) {
        this.actionTime = actionTime;
        this.monitorId = monitorId;
        this.cameraId = cameraId;
        this.car = car;
        this.speed = speed;
        this.roadId = roadId;
        this.areaId = areaId;
    }

    public MonitorInfo(Long actionTime, String monitorId, String cameraId, String car, Double speed, String roadId, String areaId, Integer speedLimit) {
        this.actionTime = actionTime;
        this.monitorId = monitorId;
        this.cameraId = cameraId;
        this.car = car;
        this.speed = speed;
        this.roadId = roadId;
        this.areaId = areaId;
        this.speedLimit = speedLimit;
    }

    /**
     * 获取
     *
     * @return actionTime
     */
    public Long getActionTime() {
        return actionTime;
    }

    /**
     * 设置
     *
     * @param actionTime
     */
    public void setActionTime(Long actionTime) {
        this.actionTime = actionTime;
    }

    /**
     * 获取
     *
     * @return monitorId
     */
    public String getMonitorId() {
        return monitorId;
    }

    /**
     * 设置
     *
     * @param monitorId
     */
    public void setMonitorId(String monitorId) {
        this.monitorId = monitorId;
    }

    /**
     * 获取
     *
     * @return cameraId
     */
    public String getCameraId() {
        return cameraId;
    }

    /**
     * 设置
     *
     * @param cameraId
     */
    public void setCameraId(String cameraId) {
        this.cameraId = cameraId;
    }

    /**
     * 获取
     *
     * @return car
     */
    public String getCar() {
        return car;
    }

    /**
     * 设置
     *
     * @param car
     */
    public void setCar(String car) {
        this.car = car;
    }

    /**
     * 获取
     *
     * @return speed
     */
    public Double getSpeed() {
        return speed;
    }

    /**
     * 设置
     *
     * @param speed
     */
    public void setSpeed(Double speed) {
        this.speed = speed;
    }

    /**
     * 获取
     *
     * @return roadId
     */
    public String getRoadId() {
        return roadId;
    }

    /**
     * 设置
     *
     * @param roadId
     */
    public void setRoadId(String roadId) {
        this.roadId = roadId;
    }

    /**
     * 获取
     *
     * @return areaId
     */
    public String getAreaId() {
        return areaId;
    }

    /**
     * 设置
     *
     * @param areaId
     */
    public void setAreaId(String areaId) {
        this.areaId = areaId;
    }

    /**
     * 获取
     *
     * @return speedLimit
     */
    public Integer getSpeedLimit() {
        return speedLimit;
    }

    /**
     * 设置
     *
     * @param speedLimit
     */
    public void setSpeedLimit(Integer speedLimit) {
        this.speedLimit = speedLimit;
    }

    public String toString() {
        return "MonitorInfo{actionTime = " + actionTime + ", monitorId = " + monitorId + ", cameraId = " + cameraId + ", car = " + car + ", speed = " + speed + ", roadId = " + roadId + ", areaId = " + areaId + ", speedLimit = " + speedLimit + "}";
    }
}

java bean的生成使用了Idea插件。

代码解释
代码语言:javascript
复制
这段代码实现了对车辆超速信息的实时监控和存储。

代码解释如下:

1. 导入所需的类和包。

2. 创建`StreamExecutionEnvironment`实例。

3. 创建Kafka的连接配置,并设置相关属性。

4. 创建一个`FlinkKafkaConsumer`,用于从Kafka主题中接收数据流。

5. 使用`map`函数将接收到的文本数据转换为`MonitorInfo`对象。

6. 使用`filter`函数对超速的车辆进行过滤。

    - 在`open`方法中,建立与数据库的连接,并准备查询语句。
    - 在`filter`方法中,根据卡口ID查询对应的限速值,并将查询结果设置到`MonitorInfo`对象中。
    - 如果无法查询到限速值,则将限速值设置为默认值60。
    - 判断车辆的实际速度是否超过限速值的10%(超速10%判定为超速),返回布尔值。

7. 在`addSink`中使用`JdbcSink.sink()`方法将超速的车辆信息写入到MySQL数据库。

    - 设置插入数据的SQL语句,使用占位符表示待填充的参数。
    - 使用lambda表达式定义参数填充逻辑,将`MonitorInfo`对象中的字段值设置到预编译语句中的对应位置。
    - 使用`JdbcExecutionOptions`设置批处理大小和间隔时间。
    - 使用`JdbcConnectionOptions`设置数据库连接信息。

8. 调用`env.execute()`方法启动Flink程序的执行。

总体来说,该代码通过从Kafka接收车辆数据流,并对超速的车辆进行监控和存储。在过滤阶段,根据卡口ID查询对应的限速值,并判断车辆的实际速度是否超过限速值的10%。超速的车辆信息将被写入到MySQL数据库中。

以上分析来自ChatGPT3.5,由我整理完善。

测试流程
测试数据
代码语言:javascript
复制
1686647522,0002,1,豫A99999,100.5,01,20
1686647522,0002,1,豫A99999,80.8,02,20
1686647522,0002,1,豫A99999,90.5,03,20
1686647522,0002,1,豫A99999,90.4,03,20
通过kafka发送
在这里插入图片描述
在这里插入图片描述
限速对照表
在这里插入图片描述
在这里插入图片描述

这里我写错了,当时road_id和area_id没有分清楚,应该是按照道路ID匹配限速,我当成了区域ID,但是不影响需求的实现流程。 超速车辆已经写入MySQL:

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 案例需求:
  • 卡口数据格式:
  • MySQL建表语句
  • 项目代码
  • 代码解释
  • 测试流程
    • 测试数据
      • 通过kafka发送
        • 限速对照表
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档