log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
/**
 * Publishes a log event to Kafka via the appender's manager.
 *
 * <p>Events whose logger name starts with {@code org.apache.kafka} are dropped
 * (with a warning) because sending them would trigger more Kafka-client logging
 * and recurse endlessly.</p>
 *
 * @param event the log event to publish
 * @throws AppenderLoggingException if the event cannot be written to Kafka
 */
public void append(final LogEvent event) {
    // Guard: logging from Kafka's own client classes would recurse.
    if (event.getLoggerName().startsWith("org.apache.kafka")) {
        LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
        return;
    }
    try {
        final Layout<? extends Serializable> layout = getLayout();
        final byte[] data;
        if (layout == null) {
            // No layout configured: send the raw formatted message as UTF-8.
            data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
        } else if (layout instanceof SerializedLayout) {
            // SerializedLayout records must carry the serialization header
            // in front of every payload, so concatenate header + body.
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data);
    } catch (final Exception e) {
        LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
        throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
    }
}
如果使用 JsonLayout,则会走 data = layout.toByteArray(event); 这一分支;toByteArray 的实现位于 log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractStringLayout.java:
/**
 * Formats the Log Event as a byte array.
 *
 * @param event The Log Event.
 * @return The formatted event as a byte array.
 */
@Override
public byte[] toByteArray(final LogEvent event) {
    // Render the event to a String first, then encode it with the
    // layout's configured charset.
    final String serialized = toSerializable(event);
    return getBytes(serialized);
}
/**
 * Encodes the given string into bytes using this layout's charset
 * configuration.
 *
 * @param s the string to encode
 * @return the encoded bytes
 */
protected byte[] getBytes(final String s) {
    if (!useCustomEncoding) {
        try {
            // LOG4J2-935: String.getBytes(String) gives better performance
            // than the Charset overload.
            return s.getBytes(charsetName);
        } catch (final UnsupportedEncodingException e) {
            // Fall back to the pre-resolved Charset if the name lookup fails.
            return s.getBytes(charset);
        }
    }
    // Custom single-byte encoder bypasses the charset machinery entirely.
    return StringEncoder.encodeSingleByteChars(s);
}
其中 toSerializable 的实现位于 log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractJacksonLayout.java:
/**
 * Serializes the event to a String by delegating to the Writer-based
 * overload with an in-memory writer.
 *
 * @param event the event to serialize
 * @return the serialized event, or an empty string if serialization fails
 */
@Override
public String toSerializable(final LogEvent event) {
    final StringBuilderWriter buffer = new StringBuilderWriter();
    try {
        toSerializable(event, buffer);
        return buffer.toString();
    } catch (final IOException e) {
        // Writing to an in-memory buffer should not fail; log the error and
        // degrade to an empty string rather than propagate.
        LOGGER.error(e);
        return Strings.EMPTY;
    }
}
/**
 * Writes the JSON form of the event to the given writer, followed by the
 * configured end-of-line marker, and records that an event was emitted.
 *
 * @param event  the event to serialize
 * @param writer the destination writer
 * @throws IOException if writing fails
 */
public void toSerializable(final LogEvent event, final Writer writer)
        throws JsonGenerationException, JsonMappingException, IOException {
    final LogEvent snapshot = convertMutableToLog4jEvent(event);
    objectWriter.writeValue(writer, snapshot);
    writer.write(eol);
    markEvent();
}
/**
 * Returns an immutable view of the event suitable for Jackson serialization.
 *
 * <p>Workaround: the Jackson-based layouts configure their filters for
 * {@code Log4jLogEvent}, and it is unclear how to set up the same filters for
 * {@code MutableLogEvent}, so mutable events are snapshotted into an
 * immutable memento first.</p>
 *
 * @param event the event to convert
 * @return a memento for mutable events, otherwise the event itself
 */
private static LogEvent convertMutableToLog4jEvent(final LogEvent event) {
    if (event instanceof MutableLogEvent) {
        return ((MutableLogEvent) event).createMemento();
    }
    return event;
}
// Topology: read raw JSON from the "error-log" topic, parse each record into
// a Log4jLogEventJson bean, re-key the stream by logger name, aggregate error
// counts in tumbling 10-second windows, and log each window's result.
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> rawLines = builder.stream("error-log");
// Parse and re-key: the logger name becomes the grouping key.
final KStream<String, Log4jLogEventJson> events = rawLines.map((key, value) -> {
    final Log4jLogEventJson bean = JSON.parseObject(value, Log4jLogEventJson.class);
    return new KeyValue<>(bean.getLoggerName(), bean);
});
final GenericJsonSerde<ErrorLogStats> statsSerde = new GenericJsonSerde<>(ErrorLogStats.class);
final GenericJsonSerde<Log4jLogEventJson> eventSerde = new GenericJsonSerde<>(Log4jLogEventJson.class);
events.groupByKey(Serdes.String(), eventSerde)
        .aggregate(ErrorLogStats::new,
                (key, value, stats) -> {
                    System.out.println("key:" + key.getClass());
                    System.out.println("value:" + value.getClass());
                    return stats.add(value);
                },
                TimeWindows.of(10000).advanceBy(10000),
                statsSerde,
                "aggregate")
        .foreach((k, v) -> {
            LOGGER.info("key:{},start:{},end:{},value:{}", k.key(), k.window().start(), k.window().end(), v.errors.size());
            // Fire the alert here.
        });
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:767) - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_2
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:767) - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90) - key:com.example.demo.controller.DemoController,start:1508060660000,end:1508060670000,value:3
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90) - key:com.example.demo.controller.ErrorController,start:1508060660000,end:1508060670000,value:1
自从 kafka 有了 stream 之后,感觉可以减少很多技术栈了:比如不必再学 storm 或者 spark,就可以直接在 kafka 上进行流式处理。至于 kafka stream 如何进行分布式部署,后续再研究。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有