
Most of the Spark code you find online, and in most books, is written in Scala, which makes sense since Spark itself is written in Scala. I haven't studied Scala systematically yet, so for now I write my Spark programs in Java. Spark supports Java, and Scala runs on the JVM anyway. Enough talk, straight to the code.

This is the example from the official documentation: the classic word-count case used in almost every big data tutorial.
In a Linux terminal, start Netcat with $ nc -lk 9999

Then run the following code:

package com.tg.spark.stream;

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;

/**
 * @author 汤高
 */
public class SparkStream {
    public static void main(String[] args) {
        // Create a local StreamingContext with four worker threads and a batch interval of 1 second
        SparkConf conf = new SparkConf()
                .setMaster("local[4]")
                .setAppName("NetworkWordCount")
                .set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();
        // Also write each batch's counts to HDFS as text files
        wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark");

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}
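A note on versions: the code above targets the Spark 1.x streaming API, where FlatMapFunction.call returns an Iterable. If you are on Spark 2.x or later, call returns an Iterator instead, so the flatMap step needs a small change. A minimal sketch of just that step, assuming the rest of the program stays the same:

// Spark 2.x+ API: FlatMapFunction.call returns Iterator<String>, not Iterable<String>.
// Written as a lambda; drop-in replacement for the anonymous class above.
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());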

Then, in the Netcat terminal you opened earlier, type hello world


# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world

and in the console you should see something like:

-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

The result files produced by the computation also show up on HDFS in real time.
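To check, you can list the output location with the HDFS CLI. saveAsTextFiles(prefix, suffix) writes one directory per batch named prefix-<batch-time-ms>.suffix, so with the prefix and suffix used above you should see entries like -<timestamp>.spark under /testFile (the exact timestamps will of course differ):

$ hdfs dfs -ls hdfs://master:9000/testFile/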

The second example does not read from a socketTextStream socket; instead it uses a directory on HDFS as the input data source.

package com.tg.spark.stream;

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;

/**
 * @author 汤高
 */
public class SparkStream2 {
    public static void main(String[] args) {
        // Create a local StreamingContext with four worker threads and a batch interval of 1 second
        SparkConf conf = new SparkConf()
                .setMaster("local[4]")
                .setAppName("NetworkWordCount")
                .set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Monitor an HDFS directory instead of a socket: every new file that appears
        // under this path is read as a stream of text lines
        JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/stream");

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();
        // Also write each batch's counts to HDFS as text files
        wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/testFile/", "spark");

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}

With this setup the streaming job keeps watching that directory: as soon as a file appears in it, its contents are read and processed. You can start the program first and then manually drop a file into the directory to see the output, as in the example below.
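For example, you can upload a local text file into the monitored directory with the HDFS CLI (words.txt here is just a placeholder name; note that textFileStream only picks up files that are created in the directory after the streaming context has started):

$ hdfs dfs -put words.txt hdfs://master:9000/stream/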

Writing this up takes effort; if you repost it, please credit the source: http://blog.csdn.net/tanggao1314/article/details/51606721

References
Spark Programming Guide