前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >添加kerberos后,Flink任务的运行认证及各组件的认证

添加kerberos后,Flink任务的运行认证及各组件的认证

作者头像
码客说
发布于 2023-08-11 08:00:05
发布于 2023-08-11 08:00:05
1.6K00
代码可运行
举报
文章被收录于专栏:码客码客
运行总次数:0
代码可运行

Kerberos安装配置

https://www.psvmc.cn/article/2022-11-08-bigdata-kerberos-centos.html

Flink任务认证

flink on yarn

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
flink run \
  -yD security.kerberos.login.keytab=/root/psvmc.keytab \
  -yD security.kerberos.login.principal=psvmc/hadoop@HADOOP.COM \
  yxzt-data-tcs-1.0-SNAPSHOT-jar-with-dependencies.jar -job /root/zjhome/test.json

认证原理

  1. flink程序启动,自动将keytab文件自动上传hdfs,由yarn管理,分发给每个executor缓存token,定时刷新。
  2. 基于以上原理,当自定义RichSinkFunction里需要是使用基于kerberos认证的组件时,不需要再做认证操作。
  3. 比如:hive、hbase、kudu等等,直接建立连接就可以访问

Hive JDBC认证

需要两个文件

  • 配置文件krb5.conf
  • 认证文件krb5.keytab,一般由服务器生成后获取

放到resources目录下

Kerberos认证

指定krb5配置文件:krb5.conf,根据实际情况替换

认证文件:krb5.keytab,根据实际情况替换

认证用户:hive,根据实际情况修改

这里是通过将配置文件和认证文件拷贝到临时目录进行认证,可以根据需要指定固定目录认证

使用项目中的配置

认证方法KerberosAuth.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;

public class KerberosAuth {
    private static final Logger log = LoggerFactory.getLogger(KerberosAuth.class);
    // kerberos配置文件,从服务上获取
    private static final String krbConfig = "krb5.conf";
    // kerberos认证文件
    private static final String krbKeytab = "psvmc.keytab";
    // kerberos认证用户
    private static final String principal = "psvmc/hadoop@HADOOP.COM";

    public static void init() {
        initkerberos();
    }

    public static void initkerberos() {
        log.info("Kerberos 登陆验证");
        try {
            // java临时目录,window为C:\Users\登录用户\AppData\Local\Temp\,linux为/tmp,需要根据情况添加斜杠
            String javaTempDir = System.getProperty("java.io.tmpdir");
            String tempDir = Paths.get(javaTempDir, "krb_" + System.currentTimeMillis()).toString();
            String configPath = getTempPath(tempDir, krbConfig);
            String keytabPath = getTempPath(tempDir, krbKeytab);
            log.error(configPath);
            log.error(keytabPath);
            System.setProperty("java.security.krb5.conf", configPath);//设置krb配置文件路径,注意一定要放在Configuration前面,不然不生效
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");//设置认证模式Kerberos
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);//设置认证用户和krb认证文件路径
            log.error("Kerberos 验证成功");
        } catch (Exception e) {
            log.error("Kerberos 验证失败", e);
        }
    }

    /**
     * 复制文件并根据文件名称获取文件路径(解决jar包不支持获取resource下文件问题)
     *
     * @param tempPath 临时目录
     * @param fileName 文件名称
     * @return 文件临时路径
     */
    @SuppressWarnings("ResultOfMethodCallIgnored")
    public static String getTempPath(String tempPath, String fileName) {
        InputStream in = KerberosAuth.class.getResourceAsStream("/" + fileName);
        String pathAll = tempPath + File.separator + fileName;
        File file = new File(pathAll);
        File tempPathFile = new File(tempPath);
        if (!tempPathFile.exists()) {
            tempPathFile.mkdirs();
        }
        try {
            copyInputStreamToFile(in, pathAll);
        } catch (Exception e) {
            log.error("getTempPath", e);
        }
        return file.getPath();
    }

    private static void copyInputStreamToFile(InputStream is, String strFileFullPath) throws IOException {
        long size = 0;
        BufferedInputStream in = new BufferedInputStream(is);
        BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(Paths.get(strFileFullPath)));
        int len = -1;
        byte[] b = new byte[1024];
        while ((len = in.read(b)) != -1) {
            out.write(b, 0, len);
            size += len;
        }
        in.close();
        out.close();
        //修改文件的访问权限
        changeFolderPermission(strFileFullPath);
    }

    private static void changeFolderPermission(String dirPath) {
        File dirFile = new File(dirPath);
        dirFile.setReadable(true, false);
        dirFile.setExecutable(true, false);
        dirFile.setWritable(true, false);
    }

    public static void main(String[] args) {
        KerberosAuth.init();
    }
}

使用服务器上的配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;

public class KerberosAuthServer {
    private static final Logger log = LoggerFactory.getLogger(KerberosAuthServer.class);

    public static boolean initkerberos(String principal, String keytabPath) {
        log.info("Kerberos 登陆验证");
        try {
            log.error(principal);
            log.error(keytabPath);
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");//设置认证模式Kerberos
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);//设置认证用户和krb认证文件路径
            log.error("Kerberos 验证成功");
            return true;
        } catch (Exception e) {
            log.error("Kerberos 验证失败", e);
            return false;
        }
    }


    private static void connectHive() throws SQLException, ClassNotFoundException {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection connection = DriverManager
                .getConnection("jdbc:hive2://hadoop01:10000/zdb;principal=hdfs/hadoop01@HADOOP.COM");
        PreparedStatement ps = connection.prepareStatement("show databases");
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
        rs.close();
        ps.close();
        connection.close();
    }


    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        boolean isAuth = KerberosAuthServer.initkerberos("hdfs/hadoop01@HADOOP.COM", "/data/tools/bigdata/kerberos/hdfs.keytab");
        if (isAuth) {
            connectHive();
        }
    }
}

JDBC连接

Hive中配置Kerberos认证后,JDBC连接要进行kerberos认证。

认证后JDBC的URL也要添加认证相关的配置

如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
jdbc:hive2://192.168.7.101:10000/zdb;principal=psvmc/hadoop@HADOOP.COM

其中

principal:

  • hive 用户名
  • hostname:主机名,也可以理解为组
  • PSVMC.CN:realms和krb5.conf文件里一致即可

工具类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.gientech.schedule.config.KerberosConnect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.sql.*;
import java.util.*;
 
public class HiveUtils {
    private static Logger logger = LoggerFactory.getLogger(HiveUtils.class.getName());
 
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://192.168.7.101:10000/zdb;principal=psvmc/hadoop@HADOOP.COM";//端口默认10000
 
    /**
     * 获取Connection
     * @return conn
     * @throws SQLException
     * @throws ClassNotFoundException
     */
 
    public static Connection getConnection() throws SQLException {
        Connection conn = null;
        try {
            KerberosAuth.init();
            conn = DriverManager.getConnection(url);
        } catch (SQLException e) {
            logger.info("获取数据库连接失败!");
            throw e;
        }
        return conn;
    }
 
    // 创建数据库
    public static void createDatabase(String databaseName) throws Exception {
        String sql = "create database "+databaseName;
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 查询所有数据库
    public static void showDatabases() throws Exception {
        String sql = "show databases";
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1));
        }
        closeConnection(rs,stmt,conn);
    }
 
    /**
     * 创建表(分割符为“,”)
     * 如create table tableName(name string,sex string) row format delimited fields terminated by ','
     * @param sql
     * @throws Exception
     */
    public static void createTable(String sql) throws Exception {
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 查询所有表
    public static void showTables() throws Exception {
        String sql = "show tables";
        logger.info("Running: " + sql);
        getConnection();
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1));
        }
        closeConnection(rs,stmt,conn);
    }
 
    // 查看表结构
    public static void descTable(String tableName) throws Exception {
        String sql = "desc formatted "+tableName;
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1) + "\t" + rs.getString(2));
        }
        closeConnection(rs,stmt,conn);
    }
 
    // 加载数据(请确保文件权限)
    public static void loadData(String filePath,String tableName) throws Exception {
        String sql = "load data inpath '" + filePath + "' into table tableName";
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 查询数据
    public static void selectData(String sql) throws Exception {
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1));
        }
        closeConnection(rs,stmt,conn);
    }
 
    // 删除数据库
    public static void dropDatabase(String databaseName) throws Exception {
        String sql = "drop database if exists "+databaseName;
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 删除数据库表
    public static void deopTable(String tableName) throws Exception {
        String sql = "drop table if exists "+tableName;
        logger.info("Running: " + sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
 
    public static Map<String,Object> queryMapBySql(String sql){
        //定义数据库连接
        Connection conn = null;
        //定义PreparedStatement对象
        PreparedStatement ps = null;
        //定义查询的结果集
        ResultSet rs = null;
        try {
            conn = getConnection();
            //定义执行的sql语句
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            return getMapFromResultSet(rs);
        } catch (Exception e) {
            logger.info("queryDataListBySql"+e.getMessage());
        }finally {
            closeConnection(rs,ps,conn);
        }
        return Collections.emptyMap();
    }
 
    /**
     * 关闭ResultSet、Statement、Connection
     *
     * @param rs
     * @param stmt
     * @param con
     */
 
    public static void closeConnection(ResultSet rs, Statement stmt, Connection con) {
        closeResultSet(rs);
        closeStatement(stmt);
        closeConnection(con);
    }
 
    /**
     * 关闭ResultSet
     *
     * @param rs
     */
 
    public static void closeResultSet(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                logger.info(e.getMessage());
            }
        }
    }
 
    /**
     * 关闭Statement
     *
     * @param stmt
     */
 
    public static void closeStatement(Statement stmt) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
    }
 
    /**
     * 关闭Connection
     *
     * @param con
     */
 
    public static void closeConnection(Connection con) {
        if (con != null) {
            try {
                con.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
    }
 
    /**
     * 将resultset结果转为sonObject
     * @param rs ResultSet
     * @return List
     * @throws SQLException 异常
     */
    public static Map<String,Object> getMapFromResultSet(ResultSet rs)
            throws SQLException {
        Map<String,Object> hm = new HashMap();
        ResultSetMetaData rsmd = rs.getMetaData();
        int count = rsmd.getColumnCount();// 获取列的数量
        while(rs.next()) {
            for (int i = 1; i <= count; i++) {
                String key = rsmd.getColumnLabel(i);
                Object value = rs.getObject(i);
                hm.put(key, value);
            }
        }
        return hm;
    }
 
    public static List<Map<String,Object>> queryListBySql(String sql){
        //定义数据库连接
        Connection conn = null;
        //定义PreparedStatement对象
        PreparedStatement ps = null;
        //定义查询的结果集
        ResultSet rs = null;
        try {
            conn = getConnection();
            //定义执行的sql语句
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            return getListFromResultSet(rs);
        } catch (Exception e) {
            logger.info("queryDataListBySql"+e.getMessage());
        }finally {
            closeConnection(rs,ps,conn);
        }
        return Collections.emptyList();
    }
 
    /**
     * 将resultset结果转为list
     * @param rs ResultSet
     * @return List
     * @throws SQLException 异常
     */
    private static List<Map<String,Object>> getListFromResultSet(ResultSet rs)
            throws SQLException {
        List<Map<String,Object>> results= new ArrayList<>();//结果数据
        ResultSetMetaData metaData = rs.getMetaData(); // 获得列的结果
        List<String> colNameList= new ArrayList<>();
        int cols_len = metaData.getColumnCount(); // 获取总的列数
        for (int i = 0; i < cols_len; i++) {
            colNameList.add(metaData.getColumnName(i+1));
        }
        while (rs.next()) {
            Map<String, Object> map= new HashMap<>();
            for(int i=0;i<cols_len;i++){
                String key=colNameList.get(i);
                Object value=rs.getString(colNameList.get(i));
                map.put(key, value);
            }
            results.add(map);
        }
        return results;
    }
 
    public static void main(String[] args) throws Exception {
        String sql = "SELECT * FROM `t1` LIMIT 1";
        List<Map<String, Object>> maps = queryListBySql(sql);
        logger.info(maps.toString());
    }
}

服务器上测试

认证

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kinit -kt /data/tools/bigdata/kerberos/hdfs.keytab hdfs/hadoop01@HADOOP.COM

查看认证状态

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
klist

hive认证连接

在服务器上测试

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
hive

使用JDBC

之前

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
beeline -n hive -u jdbc:hive2://hadoop01:10000/default

注意一定要添加双引号,否则无效

配置中设置的

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
beeline -n hive -u "jdbc:hive2://hadoop01:10000/zdb;principal=hdfs/hadoop01@HADOOP.COM"

目前测试的只能使用配置文件中设置的用户才能连接。

查询

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
show databases;

工具类

异常类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ZRuntimeException extends RuntimeException {
    public ZRuntimeException(String format, Object... objs) {
        super(String.format(format, objs));
    }
}

读取配置类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ZLoadConfig {
    public static String getHadoopConfigRootPath() {
        String conf = System.getenv("HADOOP_CONF_DIR");
        if (conf == null) {
            String hh = System.getenv("HADOOP_HOME");
            if (hh == null) {
                throw new ZRuntimeException("找不到配置文件");
            }
            conf = hh + "/etc/hadoop";
        }
        return conf;
    }

    public static String getZKConfigRootPath() {
        String conf = System.getenv("ZK_HOME");
        if (conf != null) {
            conf += "/conf";
        }
        return conf;
    }

    public static String getHbaseConfigRootPath() {
        String conf = System.getenv("HBASE_HOME");
        if (conf != null) {
            conf += "/conf";
        }
        return conf;
    }

    public static Configuration loadHDFS() throws ZRuntimeException {
        String conf = getHadoopConfigRootPath();
        Configuration config = new Configuration();
        config.addResource(new Path(conf + "/core-site.xml"));
        config.addResource(new Path(conf + "/hdfs-site.xml"));
        return config;
    }

    public static YarnConfiguration loadYarn() throws ZRuntimeException {
        String conf = getHadoopConfigRootPath();
        YarnConfiguration config = new YarnConfiguration();
        config.addResource(new Path(conf + "/core-site.xml"));
        config.addResource(new Path(conf + "/hdfs-site.xml"));
        config.addResource(new Path(conf + "/yarn-site.xml"));
        return config;
    }

    public static Configuration loadHbase() {
        String hadoopConfPath = getHadoopConfigRootPath();
        String hbaseConfPath = getHbaseConfigRootPath();
        Configuration config = HBaseConfiguration.create();
        config.addResource(new Path(hadoopConfPath + "/core-site.xml"));
        config.addResource(new Path(hadoopConfPath + "/hdfs-site.xml"));
        config.addResource(new Path(hadoopConfPath + "/yarn-site.xml"));
        config.addResource(new Path(hbaseConfPath += "/hbase-site.xml"));
        return config;
    }
}

测试类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

import javax.security.auth.login.LoginContext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;


public class ZKerberosUtil {

    public static void loginZK() {
        try {
            // 设置身份认证服务配置
            System.setProperty("java.security.auth.login.config", ZLoadConfig.getZKConfigRootPath() + "/jaas.conf");
            // 使用身份认证服务配置中的模块登录
            LoginContext context = new LoginContext("Client");
            context.login();
            // 创建zk客户端并创建watcher
            ZooKeeper zk = new ZooKeeper("hadoop01:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println(event.getPath() + " : " + event.getState().toString());
                }

            });
            while (zk.getState() != ZooKeeper.States.CONNECTED) {
                Thread.sleep(500);
            }
            System.out.println("连接到zk");
            List<String> ss = zk.getChildren("/", true);
            ss.forEach((s) -> {
                System.out.println(s);
            });
            zk.close();
            context.logout();
        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();
        }
    }

    public static boolean initkerberos(String principal, String keytabPath) {
        try {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");//设置认证模式Kerberos
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);//设置认证用户和krb认证文件路径
            System.out.println("Kerberos 验证成功");
            return true;
        } catch (Exception e) {
            System.out.println("Kerberos 验证失败" + e.getMessage());
            return false;
        }
    }

    public static void loginHdfs(String principal, String keytabPath) throws IOException {
        Configuration conf = ZLoadConfig.loadHDFS();
        System.out.println(conf.get("fs.defaultFS"));
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] fsStatus = fs.listStatus(new Path("/"));
        for (FileStatus st : fsStatus) {
            System.out.println(st.getPath());
        }
    }


    public static void loginYarn(String principal, String keytabPath) throws IOException, YarnException {
        YarnConfiguration conf = ZLoadConfig.loadYarn();
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        YarnClient yc = YarnClient.createYarnClient();
        yc.init(conf);
        yc.start();

        List<ApplicationReport> applications = yc.getApplications();
        applications.forEach((a) -> {
            System.out.println(a.getApplicationId());
        });
        yc.close();
    }

    public static void loginHive(String principal, String keytabPath) throws SQLException, IOException, ClassNotFoundException {
        Configuration conf = ZLoadConfig.loadHDFS();
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(
                principal,
                keytabPath
        );
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection connection = DriverManager
                .getConnection("jdbc:hive2://hadoop01:10000/yxdp_ys;principal=hdfs/hadoop01@HADOOP.COM");
        PreparedStatement ps = connection.prepareStatement("show databases");
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
        rs.close();
        ps.close();
        connection.close();
    }

    public static void loginHbase(String principal, String keytabPath) throws IOException, SQLException {
        Configuration conf = ZLoadConfig.loadHbase();
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(
                principal,
                keytabPath
        );
        org.apache.hadoop.hbase.client.Connection con = ConnectionFactory.createConnection(conf);
        NamespaceDescriptor[] nds = con.getAdmin().listNamespaceDescriptors();
        for (NamespaceDescriptor nd : nds) {
            System.out.println(nd.getName());
        }
    }

    public static void loginPhoenix(String principal, String keytabPath) throws IOException, ClassNotFoundException, SQLException {
        Configuration conf = ZLoadConfig.loadHbase();
        Properties prop = new Properties();
        conf.getValByRegex(".*?").forEach(prop::setProperty);
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

        // kerberos环境下Phoenix的jdbc字符串为 jdbc:phoenix:zk:2181:/znode:principal:keytab
        String url = "jdbc:phoenix:hadoop01,hadoop02,hadoop03:/hbase:" + principal + ":" + keytabPath;
        PhoenixConnection con = DriverManager.getConnection(url, prop).unwrap(PhoenixConnection.class);
        Statement stmt = con.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT * FROM SYSTEM.CATALOG");
        int n = rs.getMetaData().getColumnCount();
        for (int i = 0; i < n; i++) {
            String cn = rs.getMetaData().getColumnName(n);
            System.out.println(cn);
        }
        rs.close();
        stmt.close();
        con.close();
    }
}

总结

首先在配置的时候注意一下几个环境变量的设置

  • ZK_HOME
  • HADOOP_HOME
  • HBASE_HOME

ZK直接使用的配置文件认证。

jaas.conf

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/tools/bigdata/kerberos/hdfs.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/hadoop01@HADOOP.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/tools/bigdata/kerberos/hdfs.keytab"
  storeKey=true
  useTicketCache=false
  principal="cli@HADOOP.COM";
};

HDFS、YARN、Hive、Hbase都使用Hadoop的认证。

Phoenix使用胖客户端形式,自身认证。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
添加kerberos后,Flink任务的运行认证及Hive使用JDBC连接的认证
https://www.psvmc.cn/article/2022-11-08-bigdata-kerberos-centos.html
码客说
2023/08/08
1.2K0
如何使用Java连接Kerberos的Phoenix
本文主要介绍如何使用Java代码访问Kerberos环境下的Phoenix5.0。
soundhearer
2020/10/15
2.3K0
如何使用Java连接Kerberos的Phoenix
Java代码连接带kerberos的Impala集群
目前impala的认证方式支持两种:用户名密码和kerberos,由于impala的表数据一般是存在HDFS上的,所以很多时候,impala集群也会开启kerberos的认证,初次新接入Impala的小伙伴,可能会对kerberos比较头疼,这里将通过一个简单的例子来告诉大家,如何在代码中访问带kerberos的impala集群。废话不多说,直接上代码:
skyyws
2022/05/20
1.1K0
如何使用java连接Kerberos和非kerberos和kerberos的Spark1.6 ThriftServer
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github: https://github.com/fayson/cdhproject 提示:代码块部分可以左右滑动查看噢 1.文档编写目的 ---- 前面Fayson介绍了《如何在CDH中启用Spark Thrift》和《如何在Kerberos环境下的CDH集群部署Spark1.6 Thrift及spark-sql客户端》,本篇文章Fayson主要介绍如何使用Java JDBC连接非Kerberos和Kerberos环境下Sp
Fayson
2018/07/11
2K0
基于Kerberos环境下,使用Java连接操作Hive
虽然可以使用 Hive 服务本身的 Principal 与 keytab 来连接 Hive ,但使用服务本身的 principal 不具有普遍性,所以还是建议使用自定义的 Principal 。
create17
2019/08/20
10.1K1
大数据之Phoenix使用代码或客户端连接
查看表的 TABLE_SCHEM 发现有些表这个属性为空。 那么如果你没有指定自动映射命名空间,就会报错。
码客说
2023/08/10
1.4K0
如何使用HAProxy实现Kerberos环境下的Impala负载均衡
前面Fayson介绍过《如何使用HAProxy实现Impala的负载均衡》,在Kerberos环境HAProxy的配置与非Kerberos环境下是一样的,只是在Impala的配置上需要做一些修改,接下来本篇文件主要讲述如何在Kerberos环境下使用HAProxy实现Impala的负载均衡。
Fayson
2018/03/29
1.7K0
如何使用HAProxy实现Kerberos环境下的Impala负载均衡
如何在Kerberos环境下使用Haproxy实现HiveServer2负载均衡
前面Fayson介绍了《如何使用HAProxy实现HiveServer2负载均衡》,本文主要介绍如何使用HAProxy实现Kerberos环境下HiveServer2的负载均衡。
Fayson
2018/03/29
1.7K0
如何使用java代码通过JDBC连接Hive(附github源码)
前面我们讲过《如何使用java代码通过JDBC连接Impala(附Github源码)》,本篇文章主要讲述如何使用Java代码通过JDBC的方式连接Hive。
Fayson
2018/03/29
7.4K0
如何使用java代码通过JDBC连接Hive(附github源码)
如何使用java代码通过JDBC连接Impala(附Github源码)
访问Impala的方式很多(如:impala-shell、ODBC、JDBC、Beeline),也可以通过Hue的来访问。关于Beeline方式连接Impala可以参考前面的《如何使用Beeline连接Impala》,本篇文章主要讲述如何使用JAVA代码通过JDBC的方式连接Kerberos和非Kerberos环境下的Impala。
Fayson
2018/03/29
7.2K0
如何使用java代码通过JDBC连接Impala(附Github源码)
如何使用Java连接Kerberos的HBase
出于CDH集群安全考虑,在CDH集群中增加了Kerberos认证机制。因为HBase的存储系统是基于Hadoop的存储,所以通过HBase客户端访问HBase数据库时需要进行身份认证。在Linux下使用HBase客户端访问HBase数据时需要先kinit初始化Kerberos账号,认证完成后我们就可以直接使用HBase shell操作HBase了。通过Linux的Kinit命令可以方便的完成Kerberos的认证,那么在Java开发中如何完成Kerberos的登录认证呢?本篇文章主要讲述如何使用Java连接Kerberos环境的HBase。
Fayson
2018/03/29
6K0
如何使用Java连接Kerberos的HBase
hive metastore配置kerberos认证
hive从3.0.0开始提供hive metastore单独服务作为像presto、flink、spark等组件的元数据中心。但是默认情况下hive metastore在启动之后是不需要进行认证就可以访问的。所以本文基于大数据组件中流行的kerberos认证方式,对hive metastore进行认证配置。
从大数据到人工智能
2022/01/18
1.2K0
hive metastore配置kerberos认证
java 版本SQLHelper
import java.sql.*; import java.util.logging.*; import javax.swing.table.*;
jack.yang
2025/04/05
690
Spark2Streaming读Kerberos环境的Kafka并写数据到HBase
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github: https://github.com/fayson/cdhproject 提示:代码块部分可以左右滑动查看噢 1.文档编写目的 ---- 在前面的文章Fayson介绍了在Kerberos环境下《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayson主要介绍如何使用Spark2Streaming访问Kerberos环境的Kafka并将接收到的Kafka数据写入HBa
Fayson
2018/07/12
2.3K0
java连接kerberos用户认证
亲测可用,之前搜索了很多博客,啥样的都有,就是不介绍报错以及配置用处,根本不懂照抄那些配置是干啥的,稀里糊涂的按照博客搭完也跑不起来,因此记录这个。
刘大猫
2025/01/16
1830
时间夹缝中写文章之java-JDBC
讲真的 学了三天的积分 还不如晚上回宿舍写会代码更舒心 高数积分真的太难了 可是越难越要学,张宇老师说过 好多事情本身就是矛盾的 今天抽出来一点时间写一点jdbc吧 不然公众号就会和今天的天气一样慢慢就全是灰了 java链接mysql实现一些操作 直接上代码吧 首先Command接口类的源码 package com.imooc.hrapp.command; import java.sql.SQLException; public interface Command { public
Tom2Code
2022/04/15
2170
时间夹缝中写文章之java-JDBC
Flink开发-Mysql数据导入Hive中
Mysql中ResultSet默认会将一次查询的结果存入内存中。如果数据量比较大,就会占用大量的内存。如果内存不够,就会报错。
码客说
2023/03/06
2K0
HttpURLConnection获取开启kerberos的HDFS等组件的JMX信息
由于安全原因,需要对HDFS UI等端口进行限制访问,也就是配置kerberos认证,在core-site.xml文件中进行如下配置:
从大数据到人工智能
2022/04/17
5990
HttpURLConnection获取开启kerberos的HDFS等组件的JMX信息
0554-6.1.0-同一java进程中同时访问认证和非认证集群的问题(续)
Fayson在前面的文章《0553-6.1.0-如何使用Java代码同时访问安全和非安全CDH集群》,本篇文章介绍在同一Java进程中,通过多线程同时访问Kerberos认证集群和非认证集群时出现的一些异常及解决方法。
Fayson
2019/11/28
2.2K0
使用数据连接池进行数据库操作
void disconnect(Connection conn)用于断开数据库连接
顾翔
2025/06/06
1220
使用数据连接池进行数据库操作
推荐阅读
相关推荐
添加kerberos后,Flink任务的运行认证及Hive使用JDBC连接的认证
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档