import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    /*
     * Runs WordCount over text files. The input format is TextInputFormat,
     * whose createRecordReader() returns a LineRecordReader; that reader
     * fixes the mapper's input key and value types:
     *
     *   KEYIN:   byte offset of the line within the file
     *   VALUEIN: the content of one line
     *
     * The mapper's output types are chosen by the business logic, not by the
     * framework. Since our requirement is to count occurrences per word:
     *
     *   KEYOUT:   the word, as Text (Hadoop's wrapper for String)
     *   VALUEOUT: the count, as LongWritable (Hadoop's wrapper for long)
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        Text keyOut = new Text();
        // each word occurrence contributes a count of 1
        LongWritable valueOut = new LongWritable(1);

        // map() is called once per input line
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("map(): keyIn: " + key.get() + "; valueIn: " + value.toString());
            String[] splits = line.split(" ");
            for (String word : splits) {
                keyOut.set(word);
                // emit the map output with context.write()
                context.write(keyOut, valueOut);
                System.out.println("map(): keyOut: " + keyOut.toString() + "; valueOut: " + valueOut.get());
            }
        }
    }

    /*
     * KEYIN, VALUEIN: must match the mapper's output types.
     * KEYOUT, VALUEOUT: determined by the business logic:
     *   KEYOUT:   the word, as Text
     *   VALUEOUT: the total count, as LongWritable
     */
    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        LongWritable valueOut = new LongWritable();

        // reduce() is called once per key
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            sb.append("reduce(): keyIn: " + key.toString() + "; valueIn: [");
            long sum = 0;
            for (LongWritable w : values) {
                // get() unwraps the primitive value held by the LongWritable
                long num = w.get();
                sum += num;
                sb.append(num).append(",");
            }
            sb.deleteCharAt(sb.length() - 1);
            sb.append("]");
            System.out.println(sb.toString());
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // create and configure the job, then submit it
        Configuration conf = getConf();
        // create the Job object
        Job job = Job.getInstance(conf, "wordcount");
        // main class of the job
        job.setJarByClass(WordCount.class);
        // mapper class of the job
        job.setMapperClass(WordCountMapper.class);
        // reducer class of the job
        job.setReducerClass(WordCountReducer.class);
        // key type of the map output
        job.setMapOutputKeyClass(Text.class);
        // value type of the map output
        job.setMapOutputValueClass(LongWritable.class);
        // key type of the reduce (final) output
        job.setOutputKeyClass(Text.class);
        // value type of the reduce (final) output
        job.setOutputValueClass(LongWritable.class);
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // input directory of the job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        // output directory of the job
        FileOutputFormat.setOutputPath(job, outputPath);
        // delete the output directory automatically if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            // recursive delete
            fs.delete(outputPath, true);
            System.out.println("output dir: " + outputPath.toString() + " deleted SUCCESS!");
        }
        // submit the job; waitForCompletion(false) suppresses progress/counter output
        boolean status = job.waitForCompletion(false);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // pass the input and output directories as program arguments,
        // e.g. /tmp/input /tmp/output
        System.exit(ToolRunner.run(new WordCount(), args));
    }
}
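Because WordCountReducer only sums its values and its input and output types match (Text, LongWritable in and out), it can also be reused as a combiner to pre-aggregate counts on the map side and shrink the data shuffled to the reducers. A minimal sketch, adding one optional line inside run() alongside the other job.set* calls:

    // optional: pre-aggregate per-word counts on each map task before the shuffle;
    // WordCountReducer qualifies because summation is associative and commutative
    // and its input/output types are identical
    job.setCombinerClass(WordCountReducer.class);

To run the job, package the class into a jar and submit it with the standard Hadoop launcher, e.g. hadoop jar wordcount.jar WordCount /tmp/input /tmp/output (the jar name here is illustrative). With setNumReduceTasks(2), the results are split by key hash across two output files, part-r-00000 and part-r-00001, under the output directory.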