最近,有一个之前的同事联系我,说有一个Flink相关的问题,问问看有没有好的解决方案,首先说下朋友的问题背景。
背景:
有一个Flink任务实时消费sls(阿里的一款中间件)中的数据,进行处理转换。最近因流量上涨了一些,导致这个任务极其不稳定,经常挂掉。在资源有限的情况下,想问问看有没有什么手段优化下。
经过多番交流得到了几个关键重要信息:
1、首先这个任务是 Flink datastream api写的 jar任务,非SQL任务
2、其次,这个任务呢其实并不需要用到所有的sls中的数据,只需要用到一小部分
看下第一轮交流结论:
1、我提议,先看下sls支不支持 条件下推,提前在sls端过滤一部分数据后再flink消费。ps 虽然我之前在阿里混了一段时间,但是我是真不记得这个了。哈哈
2、为啥我有这个提议,因为我朋友说,flink sql的sls connector是有这个query参数的,可以做到 谓词下推。所以才有了我的建议。
经过一轮交流,朋友说,暂时没找到里面有设置query的地方,我就猜想到可能两边的实现逻辑不一致?
过了一天,我实际去看了下官方的connector源码,好像确实没有,底层通过游标去一次次读取的,还有一堆状态管理。
我建议如果说朋友有精力,那就在官方connector的源码基础上修改下加入这个query参数,看下是不是可以做到。
所以第二轮的结果就是:
修改connector 源码不行就新自定义一个
又经过一天,朋友说复杂,搞不定。
我再次去找寻答案,后面突然想到,其实 Flink 在 代码里也是可以引入 使用table api的,如果说 source保持table api不变,然后读进来后,再转成datastream模式的api,再接着后面的操作,应该是可以走通的。
所以就有了第三轮建议
所以经过我的建议,朋友再次发起了尝试,我也并且给了 demo代码
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Properties;
public class SlsToKafkaExample {
public static void main(String[] args) throws Exception {
// 初始化 Flink 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 注册 SLS Source 表(使用查询下推)
tableEnv.executeSql(
"CREATE TABLE sls_logs (\n" +
" __time TIMESTAMP(3),\n" +
" level STRING,\n" +
" message STRING,\n" +
" user STRING\n" +
") WITH (\n" +
" 'connector' = 'aliyun-log',\n" +
" 'project' = 'your-project',\n" +
" 'logstore' = 'your-logstore',\n" +
" 'endpoint' = 'cn-hangzhou.log.aliyuncs.com',\n" +
" 'access.id' = 'your-access-key-id',\n" +
" 'access.key' = 'your-access-key-secret',\n" +
" 'consumer.group' = 'flink-consumer-group',\n" +
" 'query' = 'level:ERROR',\n" +
" 'start.time' = 'now - 30m'\n" +
")"
);
// 用 Table API 查询数据
Table resultTable = tableEnv.sqlQuery(
"SELECT level, message, user FROM sls_logs WHERE user IS NOT NULL"
);
// 将 Table 转换为 DataStream<Row>
DataStream<Row> rowStream = tableEnv.toDataStream(resultTable);
// 做一个简单的 Row → String 转换
DataStream<String> jsonStream = rowStream.map(row -> {
String level = String.valueOf(row.getField("level"));
String message = String.valueOf(row.getField("message"));
String user = String.valueOf(row.getField("user"));
return String.format("{\"level\":\"%s\", \"message\":\"%s\", \"user\":\"%s\"}", level, message, user);
});
// Kafka Producer 配置
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
"sls-flink-output", // Kafka topic
new SimpleStringSchema(), // 序列化 schema
kafkaProps
);
// Sink 到 Kafka
jsonStream.addSink(kafkaSink);
// 执行 Flink 作业
env.execute("SLS Table API to Kafka");
}
}当然到这里有杠精说,还不如去 直接改成 sql的 模式,对,你们说的都对,但是,可能里面有很多复杂的逻辑呢?他们没人或者没有太多的时间去改造呢?
说了这么多,其实就是想表达下,有的时候需要学会灵活灵用。