博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark自定义排序/分区
阅读量:4207 次
发布时间:2019-05-26

本文共 2430 字,大约阅读时间需要 8 分钟。

Spark简单应用

自定义排序

  有两种方式:【1】定义一个类,继承Ordering或者Ordered,注意该类还要with Serializable接口,因为比较的时候可能要shuffle通过网络传输,因此必须是可序列化的【2】通过隐式转换的方式

//方式一case class Men(val name:String,val faceValue:Int,val age:Int)extends Serializable{
override def toString:String=s"[name:$name,fv:$faceValue,age:$age]"}object Context {
implicit object MenOrdering extends Ordering[Men]{
override def compare(x: Men, y: Men): Int = {
if(x.faceValue>y.faceValue) 1 else if(x.faceValue==y.faceValue){
if(x.age>y.age)-1 else 1 }else -1 } }}//方式二case class Men(val name:String,val faceValue:Int,val age:Int)extends Ordered[Men] with Serializable{
override def compare(that: Men): Int = {
if(this.faceValue==that.faceValue){
that.age-this.age }else{
this.faceValue-that.faceValue } }}object Test {
def main(args:Array[String]): Unit ={
val conf=new SparkConf().setAppName("Test").setMaster("local[2]") val sc=new SparkContext(conf) val rdd=sc.parallelize(List(("xiaoming",90,28,1),("zhagnsan",90,27,2))) //import Context._ val result=rdd.sortBy(y=>Men(t._2,t._3),false).collect() sc.stop() }}

自定义分区

  Hadoop的Partitioner策略:(K.hashcode&Integer.max)%(reducer number)。HashPartitioner相同Key的数据一定在同一个Reducer中,一个Reducer中只有一个Key,可能出现Hash碰撞,解决的方式把Reducer的数量增大,则Hash碰撞的概率会减小。自定义分区需要extends Partitioner,重写两个方法:一个是numPartitions返回分区的数量,一个是getPartition。

//分区器:决定数据分到哪个分区中class MyPartitioner(ins:Array[String]) extends Partitioner{
val parMap=new mutable.HashMap[String,Int]() var count=0 for(i<-ins){
parMap+=(i->count) count+=1 } override def numPartitions:Int=ins.length override def getPartition(key:Any):Int={
parMap.getOrElse(key.toString,0) }}object Test {
def main(args:Array[String]){
val conf=new SparkConf().setAppName("Test").setMaster("local[2]") val sc=new SparkContext(conf) //rdd1将数据切分,元组中放的是(URL,1) val rdd1=sc.textFile("c://Log.log").map(line=>{
val f=line.split("\t") (f(1),1) }) val rdd2=rdd1.reduceByKey(_+_) val rdd3=rdd2.map(t=>{
val url=t._1 val host=new URL(url).getHost (host,url,t._2) }) val ints=rdd3.map(_._1).distinct().collect() val hostPartitioner=new MyPartitioner(ints) val rdd4=rdd3.partitionBy(hostPartitioner).mapPartitions(it=>{
it.toList.sortBy(_._2._2).reverse.take(2).iterator }) rdd4.saveAsTextFile("c://out") }}

转载地址:http://cyhli.baihongyu.com/

你可能感兴趣的文章
c++中的函数重载与函数指针相结合
查看>>
c++对c语言扩充和增强的几点具体体现
查看>>
c语言register关键字----最快的关键字
查看>>
c++中的封装和访问控制。
查看>>
c++中,class与struct的区别
查看>>
c++中类的声明和实现
查看>>
c++中构造函数和析构函数的概念
查看>>
c++构造函数的分类和调用
查看>>
c++编译器构造析构方案 PK 对象显示初始化方案
查看>>
拷贝构造函数调用时机第一种和第二种调用场景
查看>>
c++中copy构造函数调用的第三种情形
查看>>
c++设计模式之简单工厂模式
查看>>
c++设计模式之工厂模式
查看>>
c++设计模式之三~抽象工厂模式
查看>>
c++设计模式之单例模式
查看>>
c++设计模式之建造者模式
查看>>
c++设计模式之原型模式
查看>>
c++设计模式之适配器模式
查看>>
c++设计模式之桥接模式
查看>>
c++设计模式之一组合模式
查看>>