Verifying MapReduce
Run the WordCount program that ships with MapReduce.
On the client you can unpack the source jar to read the code: /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1-source.jar
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
As shown above, the WordCount class needs two kinds of arguments:
- an input path (must already exist):
/user/CTX/mpinput
- an output path (must not exist yet):
/user/CTX/mpoutput
Make sure the ResourceManager is running (start-yarn.sh) before submitting.
Command (adjust the examples jar name to your Hadoop version, e.g. hadoop-mapreduce-examples-2.9.2.jar on 2.9.2):
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /user/CTX/mpinput /user/CTX/mpoutput
Why "wordcount" is lowercase
1. META-INF/MANIFEST.MF inside the binary jar hadoop-mapreduce-examples-3.2.1.jar gives the entry class: Main-Class: org.apache.hadoop.examples.ExampleDriver
2. ExampleDriver in the source jar shows that the lowercase program name "wordcount" is registered for WordCount.class:
public class ExampleDriver {
  public static void main(String argv[]) {
    int exitCode = -1;
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("wordcount", WordCount.class,
          "A map/reduce program that counts the words in the input files.");
      //...
      exitCode = pgd.run(argv);
    }
    catch (Throwable e) {
      e.printStackTrace();
    }
    System.exit(exitCode);
  }
}
Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster
If this error appears on Hadoop 3.x, fix it as follows:
1. Run on the command line: hadoop classpath
2. Add the result to yarn-site.xml:
<property>
    <name>yarn.application.classpath</name>
    <value>[the classpath returned by the command]</value>
</property>
3. Restart YARN (stop-yarn.sh, then start-yarn.sh)
Relevant ports
- centos01:9870: HDFS web UI
- centos01:8088: ResourceManager web UI
- centos01:19888: MapReduce job history
  The history server must be started manually: sbin/mr-jobhistory-daemon.sh start historyserver
WordCount source code walkthrough
public class WordCount {

  /**
   * Mapper<input key, input value, output key, output value>
   * input key:    byte offset of the current line from the start of the file
   * input value:  content of the current line
   * output key:   a word
   * output value: the count for that word
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    // the count emitted for every word (always 1)
    private final static IntWritable one = new IntWritable(1);
    // Hadoop's Text type, roughly the equivalent of String
    private Text word = new Text();

    /**
     * map(input key, input value, Context context)
     * @param key     byte offset of the current line
     * @param value   content of the current line
     * @param context context object linking map -> shuffle -> reduce
     */
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // split the line on whitespace
      StringTokenizer itr = new StringTokenizer(value.toString());
      // iterate over the tokens
      while (itr.hasMoreTokens()) {
        // convert String -> Text
        word.set(itr.nextToken());
        // tag each word with a "1"; the (default) shuffle stage then groups them as
        // key = word, value = that word's list of 1s, e.g. [1,1,1], before reduce
        context.write(word, one);
      }
    }
  }
  /**
   * Reducer<input key, input value (the element type of the 1s), output key, output value>
   */
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    /**
     * reduce(input key, input values (the grouped list, e.g. [1,1,1]), Context context)
     */
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      // number of occurrences
      int sum = 0;
      for (IntWritable val : values) {
        // add up every 1 in the list
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    // automatically loads the configuration files
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // validate the input/output arguments
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);        // output key type: Text (string)
    job.setOutputValueClass(IntWritable.class);   // output value type: IntWritable (count)
    // input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // output path
    FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
MapReduce flow in detail
Processing flow:
File
→ the file is divided (by lines) into input splits
→ FileInputFormat (the default, which extends InputFormat<K,V>); the nextKeyValue() method of its RecordReader<K,V> reads each line's offset and content
→ MapTask-0 / MapTask-1 / ... (each split is processed by one map task)
→ the actual map(offset in-K, line content in-V, Context context) in the Mapper class, which hands its output on via context.write(out-K, out-V)
→ the collector: the collect() method of MapOutputCollector passes the data on
→ the circular buffer (an array whose head and tail are logically joined)
→ spill files
Each spill goes through two steps: 1. quick sort, 2. partitioning (the spilled data is divided into several partitions; partitioning can be customized).
Within one MapTask-x, the spill files are merged by partition number (merge sort); across different MapTask-x instances, the same partition is merge-sorted again; within each partition the values of the same key are combined into a list, e.g. A[1,1] (the grouping rule can be customized).
Each partition is then handled by its own ReduceTask-x in the reduce phase.
A ReduceTask-x passes one key/value-list pair at a time
→ to reduce() in the Reducer class, which sums A[1,1] and passes the result on via context.write(A, sum)
→ to the write() method of the RecordWriter<K,V> inside OutputFormat<K,V>, which writes
→ to HDFS.
The job produces in HDFS: _SUCCESS, part-r-00000 (partition 0), part-r-00001 (partition 1), ...
Interview details:
1. The circular buffer: MapOutputBuffer
MRJobConfig shows the default size (100 MB) and the spill-threshold property:
String IO_SORT_MB = "mapreduce.task.io.sort.mb";
int DEFAULT_IO_SORT_MB = 100;
String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";
MapOutputBuffer (an inner class of MapTask that implements MapOutputCollector) shows the default spill threshold: 80% of the buffer.
2. Spill files: SpillRecord
SpillRecord holds the key/value index structure:
private final ByteBuffer buf;
private final LongBuffer entries;
3. The collector: MapOutputCollector
4. Quick sort
The class being sorted implements the Comparable interface (in Hadoop the key type implements WritableComparable) and overrides compareTo(); a sketch follows.
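For item 4, a minimal sketch of a custom key type that controls the map-output sort order (the class name WordKey and the descending-order rule are assumptions for illustration, not part of the WordCount example):
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: sorts map output by word, in descending order.
public class WordKey implements WritableComparable<WordKey> {
    private String word = "";

    public void set(String word) { this.word = word; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);                  // serialize the key
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();                 // deserialize the key
    }

    @Override
    public int compareTo(WordKey other) {
        return other.word.compareTo(word);   // reversed arguments: descending sort
    }

    @Override
    public int hashCode() { return word.hashCode(); }   // used by HashPartitioner

    @Override
    public boolean equals(Object o) {
        return o instanceof WordKey && ((WordKey) o).word.equals(word);
    }
}
The framework invokes compareTo() during the quick sort and merge sort described above.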
5. Partitioning: Partitioner
Implement Partitioner and override its method; the return value is the partition number:
public interface Partitioner<K2, V2> extends JobConfigurable {
    int getPartition(K2 var1, V2 var2, int var3);
}
Its default implementation, HashPartitioner, already provides the partitioning logic:
public int getPartition(K2 key, V2 value, int numReduceTasks) {
    return (key.hashCode() & 2147483647) % numReduceTasks;
}
(Partition numbers cannot be assigned arbitrarily; with the default HashPartitioner the number of reduce tasks and the number of partitions are kept consistent.)
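A minimal sketch of a custom partitioner using the newer org.apache.hadoop.mapreduce API; the class name FirstLetterPartitioner and the a-m/n-z split rule are assumptions for illustration:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical rule: words starting with a-m go to partition 0, everything else to partition 1.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks < 2 || key.getLength() == 0) {
            return 0;                                   // single reducer or empty key: partition 0
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1;
    }
}
In the driver this would be registered with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(2), so that the number of reduce tasks matches the number of partitions, as required by the note above.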
6. Optimizing the second merge sort: grouping by, e.g., the first character
job.setGroupingComparatorClass(<custom rule>); a custom grouping rule, e.g. grouping the names 张三 and 张思 together under 张X; see the sketch below.
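A minimal sketch of such a grouping comparator, assuming non-empty Text keys grouped only by their first character; the class name FirstCharGroupingComparator is made up for illustration:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping rule: keys with the same first character are fed to the same reduce() call.
public class FirstCharGroupingComparator extends WritableComparator {

    protected FirstCharGroupingComparator() {
        super(Text.class, true);    // true: instantiate keys so compare() receives real Text objects
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = a.toString();
        String right = b.toString();
        // compare only the first character; 0 means "same group"
        return left.substring(0, 1).compareTo(right.substring(0, 1));
    }
}
Registered in the driver with job.setGroupingComparatorClass(FirstCharGroupingComparator.class).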
MapReduce optimization
Using a Combiner
Combiner: local aggregation (also called a "local reduce")
Reduce: global aggregation
1. Implementation:
a. Define a custom Combiner class (extend Reducer):
public class WCCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {   // e.g. [1,1,1]
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));
  }
}
b. Tell MapReduce to use it (a job setting):
job.setCombinerClass(WCCombiner.class);
c. Shortcut: if WCCombiner and the reducer (WCReduce) contain the same code, skip a and b and simply reuse the reducer:
job.setCombinerClass(WCReduce.class);
2. When it is triggered:
- When data is spilled from the circular buffer: the Combiner runs before the spill file is written.
- When multiple spill files are merged (merge sort): the Combiner runs before the merge.
3. When to use it:
A Combiner introduces subtle correctness issues, so decide per job whether to use one. For example:
- Recommended: summation problems
- Not recommended: averaging problems (averaging partial averages gives the wrong result, e.g. avg(avg(1,2), 3) = 2.25 while avg(1,2,3) = 2)
Optimizing transfers with compression
1. Scenario:
In a distributed computing framework, when node A sends data (a file) to node B, sending plain text is slow. Instead, A compresses the file and sends the compressed file
→ B receives the compressed file and decompresses it.
MapReduce has several compression codecs built in; you only need to enable the one you want.
2. Six codecs:
DefaultCodec
GzipCodec
BZip2Codec
LzopCodec
Lz4Codec
SnappyCodec
[Reference] Characteristics and comparison of four common compression formats: https://blog.csdn.net/gjggw123/article/details/104633899
3. Usage:
// enable compression of the map output
conf.setBoolean("mapreduce.map.output.compress", true);
// pick the codec: change only the second argument
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
// also compress the final (reduce) output written under the output path
FileOutputFormat.setCompressOutput(job, true);
// the codec for the job output (kept consistent with the map-output codec here)
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
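For context, a minimal sketch of where these calls sit in a WordCount-style driver; this is not the driver from the examples jar, and it assumes the TokenizerMapper/IntSumReducer classes shown earlier are available on the classpath:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedWordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // compress the intermediate map output with BZip2
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf, "word count (compressed)");
        job.setJarByClass(CompressedWordCountDriver.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // also compress the final job output
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}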
Standalone (local) MapReduce setup
Writing code in IDEA on Windows and submitting MapReduce jobs to the YARN cluster while reading HDFS is possible in theory, but it has cross-platform problems: you need compatibility plugins, the configuration is painful, and it breaks on version upgrades, so it is not recommended.
Setup
1. Extract the Linux Hadoop 3.2.1 distribution onto Windows, then take the matching version of the bin directory from the winutils project and replace the files in hadoop-3.2.1/bin; usually only winutils.exe and hadoop.dll need to be added.
2. Configure the environment variables:
HADOOP_HOME C:\hadoop-3.2.1
PATH %HADOOP_HOME%\bin
3. In the Maven project, add the two path arguments:
Application → <project> → Configuration → Program arguments: D:\myinput D:\myoutput
4. Run.
If you still get a HADOOP_HOME error, copy hadoop.dll into C:\Windows\System32.
Local MapReduce + cluster HDFS
1. Copy core-site.xml from the cluster into the project's resources directory; only one property is needed inside <configuration>:
<configuration>
    <!-- hdfs -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://centos01:9000</value>
    </property>
</configuration>
2. Change the project's path arguments:
Application → <project> → Configuration → Program arguments: /user/CTX/myinput /user/CTX/myoutput
3. Run.
If you hit an insufficient-permissions problem on /temp:
(1) Add a VM option: Application → <project> → Configuration → VM options: -DHADOOP_USER_NAME=root, which specifies the user to act as when accessing the HDFS cluster; or
(2) loosen the permissions: bin/hadoop fs -chmod -R 777 /temp
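As a code-level alternative to the VM option in (1), the user name can also be set as a system property before any Hadoop class is used; a minimal sketch, assuming root really is the intended user:
import org.apache.hadoop.conf.Configuration;

public class LocalDriverWithUser {
    public static void main(String[] args) throws Exception {
        // Must run before the first Configuration/FileSystem/Job call,
        // otherwise the HDFS login user has already been determined.
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        // ... build and submit the Job exactly as in the WordCount driver above ...
    }
}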
MapReduce architecture and job execution flow
Source code walkthrough
waitForCompletion() → submit() →
1. connect()
→ new Cluster() → initialize() → creates the client protocol (clientProtocol):
It reads the configuration; if nothing is configured the protocol is null, otherwise the configuration determines whether the current environment is YARN (YARNRunner) or local (LocalJobRunner).
2. submitter.submitJobInternal():
Actually submits the job to YARN or local (the MapReduce runtime environment).
this.checkSpecs(job) → checkOutputSpecs():
Ensures the output path does not already exist, otherwise an exception is thrown; e.g. when running the command above, /myoutput must not exist.
JobSubmissionFiles.getStagingDir(cluster, conf)
Return value (the staging area): file:/tmp/hadoop-YANQUN/mapred/staging/YANQUN1195299999/.staging
JobID jobId = this.submitClient.getNewJobID()
Gets the job id, e.g. job_local1195299999_0001
A sub-directory named after the job id is then created inside the staging area:
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
this.copyAndConfigureFiles(job, submitJobDir);
job is the MapReduce job being executed; submitJobDir is the job's directory in the staging area.
this.writeSplits(job, submitJobDir);
Writes the job's split information into the staging area for later use (figure 7).
this.writeConf(conf, submitJobFile);
Writes the configuration file (job.xml) into the staging area (job.xml contains the job's configuration and some environment information).
submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Submits the job to YARN.
this.jtFs.delete(submitJobDir, true);
After execution finishes, the staging data is cleaned up.
Configuration precedence
Hard-coded configuration.set("name", "value") > xxx-site.xml on the project classpath > xxx-default.xml on the project classpath > the 7 configuration files on the cluster
From the source we can see that if configuration files are placed on the project classpath, only these 6 are picked up automatically:
When a Configuration is created, it automatically loads:
Configuration.addDefaultResource("core-default.xml");
Configuration.addDefaultResource("core-site.xml");
When a Job is created, it additionally loads:
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
Version differences
2.x: import org.apache.hadoop.mapreduce.xxx; Mapper and friends are abstract classes; jobs are driven through Job
ResourceManager / ApplicationMaster -> NodeManager
1.x: import org.apache.hadoop.mapred.xxx; Mapper and friends are interfaces; jobs are driven through JobClient
JobTracker -> TaskTracker
Evolution from 1.x to 2.x (with roughly 4000 nodes as the scalability limit): JobTracker -> ResourceManager (resources) + ApplicationMaster (per application)
TaskTracker -> NodeManager
Although the architecture changed from 1.x to 2.x, most business-logic code needs no changes; the incompatibility is that the map()/reduce() method signatures differ between 1.x and 2.x (parameter lists, modifiers).
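For illustration, the two map() signatures side by side; the class names are made up, and the old-API shape should be double-checked against the 1.x javadoc:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// 1.x style: Mapper is an interface; output goes through an OutputCollector.
public class OldApiWordMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            output.collect(new Text(itr.nextToken()), new IntWritable(1));
        }
    }
}

// 2.x style: Mapper is an abstract class; output goes through the Context object.
class NewApiWordMapper
        extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            context.write(new Text(itr.nextToken()), new IntWritable(1));
        }
    }
}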