Spark 1.5.x版本引入的内置函数
在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。
| 种类 | 函数 |
|---|---|
| 聚合函数 | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
| 集合函数 | array_contains, explode, size, sort_array |
| 日期/时间函数 | 日期时间转换 unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp 从日期时间中提取字段 year, month, dayofmonth, hour, minute, second |
| 日期/时间函数 | 日期/时间计算 datediff, date_add, date_sub, add_months, last_day, next_day, months_between 获取当前时间等 current_date, current_timestamp, trunc, date_format |
| 数学函数 | abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
| 混合函数 | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when |
| 字符串函数 | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
| 窗口函数 | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |
案例实战:根据每天的用户访问日志和用户购买日志,统计每日的uv和销售额
Scala版本countDistinct代码
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._
object DailyUV {
def main(args: Array[String]){
val conf = new SparkConf()
.setMaster("local")
.setAppName("DailyUV")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// 这里着重说明一下!!!
// 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换
import sqlContext.implicits._
// 构造用户访问日志数据,并创建DataFrame
// 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id
val userAccessLog = Array(
"2015-10-01,1122",
"2015-10-01,1122",
"2015-10-01,1123",
"2015-10-01,1124",
"2015-10-01,1124",
"2015-10-02,1122",
"2015-10-02,1121",
"2015-10-02,1123",
"2015-10-02,1123");
val userAccessLogRDD = sc.parallelize(userAccessLog, 5)
// 将模拟出来的用户访问日志RDD,转换为DataFrame
// 首先,将普通的RDD,转换为元素为Row的RDD
val userAccessLogRowRDD = userAccessLogRDD
.map{ log => Row(log.split(",")(0), log.split(",")(1).toInt)}
// 构造DataFrame的元数据
val structType = StructType(Array(
StructField("date", StringType, true),
StructField("userid", IntegerType, true)))
// 使用SQLContext创建DataFrame
val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType)
// 这里讲解一下uv的基本含义和业务
// 每天都有很多用户来访问,但是每个用户可能每天都会访问很多次
// 所以,uv,指的是,对用户进行去重以后的访问总数
// 这里,正式开始使用Spark 1.5.x版本提供的最新特性,内置函数,countDistinct
// 讲解一下聚合函数的用法
// 首先,对DataFrame调用groupBy()方法,对某一列进行分组
// 然后,调用agg()方法 ,第一个参数,必须,必须,传入之前在groupBy()方法中出现的字段
// 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数
// 内置函数中,传入的参数,也是用单引号作为前缀的,其他的字段
userAccessLogRowDF.groupBy("date")
.agg('date, countDistinct('userid))
.map { row => Row(row(1), row(2)) }
.collect()
.foreach(println)
}
}Scala版本sum代码
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._
object DailySale {
def main(args: Array[String]){
val conf = new SparkConf()
.setMaster("local")
.setAppName("DailyUV")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// 模拟数据
val userSaleLog = Array("2015-10-01,55.05,1122",
"2015-10-01,23.15,1133",
"2015-10-01,15.20,",
"2015-10-02,56.05,1144",
"2015-10-02,78.87,1155",
"2015-10-02,113.02,1123")
val userSaleLogRDD = sc.parallelize(userSaleLog, 5)
// 进行有效销售日志的过滤
val filteredUserSaleLogRDD = userSaleLogRDD
.filter {log =>if (log.split(",").length == 3) true else false }
val userSaleLogRowRDD = filteredUserSaleLogRDD
.map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) }
val structType = StructType(Array(
StructField("date", StringType, true),
StructField("sale_amount", DoubleType, true)))
val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)
userSaleLogDF.groupBy("date")
.agg('date, sum('sale_amount))
.map{ row => Row(row(1),row(2)) }
.collect()
.foreach(println)
}
}开窗函数案例:统计每个种类的销售额排名前3的产品
java版本代码
package cn.spark.study.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
public class RowNumberWindowFunction {
@SuppressWarnings("deprecation")
public static void main(String[] arg){
SparkConf conf = new SparkConf()
.setAppName("JDBCDataSource");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
// 创建销售额表,sales表
hiveContext.sql("DROP TABLE IF EXISTS sales");
hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
+ "product STRING,"
+ "category STRING,"
+ "revenue BIGINT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
+ "INTO TABLE sales");
// 开始编写我们的统计逻辑,使用row_number()开窗函数
// 先说明一下,row_number()开窗函数的作用
// 其实,就是给每个分组的数据,按照其排序顺序,打上一个分组内的行号
// 比如说,有一个分组date=20151001,里面有3条数据,1122,1121,1124,
// 那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号
// 行号从1开始递增,比如1122 1,1121 2,1124 3
DataFrame top3SalesDF = hiveContext.sql(""
+ "SELECT product, category, revenue "
+ "FROM ("
+ "SELECT "
+ "Product,"
+ "category,"
+ "revenue,"
// row_number()开窗函数的语法说明
// 首先可以,在SELECT查询时,使用row_number()函数
// 其次,row_number()函数后面先跟上OVER关键字
// 然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组
// 其次是可以用ORDER BY进行组内排序
// 然后row_number()就可以给每个组内的行,一个组内行号
+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
+ "FROM sales "
+ ") tmp_sales "
+ "WHERE rank<=3");
// 将每组排名前3的数据,保存到一个表中
hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
top3SalesDF.saveAsTable("top3_sales");
sc.close();
}
}
编程那点事
