Big Data Hadoop (Part 2): Complete Notes


Five core components:

All five components below are programmable and can be configured to suit the user's requirements.

  • InputFormat component
  • Mapper component
  • Partitioner component
  • Reducer component
  • OutputFormat component

Non-essential component:

  • Combiner

It is used to optimize job performance and may be applied as long as it does not change the final business result.


InputFormat component

Input component: the InputFormat component has two functions (see the interface sketch after this list):

  • Data splitting: slice the input data into a number of Splits according to some rule; this determines the number of MapTasks and which Split each of them processes.
  • Feeding the Mapper: given a Split, parse it into individual key/value pairs.
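
These two functions correspond one-to-one to the two abstract methods of org.apache.hadoop.mapreduce.InputFormat. The outline below is a condensed sketch of that class (javadoc and annotations stripped), not a full copy of the source:

<code>
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;

public abstract class InputFormat<K, V> {

    // Function 1: logically slice the input into Splits (one MapTask per Split)
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Function 2: turn one Split into a stream of key/value pairs for the Mapper
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;
}
</code>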

(1) What is a split?

A split (InputSplit) is a logical slice of the input data; it does not physically cut the file. Each split is handed to exactly one MapTask, and by default its size is derived from the block size.

(2) How is the input split? (Important)

The tests here run in local mode, so blockSize and splitSize are both 32 MB.

Package: org.apache.hadoop.mapreduce.lib.input

Class FileInputFormat, getSplits method (lines 221-276 of the source):

<code>
// Files can be split by default
protected boolean isSplitable(JobContext context, Path filename) {
    return true;
}

// Method that computes the split size
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    // locally: blockSize = 32 MB, so splitSize = 32 MB
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // ... (see the full method in the FileInputFormat source, lines 221-276)
}
</code>
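
Since splitSize = max(minSize, min(maxSize, blockSize)), the split size can be influenced from job code. A minimal sketch, assuming a configured Job object named job (the 64 MB and 16 MB values are only illustrative):

<code>
// Raise the minimum split size above the block size to get fewer, larger splits...
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB

// ...or instead lower the maximum split size below the block size to get more, smaller splits
// FileInputFormat.setMaxInputSplitSize(job, 16L * 1024 * 1024); // 16 MB
</code>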



The following error is caused by the virtual-machine environment having no MySQL environment (no MySQL jar):

Fix: copy the MySQL jar into the /home/hadoop/hadoop-2.6.0/share/hadoop/yarn/ folder.
Cross-platform submission

This is basically the same as before:

<code>
System.setProperty("HADOOP_USER_NAME", "root");

configuration.addResource("conf2/core-site.xml");
configuration.addResource("conf2/hdfs-site.xml");
configuration.addResource("conf2/mapred-site.xml");
configuration.addResource("conf2/yarn-site.xml");
configuration.set("mapreduce.app-submission.cross-platform", "true");
configuration.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
</code>

Custom InputFormat

Solving small-file storage

Knowledge points involved: custom InputFormat, SequenceFileOutputFormat

Approach: use a custom InputFormat to read the data in (each small file is processed in one pass, and its entire content is written into one large output file), then write the result with SequenceFileOutputFormat. A SequenceFile is Hadoop's file format for storing key/value pairs in binary form:

  • key - the file's path + name
  • value - the binary content of the file
<code>
package com.baizhi.test05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FileJob {
    public static void main(String[] args) throws Exception {

        // System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        /*
        conf.addResource("conf2/core-site.xml");
        conf.addResource("conf2/hdfs-site.xml");
        conf.addResource("conf2/mapred-site.xml");
        conf.addResource("conf2/yarn-site.xml");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        */

        Job job = Job.getInstance(conf);

        // Set the class loader
        job.setJarByClass(FileJob.class);

        job.setInputFormatClass(OwnInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // TextInputFormat.setInputPaths(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\作業\\數據\\log.txt"));
        OwnInputFormat.setInputPaths(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\in"));
        // TextOutputFormat.setOutputPath(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\作業\\數據\\out1"));
        SequenceFileOutputFormat.setOutputPath(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\out01"));

        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.waitForCompletion(true);
    }
}
</code>




<code>
package com.baizhi.test05;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // The custom RecordReader already packs one whole file into a single
        // key/value pair, so the mapper just forwards it.
        context.write(key, value);
    }
}
</code>
<code>
package com.baizhi.test05;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
</code>
<code>
package com.baizhi.test05;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class OwnInputFormat extends FileInputFormat<Text, BytesWritable> {

    /*
     * Can the file be split? No: each small file must stay in one piece.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        OwnRecordReader recordReader = new OwnRecordReader();
        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }
}
</code>
<code>
package com.baizhi.test05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OwnRecordReader extends RecordReader<Text, BytesWritable> {

    // key: the file path + name
    Text key = new Text();
    // value: the binary content of the file
    BytesWritable value = new BytesWritable();

    FileSplit fileSplit;
    Configuration configuration;
    boolean isProgress = true;

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Initialization
        fileSplit = (FileSplit) inputSplit;
        // Get the configuration object from the TaskAttemptContext
        configuration = taskAttemptContext.getConfiguration();
    }

    /*
     * nextKeyValue() fills in the key and the value.
     */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            // Get the Path object
            Path path = fileSplit.getPath();

            // Get the file system object
            FileSystem fileSystem = path.getFileSystem(configuration);

            // The file name is the key
            String name = path.toString();
            key.set(new Text(name));

            // Open the file; its bytes become the value
            FSDataInputStream fsDataInputStream = fileSystem.open(path);

            // Array that holds the value data
            byte[] bytes = new byte[(int) fileSplit.getLength()];

            // Copy the whole file into the array with IOUtils
            IOUtils.readFully(fsDataInputStream, bytes, 0, bytes.length);

            // Wrap the bytes into the value
            value.set(bytes, 0, bytes.length);

            // Close the stream
            IOUtils.closeStream(fsDataInputStream);

            isProgress = false;
            return true;
        }
        return false;
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {
    }
}
</code>

A couple of small questions:

(1) Can this problem be solved in another way?

(2) Try reading back the contents of the SequenceFile (a sketch follows).
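
For question (2), here is a minimal sketch of reading the generated SequenceFile back with SequenceFile.Reader. The Text/BytesWritable key/value types come from the job above; the class name ReadSequenceFile and the part-r-00000 file name are assumptions for the example, so adjust the path to whatever FileJob actually produced:

<code>
package com.baizhi.test05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One of the output files produced by FileJob (adjust as needed)
        Path path = new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\out01\\part-r-00000");

        // Key/value types match what FileReducer wrote
        Text key = new Text();
        BytesWritable value = new BytesWritable();

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            while (reader.next(key, value)) {
                // key = original file path + name, value = the file's raw bytes
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}
</code>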

CombineTextInputFormat

Its main purpose is to optimize computation over many small files.

With TextInputFormat, there are as many splits as there are files.

<code>
job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setInputPaths(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\in"));
CombineTextInputFormat.setMinInputSplitSize(job, 10240000);
</code>
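
A side note: the knob most CombineTextInputFormat examples tune is the maximum combined split size rather than the minimum; a line like the one below caps each combined split at roughly 4 MB so that many small files are packed into a few splits (the 4194304 value is just an example):

<code>
// Cap each combined split at ~4 MB; many small files then share one split / one MapTask
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
</code>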

The relationship between splits and MapTasks

The number of concurrent MapTasks is determined by the number of splits; the number of ReduceTasks can be set manually and defaults to 1.
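
For example, assuming a Job named job (the value 4 is arbitrary):

<code>
// The MapTask count follows the split count automatically;
// only the ReduceTask count is set by hand (default is 1).
job.setNumReduceTasks(4);
</code>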

Partitioner component

Why do records with the same key end up together?

Package org.apache.hadoop.mapreduce.lib.partition, class HashPartitioner, method getPartition (sketched below):
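
The default partitioner simply hashes the key modulo the number of ReduceTasks, which is why records with the same key always land in the same partition and therefore the same ReduceTask. A sketch of the class, condensed from the Hadoop 2.x source (annotations and javadoc removed):

<code>
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // Same key -> same hash -> same partition -> same ReduceTask
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
</code>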


Requirement: partition the flow records by area (zz, bj, tj, sh; any other area goes into an extra partition) so that each area ends up in its own output file.

<code>
package com.baizhi.test06;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class OwnPartitioner extends Partitioner<Text, Text> {

    // Map that stores the area -> partition number assignment
    private static HashMap<String, Integer> map = new HashMap<String, Integer>();

    static {
        // Store the data
        map.put("zz", 0);
        map.put("bj", 1);
        map.put("tj", 2);
        map.put("sh", 3);
    }

    public int getPartition(Text key, Text value, int i) {
        // Get the area name
        String areaName = key.toString();
        // Look up its partition number
        Integer num = map.get(areaName);

        // If num is null (unknown area) return 4, otherwise return the mapped value
        return num == null ? 4 : num;
    }
}
</code>

Summary:

The number of ReduceTasks (NumReduceTasks) must be greater than or equal to the number of (custom) partitions.

By default NumReduceTasks is 1, and testing shows that in that case all data lands in a single partition no matter how you partition manually (in other words, with NumReduceTasks = 1 a custom partitioning strategy has no effect).

We also observed that there are as many output files as there are NumReduceTasks.


<code>
package com.baizhi.test06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PJob {
    public static void main(String[] args) throws Exception {

        // System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        // conf.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");

        Job job = Job.getInstance(conf);

        // Set the class loader
        job.setJarByClass(PJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Use the custom partitioning rule
        job.setPartitionerClass(OwnPartitioner.class);

        // Set the number of ReduceTasks
        job.setNumReduceTasks(5);

        TextInputFormat.setInputPaths(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\sumFlow.txt"));
        // TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\outP1111111"));
        // TextOutputFormat.setOutputPath(job, new Path("/out111111"));

        job.setMapperClass(PMapper.class);
        job.setReducerClass(PReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}
</code>
<code>
package com.baizhi.test06;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Illustrates:
 * - the relationship between the ReduceTask count and the number of output files
 * - the relationship between ReduceTasks and partitions
 */
public class PMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Output key = area name, value = the whole flow record
        String areaName = value.toString().split(" ")[4];
        context.write(new Text(areaName), value);
    }
}
</code>
<code>
package com.baizhi.test06;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
</code>

Note: the code above does not deal with any business logic; it only routes records from different areas into different output files via the custom partitioner, and it also sets the number of ReduceTasks.


