Monday, February 20, 2012

Getting started with HBase Coprocessors - EndPoints

EndPoints allow for deploying custom operations which are not provided by core HBase and which are specific to an application. AggregateImplementation is an EndPoint that ships with HBase.
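For example, the built-in AggregateImplementation can be invoked from the client through the AggregationClient helper. Below is a minimal sketch, assuming HBase 0.92, that AggregateImplementation has been registered in hbase.coprocessor.region.classes on the Region Servers, and the 'testtable'/'colfam1' table created later in step 5.

package coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class BuiltInAggregationExample {
    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        // AggregationClient wraps the coprocessor calls to the
        // AggregateImplementation EndPoint running on every region.
        AggregationClient aggregationClient = new AggregationClient(conf);
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("colfam1")); // scan must name one family
        long rowCount = aggregationClient.rowCount(Bytes.toBytes("testtable"),
                new LongColumnInterpreter(), scan);
        System.out.println("Row Count: " + rowCount);
    }
}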

In HBase a table is split across multiple regions based on row key ranges. The jar file containing the EndPoint is deployed on all the region servers. The client ultimately needs to specify the regions on which the EndPoint has to be executed. Since the client doesn't deal directly with regions, the regions are specified indirectly through a row key or a row key range.

If a row key is specified, the EndPoint is executed on the region to which that row key belongs. Alternatively, if a row key range is specified, the EndPoint is applied to all the regions that the range spans (a sketch of restricting the call to a key range follows the client code in step 6). The client then iterates over the results from all the regions and consolidates them to get the final result.


HBase EndPoints are very similar to MapReduce. The EndPoint execution is similar to the map task: it happens on the Region Server, close to the data. The client code iterating over the results from all the regions and consolidating them is similar to the reduce task. Since in most cases the EndPoint execution happens close to the data, EndPoints are efficient.

1) Compile the following two classes (the protocol interface and its implementation) and package them into a jar file.
package coprocessor;
import java.io.IOException;

import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

public interface RowCountProtocol extends CoprocessorProtocol {
   
    long getRowCount() throws IOException;

    long getRowCount(Filter filter) throws IOException;

    long getKeyValueCount() throws IOException;

}
package coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

public class RowCountEndpoint extends BaseEndpointCoprocessor implements
        RowCountProtocol {
    private long getCount(Filter filter, boolean countKeyValues)
            throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }
        RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();
        // use an internal scanner to perform scanning.
        InternalScanner scanner = environment.getRegion().getScanner(scan);
        long result = 0;
        try {
            List<KeyValue> curVals = new ArrayList<KeyValue>();
            boolean hasMoreRows = false;
            do {
                curVals.clear();
                // next() fills curVals with the KeyValues of the next row and
                // returns true if there are more rows left in the region.
                hasMoreRows = scanner.next(curVals);
                // count every KeyValue, or one per (non-empty) row
                result += countKeyValues ? curVals.size()
                        : (curVals.isEmpty() ? 0 : 1);
            } while (hasMoreRows);
        } finally {
            scanner.close();
        }
        return result;
    }

    @Override
    public long getRowCount() throws IOException {
        return getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return getCount(filter, false);
    }

    @Override
    public long getKeyValueCount() throws IOException {
        return getCount(null, true);
    }
}

2) Modify the hbase-env.sh file on all the Region Servers to include the jar file created earlier containing the coprocessor code.
export HBASE_CLASSPATH="/home/praveensripati/Installations/hbase-0.92.0/lib/coprocessor.jar"
3) Modify the hbase-site.xml on all the Region Servers to include the class name of the EndPoint.
    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>coprocessor.RowCountEndpoint</value>
    </property>
4) Restart the HBase cluster.

5) Create a 'testtable' table and populate data into it. The table will be split into 5 regions based on the row key split points. The EndPoint will execute on multiple regions based on the input row keys and send the results to the client.
create 'testtable', 'colfam1', { SPLITS => ['row-300', 'row-500', 'row-700' , 'row-900'] }
for i in '0'..'9' do for j in '0'..'9' do \
for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", \
"colfam1:#{j}#{k}", "#{j}#{k}" end end end
6) Execute the following on the client to get the count of the number of rows in 'testtable'.
package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;

public class EndpointExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        try {
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class, null, null,
                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override
                        public Long call(RowCountProtocol counter)
                                throws IOException {
                            return counter.getRowCount();
                        }
                    });

            long total = 0;
            for (Map.Entry<byte[], Long> entry : results.entrySet()) {
                total += entry.getValue().longValue();
                System.out.println("Region: " + Bytes.toString(entry.getKey())
                        + ", Count: " + entry.getValue());
            }
            System.out.println("Total Count: " + total);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        } finally {
            table.close(); // release the table and its underlying resources
        }
    }
}
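As mentioned earlier, the EndPoint can also be restricted to the regions spanned by a row key range instead of running on all regions. As a minimal sketch (assuming the split points from step 5), the coprocessorExec() call in EndpointExample above could be replaced with the following; passing null for either key means "from the first region" or "to the last region".

            // Run the EndPoint only on the regions covering the row keys
            // from row-300 to row-700, instead of on every region.
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class,
                    Bytes.toBytes("row-300"),   // start row key
                    Bytes.toBytes("row-700"),   // end row key
                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override
                        public Long call(RowCountProtocol counter)
                                throws IOException {
                            return counter.getRowCount();
                        }
                    });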
7) Here is the output of the above program.
Region: testtable,,1329653922153.d88dbec04c8b3093bd256a1e70c5bfe6., Count: 300
Region: testtable,row-300,1329653922157.2431482c120bb0c5939688ef764e3137., Count: 200
Region: testtable,row-500,1329653922157.c8843b18b612d4d8632135d7b8aff0c3., Count: 200
Region: testtable,row-700,1329653922157.abc2ceba898d196334d9561d8eddc431., Count: 200
Region: testtable,row-900,1329653922157.42cfb7cf277782c5cbeba1cc9d3874af., Count: 100
Total Count: 1000
Here is the output from the HBase shell.
hbase(main):006:0> count 'testtable'
Current count: 1000, row: row-999                                                                                          
1000 row(s) in 0.4060 seconds
Note that the output from the EndPoint and the HBase shell is the same.

The above EndPoint example conveys a simple scenario, but more complex scenarios can be built, for instance by passing a Filter to the EndPoint as sketched below. Also, Observers, which were discussed in an earlier blog, can be integrated with EndPoints to build even more complex scenarios.
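For example, the getRowCount(Filter) method of RowCountProtocol can be called the same way. Here is a small sketch, replacing the Batch.Call in EndpointExample above and assuming org.apache.hadoop.hbase.filter.PrefixFilter is imported, to count only the rows whose key starts with 'row-1'.

            // Pass a filter to the EndPoint so that only rows with the
            // key prefix "row-1" are counted on each region.
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class, null, null,
                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override
                        public Long call(RowCountProtocol counter)
                                throws IOException {
                            return counter.getRowCount(
                                    new PrefixFilter(Bytes.toBytes("row-1")));
                        }
                    });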

Edit (10th February, 2013): Coprocessors can also be deployed dynamically without restarting the cluster to avoid any downtime. Check the `Coprocessor Deployment` section here for more details.
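For illustration, a per-table dynamic deployment from the HBase shell might look like the following sketch; the HDFS path and the priority (1001) are made-up values, and the jar has to be copied to HDFS first.

disable 'testtable'
alter 'testtable', METHOD => 'table_att', \
  'coprocessor' => 'hdfs:///user/hbase/coprocessor.jar|coprocessor.RowCountEndpoint|1001|'
enable 'testtable'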

1 comment:

  1. I am getting org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching handler for protocol RowCountProtocol in region regionSplit,,1392800220854.3390b20ce66ca67b090400c6cea9b333.
