编程那点事编程那点事

专注编程入门及提高
探究程序员职业规划之道!

通用的load和save操作

通用的load和save操作

  • 对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中。

Java版本

DataFrame df = sqlContext.read().load("users.parquet");
df.select("name","favorite_color").write().save("namesAndFavColors.parquet");

java版本代码

package cn.spark.study.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
public class GenericLoadSave {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("GenericLoadSave");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame usersDF = sqlContext.read().load("hdfs://spark1:9000/users.parquet");
usersDF.select("name","favorite_color").write()
.save("hdfs://spark1:9000/namesAndFavColors.parquet"); 
}
}

Scala版本

val df = sqlContext.read.load("users.parquet")
df.select("name","favorite_color").write.save("namesAndFavColors.parquet")

java版本代码

package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object GenericLoadSave {
  def main(args: Array[String]){
    val conf = new SparkConf()
    .setAppName("GenericLoadSave")
    
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    
    val usersDF = sqlContext.read.load("hdfs://spark1:9000/users.parquet")
    usersDF.write.save("hdfs://spark1:9000/namesAndFavColors_scala")  
  }
}

手动指定数据源类型

  • 也可以手动指定用来操作的数据源类型。数据源通常需要使用其全限定名来指定,比如parquet是org.apache.spark.sql.parquet。但是Spark SQL内置了一些数据源类型,比如json,parquet,jdbc等等。实际上,通过这个功能,就可以在不同类型的数据源之间进行转换了。比如将json文件中的数据保存到parquet文件中。默认情况下,如果不指定数据源类型,那么就是parquet。

Java版本

DataFrame df = sqlContext.read().format("json").load("people.json");
df.select("name","age").write().format("parquet").save("namesAndAges.parquet");

java版本代码

package cn.spark.study.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
public class ManuallySpecifyOptions {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("GenericLoadSave");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame peopleDF = sqlContext.read().format("json")
.load("hdfs://spark1:9000/people.json");
peopleDF.select("name").write().format("parquet")
.save("hdfs://spark1:9000/peopleName_java");
}
}

Scala版本

val df = sqlContext.read.format("json").load("people.json")
df.select("name","age").write.format("parquet").save("namesAndAges.parquet")

Scala版本代码

package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object ManuallySpecifyOptions {
  def main(args: Array[String]){
    val conf = new SparkConf()
    .setAppName("ManuallySpecifyOptions")
    
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc);
    
    val peopleDF = sqlContext.read.format("json").load("hdfs://spark1:9000/people.json")
    peopleDF.select("name").write.format("paruet").save("hdfs://spark1:9000/people.json")
  }
}

Save Mode

  • Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。

Save Mode意义
SaveMode.ErrorIfExists (默认)如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
SaveMode.Ignore如果目标位置已经存在数据,那么就忽略,不做任何操作。

java版本代码

package cn.spark.study.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
public class SaveModeTest {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("GenericLoadSave");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame peopleDF = sqlContext.read().format("json")
.load("hdfs://spark1:9000/people.json");
peopleDF.save("hdfs://spark1:9000/people_savemode_test", "json", SaveMode.Append);
}
}


未经允许不得转载: 技术文章 » 大数据 » 通用的load和save操作