本篇文章給大家分享的是有關(guān)OutputFormat在java 中怎么實現(xiàn)自定義,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
成都創(chuàng)新互聯(lián)公司自成立以來,一直致力于為企業(yè)提供從網(wǎng)站策劃、網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、網(wǎng)站制作、電子商務(wù)、網(wǎng)站推廣、網(wǎng)站優(yōu)化到為企業(yè)提供個性化軟件開發(fā)等基于互聯(lián)網(wǎng)的全面整合營銷服務(wù)。公司擁有豐富的網(wǎng)站建設(shè)和互聯(lián)網(wǎng)應用系統(tǒng)開發(fā)管理經(jīng)驗、成熟的應用系統(tǒng)解決方案、優(yōu)秀的網(wǎng)站開發(fā)工程師團隊及專業(yè)的網(wǎng)站設(shè)計師團隊。java 中 自定義OutputFormat
實例代碼:
package com.ccse.hadoop.outputformat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; public class MySelfOutputFormatApp { public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; public final static String OUTPUT_FILENAME = "/abc"; public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); fileSystem.delete(new Path(OUTPUT_PATH), true); Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); job.setJarByClass(MySelfOutputFormatApp.class); FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(MyselfOutputFormat.class); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Text word = new Text(); private LongWritable writable = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { if (value != null) { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, writable); } } } } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } } public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { private FSDataOutputStream outputStream = null; @Override public RecordWriter<Text, LongWritable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { try { FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); //指定文件的輸出路徑 final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH + MySelfOutputFormatApp.OUTPUT_FILENAME); this.outputStream = fileSystem.create(path, false); } catch (URISyntaxException e) { e.printStackTrace(); } return new MySelfRecordWriter(outputStream); } @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); } } public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> { private FSDataOutputStream outputStream = null; public MySelfRecordWriter(FSDataOutputStream outputStream) { this.outputStream = outputStream; } @Override public void write(Text key, LongWritable value) throws IOException, InterruptedException { this.outputStream.writeBytes(key.toString()); this.outputStream.writeBytes("\t"); this.outputStream.writeLong(value.get()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { this.outputStream.close(); } } }
當前題目:OutputFormat在java中怎么實現(xiàn)自定義-創(chuàng)新互聯(lián)
轉(zhuǎn)載注明:http://m.rwnh.cn/article26/djidjg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、電子商務(wù)、關(guān)鍵詞優(yōu)化、網(wǎng)站設(shè)計公司、網(wǎng)站排名、用戶體驗
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容