I. Background:
Both HDFS and MapReduce are very inefficient at handling small files, yet workloads with large numbers of small files are often unavoidable. In that situation the InputFormat implementations shipped with the framework are no longer a good fit, so a dedicated solution is needed. One option is to implement a custom InputFormat that merges the small files.
II. Requirements:
Merge multiple small files into a single SequenceFile (a SequenceFile is Hadoop's container format for storing key-value pairs in binary form). The SequenceFile holds all of the small files, storing each one as a record whose key is the file path plus file name and whose value is the file contents.
(1) Input data: a directory of small files.
(2) Expected output: one SequenceFile containing a (path + name, contents) record for every input file.
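To make the expected key-value layout concrete, here is a minimal sketch, independent of the MapReduce job built below, that writes such a SequenceFile directly through the SequenceFile.Writer API. The class name, the local target path /tmp/manual-merge.seq, and the sample record are illustrative placeholders only.

package com.c21.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.nio.charset.StandardCharsets;

public class ManualSequenceFileWrite {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative local target path; any Hadoop-compatible path works
        Path target = new Path("file:///tmp/manual-merge.seq");

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(target),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            // One record per small file: key = path + name, value = raw bytes of the file
            byte[] contents = "sample file contents".getBytes(StandardCharsets.UTF_8);
            writer.append(new Text("file:///tmp/input/a.txt"),
                          new BytesWritable(contents));
        }
    }
}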
III. Implementation steps:
1. Create a custom class that extends FileInputFormat
(1) Override the isSplitable() method and return false so the files are not split.
(2) Override the createRecordReader() method to create and return the custom RecordReader object.
package com.c21.demo;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

    // Override isSplitable() and return false so each file is processed as one unsplittable unit
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    // Override createRecordReader() to return the custom RecordReader; the framework initializes it before use
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new WholeRecordReader();
    }
}
2. Create a custom RecordReader that reads one whole file at a time and wraps it as a key-value pair
package com.c21.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration configuration;
    private FileSplit split;
    // Each split is one whole file, so exactly one key-value pair is emitted
    private boolean isProgress = true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    public WholeRecordReader() {
        super();
    }

    // Initialize: cache the split and the job configuration
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            // 1 Allocate a buffer large enough for the whole file
            byte[] contents = new byte[(int) split.getLength()];

            FSDataInputStream fis = null;
            try {
                // 2 Get the file system for this path
                Path path = split.getPath();
                FileSystem fs = path.getFileSystem(configuration);

                // 3 Open the file
                fis = fs.open(path);

                // 4 Read the entire file into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5 Wrap the file contents as the value
                value.set(contents, 0, contents.length);

                // 6 Use the file path and name as the key
                k.set(path.toString());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(fis);
            }

            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
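Note that nextKeyValue() buffers the entire file in a single byte array, so memory usage grows with file size; this RecordReader is only appropriate for the small-file scenario described above.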
3. Custom Mapper class:
package com.c21.demo;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    // Each input record is already (file path, file contents), so just pass it through
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
4. Custom Reducer class:
package com.c21.demo;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    // Every key (file path) is unique, so each group holds exactly one value: write it out
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}
5. Driver class:
package com.c21.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class SequenceFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Adjust the input and output paths to match your own machine
        args = new String[] { "e:/input/", "e:/output" };

        // 1 Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar location and wire up the custom mapper and reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 3 Use the custom InputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

        // 4 Write the output as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 5 Set the map output key-value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 6 Set the final output key-value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 7 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Run result:
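After the job finishes, the output directory should contain a SequenceFile (typically named part-r-00000) with one record per input file. As a quick check, a small reader like the sketch below can list the stored keys and value sizes; the class name is illustrative, and the output path is the one hard-coded in the driver above.

package com.c21.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Reducer output written by SequenceFileDriver (adjust if your paths differ)
        Path output = new Path("e:/output/part-r-00000");

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(output))) {
            // Each record: key = original file path + name, value = raw file contents
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}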