且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

HBase Thrift客户端Java API实践

更新时间:2022-02-20 02:32:34

HBase的Thrift API定义可以通过链接 http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup 查看。要生成各种编程语言的HBase客户端API,需要先安装Thrift编译器。
首先下载上面链接的内容,保存为Hbase.thrift。
然后执行如下命令,生成不同编程语言的HBase API(本文只用到Java版本,即 thrift --gen java Hbase.thrift,其余命令仅作演示):

01 [hadoop@master hbase]$ thrift --gen cpp Hbase.thrift
02 [hadoop@master hbase]$ thrift --gen java Hbase.thrift
03 [hadoop@master hbase]$ thrift --gen py Hbase.thrift
04 [hadoop@master hbase]$ thrift --gen perl Hbase.thrift
05 [hadoop@master hbase]$ thrift --gen csharp Hbase.thrift
06 [hadoop@master hbase]$ thrift --gen php Hbase.thrift
07 [hadoop@master hbase]$ thrift --gen js Hbase.thrift
08 [hadoop@master hbase]$ thrift --gen go Hbase.thrift
09 [hadoop@master hbase]$ thrift --gen erl Hbase.thrift
10 [hadoop@master hbase]$ thrift --gen delphi Hbase.thrift
11 [hadoop@master hbase]$ thrift --gen hs Hbase.thrift
12 [hadoop@master hbase]$ thrift --gen html Hbase.thrift
13 [hadoop@master hbase]$ thrift --gen c_glib Hbase.thrift
14 [hadoop@master hbase]$ thrift --gen cocoa Hbase.thrift
15 [hadoop@master hbase]$ thrift --gen rb Hbase.thrift
16 [hadoop@master hbase]$ thrift --gen st Hbase.thrift
17 [hadoop@master hbase]$ thrift --gen xsd Hbase.thrift
18 [hadoop@master hbase]$ ls
19 gen-as3 gen-cocoa gen-csharp gen-erl gen-hs gen-java gen-perl gen-py gen-st Hbase.thrift
20 gen-c_glib gen-cpp gen-delphi gen-go gen-html gen-js gen-php gen-rb gen-xsd

这里,我们基于Java语言,使用HBase的Thrift客户端API访问HBase表。事实上,如果使用Java来实现对HBase表的操作,最好是使用HBase的原生API,无论从性能还是便利性方面,都会提供更好的体验。使用Thrift API访问,实际上是在HBase API之上又进行了一层封装,初次使用Thrift API可能会感觉很别扭,有时候还需要参考Thrift服务端的实现代码。
准备工作如下:

    1. 下载Thrift软件包,解压缩后,拷贝thrift-0.9.0/lib/java/src下面的代码到工作区(开发工具中)
    2. 将上面生成的gen-java目录中代码拷贝到工作区
    3. 保证HBase集群正常运行,接着启动HBase的Thrift服务,执行如下命令:
1 bin/hbase thrift -b master -p 9090 start

上面启动的HBase Thrift服务监听9090端口。下面通过Thrift API访问时,需要连接的是这个端口,而不是HBase Master的服务端口(默认60000)。
接着,实现一个简单的例子,访问HBase表。
首先,我们通过HBase Shell创建一个表:

1 create 'test_info', 'info'

表名为test_info,列族(column family)名称为info。
然后,我们开始基于上面生成的Thrift代码来实现对HBase表的操作。
我们在客户端进行了一层抽象,以便更方便地传递各种参数。抽象类为AbstractHBaseThriftService,代码如下所示:

01 package org.shirdrn.cloud.hbase.thrift;
02
03 import java.util.List;
04 import java.util.Map;
05
06 import org.apache.hadoop.hbase.thrift.generated.Hbase;
07 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
08 import org.apache.thrift.TException;
09 import org.apache.thrift.protocol.TBinaryProtocol;
10 import org.apache.thrift.protocol.TProtocol;
11 import org.apache.thrift.transport.TSocket;
12 import org.apache.thrift.transport.TTransport;
13 import org.apache.thrift.transport.TTransportException;
14
15 public abstract class AbstractHBaseThriftService {
16
17 protected static final String CHARSET = "UTF-8";
18 private String host = "localhost";
19 private int port = 9090;
20 private final TTransport transport;
21 protected final Hbase.Client client;
22
23 public AbstractHBaseThriftService() {
24 transport = new TSocket(host, port);
25 TProtocol protocol = new TBinaryProtocol(transport, true, true);
26 client = new Hbase.Client(protocol);
27 }
28
29 public AbstractHBaseThriftService(String host, int port) {
30 super();
31 transport = new TSocket(host, port);
32 TProtocol protocol = new TBinaryProtocol(transport, true, true);
33 client = new Hbase.Client(protocol);
34 }
35
36 public void open() throws TTransportException {
37 if(transport != null) {
38 transport.open();
39 }
40 }
41
42 public void close() {
43 if(transport != null) {
44 transport.close();
45 }
46 }
47
48 public abstract List<String> getTables() throws TException;
49
50 public abstract void update(String table, String rowKey, boolean writeToWal,
51 String fieldName, String fieldValue, Map<String, String> attributes)throws TException;
52 public abstract void update(String table, String rowKey, boolean writeToWal,
53 Map<String, String> fieldNameValues, Map<String, String> attributes)throws TException;
54
55 public abstract void deleteCell(String table, String rowKey, boolean writeToWal,
56 String column, Map<String, String> attributes) throws TException;
57 public abstract void deleteCells(String table, String rowKey, boolean writeToWal,
58 List<String> columns, Map<String, String> attributes) throws TException;
59
60 public abstract void deleteRow(String table, String rowKey,
61 Map<String, String> attributes) throws TException;
62
63 public abstract int scannerOpen(String table, String startRow, List<String> columns,
64 Map<String, String> attributes) throws TException;
65 public abstract int scannerOpen(String table, String startRow, String stopRow, List<String> columns,
66 Map<String, String> attributes) throws TException;
67 public abstract int scannerOpenWithPrefix(String table, String startAndPrefix,
68 List<String> columns, Map<String, String> attributes) throws TException;
69 public abstract int scannerOpenTs(String table, String startRow,
70 List<String> columns, long timestamp, Map<String, String> attributes)throws TException;
71 public abstract int scannerOpenTs(String table, String startRow, String stopRow,
72 List<String> columns, long timestamp, Map<String, String> attributes)throws TException;
73
74 public abstract List<TRowResult> scannerGetList(int id, int nbRows) throwsTException;
75 public abstract List<TRowResult> scannerGet(int id) throws TException;
76
77 public abstract List<TRowResult> getRow(String table, String row,
78 Map<String, String> attributes) throws TException;
79 public abstract List<TRowResult> getRows(String table,
80 List<String> rows, Map<String, String> attributes) throws TException;
81 public abstract List<TRowResult> getRowsWithColumns(String table,
82 List<String> rows, List<String> columns, Map<String, String> attributes)throws TException;
83
84 public abstract void scannerClose(int id) throws TException;
85
86 /**
87 * Iterate result rows(just for test purpose)
88 * @param result
89 */
90 public abstract void iterateResults(TRowResult result);
91
92 }

这里,简单叙述一下,我们提供的客户端API的基本功能:

  • 建立到Thrift服务的连接:open()
  • 获取到HBase中的所有表名:getTables()
  • 更新HBase表记录:update()
  • 删除HBase表中一行记录的某些单元格(cell)数据:deleteCell()和deleteCells()
  • 删除HBase表中一行记录:deleteRow()
  • 打开一个Scanner,返回id:scannerOpen()、scannerOpenWithPrefix()和scannerOpenTs();然后用返回的id迭代记录:scannerGetList()和scannerGet()
  • 获取一行记录结果:getRow()、getRows()和getRowsWithColumns()
  • 关闭一个Scanner:scannerClose()
  • 迭代结果,用于调试:iterateResults()

比如,我们想要实现分页的逻辑,这和传统关系型数据库的操作方式有些不同。基于HBase表的实现方式是:首先打开一个Scanner实例(例如调用scannerOpen()),得到一个id;然后使用该id调用scannerGetList()方法(通过变量nbRows指定每次最多返回几条记录),得到一个记录列表;反复调用scannerGetList()方法,直到某次调用没有结果返回为止;最后调用scannerClose()关闭该Scanner。后面会通过测试用例来实际体会。
现在,我们基于上面抽象出来的客户端操作接口,给出一个基本的实现,代码如下所示:

001 package org.shirdrn.cloud.hbase.thrift;
002
003 import java.io.UnsupportedEncodingException;
004 import java.nio.ByteBuffer;
005 import java.util.ArrayList;
006 import java.util.HashMap;
007 import java.util.Iterator;
008 import java.util.List;
009 import java.util.Map;
010 import java.util.Map.Entry;
011
012 import org.apache.hadoop.hbase.thrift.generated.IOError;
013 import org.apache.hadoop.hbase.thrift.generated.Mutation;
014 import org.apache.hadoop.hbase.thrift.generated.TCell;
015 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
016 import org.apache.thrift.TException;
017
018
019 public class HBaseThriftClient extends AbstractHBaseThriftService {
020
021 public HBaseThriftClient() {
022 super();
023 }
024
025 public HBaseThriftClient(String host, int port) {
026 super(host, port);
027 }
028
029 @Override
030 public List<String> getTables() throws TException {
031 List<String> list = new ArrayList<String>(0);
032 for (ByteBuffer buf : client.getTableNames()) {
033 byte[] name = decode(buf);
034 list.add(new String(name));
035 }
036 return list;
037 }
038
039 static ByteBuffer wrap(String value) {
040 ByteBuffer bb = null;
041 try {
042 bb = ByteBuffer.wrap(value.getBytes(CHARSET));
043 } catch (UnsupportedEncodingException e) {
044 e.printStackTrace();
045 }
046 return bb;
047 }
048
049 protected byte[] decode(ByteBuffer buffer) {
050 byte[] bytes = new byte[buffer.limit()];
051 for (int i = 0; i < buffer.limit(); i++) {
052 bytes[i] = buffer.get();
053 }
054 return bytes;
055 }
056
057 @Override
058 public void update(String table, String rowKey, boolean writeToWal,
059 Map<String, String> fieldNameValues, Map<String, String> attributes)throws TException {
060 List<Mutation> mutations = new ArrayList<Mutation>();
061 for(Map.Entry<String, String> entry : fieldNameValues.entrySet()) {
062 mutations.add(new Mutation(false, wrap(entry.getKey()), wrap(entry.getValue()), writeToWal));
063 }
064 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
065 ByteBuffer tableName = wrap(table);
066 ByteBuffer row = wrap(rowKey);
067 client.mutateRow(tableName, row, mutations, wrappedAttributes);
068 }
069
070 @Override
071 public void update(String table, String rowKey, boolean writeToWal,
072 String fieldName, String fieldValue, Map<String, String> attributes)throws IOError, TException {
073 Map<String, String> fieldNameValues = new HashMap<String, String>();
074 fieldNameValues.put(fieldName, fieldValue);
075 update(table, rowKey, writeToWal, fieldNameValues, attributes);
076 }
077
078
079 @Override
080 public void deleteCells(String table, String rowKey, boolean writeToWal,
081 List<String> columns, Map<String, String> attributes) throws TException {
082 List<Mutation> mutations = new ArrayList<Mutation>();
083 for(String column : columns) {
084 mutations.add(new Mutation(false, wrap(column), null, writeToWal));
085 }
086 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
087 ByteBuffer tableName = wrap(table);
088 ByteBuffer row = wrap(rowKey);
089 client.mutateRow(tableName, row, mutations, wrappedAttributes);
090 }
091
092 @Override
093 public void deleteCell(String table, String rowKey, boolean writeToWal,
094 String column, Map<String, String> attributes) throws TException {
095 List<String> columns = new ArrayList<String>(1);
096 columns.add(column);
097 deleteCells(table, rowKey, writeToWal, columns, attributes);
098 }
099
100
101 @Override
102 public void deleteRow(String table, String rowKey, Map<String, String> attributes)throws TException {
103 ByteBuffer tableName = wrap(table);
104 ByteBuffer row = wrap(rowKey);
105 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
106 client.deleteAllRow(tableName, row, wrappedAttributes);
107 }
108
109
110 @Override
111 public int scannerOpen(String table, String startRow, List<String> columns,
112 Map<String, String> attributes) throws TException {
113 ByteBuffer tableName = wrap(table);
114 List<ByteBuffer> fl = encodeColumns(columns);
115 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
116 return client.scannerOpen(tableName, wrap(startRow), fl, wrappedAttributes);
117 }
118
119 @Override
120 public int scannerOpen(String table, String startRow, String stopRow, List<String> columns,
121 Map<String, String> attributes) throws TException {
122 ByteBuffer tableName = wrap(table);
123 List<ByteBuffer> fl = encodeColumns(columns);
124 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
125 return client.scannerOpenWithStop(tableName, wrap(startRow), wrap(stopRow), fl, wrappedAttributes);
126 }
127
128 @Override
129 public int scannerOpenWithPrefix(String table, String startAndPrefix, List<String> columns,
130 Map<String, String> attributes) throws TException {
131 ByteBuffer tableName = wrap(table);
132 List<ByteBuffer> fl = encodeColumns(columns);
133 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
134 return client.scannerOpenWithPrefix(tableName, wrap(startAndPrefix), fl, wrappedAttributes);
135 }
136
137 @Override
138 public int scannerOpenTs(String table, String startRow, List<String> columns,
139 long timestamp, Map<String, String> attributes) throws TException {
140 ByteBuffer tableName = wrap(table);
141 List<ByteBuffer> fl = encodeColumns(columns);
142 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
143 return client.scannerOpenTs(tableName, wrap(startRow), fl, timestamp, wrappedAttributes);
144 }
145
146 @Override
147 public int scannerOpenTs(String table, String startRow, String stopRow, List<String> columns,
148 long timestamp, Map<String, String> attributes) throws TException {
149 ByteBuffer tableName = wrap(table);
150 List<ByteBuffer> fl = encodeColumns(columns);
151 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
152 return client.scannerOpenWithStopTs(tableName, wrap(startRow), wrap(stopRow), fl, timestamp, wrappedAttributes);
153 }
154
155 @Override
156 public List<TRowResult> scannerGetList(int id, int nbRows) throws TException {
157 return client.scannerGetList(id, nbRows);
158 }
159
160 @Override
161 public List<TRowResult> scannerGet(int id) throws TException {
162 return client.scannerGetList(id, 1);
163 }
164
165 @Override
166 public List<TRowResult> getRow(String table, String row,
167 Map<String, String> attributes) throws TException {
168 ByteBuffer tableName = wrap(table);
169 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
170 return client.getRow(tableName, wrap(row), wrappedAttributes);
171 }
172
173 @Override
174 public List<TRowResult> getRows(String table, List<String> rows,
175 Map<String, String> attributes) throws TException {
176 ByteBuffer tableName = wrap(table);
177 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
178 List<ByteBuffer> wrappedRows = encodeRows(rows);
179 return client.getRows(tableName, wrappedRows, wrappedAttributes);
180 }
181
182 @Override
183 public List<TRowResult> getRowsWithColumns(String table, List<String> rows,
184 List<String> columns, Map<String, String> attributes) throws TException {
185 ByteBuffer tableName = wrap(table);
186 List<ByteBuffer> wrappedRows = encodeRows(rows);
187 List<ByteBuffer> wrappedColumns = encodeColumns(columns);
188 Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
189 return client.getRowsWithColumns(tableName, wrappedRows, wrappedColumns, wrappedAttributes);
190 }
191
192 private List<ByteBuffer> encodeColumns(List<String> columns) {
193 List<ByteBuffer> fl = new ArrayList<ByteBuffer>(0);
194 for(String column : columns) {
195 fl.add(wrap(column));
196 }
197 return fl;
198 }
199
200 private Map<ByteBuffer, ByteBuffer> encodeAttributes(Map<String, String> attributes) {
201 Map<ByteBuffer, ByteBuffer> wrappedAttributes = null;
202 if(attributes != null && !attributes.isEmpty()) {
203 wrappedAttributes = new HashMap<ByteBuffer, ByteBuffer>(1);
204 for(Map.Entry<String, String> entry : attributes.entrySet()) {
205 wrappedAttributes.put(wrap(entry.getKey()), wrap(entry.getValue()));
206 }
207 }
208 return wrappedAttributes;
209 }
210
211 private List<ByteBuffer> encodeRows(List<String> rows) {
212 List<ByteBuffer> list = new ArrayList<ByteBuffer>(0);
213 for(String row : rows) {
214 list.add(wrap(row));
215 }
216 return list;
217 }
218
219 @Override
220 public void iterateResults(TRowResult result) {
221 Iterator<Entry<ByteBuffer, TCell>> iter = result.columns.entrySet().iterator();
222 System.out.println("RowKey=" + new String(result.getRow()));
223 while (iter.hasNext()) {
224 Entry<ByteBuffer, TCell> entry = iter.next();
225 System.out.println("\tCol=" + new String(decode(entry.getKey())) + ", Value=" + new String(entry.getValue().getValue()));
226 }
227 }
228
229 @Override
230 public void scannerClose(int id) throws TException {
231 client.scannerClose(id);
232 }
233 }

上面代码,给出了基本的实现,接着我们给出测试用例,调用我们实现的客户端操作,与HBase表进行交互。实现的测试用例类如下所示:

001 package org.shirdrn.cloud.hbase.thrift;
002
003 import java.io.UnsupportedEncodingException;
004 import java.nio.ByteBuffer;
005 import java.text.DecimalFormat;
006 import java.util.ArrayList;
007 import java.util.HashMap;
008 import java.util.List;
009 import java.util.Map;
010 import java.util.Random;
011
012 import org.apache.hadoop.hbase.thrift.generated.IOError;
013 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
014 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
015 import org.apache.thrift.TException;
016 import org.apache.thrift.transport.TTransportException;
017
018 public class Test {
019
020 private static final String CHARSET = "UTF-8";
021 static DecimalFormat formatter = new DecimalFormat("00");
022 private final AbstractHBaseThriftService client;
023
024 public Test(String host, int port) {
025 client = new HBaseThriftClient(host, port);
026 try {
027 client.open();
028 } catch (TTransportException e) {
029 e.printStackTrace();
030 }
031 }
032
033 public Test() {
034 this("master", 9090);
035 }
036
037 static String randomlyBirthday() {
038 Random r = new Random();
039 int year = 1900 + r.nextInt(100);
040 int month = 1 + r.nextInt(12);
041 int date = 1 + r.nextInt(30);
042 return String.valueOf(year + "-" + formatter.format(month) + "-" + formatter.format(date));
043 }
044
045 static String randomlyGender() {
046 Random r = new Random();
047 int flag = r.nextInt(2);
048 return flag == 0 ? "M" : "F";
049 }
050
051 static String randomlyUserType() {
052 Random r = new Random();
053 int flag = 1 + r.nextInt(10);
054 return String.valueOf(flag);
055 }
056
057 static ByteBuffer wrap(String value) {
058 ByteBuffer bb = null;
059 try {
060 bb = ByteBuffer.wrap(value.getBytes(CHARSET));
061 } catch (UnsupportedEncodingException e) {
062 e.printStackTrace();
063 }
064 return bb;
065 }
066
067 static DecimalFormat rowKeyFormatter = new DecimalFormat("0000");
068
069 public void caseForUpdate() throws TException {
070 boolean writeToWal = false;
071 Map<String, String> attributes = new HashMap<String, String>(0);
072 String table = setTable();
073 // put kv pairs
074 for (long i = 0; i < 10000; i++) {
075 String rowKey = rowKeyFormatter.format(i);
076 Map<String, String> fieldNameValues = new HashMap<String, String>();
077 fieldNameValues.put("info:birthday", randomlyBirthday());
078 fieldNameValues.put("info:user_type", randomlyUserType());
079 fieldNameValues.put("info:gender", randomlyGender());
080 client.update(table, rowKey, writeToWal, fieldNameValues, attributes);
081 }
082 }
083
084 public void caseForDeleteCells() throws TException {
085 boolean writeToWal = false;
086 Map<String, String> attributes = new HashMap<String, String>(0);
087 String table = setTable();
088 // put kv pairs
089 for (long i = 5; i < 10; i++) {
090 String rowKey = rowKeyFormatter.format(i);
091 List<String> columns = new ArrayList<String>(0);
092 columns.add("info:birthday");
093 client.deleteCells(table, rowKey, writeToWal, columns, attributes);
094 }
095 }
096
097 private String setTable() {
098 String table = "test_info";
099 return table;
100 }
101
102 public void caseForDeleteRow() throws TException {
103 Map<String, String> attributes = new HashMap<String, String>(0);
104 String table = setTable();
105 // delete rows
106 for (long i = 5; i < 10; i++) {
107 String rowKey = rowKeyFormatter.format(i);
108 client.deleteRow(table, rowKey, attributes);
109 }
110 }
111
112 public void caseForScan() throws TException {
113 Map<String, String> attributes = new HashMap<String, String>(0);
114 String table = setTable();
115 String startRow = "0005";
116 String stopRow = "0015";
117 List<String> columns = new ArrayList<String>(0);
118 columns.add("info:birthday");
119 int id = client.scannerOpen(table, startRow, stopRow, columns, attributes);
120 int nbRows = 2;
121 List<TRowResult> results = client.scannerGetList(id, nbRows);
122 while(results != null && !results.isEmpty()) {
123 for(TRowResult result : results) {
124 client.iterateResults(result);
125 }
126 results = client.scannerGetList(id, nbRows);
127 }
128 client.scannerClose(id);
129 }
130
131 public void caseForGet() throws TException {
132 Map<String, String> attributes = new HashMap<String, String>(0);
133 String table = setTable();
134 List<String> rows = new ArrayList<String>(0);
135 rows.add("0009");
136 rows.add("0098");
137 rows.add("0999");
138 List<String> columns = new ArrayList<String>(0);
139 columns.add("info:birthday");
140 columns.add("info:gender");
141 List<TRowResult> results = client.getRowsWithColumns(table, rows, columns, attributes);
142 for(TRowResult result : results) {
143 client.iterateResults(result);
144 }
145 }
146
147 public static void main(String[] args)
148 throws IOError, IllegalArgument, TException, UnsupportedEncodingException {
149 Test test = new Test();
150 // test.caseForUpdate(); // insert or update rows/cells
151 // test.caseForDelete(); // delete cells
152 // test.caseForDeleteRow(); // delete rows
153 // test.caseForScan(); // scan rows
154 test.caseForGet(); // get rows
155 }
156
157 }

上面的测试可以实现对HBase表数据的操作。另外,在生成的Thrift客户端代码中,org.apache.hadoop.hbase.thrift.generated.Hbase.Iface给出了全部的服务接口,可以根据需要来选择。客户端类org.apache.hadoop.hbase.thrift.generated.Hbase.Client实现了与Thrift服务交互的处理逻辑,通过该类的对象可以调用HBase提供的Thrift服务。