我们的一个业务需要有对MR任务的提交和状态跟踪的功能,需要通过Java代码提交一个通用的MR任务(包括mr的jar、配置文件、依赖的第三方jar包),并且需要在提交后跟踪其状态和历史,所以在提交后程序需要拿到该提交的MR的JobID。
首先 可以想到的是通过 ${HADOOP_HOME}/bin/hadoop jar
命令来提交,并从命令执行的标准输出中取到jobID,这样确实可以解决,但是这样做有几个问题:
由于上述原因决定放弃这个方案。
另外 一个方案就是采用Java程序直接提交的方式,这种提交方式也有几个问题:
我们来解决上述两个问题:
通过继承 java.net.URLClassLoader 类实现MR job的类加载器。类加载器有个规则:A类中如果引用了B类,则加载B类的时候会通过A类的类加载器来加载,那么MR类加载器加载MR的主类时,其引用到的所有类也都会通过MR类加载器来加载。
/**
 * Class loader for MR jobs. Loads the MR main class and everything it references,
 * and rewrites the bytecode of {@code org.apache.hadoop.mapreduce.Job} (via
 * {@link JobAdapter}) so the submitted Job instance can be retrieved afterwards.
 */
public class MapReduceClassLoader extends URLClassLoader {
    private static final Logger LOG = LoggerFactory.getLogger(MapReduceClassLoader.class);

    public MapReduceClassLoader() {
        super(new URL[]{});
    }

    public MapReduceClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    /**
     * Loads a class with child-first semantics, except that JDK system classes are
     * delegated to the parent loader and the Hadoop Job class is bytecode-transformed
     * before being defined.
     *
     * @param name fully qualified class name
     * @return the loaded class
     * @throws ClassNotFoundException if the class cannot be found by this loader
     *         or its parent, or the Job bytecode transformation fails
     */
    public synchronized Class<?> loadClass(String name) throws ClassNotFoundException {
        // Return immediately if this loader has already loaded the class.
        Class<?> c = this.findLoadedClass(name);
        if (c != null) {
            return c;
        }
        ClassNotFoundException ex = null;
        // For the Hadoop Job class, transform the bytecode first so the static
        // currentJob field and the constructor hook are injected.
        if (name.equals("org.apache.hadoop.mapreduce.Job")) {
            byte[] bytes = transformJobBytecode(name);
            if (bytes == null) {
                ex = new ClassNotFoundException("Transform job bytecode failed.");
            } else {
                c = defineClass(name, bytes, 0, bytes.length);
            }
        }
        if (c == null) {
            // JDK system classes are delegated to the parent loader
            // (here: sun.misc.Launcher.AppClassLoader).
            if (ClassPathUtils.isSystemClass(name)) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }
            // Try this loader's own class path (child-first order).
            if (c == null) {
                try {
                    c = findClass(name);
                } catch (Exception e) {
                    // Preserve the cause instead of only keeping the message.
                    ex = new ClassNotFoundException(e.getMessage(), e);
                }
            }
            // Not found locally: fall back to the parent loader.
            if (c == null && this.getParent() != null) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }
        }
        if (c == null) {
            // Defensive: ex should always be set at this point, but never throw null.
            throw ex != null ? ex : new ClassNotFoundException(name);
        }
        LOG.info("loaded " + c + " from " + c.getClassLoader());
        return c;
    }

    /**
     * Adds every entry of the given class path string to this loader.
     *
     * @param classPath class path string, split by ClassPathUtils
     */
    public void addClassPath(String classPath) {
        URL[] cpUrls = ClassPathUtils.getClassPathURLs(classPath);
        for (URL cpUrl : cpUrls) {
            addURL(cpUrl);
        }
    }

    /**
     * Rewrites the bytecode of the Job class with {@link JobAdapter} (ASM).
     *
     * @param jobClassName fully qualified name of the Job class
     * @return the transformed bytecode, or null when the class resource is not
     *         found or the transformation fails
     */
    private byte[] transformJobBytecode(String jobClassName) {
        String path = jobClassName.replace('.', '/').concat(".class");
        InputStream is = getResourceAsStream(path);
        if (is == null) {
            return null;
        }
        try {
            byte[] b = getBytes(is);
            ClassReader cr = new ClassReader(b);
            ClassWriter cw = new ClassWriter(cr, 0);
            cr.accept(new JobAdapter(cw), 0);
            return cw.toByteArray();
        } catch (IOException e) {
            // Log instead of silently swallowing; the caller treats null as failure.
            LOG.error("Failed to transform bytecode of " + jobClassName, e);
        }
        return null;
    }

    /**
     * Reads all bytes from the stream and closes it.
     *
     * @param is input stream to drain (closed on return)
     * @return the complete stream contents
     * @throws IOException if reading fails
     */
    private byte[] getBytes(InputStream is) throws IOException {
        // Do NOT size the result with is.available(): it is only an estimate of
        // bytes readable without blocking, and undersizing it would truncate the
        // class bytes or overflow the target array. Accumulate instead.
        try {
            java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int len;
            while ((len = is.read(buf)) != -1) {
                bos.write(buf, 0, len);
            }
            return bos.toByteArray();
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                LOG.warn("Failed to close class resource stream", e);
            }
        }
    }
}
通过ASM来实现对Job类的字节码的修改,ASM的使用可以参考文档:asm4-guide
实现的ClassVisitor代码如下:
/**
 * ASM ClassVisitor that injects a public static field {@code currentJob} into the
 * Hadoop Job class and rewrites the Job(JobConf) constructor so every constructed
 * instance is stored into that field.
 */
public class JobAdapter extends ClassVisitor {
    /** Name of the injected static field holding the most recently constructed Job. */
    public static final String JOB_FIELD_NAME = "currentJob";
    /** JVM descriptor of the injected field. */
    public static final String JOB_FIELD_DESC = "Lorg/apache/hadoop/mapreduce/Job;";

    private String owner;        // internal name of the class being visited
    private boolean isInterface; // interfaces cannot carry the injected field/hook

    public JobAdapter(ClassVisitor cv) {
        super(ASM5, cv);
    }

    @Override
    public void visit(int version, int access, String name, String signature,
            String superName, String[] interfaces) {
        super.visit(version, access, name, signature, superName, interfaces);
        owner = name;
        // BUG FIX: ACC_INTERFACE is 0x0200, so "(access & ACC_INTERFACE) == 1" was
        // always false and interfaces were never detected. Test the bit itself.
        isInterface = (access & ACC_INTERFACE) != 0;
    }

    @Override
    public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
        MethodVisitor mv = super.visitMethod(access, name, desc, signature, exceptions);
        if (isInterface || mv == null) {
            return mv;
        }
        // Hook the Job(JobConf) constructor to assign "this" to currentJob on return.
        // NOTE(review): "access == 0" matches only a package-private constructor with
        // no other flags; ASM may also set the ACC_DEPRECATED pseudo-flag on deprecated
        // members — confirm this still matches the target constructor of the Hadoop
        // version in use.
        if (access == 0
                && name.equals("<init>")
                && desc.equals("(Lorg/apache/hadoop/mapred/JobConf;)V")) {
            return new JobMethodAdapter(mv);
        }
        return mv;
    }

    @Override
    public void visitEnd() {
        if (!isInterface) {
            // Add the public static field currentJob to the class.
            FieldVisitor fv = super.visitField(
                    (ACC_PUBLIC | ACC_STATIC),
                    JOB_FIELD_NAME,
                    JOB_FIELD_DESC,
                    null, null);
            if (fv != null) {
                fv.visitEnd();
            }
        }
        super.visitEnd();
    }

    /** Rewrites the constructor body: injects the currentJob assignment before return. */
    class JobMethodAdapter extends MethodVisitor {
        public JobMethodAdapter(MethodVisitor mv) {
            super(ASM5, mv);
        }

        @Override
        public void visitInsn(int opcode) {
            // Before every return instruction, emit: currentJob = this;
            if (opcode >= IRETURN && opcode <= RETURN) {
                super.visitVarInsn(ALOAD, 0);
                super.visitFieldInsn(PUTSTATIC, owner, JOB_FIELD_NAME, JOB_FIELD_DESC);
            }
            super.visitInsn(opcode);
        }

        @Override
        public void visitMaxs(int maxStack, int maxLocals) {
            // The injected ALOAD pushes one extra operand: grow the stack by one.
            super.visitMaxs(maxStack + 1, maxLocals);
        }
    }
}
下面是测试代码
args[0] 为hadoop依赖(包括hadoop配置文件,jar等)+ MR主类jar包 + MR配置文件等 Classpath
args[1] 为MR依赖的第三方Jar包classpath
/**
 * Demo entry point: builds a MapReduceClassLoader from the given class paths,
 * invokes the MR main class through it, then reads back the injected static
 * {@code currentJob} field of the transformed Job class.
 *
 * @param args args[0] = hadoop deps + MR main jar + MR config class path;
 *             args[1] = class path of third-party jars the MR job depends on
 */
public static void main(String[] args) {
    String classPath = args[0];
    String libJars = args[1];
    try {
        MapReduceClassLoader cl = new MapReduceClassLoader();
        cl.addClassPath(classPath);
        cl.addClassPath(libJars);
        System.out.println("URLS:" + Arrays.toString(cl.getURLs()));
        // Hadoop resolves resources through the context class loader.
        Thread.currentThread().setContextClassLoader(cl);
        // Load the MR main class through our loader so everything it references
        // is also loaded by it (including the transformed Job class).
        Class<?> mainClass = cl.loadClass("xxx.Main");
        Method mainMethod = mainClass.getMethod("main", new Class<?>[] { String[].class });
        String libJarsParam = ClassPathUtils.getClassPathWithDotSep(libJars);
        // Pass the third-party jars to the job via -libjars.
        String[] firstParams = new String[] { "-libjars", libJarsParam };
        Object[] params = new Object[] { firstParams };
        // Invoke the MR main method (static, hence null receiver).
        mainMethod.invoke(null, params);
        // Read the injected static currentJob field from the transformed Job class.
        Class<?> jobClass = cl.loadClass("org.apache.hadoop.mapreduce.Job");
        System.out.println(jobClass.getClassLoader());
        Field field = jobClass.getField(JobAdapter.JOB_FIELD_NAME);
        System.out.println(field.get(null));
    } catch (Exception e) {
        // Demo code: surface any failure directly.
        e.printStackTrace();
    }
}