且构网

Passing objects from the driver to MapReduce

Updated: 2023-11-23 07:50:33

I created a driver which reads a config file, builds a list of objects (based on the config) and passes that list to MapReduce (MapReduce has a static attribute which holds a reference to that list of object).

It works, but only locally. As soon as I run the job on a cluster I get all sorts of errors suggesting that the list hasn't been built. This makes me think I'm doing it wrong and that, on a cluster, MapReduce runs independently of the driver.

My question is how to correctly initialise a Mapper.

(I'm using Hadoop 2.4.1)

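This is related to the problem of side data distribution. On a cluster, each map and reduce task runs in its own JVM, usually on a different node from the driver, so a static field populated in the driver is never set there. It only appears to work locally because the local job runner executes the tasks inside the driver's own JVM.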

There are two approaches for side data distribution.

1) Distributed Caches

2) Configuration

Since what you need to share is a set of objects, you can use the Configuration class.

This approach relies on the Configuration class to make an object available across the cluster, to all Mappers and/or Reducers. The idea is simple: the object to be shared is serialized into a Java String in the driver, stored with the set(String, String) method of Configuration, read back with get(String) in the Mapper or Reducer, and deserialized into an object there. Keep such values small (a few kilobytes as a rule of thumb): the whole job configuration is read into memory by the client, the application master and every task, so larger side data is better shipped with the distributed cache.

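In the example code below, I have used the com.google.gson.Gson class for easy serialization and deserialization. You can use Java serialization as well, but it produces bytes, so they must be encoded (for example as Base64) before they can be stored as a string value.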

The class representing the object you need to share

 public class TestBean {
    String string1;
    String string2;
    public TestBean(String test1, String test2) {
        super();
        this.string1 = test1;
        this.string2 = test2;
    }
    public TestBean() {
        this("", "");
    }
    public String getString1() {
        return string1;
    }
    public void setString1(String test1) {
        this.string1 = test1;
    }
    public String getString2() {
        return string2;
    }
    public void setString2(String test2) {
        this.string2 = test2;
    }
}

The driver class, where the configuration values are set

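import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GSONTestDriver {
    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        Configuration conf = new Configuration();
        TestBean testB1 = new TestBean("Hello1", "Gson1");
        TestBean testB2 = new TestBean("Hello2", "Gson2");
        // Serialize each object to a JSON string on the driver side
        Gson gson = new Gson();
        String testSerialization1 = gson.toJson(testB1);
        String testSerialization2 = gson.toJson(testB2);
        // Store the strings in the configuration. This must happen before the
        // Job is created, because the Job makes its own copy of conf.
        conf.set("instance1", testSerialization1);
        conf.set("instance2", testSerialization2);
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor
        Job job = Job.getInstance(conf, "GSON Test");
        job.setJarByClass(GSONTestDriver.class);
        job.setMapperClass(GSONTestMapper.class);
        job.setNumReduceTasks(0); // map-only job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}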

The mapper class, where the objects are retrieved

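import java.io.IOException;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GSONTestMapper extends
        Mapper<LongWritable, Text, Text, NullWritable> {
    Configuration conf;
    String inst1;
    String inst2;

    // setup() runs once per task, before the first call to map()
    @Override
    protected void setup(Context context) {
        conf = context.getConfiguration();
        // Read back the JSON strings the driver stored under these keys
        inst1 = conf.get("instance1");
        inst2 = conf.get("instance2");
        if (inst1 == null || inst2 == null) {
            throw new IllegalStateException("side data missing from the job configuration");
        }
        Gson gson = new Gson();
        TestBean tb1 = gson.fromJson(inst1, TestBean.class);
        // System.out output goes to the task's stdout log, not the client console
        System.out.println(tb1.getString1());
        System.out.println(tb1.getString2());
        TestBean tb2 = gson.fromJson(inst2, TestBean.class);
        System.out.println(tb2.getString1());
        System.out.println(tb2.getString2());
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}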

The bean is converted to a serialized JSON string with the toJson(Object src) method of com.google.gson.Gson. The JSON string is then passed as a value through the Configuration instance and read by name in the Mapper, where it is deserialized with the fromJson(String json, Class<T> classOfT) method of the same Gson class. You can substitute your own objects for the test bean. For a generic collection such as the list in the question, pass a java.lang.reflect.Type built with Gson's TypeToken (for example new TypeToken<List<TestBean>>(){}.getType()) instead of a Class, so that the element type is preserved.
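Two points from this thread can be combined without any third-party library: the Java serialization alternative mentioned in the answer, and the question's need to pass a whole list of objects rather than single beans. The sketch below is an illustration under stated assumptions, not the answerer's code: a java.util.Map stands in for Hadoop's Configuration (both hold String values under String keys), and the names SideDataCodec, Item, encode, decode and the key "items" are hypothetical. Because Java serialization produces bytes, they are Base64-encoded so the result fits into a string value, and every shared class must implement java.io.Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideDataCodec {

    // Hypothetical stand-in for the shared bean; Java serialization
    // (unlike Gson) requires it to implement Serializable.
    public static class Item implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String string1;
        public final String string2;

        public Item(String string1, String string2) {
            this.string1 = string1;
            this.string2 = string2;
        }
    }

    // Driver side: serialize the object graph and Base64-encode the bytes,
    // producing a plain String suitable for Configuration.set(name, value).
    public static String encode(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Task side (e.g. in Mapper.setup): reverse the two steps above.
    @SuppressWarnings("unchecked")
    public static <T> T decode(String value) throws IOException, ClassNotFoundException {
        if (value == null) {
            throw new IllegalStateException("side data not found in configuration");
        }
        byte[] bytes = Base64.getDecoder().decode(value);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a plain Map plays the role of the job Configuration here
        Map<String, String> conf = new HashMap<>();

        // ArrayList (not the List interface) is what implements Serializable
        ArrayList<Item> items = new ArrayList<>();
        items.add(new Item("Hello1", "Gson1"));
        items.add(new Item("Hello2", "Gson2"));
        conf.put("items", encode(items));                 // driver: conf.set("items", ...)

        List<Item> restored = decode(conf.get("items"));  // setup: conf.get("items")
        for (Item item : restored) {
            System.out.println(item.string1 + " " + item.string2);
        }
        // prints "Hello1 Gson1" then "Hello2 Gson2"
    }
}
```

In a real job, encode(...) would feed conf.set("items", ...) in the driver before the Job is created, and decode(...) would be applied to context.getConfiguration().get("items") in Mapper.setup(). Compared with Gson, this avoids an extra dependency and restores the concrete list type directly, at the cost of an opaque and usually larger value.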