且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

HBase结合MapReduce批量导入

更新时间:2022-09-17 08:10:39

  • Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据。
    开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出:
    HBase结合MapReduce批量导入
    图片格式描述:
    HBase结合MapReduce批量导入
    先介绍一个日期格式的转换:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class TestDate
    {
         public static void main(String[] args)
         {
               Date d = new Date();
               SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
               String time = df.format(d);
               System.out.println(time);
         }
    }
    /*2016-05-14 13:32:24*/
    1
    2
    3
    4
    5
    6
    7
    8
    9
     在Java当中,我们经常利用SimpledateFormat这个类将给定的日期转化成指定的格式。
     接下来在归纳一下Hbase结合MapReduce批量导入数据的时候,在代码当中应该注意的事项:
     ①MyReducer类继承的是TableReduce类,而不在是MapReduce中常用的Reducer类
     ②的数值类型没有什么用,通常将k3的数值类型设置为NullWritable即可
     ③只设置map函数的输出类型,不在设置reduce函数的输出类型,因为②的原因
     ④指定对输出文件格式化处理的类改为TableOutputFormat,而不在是TextOutputFormat
     ⑤输出文件的路径改为指定的表名,在Configuration中进行设定,而不在是path的方式
     ⑥如果想过jar包的方式运行程序,貌似还需要加入什么jar包,我没有整出来。
     接下来将贴出我在编程的时候第一次写出的业务代码:当然遇到了很多的问题。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    package IT01;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    public class HbaseApp
    {
         public static String path1 = "hdfs://hadoop80:9000/FlowData.txt";
         public static void main(String[] args) throws Exception
         {
               Configuration conf = new Configuration();
               conf.set("hbaser.rootdir","hdfs://hadoop80:9000/hbase");
               conf.set("hbase.zookeeper.quorum","hadoop80");
               conf.set(TableOutputFormat.OUTPUT_TABLE,"wlan_log");//在这里需要指定表的名字:相当于输出文件的路径
               conf.set("dfs.socket.timeout","2000");
     
               Job job = new Job(conf,"HbaseApp");
               FileInputFormat.setInputPaths(job, new Path(path1));
               job.setInputFormatClass(TextInputFormat.class);
               job.setMapperClass(MyMapper.class);
               job.setMapOutputKeyClass(Text.class);
               job.setMapOutputValueClass(Text.class);
     
               job.setNumReduceTasks(1);
               job.setPartitionerClass(HashPartitioner.class);
     
               job.setReducerClass(MyReducer.class);
    //         job.setOutputKeyClass(Text.class);
    //         job.setOutputValueClass(NullWritable.class);
               job.setOutputFormatClass(TableOutputFormat.class);
    //         FileOutputFormat.setOutputPath(job, new Path(path2));
               job.waitForCompletion(true);
         }
         public static class MyMapper extends Mapper{
                protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
                {
                      String[] splited = v1.toString().split("\t");
                      String reportTime = splited[0];
                      String msisdn = splited[1];
                      Date date = new Date(Long.parseLong(reportTime));
                      String time = DateConvert.dateParse(date);
                      String rowkey = msisdn+":"+time;//获取到行健
                      context.write(new Text(rowkey),new Text(v1.toString()));      
                }
         }
         public static class MyReducer extends TableReducer{
                protected void reduce(Text k2, Iterablev2s,Context context)throws IOException, InterruptedException
                {
                      for (Text v2 : v2s)
                     {
                         String[] splited = v2.toString().split("\t");
                         /**添加记录的时候需要指定行健、列族、列名、数值***/
                         Put put = new Put(k2.toString().getBytes());
                         put.add("cf".getBytes(),"reportTime".getBytes(), splited[0].getBytes());
                         put.add("cf".getBytes(),"msisdn".getBytes(), splited[1].getBytes());
                         put.add("cf".getBytes(),"apmac1".getBytes(), splited[2].getBytes());
                         put.add("cf".getBytes(),"apmac2".getBytes(), splited[3].getBytes());
                         put.add("cf".getBytes(),"host".getBytes(), splited[4].getBytes());
                         put.add("cf".getBytes(),"sitetype".getBytes(), splited[5].getBytes());
                         put.add("cf".getBytes(),"upPackNum".getBytes(), splited[6].getBytes());
                         put.add("cf".getBytes(),"downPackNum".getBytes(), splited[7].getBytes());
                         put.add("cf".getBytes(),"upPayLoad".getBytes(), splited[8].getBytes());
                         put.add("cf".getBytes(),"downPayLoad".getBytes(), splited[9].getBytes());
                         put.add("cf".getBytes(),"httpstatus".getBytes(), splited[10].getBytes());
                         context.write(NullWritable.get(),put);
                     }       
                }
         }
    }
    class DateConvert
    {
        public static String dateParse(Date  date)
        {
             SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmmss");//构造一个日期解析器
             return df.format(date); 
        }
    }

    程序运行完之后:显示如下异常NumberFormatException
    HBase结合MapReduce批量导入
    显示的是数字格式异常, 于是我在map函数当中又加了一个throws NumberFormatException

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
                protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException,NumberFormatException
                {
                      String[] splited = v1.toString().split("\t");
                      String reportTime = splited[0];
                      String msisdn = splited[1];
                      Date date = new Date(Long.parseLong(reportTime));
                      String time = DateConvert.dateParse(date);
                      String rowkey = msisdn+":"+time;//获取到行健
                      context.write(new Text(rowkey),new Text(v1.toString()));      
                }

    但是这样我发现也不对,因为当我追踪Mapp这个类的源代码时,我发现父类的map方法并没有抛出NumberFormatException这个异常,根据子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围,我又将上面这段代码用try——catch这种异常处理方式进行处理:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
                {
                      try
                      {
                          String[] splited = v1.toString().split("\t");
                          String reportTime = splited[0];
                          String msisdn = splited[1];
                          Date date = new Date(Long.parseLong(reportTime));
                          String time = DateConvert.dateParse(date);
                          String rowkey = msisdn+":"+time;//获取到行健
                          context.write(new Text(rowkey),new Text(v1.toString()));      
                      }catch(Exception e)
                      {
                          Counter counter = context.getCounter("NumberExceptionNum""num");
                          counter.increment(1L);
                      }
                }

    当我将代码改成这样的时候,此时程序并没有显示抛出NumberFormatException这个异常,说明异常得到了处理,但是当我去查看Hbase数据的时候,发现HDFS中的日志数据并没有导入到Hbase数据库中,于是我又查看了一下MapReduce的运行日志:
    HBase结合MapReduce批量导入
    也就是我的22行数据在map函数中当中并没有输出,这个问题就匪夷所思了,为什么22行数据都会抛出数字格式异常呢,而且都没有输出,于是我想到可能是SimpleDateFZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcm1hdNXiuPbA4LXEzsrM4qOs09rKx87S09a/qsq8uPfW1rDZtsijrLeiz9bN+MnPuty24M7E1cK2vMrHxfrF0NXiuPbA4LXEo6zX7tbV1tXT2tXStb3By87KzOK1xL3ivva3vbC4o6zTw3RyaW0oKdXiuPa3vbeoyKWz/dfWt/u0rsewuvO1xL/VuPG8tL/JoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;">protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException { try { String[] splited = v1.toString().split("\t"); String reportTime = splited[0].trim(); String msisdn = splited[1].trim(); Date date = new Date(Long.parseLong(reportTime)); String time = DateConvert.dateParse(date); String rowkey = msisdn+":"+time;//获取到行健 context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e) { Counter counter = context.getCounter("NumberExceptionNum", "num"); counter.increment(1L); } }

    于是我又开始运行程序,但是当我运行完之后,从MapReduce的计数器当中,我发现第一条数据文本并没有导入:因为数字格式异常的这个原因估计在运行过程中被终止了。下面是计数器的显示:
    HBase结合MapReduce批量导入
    于是我又想到了一个解决方案,将第一条数据多复制一条即可,然后重写将数据上传到HDFS中。
    此时在一次 运行程序,显示正确,此时数据也全部导入到Hbase数据库中。
    HBase结合MapReduce批量导入
    Hbase中数据查看核实:
    HBase结合MapReduce批量导入
    将HDFS中的数据通过MapReduce导入到Hbase数据库时,总结如下:
    核心步骤:先将数据文件上传到HDFS,然后用MapReduce进行处理,将处理后的数据插入到 hbase中
    注意事项:
    1>子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围
    2>用trim()这个方法可以去除字符串前后的空格,换行符。
    3>既然第一条数据总是显示数字格式异常,将第一条数据复制为2份即可。



本文转自 SimplePoint 51CTO博客,原文链接:http://blog.51cto.com/2226894115/1896517,如需转载请自行联系原作者