avg, sum, max, min, count, countDistinct
collect_list, collect_set
collect_list collects the values of a given column within a group into an array, keeping duplicates
collect_set does the same; the only difference is that it deduplicates the values
Both gather the values of a column within a group into an array, and are commonly used for row-to-column transformations (a minimal sketch follows)
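Before the full program, a minimal self-contained sketch of the difference, using hypothetical in-memory data (the CollectSketch object and the duplicated leo row are made up for illustration; they are not the contents of the JSON files used below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CollectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CollectSketch").master("local").getOrCreate()
    import spark.implicits._

    // "leo" appears twice in depId 1 to make the deduplication visible
    val df = Seq((1, "leo"), (1, "jack"), (1, "leo")).toDF("depId", "name")

    df.groupBy($"depId")
      .agg(collect_list($"name"), collect_set($"name"))
      .show(truncate = false)
    // collect_list keeps duplicates: [leo, jack, leo]
    // collect_set removes them:      [leo, jack] (element order is not guaranteed)

    spark.stop()
  }
}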
Scala code
package cn.study.spark2

import org.apache.spark.sql.SparkSession

object AggregateFunction {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("AggregateFunction")
      .master("local")
      .config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
    val department = spark.read.json("C:\\Users\\htfeng\\Desktop\\department.json")

    // collect_list collects the values of a column within a group into an array, keeping duplicates
    // collect_set does the same, but deduplicates
    employee
      .join(department, $"depId" === $"id")
      .groupBy(department("name"))
      .agg(
        avg(employee("salary")),
        sum(employee("salary")),
        max(employee("salary")),
        min(employee("salary")),
        count(employee("name")),
        countDistinct(employee("name")))
      .show()

    // collect_list and collect_set both gather the values of a column within a group into an array
    // Commonly used for row-to-column transformations, for example:
    //   depId=1, employee=leo
    //   depId=1, employee=jack
    //   => depId=1, employees=[leo, jack]
    employee
      .groupBy(employee("depId"))
      .agg(collect_set(employee("name")), collect_list(employee("name")))
      .collect()
      .foreach(println)
  }

}

Other common functions
Date functions: current_date, current_timestamp
Math functions: round
Random functions: rand
String functions: concat, concat_ws
Custom UDF and UDAF functions (see the sketch after the code below)
package cn.study.spark2

import org.apache.spark.sql.SparkSession

object OtherFunction {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("OtherFunction")
      .master("local")
      .config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
    val department = spark.read.json("C:\\Users\\htfeng\\Desktop\\department.json")

    // Date functions: current_date, current_timestamp
    // Math functions: round
    // Random functions: rand
    // String functions: concat, concat_ws
    // Custom UDF and UDAF functions
    // Full list: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
    employee
      .select(
        employee("name"),
        current_date(),
        current_timestamp(),
        rand(),
        round(employee("salary"), 2),
        concat(employee("gender"), employee("age")),
        concat_ws("|", employee("gender"), employee("age")))
      .show()
  }

}
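The list above mentions custom UDF and UDAF functions, but the program itself only uses built-ins. Below is a minimal sketch of registering and calling both, assuming the Spark 2.x UserDefinedAggregateFunction API; the CountUDAF class, the strLen/countEmp names, and the in-memory data are hypothetical, not part of the original example:

package cn.study.spark2

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// A minimal UDAF that counts the rows in each group (Spark 2.x API)
class CountUDAF extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("count", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getLong(0) + 1
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

object UdfAndUdaf {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UdfAndUdaf")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    // Hypothetical in-memory data standing in for employee.json
    val employee = Seq(("leo", 25), ("jack", 25), ("tom", 30)).toDF("name", "age")

    // UDF: a plain Scala function registered under a name
    spark.udf.register("strLen", (s: String) => s.length)
    // UDAF: an instance of the UserDefinedAggregateFunction subclass above
    spark.udf.register("countEmp", new CountUDAF)

    employee.select($"name", callUDF("strLen", $"name").as("name_len")).show()
    employee.groupBy($"age").agg(callUDF("countEmp", $"name").as("emp_count")).show()

    spark.stop()
  }

}

Registered this way, both functions can also be called from SQL strings via spark.sql(...) or selectExpr.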