Most of the Spark code you find online is written in Scala, and most books are Scala-based as well, which is no surprise since Spark itself is written in Scala. I haven't studied Scala systematically yet, so for now I write my Spark programs in Java; Spark fully supports Java, and Scala runs on the JVM anyway. Let's get straight to the code.
The first example is the one given in the official documentation: word count, the classic introductory case in big data.
In a terminal on Linux, start netcat as the data source:
$ nc -lk 9999
Then run the following code:
package com.tg.spark.stream;

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;

public class SparkStream {
    public static void main(String[] args) {
        // Create a local StreamingContext with four working threads and a batch
        // interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
                .set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        System.out.println(jssc);

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
        // JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream");

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                System.out.println(Arrays.asList(x.split(" ")).get(0));
                return Arrays.asList(x.split(" "));
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        System.out.println(pairs);

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();
        // Also save each batch of counts to HDFS as text files
        wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark");

        jssc.start();
        jssc.awaitTermination();
    }
}
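A side note: the anonymous inner classes above follow the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable. On Spark 2.x the same method is expected to return an Iterator, and with Java 8 lambdas the three transformations become much more compact. A minimal sketch under those assumptions, reusing the lines DStream from the code above:

// Spark 2.x + Java 8 sketch: FlatMapFunction returns an Iterator here
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
// Pair each word with a count of 1
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
// Sum the counts for each word within the batch
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);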
Now type hello world into the netcat terminal you opened earlier:
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
and the console of the Spark program will show:
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
The real-time output files produced by the computation also appear on HDFS: saveAsTextFiles writes each batch under a path built from the prefix, the batch timestamp, and the suffix (here hdfs://master:9000/testFile/ and spark).
The second example uses a directory on HDFS as the input source instead of a socketTextStream socket:
package com.tg.spark.stream;

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;

public class SparkStream2 {
    public static void main(String[] args) {
        // Create a local StreamingContext with four working threads and a batch
        // interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
                .set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        System.out.println(jssc);

        // Read new files appearing in an HDFS directory instead of a socket
        // JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
        JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream");

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                System.out.println(Arrays.asList(x.split(" ")).get(0));
                return Arrays.asList(x.split(" "));
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        System.out.println(pairs);

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();
        // Also save each batch of counts to HDFS as text files
        wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark");

        jssc.start();
        jssc.awaitTermination();
    }
}
With this setup the streaming context keeps monitoring that directory: as soon as a new file appears in it, its contents are read right away. You can start the program first and then manually add a file to that directory to see the output; note that textFileStream only picks up files that show up in the directory after the job has started.
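One way to test it is to push a file into the monitored directory from another small program. Below is a minimal sketch using the Hadoop FileSystem API; the class name PushFile and the local path /tmp/words.txt are just placeholders, and the target directory matches the one watched in the code above.

package com.tg.spark.stream;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PushFile {
    public static void main(String[] args) throws Exception {
        // Connect to the same HDFS namenode used by the streaming job
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), new Configuration());
        // Copy a local text file into the directory watched by textFileStream;
        // the running job should pick it up in the next batch
        fs.copyFromLocalFile(new Path("/tmp/words.txt"), new Path("/stream/words.txt"));
        fs.close();
    }
}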
Writing this up took effort; if you repost it, please credit the original: http://blog.csdn.net/tanggao1314/article/details/51606721
References
Spark Programming Guide