Use case: table directories are created on HDFS by different users and their files need to be shipped out. The program runs a hadoop cat command to pull each file down to the local machine, transfers it via FTP to the target server, and records the modification time of the HDFS directory in MySQL. On every run, the time recorded in MySQL is compared with the modification time of each HDFS path in the current batch, and a file is downloaded only if that time has changed.
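The entry point below drives this flow by invoking a local download.sh script with three arguments: the HDFS table path, the output file name, and the user name. The script itself is not part of the original listing; a minimal sketch of what it might look like, assuming a hadoop fs -cat download step followed by an FTP upload (host, credentials and the local staging directory are placeholders), is:

#!/bin/bash
# Hypothetical download.sh, reconstructed from how FileDownload invokes it.
# $1 = HDFS path of the table directory, $2 = local file name, $3 = user name
hdfs_path="$1"
table_file="$2"
user_name="$3"

# local staging directory (placeholder)
local_dir="/home/hdfs/${user_name}"

# concatenate the HDFS files of the table into one local text file
hadoop fs -cat "${hdfs_path}"/* > "${local_dir}/${table_file}"

# push the file to the target server (host and credentials are placeholders)
ftp -n target-server <<EOF
user ftpuser ftppass
put ${local_dir}/${table_file}
bye
EOF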
Entry point:
package edm.spark.download.edm.spark.download;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.fs.Path;

import edm.spark.download.edm.spark.util.HdfsFileProcessor;
import edm.spark.download.edm.spark.util.JdbcDirectUtils;

public class FileDownload {

    public static void main(String[] args) throws Exception {
        String local_path = args[0]; // e.g. "/home/hdfs/ysy/"
        String hdfs_path = args[1];  // e.g. "hdfs://hdp/user/"

        HdfsFileProcessor fileProcessor = new HdfsFileProcessor();
        List<String> userLists = fileProcessor.getUserUnderFolder(hdfs_path);
        List<Path> listPath = fileProcessor.getFileUnderFolder(userLists);
        if (null != listPath && listPath.size() > 0) {
            for (Path path : listPath) {
                String pathName = path.toString();
                String[] nameList = pathName.split("/");
                String time = JdbcDirectUtils.DateTimeFormat(new Date());
                String tableName = nameList[nameList.length - 1] + "_" + time + ".txt";
                String userName = nameList[nameList.length - 3];
                Process ps = null;
                try {
                    // submit the local download script for this table directory
                    ps = Runtime.getRuntime().exec(
                            local_path + "download.sh " + pathName + " "
                                    + tableName + " " + userName);
                    System.out.println(local_path + "download.sh " + pathName
                            + " " + tableName);

                    // update the modification time recorded in MySQL
                    JdbcDirectUtils jdbcForTime = new JdbcDirectUtils();
                    long dateTime = jdbcForTime
                            .queryDate("select modify_time,path from download_time where path="
                                    + "'" + path.toString() + "'");
                    long insertTime = fileProcessor.getModifycationTime(path);
                    if (dateTime != 0) {
                        jdbcForTime.updateDateTime(insertTime, pathName);
                    } else {
                        // first download of this directory: insert its current modification time
                        jdbcForTime.insertDate(insertTime, path.toString());
                    }
                    jdbcForTime.destroy();

                    // echo the script's output
                    BufferedReader br = new BufferedReader(new InputStreamReader(
                            ps.getInputStream()));
                    String line;
                    StringBuilder sb = new StringBuilder();
                    while ((line = br.readLine()) != null) {
                        sb.append(line).append("\n");
                    }
                    System.out.println(sb.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (ps != null) {
                        ps.destroy();
                    }
                }
            }
        } else {
            System.out.println("no file to download");
        }
    }
}
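FileDownload expects two positional arguments: the local directory that contains download.sh and the HDFS root under which the per-user folders live (the commented values in main() show the intended form). Assuming the classes are packaged into a jar named download.jar, a launch might look roughly like this (jar name and paths are illustrative):

java -cp download.jar:$(hadoop classpath) \
    edm.spark.download.edm.spark.download.FileDownload \
    /home/hdfs/ysy/ hdfs://hdp/user/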
HdfsFileProcessor:
package edm.spark.download.edm.spark.util;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class HdfsFileProcessor {

    static final Logger logger = LoggerFactory.getLogger(HdfsFileProcessor.class);

    protected FileSystem fileSystem;

    private Configuration conf;

    public HdfsFileProcessor() {
        init();
    }

    public void init() {
        conf = new Configuration();
        conf.addResource("resources/hdfs-site.xml");
        conf.addResource("resources/core-site.xml");
        try {
            fileSystem = FileSystem.get(conf);
        } catch (IOException e) {
            logger.error("init error.......", e);
        }
    }

    public final boolean checkFile(String filePath) {
        boolean exists = false;
        try {
            Path path = new Path(filePath);
            exists = fileSystem.exists(path);
        } catch (Exception e) {
            logger.error("", e);
        }
        return exists;
    }

    /**
     * Collect the table directories under each user folder whose modification time
     * is newer than the time recorded in MySQL.
     */
    public List<Path> getFileUnderFolder(List<String> names) throws IOException, SQLException {
        JdbcDirectUtils jdbcForTime = new JdbcDirectUtils();
        List<Path> paths = Lists.newArrayList();
        for (String name : names) {
            Path folderPath = new Path("hdfs://hdp/user/" + name + "/");
            if (fileSystem.exists(folderPath)) {
                try {
                    FileStatus[] fileStatus = fileSystem.listStatus(folderPath);
                    for (int i = 0; i < fileStatus.length; i++) {
                        Path path = fileStatus[i].getPath();
                        if (path.toString().contains("tosas")) {
                            FileStatus[] tableStatus = fileSystem.listStatus(path);
                            for (int j = 0; j < tableStatus.length; j++) {
                                // note: index j (not i) when walking the table directories
                                Path tablePath = tableStatus[j].getPath();
                                long modifycationTime = fileSystem.getFileStatus(tablePath)
                                        .getModificationTime();
                                long dataTime = jdbcForTime
                                        .queryDate("select modify_time,path from download_time where path="
                                                + "'" + tablePath.toString() + "'");
                                if (modifycationTime > dataTime) {
                                    paths.add(tablePath);
                                }
                            }
                        }
                    }
                } catch (RemoteException e) {
                    logger.error("", e);
                } catch (AccessControlException e) {
                    logger.error("", e);
                }
            }
        }
        jdbcForTime.destroy();
        return paths;
    }

    /**
     * Return the modification time of the given HDFS path.
     */
    public long getModifycationTime(Path path) throws IOException {
        return fileSystem.getFileStatus(path).getModificationTime();
    }

    /**
     * Find which users own directories under the given root path.
     */
    public List<String> getUserUnderFolder(String rootPath) throws Exception {
        List<String> userList = Lists.newArrayList();
        Path userPath = new Path(rootPath);
        if (fileSystem.exists(userPath)) {
            FileStatus[] fileStatus = fileSystem.listStatus(userPath);
            for (int i = 0; i < fileStatus.length; i++) {
                String path = fileStatus[i].getPath().toString();
                String[] pathes = path.split("/");
                if (pathes.length > 4) {
                    // e.g. hdfs://hdp/user/<name> -> pathes[4] is the user name
                    userList.add(pathes[4]);
                }
            }
        }
        return userList;
    }

    public void destory() throws IOException {
        if (fileSystem != null) {
            fileSystem.close();
        }
        fileSystem = null;
    }
}
JdbcDirectUtils:
package edm.spark.download.edm.spark.util;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import com.google.common.collect.Maps;

public class JdbcDirectUtils {

    private static Connection conn;

    private Statement stmt;

    private String file_dir = "/template/download_mysql.txt";

    private Map<String, String> jdbcConfMap = Maps.newHashMap();

    private LoadHdfsConf mysqlConf;

    public JdbcDirectUtils() {
        initDriver();
    }

    public void initDriver() {
        try {
            if (conn == null) {
                // read the MySQL connection settings from HDFS
                mysqlConf = new LoadHdfsConf();
                jdbcConfMap = mysqlConf.readHdfsFile(file_dir);
                Class.forName("com.mysql.jdbc.Driver");
                String url = "jdbc:mysql://" + jdbcConfMap.get("url") + ":"
                        + jdbcConfMap.get("port") + "/"
                        + jdbcConfMap.get("schema") + "?user="
                        + jdbcConfMap.get("user") + "&password="
                        + jdbcConfMap.get("password")
                        + "&useUnicode=true&characterEncoding="
                        + jdbcConfMap.get("characterEncoding");
                conn = DriverManager.getConnection(url);
            }
            // the statement must exist before queryDate/updateDateTime/insertDate are called
            stmt = conn.createStatement();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Update the recorded modification time for the given path.
     */
    public void updateDateTime(long date, String path) throws SQLException {
        stmt.executeUpdate("update download_time set modify_time=" + date
                + " where path=" + "'" + path + "'");
    }

    /**
     * Query the modification time last recorded for a path; returns 0 if no row exists.
     */
    public long queryDate(String sql) throws SQLException {
        ResultSet rs = stmt.executeQuery(sql);
        long dateTime = 0;
        while (rs.next()) {
            dateTime = rs.getLong("modify_time");
        }
        rs.close();
        return dateTime;
    }

    public void insertDate(Long date, String path) throws SQLException {
        stmt.executeUpdate("insert into download_time(path,modify_time) values "
                + "('" + path + "'" + "," + date + ")");
    }

    /**
     * Convert a yyyyMMdd string to a millisecond timestamp.
     */
    public long convert2Long(String date) {
        long time = 0;
        String format = "yyyyMMdd";
        SimpleDateFormat sf = new SimpleDateFormat(format);
        try {
            time = sf.parse(date).getTime();
        } catch (java.text.ParseException e) {
            e.printStackTrace();
        }
        return time;
    }

    public static String DateTimeFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        return sdf.format(date);
    }

    public void destroy() throws SQLException {
        if (stmt != null) {
            stmt.close();
        }
        stmt = null;
        if (conn != null) {
            conn.close();
        }
        conn = null;
    }
}
LoadHdfsConf:
package edm.spark.download.edm.spark.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

public class LoadHdfsConf {

    static final Logger logger = LoggerFactory.getLogger(LoadHdfsConf.class);

    protected FileSystem fileSystem;

    public final boolean checkFile(String filePath) {
        boolean exists = false;
        try {
            Path path = new Path(filePath);
            exists = fileSystem.exists(path);
        } catch (Exception e) {
            logger.error("", e);
        }
        return exists;
    }

    /**
     * Read a key=value configuration file from HDFS into a map.
     */
    public Map<String, String> readHdfsFile(String hdfsPath) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource("resources/hdfs-site.xml");
        conf.addResource("resources/core-site.xml");
        fileSystem = FileSystem.get(conf);
        Path path = new Path(hdfsPath);
        InputStream in = fileSystem.open(path);
        List<String> lines;
        try {
            lines = IOUtils.readLines(in);
        } finally {
            IOUtils.closeQuietly(in);
        }
        if (null == lines || lines.isEmpty()) {
            return null;
        }
        Map<String, String> map = Maps.newHashMap();
        int rowNum = 0;
        for (String line : lines) {
            rowNum++;
            // skip blank lines and lines that are not key=value pairs
            if (StringUtils.isEmpty(line) || !line.contains("=")) {
                logger.error("invalid config line {}: {}", rowNum, line);
                continue;
            }
            String[] content = line.split("=", 2);
            String code = content[0];
            String value = content[1];
            if (StringUtils.isEmpty(value)) {
                logger.error("invalid config line {}: {}", rowNum, line);
                continue;
            }
            map.put(code, value);
        }
        return map;
    }
}
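JdbcDirectUtils reads its connection settings from /template/download_mysql.txt on HDFS through readHdfsFile(), one key=value pair per line. The file itself is not shown in the original; based on the keys looked up in initDriver(), it would contain something like the following (all values are placeholders):

url=192.168.0.100
port=3306
schema=edm
user=download_user
password=download_pass
characterEncoding=utf8

The SQL statements also assume a MySQL table download_time with columns path and modify_time, as used by insertDate() and updateDateTime().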