I. Hadoop setup
1. Change the hostname
2. Configure the IP address, etc.
3. Add host mappings
4. Disable the firewall (two ways)
5. Set up passwordless SSH login (passwordless-login script)
6. Install the JDK and configure the environment variables
7. Install Hadoop and edit its configuration files (a minimal sketch of the two core files follows this list):
hadoop-env.sh: JAVA_HOME
core-site.xml: client-side parameters; where the NameNode runs
hdfs-site.xml: where the NameNode and DataNodes store their data
mapred-site.xml: obtained by renaming the bundled template; where jobs are submitted
yarn-site.xml: where the ResourceManager runs and how much of each resource it manages
Hadoop environment variables
8. Distribute the installation across the cluster: hosts file, JDK installation, /etc/profile, Hadoop installation
9. Format the NameNode: hadoop namenode -format
10. Start and test: start-dfs.sh; hadoop-daemon.sh starts a single daemon on one machine; start-yarn.sh is run on the ResourceManager node
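For reference, here is a minimal sketch of what core-site.xml and hdfs-site.xml from step 7 might contain. The hostname hadoop01 matches the Java examples later in this post; the storage directories are assumptions and should be replaced with your own paths.

<!-- core-site.xml: tells clients where the NameNode listens -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop01:9000</value>
  </property>
</configuration>

<!-- hdfs-site.xml: NameNode/DataNode storage directories and the default replication factor (paths are illustrative) -->
<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/hadoop/data/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/opt/hadoop/data/data</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>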
II. Operating HDFS from the shell
Start Hadoop: start-dfs.sh
Common commands
The shell client is invoked as hadoop fs or hdfs dfs.
-help
Purpose: print the manual for these commands
-ls
Purpose: list directory contents
Example: hadoop fs -ls hdfs://hadoop-server01:9000/
Note: in all of these commands the HDFS path can be abbreviated; hadoop fs -ls / is equivalent to the command above
-mkdir
Purpose: create a directory on HDFS
Example: hadoop fs -mkdir -p /aaa/bbb/cc/dd
-moveFromLocal
Purpose: move (cut and paste) a local file to HDFS
Example: hadoop fs -moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc/dd
-moveToLocal
Purpose: move (cut and paste) a file from HDFS to the local filesystem
Example: hadoop fs -moveToLocal /aaa/bbb/cc/dd /home/hadoop/a.txt
-appendToFile
Purpose: append a local file to the end of an existing HDFS file
Example: hadoop fs -appendToFile ./hello.txt hdfs://hadoop-server01:9000/hello.txt
which can be shortened to: hadoop fs -appendToFile ./hello.txt /hello.txt
-cat
Purpose: print the contents of a file
Example: hadoop fs -cat /hello.txt
-tail
Purpose: show the end of a file
Example: hadoop fs -tail /weblog/access_log.1
-text
Purpose: print the contents of a file as text
Example: hadoop fs -text /weblog/access_log.1
-chgrp / -chmod / -chown
Purpose: same usage as on a Linux filesystem; change a file's group, permissions, or owner
Examples:
hadoop fs -chmod 666 /hello.txt
hadoop fs -chown someuser:somegrp /hello.txt
-copyFromLocal
Purpose: copy a file from the local filesystem to an HDFS path
Example: hadoop fs -copyFromLocal ./jdk.tar.gz /aaa/
-copyToLocal
Purpose: copy a file from HDFS to the local filesystem
Example: hadoop fs -copyToLocal /aaa/jdk.tar.gz
-cp
Purpose: copy from one HDFS path to another HDFS path
Example: hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
-mv
Purpose: move a file within HDFS
Example: hadoop fs -mv /aaa/jdk.tar.gz /
-get
Purpose: same as copyToLocal; download a file from HDFS to the local filesystem
Example: hadoop fs -get /aaa/jdk.tar.gz
-getmerge
Purpose: merge and download multiple files
Example: if the HDFS directory /aaa/ contains several files log.1, log.2, log.3, ...
hadoop fs -getmerge /aaa/log.* ./log.sum
-put
Purpose: same as copyFromLocal
Example: hadoop fs -put /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2
-rm
Purpose: delete a file or directory
Example: hadoop fs -rm -r /aaa/bbb/
-rmdir
Purpose: delete an empty directory
Example: hadoop fs -rmdir /aaa/bbb/ccc
-df
Purpose: report the filesystem's free space
Example: hadoop fs -df -h /
-du
Purpose: report the size of a directory
Example: hadoop fs -du -s -h /aaa/*
-count
Purpose: count the file nodes under a given directory
Example: hadoop fs -count /aaa/
-setrep
Purpose: set the replication factor of a file in HDFS
Example: hadoop fs -setrep 3 /aaa/jdk.tar.gz
III. Operating HDFS from Java (Maven)
1.pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.pengpeng</groupId>
<artifactId>bigdata36-hadoop</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<!-- Maven compiler plugin: JDK version used for compilation -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
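The pom pulls in log4j 1.2, so without a log4j.properties on the classpath (typically under src/main/resources) the Hadoop client only prints a "no appenders" warning and you see no logs. A minimal sketch follows; the console appender and pattern are just one reasonable choice:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c] %m%n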
2. Basic create, read, update, and delete operations
package hdfs.test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class HdfsDemo {
FileSystem fs =null;
@Before
public void init() throws Exception{
// connect to HDFS
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),conf,"root");
}
/**
* Upload a local file to HDFS
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testUpLoad() throws Exception{
fs.copyFromLocalFile(new Path("D:\\data\\http.log"), new Path("/"));
}
/**
* Download a file from HDFS
* @throws Exception
* @throws IllegalArgumentException
*/
@Test
public void testDownLoad() throws Exception{
fs.copyToLocalFile(new Path("/http.log"), new Path("d:/http"));
}
/**
* Delete a file on HDFS
* @throws Exception
* @throws IllegalArgumentException
*/
@Test
public void testDel() throws Exception{
fs.delete(new Path("/http.log"),true);
}
/**
* Create a directory
* @throws IOException
* @throws IllegalArgumentException
* @throws Exception
*/
@Test
public void testMkdir() throws IllegalArgumentException, IOException{
fs.mkdirs(new Path("/files"));
}
/**
* Rename and move files
* @throws IOException
* @throws IllegalArgumentException
* @throws Exception
*/
@Test
public void rename() throws IllegalArgumentException, IOException{
// If the destination directory does not exist, the rename does not succeed, but no exception is thrown either.
//fs.rename(new Path("/test.sh"), new Path("/test2.sh"));
fs.rename(new Path("/test2.sh"), new Path("/files/test.sh"));
}
/**
* Inspect file status
* @throws IOException
* @throws IllegalArgumentException
* @throws Exception
*/
@Test
public void testStatus() throws IllegalArgumentException, IOException{
/*FileStatus status = fs.getFileStatus(new Path("/files"));
System.out.println(status);*/
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus file = listFiles.next();
System.out.println(file.getLen());
System.out.println(file.getBlockSize());
System.out.println(file.getPath());
System.out.println(file.getAccessTime()); // access time (use getModificationTime() for the last modified time)
System.out.println(file.getReplication()); // replication factor
System.out.println("******************");
// print where each block of the file is stored
BlockLocation[] blockLocations = file.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
System.out.println(blockLocation);
}
System.out.println("++++++++++++++++++++++++");
}
}
/**
* List the entries of a directory
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
* @throws Exception
*/
@Test
public void listStatus() throws FileNotFoundException, IllegalArgumentException, IOException{
FileStatus[] fileStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus2 : fileStatus) {
if(fileStatus2.isDirectory())
System.out.println("directory");
if(fileStatus2.isFile())
System.out.println("file");
}
}
@After
public void close() throws Exception{
fs.close();
}
}
3. Simple stream input and output
package hdfs.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class StreamTest {
FileSystem fs =null;
@Before
public void init() throws IOException{
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
fs = FileSystem.get(conf);
}
/**
* Write data to a file
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void writeFile() throws IllegalArgumentException, IOException{
FSDataOutputStream outputStream = fs.create(new Path("/writeFile.txt"));
outputStream.writeDouble(3.1415);
outputStream.writeUTF("嘿嘿");
outputStream.write("哈哈".getBytes());
outputStream.close();
}
/**
* Read the data back from the file
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void readFile() throws IllegalArgumentException, IOException{
FSDataInputStream inputStream = fs.open(new Path("/writeFile.txt"));
double readDouble = inputStream.readDouble();
String string = inputStream.readUTF();
byte[] b = new byte[6];
int len = 0;
while((len = inputStream.read(b))!=-1){
System.out.println(new String(b, 0, len)); // only decode the bytes actually read
}
System.out.println(readDouble);
System.out.println(string);
inputStream.close();
}
@After
public void after() throws IOException{
fs.close();
}
}
4. Uploading a local file to HDFS with streams, and downloading it back.
package hdfs.test;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
public class Homework {
FileSystem fs =null;
@Before
public void before() throws IOException, InterruptedException, URISyntaxException{
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),conf,"root");
}
/**
* Upload a local file to HDFS through an output stream
* @throws Exception
*/
@Test
public void upload() throws Exception{
// copy bytes from the local file into an HDFS output stream;
// byte streams keep the content (including non-ASCII text) intact
FSDataOutputStream out = fs.create(new Path("/好友.txt"));
FileInputStream in = new FileInputStream(new File("D:\\data\\好友.txt"));
byte[] buf = new byte[4096];
int len = 0;
while((len = in.read(buf)) != -1){
out.write(buf, 0, len);
}
out.close();
in.close();
}
/**
* Download 好友.txt from HDFS to the local disk
* @throws Exception
*/
@Test
public void download() throws Exception{
// open the HDFS file and copy its bytes into a local file
FSDataInputStream in = fs.open(new Path("/好友.txt"));
FileOutputStream out = new FileOutputStream("d:/好友.txt");
byte[] buf = new byte[4096];
int len = 0;
while((len = in.read(buf)) != -1){
out.write(buf, 0, len);
}
out.close();
in.close();
}
}
5. Upload logs on a schedule; delete logs older than 24 hours
TimerTest.java (timer control)
package hdfs.test.day03.log;
import java.util.Timer;
public class TimerTest {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new CollactionTask(), 0, 2*60*1000); // collect and upload logs every 2 minutes
timer.schedule(new CleanTask(), 0, 24*60*60*1000); // clean up expired backups every 24 hours
}
}
CollactionTask.java (log upload)
package hdfs.test.day03.log;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Log collection steps:
* 1. Check the log directory for files that need to be uploaded
* 2. Move those files into a staging ("waiting for upload") directory
* 3. Upload them to HDFS
* 4. Move them into a backup directory
* @author hasee
*
*/
public class CollactionTask extends TimerTask{
@Override
public void run() {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
String date = format.format(new Date());
// 1. Find the log files that need to be uploaded
File logDir = new File("d:/testlog/");
File[] listFiles = logDir.listFiles(new FilenameFilter() {
// the FilenameFilter decides which files are picked up
@Override
public boolean accept(File dir, String name) {
return name.startsWith("test.log.");
}
});
// 2. Move the files into the staging directory
for (File file : listFiles) {
FileUtils.moveFileToDirectory(file, new File("d:/waitUpload"), true);
}
// 3. Upload each staged file to HDFS, then move it to the backup directory
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), new Configuration(),"root");
Path depath = new Path("/log/"+date.substring(0, 10));
boolean exists = fs.exists(depath);
// check whether the target HDFS directory (one per day) already exists; create it if not
if(!exists){
fs.mkdirs(depath);
}
// check whether the backup directory exists
File backDir = new File("d:/backDir/"+date);
boolean exists2 = backDir.exists();
if(!exists2){
backDir.mkdirs();
}
// record which server these log files came from
String hostName = InetAddress.getLocalHost().getHostName();
// 4. Go through the staging directory
File file = new File("d:/waitUpload");
File[] list = file.listFiles();
for (File f : list) {
// upload to HDFS, prefixing the host name and appending a timestamp to the file name
fs.copyFromLocalFile(new Path(f.getPath()), new Path(depath,hostName+"_"+f.getName()+"_"+System.currentTimeMillis()));
// move the file to the backup directory
FileUtils.moveFileToDirectory(f, backDir, true);
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
CleanTask.java (delete expired logs)
package hdfs.test.day03.log;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;
import org.apache.commons.io.FileUtils;
public class CleanTask extends TimerTask {
/**
* Clean up backed-up logs:
* 1. List all of the backup log directories
* 2. Convert each directory name back into a timestamp
* 3. If the directory's timestamp is more than 24 hours older than now, delete it
*/
@Override
public void run() {
try{
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
File file = new File("d:/backDir");
Date date = new Date();
File[] files = file.listFiles();
for (File dir : files) {
String name = dir.getName();
Date date2 = format.parse(name);
if(date.getTime()-date2.getTime() > 24*60*60*1000){
FileUtils.deleteDirectory(dir);
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
6. Word count (distributed)
Step 1, partitioning: route identical words to the same node.
MapTask.java
To run it, first right-click Run As --> Java Application once; it fails because no arguments are set yet. Then right-click Run As --> Run Configurations and supply the program arguments, e.g.: 1 /word.txt 0 40
package hdfs.test.day04;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class MapTask {
public static void main(String[] args) throws Exception {
/**
* taskId      which machine/task this is
* file        which file to count
* startOffSet where to start reading
* lenth       how many bytes to read
*/
int taskId = Integer.parseInt(args[0]);
String file = args[1];
long startOffSet = Long.parseLong(args[2]);
long lenth = Long.parseLong(args[3]);
// connect to HDFS
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), new Configuration(),"root");
FSDataInputStream inputStream = fs.open(new Path(file));
// create the two intermediate output files (one per partition)
FSDataOutputStream out_tem_1 = fs.create(new Path("/wordcount/tmp/part-m"+taskId+"-1"),true);
FSDataOutputStream out_tmp_2 = fs.create(new Path("/wordcount/tmp/part-m"+taskId+"-2"),true);
// seek to this task's start offset
inputStream.seek(startOffSet);
// wrap the input in a buffered character reader
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
// only the task with taskId=1 reads its first line; every other task skips one line (it belongs to the previous split)
if(taskId != 1){
reader.readLine();
}
// read lines and emit a count of 1 per word
long count = 0;
String line = null;
while((line = reader.readLine())!=null){
String[] split = line.split(" ");
for (String word : split) {
if(word.hashCode()%2 == 0){
out_tem_1.write((word+"\t"+1+"\n").getBytes());
}else{
out_tmp_2.write((word+"\t"+1+"\n").getBytes());
}
}
// count this line's bytes (plus the newline) toward this task's assigned length
count += line.length() + 1;
// stop once this task has read past its assigned range
if(count > lenth){
break;
}
}
reader.close();
out_tem_1.close();
out_tmp_2.close();
fs.close();
}
}
Step 2, distributed aggregation.
ReduceTask.java
package hdfs.test.day04;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
public class ReduceTask {
public static void main(String[] args) throws Exception {
// which partition this task is responsible for
int taskId = Integer.parseInt(args[0]);
// map used to accumulate the word counts
Map<String,Integer> map = new HashMap<>();
// connect to HDFS
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),new Configuration(),"root");
// iterate over the intermediate files on HDFS
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/wordcount/tmp/"), true);
// aggregate the counts
while(listFiles.hasNext()){
LocatedFileStatus file = listFiles.next();
// only process the intermediate files assigned to this task (file name suffix matches the taskId)
if(file.getPath().getName().endsWith("-"+taskId)){
// open the intermediate file
FSDataInputStream inputStream = fs.open(file.getPath());
// wrap it in a buffered reader
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
while ((line = reader.readLine()) != null){
String[] word = line.split("\t");
// accumulate the count for this word
Integer count = map.getOrDefault(word[0], 0);
count += Integer.parseInt(word[1]);
map.put(word[0], count);
}
inputStream.close();
reader.close();
}
}
// write the aggregated result to HDFS once all intermediate files have been processed
FSDataOutputStream outputStream = fs.create(new Path("/wordcount/result/part-r-"+taskId));
Set<Entry<String,Integer>> entrySet = map.entrySet();
for (Entry<String, Integer> entry : entrySet) {
outputStream.write((entry.getKey()+"="+entry.getValue()+"\n").getBytes());
}
outputStream.close();
fs.close();
}
}