Monthly Archives: September 2013

An introduction to Apache Mesos

What is Apache Mesos, you ask? Well, according to the web site at http://mesos.apache.org/:

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark, and other applications on a dynamically shared pool of nodes.

What exactly does that mean? Well, to me, it means that I can deploy applications and certain frameworks into a cluster of resources (VMs) and Mesos will look after the resource allocation for me. For example, if you have a particularly computationally intensive task, like a Hadoop Map/Reduce job that needs additional resources, Mesos can temporarily allocate more to it so that your job finishes as efficiently as possible.

Installing Mesos is not too difficult, but the docs are a little sparse. I set up a vanilla Ubuntu 12.04 LTS VM locally to experiment, and I will assume the same setup for you. If your environment differs you may not need all of the packages below, but please do bear with me.

Disclaimers aside, let’s get cracking on the installation!

The first step is to download the Apache Mesos tarball distribution. I used wget on my VM to grab it from a local mirror, but you should check which mirror to use via http://www.apache.org/dyn/mirrors/mirrors.cgi/mesos/0.13.0/

Next up, you will need to prepare your machine to compile and host Mesos. On Ubuntu, you need the following packages:

apt-get update && apt-get install python-dev libunwind7-dev libcppunit-dev openjdk-7-jdk autoconf autopoint libltdl-dev libtool autotools-dev make gawk g++ curl libcurl4-openssl-dev

Once that is complete, unpack your Mesos tarball with:

tar xzvf mesos-0.13.0.tar.gz

Change directory to the newly created mesos directory and run the configure script

cd mesos-0.13.0
./configure

All the configure options should check out, but if not, make sure that you have all the relevant packages installed beforehand!
Next, in the same directory, compile the code

make

Depending on how many resources you gave your VM, this could take a while, so go get some milk and cookies…

Once the make is done, you should check everything with

make check

This will run a bunch of unit tests and checks that will ensure that there are no surprises later.

After you have done this, you can also set up a small Mesos cluster and run a job on it as follows:
In your Mesos directory, use

bin/mesos-master.sh

to start the master server. Make a note of the IP and port that the master is running on, so that you can use the web based UI later on!

Open up a browser and point it to http://yourhostname.com:5050 or http://yourIP:5050. As an example, mine is running on http://192.168.10.56:5050

Go back to your VM’s terminal and type

bin/mesos-slave.sh --master=192.168.10.56:5050

and refresh your browser. You should now notice that a slave has been added to your cluster!

Run the C++ test framework (a sample that just runs five tasks on the cluster) using

src/test-framework --master=localhost:5050 

It should successfully exit after running five tasks.
You can also try the example Python or Java frameworks, with commands like the following:

    src/examples/java/test-framework 192.168.10.56:5050
    src/examples/python/test-framework 192.168.10.56:5050

If all of that is running OK, you have successfully completed the Mesos setup. Congratulations!

Follow @ApacheMesos on Twitter, as well as @DaveLester, for more information and goodness!

Cloudera Hadoop and HBase example code

Earlier, I posted about connecting to Hadoop via a Java based client. I decided to try out Cloudera’s offering (http://www.cloudera.com), which provides a manager app as well as an easy way to set up Hadoop, in both an Enterprise version (which includes support) and a free version.

I downloaded the free version of the Cloudera Manager and quickly set up a 4 node Hadoop cluster using their tools. I must say that, as far as ease of use goes, they have done an awesome job!

Once everything was up and running, I wanted to create a Java based remote client to talk to my shiny new cluster. This was pretty simple, once I had figured out that I needed the Cloudera Maven repositories, and which versions and combinations of packages to use.

I will save you the trouble and post the results here.

The versions in use are the latest at the time of writing:

hadoop version
Hadoop 2.0.0-cdh4.4.0
Subversion file:///var/lib/jenkins/workspace/generic-package-ubuntu64-12-04/CDH4.4.0-Packaging-Hadoop-2013-09-03_18-48-35/hadoop-2.0.0+1475-1.cdh4.4.0.p0.23~precise/src/hadoop-common-project/hadoop-common -r c0eba6cd38c984557e96a16ccd7356b7de835e79
Compiled by jenkins on Tue Sep  3 19:33:54 PDT 2013
From source with checksum ac7e170aa709b3ace13dc5f775487180
This command was run using /usr/lib/hadoop/hadoop-common-2.0.0-cdh4.4.0.jar

With this information, we now know which versions of the packages to use from the Cloudera Maven repository.

<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.0.0-cdh4.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.0.0-cdh4.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase</artifactId>
			<version>0.94.6-cdh4.4.0</version>
		</dependency>
	</dependencies>

I also make sure to add the Cloudera Maven repository to my pom.xml file:

<repositories>
		<repository>
			<id>cloudera</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>

That is pretty much the hard part. If you don’t need HBase, leave it off; the hadoop-client artifact should do most of what you want.
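
If you just want to sanity check the connection from Java, a minimal client that lists the root of HDFS using the hadoop-client artifact looks something like the sketch below. The namenode host and port here are assumptions for my setup (CDH4 typically exposes the namenode IPC on port 8020, but check your configuration); substitute your own values.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsListExample {
	public static void main(String[] args) throws Exception {
		// Assumption: the namenode runs on cdh-master.local and listens on 8020.
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://cdh-master.local:8020");

		// Connect to the remote cluster and list the root directory.
		FileSystem fs = FileSystem.get(URI.create("hdfs://cdh-master.local:8020"), conf);
		for (FileStatus status : fs.listStatus(new Path("/"))) {
			System.out.println(status.getPath());
		}
		fs.close();
	}
}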

Introducing Apache Whirr

I was a little disturbed to see that so few people had even heard of Apache Whirr, so I decided to write this as a bit of an introduction.

Apache Whirr is, very basically, a set of libraries for managing your cloud installations and setups. It takes all the pain out of deploying clusters and applications to any of the major cloud providers, including Rackspace and Amazon's Elastic Compute Cloud.

The trick is that it provides a common API across all the platforms in a way that almost anyone can use. You may be thinking “Apache = Java”, but there are SDKs and APIs in a few languages, including Java, C++ and Python. Whirr started out as a set of BASH scripts for managing Hadoop clusters, but quickly became the bigger project we now know as Whirr.
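
For the Java inclined, Whirr can also be driven as a library rather than from the command line. The snippet below is only a rough sketch based on my reading of the Whirr 0.8.x API (it reuses the hadoop.properties file described later in this post, and you should double check the class names and signatures against the javadocs for your version):

import org.apache.commons.configuration.PropertiesConfiguration;

import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterController;
import org.apache.whirr.ClusterSpec;

public class WhirrLaunchExample {
	public static void main(String[] args) throws Exception {
		// Build a cluster spec from the same properties file used by the CLI.
		ClusterSpec spec = new ClusterSpec(new PropertiesConfiguration("hadoop.properties"));
		ClusterController controller = new ClusterController();

		// Provision the nodes defined by whirr.instance-templates.
		Cluster cluster = controller.launchCluster(spec);
		System.out.println("Started instances: " + cluster.getInstances());

		// ... run your jobs here ...

		// Tear everything down again (equivalent to "whirr destroy-cluster").
		controller.destroyCluster(spec);
	}
}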

To get started with Whirr, you will need to download it from a local mirror. http://www.apache.org/dyn/closer.cgi/whirr/ should get you there. You could also grab the source at https://cwiki.apache.org/confluence/display/WHIRR/How+To+Contribute#HowToContribute-Gettingthesourcecode and then build it in Eclipse as per the instructions. I would suggest grabbing Whirr-0.8.2 (about 26MB).

You will also need Java 6 (or later, I use openjdk-7-jdk), an SSH client, and an account with either Rackspace or Amazon EC2.

I usually put stuff like this in my /opt/ directory, so once you have extracted the archive and ensured the dependencies are met, you can check everything is working with:

bin/whirr version

which should print out

Apache Whirr 0.8.2
jclouds 1.5.8

The next step is to set up your credentials. First off, copy the sample credentials file to your home directory, and then modify it to suit you.

mkdir -p ~/.whirr/
/opt/whirr-0.8.2/conf# cp credentials.sample ~/.whirr/credentials

I prefer using Rackspace (OK Rackspace, you may now send me gifts), so my config looks something like this

PROVIDER=cloudservers-us
IDENTITY=yourUsername
CREDENTIAL=someLongApiKey

Now to define what you want to deploy. The canonical examples are a Hadoop cluster and a Mahout cluster, so here we will start with a Hadoop cluster and let you figure out the rest!
In your home directory, create a properties file. It doesn’t matter too much what you call it, but we will call it hadoop.properties.

As you will have seen from the credentials config file, properties files override the base config, so you can actually do quite a lot in userland there. Let’s set up for our Hadoop cluster now:

whirr.cluster-name=testhadoopcluster 
whirr.instance-templates=1 hadoop-jobtracker+hadoop-namenode,1 hadoop-datanode+hadoop-tasktracker 
whirr.provider=cloudservers-us
whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub

You now need to generate an SSH keypair with

ssh-keygen -t rsa -P ''

Note: You should use only RSA SSH keys, since DSA keys are not accepted yet.

OK, so now comes the fun part – setting up our Hadoop cluster!

/opt/whirr-0.8.2/bin# ./whirr launch-cluster --config /home/paul/hadoop.properties

You should start seeing some output almost immediately that looks like

Bootstrapping cluster
Configuring template for bootstrap-hadoop-datanode_hadoop-tasktracker
Starting 1 node(s) with roles [hadoop-datanode, hadoop-tasktracker]
Configuring template for bootstrap-hadoop-jobtracker_hadoop-namenode
Starting 1 node(s) with roles [hadoop-jobtracker, hadoop-namenode]
Starting 1 node(s) with roles [hadoop-datanode, hadoop-tasktracker]
Starting 1 node(s) with roles [hadoop-jobtracker, hadoop-namenode]

If something goes wrong, you will get something along the lines of

Unable to start the cluster. Terminating all nodes.
Finished running destroy phase scripts on all cluster instances
Destroying testhadoopcluster cluster
Cluster testhadoopcluster destroyed

in which case you will need to review all your settings and try again… Hint: this error usually indicates some sort of connectivity issue.
Whirr is unable to connect to the machines over SSH, assumes the bootstrap process failed, and tries to start new ones.

For security reasons, traffic from the network your client is running on is proxied through the master node of the cluster using an SSH tunnel (a SOCKS proxy on port 6666).
A script to launch the proxy is created when you launch the cluster, and may be found in ~/.whirr/. Run it as follows (in a new terminal window):

. ~/.whirr/testhadoopcluster/hadoop-proxy.sh

You will also need to configure your browser to use the proxy in order to view the pages served by your cluster. When you want to stop the proxy, just hit Ctrl-C in its terminal.

You can now run a MapReduce job on your shiny new cluster.
After you launch a cluster, a hadoop-site.xml file is created in ~/.whirr/testhadoopcluster/. You can use it to connect to the cluster by setting the HADOOP_CONF_DIR environment variable. (It is also possible to set the configuration file to use by passing it with the -conf option to the Hadoop tools):

export HADOOP_CONF_DIR=~/.whirr/testhadoopcluster

You should now be able to browse HDFS:

hadoop fs -ls /

Note that the version of Hadoop installed locally should match the version installed on the cluster. You should also make sure that the HADOOP_HOME environment variable is set.

Here’s how you can run a MapReduce job:

hadoop fs -mkdir input 
hadoop fs -put $HADOOP_HOME/LICENSE.txt input 
hadoop jar $HADOOP_HOME/hadoop-*examples*.jar wordcount input output 
hadoop fs -cat output/part-* | head

Once you are done, you can then simply destroy your cluster with:

bin/whirr destroy-cluster --config hadoop.properties

Note of warning! This will destroy ALL data on your cluster!

Once your cluster is destroyed, don’t forget to kill your proxy too…

That is about it as an intro to Apache Whirr. Very easy to use and very powerful!

Hadoop HDFS “abstraction” class in Java

Recently I found myself working with Hadoop’s HDFS (the Hadoop file system) and HBase in a Java project. The docs for working with Hadoop are somewhat sparse, as I guess most folks prefer to keep their code pretty secretive. Well, seeing as the following will never give anyone a business advantage over anyone else, but will probably spare a few folks some sleepless nights, here you go!

First off, you need to start a new Maven project. I just used the simple archetype as I was really just messing about. You will then need to add the following dependencies to your pom.xml file (oh, and I’m using Maven 3…)

<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
			<version>1.1.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-tools</artifactId>
			<version>1.2.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>0.95.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.3.2</version>
			<exclusions>
				<exclusion>
					<groupId>com.sun.jmx</groupId>
					<artifactId>jmxri</artifactId>
				</exclusion>
				<exclusion>
					<groupId>com.sun.jdmk</groupId>
					<artifactId>jmxtools</artifactId>
				</exclusion>
				<exclusion>
					<groupId>javax.jms</groupId>
					<artifactId>jms</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.0.3</version>
		</dependency>
	</dependencies>

You will notice a couple of exclusions there. They are really important, as some of that code is dead and discontinued.

Next up, we need a class to interact with our HBase store (which sits on top of HDFS).

public class HbaseExample {
	private static Configuration hBaseConfig = null;
	private static String hbaseHost = "ubuntu.local";
	private static String zookeeperHost = "ubuntu.local";

	/**
	 * Initialization
	 */
	static {
		hBaseConfig = HBaseConfiguration.create();
		hBaseConfig.setInt("timeout", 120000);
		hBaseConfig.set("hbase.master", "*" + hbaseHost + ":9000*");
		hBaseConfig.set("hbase.zookeeper.quorum", zookeeperHost);
		hBaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        new HTablePool(hBaseConfig, 10);
	}

You will see that all is pretty standard, and that I have defined a few static properties to hold the values of the ZooKeeper and HBase master hosts. Note that these will work with IP addresses, but Hadoop most definitely prefers FQDNs. The final line in the static block is a sticky one: basically, if you do not create a connection pool, your code can take up to 5 seconds to re-initialize the database connection, which is obviously not cool.
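
For completeness, this is roughly how you would hand tables out of that pool instead of constructing a new HTable on every call. This helper is purely illustrative and is not part of the class below (it also needs the org.apache.hadoop.hbase.client.HTableInterface import):

	/**
	 * Example only: borrow a table handle from the pool and return it when done.
	 */
	public static void addRecordPooled(HTablePool pool, String tableName,
			String rowKey, String family, String qualifier, String value)
			throws IOException {
		HTableInterface table = pool.getTable(tableName);
		try {
			Put put = new Put(Bytes.toBytes(rowKey));
			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),
					Bytes.toBytes(value));
			table.put(put);
		} finally {
			// Closing a pooled table returns it to the pool rather than destroying it.
			table.close();
		}
	}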

After that, there is nothing too tricksy. I will paste a copy of the whole class next so that you can check your imports etc., as well as have a look at full CRUD against an HBase table backed by HDFS:

package za.co.paulscott.hdfstest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseExample {
	private static Configuration hBaseConfig = null;
	private static String hbaseHost = "ubuntu.local";
	private static String zookeeperHost = "ubuntu.local";

	/**
	 * Initialization
	 */
	static {
		hBaseConfig = HBaseConfiguration.create();
		hBaseConfig.setInt("timeout", 120000);
		hBaseConfig.set("hbase.master", "*" + hbaseHost + ":9000*");
		hBaseConfig.set("hbase.zookeeper.quorum", zookeeperHost);
		hBaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        new HTablePool(hBaseConfig, 10);
	}

	/**
	 * Create a table
	 */
	public static void createTable(String tableName, String[] familys)
			throws Exception {
		HBaseAdmin admin = new HBaseAdmin(hBaseConfig);
		boolean exists = false;
		try {
			exists = admin.tableExists(tableName);
		} catch (NullPointerException e) {
			exists = false;
		}
		if (exists) {
			System.out.println("table already exists!");
		} else {
			HTableDescriptor tableDesc = new HTableDescriptor(tableName);
			for (int i = 0; i < familys.length; i++) {
				tableDesc.addFamily(new HColumnDescriptor(familys[i]));
			}
			admin.createTable(tableDesc);
			System.out.println("create table " + tableName + " ok.");
		}
	}

	/**
	 * Delete a table
	 */
	public static void deleteTable(String tableName) throws Exception {
		try {
			HBaseAdmin admin = new HBaseAdmin(hBaseConfig);
			admin.disableTable(tableName);
			admin.deleteTable(tableName);
			System.out.println("delete table " + tableName + " ok.");
		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Put (or insert) a row
	 */
	public static void addRecord(String tableName, String rowKey,
			String family, String qualifier, String value) throws Exception {
		//System.out.print("Adding record to table:  " + tableName);
		try {
			HTable table = new HTable(hBaseConfig, tableName);
			Put put = new Put(Bytes.toBytes(rowKey));
			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),
					Bytes.toBytes(value));
			table.put(put);
			System.out.println("insert recored " + rowKey + " to table "
					+ tableName + " ok.");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Delete a row
	 */
	public static void delRecord(String tableName, String rowKey)
			throws IOException {
		HTable table = new HTable(hBaseConfig, tableName);
		List<Delete> list = new ArrayList<Delete>();
		Delete del = new Delete(rowKey.getBytes());
		list.add(del);
		table.delete(list);
		System.out.println("del recored " + rowKey + " ok.");
	}

	/**
	 * Get a row
	 */
	public static void getOneRecord(String tableName, String rowKey)
			throws IOException {
		HTable table = new HTable(hBaseConfig, tableName);
		Get get = new Get(rowKey.getBytes());
		Result rs = table.get(get);
		for (KeyValue kv : rs.raw()) {
			System.out.print(new String(kv.getRow()) + " ");
			System.out.print(new String(kv.getFamily()) + ":");
			System.out.print(new String(kv.getQualifier()) + " ");
			System.out.print(kv.getTimestamp() + " ");
			System.out.println(new String(kv.getValue()));
		}
	}

	/**
	 * Scan (or list) a table
	 */
	public static void getAllRecord(String tableName) {
		try {
			HTable table = new HTable(hBaseConfig, tableName);
			Scan s = new Scan();
			ResultScanner ss = table.getScanner(s);
			for (Result r : ss) {
				for (KeyValue kv : r.raw()) {
					System.out.print(new String(kv.getRow()) + " ");
					System.out.print(new String(kv.getFamily()) + ":");
					System.out.print(new String(kv.getQualifier()) + " ");
					System.out.print(kv.getTimestamp() + " ");
					System.out.println(new String(kv.getValue()));
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		try {
			String tablename = "scores";
			String[] familys = { "grade", "course" };
			HbaseExample.createTable(tablename, familys);

			// add record paul
			HbaseExample.addRecord(tablename, "paul", "grade", "", "5");
			HbaseExample.addRecord(tablename, "paul", "course", "", "90");
			HbaseExample.addRecord(tablename, "paul", "course", "math", "97");
			HbaseExample.addRecord(tablename, "paul", "course", "art", "87");
			// add record caz
			HbaseExample.addRecord(tablename, "caz", "grade", "", "4");
			HbaseExample.addRecord(tablename, "caz", "course", "math", "89");

			System.out.println("===========get one record========");
			HbaseExample.getOneRecord(tablename, "paul");

			System.out.println("===========show all record========");
			HbaseExample.getAllRecord(tablename);

			System.out.println("===========del one record========");
			HbaseExample.delRecord(tablename, "caz");
			HbaseExample.getAllRecord(tablename);

			System.out.println("===========show all records========");
			HbaseExample.getAllRecord(tablename);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

As you can see, there is also a main method, so to execute this in Eclipse you can simply use Run As… > Java Application, or compile it as a jar (using the jar:jar goal) and run it.

NOTE: Make sure that you change the host(s)!

NOTE2: This is an example to connect to a remote hadoop cluster!

NOTE3: This is one too many notes, so go now and have fun!

MongoDB for everyone – Part 8 – dbrefs and denormalization

Most of the MongoDB drivers (as well as the database engine itself) support a concept called “dbrefs”. Basically, a dbref is a reference to another document in a different collection. A dbref has two pieces of information attached to it: the ID of the document that it references, and the collection that the document is in.

Many ORMs and abstraction layers that you may use within your own projects will create dbrefs by default, though some can be configured not to. If at all possible, I would suggest avoiding dbrefs, and rather using denormalization techniques to make use of larger documents in your collections. Remember that you are now working with documents and not relationships, so that ingrained fear of data duplication should be relaxed somewhat!

That being said, however, you should model your data around your application's usage. Take, for instance, a blog with comments. You could have a collection for users, one for comments, and one for posts with many dbrefs between them, or you could stick the entire post into a single document (comments included!). That may be a bit extreme though, so consider adding a user name field for each commenter inside your blog post document, as opposed to just a user ID. That way, without adding very much data to your post document, you avoid a lookup on a dbref and make your application faster! Win!
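
To make that concrete, here is a rough sketch using the 2.x MongoDB Java driver (the database, collection and field names are just made up for illustration). The first comment in the code shows what a dbref-style reference stores; the inserted document instead embeds the commenter's name alongside their ID so no second lookup is needed:

import java.util.Arrays;

import org.bson.types.ObjectId;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class DenormalizedBlogExample {
	public static void main(String[] args) throws Exception {
		MongoClient mongo = new MongoClient("localhost", 27017);
		DB db = mongo.getDB("blog");
		DBCollection posts = db.getCollection("posts");

		ObjectId commenterId = new ObjectId();

		// A dbref-style reference only stores { "$ref": "users", "$id": <ObjectId> },
		// which forces a second query against the users collection to display the name.

		// Denormalized comment: keep the user ID, but also copy the user name
		// into the comment so rendering the post needs no extra lookup.
		BasicDBObject comment = new BasicDBObject("userId", commenterId)
				.append("userName", "caz")
				.append("text", "Nice post!");

		BasicDBObject post = new BasicDBObject("title", "An introduction to Apache Mesos")
				.append("author", "paul")
				.append("comments", Arrays.asList(comment));

		posts.insert(post);
		mongo.close();
	}
}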

Thinking denormalized may take some getting used to, but it works well with document-centric data stores. Just make sure you don’t hit that 16MB document size limit!