Verifying MapReduce

Run the WordCount program that ships with MapReduce.
On the client you can unpack the source jar to inspect the code: /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1-source.jar

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

From the code above, the WordCount class needs two arguments to run:

  • Input path (must already exist): /user/CTX/mpinput
  • Output path (must not exist yet): /user/CTX/mpoutput

Make sure the ResourceManager is running (start-yarn.sh) before submitting, then run:
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /user/CTX/mpinput /user/CTX/mpoutput
(on Hadoop 2.9.2 the equivalent is: bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jar wordcount /myinput/ /myoutput)

Why the program name wordcount is lower-case

1. The META-INF/MANIFEST.MF inside the examples-3.2.1.jar bytecode jar gives the entry class: Main-Class org.apache.hadoop.examples.ExampleDriver
2. ExampleDriver in the source.jar shows how the lower-case program name is registered:

public class ExampleDriver {
  
  public static void main(String argv[]){
    int exitCode = -1;
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("wordcount", WordCount.class, 
                   "A map/reduce program that counts the words in the input files.");
      //...
      exitCode = pgd.run(argv);
    }
    catch(Throwable e){
      e.printStackTrace();
    }
    
    System.exit(exitCode);
  }
}

Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster

Fix for this error on Hadoop 3.x:
1. Run on the command line: hadoop classpath
2. Add the configuration parameter to yarn-site.xml:

   <property>
         <name>yarn.application.classpath</name>
         <value>[the classpath returned by the command]</value>
   </property>

3. Restart YARN (stop-yarn.sh, then start-yarn.sh)

Related ports

  • centos01:9870: HDFS web UI
  • centos01:8088: ResourceManager web UI
  • centos01:19888: MapReduce job history
    The history server must be started manually: sbin/mr-jobhistory-daemon.sh start historyserver

WordCount source analysis

public class WordCount {

  public static class TokenizerMapper
          /**
           * Mapper<input key, input value, output key, output value>
           * input key: the byte offset of the current line from the start of the file
           * input value: the content of the current line
           * output key: a word
           * output value: the number of times the word occurs
           */
       extends Mapper<Object, Text, Text, IntWritable>{

    // the constant count 1 attached to every word
    private final static IntWritable one = new IntWritable(1);
    // reusable Text object, Hadoop's counterpart of String
    private Text word = new Text();

    /**
     * map(input key, input value, Context context)
     * @param key the byte offset of the current line
     * @param value the content of the current line
     * @param context the context object linking map -> shuffle -> reduce
     */
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // split the line on whitespace
      StringTokenizer itr = new StringTokenizer(value.toString());
      // iterate over the tokens
      while (itr.hasMoreTokens()) {
        // convert String -> Text
        word.set(itr.nextToken());
        // emit (word, 1); the default shuffle groups these into key -> [1,1,1] lists before they reach reduce
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer
          /**
           * Reducer<input key, input value (the element type of the 1s), output key, output value>
           */
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    /**
     * reduce(input key, input values (the list, e.g. [1,1,1]), Context context)
     */
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      // number of occurrences
      int sum = 0;
      for (IntWritable val : values) {
        // add up every 1 in the list
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // automatically loads the configuration files
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // validate the input/output arguments
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);         // string type (the word)
    job.setOutputValueClass(IntWritable.class);// count type (the number of occurrences)
    // input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // output path
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

MapReduce flow in detail

Processing flow:

The input file is split into several splits and read line by line. FileInputFormat (the default, a subclass of InputFormat<K,V>) supplies a RecordReader<K,V> whose nextKeyValue() method reads each line's offset and content. Each split is handled by one task (MapTask-0, MapTask-1, ...). Inside the Mapper class, the real map(offset in-K, line content in-V, Context context) runs, and context.write(out-K, out-V) hands the pair to the collector; the collector's MapOutputCollector.collect() method writes it into the ring buffer (an array whose head and tail are joined), from which spill files are written.

Each spill file goes through two steps: 1. quick sort, 2. partitioning (the data in the spill file is divided into several partitions; a custom partitioner is possible).
Within one MapTask-x, the spill files are merged by partition number (merge sort); across different MapTask-x instances, the pieces belonging to the same partition are merge-sorted again into the final partitions, and within each partition equal keys are combined into list form (e.g. A [1,1]); the grouping rule can be customized.
Each partition is then consumed by its own ReduceTask-x, and the Reduce phase begins.

The ReduceTask-x passes one key with its value list at a time to reduce() in the Reducer class, which accumulates A [1,1]; context.write(A, sum) hands the result to the RecordWriter<K,V> obtained from the OutputFormat<K,V> class, whose write() method writes it to HDFS.

The job produces in HDFS:
_SUCCESS
part-r-00000 (partition 0) / part-r-00001 (partition 1)

Interview details:

1. The ring buffer (MapOutputBuffer)
MRJobConfig.class defines the default size (100 MB) and the spill threshold as a percentage of the buffer:

String IO_SORT_MB = "mapreduce.task.io.sort.mb";
int DEFAULT_IO_SORT_MB = 100;

String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";

MapOutputBuffer.class (an inner class of MapTask that implements MapOutputCollector) shows that the spill threshold defaults to 80%.
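As a hedged sketch (hypothetical SpillTuning class; the property keys are the ones listed above, the values are illustrative, not recommendations), these defaults could be overridden from the driver before the Job is created:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical tuning sketch: override the ring-buffer defaults shown above.
public class SpillTuning {
    public static Job newTunedJob() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 200);            // ring buffer size in MB (default 100)
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spill threshold (default 80%)
        return Job.getInstance(conf, "word count (tuned spill)");
    }
}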
2. Spill files (SpillRecord)
SpillRecord.class holds the key-value index structures of a spill:

  private final ByteBuffer buf;
  private final LongBuffer entries;

3. The collector (MapOutputCollector)
4. Quick sort
The class being sorted implements the Comparable interface (in Hadoop, WritableComparable) and overrides the compareTo() method.
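A minimal hypothetical key class (WordKey, not part of the WordCount example) sketching what such a sortable map-output key could look like:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: sorted by word length first, then alphabetically.
public class WordKey implements WritableComparable<WordKey> {
    private String word = "";

    public void set(String word) { this.word = word; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);                  // serialize for the shuffle
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();                 // deserialize on the reduce side
    }

    @Override
    public int compareTo(WordKey other) {    // ordering used by the sort phase
        int byLength = Integer.compare(word.length(), other.word.length());
        return byLength != 0 ? byLength : word.compareTo(other.word);
    }

    @Override
    public int hashCode() { return word.hashCode(); } // keeps HashPartitioner consistent

    @Override
    public boolean equals(Object o) {
        return o instanceof WordKey && word.equals(((WordKey) o).word);
    }

    @Override
    public String toString() { return word; }
}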
5. Partitioning (Partitioner)
Implement Partitioner and override its method; the return value is the partition number:

public interface Partitioner<K2,V2> extends JobConfigurable {
    int getPartition(K2 var1, V2 var2, int var3);
}

Its default implementation, HashPartitioner.class, already provides the partition method:

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
      return (key.hashCode() & 2147483647) % numReduceTasks;
  }

(Partition numbers cannot be arbitrary: with the default HashPartitioner, the number of reduce tasks and the number of partitions are already kept consistent.)
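A minimal custom partitioner sketch, assuming the new-API org.apache.hadoop.mapreduce.Partitioner and a hypothetical FirstLetterPartitioner class that routes words to one of two partitions by their first letter:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: words starting with a-m go to partition 0, everything else to partition 1.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks < 2 || key.getLength() == 0) {
            return 0;                                  // single reducer or empty key: everything in partition 0
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1; // the return value is the partition number
    }
}

It would be registered with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(2), keeping the reduce task count equal to the number of partitions as noted above.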
6. Optimizing the second merge sort: grouping by first character
job.setGroupingComparatorClass(<custom rule>); a custom grouping rule can, for example, put 张三 and 张思 into one group 张X.
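As a hedged sketch of such a rule (hypothetical FirstCharGroupingComparator class; it compares only the first character of the Text key so that keys sharing it reach a single reduce() call):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping comparator: keys with the same first character
// (e.g. 张三 and 张思) are handed to one reduce() call.
public class FirstCharGroupingComparator extends WritableComparator {
    protected FirstCharGroupingComparator() {
        super(Text.class, true);   // true = create key instances for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = a.toString();
        String right = b.toString();
        String leftFirst = left.isEmpty() ? "" : left.substring(0, 1);
        String rightFirst = right.isEmpty() ? "" : right.substring(0, 1);
        return leftFirst.compareTo(rightFirst);
    }
}

It would be registered with job.setGroupingComparatorClass(FirstCharGroupingComparator.class).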


MapReduce optimization

Using a Combiner


Combiner: local aggregation (also called a local reduce)
Reduce: global aggregation

1. Implementation:
a. Define a custom Combiner class (the custom class extends Reducer):

public class WCCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) { // e.g. [1,1,1]
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

b. Tell MapReduce that a Combiner is used (set it on the Job):

    job.setCombinerClass(WCCombiner.class);

c. Shortcut: if WCCombiner and WCReduce contain exactly the same code, a and b can be skipped and the reducer reused directly:

    job.setCombinerClass(WCReduce.class);

2. When it runs:

  1. When spilling from the ring buffer: the Combiner runs before the spill file is written.
  2. When merging multiple spill files (merge sort): the Combiner runs before the spills are merge-sorted and combined.

3. When to use it:
A Combiner introduces subtle correctness issues, so judge per job whether it applies. For example:
Recommended: sum-type problems.
Not recommended: average-type problems, because an average of partial averages is not the overall average (see the sketch below).
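To illustrate, a hedged sketch of a combiner-safe averaging job (hypothetical AvgCombiner class; values are encoded as "partialSum,partialCount" strings in Text for brevity): the combiner re-emits partial (sum, count) pairs instead of partial averages, and only the final reducer divides.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner for an averaging job: values carry "partialSum,partialCount"
// strings, so pre-aggregation does not destroy the final average.
public class AvgCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {                       // each value looks like "12,3"
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        // Emit a partial (sum, count) again, NOT an average; only the final
        // reducer divides sum by count, so the result stays correct.
        context.write(key, new Text(sum + "," + count));
    }
}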

Optimizing transfers with compression

1. Scenario:
In a distributed compute framework, when node A sends data (a file) to node B, sending it as plain text is slow, so instead:
A compresses the file and sends the compressed file; B receives the compressed file and decompresses it.
MapReduce ships with several compression codecs; you only need to enable the one you want.

2. Six compression codecs:
DefaultCodec
GzipCodec
BZip2Codec
LzopCodec
Lz4Codec
SnappyCodec
Appendix: characteristics and comparison of four common compression formats: https://blog.csdn.net/gjggw123/article/details/104633899

3. Implementation:

// enable compression of the map output
conf.setBoolean("mapreduce.map.output.compress",true);
// choose the codec: only the second argument needs changing
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

// place these after the input/output paths have been set
// compress the final (reduce-side) job output
FileOutputFormat.setCompressOutput(job,true);
// output codec (keep it consistent with the map-side codec)
FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);

Standalone MapReduce on Windows

Writing the code in IDEA on Windows and then going through MapReduce on the YARN cluster to access HDFS is possible in theory, but the cross-platform issues require compatibility plugins, are very cumbersome, and break with version upgrades, so it is not recommended.

Setup

1. Unpack the Hadoop 3.2.1 distribution from Linux onto Windows, then replace the files in hadoop-3.2.1/bin with the matching version from the winutils project; usually only winutils.exe and hadoop.dll need to be added.
2. Configure the environment variables:

HADOOP_HOME    C:\hadoop-3.2.1
---
PATH           %HADOOP_HOME%\bin

3. Add the two path arguments in the Maven project:
Run configuration of the application → Program arguments: D:\myinput D:\myoutput
4. Run.
If a HADOOP_HOME error still appears, copy hadoop.dll into C:\Windows\System32.

Local MapReduce + cluster HDFS

1. Copy core-site.xml from the cluster into the project's resources directory; only one property is needed inside <configuration>:

<configuration>
    <!--hdfs -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://centos01:9000</value>
    </property>
</configuration>

2. Change the path arguments in the project:
Run configuration of the application → Program arguments: /user/CTX/myinput /user/CTX/myoutput
3. Run.
If a permission error on /tmp appears:
(1) Add a VM option in the run configuration: -DHADOOP_USER_NAME=root, which specifies the user to act as when accessing the HDFS cluster, or
(2) Loosen the permissions: bin/hadoop fs -chmod -R 777 /tmp
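An alternative sketch, assuming a hypothetical SubmitAsRoot driver class (and that the examples jar, or your own WordCount, is on the classpath): the same user can be set programmatically before anything touches HDFS.

// Hypothetical driver wrapping the job submission.
public class SubmitAsRoot {
    public static void main(String[] args) throws Exception {
        // Same effect as -DHADOOP_USER_NAME=root; must run before any HDFS access.
        System.setProperty("HADOOP_USER_NAME", "root");
        // ...then build the Configuration and Job exactly as in WordCount.main() above.
        org.apache.hadoop.examples.WordCount.main(args);
    }
}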


MapReduce architecture and job execution flow

Source code walkthrough

waitForCompletion() → submit() →
    1. connect()
        → new Cluster() → initialize() → clientProtocol:
        reads the configuration; if there is none the client protocol stays null, otherwise the configuration determines whether the current environment is YARN (YARNRunner) or local (LocalJobRunner)


    2. submitter.submitJobInternal():
    actually submits the job to YARN or local (the MapReduce runtime environment)
        this.checkSpecs(job) → checkOutputSpecs():
        ensures the output path does not already exist, otherwise an exception is thrown; e.g. for the command above, /myoutput must not exist

        JobSubmissionFiles.getStagingDir(cluster, conf)
        return value (the staging area): file:/tmp/hadoop-YANQUN/mapred/staging/YANQUN1195299999/.staging

        JobID jobId = this.submitClient.getNewJobID()
        obtains the job id: job_local1195299999_0001

        the following creates a sub-directory named after the job id inside the staging area:
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        this.copyAndConfigureFiles(job, submitJobDir);
        job: the MapReduce job being run; submitJobDir: the job's path inside the staging area
        this.writeSplits(job, submitJobDir);
        writes the job's split information into the staging area for later use
        this.writeConf(conf, submitJobFile);
        writes the configuration (job.xml) into the staging area (job.xml contains the job's settings and some environment information)
        submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
        submits to YARN
        this.jtFs.delete(submitJobDir, true);
        after execution finishes, the staging-area data is cleaned up

Configuration precedence (which version of a setting wins)

Hard-coded configuration.set("name","value") > xxx-site.xml on the project classpath > xxx-default.xml on the project classpath > the 7 configuration files on the cluster

Reading the source shows that if configuration files are placed on the project classpath, only these six are picked up automatically:

When a Configuration is created, it automatically loads:
Configuration.addDefaultResource("core-default.xml");         
Configuration.addDefaultResource("core-site.xml");
When a Job is created, it additionally loads:
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
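A minimal sketch illustrating the precedence above (hypothetical ConfPrecedenceDemo class; assumes core-site.xml on the classpath sets fs.defaultFS): a hard-coded set() overrides whatever the classpath files provide.

import org.apache.hadoop.conf.Configuration;

// Hypothetical demo: a hard-coded set() wins over core-site.xml / core-default.xml on the classpath.
public class ConfPrecedenceDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();           // loads *-default.xml, then *-site.xml
        System.out.println("from classpath files: " + conf.get("fs.defaultFS"));
        conf.set("fs.defaultFS", "hdfs://centos01:9000");   // hard-coded value overrides the files
        System.out.println("hard-coded override : " + conf.get("fs.defaultFS"));
    }
}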

Version differences

2.x:  import org.apache.hadoop.mapreduce.xxx; Mapper and friends are abstract classes; the job is driven through Job
      ResourceManager / Application Master -> NodeManager
1.x:  import org.apache.hadoop.mapred.xxx; Mapper and friends are interfaces; the job is driven through JobClient
      JobTracker -> TaskTracker

Evolution from 1.x to 2.x (with roughly 4000 nodes as the scalability limit):
JobTracker -> ResourceManager (resources) + Application Master (per application)
TaskTracker -> NodeManager
Although the architecture changed from 1.x to 2.x, most of the code (business logic) needs no change; 1.x and 2.x differ only in the map()/reduce() method signatures (parameter lists, modifiers).
