开发者

Synchronized FIFO Buffer Usage

开发者 https://www.devze.com 2023-04-10 07:08 出处:网络
I am trying to create a system where one thread A adds items to a buffer, then another thread B is responsible for reading the items in the exact order they were entered, and then doing some potential

I am trying to create a system where one thread A adds items to a buffer, then another thread B is responsible for reading the items in the exact order they were entered, and then doing some potentially lengthily operations on them.

My best guess:

 Class B extends Thread {

    Buffer fifo = BufferUtils.synchronizedBuffer(new BoundedFifoBuffer());

    add(Object o) { // Thread A calls me, and doesn't deal well with delays :)
      fifo.add(o); // will the sync below prevent this from happening? 
                   // or can .add be independent of the sync ?开发者_JAVA百科
    }

    run() {
     synchronized (fifo) { // why am i sync'd here?  I am the only thread accessing...
         while ( item in buffer ) { // also how do i check this, and block otherwise?
            process(fifo.remove());
         }
     }
    |
  }

As you can see, I'm not even fully sure if synchronization is necessary. The thread safety issue I have has nothing to do with the get() access, as there will only be one thread accessing it, but what is most important is that thread A calls .add() without any Concurrent Access Exception during thread B's processing of the buffer's contents.

Maybe I'm overthinking this? Is it safe to being with? Your evaluation of this problem is much appreciated.

Sincerely,

Jay


If I am not wrong, you could also be interested in this ArrayBlockingQueue class.


If you have a stream of characters for logging the fastest approach may be to use a pipe.

    PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos, 256*1024);
    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(new Runnable() {
        @Override
        public void run() {
            byte[] bytes = new byte[256*1024];
            int length;
            try {
                while ((length = pis.read(bytes)) > 0) {
                    // something slow.
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    // time latency
    PrintWriter pw = new PrintWriter(pos);
    long start = System.nanoTime();
    int runs = 10*1000*1000;
    for(int i=0;i<runs;i++) {
        pw.println("Hello "+i);
    }
    long time = System.nanoTime() - start;
    System.out.printf("Took an average of %,d nano-seconds per line%n", time/runs);
    es.shutdown();

prints

    Took an average of 269 nano-seconds per line

Note: the pipe itself doesn't create any garbage. (Unlike a queue)


You can use ExecutorService to wrap up a queue and thread(s)

ExecutorService es =

es.submit(new Runnable() {
  public void run() {
     process(o);
  }
});
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号