Monday, February 20, 2012

Getting started with HBase Coprocessors - EndPoints

EndPoints allow custom operations, which are not provided by core HBase and are specific to an application, to be deployed on the region servers. AggregateImplementation is an EndPoint that ships with HBase.

In HBase, a table is split across multiple regions based on row key ranges. The jar file containing the EndPoint is deployed on all the region servers, and the client ultimately needs to specify the regions on which the EndPoint should be executed. Since the client doesn't deal directly with regions, the regions are specified indirectly by a row key or a row key range.

If a row key is specified, the EndPoint is executed on the region to which that row key belongs. If a row key range is specified, the EndPoint is executed on all the regions that the range spans. The client then needs to iterate over the results from all the regions and consolidate them to get the final result.


HBase EndPoints are very similar to MapReduce. The EndPoint execution is similar to the map task: it happens on the Region Server, close to the data. The client code iterates over the results from all the regions and consolidates them, which is similar to the reduce task. Since in most cases the EndPoint execution happens close to the data, EndPoints are efficient.

1) Compile the following code and prepare a jar file out of it.
package coprocessor;
import java.io.IOException;

import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

public interface RowCountProtocol extends CoprocessorProtocol {
   
    long getRowCount() throws IOException;

    long getRowCount(Filter filter) throws IOException;

    long getKeyValueCount() throws IOException;

}

package coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

public class RowCountEndpoint extends BaseEndpointCoprocessor implements
        RowCountProtocol {
    private long getCount(Filter filter, boolean countKeyValues)
            throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }
        RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();
        // use an internal scanner to perform scanning.
        InternalScanner scanner = environment.getRegion().getScanner(scan);
        int result = 0;
        try {
            List<KeyValue> curVals = new ArrayList<KeyValue>();
            boolean done = false;
            do {
                curVals.clear();
                done = scanner.next(curVals);
                // Guard against counting an empty region as one row
                if (!curVals.isEmpty()) {
                    result += countKeyValues ? curVals.size() : 1;
                }
            } while (done);
        } finally {
            scanner.close();
        }
        return result;
    }

    @Override
    public long getRowCount() throws IOException {
        return getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return getCount(filter, false);
    }

    @Override
    public long getKeyValueCount() throws IOException {
        return getCount(null, true);
    }
}

2) Modify the hbase-env.sh file on all the Region Servers to include the jar file created earlier containing the coprocessor code.
export HBASE_CLASSPATH="/home/praveensripati/Installations/hbase-0.92.0/lib/coprocessor.jar"
3) Modify the hbase-site.xml on all the Region Servers to include the class name of the EndPoint.
    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>coprocessor.RowCountEndpoint</value>
    </property>
4) Restart the HBase cluster.

5) Create a 'testtable' table and populate data in it. The table will be split into 5 regions based on the row keys. The EndPoint will execute on multiple regions, depending on the input row keys, and send the results to the client.
create 'testtable', 'colfam1', { SPLITS => ['row-300', 'row-500', 'row-700' , 'row-900'] }
for i in '0'..'9' do for j in '0'..'9' do \
for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", \
"colfam1:#{j}#{k}", "#{j}#{k}" end end end
6) Execute the following on the client to get the count of the number of rows in the 'testtable' table.
package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;

public class EndpointExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        try {
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class, null, null,
                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override
                        public Long call(RowCountProtocol counter)
                                throws IOException {
                            return counter.getRowCount();
                        }
                    });

            long total = 0;
            for (Map.Entry<byte[], Long> entry : results.entrySet()) {
                total += entry.getValue().longValue();
                System.out.println("Region: " + Bytes.toString(entry.getKey())
                        + ", Count: " + entry.getValue());
            }
            System.out.println("Total Count: " + total);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }
}
7) Here is the output of the above program.
Region: testtable,,1329653922153.d88dbec04c8b3093bd256a1e70c5bfe6., Count: 300
Region: testtable,row-300,1329653922157.2431482c120bb0c5939688ef764e3137., Count: 200
Region: testtable,row-500,1329653922157.c8843b18b612d4d8632135d7b8aff0c3., Count: 200
Region: testtable,row-700,1329653922157.abc2ceba898d196334d9561d8eddc431., Count: 200
Region: testtable,row-900,1329653922157.42cfb7cf277782c5cbeba1cc9d3874af., Count: 100
Total Count: 1000
Here is the output from the HBase shell.
hbase(main):006:0> count 'testtable'
Current count: 1000, row: row-999                                                                                          
1000 row(s) in 0.4060 seconds
Note that the counts from the EndPoint and the HBase shell are the same.
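
In the example above, null was passed for the start and end row keys in step 6, so the EndPoint ran on all five regions. As mentioned earlier, passing actual row keys restricts the execution to just the regions that the range spans. Here is a minimal, untested variation of the step 6 client (the class name RangeCountExample is only for illustration); it assumes the RowCountProtocol interface from step 1 and the pre-split 'testtable' from step 5.
package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;

public class RangeCountExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        try {
            // Only the regions spanned by [row-300, row-700) execute the EndPoint
            Map<byte[], Long> results = table.coprocessorExec(
                    RowCountProtocol.class,
                    Bytes.toBytes("row-300"),  // start row key
                    Bytes.toBytes("row-700"),  // end row key
                    new Batch.Call<RowCountProtocol, Long>() {
                        @Override
                        public Long call(RowCountProtocol counter)
                                throws IOException {
                            return counter.getRowCount();
                        }
                    });

            // Consolidate the per-region counts on the client
            long total = 0;
            for (Long count : results.values()) {
                total += count;
            }
            System.out.println("Rows in range: " + total);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }
}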

The above EndPoint example conveys a simple scenario, but more complex scenarios can be built. Observers, which were discussed in the earlier blog, can also be combined with EndPoints to build even more complex scenarios.

Edit (10th February, 2013) : Coprocessors can also be deployed dynamically without restarting the cluster to avoid any downtime. Check the `Coprocessor Deployment` section here for more details.

Sunday, February 19, 2012

Getting started with HBase Coprocessors - Observers

The HBase 0.92 release provides coprocessor functionality, which includes Observers (similar to triggers, fired on certain events) and EndPoints (similar to stored procedures, invoked from the client).

Observers can be at the region, master or WAL (Write Ahead Log) level. In this blog entry we will create a Region Observer. Once a Region Observer has been created, it can be specified in the hbase-site.xml, in which case it applies to all the regions and the tables in them, or it can be specified on a particular table, in which case it applies only to that table. Make sure that HBase is restarted after following the below steps for the coprocessor to be executed.

An excellent introduction to coprocessors is available here.

1) Here is the code for the coprocessor, which is triggered when the client's HTable.get() method is executed. The preGet() method is overridden to check if the row key in the Get equals @@@GETTIME@@@ and, if it does, to populate the result with an additional KeyValue containing the current time in milliseconds.
package coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverExample extends BaseRegionObserver {
    public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");

    @Override
    public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
            final Get get, final List<KeyValue> results) throws IOException {
        if (Bytes.equals(get.getRow(), FIXED_ROW)) {
            KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
                    Bytes.toBytes(System.currentTimeMillis()));
            results.add(kv);
        }
    }
}
2) Compile the above class and prepare a jar file to be copied to all the Region Servers.

3) Modify the hbase-env.sh file on all the Region Servers to include the jar file created earlier containing the coprocessor code.
export HBASE_CLASSPATH="/home/praveensripati/Installations/hbase-0.92.0/lib/coprocessor.jar"
4) Modify the hbase-site.xml on all the Region Servers to include the class name of the Region Observer.
    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>coprocessor.RegionObserverExample</value>
    </property>
5) Restart the HBase cluster.

6) Run the below command to create a 'testtable' table.
create 'testtable', 'colfam1' 
Run the below command to retrieve a row with the row key @@@GETTIME@@@, which triggers the above coprocessor to add the current time in milliseconds to the response. Since there is no actual row with the row key @@@GETTIME@@@, only the single cell created in the RegionObserverExample.preGet() method is returned.
hbase(main):002:0> get 'testtable','@@@GETTIME@@@'
COLUMN                                 CELL
 @@@GETTIME@@@:@@@GETTIME@@@           timestamp=9223372036854775807, value=\x00\x00\x015\x938\xD2i
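
The same check can be done from a Java client as well. Below is a minimal, untested sketch (the class name ObserverClientExample is only for illustration) that issues a Get for the magic row key and decodes the value returned by the observer using Bytes.toLong().
package coprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ObserverClientExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        try {
            // The magic row key intercepted by RegionObserverExample.preGet()
            Get get = new Get(Bytes.toBytes("@@@GETTIME@@@"));
            Result result = table.get(get);
            for (KeyValue kv : result.raw()) {
                // The observer stored the server time as an 8-byte long
                System.out.println("Server time (ms): "
                        + Bytes.toLong(kv.getValue()));
            }
        } finally {
            table.close();
        }
    }
}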

Observers provide a lot of hooks into HBase to add trigger-like functionality. In the above example we have a Region Observer with a hook that executes before the call to HTable.get() is processed. In the next blog, we will see how to create an EndPoint, which is similar to a Stored Procedure and can be invoked from the client.

Edit (10th February, 2013) : Coprocessors can also be deployed dynamically without restarting the cluster to avoid any downtime. Check the `Coprocessor Deployment` section here for more details.

Friday, February 17, 2012

Getting started with HBase - Part 2


Once HBase has been installed as shown in the previous blog, it's time to create some tables and populate data in them. HBase provides a shell for executing DDL (Data Definition Language) and DML (Data Manipulation Language) commands, which can be invoked using the following command:

$HBASE_HOME/bin/hbase shell

An HBase cluster will have a single master and multiple region servers on different nodes. Each region server hosts multiple regions, and each region hosts either a complete table or a part of a table.


1) For creating a table 'testtable' with a column family 'colfam1'
create 'testtable', 'colfam1'
create 'testtable', 'colfam1', { SPLITS => ['row-300', 'row-500', 'row-700' , 'row-900'] }
The first command creates the table in a single region; when the 'testtable' table size crosses a certain threshold, the region is split into two regions.

The second command pre-splits the table into five regions based on the row keys. Each of the 5 regions might be hosted on a different node, which leads to better throughput/latency and better utilization of the cluster. (Java client equivalents of these shell commands are sketched after step 6.)

2) To list the tables in the HBase cluster (an optional table name or regular expression can be passed to filter the output).
list 'testtable'
3) To insert data into the 'testtable' table.
put 'testtable', 'myrow-1', 'colfam1:q1', 'value-1'
put 'testtable', 'myrow-2', 'colfam1:q2', 'value-2'
put 'testtable', 'myrow-2', 'colfam1:q3', 'value-3'
The HBase Shell is (J)Ruby's IRB with some HBase-related commands added. Anything that can be done in IRB can also be done in the HBase Shell. The below command will insert 1K rows into the 'testtable' table.
for i in '0'..'9' do for j in '0'..'9' do \
for k in '0'..'9' do put 'testtable', "row-#{i}#{j}#{k}", \
"colfam1:#{j}#{k}", "#{j}#{k}" end end end
4) For getting data from the 'testtable' table.
get 'testtable', 'myrow-1'
scan 'testtable'
5) For deleting data from the 'testtable' table.
delete 'testtable', 'myrow-2', 'colfam1:q2'
6) For deleting the table.
disable 'testtable'
drop 'testtable'
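
The same DML operations can also be performed through the HBase Java client API. Below is a minimal, untested sketch (the class name ShellEquivalents is only for illustration) showing HTable equivalents of the put, get, scan and delete commands above; creating, disabling and dropping tables is done through the HBaseAdmin class instead.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ShellEquivalents {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        try {
            // put 'testtable', 'myrow-1', 'colfam1:q1', 'value-1'
            Put put = new Put(Bytes.toBytes("myrow-1"));
            put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("q1"),
                    Bytes.toBytes("value-1"));
            table.put(put);

            // get 'testtable', 'myrow-1'
            Result result = table.get(new Get(Bytes.toBytes("myrow-1")));
            System.out.println("get: " + result);

            // scan 'testtable'
            ResultScanner scanner = table.getScanner(new Scan());
            try {
                for (Result row : scanner) {
                    System.out.println("scan: " + row);
                }
            } finally {
                scanner.close();
            }

            // delete 'testtable', 'myrow-1', 'colfam1:q1' (all versions of the cell)
            Delete delete = new Delete(Bytes.toBytes("myrow-1"));
            delete.deleteColumns(Bytes.toBytes("colfam1"), Bytes.toBytes("q1"));
            table.delete(delete);
        } finally {
            table.close();
        }
    }
}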
In the coming blogs, we will go through how to create coprocessors, which were introduced in the HBase 0.92 release. Coprocessors include Observers (similar to triggers in an RDBMS) and EndPoints (similar to stored procedures in an RDBMS).

As I mention again and again, HBase is not a solution for every problem.

Getting started with HBase - Part 1


After setting up Hadoop (both HDFS and MapReduce) on a single node or on multiple nodes, it's time to install HBase. HBase provides real-time/random read/write access on top of HDFS. HBase runs a Master on one of the nodes (similar to the NameNode/JobTracker) and RegionServers (similar to the DataNode/TaskTracker) on the slave nodes.

HDFS and HBase daemons can co-exist on the same nodes or run on different nodes. In this blog entry, we will assume that the Hadoop and HBase daemons co-exist. The master node will have the HDFS NameNode, HBase Master and ZooKeeper daemons, while the slave nodes will have the HDFS DataNode and HBase RegionServer daemons.


- Follow the instructions to set up Hadoop on all the nodes (single or multiple nodes). HBase depends on HDFS, so starting the HDFS daemons (NameNode and DataNode) is enough. The MapReduce daemons (JobTracker and TaskTracker) are not required unless a MapReduce job is run against HBase.

- Download and extract hbase-0.92.0.tar.gz on all the nodes into the $HBASE_HOME folder.

- Make sure that the Hadoop version installed in the above step matches the version of the Hadoop jars in the $HBASE_HOME/lib folder. If there is a version mismatch, copy the installed Hadoop jar files into the $HBASE_HOME/lib folder.

- Add the below properties to the hbase-site.xml in the $HBASE_HOME/conf folder on all the master/slave nodes.
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://master:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2222</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ubuntu-host</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/praveensripati/Installations/hbase-0.92.0/tmp</value>
    </property>
</configuration>
- Set export JAVA_HOME=/usr/lib/jvm/jdk1.6.0_27 in $HBASE_HOME/conf/hbase-env.sh file on the master and on all the slaves. The Java location should be changed appropriately.

- Add the host names of the slave machines to the $HBASE_HOME/conf/regionservers file on the master as
slave1
slave2
- The following should be included in the /etc/hosts file on the master and on all the slaves (change the IP addresses appropriately).
127.0.0.1       localhost
192.168.56.1    master
192.168.56.101  slave1
192.168.56.102  slave2
- Start HDFS with $HADOOP_HOME/bin/start-dfs.sh and HBase with $HBASE_HOME/bin/start-hbase.sh. Check the Hadoop and HBase log files for any errors.

- Check the web interfaces for the HBase Master (http://master:60010/master-status) and the Region Servers (http://slave1:60030/rs-status and http://slave2:60030/rs-status).

The master web page (http://master:60010/master-status) shows the list of tables (-ROOT- and .META. by default) and the Region Servers in HBase.


The Region Server web pages (http://slave1:60030/rs-status and http://slave2:60030/rs-status) show the list of tables hosted on each of them.
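
The cluster can also be verified programmatically using the HBaseAdmin class. Below is a minimal, untested sketch (the class name StatusCheck is only for illustration) that checks whether the Master is reachable and lists the user tables, similar to what the master web page shows.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class StatusCheck {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Throws an exception if the Master or ZooKeeper cannot be reached
        HBaseAdmin admin = new HBaseAdmin(conf);
        System.out.println("Master running : " + admin.isMasterRunning());
        // Lists the user tables (-ROOT- and .META. are not included)
        for (HTableDescriptor table : admin.listTables()) {
            System.out.println("Table          : " + table.getNameAsString());
        }
    }
}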



Now that HBase has been set up properly, in the next blog we will go through how to create tables and insert data into HBase.

Friday, February 10, 2012

Apache Giraph 0.1 released

As I mention again and again, Hadoop is not for solving everything. But Hadoop is acting like a kernel on which a lot of things are getting built (some on HDFS, some on MR and some on both HDFS and MR). One of them is Giraph, which is still in the incubator and uses Hadoop MapReduce to do graph processing. Here is a nice introduction to Apache Giraph from LinkedIn. The blog also explains, at a very high level, why graphs should not be processed on MapReduce directly and why Giraph should be used instead.

http://engineering.linkedin.com/open-source/apache-giraph-framework-large-scale-graph-processing-hadoop-reaches-01-milestone

The 0.1 release binaries are not available in the mirrors, but the source code is. The Giraph home page has instructions on how to build from the source and test it. I would recommend giving it a shot.

Lately I have been spending a good amount of my time on HBase, which depends largely on HDFS to store its data and can easily be used as a source/sink for MR jobs. I will be following up with a series of blogs on the pros and cons, getting started, and some of the new features in HBase.

Thursday, February 2, 2012

How to know if an Apache project is active or not?


As with any software (open source or proprietary), active development ceases after a period of time. This may be due to a lack of interest among the developers/users of that particular software or due to the availability of better software.

The same is the case with Apache. All the projects start in the Incubator and, based on the progress they make and the interest shown by the community, get promoted to top-level projects. But before using a particular piece of software or getting involved in its development, it's important to check whether the software is being actively developed and whether bugs are getting fixed on a regular basis.

Here are a few ways to check whether a project is active:

- Check whether the user and dev mailing lists are active.

- Check when the code was last updated in SVN (1, 2).

- Check the open JIRAs and how quickly they are getting closed.

Apache has got a lot of interesting projects covering a wide range of interests, and the code is open to everyone, so it's easy to get involved with Apache projects. Go ahead and get involved in one that interests you.