Using Project OpenDMK APIs To Send Traps From Multiple Threads

I recently received a question from a member of the Java community asking for advice on how to implement a trap sending service for a highly multi-threaded application.

I thought I would quickly write a small example to show that the SnmpAdaptorServer provided by Project OpenDMK could be safely used to send traps in multi-threaded environments. But to my great surprise, my test failed.

A quick investigation of the code revealed the bug: the SnmpAdaptorServer will let you send traps when it is OFFLINE. However, in that case, it will open the trap socket on demand, and close it after the trap has been sent. In a highly multi-threaded environment this can result in the socket being closed by one thread while other threads are still using it. In other words, when the adapter is OFFLINE, sending traps is not thread-safe. This is what I call a bug.
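To make the failure mode concrete, here is a small self-contained sketch that does not use OpenDMK at all. FakeSocket and OnDemandSender are hypothetical stand-ins that only mimic the open-on-demand, close-after-send behaviour described above; they are not the adaptor's real internals. The two latches force the unlucky interleaving, so the failure shows up on every run instead of intermittently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: FakeSocket and OnDemandSender are hypothetical
// stand-ins, not OpenDMK classes.
final class OfflineRaceSketch {

    // Pretend socket: sending on it after close() fails.
    static final class FakeSocket {
        private volatile boolean closed;
        void send(String message) {
            if (closed) throw new IllegalStateException("socket closed");
        }
        void close() { closed = true; }
    }

    // Opens the shared socket on demand and closes it after each send,
    // without any coordination between threads.
    static final class OnDemandSender {
        private volatile FakeSocket socket;
        FakeSocket open() {
            FakeSocket s = socket;
            if (s == null) { s = new FakeSocket(); socket = s; }
            return s;
        }
        void closeAfterSend() {
            FakeSocket s = socket;
            socket = null;
            if (s != null) s.close();
        }
    }

    // Forces the bad interleaving and returns the error seen by the
    // first thread (null if it managed to send).
    static Throwable demonstrate() {
        final OnDemandSender sender = new OnDemandSender();
        final CountDownLatch firstHasSocket = new CountDownLatch(1);
        final CountDownLatch secondHasClosed = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread first = new Thread(() -> {
            try {
                FakeSocket s = sender.open();   // step 1: open on demand
                firstHasSocket.countDown();
                secondHasClosed.await();        // step 3: resume too late
                s.send("trap from first");      // the socket is already closed
            } catch (Throwable t) {
                failure.set(t);
            }
        }, "first");

        Thread second = new Thread(() -> {
            try {
                firstHasSocket.await();
                FakeSocket s = sender.open();   // step 2: reuses the same socket
                s.send("trap from second");
                sender.closeAfterSend();        // ... and closes it for everybody
            } catch (InterruptedException x) {
                Thread.currentThread().interrupt();
            } finally {
                secondHasClosed.countDown();    // never leave "first" blocked
            }
        }, "second");

        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException x) {
            throw new IllegalStateException(x);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("first thread failed with: " + demonstrate());
    }
}
```

In the real adaptor the interleaving is of course left to the scheduler, which is why the problem only shows up under load.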
Fortunately the workaround is simple:

  • either encapsulate the adapter in an object that will use a java.util.concurrent.locks.Lock to ensure that only one thread can call xxxxTrap() at a time,
  • or, probably safer, start the adapter before using it.

Starting the adapter is how it should be in the majority of cases anyway, since traps are usually sent by SNMP agents, and SNMP agents usually have MIBs (and hence an SnmpAdaptorServer which needs to be started).
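For those who prefer the first workaround, here is a minimal sketch of what such a wrapper could look like. The TrapSender interface and the LockedTrapSender class are names I made up for illustration; in a real application the delegate would simply forward to the adaptor's trap methods (for instance snmpV2Trap). The maxConcurrentSenders helper is only there to check that no two threads are ever inside the delegate at the same time.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class LockedTrapSenderSketch {

    // Hypothetical minimal view of "something that can send a trap".
    // A real implementation would delegate to the SNMP adaptor.
    interface TrapSender {
        void sendTrap(String trapOid) throws Exception;
    }

    // Decorator that lets only one thread at a time into the delegate.
    static final class LockedTrapSender implements TrapSender {
        private final TrapSender delegate;
        private final Lock lock = new ReentrantLock();

        LockedTrapSender(TrapSender delegate) {
            this.delegate = delegate;
        }

        public void sendTrap(String trapOid) throws Exception {
            lock.lock();
            try {
                delegate.sendTrap(trapOid);
            } finally {
                lock.unlock(); // always release, even if sending fails
            }
        }
    }

    // Sends traps from several threads through the locked wrapper and
    // returns the highest number of threads observed inside the delegate
    // at the same time.
    static int maxConcurrentSenders(int threadCount, int trapsPerThread) {
        final AtomicInteger inside = new AtomicInteger();
        final AtomicInteger max = new AtomicInteger();
        final TrapSender probe = oid -> {
            int now = inside.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            Thread.yield(); // give other threads a chance to barge in
            inside.decrementAndGet();
        };
        final TrapSender safe = new LockedTrapSender(probe);

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int n = 0; n < trapsPerThread; n++) {
                        safe.sendTrap("1.2.3.4.0.1");
                    }
                } catch (Exception x) {
                    throw new IllegalStateException(x);
                }
            }, "Sender[" + i + "]");
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException x) {
            throw new IllegalStateException(x);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent senders: "
                + maxConcurrentSenders(20, 1000));
    }
}
```

The price of this approach is that all senders are serialized behind a single lock, which is one more reason why I would rather start the adapter.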

Anyway, my advice is to start the adapter. My little test case is below. You'll need to have jdmkrt.jar from Project OpenDMK in your classpath to run it.

Cheers,
-- daniel

/*
 * MultiThreadTrapTest.java
 *
 * Created on October 5, 2007, 11:42 AM
 *
 * Copyright 2007 Sun Microsystems, Inc.  All Rights Reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   - Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *   - Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *   - Neither the name of Sun Microsystems nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package sendtrapmultithread;

import com.sun.jdmk.tasks.DaemonTaskServer;
import com.sun.management.comm.SnmpAdaptorServer;
import com.sun.management.comm.SnmpV3AdaptorServer;
import com.sun.management.snmp.SnmpOid;
import com.sun.management.snmp.SnmpPdu;
import com.sun.management.snmp.SnmpPduPacket;
import com.sun.management.snmp.SnmpPduRequest;
import com.sun.management.snmp.SnmpPduTrap;
import com.sun.management.snmp.SnmpScopedPduRequest;
import com.sun.management.snmp.SnmpStatusException;
import com.sun.management.snmp.SnmpTimeticks;
import com.sun.management.snmp.SnmpVarBindList;
import com.sun.management.snmp.SnmpEventReportDispatcher;
import com.sun.management.snmp.manager.SnmpPeer;
import com.sun.management.snmp.manager.SnmpSession;
import com.sun.management.snmp.manager.SnmpTrapListener;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * Class MultiThreadTrapTest - test sending traps from multiple threads...
 *
 * @author Sun Microsystems, 2007 - All Rights Reserved.
 */
public class MultiThreadTrapTest implements Runnable {

    /**
     * A logger for this class.
     **/
    private static final Logger LOG =
            Logger.getLogger(MultiThreadTrapTest.class.getName());

    private final SnmpPeer peer;
    private final SnmpAdaptorServer server;
    private final CyclicBarrier barrier;
    private final CountDownLatch latch;

    public final int TRAP_COUNT_LIMIT;
    public final int THREAD_COUNT;

    private volatile int trapSent;

    private synchronized int incr() {
        return ++trapSent;
    }

    public int getSentCount() {
        return trapSent;
    }

    /**
     * Creates a new instance of MultiThreadTrapTest
     */
    public MultiThreadTrapTest(SnmpPeer peer) {
        final SnmpV3AdaptorServer srv;
        try {
            srv = new SnmpV3AdaptorServer(false,0,null);
        } catch (Exception x) {
            throw new IllegalStateException("Failed to initialize",x);
        }
        server  = srv;
        TRAP_COUNT_LIMIT = 1000;
        THREAD_COUNT = 20;
        this.peer = peer;
        barrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {
            public void run() {
                System.out.println("5, 4, 3, 2, 1, Go!");
            }
        });
        latch = new CountDownLatch(THREAD_COUNT);
    }

    // Creates and starts all sender threads
    //
    public void launch() {
        // It is not safe to send traps from multiple threads when the
        // adaptor is offline...
        //
        if (!server.isActive()) {
            // if you comment these lines, the test will fail...
            server.start();
            server.waitState(server.ONLINE,1000);
        }
        for (int i=0; i<THREAD_COUNT; i++) {
            new Thread(this,"Sender["+i+"]").start();
        }
    }

    // Wait until all our sender threads have finished.
    //
    public void await() throws InterruptedException {
        try {
            latch.await();
        } finally {
            if (server.isActive())
                server.stop();
        }

    }

    // Send a dummy trap. This is the method that the sender threads
    // call in a loop
    //
    private void sendTrap(SnmpPeer peer) throws IOException, SnmpStatusException {
        // send a trap containing garbage oids...
        server.snmpV2Trap(peer, new SnmpOid("1.2.3.4.0.1"),
                          new SnmpVarBindList(),
                          new SnmpTimeticks(server.getSysUpTime()));
        incr();
    }

    // run() method for sender threads
    //
    public void run() {
        try {
            // Wait until all sender threads are ready.
            barrier.await();
            // send traps until limit is reached.
            while (trapSent <= TRAP_COUNT_LIMIT) {
                try {
                    sendTrap(peer);
                } catch (Exception x) {
                    System.err.println(
                            Thread.currentThread().getName()+
                            ": failed to send trap: "+x);
                    x.printStackTrace();
                    return;
                }
            }
        } catch (Exception x) {
            System.err.println(Thread.currentThread().getName()+": failed: "+x);
        } finally {
            // This thread has completed.
            latch.countDown();
        }

    }

    private static int waitForTraps(final BlockingQueue<SnmpPdu> trapQueue,
            final int count) {
        int receivedCount = 0;
        System.out.println("Expecting "+count+" traps...");
        while (receivedCount < count) {
            SnmpPdu pdu = null;
            try {
                pdu = trapQueue.poll(1,TimeUnit.SECONDS);
            } catch (Exception x) {
                System.err.println("Failed to get trap: "+x);
            }
            if (pdu == null) {
                return receivedCount;
            }
            receivedCount++;
        }
        return receivedCount;
    }

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Exception {
        // args[0] must be a port number
        if (args == null || args.length == 0) args = new String[] { "6789" };
        final int trapPort = Integer.parseInt(args[0]);

        // The task server that will handle received trap packets.
        // The SnmpEventReportDispatcher thread will empty the socket buffer
        // and delegate the bulk of the work to this task server. It is
        // important that the SnmpEventReportDispatcher has a higher priority,
        // in order to avoid losing too many traps by buffer overflow.
        //
        final DaemonTaskServer srv = new DaemonTaskServer();
        srv.start();

        // The SnmpEventReportDispatcher listens for traps and passes them
        // to a SnmpTrapListener... The SnmpTrapListener will be invoked
        // by the task server we created above.
        //
        final SnmpEventReportDispatcher trapReceiver =
                new SnmpEventReportDispatcher(trapPort,
                    null,srv,null);

        // A queue in which we will be putting received traps...
        //
        final BlockingQueue<SnmpPdu> trapQueue =
                new ArrayBlockingQueue<SnmpPdu>(1000);

        // Our SnmpTrapListener simply puts the received traps into the queue
        // we created above.
        //
        final SnmpTrapListener handler = new SnmpTrapListener() {
            public void enqueue(SnmpPdu aTrapPDU) {
                try {
                    if (!trapQueue.offer(aTrapPDU,10,TimeUnit.MILLISECONDS))
                     System.out.println("Can't insert trap in the queue: timeout expired");
                } catch (InterruptedException x) {
                     System.out.println("Can't insert trap in the queue: "+x);
                }
            }
            public void processSnmpTrapV1(SnmpPduTrap aTrapPDU) {
                enqueue(aTrapPDU);
            }
            public void processSnmpTrapV2(SnmpPduRequest aTrapPDU) {
                enqueue(aTrapPDU);
            }
            public void processSnmpTrapV3(SnmpScopedPduRequest aTrapPDU) {
                enqueue(aTrapPDU);
            }
        };

        // Register the trap listener with the event dispatcher.
        trapReceiver.addTrapListener(handler);

        // Starts the trap receiving thread
        final Thread receiverThread = new Thread(trapReceiver,"TrapReceiver");
        receiverThread.setDaemon(true);
        receiverThread.setPriority(Thread.MAX_PRIORITY);
        receiverThread.start();

        // Launch the test
        final SnmpPeer peer = new SnmpPeer(null,trapPort);
        final MultiThreadTrapTest test = new MultiThreadTrapTest(peer);
        test.launch();

        int receivedCount = waitForTraps(trapQueue,test.TRAP_COUNT_LIMIT);
        if (receivedCount < test.TRAP_COUNT_LIMIT) {
            System.err.println("Test failed: timeout exceeded: "+
                    receivedCount + " traps received, " +
                    test.getSentCount() + " traps sent");
            throw new RuntimeException("Test failed");
        }

        // Make sure all senders have finished...
        //
        test.await();

        // Some traps might not have had time to come yet...
        //
        if (receivedCount < test.getSentCount()) {
            receivedCount += waitForTraps(trapQueue,
                            test.getSentCount()-receivedCount);
        }

        // Check we got everything...
        //
        if ((receivedCount+trapQueue.size()) < test.getSentCount() ) {
                System.err.println("Test failed: bad received count: "+
                        (int)(receivedCount + trapQueue.size())+
                        " traps received, " +
                        test.getSentCount() + " traps sent");
                throw new RuntimeException("Test failed");
        }
        System.out.println("Success! "+
                        receivedCount + " traps received, " +
                        test.getSentCount() + " traps sent");
    }

}

About

Daniel Fuchs blogs on Scene Builder, JMX, SNMP, Java, etc...

The views expressed on this blog are those of the author and do not necessarily reflect the views of Oracle.
