
Big Data: WordCount over a Text File with Hadoop MapReduce


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool {
    /*
     * WordCount over a text file. The input type is TextInputFormat, which
     * implements createRecordReader() and returns a LineRecordReader; that
     * reader fixes the input key and value types.
     *
     * For a text file:
     *   KEYIN:   byte offset of the line within the file
     *   VALUEIN: one line of text
     *
     * The mapper's output types are chosen by the business logic, not by the
     * framework. Since the requirement is to count occurrences per word:
     *   key:   the word, Text (the Writable wrapper for String)
     *   value: the count, LongWritable (the Writable wrapper for long)
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private final Text keyOut = new Text();
        private final LongWritable valueOut = new LongWritable(1); // each word occurrence counts as 1
        // map() is called once per line
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("map(): keyIn: " + key.get() + "; valueIn: " + value.toString());
            String[] splits = line.split(" ");
            for (String word : splits) {
                keyOut.set(word);
                // map() emits its output through context.write()
                context.write(keyOut, valueOut);
                System.out.println("map(): keyOut: " + keyOut.toString() + "; valueOut: " + valueOut.get());
            }
        }
    }
    /*
     * KEYIN, VALUEIN: determined by the map output types.
     * KEYOUT, VALUEOUT: determined by the business logic:
     *   KEYOUT is the word, Text (the Writable wrapper for String)
     *   VALUEOUT is the count, LongWritable (the Writable wrapper for long)
     */
    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private final LongWritable valueOut = new LongWritable();
        // reduce() is called once per key
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            sb.append("reduce(): keyIn: ").append(key.toString()).append("; valueIn: [");
            long sum = 0;
            for (LongWritable w : values) {
                // get() unwraps the actual long held by the LongWritable
                long num = w.get();
                sum += num;
                sb.append(num).append(",");
            }
            sb.deleteCharAt(sb.length() - 1);
            sb.append("]");
            System.out.println(sb.toString());
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        // create and configure the job, then submit it
        Configuration conf = getConf();
        // create the job object
        Job job = Job.getInstance(conf, "wordcount");
        // main class the job runs
        job.setJarByClass(WordCount.class);
        // map class of the job
        job.setMapperClass(WordCountMapper.class);
        // reduce class of the job
        job.setReducerClass(WordCountReducer.class);
        // key type of the map output
        job.setMapOutputKeyClass(Text.class);
        // value type of the map output
        job.setMapOutputValueClass(LongWritable.class);
        // key type of the reduce (final) output
        job.setOutputKeyClass(Text.class);
        // value type of the reduce (final) output
        job.setOutputValueClass(LongWritable.class);
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // input directory of the job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        // output directory of the job
        FileOutputFormat.setOutputPath(job, outputPath);
        // handle cleanup of the output directory automatically
        FileSystem fs = FileSystem.get(conf);
        // if the directory already exists in the file system, delete it
        if (fs.exists(outputPath)) {
            // recursive delete
            fs.delete(outputPath, true);
            System.out.println("output dir: " + outputPath.toString() + " deleted SUCCESS!");
        }
        // submit the job
        // waitForCompletion(false): false means do not print the counters
        boolean status = job.waitForCompletion(false);
        return status ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        // the input and output directories are passed as launch arguments
        // and arrive here through main()'s args, e.g. /tmp/input /tmp/output
        System.exit(ToolRunner.run(new WordCount(), args));
    }
}
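To try the job end to end, package the class into a jar and launch it through the hadoop client, passing the input and output directories as the two arguments main() expects. A minimal sketch, assuming the jar is named wordcount.jar (the jar name is illustrative, not from the original):

  hadoop jar wordcount.jar WordCount /tmp/input /tmp/output

With setNumReduceTasks(2), the output directory will contain one file per reducer, part-r-00000 and part-r-00001, where each line is a word and its total count, tab-separated by default. Because the class runs through ToolRunner, generic options such as -D property=value can also be placed before the directory arguments.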