开发者

Java PipedInputStream available() method return value

开发者 https://www.devze.com 2023-04-07 19:30 出处:网络
I am trying to write a piece of unblocking code toread from a PipedInputStream. It basically checks if there is anything to be read before calling the blocking read API:

I am trying to write a piece of unblocking code to read from a PipedInputStream. It basically checks if there is anything to be read before calling the blocking read API:

int n = 0;
if ((n = pipedInputStream_.available()) > 0) {
     pipedInputStream_.read(...)
}

Reading through the java API doc I cant tell for sure what that check should be, since the possible values are zero (implie开发者_JAVA技巧s no data, or closed/broken stream) or greater than zero . So how can the caller know if there is anything to be read at all?

"Returns the number of bytes that can be read from this input stream without blocking, or 0 if this input stream has been closed by invoking its close() method, or if the pipe is unconnected, or broken."

Looking at the source it seems like the only values are zero or greater than zero.

public synchronized int available() throws IOException {
    if(in < 0)
        return 0;
    else if(in == out)
        return buffer.length;
    else if (in > out)
        return in - out;
    else
        return in + buffer.length - out;
}


If available() returns zero, there are no bytes available to read at present. Per the documentation you quote, that can be so for several reasons:

  • The pipe was closed.
  • The pipe is broken.
  • All the previously-available input (if any) was already consumed.

A zero return value from available() might imply that an error had occurred, implying that you will never be able to read any more data through the pipe in the future, but you can't tell for sure here, because zero might be indicating that third condition above, where blocking on InputStream#read() might eventually yield more data that the corresponding OutputStream side will push through the pipe.

I don't see that it's possible to poll a PipedInputStream with available() until more data becomes available, because you'll never be able to distinguish the terminal cases above (the first and the second) from the reader being more hungry than the writer. Like so many stream interfaces, here too you have to try to consume and be ready to fail. That's the trap; InputStream#read() will block, but not until you commit to blocking on an attempt to read will you be able to discern that no more input is coming.

It is not feasible to base your consuming actions on available(). If it returns a positive number, there's something to be read, but of course even what is available now might not be "enough" to satisfy your consumer. You will find your application easier to manage if you commit a thread to consuming the InputStream in blocking fashion and skip the polling with available(). Let InputStream#read() be your sole oracle here.


I needed a filter to intercept slow connections where I need to close DB connections ASAP so I initially used Java pipes but when looked closer at their implementation, it is all synchronized so I ended up creating my own QueueInputStream using a small buffer and Blocking queue to put the buffer in the queue once was full, it is lock free except when for the lock conditions used at LinkedBlockingQueue which with the aid of the small buffer it should be cheap, this class is only intended to be used for a single producer and consumer per instance:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{-1};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

}
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号