About Me

Michael Zucchi

 B.E. (Comp. Sys. Eng.)

  also known as Zed
  to his mates & enemies!

notzed at gmail
fosstodon.org/@notzed

Monday, 03 June 2019, 01:17

Parallel Streams, Blocking Queues

I've been using Java Streams a bit to do various bits of work. One useful feature is the ability to go wide using parallel streams. But I've often found it doesn't perform all that well.

I have written a Stream Parallel.map(Stream, Function) call which wraps a stream mapping process in one that farms it out to a pool of threads and recombines the results afterwards. This works well for some tasks, particularly as you can set the number of threads, but it is still quite coarse and you can't usefully call it recursively (you can, but it just launches lots of threads).
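To give a rough idea of what such a wrapper involves, here is a minimal sketch. It is not the actual Parallel.map() code: the class name ParallelMapSketch, the extra threads parameter and the use of a fixed-size ExecutorService are all assumptions made for the example, and it drains the source stream eagerly rather than pulling on demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical stand-in for Parallel.map(Stream, Function); not the original code.
final class ParallelMapSketch {

    static <T, R> Stream<R> map(Stream<T> source, Function<? super T, ? extends R> mapper, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<R>> pending = new ArrayList<>();
        try {
            // Farm out one task per element; this consumes the source stream up front.
            source.forEach(item -> pending.add(pool.submit((Callable<R>) () -> mapper.apply(item))));
        } finally {
            // No new tasks are accepted, but the submitted ones still run to completion.
            pool.shutdown();
        }
        // Recombine in encounter order; each join() blocks until that result is ready.
        return pending.stream().map(future -> join(future));
    }

    private static <R> R join(Future<R> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        map(Stream.of("a", "bb", "ccc"), String::length, 2).forEach(System.out::println);
    }
}
```

Every call creates its own pool, which is exactly why nesting a wrapper like this multiplies the thread count instead of sharing the work.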

Anyway, I'm trying to think of a way to break up multiple levels of parallel streams into smaller blocks of tasks that can be scheduled more freely, whilst still fitting within the Stream processing model, which is pull oriented.

I'm still nutting out the details but for now I have written a lockless, bounded, blocking, priority queue. It only supports a fixed set of discrete priority levels.

I did some micro-benchmarks against ArrayBlockingQueue (without priority) and I'm surprised how well it worked: from about 5x to 20x faster depending on the contention level.

Each priority level has its own lockless queue implemented using an array and 3 counters. The array is accessed using cyclic addressing so all operations are O(1).

static class Queue<T> {
        volatile int head;
        volatile int tail;
        volatile int alloc;
        final T[] queue;
}
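The cyclic addressing used in the code below is the usual counter & (length - 1) mask, which only works when the array length is a power of two. The following small sketch (the CyclicIndex class and slot() helper are names invented for this example) shows how ever-increasing counters map onto slots, including across int overflow.

```java
// Illustration only: maps a monotonically increasing counter onto a ring of slots.
final class CyclicIndex {

    static int slot(int counter, int capacity) {
        // The mask trick assumes a power-of-two capacity.
        if (capacity <= 0 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two");
        return counter & (capacity - 1);
    }

    public static void main(String[] args) {
        int capacity = 8;
        // Counters 6..10 map to slots 6, 7, 0, 1, 2.
        for (int counter = 6; counter < 11; counter++)
            System.out.println(counter + " -> slot " + slot(counter, capacity));
        // The counter may overflow; the masked slot still advances from 7 to 0.
        System.out.println(slot(Integer.MAX_VALUE, capacity) + " then " + slot(Integer.MAX_VALUE + 1, capacity));
    }
}
```

Because the counters are never reset, comparing them for equality (as poll() does with head and tail) stays valid even after they wrap.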

The trick is that it doesn't implement any waiting operations because it uses external arbitration to avoid the need to.

This makes put() particularly simple. I'm using pseudo-code atomics below, but these are implemented using VarHandles.

        void put(T value) {
                int a = atomic_inc(alloc);
                int b = a + 1;

                volatile_set(queue, a & (queue.length-1), value);
                while (!atomic_cas(head, a, b))
                        Thread.onSpinWait();
        }

First, the allocation cannot fail and simply assigns a working slot for the new item (atomic_inc() here has to return the value before the increment, since a is used as the slot index). The slot is then filled, and atomic_cas() (compare-and-set) ensures that the head pointer is incremented sequentially regardless of how many threads have reserved slots.

The poll() method is slightly more complex.

        T poll() {
                int h, n, t;
                T node;

                do {
                        t = tail;
                        h = head;
                        if (h == t)
                                return null;
                        node = volatile_get(queue, t & (queue.length - 1));
                        n = t + 1;
                } while (!atomic_cas(tail, t, n));

                return node;
        }

First it checks if the queue is empty and if so simply exits. It then reads the item at the current queue tail and tries to update the tail counter. If the tail counter changed because another poll() completed first, it just retries.

The order of reading the head and tail counters is important here! If tail is read second it is possible to double-read the same value.
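For reference, here is how the put() and poll() pseudo-code above might translate to real VarHandle calls. This is a sketch rather than the actual implementation: the class name LocklessQueueSketch is made up, the array is declared as Object[] to avoid generic array creation, and the capacity is assumed to be a power of two. As in the pseudo-code, nothing here stops put() from overwriting unread items.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Sketch of one priority level's queue; relies on external arbitration for bounds.
final class LocklessQueueSketch<T> {
    private static final VarHandle HEAD, TAIL, ALLOC, SLOT;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            HEAD = lookup.findVarHandle(LocklessQueueSketch.class, "head", int.class);
            TAIL = lookup.findVarHandle(LocklessQueueSketch.class, "tail", int.class);
            ALLOC = lookup.findVarHandle(LocklessQueueSketch.class, "alloc", int.class);
            SLOT = MethodHandles.arrayElementVarHandle(Object[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile int head;  // one past the last slot published by put()
    private volatile int tail;  // next slot to be consumed by poll()
    private volatile int alloc; // next slot to be reserved by put()
    private final Object[] queue;

    LocklessQueueSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of two");
        queue = new Object[capacity];
    }

    void put(T value) {
        // Reserve a slot; getAndAdd returns the counter value before the increment.
        int a = (int) ALLOC.getAndAdd(this, 1);
        SLOT.setVolatile(queue, a & (queue.length - 1), value);
        // Publish in reservation order: wait until every earlier put() has published.
        while (!HEAD.compareAndSet(this, a, a + 1))
            Thread.onSpinWait();
    }

    @SuppressWarnings("unchecked")
    T poll() {
        int t, h;
        Object node;
        do {
            t = tail; // tail must be read before head
            h = head;
            if (h == t)
                return null; // empty
            node = SLOT.getVolatile(queue, t & (queue.length - 1));
        } while (!TAIL.compareAndSet(this, t, t + 1));
        return (T) node;
    }
}
```

Single-threaded use is enough to check the FIFO behaviour: put("a") and put("b") followed by two poll() calls return "a" then "b", and a third poll() returns null.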

This isn't a full queue implementation as a number of important features are still missing:

  1. Limiting the number of put()s so that the queue isn't overwritten;
  2. Blocking on full-write when the queue is full, without busy-waiting;
  3. Blocking on empty-read when the queue is empty, without busy-waiting.

All of these can be almost trivially implemented using a pair of Semaphores and an atomic integer.

  1. A semaphore with (queue.length-1) reservations limits put() calls. A successful poll releases a reservation.
  2. The first semaphore does this as well.
  3. An atomically updated counter and another semaphore are used to implement wake-up on empty-read.
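The general shape of this external arbitration can be sketched as follows. Note that this is a simplified variant, not the scheme described above: it uses a second counting semaphore to track available items instead of the atomic counter plus wake-up semaphore, and a ConcurrentLinkedQueue stands in for the lockless inner queue so that the example is self-contained. The class and method names are invented for the example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Sketch: bounds and blocking are handled outside the non-blocking inner queue.
final class BlockingWrapperSketch<T> {
    // Stand-in for the lockless array queue (assumption for this example).
    private final Queue<T> inner = new ConcurrentLinkedQueue<>();
    private final Semaphore space;                     // free slots, limits put()
    private final Semaphore items = new Semaphore(0);  // published items, gates reads

    BlockingWrapperSketch(int length) {
        space = new Semaphore(length - 1); // (queue.length-1) reservations
    }

    // Blocks (without spinning) while the queue is full.
    void put(T value) throws InterruptedException {
        space.acquire();
        inner.add(value);
        items.release();
    }

    // Blocks (without spinning) while the queue is empty.
    T take() throws InterruptedException {
        items.acquire();
        T value = inner.poll();
        space.release(); // a successful read releases a reservation
        return value;
    }

    // Non-blocking variants: fail instead of waiting.
    boolean offer(T value) {
        if (!space.tryAcquire())
            return false;
        inner.add(value);
        items.release();
        return true;
    }

    T poll() {
        if (!items.tryAcquire())
            return null;
        T value = inner.poll();
        space.release();
        return value;
    }
}
```

Since items is only released after the inner add() has completed, a reader that acquires a permit always finds an item waiting, so the inner queue never needs a waiting operation of its own.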

It's a bit tricky to benchmark and the results are quite noisy despite setting the CPU to a specific (low) clock speed.

But in general this is around 10x faster than using ArrayBlockingQueue for "low-contested" situations (6x writers, 6x readers on a 12x thread cpu). In a "high-contested" situation (32x writers, 32x readers), it's more like 15-20x faster, and scales better. Despite tight loops the ArrayBlockingQueue is unable to saturate the CPU resources (as reported by top) and much of the time is spent in overhead (?somewhere?). Profiling in NetBeans didn't offer any particular insight into where.
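A harness for this kind of test can be quite small. The sketch below is not the benchmark used for the numbers above; the class name QueueBenchSketch, the stop-marker scheme and the item counts are assumptions for the example, and it only exercises ArrayBlockingQueue. Timings from something this crude will be noisy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of an N-writers / M-readers contention test over any BlockingQueue.
final class QueueBenchSketch {
    private static final Integer STOP = -1; // marker telling a reader to exit

    // Returns the number of real items received by all readers.
    static long run(BlockingQueue<Integer> queue, int writers, int readers, int itemsPerWriter) {
        AtomicLong received = new AtomicLong();
        List<Thread> writerThreads = new ArrayList<>();
        List<Thread> readerThreads = new ArrayList<>();
        for (int w = 0; w < writers; w++)
            writerThreads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerWriter; i++)
                        queue.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        for (int r = 0; r < readers; r++)
            readerThreads.add(new Thread(() -> {
                try {
                    while (queue.take() >= 0)
                        received.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        try {
            for (Thread t : readerThreads) t.start();
            for (Thread t : writerThreads) t.start();
            for (Thread t : writerThreads) t.join();
            // All real items are queued by now; send one stop marker per reader.
            for (int r = 0; r < readers; r++) queue.put(STOP);
            for (Thread t : readerThreads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long n = run(new ArrayBlockingQueue<>(1024), 6, 6, 100_000);
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println(n + " items in " + ms + " ms");
    }
}
```

Any other BlockingQueue implementation can be passed to run() for comparison, so long as it preserves FIFO order for the stop markers.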

These are of course highly contrived situations but the performance was a pleasant surprise. It might not work on systems with a weaker memory model than AMD64 but I don't have access to such exotic systems.

This still doesn't solve the base problem I was working on but it might be a useful part thereof.

Tagged code, hacking, java.
Copyright (C) 2019 Michael Zucchi, All Rights Reserved. Powered by gcc & me!