您现在的位置是:主页 > news > 天津做网站费用/站长工具日本

天津做网站费用/站长工具日本

admin2025/5/13 15:05:15news

简介天津做网站费用,站长工具日本,黄冈市建设局官方网站,.net做网站之前设置对于前两篇,学习了MapReduce的四大组件。这一篇写一下MapReduce的序列化和排序的写法。 一、序列化机制 在MapReduce里,难免会有用到对象的时候,另外,因为集群工作过程中需要用到RPC操作,并且mapTask和reduceTask之间…

天津做网站费用,站长工具日本,黄冈市建设局官方网站,.net做网站之前设置对于前两篇,学习了MapReduce的四大组件。这一篇写一下MapReduce的序列化和排序的写法。 一、序列化机制 在MapReduce里,难免会有用到对象的时候,另外,因为集群工作过程中需要用到RPC操作,并且mapTask和reduceTask之间…

  对于前两篇,学习了MapReduce的四大组件。这一篇写一下MapReduce的序列化和排序的写法。

一、序列化机制

  在MapReduce里,难免会有用到对象的时候,另外,因为集群工作过程中需要用到RPC操作,并且mapTaskreduceTask之间也是通过http请求来传输,所以想要MapReduce处理的对象类,必须可以进行序列化/反序列化操作。Hadoop并没有使用Java原生的序列化,它的底层其实是通过AVRO实现序列化/反序列化,并且在其基础上提供了便捷API。

1、默认MapReduce类型

  如下图:
在这里插入图片描述

2、实现序列化的方法

   现在有这样一个实例来引出这个序列化机制:

  如下文件内容,从左往右依次表示电话号码名称地区流量,现在需要统计这一个人在某地的流量总和:

12321445 zs bj 343
12321312 ww sh 234
12321445 zs bj 343
32434343 ls sz 232

具体实现:
  如果要自定义类型序列化,需要实现Writable接口,并且重写其中的方法。

(1)首先需要定义一个javaBean用来将这些具体的信息做序列化,如下:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;public class Flow implements Writable{private String phone;private String name;private String addr;private int flow;/*** 序列化方法* 如果是String,则调用writeUTF()方法* 此外还有writeInt()、writeLong()、writeDouble()等方法。*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(phone);out.writeUTF(name);out.writeUTF(addr);out.writeInt(flow);}/*** 反序列化一定要注意顺序,一定和序列化的顺序一致*/@Overridepublic void readFields(DataInput in) throws IOException {this.phone = in.readUTF();this.name = in.readUTF();this.addr = in.readUTF();this.flow = in.readInt();}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddr() {return addr;}public void setAddr(String addr) {this.addr = addr;}public int getFlow() {return flow;}public void setFlow(int flow) {this.flow = flow;}/*** 当需要打印对象的时候,重写toString()方法。*/@Overridepublic String toString() {return "Flow [phone=" + phone + ", name=" + name + ", addr=" + addr + ", flow=" + flow + "]";}}
(2)编写MapTask:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow>{@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Flow>.Context context)throws IOException, InterruptedException {String line = value.toString();Flow flow = new Flow();String[] info = line.split(" ");flow.setPhone(info[0]);flow.setName(info[1]);flow.setAddr(info[2]);flow.setFlow(Integer.parseInt(info[3]));//将名字作为key,一行数据转为对象后,作为valuecontext.write(new Text(flow.getName()), flow);}
}
(3)编写ReduceTask:
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class FlowReducer extends Reducer<Text, Flow, Flow, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<Flow> values, Reducer<Text, Flow, Flow, NullWritable>.Context context)throws IOException, InterruptedException {Flow result = new Flow();for (Flow flow : values) {result.setPhone(flow.getPhone());result.setName(flow.getName());result.setAddr(flow.getAddr());result.setFlow(result.getFlow() + flow.getFlow());}//结果集中可以对value使用NullWritable类型context.write(result , NullWritable.get());}
}
(4)编写driver:
package mrDay1.mapreduce.flow;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCountDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();//获取job对象Job job = Job.getInstance(conf);//设置job方法入口的驱动类job.setJarByClass(FlowCountDriver.class);//设置Mapper组件类job.setMapperClass(FlowMapper.class);//设置mapper的输出key类型job.setMapOutputKeyClass(Text.class);//设置Mappper的输出value类型,注意Text的导包问题job.setMapOutputValueClass(Flow.class);//设置reduce组件类job.setReducerClass(FlowReducer.class);//设置reduce输出的key和value类型job.setOutputKeyClass(Flow.class);job.setOutputValueClass(NullWritable.class);//设置输入路径FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/flow"));//设置输出结果路径,要求结果路径事先不能存在FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/flow/result"));//提交jobjob.waitForCompletion(true);}
}
(5)打包上传到hadoop所在的服务器上,执行命令后,即可获取输出结果,具体如下:
[root@hadoop01 opt]# hadoop fs -cat /flow/result/part-r-00000
Flow{phone='32434343', name='ls', addr='sz', flow=232}
Flow{phone='12321312', name='ww', addr='sh', flow=234}
Flow{phone='12321445', name='zs', addr='bj', flow=686}

二、自定义排序

  如果要将自定义的类作为mapperkey来输出,因为mapper任务在输出时,会按照key自定义排序,因此自定义的类,也必须要进行排序。
  自定义类如果要排序,就必须要实现WritableComparable接口(该接口也是继承了ComparableWritable接口的),并重写compareTo方法。写法与实现Writable写法差不多,额外需要重写compareTo方法而已,其他不变,具体如下:

public class Flow implements WritableComparable<Flow> {private String phone;private String name;private String addr;private int flow;/**** 如果this比o大,则this放到后边去,即升序排序,反之则降序排序。* 如果相等,则相同的则只会留下一个,因为hadoop的地址复用技术,* 如果相等的话,只会留下后者。* */@Overridepublic int compareTo(Flow o) {int i = this.flow-o.flow;if(i ==0){ //如果相等,则进行另一种条件的比较,避免因为相等,数据丢失的问题i = this.name.hashCode()-o.name.hashCode();}return i;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(phone);out.writeUTF(name);out.writeUTF(addr);out.writeInt(flow);}@Overridepublic void readFields(DataInput in) throws IOException {this.phone = in.readUTF();this.name = in.readUTF();this.addr = in.readUTF();this.flow = in.readInt();}//省略GET/SET/toString方法}

  实现了WritableComparable接口后,mapper里,就可以将该类作为key来输出了。