There are two ways to define a custom sort in Spark: (1) define a class that extends Ordering or Ordered; note that the class must also mix in Serializable, because sorting may shuffle the objects being compared across the network, so they must be serializable; (2) supply the ordering via an implicit conversion.
import org.apache.spark.{SparkConf, SparkContext}

// Approach 1: provide an implicit Ordering[Men] in a separate object.
case class Men(name: String, faceValue: Int, age: Int) extends Serializable {
  override def toString: String = s"[name:$name,fv:$faceValue,age:$age]"
}

object Context {
  implicit object MenOrdering extends Ordering[Men] {
    override def compare(x: Men, y: Men): Int = {
      if (x.faceValue > y.faceValue) 1
      else if (x.faceValue == y.faceValue) {
        if (x.age > y.age) -1 else 1   // equal faceValue: the younger ranks higher
      } else -1
    }
  }
}

// Approach 2: make Men itself comparable by extending Ordered.
// (Use only one of the two Men definitions at a time; they cannot coexist.)
case class Men(name: String, faceValue: Int, age: Int) extends Ordered[Men] with Serializable {
  override def compare(that: Men): Int = {
    if (this.faceValue == that.faceValue) {
      that.age - this.age              // equal faceValue: the younger ranks higher
    } else {
      this.faceValue - that.faceValue
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(("xiaoming", 90, 28, 1), ("zhagnsan", 90, 27, 2)))
    // import Context._   // uncomment when using approach 1
    val result = rdd.sortBy(t => Men(t._1, t._2, t._3), ascending = false).collect()
    println(result.toBuffer)
    sc.stop()
  }
}
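For approach 1 to take effect, the implicit MenOrdering must be in scope at the call site. A minimal sketch, reusing rdd from the Test object above:

import Context._  // brings the implicit MenOrdering into scope for sortBy
val result = rdd.sortBy(t => Men(t._1, t._2, t._3), ascending = false).collect()
// With equal faceValue, MenOrdering ranks the younger Men higher, so in
// descending order Men("zhagnsan",90,27) comes before Men("xiaoming",90,28).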
Hadoop's Partitioner strategy is: partition = (key.hashCode & Integer.MAX_VALUE) % (number of reducers). With HashPartitioner, records with the same key are guaranteed to land in the same reducer, but a reducer does not necessarily hold only one key: hash collisions can route different keys to the same reducer. Increasing the number of reducers lowers the probability of collisions. To write a custom partitioner, extend Partitioner and override two methods: numPartitions, which returns the number of partitions, and getPartition, which maps a key to a partition index.
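A minimal sketch of that formula (illustrative, not Hadoop's actual source); masking with Integer.MAX_VALUE clears the sign bit so the modulo result is never negative:

def hashPartition(key: Any, numReducers: Int): Int =
  (key.hashCode & Integer.MAX_VALUE) % numReducers

hashPartition("spark", 4)  // the same key always maps to the same partition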
import java.net.URL
import scala.collection.mutable
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Partitioner: decides which partition each record goes to (one partition per host).
class MyPartitioner(ins: Array[String]) extends Partitioner {
  val parMap = new mutable.HashMap[String, Int]()
  var count = 0
  for (i <- ins) {
    parMap += (i -> count)
    count += 1
  }

  override def numPartitions: Int = ins.length

  override def getPartition(key: Any): Int = {
    parMap.getOrElse(key.toString, 0)
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // rdd1 splits each line and emits (URL, 1)
    val rdd1 = sc.textFile("c://Log.log").map(line => {
      val f = line.split("\t")
      (f(1), 1)
    })
    val rdd2 = rdd1.reduceByKey(_ + _)
    // Re-key by host so the data can be partitioned per host: (host, (url, count)).
    // partitionBy requires a pair RDD, so (url, count) goes into the value slot.
    val rdd3 = rdd2.map(t => {
      val url = t._1
      val host = new URL(url).getHost
      (host, (url, t._2))
    })
    val ints = rdd3.map(_._1).distinct().collect()
    val hostPartitioner = new MyPartitioner(ints)
    // Within each host's partition, keep the two most-visited URLs
    val rdd4 = rdd3.partitionBy(hostPartitioner).mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(2).iterator
    })
    rdd4.saveAsTextFile("c://out")
    sc.stop()
  }
}
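A quick sanity check of the partitioner by itself, outside Spark (a sketch with made-up host names; any strings work):

val p = new MyPartitioner(Array("a.example.com", "b.example.com"))
p.numPartitions                  // 2
p.getPartition("a.example.com")  // 0
p.getPartition("b.example.com")  // 1
p.getPartition("unknown.com")    // 0 (the getOrElse fallback)

Note the design trade-off in mapPartitions: it.toList materializes an entire partition in memory to sort it. That is acceptable here because each partition holds a single host's URLs, but a heavily skewed host would load all of its data at once.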
Reposted from: http://cyhli.baihongyu.com/