本文共 3799 字,大约阅读时间需要 12 分钟。
代码
1 package zhouls.bigdata.myMapReduce.wordcount4; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.util.StringUtils;10 11 public class WordCountMapper extends Mapper{12 13 //该方法循环调用,从文件的split中读取每行调用一次,把该行所在的下标为key,该行的内容为value14 protected void map(LongWritable key, Text value,15 Context context)16 throws IOException, InterruptedException {17 String[] words = StringUtils.split(value.toString(), ' ');18 for(String w :words){19 context.write(new Text(w), new IntWritable(1));20 }21 }22 }
1 package zhouls.bigdata.myMapReduce.wordcount4; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Reducer; 8 9 public class WordCountReducer extends Reducer{10 11 //每组调用一次,这一组数据特点:key相同,value可能有多个。12 protected void reduce(Text arg0, Iterable arg1,13 Context arg2)14 throws IOException, InterruptedException {15 int sum =0;16 for(IntWritable i: arg1){17 sum=sum+i.get();18 }19 arg2.write(arg0, new IntWritable(sum));20 }21 }
//System.setProperty("HADOOP_USER_NAME", "root");
// //1、MR执行环境有两种:本地测试环境,服务器环境 // //本地测试环境(windows):(便于调试) // 在windows的hadoop目录bin目录有一个winutils.exe // 1、在windows下配置hadoop的环境变量 // 2、拷贝debug工具(winutils.exe)到HADOOP_HOME/bin // 3、修改hadoop的源码 ,注意:确保项目的lib需要真实安装的jdk的lib // // 4、MR调用的代码需要改变: // a、src不能有服务器的hadoop配置文件(因为,本地是调试,去服务器环境集群那边的) // b、再调用是使用: // Configuration config = new Configuration(); // config.set("fs.defaultFS", "hdfs://HadoopMaster:9000"); // config.set("yarn.resourcemanager.hostname", "HadoopMaster");//服务器环境:(不便于调试),有两种方式。
//首先需要在src下放置服务器上的hadoop配置文件(都要这一步) //1、在本地直接调用,执行过程在服务器上(真正企业运行环境) // a、把MR程序打包(jar),直接放到本地 // b、修改hadoop的源码 ,注意:确保项目的lib需要真实安装的jdk的lib // c、增加一个属性: // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar"); // d、本地执行main方法,servlet调用MR。 // // //2、直接在服务器上,使用命令的方式调用,执行过程也在服务器上 // a、把MR程序打包(jar),传送到服务器上 // b、通过: hadoop jar jar路径 类的全限定名 // // // // //a,1 b,1 //a,3 c,3 //a,2 d,2 // // //a,3 c,3 //a,2 d,2 //a,1 b,1 //
1 package zhouls.bigdata.myMapReduce.wordcount4; 2 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job;10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;12 13 public class RunJob {14 15 public static void main(String[] args) {16 Configuration config =new Configuration();17 config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");18 config.set("yarn.resourcemanager.hostname", "HadoopMaster");19 // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");//先打包好wc.jar20 try {21 FileSystem fs =FileSystem.get(config);22 23 Job job =Job.getInstance(config);24 job.setJarByClass(RunJob.class);25 26 job.setJobName("wc");27 28 job.setMapperClass(WordCountMapper.class);29 job.setReducerClass(WordCountReducer.class);30 31 job.setMapOutputKeyClass(Text.class);32 job.setMapOutputValueClass(IntWritable.class);33 34 FileInputFormat.addInputPath(job, new Path("/usr/input/wc/wc.txt"));//新建好输入路径,且数据源35 36 Path outpath =new Path("/usr/output/wc");37 if(fs.exists(outpath)){38 fs.delete(outpath, true);39 }40 FileOutputFormat.setOutputPath(job, outpath);41 42 boolean f= job.waitForCompletion(true);43 if(f){44 System.out.println("job任务执行成功");45 }46 } catch (Exception e) {47 e.printStackTrace();48 }49 }50 }