## Verifying MapReduce

Run the WordCount program that ships with MapReduce. On the client, the source jar can be unpacked to inspect the code: `/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1-source.jar`

```Java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

As the code shows, the WordCount class needs two arguments to run:

* an input path (must already exist): `/user/CTX/mpinput`
* an output path (must not exist yet): `/user/CTX/mpoutput`

The `ResourceManager` must be running when the job is submitted (`start-yarn.sh`). Example commands:

* `bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jar wordcount /myinput/ /myoutput`
* `bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /user/CTX/mpinput /user/CTX/mpoutput`
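As a side note, the two path requirements can also be checked through the HDFS Java API before submitting. The following is only a minimal sketch, not part of the example jar — the class name is made up, and the `hdfs://centos01:9000` address is the one this cluster uses later in these notes:

```Java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckWordCountPaths {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumed NameNode address; normally picked up from core-site.xml
        conf.set("fs.defaultFS", "hdfs://centos01:9000");
        try (FileSystem fs = FileSystem.get(conf)) {
            Path in = new Path("/user/CTX/mpinput");   // must already exist
            Path out = new Path("/user/CTX/mpoutput"); // must NOT exist yet
            System.out.println("input exists:  " + fs.exists(in));  // expect true
            System.out.println("output exists: " + fs.exists(out)); // expect false
        }
    }
}
```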
### Case sensitivity of the `wordcount` program name

1. In the bytecode jar `examples-3.2.1.jar`, `META-INF/MANIFEST.MF` gives the entry point **Main-Class**: `org.apache.hadoop.examples.ExampleDriver`.
2. In the source jar (`source.jar`), `ExampleDriver` shows:

```Java
public class ExampleDriver {

  public static void main(String argv[]) {
    int exitCode = -1;
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("wordcount", WordCount.class,
                   "A map/reduce program that counts the words in the input files.");
      //...
      exitCode = pgd.run(argv);
    } catch (Throwable e) {
      e.printStackTrace();
    }
    System.exit(exitCode);
  }
}
```

The program name is registered with `ProgramDriver` as the lowercase string `"wordcount"`, so the name passed on the command line must match it exactly.

### Error: could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster

If Hadoop 3.x reports this error:

1. Run `hadoop classpath` on the command line.
2. Add the returned classpath to `yarn-site.xml`:

   ```XML
   <property>
       <name>yarn.application.classpath</name>
       <value>[the classpath returned by the command]</value>
   </property>
   ```

3. Restart YARN (`stop-yarn.sh`, then `start-yarn.sh`).

### Related ports

* `centos01:9870`: HDFS web UI
* `centos01:8088`: ResourceManager web UI
* `centos01:19888`: MapReduce job history; the history server must be started manually: `sbin/mr-jobhistory-daemon.sh start historyserver`

---

## WordCount Source Code Analysis

```Java
public class WordCount {

  public static class TokenizerMapper
      /**
       * Mapper<input key, input value, output key, output value>
       * input key:    byte offset of the current line from the start of the file
       * input value:  the content of the current line
       * output key:   a word
       * output value: the count emitted for that word
       */
      extends Mapper<Object, Text, Text, IntWritable> {

    // the constant count "1"
    private final static IntWritable one = new IntWritable(1);
    // text holder, roughly the Hadoop counterpart of String
    private Text word = new Text();

    /**
     * map(input key, input value, Context context)
     * @param key     the offset of the line
     * @param value   the content of the line
     * @param context the context object linking map -> shuffle -> reduce
     */
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // split the line on whitespace
      StringTokenizer itr = new StringTokenizer(value.toString());
      // iterate over every token
      while (itr.hasMoreTokens()) {
        // convert String -> Text
        word.set(itr.nextToken());
        // tag each word with "1" and hand it to the (default) shuffle stage:
        // key = the word, value = the list of 1s for that word, e.g. [1,1,1], then on to reduce
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      /**
       * Reducer<input key, input value (element type of the 1s), output key, output value>
       */
      extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    /**
     * reduce(input key, input values (the grouped list, e.g. [1,1,1]), Context context)
     */
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      // number of occurrences
      int sum = 0;
      for (IntWritable val : values) {
        // accumulate every 1 in the list
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // loads the configuration files automatically
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // validate the input/output arguments
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);          // output key type: text
    job.setOutputValueClass(IntWritable.class); // output value type: count
    // input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // output path
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
<out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class);//字符串类型 job.setOutputValueClass(IntWritable.class);//次数类型 //输入路径 for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //输出路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } ``` --- ## MapReduce流程详解 ### 处理流程: `文件`→ `(按行拆分成多个)切片`→**`FileInputformat`**`(默认,继承自)`**`Inputformat<K,V>`**`类中的`**`RecordReader<K,V>`**`对象中的`**`nextKeyValue()`**`方法 读取每一行中的偏移量和内容`→`MapTask-0/MapTask-1/...(每个切片由一个线程操作)`→`Mapper类中真正的`**`map(偏移量in-K,内容in-V,上下文对象context)`**`通过`**`context.write(out-K,out-V)`**`传递给`→`收集器,其`**`MapOutputCollection`**`的`**`collect()`**`方法传递到`→`环形缓冲区(首尾相连的数组)`→`溢出文件` 每个溢出文件执行两步: 1.**快速排序** 、2.分区(将溢出文件中的数据,分成多个区)**(可自定义分区)** 同一个MapTask-x中,多个溢出文件合并(根据区号 ,**归并排序**),不同MapTask-x中,的相同区再进**归并排序**,分成不同分区,每个分区合并key,写成数组形式(A[1,1])**(可自定义合并规则)**。 每个分区由对应的ReduceTask-x,进入Reduce阶段 `ReduceTask-x一次只传一个K-V对`→`Reduce类中的`**`reduce()`**`,进行累加A[1,1],通过`**`context.wirte(A,sum)`**`传递给`→**`OutputFormat<K,V>`**`类中的`**`RecordWriter<K,V>`**`对象中的`**`write()`**`方法,写入`→`HDFS` HDFS中生成: `_SUCCESS` `part-r-00000`第0区/`part-r-00001`第一区 ### 面试细节: 1.**环形缓冲区MapOutputBuffer** 在MRJobConfig.class中可知:默认大小100M、数据占存放空间百分比 ```Java String IO_SORT_MB = "mapreduce.task.io.sort.mb"; int DEFAULT_IO_SORT_MB = 100; String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent"; ``` 在MapOutputBuffer.class(该类实现了MapOutputCollector,该类是MapTask的一个内部类)中可知:数据占存放空间百分比=80% 2.**溢出文件SpillRecord** SpillRecord.class中有K-V结构对 ```Java private final ByteBuffer buf; private final LongBuffer entries; ``` 3.**收集器MapOutputCollector** 4.**快速排序** 对待排序的类,实现Comparable类并重写compareTo()方法 5.**分区Partitioner** 实现Partitioner,重写方法,通过返回值实现区号 ```Java public interface Partitioner<K2,V2> extends JobConfigurable { int getPartition(K2 var1, V2 var2, int var3); } ``` 其默认实现类HashPartitioner.class中已实现分区方法 ```Java public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & 2147483647) % numReduceTasks; } ``` (分区号不能乱写,默认使用的HashPartitioner,已经将reduce的数量和分区个数设置为了一致) 6.**第二次归并排序优化:依首字母合并** job.setGroupingComparatorClass(自定义规则); 自定义合并规则 张X:张三、张思  --- ## MapReduce优化 ### 使用Combiner  Combiner:本地运算(也称为:本地Reduce) Reduce:全局运算 **一、实现:** a.自定义Combiner类(自定义xxx:继承Reducer) ```Java public class WCCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0 ; for(IntWritable value:values){//[1,1,1] sum += value.get() ; } context.write( key , new IntWritable( sum) ); } } ``` b.告诉mp使用了Combiner(job设置) ```Java job.setCombinerClass(WCCombiner.class); ``` c.优化:若WCCombiner和WCReduce中代码相同,则可以直接略过a、b改为以下: ```Java job.setCombinerClass(WCReader.class); ``` **二、触发时机:** 1. 从环形缓冲区中溢出文件时,先执行Combiner操作再生成溢出文件 2. 
---

## MapReduce Optimization

### Using a Combiner

* Combiner: local aggregation (also called a "local reduce")
* Reduce: global aggregation

**I. Implementation:**

a. Write a custom Combiner class (as with other custom components here: extend Reducer):

```Java
public class WCCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) { // e.g. [1,1,1]
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
```

b. Tell MapReduce that the Combiner is used (a job setting):

```Java
job.setCombinerClass(WCCombiner.class);
```

c. Shortcut: if WCCombiner and WCReduce contain identical code, steps a and b can be skipped and the reducer reused directly:

```Java
job.setCombinerClass(WCReduce.class);
```

**II. When it runs:**

1. When data spills from the circular buffer: the Combiner runs before the spill file is written.
2. When the multiple spill files are merged (merge sort): the Combiner runs before the merge.

**III. When to use it:**

A Combiner introduces subtle issues, so whether to use one depends on the job's semantics. For example:

* Recommended: summing problems
* Not recommended: averaging problems (an average of partial averages is not the overall average)

### Compressing data to optimize transfer

**I. Scenario:**

In a **distributed computing framework**, when node A sends data (a file) to node B, transferring the file as plain text is slow, so instead:

`A: compress the text file and send the compressed file` → `B: receive the compressed file and decompress it`

MapReduce ships with several compression codecs; you only need to enable the one you want.

**II. Six compression codecs:**

DefaultCodec, GzipCodec, BZip2Codec, LzopCodec, Lz4Codec, SnappyCodec

[Reference] Features and comparison of four common compression formats: https://blog.csdn.net/gjggw123/article/details/104633899

**III. Implementation:**

```Java
// enable compression of the map output
conf.setBoolean("mapreduce.map.output.compress", true);
// choose the codec: only the second argument needs to change
conf.setClass("mapreduce.map.output.compress.codec",
        BZip2Codec.class, CompressionCodec.class);

// placed alongside the input/output path setup:
// compress the final (reduce) output
FileOutputFormat.setCompressOutput(job, true);
// codec for the output (keep it consistent with the compression type above)
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
```

---

## Standalone MapReduce Setup

Writing the code in IDEA on Windows and then running MapReduce on the YARN cluster to access HDFS is possible in theory, but cross-platform problems arise: a compatibility plugin has to be configured, it is very cumbersome, and version upgrades cause further trouble, so it is not recommended.

### Setup

1. Unpack the Hadoop 3.2.1 distribution from Linux onto Windows, then fetch the matching version of `bin` from [winutils](https://github.com/TangSong99/winutils) and replace the files under `hadoop-3.2.1/bin`; usually only `winutils.exe` and `hadoop.dll` need to be added.
2. Configure the environment variables:

   ```path
   HADOOP_HOME
   C:\hadoop-3.2.1
   ---
   PATH
   %HADOOP_HOME%\bin
   ```

3. In the Maven project, add the two path arguments:

   **Application — (project) — Configuration — Program arguments: `D:\myinput D:\myoutput`**

4. Run. If a HADOOP_HOME error still appears, also copy `hadoop.dll` into `C:\Windows\System32`.

### Local MapReduce + cluster HDFS

1. Copy `core-site.xml` from the cluster into the project's resources directory; only one property needs to be set inside `<configuration>`:

   ```xml
   <configuration>
       <!-- hdfs -->
       <property>
           <name>fs.defaultFS</name>
           <value>hdfs://centos01:9000</value>
       </property>
   </configuration>
   ```

2. Change the project's path arguments:

   **Application — (project) — Configuration — Program arguments: `/user/CTX/myinput /user/CTX/myoutput`**

3. Run.

If a permission error on `/temp` appears:

(1) Add a run parameter — **Application — (project) — Configuration — VM options: `-DHADOOP_USER_NAME=root`** — to specify the user to use when accessing the HDFS cluster.
(2) Or change the permissions on the cluster: `bin/hadoop fs -chmod -R 777 /temp`
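The same setup can also be expressed directly in code. The sketch below is a hypothetical driver variant (class and path names are illustrative, and it assumes the example `org.apache.hadoop.examples.WordCount` classes are on the classpath — otherwise substitute your own Mapper/Reducer): it hard-codes `fs.defaultFS` instead of relying on `core-site.xml` under resources, and sets the `HADOOP_USER_NAME` system property, which should have the same effect as the `-DHADOOP_USER_NAME=root` VM option:

```Java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LocalRunnerClusterHdfsDriver {
    public static void main(String[] args) throws Exception {
        // same effect as the -DHADOOP_USER_NAME=root VM option; set before any HDFS access
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        // same effect as placing core-site.xml under resources
        conf.set("fs.defaultFS", "hdfs://centos01:9000");
        // with no mapred-site.xml on the classpath, mapreduce.framework.name
        // defaults to "local", so the job runs in-process while using the cluster's HDFS

        Job job = Job.getInstance(conf, "word count (local runner, cluster HDFS)");
        job.setJarByClass(LocalRunnerClusterHdfsDriver.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/user/CTX/myinput"));
        FileOutputFormat.setOutputPath(job, new Path("/user/CTX/myoutput"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```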
---

## MapReduce Architecture and Job Execution Flow

### Reading the submission source code

```Java
waitForCompletion() → submit() →

1. connect() → new Cluster() → initialize() → clientProtocol():
   // reads the configuration; if nothing is configured it is null, otherwise the
   // configuration determines whether the current environment is YARN (YARNRunner)
   // or local (LocalJobRunner)

2. submitter.submitJobInternal():
   // actually submits the job to YARN or local (the MapReduce runtime)

   this.checkSpecs(job) → checkOutputSpecs();
   // ensures the output path does not already exist, otherwise an exception is thrown;
   // e.g. for the earlier command, /myoutput must not exist

   JobSubmissionFiles.getStagingDir(cluster, conf);
   // returns the staging area, e.g.
   // file:/tmp/hadoop-YANQUN/mapred/staging/YANQUN1195299999/.staging

   JobID jobId = this.submitClient.getNewJobID();
   // gets the job id, e.g. job_local1195299999_0001

   Path submitJobDir = new Path(jobStagingArea, jobId.toString());
   // creates a sub-directory named after the job id inside the staging area

   this.copyAndConfigureFiles(job, submitJobDir);
   // job: the MapReduce job being run; submitJobDir: the job's path in the staging area

   this.writeSplits(job, submitJobDir);
   // writes the job's split information into the staging area for later use (figure 7)

   this.writeConf(conf, submitJobFile);
   // writes the configuration file (job.xml) into the staging area;
   // job.xml contains the job's configuration plus some environment information

   submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
   // submit -> YARN

   this.jtFs.delete(submitJobDir, true);
   // after the job finishes, the staging data is cleaned up
```

### Version notes

**Precedence: hard-coded `configuration.set("name","value")` > `xxx-site.xml` on the project classpath > `xxx-default.xml` on the project classpath > the 7 configuration files on the cluster**

Reading the source shows that if configuration files are placed on the project classpath, only these six are loaded automatically:

```Java
// loaded automatically when a Configuration is created:
Configuration.addDefaultResource("core-default.xml");
Configuration.addDefaultResource("core-site.xml");

// loaded automatically when a Job is used:
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
```

**Version differences**

* 2.x: `import org.apache.hadoop.mapreduce.xxx`; Mapper and friends are abstract classes; the client is `Job`; ResourceManager / ApplicationMaster -> NodeManager
* 1.x: `import org.apache.hadoop.mapred.xxx`; Mapper and friends are interfaces; the client is `JobClient`; JobTracker -> TaskTracker

Evolution from 1.x to 2.x (with roughly 4,000 nodes as the scalability limit):

`JobTracker -> ResourceManager (resources) + ApplicationMaster (per application)`
`TaskTracker -> NodeManager`

Although the architecture changed from 1.x to 2.x, most business-logic code does not need to change; what differs between 1.x and 2.x are the map()/reduce() method signatures (parameter lists and modifiers).
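As a rough sketch of that signature difference (simplified; the exact declarations are in the respective javadocs):

```Java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// 1.x style (org.apache.hadoop.mapred): Mapper is an interface;
// output goes to an OutputCollector, progress/counters to a Reporter.
class OldApiMapper extends MapReduceBase
        implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        output.collect(value, new IntWritable(1));
    }
}

// 2.x style (org.apache.hadoop.mapreduce): Mapper is a class;
// everything goes through the Context object.
class NewApiMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, new IntWritable(1));
    }
}
```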