Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >这次来整个高端的API实时QPS流计算

这次来整个高端的API实时QPS流计算

作者头像
老李秀
发布于 2021-01-05 02:06:05
发布于 2021-01-05 02:06:05
1.8K00
代码可运行
举报
运行总次数:0
代码可运行

大家好,泥腿子安尼特又和大家见面了。转眼一年又要过去了,我也跌跌撞撞的算是翻完了这本。

就像读书的时候周日晚上补作业一样,我也想在2020年再写一篇文章。前段时间倒腾了下配置中心,但是因为学艺不精,再加上连个CAP都不懂的人,我感觉写了可能也没卵用,不能提升姿势。那我整点啥,那就来简介下今年火到爆的Flink。

开篇

那Flink到底是个啥,来我们来看下它官网的介绍。

是不是和我第一眼看到的一样,不知所云,先不用管,主要这个东西前面带个Apache就很牛逼。(扯个题外话

,几年前,我刚入行PHP的时候,我清晰的记得有个面试题,web服务器,nginx与apache比,然后为啥nginx牛逼,那时候我记得就百度到的答案默念一遍,然后apache在我心中一直是个拉胯的存在= =)

那Flink又有多牛逼呢!我来上个图,最近股价猛跌的福报厂双11的时候用Flink进行实时计算是这样的

是不是很牛逼!

统计关键词

好的 废话不多说,我们基于官网的demo 开始进入Flink的旅程

我们先不管什么是流什么是批,对着代码就是干

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package org.myorg.quickstart;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJob {

public static void main(String[] args) throws Exception {
// set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/path/xxxx.log");

        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new LogLevelFilter())
                        .groupBy(0)
                        .sum(1);

        counts.print();

        env.execute("Flink Batch Java API Skeleton");
    }

// 自定义函数
public static class LogLevelFilter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// json decode每一行的log
            JSONObject jsonObject = JSON.parseObject(value);
// 统计不同level的log
if (jsonObject.containsKey("level")) {
String level = jsonObject.getString("level");
                out.collect(new Tuple2<>(level, 1));
            }
        }
    }
}

那输出结果如下

这时候有小伙伴要问了,这就是大数据,实时流计算???

差不多一行linux命令可以搞定

这只是个demo,能统计关键词了 那我们再扩展一下,基于nginx的access.log 我们搞个实时统计网站qps

实时QPS统计

我们先开启nginx access log 顺便把每一行的log记录成json串 比如这样

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
{
"@timestamp":"2020-12-27T18:58:38+08:00",
"remote_addr":"127.0.0.1",
"referer":"-",
"request":"GET /wechat/config HTTP/1.1",
"status":200,
"bytes":64,
"user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"http_x_forwarded":"-",
"request_time":"0.145"
}

然后 上代码 实时读取文件流 算qps

flink 读取文件流有两种模式 一种是直接一次性读完 一种是持续性检测,因为nginx access log是会不断增加的 所以我们选择第二种 来实时统计网站请求状态码的count

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package org.myorg.quickstart;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJob {

public static final Logger logger = LoggerFactory.getLogger(StreamingJob.class);

public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TextInputFormat textInputFormat = new TextInputFormat(null);

DataStream<String> nginxAccessLog = env.readFile(textInputFormat, "path/to/nginx_log/access.log", FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

DataStream<StatusCount> nginxAccessLogStream = nginxAccessLog.flatMap((FlatMapFunction<String, StatusCount>) (s, collector) -> {
try {
JSONObject jsonObject = JSON.parseObject(s);
if (jsonObject.containsKey("status")) {
Integer status = jsonObject.getInteger("status");
if (status.compareTo(500) < 0) {
                        collector.collect(StatusCount.of(status + "", 1L));
                    }
                }
            } catch (Exception e) {
                logger.error("解析nginx access log 错误 {}", e.getMessage());
            }
        });
DataStream<StatusCount> windowsCounts = nginxAccessLogStream.
                keyBy("status").
                timeWindow(Time.seconds(1)).
                sum("count");
        windowsCounts.print().setParallelism(1);

        env.execute("Flink Streaming Java API Skeleton");
    }

public static class StatusCount {

public String status;
public Long count;

public static StatusCount of(String word, Long count) {
StatusCount statusCount = new StatusCount();
            statusCount.status = word;
            statusCount.count = count;
return statusCount;
        }

        @Override
public String toString() {
return "status:" + status + " count:" + count;
        }
    }
}

效果就是这样的

这里就实时的打印出了每秒中nginx access log中状态小于500的所有status

这样岂不是完成了实时统计QPS 而且还可以按状态分组。

这时候又有小伙伴要问了

我实时cat nginx log也差不多也行啊!

假如你公司有50台api服务器 每台每天产生500G的日志 而且日志按小时或者文件名分割 你cat给我一个看看!

实际生产环境,现在主流都是ELK一套来管理log(我之前也大致介绍过),运维也不会直接把log往ES插,因为高峰期的时候 ES的写入速度并不快 可能会插崩它。所以,运维一般还是把log收集到kafka,然后消费kafka的方式插入ES,flink也可以消费kafka,只要把这里的文件流换成消费kakfa就可以做到算出API整体的QPS了。

如果你看到了这里,实操之后,我们再回过头来解释下刚才的代码,再了解下flink是个啥。因为如果开篇就大肆介绍名词 简介,我感觉你们也不会看,因为感觉跟自己没啥关系。

我们看上述两个例子的代码,都是先读取一个文件流,然后用自定义的类来解析每行文本,然后第一个例子group就像你们sql中groupby 因为我把每行文本的level提取出来了,然后还有个计数,所以有个Tuple2。

第二个例子稍微麻烦点,可能也难以理解点,因为用到了时间窗口。就是我把每秒读取文本里的内容当做一个独立的时间窗口,这样每秒access log里各种status都打印出来了。而且他是可以一直在不断运行并且一直打印下去的。

那我还是不明白flink牛逼在哪啊!我再来介绍一个概念,是什么是有界流,什么是无界流

假如李老某年某月开了个网站,

那么网站的数据的开始时间就是他第一次网站发布的时候。现在这个网站也还开着,每天都不断有人陆陆续续的访问,数据一致在积累,假如50年后李老嗝屁了,但是小李还继续维护着这个网站,100年后,小李也嗝屁了,小小李说不定还继续维护着这个网站。所以你不知道这个数据的边界在哪,数据从现在到未来一直会源源不断的流进来,这就是无界的数据流。就像我上面两个demo,第一个我一次性读了这个文本,那么数据是有界限的,第二个例子,因为我nginx access log就可以类比李老的网站,没有界限,所以可以叫它无界流。而flink就是非常方便能处理这些无界流的数据。

我们再来看官网那句话 ——

Stateful Computations Over Streams

在流上进行有状态的计算,是不是有点觉得牛逼了呢。当然我只是单机随便演示下demo。flink可以稳定的运行在大数据成熟的yarn集群上,一个flink job可以消费多个流 而且可以保存多个状态。flink集成了消费kafka、rabbit MQ 等等之类的数据源,所以用起来也很方便。比如你可以消费kafka里的上报数据,kafka里的binlog数据,来实时计算比如一分钟的订单数啊,一分钟内的GVM啊等等之类。至于其它一些高端的概念,比如什么滑动窗口、滚动窗口、什么水印、什么反压机制,我也不懂。


本文主要基于一些简单的demo简介了flink 里面很多概念跟代码都没解释清楚,主要是不想让大家入门的时候接收太多的名词概念

本泥腿子在线上刚发布过一个flink job 目前也有很多不懂的,所以可能表述的不是那么好 大家见谅

如果大家有兴趣学这个玩意,真的学好了真的高薪,现在这东西火的一逼

如果有兴趣的话 可以对着官网看几眼,官网介绍的挺全的。

https://flink.apache.org/zh/flink-architecture.html

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能API社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
零距离接触Flink:全面解读流计算框架入门与实操指南
Apache Flink作为开源的分布式流处理框架,受到了广泛的关注和应用。本文将分享如何从零开始搭建一个Flink运行环境,并在其上运行一个“WordCount”的例子程序。
司夜
2023/09/14
7720
零距离接触Flink:全面解读流计算框架入门与实操指南
「13章」Flink 从0到1实战实时风控系统
在进行 Flink 开发之前,需要先搭建好开发环境。主要步骤包括安装 Java(Flink 基于 Java 开发,推荐 Java 8 或更高版本)和下载 Flink 发行版,下载完成后解压到指定目录,配置好环境变量。可以通过以下命令检查是否安装成功:
瘦瘦itazs和fun
2025/02/09
4350
Flink教程(1) Flink DataStream 创建数据源 转换算子「建议收藏」
从前年开始,就被公众号上Flink文章频繁的刷屏,看来是时候了解下Flink了。 Flink官网第一句话介绍是数据流上的有状态计算。 我第一眼看这句话感觉很拗口,什么是流上的计算?什么是有状态? 作为菜鸟,我觉的学习Flink最好方法是看官网并敲代码实践,不会的百度些博客学学。
Java架构师必看
2022/05/26
1.6K0
Flink教程(1) Flink DataStream 创建数据源 转换算子「建议收藏」
流计算中的性能优化有哪些方法?请举例说明。
在流计算中,性能优化是非常重要的,可以提高系统的吞吐量、减少延迟,并提供更好的实时性能。下面是一些流计算中常用的性能优化方法,并结合一个具体的案例进行说明。
GeekLiHua
2025/01/21
1750
Flink入门宝典(详细截图版)
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。
大数据流动
2019/09/29
8350
Flink入门宝典(详细截图版)
全网最详细4W字Flink全面解析与实践(上)
在大数据技术栈的探索中,我们曾讨论了离线计算的Spark,而当谈到实时计算,就不得不提Flink。本文将集中讨论Flink,旨在详尽展示其核心概念,从而助力你在大数据旅程中向前迈进。
BookSea
2023/10/28
1.3K2
全网最详细4W字Flink全面解析与实践(上)
流计算中的窗口操作是什么?请解释其作用和使用场景。
流计算中的窗口操作是一种将无限的数据流划分为有限大小的数据块,并在这些数据块上进行操作和计算的技术。窗口操作可以帮助我们处理实时数据流,并对数据进行统计、分析和聚合。
GeekLiHua
2025/01/21
1130
什么是Flink?请简要解释其概念和特点。
Flink是一个开源的流处理和批处理框架,旨在处理大规模、高吞吐量的实时数据流和批量数据。它提供了一种高效、可靠、可扩展的方式来处理和分析实时数据,具有以下特点:
GeekLiHua
2025/01/21
1330
Flink入门:读取Kafka实时数据流,实现WordCount
本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上。通过本文你可以了解如何编写和运行Flink程序。
PP鲁
2020/02/26
5.7K0
Flink入门:读取Kafka实时数据流,实现WordCount
Flink入门宝典(详细截图版)
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。
用户6070864
2019/09/18
9180
Flink入门宝典(详细截图版)
2021年大数据Flink(十六):流批一体API Connectors ​​​​​​​​​​​​​​Redis
通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。
Lansonli
2021/10/11
9520
Flink 系列:Flink 入门不再难!3000字深入浅出 WordCount 实战及精解
在这个数据驱动的时代,掌握大数据技术成为了每一位开发者必不可少的技能。而在众多技术栈中,Flink无疑占据了重要的位置。作为一个高性能、可扩展的实时数据处理框架,Flink已经成为了很多企业和开发者的首选。但对于初学者来说,Flink的学习曲线可能会显得有些陡峭。因此,我们决定打造一系列通俗易懂的Flink学习文章,希望能帮助大家更快地掌握这一强大的技术。
create17
2024/04/15
5970
Flink 系列:Flink 入门不再难!3000字深入浅出 WordCount 实战及精解
flink 1.11.2 学习笔记(4)-状态示例
接上节继续,今天学习Flink中状态的使用。数据处理的过程中,对当前数据的处理,有时候要依赖前一条数据的值,这种被称为“有状态”的计算。
菩提树下的杨过
2020/12/22
1.1K0
flink 1.11.2 学习笔记(4)-状态示例
Flink的DataSource三部曲之一:直接API
本文是《Flink的DataSource三部曲》系列的第一篇,该系列旨在通过实战学习和了解Flink的DataSource,为以后的深入学习打好基础,由以下三部分组成:
程序员欣宸
2020/05/26
6070
2024年最新Flink教程,从基础到就业,大家一起学习--入门篇
在Java包下创建一个wordcount的包,然后创建一个文件 wc.txt,将下面的文字复制进去
小白的大数据之旅
2024/11/20
1.2K0
2024年最新Flink教程,从基础到就业,大家一起学习--入门篇
2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(建议收藏!!)
下面为大家带来阿里巴巴极度热推的Flink,实时数仓是未来的方向,学好Flink,月薪过万不是梦!!
Maynor
2021/06/24
2.7K0
2021年大数据Flink(十二):流批一体API Transformation
Apache Flink 1.12 Documentation: Operators
Lansonli
2021/10/11
7000
轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序
链接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row
壮壮熊
2022/08/18
5740
轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序
2021年大数据Flink(八):Flink入门案例
Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大
Lansonli
2021/10/11
1.4K0
2021年大数据Flink(二十五):Flink 状态管理
例如,之前下面代码,直接使用即可,不需要像SparkStreaming那样还得自己写updateStateByKey
Lansonli
2021/10/09
7330
推荐阅读
相关推荐
零距离接触Flink:全面解读流计算框架入门与实操指南
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验