Testing the Coherence Simple Partition Assignment Strategy

The Simple Partition Assignment Strategy, introduced in Coherence 3.7.1, allows Coherence to balance primary and backup cache data partitions using a centralised partitioning strategy. Previously (and still the current default), each cluster member determined how cache data partitions were distributed autonomously (by itself), for instance by taking the total number of partitions and dividing it by the number of cluster members (for example, the default 257 partitions spread across 4 storage-enabled members gives each member roughly 64 primary partitions).

A centralised strategy allows the complete topology of the cluster to be taken into account. Like the default autonomous strategy, the simple partition assignment strategy tries to distribute cache data partitions fairly, but as far as possible it also tries to ensure "data safety" by placing primary and backup copies of data on different sites, racks and machines. So if there are multiple sites, primary and backup data will be placed on different sites. If the cluster spans multiple racks, primary and backup partitions will be placed on different racks, so that a complete rack failure will not result in data loss. Lastly, if all the machines are on the same rack, primary and backup partitions will be placed on different machines.

It should be noted that in order for the simple partition assignment strategy to determine where cache data partitions should be located, each member in the cluster should specify its full identity, i.e. the name of the machine it is on, its rack and its site, using the member identity parameters. These are usually set as system properties (as shown below) but can also be placed in the cluster operational override file.

-Dtangosol.coherence.cluster=ProdAppCluster

-Dtangosol.coherence.site=MainDataCentre

-Dtangosol.coherence.rack=Rack07

-Dtangosol.coherence.machine=Blade11
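
For reference, here is a minimal sketch of how the same identity settings could instead be declared in the operational override file (tangosol-coherence-override.xml), using the standard Coherence member-identity elements and the example values from the properties above:

<?xml version="1.0"?>
<coherence>
  <cluster-config>
    <member-identity>
      <cluster-name>ProdAppCluster</cluster-name>
      <site-name>MainDataCentre</site-name>
      <rack-name>Rack07</rack-name>
      <machine-name>Blade11</machine-name>
    </member-identity>
  </cluster-config>
</coherence>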

Normally to test that the simple partition assignment strategy works would involve quite a bit of setup. However, a small test framework called "littlegrid" (written by Jon Hall) enables the whole test to be run in a single JVM and written as a simple JUnit test. Below is the cache configuration file that introduces the new simple assignment strategy class (to the distributed service CustomDistributedCacheService) and is highlighted in bold.

<?xml version="1.0"?>

<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
  xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config 
    http://xmlns.oracle.com/coherence/coherence-cache-config/1.0/coherence-cache-config.xsd">
  <defaults>
    <serializer>pof</serializer>
  </defaults>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>CustomDistributedCacheScheme</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>CustomDistributedCacheScheme</scheme-name>
      <service-name>CustomDistributedCacheService</service-name>
      <thread-count>5</thread-count>
      <partition-assignment-strategy>
        <instance>
          <class-name>com.tangosol.net.partition.SimpleAssignmentStrategy</class-name>
        </instance>
      </partition-assignment-strategy>
      <partition-listener>
        <class-name>com.oracle.coherence.test.PartitionLostListener</class-name>
      </partition-listener>
      <backing-map-scheme>
        <local-scheme>
        </local-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config> 

Now here is the JUnit test setup, using the "littlegrid" test framework to simulate a rack failure. Three racks are created, each containing 2 machines with 2 storage-enabled nodes per machine, along with a JMX management node and a storage-disabled test client. These last 2 nodes are placed in a separate, default rack. The test involves the following steps:

  1. Some data is added to a test cache so that data is held by each partition.
  2. A random rack (from the 3 created) is selected and all the nodes in that rack are shut down at the same time.
  3. After a short pause, the "PartitionLost" JMX MBean (shown below) is checked on all remaining storage-enabled members to ensure that no partitions were lost.

This last step is possible because a Partition Event Listener is registered with the partitioned cache service (CustomDistributedCacheService) in the cache configuration file. This Partition Event Listener also exposes itself as a JMX MBean, making it possible to check for this kind of event.

The code for the test is shown below:


package com.oracle.coherence.test;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.littlegrid.ClusterMemberGroup;
import org.littlegrid.ClusterMemberGroupUtils;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;

/**
 * Coherence simple assignment strategy tests.
 * 
 * @author Dave Felcey
 */
public class TestCase {
  private ClusterMemberGroup memberGroup;
  private NamedCache cache = null;
  private int[][] racks = null;

  @Before
  public void setUp() {
    // Create member group now, so later code simply merges into this group
    memberGroup = ClusterMemberGroupUtils.newBuilder()
        .buildAndConfigureForNoClient();

    final int numberOfRacks = 3;
    final int numberOfMachines = 2;
    final int numberOfStorageEnabledMembersPerMachine = 2;
    final int expectedClusterSize = (numberOfRacks
        * numberOfMachines * numberOfStorageEnabledMembersPerMachine);

    racks = new int[numberOfRacks][numberOfMachines
        * numberOfStorageEnabledMembersPerMachine];

    // Start up the storage enabled members on different racks and machines
    for (int rack = 0; rack < numberOfRacks; rack++) {
      for (int machine = 0; machine < numberOfMachines; machine++) {
        // Build using the identity parameters
        memberGroup.merge(ClusterMemberGroupUtils
            .newBuilder()
            .setFastStartJoinTimeoutMilliseconds(100)
            .setSiteName("site1")
            .setMachineName("r-" + rack + "-m-" + machine)
            .setRackName("r-" + rack)
            .setStorageEnabledCount(
                numberOfStorageEnabledMembersPerMachine)
            .buildAndConfigureForNoClient());
      }

      // Save member id's for rack
      System.arraycopy(memberGroup.getStartedMemberIds(),
          rack * numberOfMachines
              * numberOfStorageEnabledMembersPerMachine,
          racks[rack], 0, numberOfMachines
              * numberOfStorageEnabledMembersPerMachine);
    }

    // Create Management and client members with default rack and machine
    // identities
    memberGroup.merge(ClusterMemberGroupUtils.newBuilder()
        .setJmxMonitorCount(1).setLogLevel(9)
        .buildAndConfigureForStorageDisabledClient());

    assertThat(
        "Cluster size check - includes storage disabled client and JMX monitor",
        CacheFactory.ensureCluster().getMemberSet().size(),
        is(expectedClusterSize + 2));

    assertThat(
        "Member group check size is as expected - includes JMX monitor, but not storage disabled client",
        memberGroup.getStartedMemberIds().length,
        is(expectedClusterSize + 1));

    cache = CacheFactory.getCache("test");
  }

  /**
   * Demonstrate SimpleAssignmentStrategy.
   */
  @Test
  public void testSimpleAssignmentStrategy()
      throws Exception {

    final Map<Integer, String> entries = new HashMap<Integer, String>();
    final int totalEntries = 1000;

    // Load test data
    for (int i = 0; i < totalEntries; i++) {
      entries.put(i, "entry " + i);
    }

    cache.putAll(entries);
    assertThat(cache.size(), is(totalEntries));

    // Kill rack - if partition lost then will exit
    Random random = new Random();
    int rack = random.nextInt(racks.length);
    System.out.println("Stopping rack: " + rack);

    memberGroup.stopMember(racks[rack]);

    System.out
        .println("Pausing to allow data to be recovered");
    TimeUnit.SECONDS.sleep(memberGroup
        .getSuggestedSleepAfterStopDuration()
        * racks[rack].length * 10);

    assertThat("Partition lost",
        getPartitionLostEventCount(), is(0));
    assertThat("Cache size", cache.size(), is(totalEntries));
  }

  @After
  public void tearDown() {
    // Quick stop all - members *don't* leave the cluster politely - done for
    // this test so it shuts down quicker
    memberGroup.stopAll();

    ClusterMemberGroupUtils
        .shutdownCacheFactoryThenClusterMemberGroups(memberGroup);
  }

  /**
   * Get the number of partitions lost.
   * 
   * @return partitions lost
   */
  private int getPartitionLostEventCount() throws Exception {

    // Create an MBeanServerConnection
    final MBeanServerConnection connection = ManagementFactory
        .getPlatformMBeanServer();

    @SuppressWarnings("unchecked")
    final Set<Member> members = ((PartitionedService) cache
        .getCacheService()).getOwnershipEnabledMembers();
    final String serviceName = cache.getCacheService()
        .getInfo().getServiceName();

    int lostPartitions = 0;

    // Get any partition lost event information from cluster members
    for (Member member : members) {
      String path = "Coherence:type=PartitionListener,name=PartitionLostCount,service="
          + serviceName
          + ",id=PartitionLost,nodeId="
          + member.getId();

      lostPartitions += (Integer) connection.getAttribute(
          new ObjectName(path), "PartitionLostCount");
    }

    return lostPartitions;
  }
}

By extending the pause time after the rack has been stopped you can view the PartitionLost MBean information in JConsole, as shown below:

[Screenshot: JConsole showing the PartitionLostCount attribute of the PartitionListener MBean]
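
If you attach JConsole to a member of a real (multi-process) cluster rather than the single-JVM littlegrid test, remember that Coherence management needs to be enabled. The properties below are just the standard JDK and Coherence management settings, shown here as a reminder:

-Dcom.sun.management.jmxremote

-Dtangosol.coherence.management=all

-Dtangosol.coherence.management.remote=true

In the littlegrid test above, the JMX monitor member created with setJmxMonitorCount(1) in setUp() is intended to play this management role within the test cluster.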

The PartitionLostListener (based on an example by Andrew Wilson) looks like this:

package com.oracle.coherence.test;

import java.util.concurrent.atomic.AtomicInteger;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.management.Registry;
import com.tangosol.net.partition.PartitionEvent;
import com.tangosol.net.partition.PartitionListener;

/**
 * Partition lost listener
 */
public class PartitionLostListener implements
    PartitionListener, PartitionLostListenerMBean {
  private final AtomicInteger lostCount = new AtomicInteger();
  private String serviceName;

  public PartitionLostListener() {
  }

  @Override
  public synchronized void onPartitionEvent(
      PartitionEvent partitionEvent) {
    // Initialize if necessary
    if (serviceName == null) {
      serviceName = partitionEvent.getService().getInfo()
          .getServiceName();
      System.out.println("Registering JMX");
      Registry reg = CacheFactory.getCluster()
          .getManagement();
      reg.register(
          reg.ensureGlobalName("Coherence:type=PartitionListener,name=PartitionLostCount,service="
              + serviceName + ",id=PartitionLost"), this);
      System.out.println("Registered JMX");
    }

    // Handle the event
    if (partitionEvent.getId() == PartitionEvent.PARTITION_LOST) {
      System.out.println("Partition lost: "
          + partitionEvent);
      lostCount.addAndGet(partitionEvent.getPartitionSet()
          .cardinality());
    }
  }

  // Returns any partitions lost and resets
  public synchronized Integer getPartitionLostCount() {
    int temp = lostCount.get();
    lostCount.set(0);
    return temp;
  }
}
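
The PartitionLostListenerMBean interface implemented above is not included in the listing. It is just a plain MBean interface exposing the single attribute that the test reads over JMX; a minimal sketch would be:

package com.oracle.coherence.test;

/**
 * MBean interface exposing the partition lost count collected by
 * PartitionLostListener and read by the test via JMX.
 */
public interface PartitionLostListenerMBean {
  Integer getPartitionLostCount();
}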

If you would like to try this for yourself, you can download the complete example from here and find the "littlegrid" test tool here. Please note that to properly test the Simple Partition Assignment Strategy you should get the latest release of Coherence (which is available from Oracle Support and at the time of writing is 3.7.1.4). Unfortunately patch releases are not available from OTN.

Another tip for using the simple partition assignment strategy with large clusters is to use the distribution quorum feature during cluster startup. It prevents partition re-balancing from taking place until a cluster membership threshold has been reached, eliminating unnecessary network traffic and processing. A good starting point for an n-rack cluster would be to set the threshold to (n - 1) * number of machines per rack - 1. The restore and read quorum features can also be used to mitigate the effects of a "split brain" scenario, by making data read-only and enabling the cluster to reform should this occur.
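
For reference, these quorum thresholds are declared in the cache configuration, nested inside the distributed scheme. The fragment below is a sketch only: the element names follow the standard Coherence 3.7.1 partitioned quorum policy schema, but the threshold values are purely illustrative and should be sized for your own topology.

      <!-- Inside the <distributed-scheme> for CustomDistributedCacheService:
           hold back partition re-balancing until this many storage-enabled
           members have joined, and restrict access if membership drops too
           low (illustrative values only) -->
      <partitioned-quorum-policy-scheme>
        <distribution-quorum>4</distribution-quorum>
        <restore-quorum>3</restore-quorum>
        <read-quorum>2</read-quorum>
      </partitioned-quorum-policy-scheme>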
