2015年11月21日土曜日

HBase リージョン分割した後にクライアントが最新行キーを持つリージョンからしかデータが取れなくなった時の対処法

HBase リージョン分割した後にクライアントが最新行キーを
持つリージョンからしかデータが取れなくなった時の対処法

HBaseからとにかく全件を取得したいクライアントで起こる事象。
例えばHBaseをキューのように溜まってるデータを取ったら
消すようにして使用している場合。

具体的には org.apache.hadoop.hbase.client.Scan を何も設定せず
そのまま使用していて、HBaseサーバ側でリージョン分割が
発生した場合に現象が発生する。

これはリージョンサーバの情報をクライアントが覚えておき、
無駄な通信をしないようにする仕様から来ていると思われる。
分散システム前提のシステムをキューとして使用しようとすること
自体が設計に反しているが、それを敢えてやる場合の話である。
(上司が勘違いされている場合は、部下が必死で「HBaseを
 キュートして使うのはちょっと違いますよ。」と説得しましょう。
 HBaseはビッグデータの最終的な入れ物。前段でキューが必要なら
 RDBを使った方がよほど良い。)

1番いいのはリージョン分割しないようにHBaseの分割サイズを
調整しHBaseに入るデータ量を上回るように、HBaseのパラメータおよび
取得クライアント側の性能を設計することだ。

HBaseリージョンサーバの分割設計は、MemStoreのフラッシュサイズと
リージョンサーバの最大サイズの設計からなる。

フラッシュサイズは
1. 固定値(hbase.hregion.memstore.flush.size)と
2. 計算値(hbase-env.shの HBASE_REGIONSERVER_OPTSで設定するヒープサイズ ×
 hbase.regionserver.global.memstore.upperLimit and lowerLimit)
で調整できる。
(フラッシュとは、メモリからディスクへのフラッシュのこと)
また、リージョン分割のサイズは hbase.hregion.max.filesize で調整できる。


とは言ってもHBaseの仕組み上、リージョン分割は念頭に置かなければならない。
リージョン分割ありきで考えるならば、クライアントはいつ分割が発生しても
いいように備えなければならない。

単純な順かつリソース消費量の順に案を挙げていくと、
1. 毎回HTableから作り直す。
2. 毎回リージョン情報をクリアする。(HTable#clearRegionCache())
3. 開始行キーを先頭にリセットする。(Scan#setStartRow())

となるだろうか。(実際の検証はしてないのであくまで推測。)

ということで3. の開始行キーを設定する方法を紹介する。
// HTable htable = ... 
Scan scan = new Scan();

// use addFamily() when you want to set row family only, 
// but it's better to specify a column, because you're going to get 
// all of data. this column would be like a second key.
// take care of your machine's memory.
scan.addColumn(Bytes.toBytes(FAMILY_XXX), Bytes.toBytes(COL_XXX)); 

// set 0x00(null) so that HBase will scan from the first region server.
scan.setStartRow(new byte[]{ (byte)0x00} ); 

HashMap dataMap = new HashMap<>();
try(ResultScanner result = htable.getScanner(scan);) {
    for(Result item : result){
        // Row ID
        byte[] rowId = result.getRow();
         // Column data example.
        byte[] data = result.getValue(Bytes.toBytes(FAMILY_XXX), Bytes.toBytes(COL_XXX));
        // for example, get small sortable value like date_and_time value.
        dataMap.put(rowId, data);
    }
}

// then sort the sortable value using Entry<>, sort of value.
ArrayList> dataEntries = new ArrayList<>(dataMap.entrySet());
Collections.sort(dataEntries, (o1, o2) -> {
    // define how to sort between byte[].
    // for example parse byte[] to Long when the data is UNIX epoch time. (8bytes)
    Long t1 = ...   // convert with ByteBuffer or something.
    return t1.compareTo(t2);
});

// loop with sorted key list.
for( Entry  dataEntry : dataEntries){
     // then get all of columns with row id.
     // this is safer way because you can manage when you should stop the function or not.
     // you can choose resource or spec or ..
    Get get = new Get(dataEntry.getKey());
    Result result = htable.get(get);
    byte[] content = result.getValue(xxx);
    ....
}