我們要自定義輸出時,首先繼承兩個抽象類,一個是 OutputFormat,一個是 RecordWriter
。前者是主要是創(chuàng)建RecordWriter,后者就是主要實現(xiàn) write方法來將kv寫入文件。
1、需求
將reduce輸出的KV中,如果key中包含特定字符串,則將其輸出到一個文件中,剩下的KV則輸出到另外的文件中。
2、源碼
源數(shù)據(jù)
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.itstar.com
http://www.itstar1.com
http://www.itstar2.com
http://www.itstar3.com
http://www.baidu.com
http://www.sin2a.com
http://www.sin2a.comw.google.com
http://www.sin2desa.com
http://www.sin2desa.comw.google.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
outputFormat
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new MyRecordWriter(taskAttemptContext);
}
}
RecordWriter
public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream startOut;
private FSDataOutputStream otherOut;
public MyRecordWriter(TaskAttemptContext job) {
try {
FileSystem fs = FileSystem.get(job.getConfiguration());
startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log"));
otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String line = key.toString();
//如果key中包含itstar就寫入到另外一個文件中
if (line.contains("itstar")) {
this.startOut.writeUTF(line);
} else {
this.otherOut.writeUTF(line);
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
this.startOut.close();
this.otherOut.close();
}
}
mapper
public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
reducer
public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
Text k = new Text();
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String line = key.toString();
line = line + "\r\n";
k.set(line);
context.write(k, NullWritable.get());
}
}
driver
ublic class MyDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyOutputMapper.class);
job.setReducerClass(MyOutputReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//自定義輸出的實現(xiàn)子類,也是繼承FileOutputFormat
job.setOutputFormatClass(MyOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
//這個路徑輸出的是job的執(zhí)行成功successs文件的輸出路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
分享題目:十五、MapReduce--自定義output輸出-創(chuàng)新互聯(lián)
URL標(biāo)題:http://m.rwnh.cn/article30/dscgpo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計公司、建站公司、標(biāo)簽優(yōu)化、網(wǎng)站設(shè)計、關(guān)鍵詞優(yōu)化、網(wǎng)站策劃
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容