编程那点事编程那点事

专注编程入门及提高
探究程序员职业规划之道!

action操作和基础操作

action操作

collect、count、first、foreach、reduce、show、take

Scala代码示例

package cn.study.spark2
import org.apache.spark.sql.SparkSession
/**
 * action操作详解
 * 
 * collect、count、first、foreach、reduce、show、take
 * 
 */
object ActionOperation {
  
  def main(args: Array[String]) {
    val spark = SparkSession
        .builder()
        .appName("ActionOperation") 
        .master("local") 
        .config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
        .getOrCreate()
    
    import spark.implicits._
    
    val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
    
    // collect:将分布式存储在集群上的分布式数据集(比如dataset),中的所有数据都获取到driver端来
    employee.collect().foreach { println(_) }  
    // count:对dataset中的记录数进行统计个数的操作
    println(employee.count())
    // first:获取数据集中的第一条数据
    println(employee.first())  
    // foreach:遍历数据集中的每一条数据,对数据进行操作,这个跟collect不同,collect是将数据获取到driver端进行操作
    // foreach是将计算操作推到集群上去分布式执行
    // foreach(println(_))这种,真正在集群中执行的时候,是没用的,因为输出的结果是在分布式的集群中的,我们是看不到的
    employee.foreach { println(_) }  
    // reduce:对数据集中的所有数据进行归约的操作,多条变成一条
    // 用reduce来实现数据集的个数的统计
    println(employee.map(employee => 1).reduce(_ + _))
    // show,默认将dataset数据打印前20条
    employee.show()
    // take,从数据集中获取指定条数
    employee.take(3).foreach { println(_) } 
  }
  
}

基础操作

  • 持久化:cache、persist

  • 创建临时视图:createTempView、createOrReplaceTempView

  • 获取执行计划:explain

  • 查看schema:printSchema

  • 写数据到外部存储:write

  • dataset与dataframe互相转换:as、toDF

Scala代码示例

package cn.study.spark2
import org.apache.spark.sql.SparkSession
/**
 * 基础操作
 * 
 * 持久化:cache、persist
 * 创建临时视图:createTempView、createOrReplaceTempView
 * 获取执行计划:explain
 * 查看schema:printSchema
 * 写数据到外部存储:write
 * dataset与dataframe互相转换:as、toDF
 * 
 */
object BasicOperation {
  
  case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
  
  def main(args: Array[String]) {
    val spark = SparkSession
        .builder()
        .appName("BasicOperation") 
        .master("local") 
        .config("spark.sql.warehouse.dir", "C:\\Users\\htfeng\\Desktop\\spark-warehouse")
        .getOrCreate()
    
    import spark.implicits._
    
    val employee = spark.read.json("C:\\Users\\htfeng\\Desktop\\employee.json")
    // 持久化,在rdd部分仔细讲解过,我们这里就不展开讲了
    // 持久化,如果要对一个dataset重复计算两次的话,那么建议先对这个dataset进行持久化再进行操作,避免重复计算
    
    employee.cache()
    println(employee.count())
    println(employee.count())
    
    // 创建临时视图,主要是为了,可以直接对数据执行sql语句
    employee.createOrReplaceTempView("employee")
    spark.sql("select * from employee where age > 30").show()
    
    // 获取spark sql的执行计划
    // dataframe/dataset,比如执行了一个sql语句获取的dataframe,实际上内部包含一个logical plan,逻辑执行计划
    // 设计执行的时候,首先会通过底层的catalyst optimizer,生成物理执行计划,比如说会做一些优化,比如push filter
    // 还会通过whole-stage code generation技术去自动化生成代码,提升执行性能
    spark.sql("select * from employee where age > 30").explain()
    
    // employee.printSchema()
    
        // 以前给大家演示过,写hdfs是肯定没有问题的
//    val employeeWithAgeGreaterThen30DF = spark.sql("select * from employee where age > 30")
//    employeeWithAgeGreaterThen30DF.write.json("C:\\Users\\htfeng\\Desktop\\employeeWithAgeGreaterThen30DF.json")
      
    val employeeDS = employee.as[Employee]
    employeeDS.show()
    employeeDS.printSchema()
    
    val employeeDF = employeeDS.toDF()
    employeeDF.show()
    employeeDF.printSchema()
    
  }
}


未经允许不得转载: 技术文章 » 大数据 » action操作和基础操作