package org.apache.hadoop.studyhdfs.mapreduce;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ProvinceCountMapReduce extends Configured implements Tool {

    // 1. map: emit (provinceId, 1) for every valid log line
    /*
     * Mapper type parameters: <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class ProvinceCountMapper extends
            Mapper<LongWritable, Text, IntWritable, IntWritable> {

        private final IntWritable mapOutputKey = new IntWritable();
        private final IntWritable mapOutputValue = new IntWritable(1);
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // get the line
            String lineValue = value.toString();
            // split into tab-separated fields
            String[] strs = lineValue.split("\t");
            // filter: check the field count before indexing into the array,
            // otherwise short lines throw ArrayIndexOutOfBoundsException
            if (strs.length < 30) {
                return;
            }
            String url = strs[1];
            String provinceIdValue = strs[23];
            if (StringUtils.isBlank(provinceIdValue) || StringUtils.isBlank(url)) {
                return;
            }
            int provinceId;
            try {
                provinceId = Integer.parseInt(provinceIdValue);
            } catch (NumberFormatException e) {
                // skip records whose province id is not numeric
                return;
            }
            mapOutputKey.set(provinceId);
            context.write(mapOutputKey, mapOutputValue);
        }
    }
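    /*
     * Input sketch (assumption, inferred from the indexes used above): each
     * record is one tab-separated tracker-log line with at least 30 fields,
     * where field 1 holds the page URL and field 23 holds the numeric
     * province id, e.g.
     *
     *   uuid \t http://www.example.com/page \t ... \t 12 \t ...
     */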
    // 2. reduce: sum the 1s for each provinceId
    public static class ProvinceCountReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private final IntWritable outputValue = new IntWritable();

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum all counts for this province
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }
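    /*
     * With Hadoop's default TextOutputFormat, each reducer emission is written
     * as one tab-separated line per province, e.g. "12\t4973" (example value).
     */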
    public int run(String[] args) throws Exception {
        // 1. get Configuration
        Configuration conf = super.getConf();

        // 2. create the job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(ProvinceCountMapReduce.class);

        // 3. configure the job
        // 3.1 input
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // 3.2 mapper
        job.setMapperClass(ProvinceCountMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3.3 reducer
        job.setReducerClass(ProvinceCountReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
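        // Optional (a sketch, not part of the original job setup): because the
        // reducer just sums IntWritables, it is associative and commutative,
        // so it could also run as a combiner to pre-aggregate (provinceId, 1)
        // pairs on the map side and cut shuffle traffic:
        //job.setCombinerClass(ProvinceCountReducer.class);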
        // 3.4 output (the path must not already exist, or the job fails fast)
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 4. submit and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        // hardcoded test paths; these override whatever was passed on the
        // command line, so remove this block for real submissions
        args = new String[]{
                "hdfs://Hadoop-senior02.beifeng.com:8020/input/2015082818",
                "hdfs://Hadoop-senior02.beifeng.com:8020/output15/"
        };
        Configuration conf = new Configuration();
        // compress intermediate map output to reduce shuffle I/O
        conf.set("mapreduce.map.output.compress", "true");
        int status = ToolRunner.run(conf, new ProvinceCountMapReduce(), args);
        System.exit(status);
    }
}
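/*
 * Usage sketch (assumptions: the class is packaged into a jar named
 * province-count.jar, and the hardcoded args block in main() has been removed
 * so command-line arguments win). Since the job runs through ToolRunner, the
 * generic -D options are parsed from the command line, so map-output
 * compression can also be toggled at submit time:
 *
 *   hadoop jar province-count.jar org.apache.hadoop.studyhdfs.mapreduce.ProvinceCountMapReduce \
 *       -D mapreduce.map.output.compress=true \
 *       hdfs://Hadoop-senior02.beifeng.com:8020/input/2015082818 \
 *       hdfs://Hadoop-senior02.beifeng.com:8020/output15/
 */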