
Information, tips, tricks and sample code for Big Data Warehousing in an autonomous, cloud-driven world

Big Data Resource Management Looks Hard, But It Isn't

Alexey Filanovskiy
Product Manager

Hadoop is an ecosystem consisting of multiple different components in which each component (or engine) consumes certain resources. There are a few resource management techniques that allow administrators to define how finite resources are divided across multiple engines. In this post, I'm going to talk about these different techniques in detail.

Talking about resource management, I'll divide the topic into these sub-topics:

1. Dealing with low latency engines (realtime/near-realtime)

2. Division of resources across multiple engines within a single cluster

3. Division of resources between different users within a single technology

1. Low latency engines

These engines assume low-latency response times. Examples are NoSQL databases (HBase, MongoDB, Cassandra...) and message-based systems like Kafka. If you have real and specific low-latency Service Level Agreements (SLAs), these systems should be placed on a dedicated cluster. Yes, for a highly utilized HBase or a highly utilized Kafka with strict SLAs, we do recommend putting it on a dedicated cluster; otherwise the SLAs can't be met.

2. Division of resources across multiple engines in a single cluster

It's quite common to put multiple processing engines (such as YARN, Impala, etc.) in a single cluster. As soon as this happens, administrators face the challenge of how to divide resources among these engines. The short answer for Cloudera clusters is "Static Service Pools". In Cloudera Manager you can find the "Static Service Pools" configuration option here:

You use this functionality to divide resources between different processing engines, such as:

  • YARN
  • Impala
  • Big Data SQL
  • Etc.

When applied, Cloudera Manager uses Linux cgroups under the hood to partition resources across these engines.

I've already explained how to set up Static Service Pools in the context of Big Data SQL; please review that post for more details on configuring Static Service Pools.

3. Division of resources between different users within a single technology

3.1. Resource division within YARN: Dynamic Resource Pools

To work with resource allocation in YARN, Cloudera Manager offers "Dynamic Resource Pools". Their purpose is to divide resources among different user groups inside YARN (a single engine):

Because this operates at the YARN level, it affects every engine that runs inside the YARN framework, such as Spark and Hive (MapReduce).

The following steps are planned to be automated for Big Data Appliance and Big Data Cloud Service in an upcoming release, but if you want to apply this beforehand, or on your own Cloudera clusters, here are the high-level steps:

a) Enable Fair Scheduler Preemption

b) Configure example pools: for Low Priority, Medium Priority and High Priority jobs

c) Setup placement rules

The following sections dive deeper into the details.

3.1.1. Enable Fair Scheduler Preemption

To enable fair scheduler preemption go to Cloudera Manager -> YARN -> Configuration. Then set:

- yarn.scheduler.fair.preemption=true

- yarn.scheduler.fair.preemption.cluster-utilization-threshold=0.7

The latter is the utilization threshold after which preemption kicks in; utilization is computed as the maximum ratio of usage to capacity among all resources.

Next we must configure:

- yarn.scheduler.fair.allow-undeclared-pools = false

When set to true, pools specified in applications but not explicitly configured, are created at runtime with default settings. When set to false, applications specifying pools not explicitly configured run in a pool named default. This setting applies when an application explicitly specifies a pool and when the application runs in a pool named with the username associated with the application

- yarn.scheduler.fair.user-as-default-queue = false

When set to true, the Fair Scheduler uses the username as the default pool name, in the event that a pool name is not specified. When set to false, all applications are run in a shared pool, called default.

Note: the parameter "Enable ResourceManager ACLs" should be set to true by default, but it's worth checking, just in case.

"yarn.admin.acl" shouldn't be set to '*'; set it to "yarn".

After modifying these configuration settings you will need to restart the YARN service to activate them. The next step is the configuration of the Dynamic Resource Pools.

3.1.2. Configure example pools

Go to Cloudera Manager -> Dynamic Resource Pool Configuration.

Here we recommend creating three pools (future BDA/BDCS versions will create these by default for you):

  • low
  • medium
  • high
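Under the hood, Cloudera Manager writes these pools into the Fair Scheduler allocation file (fair-scheduler.xml). Here is a sketch of what the three pools look like there, using the 15/10/5 weights that appear in the vCores example later in this post:

```xml
<!-- Sketch of the fair-scheduler.xml content Cloudera Manager generates
     for the three pools; weights 15/10/5 match the proportions shown
     later in this post. -->
<allocations>
  <queue name="root">
    <queue name="high">
      <weight>15</weight>
    </queue>
    <queue name="medium">
      <weight>10</weight>
    </queue>
    <queue name="low">
      <weight>5</weight>
    </queue>
  </queue>
</allocations>
```

On BDA/BDCS you should let Cloudera Manager manage this file rather than editing it by hand; the XML is shown only to make the pool/weight model concrete.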

We also recommend that you remove the root.default pool as shown below:

Different pools will get different shares of resources (CPU and memory). To illustrate this, I'll run three jobs and put them into the different pools (high, medium, low):

[root@bdax72bur09node01]# hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.low 1000 1000000000000

[root@bdax72bur09node01]# hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.medium 1000 1000000000000

[root@bdax72bur09node01]# hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.high 1000 1000000000000

After the jobs have been running for a while, navigate to Cloudera Manager -> YARN -> Resource Pools and take a look at "Fair Share VCores" (or memory):

In this diagram we can see that vCores are allocated according to our configured proportions: 220/147/73, roughly the same as 15/10/5.
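The fair share of each pool is simply its weight over the sum of all weights, multiplied by the cluster capacity. A quick sanity check (assuming a cluster total of 440 vCores, which is what the three observed shares sum to):

```shell
# Fair share = total_vcores * weight / sum(weights), with weights 15/10/5
awk 'BEGIN {
  total = 440            # total vCores in the cluster (sum of the shares above)
  sum   = 15 + 10 + 5    # sum of the pool weights
  printf "high=%.0f medium=%.0f low=%.0f\n", total*15/sum, total*10/sum, total*5/sum
}'
```

This prints high=220 medium=147 low=73, matching the allocation observed in Cloudera Manager.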

The second important configuration is a cap on maximum pool usage: we recommend capping each resource pool so that small applications can still get into the cluster even when a long-running job has already been launched. Here is a small test case:

- Run a long-running job in the root.low pool (one which takes days to complete):

[root@bdax72bur09node01]# hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.low 1000 1000000000000

Check resource usage:

This graph shows that we have some unused CPU, as intended. Also, notice below that we have some pending containers, which shows that our application wants to run more containers, but, as expected, YARN disallows this:

So, despite having spare resources in the cluster, YARN disallows using them because of the cap on maximum resource usage for certain pools.
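In fair-scheduler.xml terms, such a cap is a maxResources limit on the pool. A sketch, with purely illustrative numbers (size the real cap to your cluster):

```xml
<queue name="low">
  <weight>5</weight>
  <!-- Illustrative cap: leave headroom for other pools
       even when root.low is fully busy -->
  <maxResources>262144 mb, 128 vcores</maxResources>
</queue>
```

In Cloudera Manager the same cap is set on the pool's "Resource Limits" (Max Memory / Max Cores) fields rather than by editing the file.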

- Now run a small job that belongs to a different pool:

hive> set mapred.job.queue.name=root.medium;

hive> select count(1) from date_dim;

...

Now, jump to the resource usage page (Cloudera Manager -> YARN -> Resource Pools):

Here we can see that the number of pending containers for the first job hasn't changed. This is the reserve we allocated for newly arriving small jobs. It enables short start-up times for small jobs, so no preemption is needed and end users will not feel like their jobs hang.

The third key piece of resource management configuration is preemption. We recommend configuring different preemption levels for each pool (double-check that you enabled preemption at the cluster level earlier).

There are two configuration settings to change:

Fair Share Preemption Threshold - This is a value between 0 and 1. If set to x and the fair share of the resource pool is F, we start preempting resources from other resource pools if the allocation is under (x * F).

In other words, it defines how far below its fair share a pool must drop before preemption starts.

Fair Share Preemption Timeout - The number of seconds a resource pool is under its fair share before it will try to preempt containers to take resources from other resource pools.

In other words, this setting defines when YARN will start to preempt.

To configure these, go to Cloudera Manager -> Dynamic Resource Pools -> Edit (for a certain pool) -> Preemption.

We suggest the following settings:

For high: immediately start preemption if a job didn't get all of its requested resources, which is implemented as below:

For medium: wait 60 seconds before starting preemption if a job didn't get at least 80% of its requested resources:

And for low: wait 180 seconds before starting preemption if a job didn't get at least 50% of its requested resources:
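Expressed as fair-scheduler.xml snippets, the three policies above look roughly like this (the 1-second timeout for high approximates "immediately"; treat the exact values as a sketch):

```xml
<queue name="high">
  <!-- preempt as soon as the pool is below 100% of its fair share -->
  <fairSharePreemptionThreshold>1.0</fairSharePreemptionThreshold>
  <fairSharePreemptionTimeout>1</fairSharePreemptionTimeout>
</queue>
<queue name="medium">
  <!-- preempt after 60s below 80% of fair share -->
  <fairSharePreemptionThreshold>0.8</fairSharePreemptionThreshold>
  <fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>
</queue>
<queue name="low">
  <!-- preempt after 180s below 50% of fair share -->
  <fairSharePreemptionThreshold>0.5</fairSharePreemptionThreshold>
  <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>
</queue>
```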

These parameters define how aggressively containers will be preempted (that is, how quickly a job will get its required resources). Here is my test case: I ran a long-running job in the root.low pool, then ran the same query three times in parallel, assigning it to the low, medium, and high pools respectively.

hive> set mapred.job.queue.name=root.low;

hive> select count(1) from store_sales;

...

hive> set mapred.job.queue.name=root.medium;

hive> select count(1) from store_sales;

...

hive> set mapred.job.queue.name=root.high;

hive> select count(1) from store_sales;

...

As a measure of the result we can use elapsed time (which consists of waiting time, which differs according to our configuration, plus execution time, which also differs because of resource usage). This table shows the result:

Pool name    Elapsed time, min
Low          14.5
Medium       8.8
High         8.0

In the graph below you can see how preemption was accomplished:

There is another aspect of preemption: whether a pool can be preempted at all.

To set this up, go to Cloudera Manager -> Dynamic Resource Pools -> root.high pool and click "Edit":

After this, click "Preemption" and disable preemption for the root.high pool.

This means that nobody can preempt tasks from this pool:

Note: this setting makes the root.high pool extremely powerful, and you may eventually have to consider re-enabling preemption.

3.1.3. Setup Placement rules

Another key component of Dynamic Resource management is Placement Rules. Placement rules define which pool a job will be assigned to. By default, we suggest keeping it simple.

To configure them, go to Cloudera Manager -> Dynamic Resource Pools -> Placement Rules:

With this configuration, a job is first placed according to the user's secondary group, if that group matches one of the pools named low/medium/high. If not, the user can specify the pool for the job at runtime. If neither happens, the job is allocated resources in the low pool by default. So, if the administrator knows what to do, she will put the user in a certain group; if users know what to do, they will specify a certain pool (low, medium, or high). We recommend that administrators define this for the system.
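The same ordering can be expressed as a Fair Scheduler queuePlacementPolicy (a sketch; note the group rule comes before the "specified" rule, which is why an explicitly requested pool cannot override a group placement):

```xml
<queuePlacementPolicy>
  <!-- 1. place by a secondary Unix group whose name matches a pool -->
  <rule name="secondaryGroupExistingQueue"/>
  <!-- 2. otherwise, honor a pool requested at runtime (don't create new pools) -->
  <rule name="specified" create="false"/>
  <!-- 3. otherwise, fall back to root.low -->
  <rule name="default" queue="low"/>
</queuePlacementPolicy>
```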

For example, I do have user alex, who belongs to secondary group "medium":

[root@bdax72bur09node01]# id alex

uid=1002(alex) gid=1006(alex) groups=1006(alex),1009(medium)

So, if I try to specify a different pool at runtime (consider it a user trying to cheat the system settings), it will not override the placement based on the medium group:

[root@bdax72bur09node01]# sudo -u alex hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.high 1000 1000000000000

3.1.4. How to specify a resource pool at runtime?

While there are a few engines, let's focus on MapReduce (Hive) and Spark. Earlier in this blog I showed how to specify a pool for a MapReduce job with the mapred.job.queue.name parameter. You can specify it with the -D parameter when you launch the job from the console:

[root@bdax72bur09node01]# hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.low 1000 1000000000000

Or, in the case of Hive, you can set it as a session parameter:

hive> set mapred.job.queue.name=root.low;                                            

Another engine is Spark, where you can simply add the "--queue" parameter:

[root@bdax72bur09node01]# spark-submit --class org.apache.spark.examples.SparkPi --queue root.high spark-examples.jar

In the spark2-shell console you specify the same parameter:

[root@bdax72bur09node01 lib]# spark2-shell --queue root.high

3.1.5. Group Mapping

The first thing the Resource Manager checks is the user's secondary group. How do you define this? I've covered it earlier in my security blog post, but in a nutshell it is defined either with LDAP mapping or Unix shell mapping, under the "hadoop.security.group.mapping" parameter in Cloudera Manager (HDFS -> Configuration):

Below is a list of useful commands for managing users and groups on BDA/BDCS/BDCC (note that all users and groups have to exist on each cluster node with the same IDs):

// Add new user

# dcli -C "useradd  -u 1102 user1"

// Add new group

# dcli -C "groupadd -g 1111 high"

// Add user to the group

# dcli -C "usermod -a -G high user1"

// Remove user from the group

# dcli -C "gpasswd -d user1 high"

// Delete user

# dcli -C "userdel user1"

// Delete group

# dcli -C "groupdel high"

Oracle and Cloudera recommend using "org.apache.hadoop.security.ShellBasedUnixGroupsMapping" plus SSSD to replicate users from Active Directory. From Cloudera's documentation: "The LdapGroupsMapping library may not be as robust a solution needed for large organizations in terms of scalability and manageability, especially for organizations managing identity across multiple systems and not exclusively for Hadoop clusters. Support for the LdapGroupsMapping library is not consistent across all operating systems."

3.1.6. Control user access to certain pool (ACL)

You may want to restrict which groups of users have access to certain pools (especially high and medium). You accomplish this with ACLs in Dynamic Resource Pools. First, just a reminder that you have to set "yarn.admin.acl" to something other than '*'; set it to "yarn".

Before setting restrictions for certain pools, you need to set up a restriction for the root pool: due to a Cloudera Manager bug, you can't leave the submission and administration ACLs empty (otherwise everyone will be allowed to submit jobs in any pool), so set them to the desired groups or, as a workaround, to ",".

After this you are ready to create rules for the other pools. Let's start with low. Since it plays the role of the default pool in our configuration, we should allow everyone to submit jobs there:

Next, let's move to medium. There we will configure access for users who belong to the groups medium and etl:

And finally, we are ready to configure the high pool. Here we will allow job submission for users who belong to the groups managers or high:
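Under the hood these rules map to aclSubmitApps entries in fair-scheduler.xml. The format is "users groups" (space-separated); a leading space means "no users, only these groups". A sketch matching the rules above:

```xml
<queue name="root">
  <!-- "," workaround: effectively nobody is allowed at the root level -->
  <aclSubmitApps>,</aclSubmitApps>
  <queue name="low">
    <aclSubmitApps>*</aclSubmitApps>                <!-- everyone -->
  </queue>
  <queue name="medium">
    <aclSubmitApps> medium,etl</aclSubmitApps>      <!-- groups only -->
  </queue>
  <queue name="high">
    <aclSubmitApps> managers,high</aclSubmitApps>   <!-- groups only -->
  </queue>
</queue>
```

Queue ACLs are evaluated hierarchically: anyone allowed on a parent pool is allowed on its children, which is exactly why the root pool must be restricted first.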

Let's do a quick test with a user who does not belong to any of the privileged groups (medium, high, etl, managers):

[root@bdax72bur09node01 hadoop-mapreduce]# hdfs groups user2

user2 : user2

and run a job as this user:

[root@bdax72bur09node01 hadoop-mapreduce]# sudo -u user2 hadoop jar hadoop-mapreduce-examples.jar pi 1 10

...

18/11/19 21:22:31 INFO mapreduce.Job: Job job_1542663460911_0026 running in uber mode : false

18/11/19 21:22:31 INFO mapreduce.Job:  map 0% reduce 0%

18/11/19 21:22:36 INFO mapreduce.Job:  map 100% reduce 0%

After this, check that the job was placed in the root.low queue (everyone is allowed to run jobs there):

So far so good. Now let's try to submit a job from the same user to the high-priority pool:

[root@bdax72bur09node01 hadoop-mapreduce]# sudo -u user2 hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.high 1 10

...

18/11/19 21:27:33 WARN security.UserGroupInformation: PriviledgedActionException as:user2 (auth:SIMPLE) cause:java.io.IOException: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1542663460911_0027 to YARN : User user2 cannot submit applications to queue root.high

To put this job in the root.high pool, we need to add the user to one of the groups listed in the ACL for root.high; let's use managers (creating it first):

[root@bdax72bur09node01 hadoop-mapreduce]# dcli -C "groupadd -g 1112 managers"

[root@bdax72bur09node01 hadoop-mapreduce]# dcli -C "usermod -a -G managers user2"

Second try and validation:

[root@bdax72bur09node01 hadoop-mapreduce]# hdfs groups user2

user2 : user2 managers

[root@bdax72bur09node01 hadoop-mapreduce]# sudo -u user2 hadoop jar hadoop-mapreduce-examples.jar pi -Dmapred.job.queue.name=root.high 1 10

...

18/11/19 21:34:00 INFO mapreduce.Job: Job job_1542663460911_0029 running in uber mode : false

18/11/19 21:34:00 INFO mapreduce.Job:  map 0% reduce 0%

18/11/19 21:34:05 INFO mapreduce.Job:  map 100% reduce 0%

Wonderful! Everything works as expected. Once again, it is important to set the root pool ACLs to some value. If you don't want to put the list of allowed groups on the root pool yet, or you want a pool like root.low where everyone can submit jobs, simply use the "," workaround.

3.1.7. Analyzing resource usage

The tricky thing with pools is that often many people or divisions share the same pool, and it's hard to tell who consumed which portion. The following script queries the ResourceManager REST API for applications finished in the last day and aggregates CPU and memory usage per user:

[root@bdax72bur09node01 hadoop-mapreduce]# cat getresource_usage.sh
#!/bin/bash

# Take the last 24 hours (the ResourceManager expects epoch milliseconds)
STARTDATE=`date -d " -1 day " +%s%N | cut -b1-13`
ENDDATE=`date +%s%N | cut -b1-13`

# Quote the URL: an unquoted '&' would background the curl command
result=`curl -s "http://bdax72bur09node04:8088/ws/v1/cluster/apps?finishedTimeBegin=$STARTDATE&finishedTimeEnd=$ENDDATE"`

# Fail over to the second ResourceManager if the first one is standby
if [[ $result =~ "standby RM" ]]; then
result=`curl -s "http://bdax72bur09node05:8088/ws/v1/cluster/apps?finishedTimeBegin=$STARTDATE&finishedTimeEnd=$ENDDATE"`
fi

# Sum vcoreSeconds per user
echo $result | python -m json.tool | sed 's/["|,]//g' | grep -E "user|coreSeconds" | awk ' /user/ { user = $2 }
/vcoreSeconds/ { arr[user]+=$2 ; }
END { for (x in arr) {print "yarn." x ".cpums="arr[x]} } '

# Sum memorySeconds per user
echo $result | python -m json.tool | sed 's/["|,]//g' | grep -E "user|memorySeconds" | awk ' /user/ { user = $2 }
/memorySeconds/ { arr1[user]+=$2 ; }
END { for (y in arr1) {print "yarn." y ".memorySeconds="arr1[y]} } '
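To see what the sed/grep/awk aggregation does without a live cluster, you can feed the same pipeline a hand-made fragment of the pretty-printed JSON (the users and numbers below are made up for illustration):

```shell
# Hypothetical excerpt of the pretty-printed /ws/v1/cluster/apps response
sample='
"user": "alex",
"vcoreSeconds": 120,
"user": "alex",
"vcoreSeconds": 30,
"user": "user2",
"vcoreSeconds": 50,
'

# Strip quotes/commas, keep the user and vcoreSeconds lines,
# then sum vcoreSeconds per user
echo "$sample" | sed 's/["|,]//g' | grep -E "user|coreSeconds" | awk '
  /user/         { user = $2 }
  /vcoreSeconds/ { arr[user] += $2 }
  END            { for (x in arr) print "yarn." x ".cpums=" arr[x] }'
```

This prints yarn.alex.cpums=150 and yarn.user2.cpums=50 (the order of the two lines may vary, since awk's for-in order is unspecified).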

 

3.2. Impala Admission Control

Another popular engine in the Hadoop world is Impala. Impala has its own mechanism to control resources, called admission control. Many MPP systems recommend queueing queries under high concurrency instead of running them all in parallel, and Impala is no exception. To configure this, go to Cloudera Manager -> Dynamic Resource Pools -> Impala Admission Control:

Admission Control has a few key parameters for configuring the queue:

Max Running Queries - Maximum number of concurrently running queries in this pool

Max Queued Queries - Maximum number of queries that can be queued in this pool

Queue Timeout - The maximum time a query can be queued waiting for execution in this pool before timing out

So, up to Max Running Queries queries can run at once; after that, up to Max Queued Queries queries will be queued. They stay in the queue for at most Queue Timeout, after which they are canceled.

Example: I've configured Max Running Queries = 3 (allow three queries to run simultaneously) and Max Queued Queries = 2 (allow two queries to wait in the queue), and left Queue Timeout at the default 60 seconds.

After this, I ran 6 queries and waited for a minute. Three queries executed successfully; the other three failed for different reasons: one query was rejected right away because there was no place for it in the queue (3 running, 2 queued, the next one rejected). The other two sat in the queue for 60 seconds, but since the running queries did not complete within that timeout, they failed as well.
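When multiple Impala pools are configured, a session can choose which pool its queries are admitted into via the REQUEST_POOL query option inside impala-shell (shown here against the date_dim table used earlier; the pool choice is still subject to your admission control ACLs):

```sql
-- Inside impala-shell: route this session's queries to a specific admission pool
SET REQUEST_POOL=root.high;
SELECT count(*) FROM date_dim;
```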
