typed操作
coalesce、repartition
都是用来重新定义分区的
区别在于:coalesce,只能用于减少分区数量,而且可以选择不发生shuffle
repartiton,可以增加分区,也可以减少分区,必须会发生shuffle,相当于是进行了一次重分区操作
distinct、dropDuplicates
都是用来进行去重的,区别在哪儿呢?
distinct,是根据每一条数据,进行完整内容的比对和去重
dropDuplicates,可以根据指定的字段进行去重
except、filter、intersect
except:获取在当前dataset中有,但是在另外一个dataset中没有的元素
filter:根据我们自己的逻辑,如果返回true,那么就保留该元素,否则就过滤掉该元素
intersect:获取两个数据集的交集
map、flatMap、mapPartitions
map:将数据集中的每条数据都做一个映射,返回一条新数据
flatMap:数据集中的每条数据都可以返回多条数据
mapPartitions:一次性对一个partition中的数据进行处理
joinWith
sort
randomSplit、sample
Scala代码
package cn.study.spark2
import org.apache.spark.sql.SparkSession
/**
* typed操作
*/
object TypedOperation {
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
case class Department(id: Long, name: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("TypedOperation")
.master("local")
.config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
.getOrCreate()
import spark.implicits._
val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
val employee2 = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee2.json")
val department = spark.read.json("C:\\Users\\htfeng\\Desktop\\department.json")
val employeeDS = employee.as[Employee]
val employeeDS2 = employee2.as[Employee]
val departmentDS = department.as[Department]
// println(employeeDS.rdd.partitions.size)
//
// // coalesce和repartition操作
// // 都是用来重新定义分区的
// // 区别在于:coalesce,只能用于减少分区数量,而且可以选择不发生shuffle
// // repartiton,可以增加分区,也可以减少分区,必须会发生shuffle,相当于是进行了一次重分区操作
// val employeeDSRepartitioned = employeeDS.repartition(7);
// println(employeeDSRepartitioned.rdd.partitions.size)
//
//
// val employeeDSCoalesced = employeeDSRepartitioned.coalesce(3)
// println(employeeDSCoalesced.rdd.partitions.size)
//
// employeeDSCoalesced.show()
// distinct和dropDuplicates
// 都是用来进行去重的,区别在哪儿呢?
// distinct,是根据每一条数据,进行完整内容的比对和去重
// dropDuplicates,可以根据指定的字段进行去重
// val distinctEmployeeDS = employeeDS.distinct()
// distinctEmployeeDS.show()
// val dropDuplicatesEmployeeDS = employeeDS.dropDuplicates(Seq("name"))
// dropDuplicatesEmployeeDS.show()
// except:获取在当前dataset中有,但是在另外一个dataset中没有的元素
// filter:根据我们自己的逻辑,如果返回true,那么就保留该元素,否则就过滤掉该元素
// intersect:获取两个数据集的交集
// employeeDS.except(employeeDS2).show()
// employeeDS.filter { employee => employee.age > 30 }.show()
// employeeDS.intersect(employeeDS2).show()
// map:将数据集中的每条数据都做一个映射,返回一条新数据
// flatMap:数据集中的每条数据都可以返回多条数据
// mapPartitions:一次性对一个partition中的数据进行处理
// employeeDS.map { employee => (employee.name, employee.salary + 100) }.show()
// departmentDS.flatMap{
// department => Seq(Department(department.id + 1, department.name + "_1"), Department(department.id + 2, department.name + "_2"))
// }.show()
//
// employeeDS.mapPartitions{
// employees => {
// val result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
// while(employees.hasNext){
// var emp = employees.next()
// result += ((emp.name, emp.salary + 1000))
// }
// result.iterator
// }
// }.show()
// employeeDS.joinWith(departmentDS, $"depId" === $"id").show()
//
// employeeDS.sort($"salary".desc).show()
val employeeDSArr = employeeDS.randomSplit(Array(3, 10, 20))
employeeDSArr.foreach { ds => ds.show() }
employeeDS.sample(false, 0.3).show()
}
}untyped操作
select、where、groupBy、agg、col、join
Scala代码
package cn.study.spark2
import org.apache.spark.sql.SparkSession
/*
* untyped操作
* sql语法
* select
* where
* join
* groupBy
* agg
*/
object UntypedOperation {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("UntypedOperation")
.master("local")
.config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
val department = spark.read.json("C:\\Users\\htfeng\\Desktop\\department.json")
employee
.where("age > 20")
.join(department, $"depId" === $"id")
.groupBy(department("name"), employee("gender"))
.agg(avg(employee("salary")))
.show()
employee
.select($"name", $"depId", $"salary")
.where("age > 20")
.show()
}
}