spark streaming可以从任何数据源来接收数据,哪怕是除了它内置支持的数据源以外的其他数据源(比如flume、kafka、socket等)。
如果我们想要从spark streaming没有内置支持的数据源中接收实时数据,那么我们需要自己实现一个receiver。
实现一个自定义的receiver
一个自定义的receiver必须实现以下两个方法:onStart()、onStop()。onStart()和onStop()方法必须不能阻塞数据,一般来说,
onStart()方法会启动负责接收数据的线程,onStop()方法会确保之前启动的线程都已经停止了。负责接收数据的线程可以调用isStopped()方法来检查它们是否应该停止接收数据。
一旦数据被接收了,就可以调用store(data)方法,数据就可以被存储在Spark内部。有一系列的store()重载方法供我们调用,来将数据每次一条进行存储,或是每次存储一个集合或序列化的数据。
接收线程中的任何异常都应该被捕获以及正确处理,从而避免receiver的静默失败。restart()方法会通过异步地调用onStop()和onStart()方法来重启receiver。stop()方法会调用onStop()方法来停止receiver。reportError()方法会汇报一个错误消息给driver,但是不停止或重启receiver。
package cn.spark.study.streaming.upgrade; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.ConnectException; import java.net.Socket; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; public class JavaCustomReceiver extends Receiver<String> { private static final long serialVersionUID = 3184893502610667539L; String host = null; int port = -1; public JavaCustomReceiver(String host_ , int port_) { super(StorageLevel.MEMORY_AND_DISK_2()); host = host_; port = port_; } public void onStart() { // Start the thread that receives data over a connection new Thread() { @Override public void run() { receive(); } }.start(); } public void onStop() { } /** Create a socket connection and receive data until receiver is stopped */ private void receive() { Socket socket = null; String userInput = null; try { // connect to the server socket = new Socket(host, port); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // Until stopped or connection broken continue reading while (!isStopped() && (userInput = reader.readLine()) != null) { System.out.println("Received data '" + userInput + "'"); store(userInput); } reader.close(); socket.close(); // Restart in an attempt to connect again when server is active again restart("Trying to connect again"); } catch(ConnectException ce) { // restart if could not connect to server restart("Could not connect", ce); } catch(Throwable t) { // restart if there is any other error restart("Error receiving data", t); } } }
在spark streaming中使用自定义的receiver
JavaDStream<String> lines = jssc.receiverStream(new JavaCustomReceiver(host, port)); Java代码 package cn.spark.study.streaming.upgrade; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; public class CustomReceiverWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("wordcount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaDStream<String> lines = jssc.receiverStream(new JavaCustomReceiver("localhost", 9999)); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { // TODO Auto-generated method stub return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }
安装网络服务工具
wget http://vault.centos.org/6.3/os/i386/Packages/nc-1.84-22.el6.i686.rpm rpm -iUv nc-1.84-22.el6.i686.rpm nc -lk 9999