`

spark sql自定义函数udf

阅读更多
 def visitview(vtimes : Iterable[String]): Long ={
    var times = 0L
    if(vtimes.size == 0){
    }else{
      val lb = scala.collection.mutable.ListBuffer.empty[String]
      for(vtime <- vtimes){
        lb.append(vtime)
      }
      times = 1L
      val list = lb.toList.sorted
      for(i <- 0 to list.size-2){
        if(list(i+1).toLong - list(i).toLong > DEFAULT_VISIT_TIMEOUT){
          times = times + 1
        }
      }
    }
    times
  }

 

    val sparkConf = new SparkConf().setAppName("ChexunHourCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    sqlContext.udf.register("visitview",ConstantUtil.visitview _)

 

case class Loging(vtime:Long, userIp:Long, muid:String, uref:String, ucp:String)
val df = file.map(_.split("\t")).filter(x=>ConstantUtil.isNotPromote(x(3))).filter(y=>ConstantUtil.isNotPromote(y(2))).map(t => Loging(t(9).toLong,t(8).toLong,t(1),t(2),t(3))).toDF()
df.registerTempTable("loging")
val vvCount = sqlContext.sql("select sum(visitview(vtime)) from loging group by muid  limit 10 ").collect()

    1、自定义函数

    2、注册函数

    3、在spark sql中使用自定义函数

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics