Spark 1.5.x版本引入的内置函数
在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。
种类 | 函数 |
---|---|
聚合函数 | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
集合函数 | array_contains, explode, size, sort_array |
日期/时间函数 | 日期时间转换 unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp 从日期时间中提取字段 year, month, dayofmonth, hour, minute, second |
日期/时间函数 | 日期/时间计算 datediff, date_add, date_sub, add_months, last_day, next_day, months_between 获取当前时间等 current_date, current_timestamp, trunc, date_format |
数学函数 | abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
混合函数 | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when |
字符串函数 | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
窗口函数 | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |
案例实战:根据每天的用户访问日志和用户购买日志,统计每日的uv和销售额
Scala版本countDistinct代码
package cn.spark.study.sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.functions._ object DailyUV { def main(args: Array[String]){ val conf = new SparkConf() .setMaster("local") .setAppName("DailyUV") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) // 这里着重说明一下!!! // 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换 import sqlContext.implicits._ // 构造用户访问日志数据,并创建DataFrame // 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id val userAccessLog = Array( "2015-10-01,1122", "2015-10-01,1122", "2015-10-01,1123", "2015-10-01,1124", "2015-10-01,1124", "2015-10-02,1122", "2015-10-02,1121", "2015-10-02,1123", "2015-10-02,1123"); val userAccessLogRDD = sc.parallelize(userAccessLog, 5) // 将模拟出来的用户访问日志RDD,转换为DataFrame // 首先,将普通的RDD,转换为元素为Row的RDD val userAccessLogRowRDD = userAccessLogRDD .map{ log => Row(log.split(",")(0), log.split(",")(1).toInt)} // 构造DataFrame的元数据 val structType = StructType(Array( StructField("date", StringType, true), StructField("userid", IntegerType, true))) // 使用SQLContext创建DataFrame val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType) // 这里讲解一下uv的基本含义和业务 // 每天都有很多用户来访问,但是每个用户可能每天都会访问很多次 // 所以,uv,指的是,对用户进行去重以后的访问总数 // 这里,正式开始使用Spark 1.5.x版本提供的最新特性,内置函数,countDistinct // 讲解一下聚合函数的用法 // 首先,对DataFrame调用groupBy()方法,对某一列进行分组 // 然后,调用agg()方法 ,第一个参数,必须,必须,传入之前在groupBy()方法中出现的字段 // 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数 // 内置函数中,传入的参数,也是用单引号作为前缀的,其他的字段 userAccessLogRowDF.groupBy("date") .agg('date, countDistinct('userid)) .map { row => Row(row(1), row(2)) } .collect() .foreach(println) } }
Scala版本sum代码
package cn.spark.study.sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.functions._ object DailySale { def main(args: Array[String]){ val conf = new SparkConf() .setMaster("local") .setAppName("DailyUV") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 模拟数据 val userSaleLog = Array("2015-10-01,55.05,1122", "2015-10-01,23.15,1133", "2015-10-01,15.20,", "2015-10-02,56.05,1144", "2015-10-02,78.87,1155", "2015-10-02,113.02,1123") val userSaleLogRDD = sc.parallelize(userSaleLog, 5) // 进行有效销售日志的过滤 val filteredUserSaleLogRDD = userSaleLogRDD .filter {log =>if (log.split(",").length == 3) true else false } val userSaleLogRowRDD = filteredUserSaleLogRDD .map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) } val structType = StructType(Array( StructField("date", StringType, true), StructField("sale_amount", DoubleType, true))) val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType) userSaleLogDF.groupBy("date") .agg('date, sum('sale_amount)) .map{ row => Row(row(1),row(2)) } .collect() .foreach(println) } }
开窗函数案例:统计每个种类的销售额排名前3的产品
java版本代码
package cn.spark.study.sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.hive.HiveContext; public class RowNumberWindowFunction { @SuppressWarnings("deprecation") public static void main(String[] arg){ SparkConf conf = new SparkConf() .setAppName("JDBCDataSource"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc.sc()); // 创建销售额表,sales表 hiveContext.sql("DROP TABLE IF EXISTS sales"); hiveContext.sql("CREATE TABLE IF NOT EXISTS sales (" + "product STRING," + "category STRING," + "revenue BIGINT)"); hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' " + "INTO TABLE sales"); // 开始编写我们的统计逻辑,使用row_number()开窗函数 // 先说明一下,row_number()开窗函数的作用 // 其实,就是给每个分组的数据,按照其排序顺序,打上一个分组内的行号 // 比如说,有一个分组date=20151001,里面有3条数据,1122,1121,1124, // 那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号 // 行号从1开始递增,比如1122 1,1121 2,1124 3 DataFrame top3SalesDF = hiveContext.sql("" + "SELECT product, category, revenue " + "FROM (" + "SELECT " + "Product," + "category," + "revenue," // row_number()开窗函数的语法说明 // 首先可以,在SELECT查询时,使用row_number()函数 // 其次,row_number()函数后面先跟上OVER关键字 // 然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组 // 其次是可以用ORDER BY进行组内排序 // 然后row_number()就可以给每个组内的行,一个组内行号 + "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank " + "FROM sales " + ") tmp_sales " + "WHERE rank<=3"); // 将每组排名前3的数据,保存到一个表中 hiveContext.sql("DROP TABLE IF EXISTS top3_sales"); top3SalesDF.saveAsTable("top3_sales"); sc.close(); } }