PTLQueue : a scalable bounded-capacity MPMC queue

I've used the following concurrent queue algorithm enough that it warrants a blog entry. I'll sketch out the design of a fast and scalable multiple-producer multiple-consumer (MPMC) concurrent queue called PTLQueue. The queue has bounded capacity and is implemented via a circular array. Bounded capacity can be a useful property if there's a mismatch between producer rates and consumer rates where an unbounded queue might otherwise result in excessive memory consumption by virtue of the container nodes that -- in some queue implementations -- are used to hold values. A bounded-capacity queue can provide flow control between components. Beware, however, that bounded collections can also result in resource deadlock if abused. The put() and take() operators are partial and wait for the collection to become non-full or non-empty, respectively. Put() and take() do not allocate memory, and are not vulnerable to the ABA pathologies. The PTLQueue algorithm can be implemented equally well in C/C++ and Java.

Partial operators are often more convenient than total methods. In many use cases if the preconditions aren't met, there's nothing else useful the thread can do, so it may as well wait via a partial method. An exception is in the case of work-stealing queues where a thief might scan a set of queues from which it could potentially steal. Total methods return ASAP with a success-failure indication. (It's tempting to describe a queue or API as blocking or non-blocking instead of partial or total, but non-blocking is already an overloaded concurrency term. Perhaps waiting/non-waiting or patient/impatient might be better terms). It's also trivial to construct partial operators by busy-waiting via total operators, but such constructs may be less efficient than an operator explicitly and intentionally designed to wait.

A PTLQueue instance contains an array of slots, where each slot has volatile Turn and MailBox fields. The array has power-of-two length allowing mod/div operations to be replaced by masking. We assume sensible padding and alignment to reduce the impact of false sharing. (On x86 I recommend 128-byte alignment and padding because of the adjacent-sector prefetch facility). Each queue also has PutCursor and TakeCursor cursor variables, each of which should be sequestered as the sole occupant of a cache line or sector. You can opt to use 64-bit integers if concerned about wrap-around aliasing in the cursor variables. Put(null) is considered illegal, but the caller or implementation can easily check for and convert null to a distinguished non-null proxy value if null happens to be a value you'd like to pass. Take() will accordingly convert the proxy value back to null. An advantage of PTLQueue is that you can use atomic fetch-and-increment for the partial methods. We initialize each slot at index I with (Turn=I, MailBox=null). Both cursors are initially 0. All shared variables are considered "volatile" and atomics such as CAS and AtomicFetchAndIncrement are presumed to have bidirectional fence semantics. Finally T is the templated type.
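For concreteness, a minimal Java rendering of this layout might look as follows. The class and field names are mine, AtomicLong stands in for the volatile cursors with fetch-and-increment, and the bare long fields are only a crude approximation of the 128-byte padding and sequestration discussed above (the cursors would deserve the same isolation).

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the PTLQueue layout described above; names are not canonical.
final class PTLQueueLayout<T> {
    // One slot per lane: a Turn sequence field and a single-word MailBox.
    static final class Slot<T> {
        volatile long turn;          // ticket/generation the slot currently admits
        volatile T mailBox;          // null means empty
        // Manual padding as a rough stand-in for 128-byte sector isolation.
        long p0, p1, p2, p3, p4, p5, p6, p7;
        long q0, q1, q2, q3, q4, q5, q6, q7;
    }

    final Slot<T>[] slots;
    final int mask;                  // capacity - 1, capacity is a power of two
    final AtomicLong putCursor  = new AtomicLong(0);   // should also be padded/isolated
    final AtomicLong takeCursor = new AtomicLong(0);

    @SuppressWarnings("unchecked")
    PTLQueueLayout(int capacity) {
        assert capacity >= 2 && (capacity & (capacity - 1)) == 0;
        mask = capacity - 1;
        slots = (Slot<T>[]) new Slot[capacity];
        for (int i = 0; i < capacity; i++) {
            slots[i] = new Slot<>();
            slots[i].turn = i;       // slot I starts with Turn = I and MailBox = null
        }
    }
}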

 

// PTLQueue :

Put(v) : // producer : partial method - waits as necessary
    assert v != null
    assert Mask >= 1 && (Mask & (Mask+1)) == 0 // Document invariants
    // doorway step :
    // Obtain a sequence number -- a ticket.
    // As a practical concern the ticket value is temporally unique.
    // The ticket also identifies and selects a slot.
    auto tkt = AtomicFetchIncrement (&PutCursor, 1)
    slot * s = &Slots[tkt & Mask]
    // waiting phase :
    // Wait for the slot's generation to match the tkt value assigned to this put() invocation.
    // The "generation" is implicitly encoded as the upper bits in the cursor
    // above those used to specify the index : tkt div (Mask+1).
    // The generation serves as an epoch number to identify a cohort of threads
    // accessing disjoint slots.
    while s->Turn != tkt : Pause
    assert s->MailBox == null
    s->MailBox = v // deposit and pass message

Take() : // consumer : partial method - waits as necessary
    auto tkt = AtomicFetchIncrement (&TakeCursor, 1)
    slot * s = &Slots[tkt & Mask]
    // 2-stage waiting :
    // First, wait for our generation's turn, which confers exclusive "take" access
    // to the slot's MailBox field.
    // Then, wait for the slot to become occupied.
    while s->Turn != tkt : Pause
    // Concurrency in this section of code is now reduced to just 1 producer thread
    // vs 1 consumer thread.
    // For a given queue and slot, there will be at most one Take() operation running
    // in this section.
    // The consumer waits for the producer to arrive and make the slot non-empty,
    // then extracts the message, clears the mailbox, and advances the Turn indicator.
    // We have an obvious happens-before relation :
    // Put(m) happens-before the corresponding Take() that returns that same "m".
    for
        T v = s->MailBox
        if v != null :
            s->MailBox = null
            ST-ST barrier
            s->Turn = tkt + Mask + 1 // unlock slot to admit next producer and consumer
            return v
        Pause
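A minimal Java transliteration of the listing above, assuming the layout just sketched: AtomicLong.getAndIncrement() plays the role of AtomicFetchIncrement, Thread.onSpinWait() plays the role of Pause, Java's volatile ordering supplies the ST-ST barrier between clearing MailBox and advancing Turn, and padding is omitted for brevity.

import java.util.concurrent.atomic.AtomicLong;

// Sketch only: padding/alignment of slots and cursors is omitted here.
final class PTLQueueV1<T> {
    static final class Slot<T> { volatile long turn; volatile T mailBox; }

    private final Slot<T>[] slots;
    private final int mask;
    private final AtomicLong putCursor  = new AtomicLong(0);
    private final AtomicLong takeCursor = new AtomicLong(0);

    @SuppressWarnings("unchecked")
    PTLQueueV1(int capacity) {
        assert capacity >= 2 && (capacity & (capacity - 1)) == 0;
        mask = capacity - 1;
        slots = (Slot<T>[]) new Slot[capacity];
        for (int i = 0; i < capacity; i++) { slots[i] = new Slot<>(); slots[i].turn = i; }
    }

    void put(T v) {                                  // partial: waits until space is available
        assert v != null;
        long tkt = putCursor.getAndIncrement();      // doorway step: ticket + slot selection
        Slot<T> s = slots[(int) (tkt & mask)];
        while (s.turn != tkt) Thread.onSpinWait();   // wait for our generation's turn
        assert s.mailBox == null;
        s.mailBox = v;                               // deposit and pass message
    }

    T take() {                                       // partial: waits until a value is available
        long tkt = takeCursor.getAndIncrement();
        Slot<T> s = slots[(int) (tkt & mask)];
        while (s.turn != tkt) Thread.onSpinWait();   // first wait: turn for our generation
        for (;;) {                                   // second wait: slot becomes occupied
            T v = s.mailBox;
            if (v != null) {
                s.mailBox = null;
                s.turn = tkt + mask + 1;             // admit the next generation's put()/take()
                return v;
            }
            Thread.onSpinWait();
        }
    }
}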


PTLQueue borrows and derives from the Partitioned Ticket Lock "PTL" (US20120240126-A1) and the MultiLane Concurrent Bag (US8689237). The latter is essentially a circular ring-buffer where the elements themselves are queues or concurrent collections. You can think of the PTLQueue as a partitioned ticket lock "PTL" augmented to pass values from lock to unlock via the slots. Alternatively, you can think of PTLQueue as a degenerate MultiLane bag where each slot or "lane" consists of a simple single-word MailBox instead of a general queue. Each lane in PTLQueue also has a private Turn field which acts like the Turn (Grant) variables found in PTL. Turn enforces strict FIFO ordering and restricts concurrency on the slot's MailBox field to at most one simultaneous put() and take() operation.

PTL uses a single "ticket" variable and per-slot Turn (grant) fields, while MultiLane has distinct PutCursor and TakeCursor cursors and abstract per-slot sub-queues but does not use per-slot Turn variables. Both PTL and MultiLane advance their cursor and ticket variables with atomic fetch-and-increment. PTLQueue borrows from both PTL and MultiLane and incorporates distinct put and take cursors and per-slot Turn fields. Instead of per-slot queues, PTLQueue uses a simple single-word MailBox field. PutCursor and TakeCursor act like a pair of ticket locks, conferring "put" and "take" access to a given slot. PutCursor, for instance, assigns an incoming put() request to a slot and serves as a PTL "Ticket" to acquire "put" permission to that slot's MailBox field.

To better explain the operation of PTLQueue we deconstruct the operation of put() and take() as follows. Put() first increments PutCursor, obtaining a new unique ticket. That ticket value also identifies a slot. Put() next waits for that slot's Turn field to match that ticket value. This is tantamount to using a PTL to acquire "put" permission on the slot's MailBox field. Finally, having obtained exclusive "put" permission on the slot, put() stores the message value into the slot's MailBox. Take() similarly advances TakeCursor, identifying a slot, and then acquires and secures "take" permission on a slot by waiting for Turn. Take() then waits for the slot's MailBox to become non-empty, extracts the message, and clears MailBox. Finally, take() advances the slot's Turn field, which releases both "put" and "take" access to the slot's MailBox. Note the asymmetry : put() acquires "put" access to the slot, but take() releases that lock. At any given time, for a given slot in a PTLQueue, at most one thread has "put" access and at most one thread has "take" access. This restricts concurrency from general MPMC to 1-vs-1.

We have 2 ticket locks -- one for put() and one for take() -- each with its own "ticket" variable in the form of the corresponding cursor, but they share a single "Grant" egress variable in the form of the slot's Turn variable. Advancing the PutCursor, for instance, serves two purposes. First, we obtain a unique ticket which identifies a slot. Second, incrementing the cursor is the doorway protocol step to acquire the per-slot mutual exclusion "put" lock. The cursors and the operations to increment those cursors serve double duty : slot selection and ticket assignment for locking the slot's MailBox field.

At any given time a slot's MailBox field can be in one of the following states: empty with no pending operations -- neutral state; empty with one or more waiting take() operations pending -- deficit; occupied with no pending operations; occupied with one or more waiting put() operations -- surplus; empty with a pending put() or pending put() and take() operations -- transitional; or occupied with a pending take() or pending put() and take() operations -- transitional.

The partial put() and take() operators can be implemented with an atomic fetch-and-increment operation, which may confer a performance advantage over a CAS-based loop. In addition we have independent PutCursor and TakeCursor cursors. Critically, a put() operation modifies PutCursor but does not access the TakeCursor and a take() operation modifies the TakeCursor cursor but does not access the PutCursor. This acts to reduce coherence traffic relative to some other queue designs.
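Purely to illustrate the distinction, compare a fetch-and-increment doorway with a CAS-loop doorway in Java; the putCursor name below is just for the example.

import java.util.concurrent.atomic.AtomicLong;

final class DoorwayStep {
    static final AtomicLong putCursor = new AtomicLong();

    // Fetch-and-increment doorway: a single atomic RMW (HotSpot typically emits LOCK XADD on x86).
    static long ticketViaFetchAdd() {
        return putCursor.getAndIncrement();
    }

    // CAS-based doorway: may fail and retry under contention, generating extra coherence traffic.
    static long ticketViaCasLoop() {
        long t;
        do {
            t = putCursor.get();
        } while (!putCursor.compareAndSet(t, t + 1));
        return t;
    }
}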

It's worth noting that slow threads or obstruction in one slot (or "lane") does not impede or obstruct operations in other slots -- this gives us some degree of obstruction isolation. With respect to progress properties, however, PTLQueue is not lock-free.

The implementation above is expressed with polite busy-waiting (Pause) but it's trivial to implement per-slot parking and unparking to deschedule waiting threads. It's also easy to convert the queue to a more general deque by replacing the PutCursor and TakeCursor cursors with Left/Front and Right/Back cursors that can move in either direction. Specifically, to push and pop from the "left" side of the deque we would decrement and increment the Left cursor, respectively, and to push and pop from the "right" side of the deque we would increment and decrement the Right cursor, respectively.

We used a variation of PTLQueue for message passing in our recent OPODIS 2013 paper.

I'll now introduce a variation on the above that borrows Dmitry Vyukov's encoding for the Turn variable. After a thread deposits a value into the MailBox, it advances Turn by 1 to indicate to Take() operations that a value is present. Turn values for a given slot follow this trajectory : K, K+1, K+Mask+1, etc. (More precisely, the Turn field for slot I takes on values of the form I + G*(Mask+1), I + G*(Mask+1) + 1, I + (G+1)*(Mask+1), etc., where G is the generation number. For example, with Mask = 7, slot 2's Turn takes on the values 2, 3, 10, 11, 18, 19, ...). This form is friendlier for a total tryTake() operator. Put() and Take() operators are also more symmetric than in the version above. It also allows null to be passed without substituting proxy values. Like the form above, it can use and benefit from fetch-and-increment instead of CAS for the partial forms. The total tryTake() operator, however, uses CAS. tryPut() -- not shown -- is analogous to tryTake(). UPDATE: see the comments section for a concern identified by Nitsan.

 

// PTLQueueV2 :

Put(v) : // producer : partial method - waits as necessary
    assert Mask > 1 && (Mask & (Mask+1)) == 0 // Document invariants
    // doorway step :
    // Obtain a sequence number -- a ticket.
    // As a practical concern the ticket value is temporally unique.
    // The ticket also identifies and selects a slot.
    auto tkt = AtomicFetchIncrement (&PutCursor, 1)
    slot * s = &Slots[tkt & Mask]

    // waiting phase :
    // Wait for the slot's generation to match the tkt value assigned to this put() invocation.
    // The "generation" is implicitly encoded as the upper bits in the cursor
    // above those used to specify the index : tkt div (Mask+1).
    // The generation serves as an epoch number to identify a cohort of threads
    // accessing disjoint slots.
    while s->Turn != tkt : Pause

    assert s->MailBox == null
    s->MailBox = v // deposit and pass message
    s->Turn = tkt + 1 // mark occupied - release corresponding Take() operation
    // We pass & cede ownership and exclusive access of MailBox to the next Take().
    // That Take() operation will be in the same generation as this Put().

Take() : // consumer : partial method - waits as necessary
    auto tkt = AtomicFetchIncrement (&TakeCursor, 1)
    slot * s = &Slots[tkt & Mask]

    // Wait for our generation's turn and for the slot to become occupied
    while s->Turn != (tkt+1) : Pause

    T v = s->MailBox // extract message
    // In a garbage-collected environment we might want to set s->MailBox = null
    s->Turn = tkt + Mask + 1 // mark unoccupied - unlock slot to admit next Put()
    // We pass & cede ownership and exclusive access to MailBox to the next Put().
    // That Put() will be in the next generation.
    return v

// tryTake() is a "strong" operator in that it can return null IFF
// the queue was empty at some point during the tryTake() invocation.
// Spurious false-null return values are not allowed.
// Note that if we provide tryTake() with the interface below, then null cannot
// be a legal message value passed via Put(null). Alternatively, we could allow
// null messages but augment tryTake() to return a success-failure indication as well
// as the value.
// Thanks to Nathan Reynolds for comments on an earlier version of this algorithm.

tryTake() : // consumer : "strong" total method - returns ASAP
    for
        auto tkt = TakeCursor
        slot * s = &Slots[tkt & Mask]
        auto delta = s->Turn - (tkt+1)
        if delta == 0 :
            if CAS (&TakeCursor, tkt, tkt+1) == tkt :
                // our thread has exclusive access to s->MailBox
                T v = s->MailBox
                s->Turn = tkt + Mask + 1 // cede MailBox access to next Put()
                return v
            continue
        if delta < 0 :
            return null
        // inopportune concurrent interleaving - race :
        // tkt is stale with respect to TakeCursor.
        // Other Take() or tryTake() operations bypassed this operation and raced past.
        // This can happen if we stall after loading TakeCursor, for instance.
        // Just retry.
        assert TakeCursor != tkt
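For completeness, here is a minimal Java sketch of the V2 Put() and Take() under the same assumptions as the earlier sketch (illustrative names, Thread.onSpinWait() as Pause, padding omitted). The total tryTake() is deliberately left out of this sketch in light of the concern discussed in the comments below.

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the Vyukov-style turn encoding: put() marks the slot occupied by
// advancing Turn by 1; take() unlocks it for the next generation with tkt + mask + 1.
final class PTLQueueV2<T> {
    static final class Slot<T> { volatile long turn; volatile T mailBox; }

    private final Slot<T>[] slots;
    private final int mask;
    private final AtomicLong putCursor  = new AtomicLong(0);
    private final AtomicLong takeCursor = new AtomicLong(0);

    @SuppressWarnings("unchecked")
    PTLQueueV2(int capacity) {
        assert capacity >= 2 && (capacity & (capacity - 1)) == 0;
        mask = capacity - 1;
        slots = (Slot<T>[]) new Slot[capacity];
        for (int i = 0; i < capacity; i++) { slots[i] = new Slot<>(); slots[i].turn = i; }
    }

    void put(T v) {                                    // partial; null values are permitted here
        long tkt = putCursor.getAndIncrement();
        Slot<T> s = slots[(int) (tkt & mask)];
        while (s.turn != tkt) Thread.onSpinWait();     // acquire "put" access to the slot
        s.mailBox = v;                                 // deposit
        s.turn = tkt + 1;                              // mark occupied; release the matching take()
    }

    T take() {                                         // partial
        long tkt = takeCursor.getAndIncrement();
        Slot<T> s = slots[(int) (tkt & mask)];
        while (s.turn != tkt + 1) Thread.onSpinWait(); // wait until occupied by our generation
        T v = s.mailBox;
        s.mailBox = null;                              // optional: helps the garbage collector
        s.turn = tkt + mask + 1;                       // unlock slot for the next generation's put()
        return v;
    }
}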


There's quite a bit of related literature in this area.

I'll propose an optimization left as an exercise for the reader. Say we wanted to reduce memory usage by eliminating inter-slot padding. Such padding is usually "dark" memory, otherwise unused and wasted. But eliminating the padding leaves us at risk of increased false sharing. Furthermore, let's say it was usually the case that the PutCursor and TakeCursor were numerically close to each other. (That's true in some use cases). We might still reduce false sharing by incrementing the cursors by some value other than 1 that is not trivially small and is coprime with the number of slots. Alternatively, we might increment the cursor by one and mask as usual, resulting in a logical index. We then use that logical index value to index into a permutation table, yielding an effective index for use in the slot array. The permutation table would be constructed so that nearby logical indices map to more distant effective indices. (Open question: what should that permutation look like? Possibly some perversion of a Gray code or De Bruijn sequence might be suitable).
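A small sketch of the indexing mechanics, leaving the choice of permutation open as noted; bit reversal appears below purely as a placeholder permutation, and the class and method names are mine.

// Logical-to-effective index mapping for a power-of-two slot array.
final class PermutedIndex {
    final int[] table;        // table[logicalIndex] -> effectiveIndex
    final int mask;

    PermutedIndex(int capacity) {
        assert capacity >= 2 && (capacity & (capacity - 1)) == 0;
        mask = capacity - 1;
        int bits = Integer.numberOfTrailingZeros(capacity);
        table = new int[capacity];
        for (int i = 0; i < capacity; i++) {
            table[i] = Integer.reverse(i) >>> (32 - bits);   // placeholder permutation
        }
    }

    int effectiveIndex(long ticket) {
        int logical = (int) (ticket & mask);   // increment-by-one cursor, masked as usual
        return table[logical];                 // nearby tickets map to more distant slots
    }
}

Note that the table lookup adds one extra dependent load on the hot path, which is part of the trade-off against the padding it replaces.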

As an aside, say we need to busy-wait for some condition as follows : "while C == 0 : Pause". Let's say that C is usually non-zero, so we typically don't wait. But when C happens to be 0 we'll have to spin for some period, possibly brief. We can arrange for the code to be more machine-friendly with respect to the branch predictors by transforming the loop into : "if C == 0 : for { Pause; if C != 0 : break; }". Critically, we want to restructure the loop so there's one branch that controls entry and another that controls loop exit. A concern is that your compiler or JIT might be clever enough to transform this back to "while C == 0 : Pause". You can sometimes avoid this by inserting a call to some type of very cheap "opaque" method that the compiler can't elide or reorder. On Solaris, for instance, you could use : "if C == 0 : { gethrtime(); for { Pause; if C != 0 : break; }}".
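In Java, the same restructuring might look roughly like this, with System.nanoTime() standing in for the cheap opaque call; whether a particular JIT actually preserves the two-branch shape is, of course, not guaranteed.

import java.util.concurrent.atomic.AtomicInteger;

final class SpinWaitExample {
    // Restructured wait: one branch controls entry, another controls exit.
    static void awaitNonZero(AtomicInteger c) {
        if (c.get() == 0) {              // entry branch: usually not taken
            System.nanoTime();           // cheap call intended to discourage re-collapsing the shape
            for (;;) {
                Thread.onSpinWait();     // polite pause
                if (c.get() != 0) break; // exit branch
            }
        }
    }
}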

It's worth noting the obvious duality between locks and queues. If you have a strict FIFO lock implementation with local spinning and succession by direct handoff, such as MCS or CLH, then you can usually transform that lock into a queue.

If you want a multiple-producer single-consumer (MPSC) queue then you can replace the atomic operations on the TakeCursor with normal updates. More generally, you can take a simple SPSC queue and wrap the put() and take() operations with put and take mutexes, restricting concurrency to 1-vs-1, but the performance isn't usually as good as what you'd get from a queue designed for MPSC usage.

Comments:

Is it possible to implement this queue in Java without creating any heap objects?

Posted by guest on December 31, 2014 at 02:55 PM EST #

It would require some allocation at creation time, but other than that, it doesn't allocate during normal operation.

Regards, -Dave

Posted by guest on January 01, 2015 at 12:18 AM EST #

Minor typo: "multiple-producer multiple-consumer (MPSC) " should be "multiple-producer multiple-consumer (MPMC)"

Posted by Nitsan Wakart on January 08, 2015 at 05:29 AM EST #

For a close interpretation of the D.Vyukov MPMC for Java see:
https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java

Posted by Nitsan Wakart on January 08, 2015 at 05:45 AM EST #

I believe there's a bug in tryPoll().
Consider 2 producer threads, P1 and P2:
- P1 successfully increments PutCursor and is then interrupted immediately (before storing to mailbox/turn).
- P2 successfully increments and deposits to slot[1].
For the sake of argument we can now have P2 call tryTake():
- TakeCursor is 0, slot[0].turn is 0 -> delta = 0 - (0+1) = -1
- delta < 0 -> return null

Posted by Nitsan Wakart on January 08, 2015 at 06:54 AM EST #

Hi Nitsan, thanks for catching the typo. Fixed. Regards, -Dave

Posted by guest on January 19, 2015 at 05:19 PM EST #

Hi Nitsan, good point on the tryTake() issue.

Posted by guest on January 19, 2015 at 05:37 PM EST #

Following up on the flaw in tryTake() that Nitsan pointed out ...

The crux of the issue is that Put() requires multiple steps to complete and tryTake() can see intermediate Put()-related state and incorrectly return null. A possible remedy is to modify tryTake() to check PutCursor before it's about to return null. If PutCursor matches the original fetched value of TakeCursor then it's viable to return null. If not, tryTake() would keep retrying in its outer loop. The bad news is that tryTake() might need to wait an indefinite time for a stalled Put(), violating its "expedient return" mandate. Also, we'd need to check the PutCursor, which may induce more coherence traffic. (Perhaps the latter is not so bad in practice, as the fetch of PutCursor would appear only in the path where we're about to return null and the queue appears empty).
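For concreteness, that remedy might look roughly like the following as a method added to a Java rendering of PTLQueueV2 (assuming putCursor, takeCursor, slots, mask and a Slot type as in the sketches earlier in the post). It's offered only to illustrate the PutCursor re-check, not as a vetted fix.

// Candidate tryTake(): before reporting empty, re-read PutCursor and return null only
// if no put() has even claimed the ticket this take() would consume. As noted above,
// a stalled in-flight put() can force this method to keep retrying.
T tryTake() {
    for (;;) {
        long tkt = takeCursor.get();
        Slot<T> s = slots[(int) (tkt & mask)];
        long delta = s.turn - (tkt + 1);
        if (delta == 0) {                                // slot is occupied for ticket tkt
            if (takeCursor.compareAndSet(tkt, tkt + 1)) {
                T v = s.mailBox;                         // exclusive "take" access
                s.mailBox = null;
                s.turn = tkt + mask + 1;                 // cede the slot to the next put()
                return v;
            }
            continue;                                    // lost the ticket race; retry
        }
        if (delta < 0) {                                 // producer hasn't deposited yet
            if (putCursor.get() == tkt) return null;     // no put() claimed this ticket: empty
            continue;                                    // a put() is in flight; retry
        }
        // delta > 0: tkt is stale, other consumers raced past this one; retry
    }
}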

It appears Dmitry's MPMC bounded queue has the same issue.

The following queue avoids the problem using the checks I described above : https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java. See poll().

PTLQueueV2 Put() and Take() remain safe, but for the moment tryTake() should be
considered wrong.

Posted by guest on January 21, 2015 at 05:48 PM EST #

> It appears Dmitry's MPMC bounded queue has the same issue.
Well... I think Vyukov is not committed to the same API we are, particularly the Java Queue::poll API (lucky, lucky man). He seems to offer the same guarantees more explicitly in his linked MPSC queue.
We could add a poll()/weakPoll() to the API where poll reflects the Queue::poll requirement (null iff empty) and weakPoll() follows Vyukov's implementation (null if next element not visible). Both serve a valid purpose.

Posted by Nitsan Wakart on January 26, 2015 at 11:59 AM EST #

Hi Nitsan, Agreed. Thanks again for pointing out the issue. -Dave

Posted by guest on January 26, 2015 at 12:18 PM EST #
