前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >某云平台Flink日志按Job名称收集到ES

某云平台Flink日志按Job名称收集到ES

原创
作者头像
用户6404053
修改2023-05-10 23:19:11
2950
修改2023-05-10 23:19:11
举报
文章被收录于专栏:CatororyCatorory

为什么有这篇文章

使用ES来查询业务日志在开发中是非常常见的一种方式,典型的解决方案是ELK,已经非常成熟了。Flink是一个比较年轻的开源项目,已经发展了好几年,但是周边的生态还是不是很完善,比如日志收集其实不太友好,比如session模式想要按job收集日志就做不到,另外比较流行的是yarn和k8s模式,k8s理论上应该是比较容易收日志的,越来越多的公司大量java应用都跑在k8s里。日志收集这块也限制只能用商业产品,不好用。我们希望接到es里

配置文件

原生的log4j配置文件支持yml格式和xml格式,全托管的只支持xml,并且xml配置出来日志信息是单行文本,不能直接进es,并且日志中其实没有job的标识,收到一起也没法区分哪条日志是哪个job的,这显然不行。我们要想日志收集到es可用,就必须解决这两个问题。一是需要json格式,二是需要将job名称带到每一条日志中。

日志收集

经过不断尝试,终于解决了这两个问题,在这里分享一下,核心其实就下面几行配置,通过配置JsonLayout让日志打出来是json格式,然后就是增加时间、host和appName,appName就是job的名字,可以每个任务都自定义一下

代码语言:javascript
复制
<JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true"> 
              <KeyValuePair key="appName" value="LogCollectDemo"/>
              <KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
              <KeyValuePair key="host_name" value="${hostName}"/> 
</JsonLayout>  

收集到的日志长这样,hostname其实没什么用,有用的其实就4个字段,时间、jobname、message、level

代码语言:javascript
复制
{
	"instant": {
		"epochSecond": 1653276334,
		"nanoOfSecond": 185000000
	},
	"thread": "main",
	"level": "INFO",
	"loggerName": "org.apache.flink.configuration.GlobalConfiguration",
	"message": "Loading configuration property: metrics.reporter.promappmgr.class, org.apache.flink.metrics.prometheus.PrometheusReporter",
	"endOfBatch": false,
	"loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
	"contextMap": {},
	"threadId": 1,
	"threadPriority": 5,
	"source": {
		"class": "org.apache.flink.configuration.GlobalConfiguration",
		"method": "loadYAMLResource",
		"file": "GlobalConfiguration.java",
		"line": 228
	},
	"appName": "LogCollectDemo",
	"@timestamp": "2023-01-01 16:45:33.376",
	"host_name": "job-abc-123456-taskmanager-1-1"
}

收集到oss之后,可以找台服务器拉下来再用filebeat收走丢到kafka,也可以走sls丢kafka,然后就随便玩了

完整的配置如下,这个配置日志是输出到oss的,sls还没买,等测试过了再写出来,但和oss的应该大同小异

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
strict="true" packages="com.ververica.platform.logging.appender" status="WARN">
  <Appenders>
    <Appender name="StdOut" type="Console">
      <Layout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n" type="PatternLayout"/>
    </Appender>
    <Appender name="RollingFile" type="RollingFile" fileName="${sys:log.file}" filePattern="${sys:log.file}.%i">
      <JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true"> 
              <KeyValuePair key="appName" value="LogCollectDemo"/>
              <KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
              <KeyValuePair key="host_name" value="${hostName}"/> 
      </JsonLayout>   
      <Policies>
        <SizeBasedTriggeringPolicy size="20 MB"/>
      </Policies> 
      <DefaultRolloverStrategy max="4"/>
    </Appender> 
     
    <Appender name="OSS" type="OSS">
      <JsonLayout properties="true" locationInfo="true" compact="true" eventEol="true">
              <KeyValuePair key="appName" value="LogCollectDemo"/>
              <KeyValuePair key="@timestamp" value="${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
              <KeyValuePair key="host_name" value="${hostName}"/>
      </JsonLayout> 
      <Property name="namespace">{{ namespace }}</Property> <!-- Do not modify this line -->
      <Property name="baseUri">oss://YOUR-BUCKET-NAME/</Property>
      <Property name="endpoint">https://YOUR-ENDPOINT</Property> 
      <Property name="flushIntervalSeconds">60</Property> 
      <Property name="flushIntervalEventCount">200</Property> 
      <Property name="rollingBytes">10000000</Property> 
    </Appender>
  </Appenders> 
  <Loggers>
    <Logger level="INFO" name="org.apache.hadoop"/> 
    <Logger level="INFO" name="org.apache.kafka"/> 
    <Logger level="INFO" name="org.apache.zookeeper"/> 
    <Logger level="INFO" name="akka"/> 
    <Logger level="ERROR" name="org.jboss.netty.channel.DefaultChannelPipeline"/> 
    <Logger level="OFF" name="org.apache.flink.runtime.rest.handler.job.JobDetailsHandler"/>
    {%- for name, level in userConfiguredLoggers -%}
      <Logger level="{{ level }}" name="{{ name }}"/>
    {%- endfor -%}
    <Root level="{{ rootLoggerLogLevel }}">
      <AppenderRef ref="StdOut"/>
      <AppenderRef ref="RollingFile"/> 
      <AppenderRef ref="OSS"/>
    </Root>
  </Loggers>
</Configuration>

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么有这篇文章
  • 配置文件
  • 日志收集
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档