Recently I found myself working with HBase, the Hadoop database that sits on top of HDFS, in a Java project. The docs for working with Hadoop (and especially HBase) are somewhat sparse, as I guess most folks prefer to keep their code pretty secretive. Well, seeing as the following will never give anyone a business advantage over anyone else, but will probably spare a few folks some sleepless nights, here you go!
First off, you need to start a new Maven project. I just used the simple quickstart archetype, as I was just messing about really. You then need to add the following dependencies to your pom.xml file (oh, and this assumes Maven 3…)
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.1.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-tools</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.95.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.3.2</version>
    <exclusions>
      <exclusion>
        <groupId>com.sun.jmx</groupId>
        <artifactId>jmxri</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.sun.jdmk</groupId>
        <artifactId>jmxtools</artifactId>
      </exclusion>
      <exclusion>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.0.3</version>
  </dependency>
</dependencies>
You will notice a couple of exclusions on the ZooKeeper dependency. They are really important: those old Sun artifacts (jmxri, jmxtools and jms) are dead, discontinued and no longer published to the central repositories, so your build will fail to resolve them unless they are excluded.
Next up, we need a class to interact with our HBase store.
public class HbaseExample {

    private static Configuration hBaseConfig = null;
    private static HTablePool tablePool = null;
    private static String hbaseHost = "ubuntu.local";
    private static String zookeeperHost = "ubuntu.local";

    /**
     * Initialization
     */
    static {
        hBaseConfig = HBaseConfiguration.create();
        hBaseConfig.setInt("timeout", 120000);
        hBaseConfig.set("hbase.master", hbaseHost + ":9000");
        hBaseConfig.set("hbase.zookeeper.quorum", zookeeperHost);
        hBaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        // Keep a reference to the pool so tables can be reused cheaply
        tablePool = new HTablePool(hBaseConfig, 10);
    }
You will see that it is all pretty standard, and that I have defined a few static properties to hold the values of the ZooKeeper and HBase hosts. Note that these will work with IP addresses, but Hadoop most definitely prefers FQDNs. The final line in the static block is a sticky one: if you do not create a connection pool, your code can take up to 5 seconds to re-initialize the connection, which is obviously not cool.
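As an aside, if you would rather not hard-code these settings, the same properties can live in an hbase-site.xml on the classpath, which HBaseConfiguration.create() picks up automatically. A minimal sketch (the hostname here is just my test box; adjust for your cluster):

```xml
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>ubuntu.local</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
```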
After that, there is nothing too tricksy. I will paste a copy of the whole class next so that you can check your imports etc., as well as have a look at full CRUD on an HBase store:
package za.co.paulscott.hdfstest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseExample {

    private static Configuration hBaseConfig = null;
    private static HTablePool tablePool = null;
    private static String hbaseHost = "ubuntu.local";
    private static String zookeeperHost = "ubuntu.local";

    /**
     * Initialization
     */
    static {
        hBaseConfig = HBaseConfiguration.create();
        hBaseConfig.setInt("timeout", 120000);
        hBaseConfig.set("hbase.master", hbaseHost + ":9000");
        hBaseConfig.set("hbase.zookeeper.quorum", zookeeperHost);
        hBaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        // Keep a reference to the pool so tables can be reused cheaply
        tablePool = new HTablePool(hBaseConfig, 10);
    }
    /**
     * Create a table
     */
    public static void creatTable(String tableName, String[] familys)
            throws Exception {
        HBaseAdmin admin = new HBaseAdmin(hBaseConfig);
        boolean exists = false;
        try {
            exists = admin.tableExists(tableName);
        } catch (NullPointerException e) {
            exists = false;
        }
        if (exists) {
            System.out.println("table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            for (int i = 0; i < familys.length; i++) {
                tableDesc.addFamily(new HColumnDescriptor(familys[i]));
            }
            admin.createTable(tableDesc);
            System.out.println("create table " + tableName + " ok.");
        }
    }
    /**
     * Delete a table
     */
    public static void deleteTable(String tableName) throws Exception {
        try {
            HBaseAdmin admin = new HBaseAdmin(hBaseConfig);
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
            System.out.println("delete table " + tableName + " ok.");
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        }
    }
    /**
     * Put (or insert) a row
     */
    public static void addRecord(String tableName, String rowKey,
            String family, String qualifier, String value) throws Exception {
        try {
            HTable table = new HTable(hBaseConfig, tableName);
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),
                    Bytes.toBytes(value));
            table.put(put);
            System.out.println("insert record " + rowKey + " to table "
                    + tableName + " ok.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * Delete a row
     */
    public static void delRecord(String tableName, String rowKey)
            throws IOException {
        HTable table = new HTable(hBaseConfig, tableName);
        List<Delete> list = new ArrayList<Delete>();
        Delete del = new Delete(Bytes.toBytes(rowKey));
        list.add(del);
        table.delete(list);
        System.out.println("del record " + rowKey + " ok.");
    }
    /**
     * Get a row
     */
    public static void getOneRecord(String tableName, String rowKey)
            throws IOException {
        HTable table = new HTable(hBaseConfig, tableName);
        Get get = new Get(Bytes.toBytes(rowKey));
        Result rs = table.get(get);
        for (KeyValue kv : rs.raw()) {
            System.out.print(new String(kv.getRow()) + " ");
            System.out.print(new String(kv.getFamily()) + ":");
            System.out.print(new String(kv.getQualifier()) + " ");
            System.out.print(kv.getTimestamp() + " ");
            System.out.println(new String(kv.getValue()));
        }
    }
    /**
     * Scan (or list) a table
     */
    public static void getAllRecord(String tableName) {
        try {
            HTable table = new HTable(hBaseConfig, tableName);
            Scan s = new Scan();
            ResultScanner ss = table.getScanner(s);
            try {
                for (Result r : ss) {
                    for (KeyValue kv : r.raw()) {
                        System.out.print(new String(kv.getRow()) + " ");
                        System.out.print(new String(kv.getFamily()) + ":");
                        System.out.print(new String(kv.getQualifier()) + " ");
                        System.out.print(kv.getTimestamp() + " ");
                        System.out.println(new String(kv.getValue()));
                    }
                }
            } finally {
                ss.close(); // always release the scanner
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        try {
            String tablename = "scores";
            String[] familys = { "grade", "course" };
            HbaseExample.creatTable(tablename, familys);
            // add record paul
            HbaseExample.addRecord(tablename, "paul", "grade", "", "5");
            HbaseExample.addRecord(tablename, "paul", "course", "", "90");
            HbaseExample.addRecord(tablename, "paul", "course", "math", "97");
            HbaseExample.addRecord(tablename, "paul", "course", "art", "87");
            // add record caz
            HbaseExample.addRecord(tablename, "caz", "grade", "", "4");
            HbaseExample.addRecord(tablename, "caz", "course", "math", "89");
            System.out.println("===========get one record========");
            HbaseExample.getOneRecord(tablename, "paul");
            System.out.println("===========show all records========");
            HbaseExample.getAllRecord(tablename);
            System.out.println("===========del one record========");
            HbaseExample.delRecord(tablename, "caz");
            System.out.println("===========show all records========");
            HbaseExample.getAllRecord(tablename);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
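One detail worth calling out from the listing above: HBase stores everything, row keys included, as raw byte arrays, and the client's Bytes.toBytes(String) always encodes UTF-8, whereas a bare String.getBytes() uses the platform default charset, so it pays to be consistent. A tiny stdlib-only sketch of that encoding round trip (the class name is just for illustration):

```java
import java.nio.charset.StandardCharsets;

public class RowKeyBytes {
    public static void main(String[] args) {
        // HBase row keys, families, qualifiers and values are all byte[].
        // Bytes.toBytes(String) in the HBase client encodes UTF-8; decoding
        // with the same charset gives a lossless round trip.
        String rowKey = "paul";
        byte[] key = rowKey.getBytes(StandardCharsets.UTF_8);
        System.out.println(key.length);                       // 4
        System.out.println(new String(key, StandardCharsets.UTF_8)); // paul
    }
}
```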
As you can see, there is also a main method, so to execute this in Eclipse you can simply use Run As… Java Application, or compile it as a jar (using the jar:jar goal) and run that.
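If you would rather launch it straight from Maven, one option (a sketch, assuming the stock exec-maven-plugin; the main class matches the package declared above) is to add this to the pom and run mvn compile exec:java:

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <configuration>
        <mainClass>za.co.paulscott.hdfstest.HbaseExample</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>
```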
NOTE: Make sure that you change the host(s)!
NOTE2: This is an example that connects to a remote HBase cluster!
NOTE3: This is one too many notes, so go now and have fun!