Where's the problem with this BoundedBuffer class?

Source: https://www.devze.com (reposted from the web), 2023-04-03 20:16
Properties of the bounded buffer class I'm trying to build...

  • Multiple producer, multiple consumer.
  • Blocking producer and blocking consumer.
  • Uses AtomicInteger as the read/write pointers.
  • Uses AtomicReferenceArray (taking in a generic type) for holding the buffer.
  • Buffer has Short.MAX_VALUE + 1 slots, and the pointers use CAS to wrap around at the end.
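
The wraparound behaviour in the last bullet can be sketched in isolation. This is a minimal illustration, not the asker's code: the class name WrapCounter and the method getAndIncrementWrapped are hypothetical, and it swaps the hand-written CAS loop for AtomicInteger.getAndUpdate (Java 8+), which performs the same retry loop internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WrapCounter {
    // Assumed capacity, matching BUFFER_SIZE in the question's code.
    public static final int SIZE = Short.MAX_VALUE + 1;

    // Atomically returns the current index and advances it,
    // wrapping back to 0 after SIZE - 1.
    public static int getAndIncrementWrapped(AtomicInteger i) {
        return i.getAndUpdate(v -> (v + 1) % SIZE);
    }

    public static void main(String[] args) {
        AtomicInteger pointer = new AtomicInteger(SIZE - 2);
        System.out.println(getAndIncrementWrapped(pointer)); // 32766
        System.out.println(getAndIncrementWrapped(pointer)); // 32767
        System.out.println(getAndIncrementWrapped(pointer)); // 0
    }
}
```

Note that this only makes claiming an index atomic. It says nothing about when the slot at that index is actually read or written, which is a separate step.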

Now for the problem...

Problem: I can't remove the synchronized (this) blocks in the code below without breaking it. I thought the whole point of using AtomicInteger for the pointers was to avoid this kind of locking.

With the synchronized (this) blocks commented out, consumers miss some of the items that producers have put in. With the blocks in place, everything works and every item produced is consumed.

What am I missing?

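import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class BoundedBuffer<T> {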
    private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
    private AtomicReferenceArray<T> m_buffer = null;
    private Semaphore m_full = new Semaphore(BUFFER_SIZE);
    private Semaphore m_empty = new Semaphore(0);
    private AtomicInteger m_writePointer = new AtomicInteger();
    private AtomicInteger m_readPointer = new AtomicInteger();

    public BoundedBuffer() {
        m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
    }

    public static int safeGetAndIncrement(AtomicInteger i) {
        int oldValue = 0, newValue = 0;
        do {
            oldValue = i.get();
            newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
        } while (!i.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public void add(T data) throws InterruptedException {
        m_full.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            m_buffer.set(safeGetAndIncrement(m_writePointer),data);
        }
        m_empty.release();
    }

    public T get() throws InterruptedException {
        T data = null;
        m_empty.acquire();
        synchronized (this) { // << Commenting this doesn't work
            // CAS-based overflow handling
            data = m_buffer.get(safeGetAndIncrement(m_readPointer));
        }
        m_full.release();
        return data;
    }
}


With the synchronized block removed, claiming an index (the increment) and accessing the array slot at that index are no longer one atomic step. The breaking scenario I suspect requires producers to be outrunning consumers: a producer can then overwrite an array entry that has not been read yet, if the semaphore release that woke it came from an out-of-order read.

Consider the situation where the buffer is full (the last slot written is N and the next slot to be read is N+1, so the next write will also land on N+1) and two threads are trying to read from the buffer. (For simplicity, assume N is not close to the wraparound point.)

Thread 1 receives the index N+1 from which to read its item.

Thread 2 receives the index N+2 from which to read its item.

Due to a fluke of scheduling, Thread 2 reads its item from the buffer array first and releases the m_full semaphore before Thread 1 has read its item from the array.

Thread 3 (a producer) wakes up and writes an item into the next available slot, N+1, also before Thread 1 has read from the buffer.

Thread 1 then reads the item at index N+1, but it gets the newly written item. The item it was meant to read has been overwritten and is lost.
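
The interleaving above can be replayed deterministically in a single thread, which makes the lost item easy to see. This is only a sketch under stated assumptions: the class LostItemReplay, the method names, and the four-slot capacity are hypothetical, the semaphores are left out, and each statement stands in for one step taken by the thread named in its comment. Slot 0 plays the role of index N+1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class LostItemReplay {
    // Tiny assumed capacity so the trace is easy to follow.
    static final int SIZE = 4;

    // Same idea as safeGetAndIncrement in the question, for a SIZE-slot ring.
    static int claim(AtomicInteger pointer) {
        return pointer.getAndUpdate(v -> (v + 1) % SIZE);
    }

    // Replays the interleaving step by step; returns what consumers 1 and 2 read.
    public static List<String> replay() {
        AtomicReferenceArray<String> buffer = new AtomicReferenceArray<>(SIZE);
        AtomicInteger writePointer = new AtomicInteger();
        AtomicInteger readPointer = new AtomicInteger();

        // Fill the buffer: slots 0..3 hold a, b, c, d. Both pointers now target slot 0.
        for (String s : new String[] {"a", "b", "c", "d"}) {
            buffer.set(claim(writePointer), s);
        }

        int slot1 = claim(readPointer);        // consumer 1 claims slot 0, then stalls
        int slot2 = claim(readPointer);        // consumer 2 claims slot 1
        String read2 = buffer.get(slot2);      // consumer 2 reads "b"; it would now release m_full
        buffer.set(claim(writePointer), "e");  // producer uses that permit and overwrites slot 0
        String read1 = buffer.get(slot1);      // consumer 1 resumes and reads "e" instead of "a"

        List<String> result = new ArrayList<>();
        result.add(read1);
        result.add(read2);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // [e, b]
    }
}
```

Item "a" is never delivered to anyone. The semaphores count how many slots are free or filled, but not which ones, so a permit released for slot 1 can be spent on slot 0.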
