[TOC]
站在用戶的角度思考問題,與客戶深入溝通,找到豐林網(wǎng)站設(shè)計與豐林網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都做網(wǎng)站、網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、國際域名空間、網(wǎng)絡(luò)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋豐林地區(qū)。
前面進(jìn)行過wordcount的單詞統(tǒng)計例子,關(guān)鍵是,如何對統(tǒng)計的單詞按照單詞個數(shù)來進(jìn)行排序?
如下:
scala> val retRDD = sc.textFile("hdfs://ns1/hello").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
scala> val retSortRDD = retRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
scala> retSortRDD.collect().foreach(println)
...
(hello,3)
(me,1)
(you,1)
(he,1)
下面的測試都需要引入maven的依賴
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
需要進(jìn)行二次排序的數(shù)據(jù)格式如下:
field_1' 'field_2(使用空格分割)
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
思路下面的代碼注釋會有詳細(xì)的說明,這里要指出的是,在下面的排序過程中,分別使用Java和Scala進(jìn)行排序的操作,并且:
所以這個二次排序的例子包含Java和Scala總共5個版本的實現(xiàn),非常有價值!
其實就是SecondarySort對象,如下:
package cn.xpleaf.bigdata.spark.java.core.domain;
import scala.Serializable;
public class SecondarySort implements Comparable<SecondarySort>, Serializable {
private int first;
private int second;
public SecondarySort(int first, int second) {
this.first = first;
this.second = second;
}
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public int compareTo(SecondarySort that) {
int ret = this.getFirst() - that.getFirst();
if(ret == 0) {
ret = that.getSecond() - this.getSecond();
}
return ret;
}
@Override
public String toString() {
return this.first + " " + this.second;
}
}
測試代碼如下:
package cn.xpleaf.bigdata.spark.java.core.p3;
import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Serializable;
import scala.Tuple2;
import java.util.Comparator;
/**
* Java 版本的二次排序
* field_1' 'field_2(使用空格分割)
* 20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
需求:首先按照第一列升序排序,如果第一列相等,按照第二列降序排序
分析:要排序的話,使用sortByKey,也可以使用sortBy
如果用sortByKey的話,只能按照key來排序,現(xiàn)在的是用第一列做key?還是第二列?
根據(jù)需求,只能使用復(fù)合key(既包含第一列,也包含第二列),因為要進(jìn)行比較,所以該復(fù)合key必須具備比較性,要么該操作提供一個比較器
問題是查看該操作的時候,并沒有給我們提供比較器,沒得選只能讓元素具備比較性
使用自定義的對象 可以使用comprable接口
*/
public class _01SparkSecondarySortOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSecondarySortOps.class.getSimpleName());
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/secondsort.csv");
JavaPairRDD<SecondarySort, String> ***DD = linesRDD.mapToPair(new PairFunction<String, SecondarySort, String>() {
@Override
public Tuple2<SecondarySort, String> call(String line) throws Exception {
String[] fields = line.split(" ");
int first = Integer.valueOf(fields[0].trim());
int second = Integer.valueOf(fields[1].trim());
SecondarySort ss = new SecondarySort(first, second);
return new Tuple2<SecondarySort, String>(ss, "");
}
});
/*
// 第一種方式:使元素具備比較性
JavaPairRDD<SecondarySort, String> sbkRDD = ***DD.sortByKey(true, 1); // 設(shè)置partition為1,這樣數(shù)據(jù)才整體有序,否則只是partition中有序
*/
/**
* 第二種方式,提供比較器
* 與前面方式相反,這次是:第一列降序,第二列升序
*/
JavaPairRDD<SecondarySort, String> sbkRDD = ***DD.sortByKey(new MyComparator<SecondarySort>() {
@Override
public int compare(SecondarySort o1, SecondarySort o2) {
int ret = o2.getFirst() - o1.getFirst();
if(ret == 0) {
ret = o1.getSecond() - o2.getSecond();
}
return ret;
}
}, true, 1);
sbkRDD.foreach(new VoidFunction<Tuple2<SecondarySort, String>>() {
@Override
public void call(Tuple2<SecondarySort, String> tuple2) throws Exception {
System.out.println(tuple2._1);
}
});
jsc.close();
}
}
/**
* 做一個中間的過渡接口
* 比較需要實現(xiàn)序列化接口,否則也會報異常
* 是用到了適配器Adapter模式
* 適配器模式(Adapter Pattern)是作為兩個不兼容的接口之間的橋梁,這里就是非常好的體現(xiàn)了。
*/
interface MyComparator<T> extends Comparator<T>, Serializable{}
輸出結(jié)果如下:
740 58
730 54
530 54
203 21
74 58
73 57
71 55
71 56
70 54
70 55
70 56
70 57
70 58
70 58
63 61
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
40 511
31 42
20 21
20 53
20 522
12 211
7 8
7 82
5 6
3 4
1 2
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.reflect.ClassTag
object _05SparkSecondarySortOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkSecondarySortOps.getClass.getSimpleName)
val sc = new SparkContext(conf)
val linesRDD = sc.textFile("D:/data/spark/secondsort.csv")
/*
val ***DD:RDD[(SecondarySort, String)] = linesRDD.map(line => {
val fields = line.split(" ")
val first = Integer.valueOf(fields(0).trim())
val second = Integer.valueOf(fields(1).trim())
val ss = new SecondarySort(first, second)
(ss, "")
})
// 第一種方式,使用元素具備比較性
val sbkRDD:RDD[(SecondarySort, String)] = ***DD.sortByKey(true, 1)
sbkRDD.foreach{case (ss:SecondarySort, str:String) => { // 使用模式匹配的方式
println(ss)
}}
*/
/*
// 使用sortBy的第一種方式,基于原始的數(shù)據(jù)
val retRDD = linesRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
override def compare(x: String, y: String): Int = {
val xFields = x.split(" ")
val yFields = y.split(" ")
var ret = xFields(0).toInt - yFields(0).toInt
if(ret == 0) {
ret = yFields(1).toInt - xFields(1).toInt
}
ret
}
}, ClassTag.Object.asInstanceOf[ClassTag[String]])
*/
// 使用sortBy的第二種方式,將原始數(shù)據(jù)做轉(zhuǎn)換--->sortBy()第一個參數(shù)的作用,就是做數(shù)據(jù)的轉(zhuǎn)換
val retRDD:RDD[String] = linesRDD.sortBy(line => {
// f: (T) => K
// 這里T的類型為String,K是SecondarySort類型
val fields = line.split(" ")
val first = Integer.valueOf(fields(0).trim())
val second = Integer.valueOf(fields(1).trim())
val ss = new SecondarySort(first, second)
ss
}, true, 1)(new Ordering[SecondarySort] {
override def compare(x: SecondarySort, y: SecondarySort): Int = {
var ret = x.getFirst - y.getFirst
if(ret == 0) {
ret = y.getSecond - x.getSecond
}
ret
}
}, ClassTag.Object.asInstanceOf[ClassTag[SecondarySort]])
retRDD.foreach(println)
sc.stop()
}
}
輸出結(jié)果如下:
1 2
3 4
5 6
7 82
7 8
12 211
20 522
20 53
20 21
31 42
40 511
50 522
50 512
50 62
50 54
50 53
50 53
50 52
50 51
60 61
60 57
60 57
60 56
60 56
60 53
60 52
60 51
63 61
70 58
70 58
70 57
70 56
70 55
70 54
71 56
71 55
73 57
74 58
203 21
530 54
730 54
740 58
需求與數(shù)據(jù)說明如下:
* TopN問題的說明:
* TopN問題顯然是可以使用action算子take來完成,但是因為take需要將所有數(shù)據(jù)都拉取到Driver上才能完成操作,
* 所以Driver的內(nèi)存壓力非常大,不建議使用take.
*
* 這里要進(jìn)行TopN問題的分析,數(shù)據(jù)及需求如下:
* chinese ls 91
* english ww 56
* chinese zs 90
* chinese zl 76
* english zq 88
* chinese wb 95
* chinese sj 74
* english ts 87
* english ys 67
* english mz 77
* chinese yj 98
* english gk 96
*
* 需求:排出每個科目的前三名
下面分別使用性能很低的groupByKey和性能很好的combineByKey來進(jìn)行操作,詳細(xì)的說明已經(jīng)在代碼中給出,注意其思想非常重要,尤其是使用combineByKey來解決groupByKey出現(xiàn)的性能問題,有興趣的話,可以好好閱讀一下代碼,以及其所體現(xiàn)的思想,因為這都跟Spark本身的理論緊密相關(guān)。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* TopN問題的說明:
* TopN問題顯然是可以使用action算子take來完成,但是因為take需要將所有數(shù)據(jù)都拉取到Driver上才能完成操作,
* 所以Driver的內(nèi)存壓力非常大,不建議使用take.
*
* 這里要進(jìn)行TopN問題的分析,數(shù)據(jù)及需求如下:
* chinese ls 91
* english ww 56
* chinese zs 90
* chinese zl 76
* english zq 88
* chinese wb 95
* chinese sj 74
* english ts 87
* english ys 67
* english mz 77
* chinese yj 98
* english gk 96
*
* 需求:排出每個科目的前三名
*
* 思路:先進(jìn)行map操作轉(zhuǎn)換為(subject, name + score)的元組
* 再根據(jù)subject這個key進(jìn)行g(shù)roupByKey,這樣就可以得到gbkRDD
* 之后再對其進(jìn)行map操作,在map操作中使用treeSet得到前三名(既能控制大小,又能進(jìn)行排序)
*
* 問題:
* 上面的方案在生產(chǎn)過程中慎用
* 因為,執(zhí)行g(shù)roupByKey,會將key相同的數(shù)據(jù)都拉取到同一個partition中,再執(zhí)行操作,
* 拉取的過程是shuffle,是分布式性能殺手!再一個,如果key對應(yīng)的數(shù)據(jù)過多,很有可能造成數(shù)據(jù)傾斜,或者OOM,
* 那么就需要盡量的避免這種操作方式。
* 那如何做到?可以參考MR中TopN問題的思想,MR中,是在每個map task中對數(shù)據(jù)進(jìn)行篩選,雖然最后還是需要shuffle到一個節(jié)點上,但是數(shù)據(jù)量會大大減少。
* Spark中參考其中的思想,就是可以在每個partition中對數(shù)據(jù)進(jìn)行篩選,然后再對各個分區(qū)篩選出來的數(shù)據(jù)進(jìn)行合并,再做一次排序,從而得到最終排序的結(jié)果。
* 顯然,這樣就可以解決前面說的數(shù)據(jù)到同一個partition中導(dǎo)致數(shù)據(jù)量過大的問題!因為分區(qū)篩選的工作已經(jīng)可以大大減少數(shù)據(jù)量。
* 那么在Spark中有什么算子可以做到這一點呢?那就是combineByKey或者aggregateByKey,其具體的用法可以參考我前面的博客文章,這里我使用combineByKey來操作。
*/
object _06SparkTopNOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_06SparkTopNOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
Logger.getLogger("org.spark_project").setLevel(Level.OFF)
// 1.轉(zhuǎn)換為linesRDD
val linesRDD:RDD[String] = sc.textFile("D:/data/spark/topn.txt")
// 2.轉(zhuǎn)換為pairsRDD
val pairsRDD:RDD[(String, String)] = linesRDD.map(line => {
val fields = line.split(" ")
val subject = fields(0).trim()
val name = fields(1).trim()
val score = fields(2).trim()
(subject, name + " " + score) // ("chinese", "zs 90")
})
// 3.轉(zhuǎn)換為gbkRDD
val gbkRDD:RDD[(String, Iterable[String])] = pairsRDD.groupByKey()
println("==========TopN前==========")
gbkRDD.foreach(println)
// (english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
// (chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))
// 4.轉(zhuǎn)換為retRDD
val retRDD:RDD[(String, Iterable[String])] = gbkRDD.map(tuple => {
var ts = new mutable.TreeSet[String]()(new MyOrdering())
val subject = tuple._1 // chinese
val nameScores = tuple._2 // ("ls 91", "ww 56", "zs 90", ...)
for(nameScore <- nameScores) { // 遍歷每一份成績"ls 91"
// 添加到treeSet中
ts.add(nameScore)
if(ts.size > 3) { // 如果大小大于3,則彈出最后一份成績
ts = ts.dropRight(1)
}
}
(subject, ts)
})
println("==========TopN后==========")
retRDD.foreach(println)
sc.stop()
}
}
// gbkRDD.map中用于排序的treeSet的排序比較規(guī)則,根據(jù)需求,應(yīng)該為降序
class MyOrdering extends Ordering[String] {
override def compare(x: String, y: String): Int = {
// x或者y的格式為:"zs 90"
val xFields = x.split(" ")
val yFields = y.split(" ")
val xScore = xFields(1).toInt
val yScore = yFields(1).toInt
val ret = yScore - xScore
ret
}
}
輸出結(jié)果如下:
==========TopN前==========
(chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))
(english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
==========TopN后==========
(chinese,TreeSet(yj 98, wb 95, ls 91))
(english,TreeSet(gk 96, zq 88, ts 87))
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
/**
* 使用combineByKey算子來優(yōu)化前面的TopN問題
* 關(guān)于combineByKey算子的使用,可以參考我的博客文章,上面有非常詳細(xì)的例子
* 一定要掌握,因為非常重要
*/
object _07SparkTopNOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_07SparkTopNOps.getClass().getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
Logger.getLogger("org.spark_project").setLevel(Level.OFF)
// 1.轉(zhuǎn)換為linesRDD
val linesRDD:RDD[String] = sc.textFile("D:/data/spark/topn.txt")
// 2.轉(zhuǎn)換為pairsRDD
val pairsRDD:RDD[(String, String)] = linesRDD.map(line => {
val fields = line.split(" ")
val subject = fields(0).trim()
val name = fields(1).trim()
val score = fields(2).trim()
(subject, name + " " + score) // ("chinese", "zs 90")
})
println("==========TopN前==========")
pairsRDD.foreach(println)
// (chinese,sj 74)
// (chinese,ls 91)
// (english,ts 87)
// (english,ww 56)
// (english,ys 67)
// (chinese,zs 90)
// (english,mz 77)
// (chinese,zl 76)
// (chinese,yj 98)
// (english,zq 88)
// (english,gk 96)
// (chinese,wb 95)
// 3.轉(zhuǎn)換為cbkRDD
val cbkRDD:RDD[(String, mutable.TreeSet[String])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
println("==========TopN后==========")
cbkRDD.foreach(println)
// (chinese,TreeSet(yj 98, wb 95, ls 91))
// (english,TreeSet(gk 96, zq 88, ts 87))
}
// 創(chuàng)建一個容器,這里返回一個treeSet,作為每個分區(qū)中相同key的value的容器
def createCombiner(nameScore: String):mutable.TreeSet[String] = {
// nameScore格式為:"zs 90"
// 指定排序規(guī)則MyOrdering,為降序排序
val ts = new mutable.TreeSet[String]()(new MyOrdering())
ts.add(nameScore)
ts
}
// 合并分區(qū)中key相同的value,同時使用treeSet來進(jìn)行排序
def mergeValue(ts:mutable.TreeSet[String], nameScore:String):mutable.TreeSet[String] = {
ts.add(nameScore)
if(ts.size > 3) { // 如果超過3個,刪除一個再返回
ts.dropRight(1) // scala中的集合進(jìn)行操作后,本身不變,但是會返回一個新的集合
}
ts
}
// 合并不同分區(qū)中key相同的value集合,同時使用treeSet來進(jìn)行排序
def mergeCombiners(ts1:mutable.TreeSet[String], ts2:mutable.TreeSet[String]):mutable.TreeSet[String] = {
var newTS = new mutable.TreeSet[String]()(new MyOrdering())
// 將分區(qū)1中集合的value添加到新的treeSet中,同時進(jìn)行排序和控制大小
for(nameScore <- ts1) {
newTS.add(nameScore)
if(newTS.size > 3) { // 如果數(shù)量大于3,則刪除一個后再賦值給本身
newTS = newTS.dropRight(1)
}
}
// 將分區(qū)2中集合的value添加到新的treeSet中,同時進(jìn)行排序和控制大小
for(nameScore <- ts2) {
newTS.add(nameScore)
if(newTS.size > 3) { // 如果數(shù)量大于3,則刪除一個后再賦值給本身
newTS = newTS.dropRight(1)
}
}
newTS
}
}
輸出結(jié)果如下:
==========TopN前==========
(chinese,ls 91)
(chinese,sj 74)
(english,ww 56)
(english,ts 87)
(chinese,zs 90)
(english,ys 67)
(chinese,zl 76)
(english,mz 77)
(english,zq 88)
(chinese,yj 98)
(chinese,wb 95)
(english,gk 96)
==========TopN后==========
(english,TreeSet(gk 96, zq 88, ts 87))
(chinese,TreeSet(yj 98, wb 95, ls 91))
網(wǎng)頁名稱:Spark筆記整理(六):Spark高級排序與TopN問題揭密
轉(zhuǎn)載來于:http://m.rwnh.cn/article10/igpddo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供商城網(wǎng)站、Google、軟件開發(fā)、關(guān)鍵詞優(yōu)化、服務(wù)器托管、
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)