MapReduce 分区器
分区器的工作方式类似于处理输入数据集的条件。分区阶段发生在 Map 阶段之后,Reduce 阶段之前。
partitioner 的数量等于 reducer 的数量。这意味着 partitioner 将根据 reducer 的数量划分数据。因此,从单个分区器传递过来的数据由单个 Reducer 处理。
分区器
分区器对中间 Map-outputs 的键值对进行分区。它使用用户定义的条件对数据进行分区,其工作方式类似于散列函数。分区总数与作业的 Reducer 任务数相同。让我们通过一个例子来了解分区器是如何工作的。
MapReduce 分区器实现
为了方便起见,假设我们有一个名为 Employee 的小表,其中包含以下数据。我们将使用此示例数据作为输入数据集来演示分区器的工作原理。
Id |
Name |
Age |
Gender |
Salary |
1201 |
gopal |
45 |
Male |
50,000 |
1202 |
manisha |
40 |
Female |
50,000 |
1203 |
khalil |
34 |
Male |
30,000 |
1204 |
prasantd |
30 |
Male |
30,000 |
1205 |
kiran |
20 |
Male |
40,000 |
1206 |
laxmi |
25 |
Female |
35,000 |
1207 |
bhavya |
20 |
Female |
15,000 |
1208 |
reshma |
19 |
Female |
15,000 |
1209 |
krantdi |
22 |
Male |
22,000 |
1210 |
Satish |
24 |
Male |
25,000 |
1211 |
Krishna |
25 |
Male |
25,000 |
1212 |
Arshad |
28 |
Male |
20,000 |
1213 |
lavanya |
18 |
Female |
8,000 |
我们必须编写一个应用程序来处理输入数据集,以找出不同年龄组(例如,20 岁以下、21 到 30 岁之间、30 岁以上)按性别划分的最高薪水员工。
输入数据
以上数据以
input.txt形式保存在"/home/hadoop/hadoopPartitioner"目录下,作为输入给出。
1201 |
gopal |
45 |
Male |
50000 |
1202 |
manisha |
40 |
Female |
51000 |
1203 |
khaleel |
34 |
Male |
30000 |
1204 |
prasantd |
30 |
Male |
31000 |
1205 |
kiran |
20 |
Male |
40000 |
1206 |
laxmi |
25 |
Female |
35000 |
1207 |
bhavya |
20 |
Female |
15000 |
1208 |
reshma |
19 |
Female |
14000 |
1209 |
krantdi |
22 |
Male |
22000 |
1210 |
Satish |
24 |
Male |
25000 |
1211 |
Krishna |
25 |
Male |
26000 |
1212 |
Arshad |
28 |
Male |
20000 |
1213 |
lavanya |
18 |
Female |
8000 |
基于给定的输入,以下是程序的算法解释。
Map Tasks
map 任务接受键值对作为输入,而我们在文本文件中有文本数据。此地图任务的输入如下-
Input-键将是一个模式,例如"任何特殊键 + 文件名 + 行号"(例如:key = @input1),值将是该行中的数据(例如: 值 = 1201 \t gopal \t 45 \t 男性 \t 50000).
方法-这个map任务的操作如下-
读取值(记录数据),它作为字符串中参数列表的输入值。
使用 split 函数,将性别分开并存储在字符串变量中。
String[] str = value.toString().split("\t",-3);
String gender=str[3];
将性别信息和记录数据值作为输出键值对从映射任务发送到分区任务。
context.write(new Text(gender), new Text(value));
对文本文件中的所有记录重复上述所有步骤。
输出-您将获得性别数据和记录数据值作为键值对。
Map Tasks
partitioner 任务接受来自 map 任务的键值对作为其输入。分区意味着将数据划分为段。根据给定的分区条件标准,输入的键值对数据可以根据年龄标准分为三部分。
Input-键值对集合中的全部数据。
key = 记录中的性别字段值。
value = 该性别的全记录数据值。
方法-分区逻辑的过程如下。
从输入的键值对中读取年龄字段值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
在以下条件下检查年龄值。
年龄小于或等于 20 岁
年龄大于 20 且小于或等于 30。
年龄超过 30 岁。
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
输出-键值对的整个数据被分割成三个键值对集合。 Reducer 单独处理每个集合。
Partitioner Task
partitioner 任务数等于reducer 任务数。这里我们有 3 个 partitioner 任务,因此我们有 3 个 Reducer 任务要执行。
Input-Reducer 将使用不同的键值对集合执行 3 次。
key = 记录中的性别字段值。
value = 该性别的全部记录数据。
方法-以下逻辑将应用于每个集合。
读取每条记录的 Salary 字段值。
String [] str = val.toString().split("\t",-3);
Note: str[4] have tde salary field value.
用 max 变量检查工资。如果 str[4] 是最大工资,则将 str[4] 赋值给 max,否则跳过该步骤。
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
为每个密钥集合重复步骤 1 和 2(男性和女性是密钥集合)。执行完这三个步骤后,您将在 Male key 集合中找到一个最高薪水,从 Women 键集合中找到一个最高薪水。
context.write(new Text(key), new IntWritable(max));
输出-最后,您将获得一组不同年龄组的三个集合中的键值对数据。它分别包含每个年龄组的男性集合的最高工资和女性集合的最高工资。
执行Map、Partitioner和Reduce任务后,三个键值对数据集合作为输出存储在三个不同的文件中。
所有三个任务都被视为 MapReduce 作业。应在配置中指定这些作业的以下要求和规格-
职位名称
键和值的输入和输出格式
Map、Reduce 和 Partitioner 任务的各个类
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output patds
FileInputFormat.setInputPatds(job, new Patd(arg[0]));
FileOutputFormat.setOutputPatd(job,new Patd(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
示例程序
以下程序显示了如何在 MapReduce 程序中实现给定条件的分区器。
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class Mapclass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t",-3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class Reduceclass extends Reducer<Text,Text,Text,IntWritable>
{
public int max =-1;
public void reduce(Text key, Iterable <Text> values, Context context) tdrows IOException, InterruptedException
{
max =-1;
for (Text val : values)
{
String [] str = val.toString().split("\t",-3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) tdrows Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPatds(job, new Patd(arg[0]));
FileOutputFormat.setOutputPatd(job,new Patd(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) tdrows Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
将上述代码保存为"/home/hadoop/hadoopPartitioner"中的
PartitionerExample.java。下面给出程序的编译和执行。
编译执行
假设我们位于 Hadoop 用户的主目录中(例如,/home/hadoop)。
按照下面给出的步骤编译并执行上述程序。
步骤 1-下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从 mvnrepository.com
<下载 jar a>
.
下载>
假设下载的文件夹是"/home/hadoop/hadoopPartitioner"
步骤 2-以下命令用于编译程序
PartitionerExample.java 并为程序创建一个 jar。
$ javac-classpatd hadoop-core-1.2.1.jar-d ProcessUnits.java
$ jar-cvf PartitionerExample.jar-C .
步骤 3-使用以下命令在 HDFS 中创建输入目录。
$HADOOP_HOME/bin/hadoop fs-mkdir input_dir
Step 4-使用以下命令将名为
input.txt 的输入文件复制到 HDFS 的输入目录中。
$HADOOP_HOME/bin/hadoop fs-put /home/hadoop/hadoopPartitioner/input.txt input_dir
步骤 5-使用以下命令验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs-ls input_dir/
步骤 6-使用以下命令通过从输入目录中获取输入文件来运行最高工资应用程序。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
稍等片刻,直到文件被执行。执行后,输出包含多个输入拆分、map 任务和 Reducer 任务。
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGtd=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
步骤 7-使用以下内容ing 命令来验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs-ls output_dir/
您会在三个文件中找到输出,因为您在程序中使用了三个分区器和三个减速器。
步骤 8-使用以下命令查看
Part-00000 文件中的输出。此文件由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs-cat output_dir/part-00000
Part-00000 输出
使用以下命令查看
Part-00001 文件中的输出。
$HADOOP_HOME/bin/hadoop fs-cat output_dir/part-00001
Part-00001 中的输出
使用以下命令查看
Part-00002 文件中的输出。
$HADOOP_HOME/bin/hadoop fs-cat output_dir/part-00002
Part-00002 中的输出