Wednesday Oct 06, 2010

Hadoop Lab Now Available

I was really surprised at the turn-out for JavaOne this year. Judging by the packed halls and empty goodie carts, I think the conference organizers were a little surprised as well. Excellent! Well done.

As you may have noticed, I always seem to have my fingers in the JavaOne hands-on labs pie. This year my contribution was to bring Cloudera into the fold to run a Hadoop lab. Needless to say, that generated a lot of interest. Well before the conference, the slot we had for the lab was booked solid. Taking that as a sign, I had the conference organizers give us a second slot on Monday for the lab. That slot was also booked solid before the conference even began. Unfortunately, however, that Monday lab slot ended up getting canceled for <INSERT OFFICIAL REASON HERE>. As a concession to the folks who didn't get to attend because of the cancellation, I got the conference organizers to give me permission to have Cloudera host the lab materials from their site before they're available from the official Oracle JavaOne site.

You can find the semi-official JavaOne Hands-on Lab S314413: Extracting Real Value from Your Data With Apache Hadoop here under the training section of the Cloudera site. The file is not yet linked from anywhere but here, but they're working on it.

If you download the zip file, in it you will find a lab workbook. At the back of the workbook, you will find an appendix that describes how to set up your own lab environment. The lab was written for Solaris 11 Express and NetBeans, but the OS and IDE really play little role in the lab. If you refuse to see the light and accept Solaris as the one true OS, you can still do the lab on some other OS with some other IDE (but it won't be as satisfying).

The lab did run in its originally assigned slot at JavaOne, and it was really successful. Turnout was good and the comments were great! I've already incorporated lots of great feedback from that session into the lab materials that Cloudera is now hosting, but I'm always happy to hear any additional comments and/or feedback. Happy coding!

Thursday Jan 14, 2010

Leading the Herd

Wow! There's been a surprising amount of noise lately about the Apache Hadoop integration with Sun Grid Engine 6.2u5. Since folks seem to be interested, I figure it's a good place to start on my feature deep-dive posts that I promised.

I'm going to assume that you already understand the many virtues of Hadoop. (And if you don't, Cloudera will be happy to tell you all about it.) Instead, to set the stage, let's talk about what Hadoop doesn't do so well. I currently see two important deficiencies in Hadoop: it doesn't play well with others, and it has no real accounting framework. Pretty much every customer I've seen running Hadoop does it on a dedicated cluster. Why? Because the tasktrackers assume they own the machines on which they run. If there's anything on the cluster other than Hadoop, it's in direct competition with Hadoop. That wouldn't be such a big deal if Hadoop clusters didn't tend to be so huge. Folks are dedicating hundreds, thousands, or even tens of thousands of machines to their Hadoop applications. That's a lot of hardware to be walled off for a single purpose. Are those machines really being used? You may not be able to tell. You can monitor state in the moment, and you can grep through log files to find out about past usage (Gah!), but there's no historical accounting capability there.

Coincidentally, these two issues are things that most workload managers (like Sun Grid Engine) do really well. And I'm not the first to notice that. The Hadoop on Demand project, which is included in the Hadoop distribution, was an attempt to integrate Hadoop first with Condor and then with Torque, probably for those same reasons. It's easy enough to have the Hadoop framework started on demand by a workload manager. The problem is that most workload managers know nothing about HDFS data block locality. When a typical workload manager assigns a set of nodes to a Hadoop application, it's picking the nodes it thinks are best, generally the ones with the least load, not the ones with the data. The result is that most of your data is going to have to be shipped to the machines where the tasks are executing. Since the great innovation of Map/Reduce is that we move the execution to the data instead of vice versa, bringing a workload manager into the picture shoots Hadoop in the foot.

Enter Sun Grid Engine 6.2 update 5. One of the main strengths of the Sun Grid Engine software is its ability to model just about anything as a resource (called a "complex" in SGE terms) and then use those resources to make scheduling decisions. Using that capability, we've modeled HDFS rack location and the locally present HDFS data blocks as resources. We then taught SGE how to translate an HDFS path into a set of racks and blocks. Finally, we taught SGE how to start up a set of jobtrackers and tasktrackers, and voila! Ze Hadoop integration iz born. More detail? Glad you asked.

The Gory Details

The Hadoop integration consists of two halves. The first half is a parallel environment that can start up a jobtracker and tasktrackers on the nodes assigned by the scheduler. (HDFS is assumed to already be running on all the nodes in the cluster.) In the end, it's really no different than an MPI integration (especially MPICH2). The second half is called Herd. It's the part that talks to HDFS about blocks and racks, and it has two parts. The first part is the load sensor that reports on the block and rack data for each execution host (hosts that are running the SGE execution daemon). The second part is a Job Submission Verifier that translates requests for HDFS data paths into requests for racks and blocks.

The process for running a Hadoop job as an SGE job looks something like this:

  1. At regular intervals, all of the execution hosts report their load. This includes the rack and block information collected by the Herd load sensors.
  2. User submits a job, e.g. echo $HADOOP_HOME/hadoop --config \$TMP/conf jar $HADOOP_HOME/hadoop-\*-examples.jar grep input output SGE | qsub -jsv jsv.sh -l hdfs_input=/user/dant/input -pe hadoop 128
  3. The jsv.sh script starts the Herd JSV that talks to the HDFS namenode. It removes the hard hdfs_input request and replaces it with a soft request for hdfs_primary_rack, hdfs_secondary_rack, and a set of hdfs_blk<n><n> resources. (A hard request is one that is required for a job to run. A soft request is one that is desired but not required.) The primary rack resource lists the racks where most of the data live. The secondary rack resource lists all of the racks where any of the data lives. Because the HDFS data block id space is so large, we aggregate blocks into 256 chunks by their first two hex digits. (There's an illustrative sketch of the translated request right after this list.)
  4. The scheduler will do its best to satisfy the job's soft resource requests when assigning it nodes, 128 in this example. It's probably not going to be able to assign the perfect set of nodes for the desired data, but it should get pretty close.
  5. After the scheduler assigns hosts, the qmaster will send the job (everything between the "echo" and the "|") to one of the assigned execution hosts.
  6. Before the job is started on that host, the Hadoop parallel environment kicks in. It starts a tasktracker remotely on every node assigned to the job (all 128 in this example) and a single jobtracker locally. An important point here is that the tasktrackers are all started through SGE rather than through ssh. Because SGE starts the tasktrackers, it is able to track their resource usage and clean up after them later.
  7. After the Hadoop PE has done its thing, the job itself will run. Notice in the example that I told it to look in $TMP/conf for its configuration. The Hadoop PE sets up a conf directory for the job that points to the jobtracker it set up. That conf directory gets put in the job's temp directory, which is exported into the job's environment as $TMP.
  8. After the job completes, the Hadoop parallel environment takes down the jobtracker and tasktrackers.
  9. Information about the job, how it ran, and how it completed is logged by SGE into the accounting files.
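
To make step 3 concrete, here is roughly what the job from step 2 would look like if you wrote the JSV's translation by hand. This is purely illustrative: the rack name and block-chunk values are invented, and in practice the Herd JSV injects the soft requests (and strips the hdfs_input request) for you.

    # Illustrative only -- the rack and block-chunk values below are made up:
    echo $HADOOP_HOME/hadoop --config \$TMP/conf jar $HADOOP_HOME/hadoop-\*-examples.jar grep input output SGE | \
        qsub -pe hadoop 128 \
             -soft -l hdfs_primary_rack=rack3,hdfs_secondary_rack=rack3,hdfs_blk4a=1,hdfs_blkd2=1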

Since everyone loves pretty pictures, here's the diagram of the process:

The Hadoop integration will attempt to start a jobtracker (and corresponding tasktrackers) per job. For most uses, that should be perfectly fine. If, however, you want to use the HoD allocate/deallocate model, you can do that, too. Instead of giving SGE a Hadoop job to run, give it something that blocks (like "sleep 10000000"). When the job is started, the address of its jobtracker is added to its job context. Just query the job context, grab the address, and build your own conf directory to talk to the jobtracker. You can then submit multiple Hadoop jobs within the same SGE job.
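
Here's a rough sketch of that pattern in command form (the exact name of the context variable isn't shown here, so treat that part as an assumption):

    # Sketch only: -terse makes qsub print just the job id.
    echo sleep 10000000 | qsub -terse -pe hadoop 128 -l hdfs_input=/user/dant/input -jsv jsv.sh > jobid
    qstat -j $(cat jobid) | grep context    # the jobtracker's address appears in the job context
    # Build a conf directory pointing at that jobtracker, submit Hadoop jobs to it directly,
    # and qdel the SGE job when you're done to tear the trackers down.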

Hopefully this gives you a clear picture of how the Hadoop integration works. You can find more information in the docs. I think it's a testament to the flexibility of Sun Grid Engine that the integration did not require any changes to the product. All I did was add in some components through the hooks that SGE already provides. One more thing I should point out: this integration is in the Sun Grid Engine product, but not in the Grid Engine courtesy binaries that we just announced.

Monday Nov 30, 2009

Beta Testing the Sun Grid Engine Hadoop Integration

In case you haven't heard yet, the upcoming release of Sun Grid Engine will include an integration with Apache Hadoop that will allow Map/Reduce jobs to be submitted to a Sun Grid Engine cluster while minding HDFS data locality. The 6.2u5 release will be out by the end of the year, but it's currently in the beta testing phase. And that's where you come in.

I'm looking for some volunteers to test the integration. To that end, this blog post will provide instructions for how to get the beta code checked out and built. The Hadoop integration is actually only loosely dependent on the Sun Grid Engine software itself. While it's planned to be part of u5, the integration should be usable with a cluster as old as 6.2u2, although I would really recommend at least 6.2u4.

In a nutshell, the integration consists of two components. The first is the hadoop parallel environment that allows Map/Reduce jobs to be started as parallel jobs in a Sun Grid Engine cluster. The second is the integration with HDFS, called Herd, that makes the Sun Grid Engine scheduler aware of the locations of the HDFS data blocks. Herd has two parts. One part is a load sensor that runs on every execution machine and reports the HDFS blocks on that machine. The other part is a JSV that translates HDFS data paths included in the job submission into a list of HDFS blocks needed by the job.

How to check out the source code

  1. Make sure you have a functional CVS client.
  2. cvs -d :pserver:guest@cvs.sunsource.net:/cvs login
  3. cvs -d :pserver:guest@cvs.sunsource.net:/cvs checkout gridengine/source

Technically, the above will only check out the source directory, but for the Hadoop integration, that's all you need. The Hadoop integration lives in three places. First, the scripts live in source/dist/hadoop. Second, the Herd code lives at source/libs/herd. Third, the JSV Java language binding upon which the Herd code depends lives at source/libs/jjsv.

How to build the source code

  1. Make sure you're using at least Ant 1.6.3 and the Java Standard Edition 6 platform.
  2. Copy the source/build.properties file to build_private.properties.
  3. Edit the build_private.properties file to include the correct paths for the Java Standard Edition 6 platform and junit 3.8.
  4. Change to the gridengine/source directory.
  5. ant jjsv
  6. ant herd

After the above steps, you will find herd.jar at source/CLASSES/herd/herd.jar and JSV.jar at source/CLASSES/jjsv/JSV.jar.

How to install the integration

  1. Copy herd.jar and JSV.jar to the $SGE_ROOT/lib directory.
  2. Copy the source/dist/hadoop directory to somewhere accessible by all the execution nodes (see the example below).
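
A minimal example, assuming /shared is an NFS directory mounted on every execution host:

    cp source/CLASSES/herd/herd.jar source/CLASSES/jjsv/JSV.jar $SGE_ROOT/lib/
    cp -r source/dist/hadoop /shared/sge-hadoop   # this becomes the <hadoop> directory referenced below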

How to configure the integration

  1. Get HDFS up and running on your cluster. The most useful configuration will be to have every execution host be a data node, and to only have execution hosts as data nodes. Also, because of the way Hadoop does authentication and authorization, you'll need to make sure that either HDFS has security disabled or that root and the SGE admin user are in the HDFS super user group.
  2. Copy your Hadoop configuration directory to <hadoop>/conf, where <hadoop> is the directory that you copied in step 2 of How to install the integration.
  3. Delete the <hadoop>/conf/mapred.xml, <hadoop>/conf/masters, and <hadoop>/conf/slaves files.
  4. Edit the <hadoop>/env.sh file to contain the paths to the Java platform, the Hadoop install directory, and the Hadoop configuration directory you just created (<hadoop>/conf).
  5. Change into the <hadoop> directory.
  6. ./setup.pl -i
  7. Add the hadoop parallel environment to one or more of your queues.

The setup.pl script will install the hadoop parallel environment and the complexes needed by Herd. It will also start the Herd load sensor on all the execution hosts. At this point, you should be ready to go. Wait for a couple of minutes to give all of the execution hosts a chance to start running the load sensor and reporting values. You can run qhost -F hdfs_primary_rack to check that the load sensor is functioning correctly. Every execution host should report an hdfs_primary_rack value. If one or more machines have not reported a value within about five minutes, see the troubleshooting section below.
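
A healthy host should show output along these lines (trimmed and approximate; the host name and rack value are just examples):

    $ qhost -F hdfs_primary_rack
    HOSTNAME          ARCH         NCPU  LOAD   MEMTOT   MEMUSE  SWAPTO  SWAPUS
    ----------------------------------------------------------------------------
    node001           lx24-amd64      8  0.10    15.7G     1.2G    2.0G     0.0
        Host Resource(s):   hl:hdfs_primary_rack=/default-rack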

Using the integration

To submit a job that uses the hadoop parallel environment, use -pe hadoop <n>, where <n> is the number of nodes. The hadoop parallel environment uses an allocation rule that guarantees that no more than one task tracker per job will run on a single host. To tell the scheduler what data the job needs, request the hdfs_input resource with a value of the HDFS path to the job's data. The data path must be an absolute path.

Here's an example. Say I want to use the grep example to find occurrences of the word 'Sun' in a series of documents. First, I'd copy those documents into HDFS to /user/dant/sungrep (e.g. bin/hadoop fs -copyFromLocal ~/Documents/\* /user/dant/sungrep). I would then submit the job with echo `pwd`/bin/hadoop --config \$TMPDIR/conf jar `pwd`/hadoop-0.20.1-examples.jar grep sungrep output Sun | qsub -pe hadoop 16 -l hdfs_input=/user/dant/sungrep -jsv <hadoop>/jsv.sh.

Let's look at that in a little more detail. First, we're echoing the Hadoop command and piping it to qsub. Why? Well, when the integration runs, it creates a conf directory in the job's temp directory that is properly set up for the assigned hosts. Until the job runs, though, we don't know where the temp directory is. We get its path from the $TMPDIR variable once the job starts. We therefore need to wrap the Hadoop command in a script. We could either write a script that contains the command, or we could let qsub write one for us by piping the command to qsub's stdin. Note that we used --config \$TMPDIR/conf in the command. The backslash is important because it prevents the shell on the submission host from interpreting the $TMPDIR variable.

Next, the qsub command uses -pe hadoop 16 to request 16 nodes. When this job is run, a job tracker will be started on the "master" host, and a task tracker will be started on each of the 16 assigned nodes. The master host is the host where the parallel job's master task is started. After the job tracker and task trackers are running, the grep job itself will be started, launched from the master host. The hadoop PE is a tight integration with an allocation rule of "1". In order to run a Hadoop job on top of SGE, you must use the PE, even if it's only a single-node job.
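
For reference, the hadoop PE installed by setup.pl should look something like the following. The field values here are sketched from the description above; the slot count and the start/stop script names are placeholders, not necessarily what the integration actually uses:

    $ qconf -sp hadoop
    pe_name            hadoop
    slots              9999                    # placeholder
    start_proc_args    <hadoop>/pestart.sh     # script names are placeholders
    stop_proc_args     <hadoop>/pestop.sh
    allocation_rule    1
    control_slaves     TRUE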

The qsub command also uses -l hdfs_input=/user/dant/sungrep -jsv <hadoop>/jsv.sh. The -l resource request tells SGE what data will be used by the job. It must be specified as an absolute path. The -jsv switch actually translates the resource request for hdfs_input into requests for specific racks and blocks. Without the -jsv switch, the job would never run because no node offers the hdfs_input resource. (No node offers it because it doesn't really exist. It's just a placeholder for the JSV to replace with rack and block requests. In programming terms, it's a reference injection point.) The resource request and JSV can be left out of the qsub command. If they're left out, the scheduler will not take the HDFS data locality into consideration when scheduling the job.
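
If you'd rather not pipe the command into qsub's stdin, the equivalent wrapper script looks roughly like this (/opt/hadoop is an assumed install path; substitute your own):

    #!/bin/sh
    # sungrep.sh -- same command as above. Because this runs on the execution host,
    # $TMPDIR needs no escaping here. /opt/hadoop is an assumed install path.
    /opt/hadoop/bin/hadoop --config $TMPDIR/conf \
        jar /opt/hadoop/hadoop-0.20.1-examples.jar grep sungrep output Sun

You'd then submit it with qsub -pe hadoop 16 -l hdfs_input=/user/dant/sungrep -jsv <hadoop>/jsv.sh sungrep.sh.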

You can also use the Hadoop integration to set up the job tracker and task trackers and then submit jobs to them directly. Instead of echoing the Hadoop command to qsub, echo sleep 300000 instead. That will cause the job tracker and task trackers to be set up, but instead of running a job, it will just sleep for a long time. You can then run qstat -j <jobid> | grep context to show the job's context. One of the context variables will be the URL for the job tracker. Using that URL, you can set up a Hadoop configuration to talk to the job tracker so that you can submit jobs to it from the command line.
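
For example, assuming a Hadoop 0.20 client install in /opt/hadoop (an assumption) and the job tracker address pulled from the job context as described above:

    cp -r /opt/hadoop/conf ~/sgeconf
    # Set mapred.job.tracker in ~/sgeconf/mapred-site.xml to the host:port reported in the
    # job context, then submit directly to that job tracker:
    /opt/hadoop/bin/hadoop --config ~/sgeconf jar /opt/hadoop/hadoop-0.20.1-examples.jar grep sungrep output Sun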

It is also highly recommended that the use of the Hadoop integration be coupled with exclusive host access. The Hadoop task trackers all assume that they have exclusive access to their nodes. If you don't use exclusive host access with the Hadoop integration, you'll end up oversubscribing the nodes.
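
Sun Grid Engine 6.2u3 and later support exclusive host usage through a consumable complex with the EXCL relational operator. Here's a minimal sketch, assuming the conventional complex name "exclusive" (your site's name may differ):

    # One-time setup (sketch): with "qconf -mc", add a complex along the lines of
    #   exclusive   excl   BOOL   EXCL   YES   YES   0   1000
    # and set complex_values exclusive=true on each execution host ("qconf -me <host>").
    # Then request it alongside the hadoop PE:
    echo sleep 300000 | qsub -pe hadoop 16 -l exclusive=true -l hdfs_input=/user/dant/sungrep -jsv <hadoop>/jsv.sh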

Troubleshooting

Hopefully everything will work perfectly the first time. If for some reason it doesn't, here are some tips to help diagnose the problem:

The execds aren't reporting any hdfs resources, i.e. qhost -F | grep hdfs shows nothing.
Sometimes it takes several minutes for the nodes to start reporting the hdfs resources. If after several minutes there's still nothing, pick an execution host and check if the load sensor is running: jps -l. Look for com.sun.grid.herd.HerdJsv. Note that it might be running as root or as the SGE admin user. Also note that jps may only show you your own processes.

If the load sensor isn't running, look for log files in /tmp. They will be called sge_hadoop_loadsensor.out and sge_hadoop_<n>.log. The .out file is the output from starting the load sensor. The .log files are the logging output from the load sensor. One will be the log file from the load sensor framework, and the other will be the log file from the Herd load sensor. (You can control the logging verbosity from the logging.properties file in the <hadoop> directory.)

The most common problem is that the load sensor is started as the user root on most platforms (for a reason I don't yet understand), but that HDFS usually is not. With HDFS, the user who started it is the super user, and only the super user can query the kind of information that the load sensor needs. As stated in the configuration section, you must either disable HDFS security or set a super user group that contains root (and probably the SGE admin user). The next most common problems are that the path to Hadoop or the Java platform is not correct in env.sh or that the conf directory contains bad configuration information.

You can test the load sensor manually by changing into the <hadoop> directory and running loadsensor.sh. If it works, it will "hang". Press enter, and it should spit out the hdfs resource values for that host. Type QUIT and press enter to exit the load sensor.
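
In command form, that manual test is:

    cd <hadoop>
    ./loadsensor.sh     # appears to hang -- that's expected
    # Press enter: it should print the hdfs resource values for this host.
    # Type QUIT and press enter to exit.
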
The job tracker and/or task trackers aren't starting.
The first place to look is the PE output and error files. The output from starting the job tracker should be found there. The next place to look is the log files. The log files are written where the Hadoop configuration says to put them. Make sure that wherever that is, all the users have access to it from all the nodes. Inability to write the log file is a common reason why the job tracker and/or task trackers won't start. In addition to the usual Hadoop log files, the integration also writes a hadoop-<adminuser>-sge-<hostname>.log file. That file contains the output from starting the task trackers from the master host. Another common reason for the job tracker and/or task trackers not to start is that the path to the Java platform isn't correctly configured in the hadoop-env.sh file.

Thursday Jul 30, 2009

Sun HPC Software Workshop '09 -- Early Bird's Almost Over!

Just wanted to remind everyone that the early bird registration for the Sun HPC Software Workshop '09, Sept 7-10 in Regensburg, Germany, ends tomorrow (31 July 2009). It's your last chance to sign up at the discounted rate. After tomorrow, you will still be able to register, but the cost of registration will be higher.

In a nutshell, the Sun HPC Software Workshop '09 is a combination of our annual Grid Engine Workshop, a European edition of the popular Lustre Users Group meeting, and a conference on developing applications and services for HPC and cloud environments. The Workshop lasts three days, with a presentation track representing each of these topics. On the day before the main Workshop starts, we're also holding deeper technology seminars: a Lustre Deep Dive, a Grid Engine admin training, and a class on parallel application development taught by Ruud van der Pas. The Workshop and the preceding seminars are an excellent opportunity to learn more about these technologies and connect with the product engineers, partners, and other community members.

There is an open Call for Presentations for the Workshop, but it also closes tomorrow. If you're interested in proposing a talk for the Workshop (and getting a discounted registration fee if it's accepted), send a title, duration, and brief summary to the email address listed on the Agenda page. But, hurry. We'll be making our final decisions and notifying the speakers soon.

I look forward to seeing you there!

Thursday May 22, 2008

Hadoop + Sun Grid Engine

If you're interested in integrating Hadoop with Grid Engine, check out this post from one of our fourth-line support engineers.

(Hadoop is a map/reduce framework in use by all the big web-scale players. It allows you to parallelize tasks, such as data mining, across a compute/data grid.)
