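When a Kafka client talks to the broker, both sides need an agreed wire format so that the different commands can be written and read correctly. The org.apache.kafka.common.protocol package (together with its types subpackage) is responsible for this.
The package contains the following members:
The most important classes are ApiKeys, Schema, Struct and Field. We will go through them one by one, starting with the Type class they are built on.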
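Type is an abstract class that declares several methods, chiefly write and read, which write values to and read values from a ByteBuffer. Type defines a number of static constants, each implemented as an anonymous subclass, such as Type.BOOLEAN, Type.INT8 and Type.INT16. Take Type.BOOLEAN as an example. Its write method simply puts a single byte, 1 or 0, into the ByteBuffer, and its read method reads one byte and converts it to a Boolean.
See the figure: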
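The Type pattern fits in a few lines. What follows is a simplified, self-contained re-implementation for illustration only. MiniType, its sizeOf method and the INT32 constant are hypothetical stand-ins, not Kafka's real classes. BOOLEAN follows the one-byte 1/0 encoding described above.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for Kafka's Type: every type knows how to write itself
// to, and read itself from, a ByteBuffer.
abstract class MiniType {
    abstract void write(ByteBuffer buffer, Object o);

    abstract Object read(ByteBuffer buffer);

    abstract int sizeOf(Object o);

    // BOOLEAN: a single byte, 1 for true and 0 for false; any non-zero byte reads back as true
    static final MiniType BOOLEAN = new MiniType() {
        @Override
        void write(ByteBuffer buffer, Object o) {
            buffer.put((Boolean) o ? (byte) 1 : (byte) 0);
        }

        @Override
        Object read(ByteBuffer buffer) {
            return buffer.get() != 0;
        }

        @Override
        int sizeOf(Object o) {
            return 1;
        }
    };

    // INT32 (hypothetical extra constant): four bytes in ByteBuffer's default big-endian order
    static final MiniType INT32 = new MiniType() {
        @Override
        void write(ByteBuffer buffer, Object o) {
            buffer.putInt((Integer) o);
        }

        @Override
        Object read(ByteBuffer buffer) {
            return buffer.getInt();
        }

        @Override
        int sizeOf(Object o) {
            return 4;
        }
    };
}
```

Each type is a constant of an anonymous subclass, so there is exactly one instance per type. Callers can therefore pass the type object around, as ArrayOf and Schema do, without knowing its concrete class.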
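ArrayOf extends Type but represents an array of some element type. Its write method first writes the array length as an INT32 (-1 for a null array) and then calls write on each element. Its read method first reads the length and then reads the elements one by one.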
import java.nio.ByteBuffer;

/**
 * Represents a type for an array of a particular type
 */
public class ArrayOf extends Type {

    private final Type type;        // element type
    private final boolean nullable;

    public ArrayOf(Type type) {
        this(type, false);
    }

    public static ArrayOf nullable(Type type) {
        return new ArrayOf(type, true);
    }

    private ArrayOf(Type type, boolean nullable) {
        this.type = type;
        this.nullable = nullable;
    }

    @Override
    public boolean isNullable() {
        return nullable;
    }

    @Override
    public void write(ByteBuffer buffer, Object o) {
        // a null array is encoded as length -1
        if (o == null) {
            buffer.putInt(-1);
            return;
        }

        Object[] objs = (Object[]) o;
        int size = objs.length;
        buffer.putInt(size);            // length prefix first
        for (Object obj : objs)
            type.write(buffer, obj);    // then each element
    }

    @Override
    public Object read(ByteBuffer buffer) {
        int size = buffer.getInt();
        if (size < 0 && isNullable())
            return null;
        else if (size < 0)
            throw new SchemaException("Array size " + size + " cannot be negative");

        // reject corrupt sizes before allocating the array
        if (size > buffer.remaining())
            throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available");
        Object[] objs = new Object[size];
        for (int i = 0; i < size; i++)
            objs[i] = type.read(buffer);
        return objs;
    }

    ...
}
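The following stripped-down sketch checks the wire layout described above: the length comes first, then the elements, and -1 stands for null. Codec, MiniArrayOf and the INT16 element codec are hypothetical names. IllegalStateException replaces Kafka's SchemaException so the example compiles on its own.

```java
import java.nio.ByteBuffer;

// Hypothetical minimal element codec, standing in for Kafka's Type.
interface Codec {
    void write(ByteBuffer buffer, Object o);

    Object read(ByteBuffer buffer);

    // two-byte signed integer element type used for the demo
    Codec INT16 = new Codec() {
        public void write(ByteBuffer buffer, Object o) {
            buffer.putShort((Short) o);
        }

        public Object read(ByteBuffer buffer) {
            return buffer.getShort();
        }
    };
}

// Sketch of the ArrayOf layout: INT32 length prefix, then the elements; null is length -1.
final class MiniArrayOf implements Codec {
    private final Codec elementType;
    private final boolean nullable;

    MiniArrayOf(Codec elementType, boolean nullable) {
        this.elementType = elementType;
        this.nullable = nullable;
    }

    @Override
    public void write(ByteBuffer buffer, Object o) {
        if (o == null) {
            buffer.putInt(-1);                  // null marker
            return;
        }
        Object[] objs = (Object[]) o;
        buffer.putInt(objs.length);             // length first ...
        for (Object obj : objs)
            elementType.write(buffer, obj);     // ... then each element
    }

    @Override
    public Object read(ByteBuffer buffer) {
        int size = buffer.getInt();
        if (size < 0) {
            if (nullable)
                return null;
            throw new IllegalStateException("Array size " + size + " cannot be negative");
        }
        // sanity check, assuming every element occupies at least one byte
        if (size > buffer.remaining())
            throw new IllegalStateException("Array size " + size + " exceeds remaining bytes");
        Object[] objs = new Object[size];
        for (int i = 0; i < size; i++)
            objs[i] = elementType.read(buffer);
        return objs;
    }
}
```

Three INT16 elements therefore take 4 + 3 * 2 = 10 bytes. A null array takes only the 4-byte length.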
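The Field class represents a field: a single named, typed slot within a format.
The structure of Field is illustrated below. defaultValue is drawn with a dashed line because it only exists when hasDefaultValue is true.
The correspondence between the Field subclasses and the Type types is as follows: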
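The minimal Field-like class below illustrates why hasDefaultValue is a separate flag rather than a null check on defaultValue. MiniField, its factory methods and the String type tag are hypothetical simplifications. Only the name/type/hasDefaultValue/defaultValue split mirrors the description above.

```java
// Hypothetical, simplified Field: a name, a type tag and an optional default.
final class MiniField {
    final String name;
    final String type;            // e.g. "INT32"; a String tag keeps the sketch self-contained
    final String doc;
    final boolean hasDefaultValue;
    final Object defaultValue;    // meaningful only when hasDefaultValue is true

    private MiniField(String name, String type, String doc, boolean hasDefaultValue, Object defaultValue) {
        this.name = name;
        this.type = type;
        this.doc = doc;
        this.hasDefaultValue = hasDefaultValue;
        this.defaultValue = defaultValue;
    }

    // a field without a default: defaultValue is left unused
    static MiniField of(String name, String type, String doc) {
        return new MiniField(name, type, doc, false, null);
    }

    // a field with a default, which may itself be null
    static MiniField withDefault(String name, String type, String doc, Object defaultValue) {
        return new MiniField(name, type, doc, true, defaultValue);
    }
}
```

One plausible reason for keeping the flag separate is that a field can then declare null itself as its default. A plain null check on defaultValue could not tell that case apart from "no default".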
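As its name suggests, Schema describes a format: it defines, in order, the fields that make up the format. It extends Type, so it can itself read from and write to a ByteBuffer.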
/**
 * The schema for a compound record definition
 */
public class Schema extends Type {

    private final BoundField[] fields;
    private final Map<String, BoundField> fieldsByName;

    /**
     * Construct the schema with a given list of its field values
     *
     * @throws SchemaException If the given list have duplicate fields
     */
    public Schema(Field... fs) {
        this.fields = new BoundField[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field def = fs[i];
            // field names must be unique within a schema
            if (fieldsByName.containsKey(def.name))
                throw new SchemaException("Schema contains a duplicate field: " + def.name);
            // bind the field to this schema together with its position
            this.fields[i] = new BoundField(def, this, i);
            this.fieldsByName.put(def.name, this.fields[i]);
        }
    }

    ...

    /**
     * Read a struct from the buffer
     */
    @Override
    public Struct read(ByteBuffer buffer) {
        // fields are read strictly in declaration order
        Object[] objects = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].def.type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                        (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        return new Struct(this, objects);
    }
}

// BoundField.java
/**
 * A field definition bound to a particular schema.
 */
public class BoundField {
    public final Field def;
    final int index;
    final Schema schema;

    public BoundField(Field def, Schema schema, int index) {
        this.def = def;
        this.schema = schema;
        this.index = index;
    }

    @Override
    public String toString() {
        return def.name + ":" + def.type;
    }
}
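The key property of Schema is that field names never go on the wire. Only the declaration order does. The sketch below demonstrates this with a self-contained, simplified schema that supports just INT32 and BOOLEAN fields. MiniSchema, Kind and indexOf are hypothetical names, and IllegalArgumentException/IllegalStateException stand in for SchemaException.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified Schema: an ordered list of (name, kind) pairs,
// written and read strictly in declaration order.
final class MiniSchema {
    enum Kind { INT32, BOOLEAN }

    private final String[] names;
    private final Kind[] kinds;
    private final Map<String, Integer> indexByName = new HashMap<>();

    MiniSchema(String[] names, Kind[] kinds) {
        if (names.length != kinds.length)
            throw new IllegalArgumentException("names and kinds must have the same length");
        this.names = names.clone();
        this.kinds = kinds.clone();
        for (int i = 0; i < names.length; i++) {
            // Map.put returns the previous value, so non-null means a duplicate name
            if (indexByName.put(names[i], i) != null)
                throw new IllegalArgumentException("Schema contains a duplicate field: " + names[i]);
        }
    }

    // name lookup only maps a name to a position; names are never serialized
    int indexOf(String name) {
        Integer i = indexByName.get(name);
        if (i == null)
            throw new IllegalArgumentException("No such field: " + name);
        return i;
    }

    void write(ByteBuffer buffer, Object[] values) {
        for (int i = 0; i < kinds.length; i++) {
            switch (kinds[i]) {
                case INT32:
                    buffer.putInt((Integer) values[i]);
                    break;
                case BOOLEAN:
                    buffer.put((Boolean) values[i] ? (byte) 1 : (byte) 0);
                    break;
            }
        }
    }

    Object[] read(ByteBuffer buffer) {
        Object[] values = new Object[kinds.length];
        for (int i = 0; i < kinds.length; i++) {
            try {
                switch (kinds[i]) {
                    case INT32:
                        values[i] = buffer.getInt();
                        break;
                    case BOOLEAN:
                        values[i] = buffer.get() != 0;
                        break;
                }
            } catch (RuntimeException e) {
                // report which field failed, as Schema.read does
                throw new IllegalStateException("Error reading field '" + names[i] + "'", e);
            }
        }
        return values;
    }
}
```

A record with an INT32 id followed by a BOOLEAN flag therefore takes exactly 5 bytes. Swapping the declaration order would produce different bytes for the same values.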
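As illustrated below:
Struct holds two variables, values and schema. values holds the field values of one record, and schema is the Schema that can interpret them.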
/**
 * A record that can be serialized and deserialized according to a pre-defined schema
 */
public class Struct {
    private final Schema schema;
    private final Object[] values;
Struct provides a range of setters and getters. Both follow the Schema: setters store field values into values, and getters read them back out.
    /**
     * Get the record value for the field with the given name by doing a hash table lookup (slower!)
     *
     * @param name The name of the field
     * @return The value in the record
     * @throws SchemaException If no such field exists
     */
    public Object get(String name) {
        BoundField field = schema.get(name);
        if (field == null)
            throw new SchemaException("No such field: " + name);
        return getFieldOrDefault(field);
    }

    public Object[] getArray(String name) {
        return (Object[]) get(name);
    }

    /**
     * Return the value of the given pre-validated field, or if the value is missing return the default value.
     *
     * @param field The field for which to get the default value
     * @throws SchemaException if the field has no value and has no default.
     */
    private Object getFieldOrDefault(BoundField field) {
        Object value = this.values[field.index];
        if (value != null)
            return value;                       // explicitly set value
        else if (field.def.hasDefaultValue)
            return field.def.defaultValue;      // fall back to the field's default
        else if (field.def.type.isNullable())
            return null;                        // nullable type: missing means null
        else
            throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
    }

    ...
}
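The precedence that getFieldOrDefault implements can be exercised on its own. It tries the explicit value first, then the field's default, then null for nullable types, and otherwise fails. MiniStruct and FieldSpec below are hypothetical simplifications. get uses a linear scan instead of Schema's hash lookup to keep the sketch short.

```java
// Hypothetical, simplified Struct illustrating getFieldOrDefault's lookup order.
final class MiniStruct {
    static final class FieldSpec {
        final String name;
        final boolean hasDefaultValue;
        final Object defaultValue;
        final boolean nullable;

        FieldSpec(String name, boolean hasDefaultValue, Object defaultValue, boolean nullable) {
            this.name = name;
            this.hasDefaultValue = hasDefaultValue;
            this.defaultValue = defaultValue;
            this.nullable = nullable;
        }
    }

    private final FieldSpec[] fields;
    private final Object[] values;

    MiniStruct(FieldSpec... fields) {
        this.fields = fields;
        this.values = new Object[fields.length]; // every slot starts out unset (null)
    }

    MiniStruct set(String name, Object value) {
        values[indexOf(name)] = value;
        return this;
    }

    Object get(String name) {
        int i = indexOf(name);
        FieldSpec field = fields[i];
        if (values[i] != null)
            return values[i];             // 1. explicitly set value
        else if (field.hasDefaultValue)
            return field.defaultValue;    // 2. the field's default
        else if (field.nullable)
            return null;                  // 3. nullable type: missing means null
        else
            throw new IllegalStateException("Missing value for field '" + name + "' which has no default value.");
    }

    private int indexOf(String name) {
        for (int i = 0; i < fields.length; i++)
            if (fields[i].name.equals(name))
                return i;
        throw new IllegalArgumentException("No such field: " + name);
    }
}
```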
AbstractResponse provides toStruct and parseResponse, which convert between a Struct and an AbstractResponse.
AbstractRequest works the same way, so it is not repeated here.
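The AbstractResponse pattern comes down to a pair of conversions between a generic Struct and a typed object. The sketch below is hypothetical. SampleResponse and its two fields are invented for illustration, and a Map<String, Object> stands in for Struct so the example runs without Kafka on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical typed response; a Map stands in for Struct.
final class SampleResponse {
    static final String ERROR_CODE = "error_code";
    static final String THROTTLE_TIME_MS = "throttle_time_ms";

    final short errorCode;
    final int throttleTimeMs;

    SampleResponse(short errorCode, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.throttleTimeMs = throttleTimeMs;
    }

    // Struct -> typed object (the parseResponse direction)
    static SampleResponse parse(Map<String, Object> struct) {
        return new SampleResponse((Short) struct.get(ERROR_CODE), (Integer) struct.get(THROTTLE_TIME_MS));
    }

    // typed object -> Struct (the toStruct direction)
    Map<String, Object> toStruct() {
        Map<String, Object> struct = new LinkedHashMap<>();
        struct.put(ERROR_CODE, errorCode);
        struct.put(THROTTLE_TIME_MS, throttleTimeMs);
        return struct;
    }
}
```

The typed class offers convenient field access to the rest of the code, while the Struct form is what a Schema actually serializes.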
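ApiKeys is an enum with many constants. For every API, in every version, it defines the request and response formats. Each API, such as PRODUCE or FETCH, has a request side and a response side, each with its own Schema, and that Schema differs from version to version. The schema arrays are indexed by version number.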
    ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas) {
        this(id, name, false, requestSchemas, responseSchemas);
    }

    ApiKeys(int id, String name, boolean clusterAction, Schema[] requestSchemas, Schema[] responseSchemas) {
        this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas);
    }

    ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic,
            Schema[] requestSchemas, Schema[] responseSchemas) {
        if (id < 0)
            throw new IllegalArgumentException("id must not be negative, id: " + id);
        this.id = (short) id;
        this.name = name;
        ...
        this.requestSchemas = requestSchemas;
        this.responseSchemas = responseSchemas;
    }
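The enum below mimics the essential structure of ApiKeys: one constant per API, each carrying an id plus schema arrays indexed by version. MiniApiKeys, the string placeholders used instead of Schema objects, and the equal-length check are assumptions of this sketch, not Kafka code.

```java
// Hypothetical enum mimicking ApiKeys; strings stand in for Schema objects.
enum MiniApiKeys {
    PRODUCE(0, "Produce",
            new String[]{"produce-request-v0", "produce-request-v1"},
            new String[]{"produce-response-v0", "produce-response-v1"}),
    FETCH(1, "Fetch",
            new String[]{"fetch-request-v0"},
            new String[]{"fetch-response-v0"});

    final short id;
    final String apiName;
    private final String[] requestSchemas;   // index = version
    private final String[] responseSchemas;  // index = version

    MiniApiKeys(int id, String apiName, String[] requestSchemas, String[] responseSchemas) {
        if (id < 0)
            throw new IllegalArgumentException("id must not be negative, id: " + id);
        // assumption of this sketch: every version defines both a request and a response schema
        if (requestSchemas.length != responseSchemas.length)
            throw new IllegalArgumentException("request/response schema counts differ for " + apiName);
        this.id = (short) id;
        this.apiName = apiName;
        this.requestSchemas = requestSchemas;
        this.responseSchemas = responseSchemas;
    }

    short latestVersion() {
        return (short) (requestSchemas.length - 1);
    }

    boolean isVersionSupported(short version) {
        return version >= 0 && version <= latestVersion();
    }

    String requestSchema(short version) {
        return schemaFor(requestSchemas, version);
    }

    String responseSchema(short version) {
        return schemaFor(responseSchemas, version);
    }

    private String schemaFor(String[] versions, short version) {
        if (!isVersionSupported(version))
            throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
        return versions[version];
    }
}
```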
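Callers mostly use parseRequest and parseResponse, which parse a request or a response according to its version. parseResponse looks up the response Schema for the given version and reads the ByteBuffer with it. schemaFor rejects unsupported versions, and the protected overload retries with a fallback version if parsing fails. parseRequest works the same way, so every ApiKeys constant (PRODUCE, FETCH, ...) can parse the requests and responses of each of its versions: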
    public Schema requestSchema(short version) {
        return schemaFor(requestSchemas, version);
    }

    public Schema responseSchema(short version) {
        return schemaFor(responseSchemas, version);
    }

    public Struct parseRequest(short version, ByteBuffer buffer) {
        return requestSchema(version).read(buffer);
    }

    public Struct parseResponse(short version, ByteBuffer buffer) {
        return responseSchema(version).read(buffer);
    }

    protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
        int bufferPosition = buffer.position();
        try {
            return responseSchema(version).read(buffer);
        } catch (SchemaException e) {
            // rewind to the start of the response and retry with the fallback version
            if (version != fallbackVersion) {
                buffer.position(bufferPosition);
                return responseSchema(fallbackVersion).read(buffer);
            } else
                throw e;
        }
    }

    private Schema schemaFor(Schema[] versions, short version) {
        if (!isVersionSupported(version))
            throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
        // the schema arrays are indexed by version number
        return versions[version];
    }
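The protected parseResponse overload uses a try-then-fall-back strategy. It remembers the buffer position and tries the requested version. If a SchemaException is thrown and the fallback version differs, it rewinds and parses again with the fallback version. The sketch below reproduces that control flow in a self-contained form. FallbackParser, the BiFunction-based reader, and the use of IllegalStateException in place of SchemaException are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.function.BiFunction;

// Hypothetical helper reproducing the control flow of
// parseResponse(version, buffer, fallbackVersion).
final class FallbackParser {
    static <T> T parse(short version, short fallbackVersion, ByteBuffer buffer,
                       BiFunction<Short, ByteBuffer, T> reader) {
        int start = buffer.position();          // remember where the message begins
        try {
            return reader.apply(version, buffer);
        } catch (IllegalStateException e) {     // stands in for SchemaException
            if (version == fallbackVersion)
                throw e;                        // nothing else to try
            buffer.position(start);             // undo what the failed attempt consumed
            return reader.apply(fallbackVersion, buffer);
        }
    }
}
```

The rewind matters because the failed attempt has already advanced the buffer position. Without buffer.position(start), the fallback would start reading in the middle of the message.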
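parseStruct... calls ApiKeys::parseResponse to parse the ByteBuffer into a Struct. Two details stand out along this call path. First, ApiKeys.parseResponse does no parsing itself: it only picks the Schema for the given version and delegates to Schema.read. Second, Schema.read reads each field through fields[i].def.type.read (a call chain this long violates the Law of Demeter).
// ApiKeys.java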
public Struct parseResponse(short version, ByteBuffer buffer) {
    return responseSchema(version).read(buffer);
}

// Schema.java
/**
 * Read a struct from the buffer
 */
@Override
public Struct read(ByteBuffer buffer) {
    Object[] objects = new Object[fields.length];
    for (int i = 0; i < fields.length; i++) {
        try {
            objects[i] = fields[i].def.type.read(buffer);
        } catch (Exception e) {
            throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                    (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
        }
    }
    return new Struct(this, objects);
}
AbstractResponse.parseResponse is not covered in detail here. Pick any concrete Response class to see how it is implemented.
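One way to shorten the fields[i].def.type.read chain criticized in Schema.read is to let each layer expose its own read method and delegate a single step. The following is a hypothetical refactoring sketch, not how Kafka's code is written. FieldReader, DemeterField, DemeterBoundField and DemeterSchema are invented names.

```java
import java.nio.ByteBuffer;

// Hypothetical refactoring sketch (not Kafka's code): each layer delegates one hop.
interface FieldReader {
    Object read(ByteBuffer buffer);
}

final class DemeterField {
    final String name;
    private final FieldReader type;

    DemeterField(String name, FieldReader type) {
        this.name = name;
        this.type = type;
    }

    Object read(ByteBuffer buffer) {
        return type.read(buffer);               // Field talks to its type
    }
}

final class DemeterBoundField {
    private final DemeterField def;
    final int index;

    DemeterBoundField(DemeterField def, int index) {
        this.def = def;
        this.index = index;
    }

    Object read(ByteBuffer buffer) {
        return def.read(buffer);                // BoundField talks to its Field
    }
}

final class DemeterSchema {
    private final DemeterBoundField[] fields;

    DemeterSchema(DemeterField... defs) {
        fields = new DemeterBoundField[defs.length];
        for (int i = 0; i < defs.length; i++)
            fields[i] = new DemeterBoundField(defs[i], i);
    }

    Object[] read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        for (int i = 0; i < fields.length; i++)
            objects[i] = fields[i].read(buffer); // instead of fields[i].def.type.read(buffer)
        return objects;
    }
}
```

The bytes read are unchanged. The only difference is that DemeterSchema no longer needs to know how a bound field stores its definition and type.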