实验5 MapReduce初级编程实践
1. 实验目的
(1) 通过实验掌握基本的 MapReduce 编程方法;
(2) 掌握用 MapReduce 解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。
(1) 操作系统: Linux(Ubuntu22.04);
(2) Hadoop 版本: 3.1.3;
对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例供参考。
输入文件 A 的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 x
输入文件 B 的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入文件 A 和 B 合并得到的输出文件 C 的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 x
1.启动 hadoop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;
/**
 * MapReduce job that merges every file under the input path and removes duplicate lines.
 *
 * <p>Each input line is emitted as a map output key. The shuffle groups identical keys, so
 * the reducer sees every distinct line exactly once and writes it out once.
 */
public class Merge {

    /** Emits each input line unchanged as the key, paired with an empty value. */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        /** Reusable empty value; only the key carries information. */
        private final Text empty = new Text("");

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the (line, "") pair
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Write the incoming line directly instead of reassigning a shared static field.
            context.write(value, empty);
        }
    }

    /** Writes each distinct key once; also used as the combiner. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        /** Reusable empty value. */
        private final Text empty = new Text("");

        /**
         * @param key     a distinct input line
         * @param values  the empty values grouped under that line (ignored)
         * @param context sink for the (line, "") pair
         */
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, empty);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args either empty (the paths default to {@code input} and {@code output}) or
     *             exactly two elements: the input path and the output path
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.default.name is the deprecated spelling of this key.
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // With no arguments keep the original hard-coded paths; otherwise honour the CLI.
        String[] otherArgs = (args.length == 0) ? new String[] {"input", "output"} : args;
        if (otherArgs.length != 2) {
            System.err.println("Usage: Merge <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Merge and duplicate removal");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
输入文件 1 的样例如下:
33
37
12
40
输入文件 2 的样例如下:
4
16
39
5
输入文件 3 的样例如下:
1
45
25
根据输入文件 1、 2 和 3 得到的输出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45
1.启动 hadoop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| import java.io.IOException; import java.util.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;
/**
 * MapReduce job that derives grandchild-grandparent pairs from a child-parent table by
 * joining the table with itself.
 *
 * <p>For every input row {@code child parent} the mapper emits the row twice: once keyed by
 * the parent (tag {@code 1}) and once keyed by the child (tag {@code 2}). In the reducer, the
 * key is therefore the person in the middle generation: tag-1 records contribute that
 * person's children and tag-2 records contribute that person's parents. Their cross product
 * is the set of grandchild-grandparent pairs passing through that person.
 */
public class simple_data_mining {
    /**
     * No longer used: the header row is now written from {@code Reduce.setup}. The field is
     * kept only so that existing references to it still compile.
     */
    public static int time = 0;

    /** Emits each child-parent row under both of its names, tagged with the relation side. */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of the form {@code child parent}
         * @param context sink for the two tagged records
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // blank lines previously caused StringIndexOutOfBoundsException
            }

            // Split on the first run of whitespace; the remainder is the parent name.
            String[] fields = line.split("\\s+", 2);
            if (fields.length < 2 || "child".equals(fields[0])) {
                return; // malformed row, or the "child parent" header row
            }

            String childName = fields[0];
            String parentName = fields[1];
            String payload = "+" + childName + "+" + parentName;

            // Tag 1: keyed by the parent, so the reducer learns a child of the key.
            context.write(new Text(parentName), new Text("1" + payload));
            // Tag 2: keyed by the child, so the reducer learns a parent of the key.
            context.write(new Text(childName), new Text("2" + payload));
        }
    }

    /** Joins the children and the parents of each key into grandchild-grandparent pairs. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes the header row once per reducer task, before any key is processed. The
         * header text matches the expected output ("grandchild grandparent").
         */
        protected void setup(Context context) throws IOException, InterruptedException {
            context.write(new Text("grandchild"), new Text("grandparent"));
        }

        /**
         * @param key     the middle-generation person
         * @param values  records of the form {@code tag+child+parent} emitted by the mapper
         * @param context sink for the (grandchild, grandparent) pairs
         */
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Growable lists replace the fixed 10-element arrays, which overflowed for keys
            // with more than ten relations.
            List<String> grandChildren = new ArrayList<>();
            List<String> grandParents = new ArrayList<>();

            for (Text value : values) {
                String record = value.toString();
                // Layout: <tag>+<child>+<parent>; the child name starts at index 2.
                int separator = record.indexOf('+', 2);
                if (separator < 0) {
                    continue; // empty or malformed record
                }
                if (record.charAt(0) == '1') {
                    grandChildren.add(record.substring(2, separator));
                } else {
                    grandParents.add(record.substring(separator + 1));
                }
            }

            // If either list is empty the loops below emit nothing.
            for (String grandChild : grandChildren) {
                for (String grandParent : grandParents) {
                    context.write(new Text(grandChild), new Text(grandParent));
                }
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args either empty (the paths default to {@code input} and {@code output}) or
     *             exactly two elements: the input path and the output path
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.default.name is the deprecated spelling of this key.
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // With no arguments keep the original hard-coded paths; otherwise honour the CLI.
        String[] otherArgs = (args.length == 0) ? new String[] {"input", "output"} : args;
        if (otherArgs.length != 2) {
            System.err.println("Usage: simple_data_mining <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Single table join");
        job.setJarByClass(simple_data_mining.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
下面给出一个 child-parent 的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
输出文件内容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
1.启动 hadoop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;
/**
 * MapReduce job that merges several files of integers (one per line) and writes them in
 * ascending order, each prefixed with its 1-based position.
 *
 * <p>Sorting relies on the framework ordering {@code IntWritable} keys before they reach the
 * reducer. The position counter is local to each reducer task, so positions are global only
 * when the job runs with a single reducer.
 */
public class MergeSort {

    /** Parses each input line as an integer and emits it as the key. */
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        /** Reusable key holder. */
        private final IntWritable data = new IntWritable();
        /** Reusable placeholder value; one is emitted per occurrence of a number. */
        private final IntWritable one = new IntWritable(1);

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line containing a single integer
         * @param context sink for the (number, 1) pair
         * @throws NumberFormatException if a non-blank line is not a valid integer
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim first: surrounding whitespace or a blank line made parseInt throw.
            String text = value.toString().trim();
            if (text.isEmpty()) {
                return;
            }
            data.set(Integer.parseInt(text));
            context.write(data, one);
        }
    }

    /** Writes (position, number) once for every occurrence of each number. */
    public static class Reduce
            extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        /** Next position to assign; starts at 1 for each reducer task. */
        private final IntWritable lineNum = new IntWritable(1);

        /**
         * @param key     a number from the input
         * @param values  one element per occurrence of that number
         * @param context sink for the (position, number) pairs
         */
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Duplicates keep their own positions, so emit once per grouped value.
            for (IntWritable occurrence : values) {
                context.write(lineNum, key);
                lineNum.set(lineNum.get() + 1);
            }
        }
    }

    /**
     * Range partitioner: splits {@code [0, MAX_NUMBER]} into equal-width intervals so that
     * every key in partition i is smaller than every key in partition i+1.
     */
    public static class Partition extends Partitioner<IntWritable, IntWritable> {
        /** Upper end of the key range used to size the intervals. */
        private static final int MAX_NUMBER = 65223;

        /**
         * @param key           the number being routed
         * @param value         unused
         * @param num_Partition number of reduce tasks
         * @return a partition index in {@code [0, num_Partition)}
         */
        public int getPartition(IntWritable key, IntWritable value, int num_Partition) {
            int bound = MAX_NUMBER / num_Partition + 1;
            int index = key.get() / bound;
            // Keys outside the expected range used to yield -1, which the framework rejects.
            // Clamp them to the first or last partition; this keeps cross-partition order.
            return Math.max(0, Math.min(num_Partition - 1, index));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args either empty (the paths default to {@code input} and {@code output}) or
     *             exactly two elements: the input path and the output path
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.default.name is the deprecated spelling of this key.
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // With no arguments keep the original hard-coded paths; otherwise honour the CLI.
        String[] otherArgs = (args.length == 0) ? new String[] {"input", "output"} : args;
        if (otherArgs.length != 2) {
            System.err.println("Usage: MergeSort <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Merge and sort");
        job.setJarByClass(MergeSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
问题1: 对MapReduce程序不熟悉
解决:参考http://dblab.xmu.edu.cn/blog/2481-2/ 的MapReduce编程实践教程
问题2: 启动hadoop时报错:util.NativeCodeLoader: Unable to load native-hadoop library for your platform
解决: 这条消息只是一个警告,会出现在 Hadoop 应用程序或服务的日志中,表示当前平台无法加载本地(native)Hadoop 库。Hadoop 是使用 Java 语言开发的,但有一些需求和操作并不适合用 Java 实现,因此引入了本地库(Native Libraries)的概念;通过本地库,Hadoop 可以更高效地执行某些操作。
1 2 3 4 5 6 7 8 9
| <property>
<description>Should native hadoop libraries, if present, be used.</description>
下载完后传到namenode 和datanode服务器上
删除native 下的所有包:
| rm -rf /[hadoopHome的目录]/lib/native/*
| tar -xvf hadoop-native-64-2.6.0.tar -C /[hadoopHome的目录]/lib/native