Creating Hadoop pe under SGE

Sun Grid Engine is a DRM software with a flexible support for various parallel computing frameworks (Eg. MPI/PVM).

We could configure Grid Engine to provide a hadoop cluster environment where a map-reduce jobs can run. ( Hadoop is a framework for running applications on large clusters built of commodity hardware. )

Here we would try to setup a hadoop pe in SGE to run the jobs using hadoop-streaming.

Prerequisite: Refer hadoop wiki to setup a hadoop setup, just to make sure the setup works.

We create a hadoop pe for Sun Grid Engine that has:

  • DFS (HDFS) running on atleast 2 nodes (dfs.replication)
  • master and slaves are choosen by grid engine
  • master runs the NameNode, SecondaryNameNode,(DFS) JobTracker (MapRed)
  • slave runs DataNode (DFS), TaskTracker (MapRed)
  • each slave's TaskTracker runs (N/no.of slaves) tasks simultaneously which could be either of N mapper tasks and N reducer tasks on the whole (N is the no. of slots the user requests for on the qsub -pe )
The only place where we need to modify these are in $HADOOP_HOME/conf/hadoop-site.xml







Apart from this, we need to add master and slaves in conf/masters and conf/slaves respectively, which SGE will help us.

sgetest@master $ cat conf/masters
sgetest@master $ cat conf/slaves

Configuring Grid Engine:

Assume that we have a SGE setup,

sgetest@master $ qstat -f
queuename                      qtype used/tot. load_avg arch          states
all.q@master         BIPC  0/5       0.02     sol-x86
all.q@slave          BIPC  0/5       0.10     sol-x86

I have configured 5 slots to all.q (hence each queue instance can run 5 tasks simultaneously).
Now we need to create a hadoop pe,

sgetest@master $ qconf -sp hadoop
pe_name           hadoop
slots             5
user_lists        NONE
xuser_lists       NONE
start_proc_args   /export/home/sgetest/test/hadoop/
stop_proc_args    /export/home/sgetest/test/hadoop/
allocation_rule   $round_robin
control_slaves    FALSE
job_is_first_task TRUE

The and are responsible for start/stop of the hdfs and map-reduce daemons. The and would look something like:

NOTE: We depend on jps supplied by jdk to check if all the daemons are started, there could be a better way to acheive this!!
This is required so as to make sure the pe-start exits only after the daemons are up and running, we don't want to start the jobs before the daemons start and start complaining about unavailability of daemons!!!


## we get HADOOP_HOME from jobs env

### Create master and slave files
master=`head -1 $PE_HOSTFILE | cut -f1 -d' '`
echo $master > conf/masters

##TODO: Do we need master to run the tasks???
tail +2 $PE_HOSTFILE | cut -f1 -d' ' > conf/slaves
slave_cnt=`cat conf/slaves|wc -l`

### Create hadoop-site.xml
### We add the following vars are in conf/hadoop-site-template.xml
### so that it can be customized!
#HMTASKS                     : No. of Map tasks
#HRTASKS                     : No. of Reduce tasks
#HTPN                        : Simulataneos tasks per slave
#HTMPDIR                     : Hadoop temporary dir, we let hadoop use the SGE tmp dir

##In ideal cases mtasks=10xslaves, rtasks=2xslaves, tpn=2perslave
tpn=`expr $NSLOTS \\/ $slave_cnt`
tmp=`echo $TMPDIR | sed 's/\\//\\\\\\\\\\//g'`
sed -e "s/HADDOP_MASTER_HOST/$master/g" -e "s/HDFSPORT/54310/g" \\
    -e "s/HMPRPORT/54311/g" -e "s/HMTASKS/$NSLOTS/g" \\
    -e "s/HRTASKS/$NSLOTS/g" -e "s/HTPN/$tpn/g" -e "s/HTMPDIR/$tmp/g" \\
    conf/hadoop-site-template.xml > conf/hadoop-site.xml

echo 'Y' > /tmp/yes
### Format namenode
bin/hadoop namenode -format < /tmp/yes

### Start hdfs daemons

### Is there a better way to do this, may be use jstatd and use jps <slave>
### wait for dfs daemons to start
### 3 = NameNode, DataNode, SecondaryNameNode
while [ $dcnt -lt 3 ]
 sleep 1
 dcnt=`cat conf/slaves conf/masters | xargs -i rsh \\{\\} $JPS |grep -v Jps | wc -l`


echo "start PE STOP `date`"
## We get HADOOP_HOME from the user

### Start hdfs and mapred daemons

### We could reset master,slave and hadoop-site.xml!


These scripts need HADOOP_HOME set from qsub command and the mapper,reducer and the input files for the hadoop-streaming job.

Now we assign this to the queue, all.q

sgetest@master $ qconf -aattr queue pe_list hadoop all.q
sgetest@master $ qconf -sq all.q | grep pe_list

pe_list               hadoop

NOTE: I have used mapper and reducer from the example for hadoop-streaming using python (which can be found here, Writing_An_Hadoop_MapReduce_Program_In_Python) and rewrote using bash scripts.

A typical job would just need to add the input files to the dfs

sgetest@master $ bin/hadoop dfs -put INPUT DFSIP

and run the mapper/reducer using the hadoop-streaming.

sgetest@master $ bin/hadoop jar contrib/hadoop-streaming.jar -mapper $HADOOP_HOME/$HADOOP_MAPPER -reducer $HADOOP_HOME/$HADOOP_REDUCER -input $INPUT/\\\* -output $OUTPUT

The would look something like:

sgetest@master $ cat

### These env vars need to be exported at the qsub using -v
## $HADOOP_HOME   : Hadoop dir with conf, ip, mapper and reducer

###These 3 vars are passed as args
## $HADOOP_INPUT : dir containing the i/p files reqd for the job
##                  this better be in $HADOOP_HOME

### Assuming them to be in $HADOOP_HOME
## $HADOOP_MAPPER : the mapper program, can be java/sh/perl/python etc...
## $HADOOP_REDUCER: the reducer program, can be java/sh/perl/python etc...

## output will be stored to $OUTPUT in $HADOOP_HOME


## making sure the i/p and o/p dir names are unique

# usage: qsub -pe hadoop <n> -v HADOOP_HOME=/tmp <mapper> <reducer> <input dir>

## We can actaully add a step to move the i/p and o/p files to a tmp dir,
##  $TMPDIR supplied by SGE

### Upload the input files for HDFS
bin/hadoop dfs -copyFromLocal "${HADOOP_HOME}/${HADOOP_INPUT}" $INPUT

bin/hadoop dfs -ls

### Run the streamer job now!
bbin/hadoop jar contrib/hadoop-streaming.jar -mapper $HADOOP_HOME/$HADOOP_MAPPER -reducer $HADOOP_HOME/$HADOOP_REDUCER -input $INPUT/\\\* -output $OUTPUT

## We could have moved to the default o/p file location for SGE jobs, the $SGE_OUTPUT_PATH

## Download the output files back
bin/hadoop dfs -copyToLocal $OUTPUT $HADOOP_HOME/$OUTPUT


NOTE: The $OUTPUT and the logs can be configured to use Sun Grid Engine's TMP (refer to Sun Grid Engine docs) directory. Now the job can be submitted like,

sgetest@master $ qsub -pe hadoop 4 -v JAVA_HOME=/usr/jdk/latest,HADOOP_HOME=/export/home/sgetest/test/hadoop input_dir

  • input_dir contains the file(s) for use by the map-reduce task,
  • hadoop-mapper - mapper program
  • hadoop-reducer - reducer program
  • -v option we export JAVA_HOME and HADOOP_HOME to job env for use by the scripts

Output from grid engine

sgetest@master $ qstat -f
queuename                      qtype used/tot. load_avg arch          states
sus.q@master         BI    0/5       0.00     sol-x86       s
all.q@slave         BIPC  2/5       0.00     sol-x86
    818 0.55500 hadoop-job sgetest      r     01/15/2008 19:45:42     2
all.q@master         BIPC  2/5       0.00     sol-x86
    818 0.55500 hadoop-job sgetest      r     01/15/2008 19:45:42     2

sgetest@master $ qstat -g t
job-ID  prior   name       user         state submit/start at     queue         master ja-task-ID
    818 0.55500 hadoop-job sgetest      r     01/15/2008 19:45:42 all.q@slave   SLAVE
                                                                  all.q@slave   SLAVE
    818 0.55500 hadoop-job sgetest      r     01/15/2008 19:45:42 all.q@master  MASTER
                                                                  all.q@master  SLAVE

The error/output stream from the job can be seen in the job's error and output files (\*.[oe]JID), and output/error stream from the pe (hadoop daemons) can be seen in the job's pe error and output files (\*.p[oe]JID), typically in users home directory.
(It would be better if we set the logs to the job's tmp dir and view it later!)

sgetest@master $ ls -l ~/\*818
-rw-r--r--   1 sgetest  sgegrp      1361 Jan 16 05:00 /export/home/sgetest/
-rw-r--r--   1 sgetest  sgegrp      6287 Jan 16 05:00 /export/home/sgetest/
-rw-r--r--   1 sgetest  sgegrp      2172 Jan 16 05:00 /export/home/sgetest/
-rw-r--r--   1 sgetest  sgegrp      1056 Jan 16 05:00 /export/home/sgetest/

sgetest@master $ cat /export/home/sgetest/

start PE START Wed Jan 16 04:59:45 IST 2008
starting namenode, logging to
slave: starting datanode, logging to
master: starting secondarynamenode, logging to
starting jobtracker, logging to
slave: starting tasktracker, logging to
done PE START Wed Jan 16 04:59:57 IST 2008        5
start PE STOP Wed Jan 16 05:00:27 IST 2008
stopping jobtracker
slave: stopping tasktracker
stopping namenode
slave: stopping datanode
master: stopping secondarynamenode
done PE STOP Wed Jan 16 05:00:28 IST 2008

sgetest@master $ cat /export/home/sgetest/

start JOB Wed Jan 16 04:59:57 IST 2008
Found 1 items
/user/sgetest/hadoop-job.sh_818_IP  <dir>    2008-01-16 04:59
packageJobJar: [/tmp/testsuite_6444/818.1.all.q/hadoop-unjar32275/] []
/var/tmp/streamjob32276.jar tmpDir=null
done JOB Wed Jan 16 05:00:27 IST 2008

sgetest@master $ cat /export/home/sgetest/

08/01/16 05:00:02 INFO mapred.FileInputFormat: Total input paths to process : 1
08/01/16 05:00:02 INFO streaming.StreamJob: getLocalDirs():
08/01/16 05:00:02 INFO streaming.StreamJob: Running job: job_200801160459_0001
08/01/16 05:00:02 INFO streaming.StreamJob: To kill this job, run:
08/01/16 05:00:02 INFO streaming.StreamJob:
/export/home/sgetest/test/hadoop/bin/../bin/hadoop job
-Dmapred.job.tracker=master:54311 -kill job_200801160459_0001
08/01/16 05:00:02 INFO streaming.StreamJob: Tracking URL:
08/01/16 05:00:03 INFO streaming.StreamJob:  map 0%  reduce 0%
08/01/16 05:00:09 INFO streaming.StreamJob:  map 40%  reduce 0%
08/01/16 05:00:11 INFO streaming.StreamJob:  map 80%  reduce 0%
08/01/16 05:00:13 INFO streaming.StreamJob:  map 100%  reduce 0%
08/01/16 05:00:18 INFO streaming.StreamJob:  map 100%  reduce 25%
08/01/16 05:00:20 INFO streaming.StreamJob:  map 100%  reduce 50%
08/01/16 05:00:25 INFO streaming.StreamJob:  map 100%  reduce 75%
08/01/16 05:00:26 INFO streaming.StreamJob:  map 100%  reduce 100%
08/01/16 05:00:26 INFO streaming.StreamJob: Job complete: job_200801160459_0001
08/01/16 05:00:26 INFO streaming.StreamJob: Output: hadoop-job.sh_818_OP

sgetest@master $ cat /export/home/sgetest/

STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master/
STARTUP_MSG:   args = [-format]
08/01/16 04:59:46 INFO dfs.Storage: Storage directory
/tmp/testsuite_6444/818.1.all.q/dfs/name has been successfully formatted.
08/01/16 04:59:46 INFO dfs.NameNode: SHUTDOWN_MSG:
SHUTDOWN_MSG: Shutting down NameNode at master/

The output is created in hadoop-job.sh_818_OP in our case, the contents

sgetest@master $ ls -l ~/test/hadoop/hadoop-job.sh_818_OP/
total 6
-rw-r--r--   1 sgetest  sgegrp         0 Jan 16 05:00 part-00000
-rw-r--r--   1 sgetest  sgegrp         8 Jan 16 05:00 part-00001
-rw-r--r--   1 sgetest  sgegrp        18 Jan 16 05:00 part-00002
-rw-r--r--   1 sgetest  sgegrp         7 Jan 16 05:00 part-00003

sgetest@master $ cat ~/test/hadoop/hadoop-job.sh_818_OP/\*
Hello   1
Hell    1
Hi      2
Rav     1
Ravi    2

sgetest@master $ cat ~/test/hadoop/input/test
Hi Hello Ravi Hi Rav Hell Ravi

The map-reduce job has run using the Sun Grid Engine to setup the cluster. Further options in hadoop setup can be tuned using Grid Engine wrapping up the required arguments.

It is better to use the HDFS to run the jobs as we needn't share the whole of hadoop related files (for input to job) and is handled at the master.

Here we are relying on master node to start othe daemons ( [rs]sh the machine and start daemons) and distribute jobs , and we donot have control on the TaskTracker threads. This way of setting a pe in Grid Engine is called loose-integration

With some more effort one could also achieve a tighter integration wherein the task of starting daemons and tasks on other slaves could be done by SGE. But this would require further understanding of Hadoop internals.

As far as my understanding goes, hadoop's TaskTracker spawns N threads where N is the mapred.tasktracker.tasks.maximum (set in hadoop-site.xml), though there might be more than tasks assigned for this slave node.
Hence I am not sure how one could map the concept of a 'slot' in Grid Engine's perspective to a task in hadoop env.

If the user has requested n slots on a hadoop pe, the slots are provided as they are available on the exec hosts. Now pe_slots alloted per exec host is not same, which indicates that current job is entitled to run those no. of pe slots as alloted.

In the above example I used N slots (total pe slots for the job) / No.of slaves, which might not be the same as the slots alloted per exec host by SGE.

I assume we would have some way of getting around this, and probably many other tunables for hadoop which can be provided by SGE.

I look forward to see a Hadoop pe on Sun Grid wherein the map-reduce jobs could be directly scheduled to make use of the Hadoop setup available on the setup. 


Hi I was hoping to find out if you knew how to configure this property such that

<description>A base for other temporary directories.</description>

My hadoop temp files will go to the e drive as opposed to my local c directory. You see, everything works fine if I don't change that configuration. But I'm dealing with more data and my C directory is running out of space. I want to switch over to my E directory (just the DFS)

I've tried all these values:
<value>file:////cygdrive/e/tmp/hadoop-${}</value> -- zilch!
<value>cygdrive/e/tmp/hadoop-${}</value> -- creates this local in my local hadoop directory
<value>E:/tmp/hadoop-${}</value> -- works until I start my jobs which results in a ConnectException (ConnectException doesn't occur if I don't configure this property)

I'm using a Windows OS combined with cygwin.

Any ideas?

Posted by Yih Sun Khoo on August 06, 2008 at 09:51 PM IST #

Hi Ravi

I am novice in grid and cluster. I want to ask you something.

1)Cluster and SGE are same if not then what is he difference.
2)I want to make a grid,can you guide me through that.
i want to use Hadoop cluster then SGE then, Globus . Is hadoop have compatibility with Globus.
Looking forward for your positive response.

Posted by kasim on November 12, 2010 at 05:21 AM IST #

Post a Comment:
  • HTML Syntax: NOT allowed



« September 2016