编程那点事编程那点事

专注编程入门及提高
探究程序员职业规划之道!

聚合函数

avg、sum、max、min、count、countDistinct

collect_list、collect_set

  • collect_list,就是将一个分组内,指定字段的值都收集到一起,不去重

  • collect_set,同上,但是唯一的区别是,会去重

  • 都用于将同一个分组内的指定字段的值串起来,变成一个数组,常用于行转列

Scala代码

package cn.study.spark2
import org.apache.spark.sql.SparkSession
object AggregateFunction {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
        .builder()
        .appName("AggregateFunction") 
        .master("local") 
        .config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
        .getOrCreate()
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
    val department = spark.read.json("C:\\Users\\htfeng\\Desktop\\department.json")
    // collect_list,就是将一个分组内,指定字段的值都收集到一起,不去重
    // collect_set,同上,但是唯一的区别是,会去重
    
    employee
        .join(department, $"depId" === $"id")  
        .groupBy(department("name"))
        .agg(avg(employee("salary")), sum(employee("salary")), max(employee("salary")), min(employee("salary")), count(employee("name")), countDistinct(employee("name"))) 
        .show()    
    
    // collect_list和collect_set,都用于将同一个分组内的指定字段的值串起来,变成一个数组
     // 常用于行转列
     // 比如说
     // depId=1, employee=leo
     // depId=1, employee=jack
     // depId=1, employees=[leo, jack]
    employee
         .groupBy(employee("depId"))
         .agg(collect_set(employee("name")), collect_list(employee("name"))) 
         .collect()
         .foreach(println(_))  
    
    
  }
}

其他常用函数

  • 日期函数:current_date、current_timestamp

  • 数学函数:round

  • 随机函数:rand

  • 字符串函数:concat、concat_ws

  • 自定义udf和udaf函数

package cn.study.spark2
import org.apache.spark.sql.SparkSession
object OtherFunction {
  
  def main(args: Array[String]) {
    val spark = SparkSession
        .builder()
        .appName("OtherFunction") 
        .master("local") 
        .config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
        .getOrCreate()
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
    val department = spark.read.json("C:\\Users\\htfeng\\Desktop\\department.json")
    
    // 日期函数:current_date、current_timestamp
    // 数学函数:round
    // 随机函数:rand
    // 字符串函数:concat、concat_ws
    // 自定义udf和udaf函数
    
    // http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
    
    employee
        .select(employee("name"), current_date(), current_timestamp(), rand(), round(employee("salary"), 2), concat(employee("gender"), employee("age")), concat_ws("|", employee("gender"), employee("age")))  
        .show()  
  }
  
}


未经允许不得转载: 技术文章 » 大数据 » 聚合函数