Socket: already demonstrated by the earlier wordcount example, via StreamingContext.socketTextStream().
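For quick reference, a minimal Scala sketch of the socket source; the host and port (localhost:9999) are placeholders for illustration, not values fixed by this course:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketSketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    // socketTextStream starts a Receiver that reads newline-delimited text
    // from a TCP socket; being Receiver-based, it occupies one cpu core.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}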
HDFS files
Real-time computation over HDFS files means monitoring an HDFS directory and processing each new file as soon as it appears there; in effect, it processes a real-time stream of files.
Java: streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
Scala: streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming monitors the specified HDFS directory and processes files that appear in it. Note the constraints: all files placed into the directory must share the same format; files must be brought into the directory via a move or rename, so that they appear atomically; once a file has been processed, later changes to its content are ignored; and since the HDFS file source has no Receiver, it does not occupy a cpu core.
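As a sketch of the generic fileStream form in Scala, assuming plain text files and therefore the standard Hadoop types LongWritable, Text, and TextInputFormat (the directory path is the one used later in this section):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("FileStreamSketch")
val ssc = new StreamingContext(conf, Seconds(5))
// fileStream yields (key, value) records; with TextInputFormat the key is
// the byte offset of each line and the value is the line itself.
val records = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://spark1:9000/wordcount_dir")
val lines = records.map(_._2.toString)

For plain text, textFileStream(dataDirectory) is a convenience wrapper over exactly this call, and it is what the wordcount programs below use.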
A real-time wordcount program based on HDFS files
Java version
package cn.spark.study.streaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Real-time wordcount program based on HDFS files
 * @author Administrator
 */
public class HDFSWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("HDFSWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // First, use JavaStreamingContext's textFileStream() method to create
        // an input DStream against the HDFS directory
        JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");

        // Perform the wordcount operations
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }

        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }

                });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }

                });

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

}
Scala version
package cn.spark.study.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object HDFSWordCount {

  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("HDFSWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Create an input DStream against the HDFS directory
    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")

    // Perform the wordcount operations
    val words = lines.flatMap { _.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
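To exercise either version, create the monitored directory first (e.g. with hdfs dfs -mkdir) and then move or rename complete files into it (e.g. hdfs dfs -mv from a staging path) rather than writing into the directory directly, since files written in place may be picked up before they are complete; the staging path here is only a placeholder.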