内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

OutputFormat在java中怎么實現(xiàn)自定義-創(chuàng)新互聯(lián)

本篇文章給大家分享的是有關(guān)OutputFormat在java 中怎么實現(xiàn)自定義,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

成都創(chuàng)新互聯(lián)公司自成立以來,一直致力于為企業(yè)提供從網(wǎng)站策劃、網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、網(wǎng)站制作、電子商務(wù)、網(wǎng)站推廣、網(wǎng)站優(yōu)化到為企業(yè)提供個性化軟件開發(fā)等基于互聯(lián)網(wǎng)的全面整合營銷服務(wù)。公司擁有豐富的網(wǎng)站建設(shè)和互聯(lián)網(wǎng)應用系統(tǒng)開發(fā)管理經(jīng)驗、成熟的應用系統(tǒng)解決方案、優(yōu)秀的網(wǎng)站開發(fā)工程師團隊及專業(yè)的網(wǎng)站設(shè)計師團隊。

java 中 自定義OutputFormat

實例代碼:

package com.ccse.hadoop.outputformat; 
 
import java.io.IOException; 
import java.net.URI; 
import java.net.URISyntaxException; 
import java.util.StringTokenizer; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 
 
 
public class MySelfOutputFormatApp { 
   
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; 
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; 
  public final static String OUTPUT_FILENAME = "/abc"; 
   
  public static void main(String[] args) throws IOException, URISyntaxException,  
    ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); 
    fileSystem.delete(new Path(OUTPUT_PATH), true); 
     
    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); 
    job.setJarByClass(MySelfOutputFormatApp.class); 
     
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); 
    job.setMapperClass(MyMapper.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(LongWritable.class); 
     
    job.setReducerClass(MyReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(LongWritable.class); 
    job.setOutputFormatClass(MyselfOutputFormat.class); 
     
    job.waitForCompletion(true); 
  } 
   
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 
 
    private Text word = new Text(); 
    private LongWritable writable = new LongWritable(1); 
     
    @Override 
    protected void map(LongWritable key, Text value, 
        Mapper<LongWritable, Text, Text, LongWritable>.Context context) 
        throws IOException, InterruptedException { 
      if (value != null) { 
        String line = value.toString(); 
        StringTokenizer tokenizer = new StringTokenizer(line); 
        while (tokenizer.hasMoreTokens()) { 
          word.set(tokenizer.nextToken()); 
          context.write(word, writable); 
        } 
      } 
    } 
     
  } 
   
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 
 
    @Override 
    protected void reduce(Text key, Iterable<LongWritable> values, 
        Reducer<Text, LongWritable, Text, LongWritable>.Context context) 
        throws IOException, InterruptedException { 
      long sum = 0;  
      for (LongWritable value : values) { 
        sum += value.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
    } 
  } 
 
  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { 
 
    private FSDataOutputStream outputStream = null; 
     
    @Override 
    public RecordWriter<Text, LongWritable> getRecordWriter( 
        TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      try { 
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); 
        //指定文件的輸出路徑 
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH  
                     + MySelfOutputFormatApp.OUTPUT_FILENAME); 
        this.outputStream = fileSystem.create(path, false); 
      } catch (URISyntaxException e) { 
        e.printStackTrace(); 
      } 
      return new MySelfRecordWriter(outputStream); 
    } 
 
    @Override 
    public void checkOutputSpecs(JobContext context) throws IOException, 
        InterruptedException { 
    } 
 
    @Override 
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
        throws IOException, InterruptedException { 
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); 
    } 
     
  } 
   
  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> { 
 
    private FSDataOutputStream outputStream = null; 
     
    public MySelfRecordWriter(FSDataOutputStream outputStream) { 
      this.outputStream = outputStream; 
    } 
     
    @Override 
    public void write(Text key, LongWritable value) throws IOException, 
        InterruptedException { 
      this.outputStream.writeBytes(key.toString()); 
      this.outputStream.writeBytes("\t"); 
      this.outputStream.writeLong(value.get()); 
    } 
 
    @Override 
    public void close(TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      this.outputStream.close(); 
    } 
     
  } 
   
} 

當前題目:OutputFormat在java中怎么實現(xiàn)自定義-創(chuàng)新互聯(lián)
轉(zhuǎn)載注明:http://m.rwnh.cn/article26/djidjg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、電子商務(wù)、關(guān)鍵詞優(yōu)化網(wǎng)站設(shè)計公司、網(wǎng)站排名、用戶體驗

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
济南市| 方山县| 平远县| 南皮县| 太和县| 固始县| 通许县| 扎囊县| 平顶山市| 涟源市| 新竹市| 枣庄市| 横山县| 青浦区| 灵璧县| 奉新县| 绵阳市| 泾川县| 百色市| 长阳| 兴山县| 漠河县| 上虞市| 当涂县| 克东县| 温宿县| 奇台县| 胶州市| 洞口县| 北票市| 姜堰市| 太保市| 蒙自县| 木里| 南开区| 简阳市| 恩平市| 贞丰县| 罗平县| 彭水| 衡阳县|