首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >换一个思路思考问题,可能Flink问题就迎刃而解了

换一个思路思考问题,可能Flink问题就迎刃而解了

作者头像
睡前大数据
发布2025-07-12 11:17:20
发布2025-07-12 11:17:20
790
举报
文章被收录于专栏:睡前大数据睡前大数据

    最近,有一个之前的同事联系我,说有一个Flink相关的问题,问问看有没有好的解决方案,首先说下朋友的问题背景。

    背景:

    有一个Flink任务实时消费sls(阿里的一款中间件)中的数据,进行处理转换。最近因流量上涨了一些,导致这个任务极其不稳定,经常挂掉。在资源有限的情况下,想问问看有没有什么手段优化下。

经过多番交流得到了几个关键重要信息:

1、首先这个任务是 Flink datastream api写的 jar任务,非SQL任务

2、其次,这个任务呢其实并不需要用到所有的sls中的数据,只需要用到一小部分

看下第一轮交流结论:

1、我提议,先看下sls支不支持 条件下推,提前在sls端过滤一部分数据后再flink消费。ps 虽然我之前在阿里混了一段时间,但是我是真不记得这个了。哈哈

2、为啥我有这个提议,因为我朋友说,flink sql的sls connector是有这个query参数的,可以做到 谓词下推。所以才有了我的建议。

经过一轮交流,朋友说,暂时没找到里面有设置query的地方,我就猜想到可能两边的实现逻辑不一致?

过了一天,我实际去看了下官方的connector源码,好像确实没有,底层通过游标去一次次读取的,还有一堆状态管理。

我建议如果说朋友有精力,那就在官方connector的源码基础上修改下加入这个query参数,看下是不是可以做到。

所以第二轮的结果就是:

修改connector 源码不行就新自定义一个

又经过一天,朋友说复杂,搞不定。

图片
图片

我再次去找寻答案,后面突然想到,其实 Flink 在 代码里也是可以引入 使用table api的,如果说 source保持table api不变,然后读进来后,再转成datastream模式的api,再接着后面的操作,应该是可以走通的。

所以就有了第三轮建议

图片
图片
图片
图片

所以经过我的建议,朋友再次发起了尝试,我也并且给了 demo代码

代码语言:javascript
复制
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class SlsToKafkaExample {

    public static void main(String[] args) throws Exception {
        // 初始化 Flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 注册 SLS Source 表(使用查询下推)
        tableEnv.executeSql(
            "CREATE TABLE sls_logs (\n" +
            "  __time TIMESTAMP(3),\n" +
            "  level STRING,\n" +
            "  message STRING,\n" +
            "  user STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'aliyun-log',\n" +
            "  'project' = 'your-project',\n" +
            "  'logstore' = 'your-logstore',\n" +
            "  'endpoint' = 'cn-hangzhou.log.aliyuncs.com',\n" +
            "  'access.id' = 'your-access-key-id',\n" +
            "  'access.key' = 'your-access-key-secret',\n" +
            "  'consumer.group' = 'flink-consumer-group',\n" +
            "  'query' = 'level:ERROR',\n" +
            "  'start.time' = 'now - 30m'\n" +
            ")"
        );

        // 用 Table API 查询数据
        Table resultTable = tableEnv.sqlQuery(
            "SELECT level, message, user FROM sls_logs WHERE user IS NOT NULL"
        );

        // 将 Table 转换为 DataStream<Row>
        DataStream<Row> rowStream = tableEnv.toDataStream(resultTable);

        // 做一个简单的 Row → String 转换
        DataStream<String> jsonStream = rowStream.map(row -> {
            String level = String.valueOf(row.getField("level"));
            String message = String.valueOf(row.getField("message"));
            String user = String.valueOf(row.getField("user"));
            return String.format("{\"level\":\"%s\", \"message\":\"%s\", \"user\":\"%s\"}", level, message, user);
        });

        // Kafka Producer 配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
            "sls-flink-output",                  // Kafka topic
            new SimpleStringSchema(),           // 序列化 schema
            kafkaProps
        );

        // Sink 到 Kafka
        jsonStream.addSink(kafkaSink);

        // 执行 Flink 作业
        env.execute("SLS Table API to Kafka");
    }
}

    当然到这里有杠精说,还不如去 直接改成 sql的 模式,对,你们说的都对,但是,可能里面有很多复杂的逻辑呢?他们没人或者没有太多的时间去改造呢?

    说了这么多,其实就是想表达下,有的时候需要学会灵活灵用。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 睡前大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档