| | |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public Void call() throws IOException |
| | | public Void call() throws IOException, InterruptedException |
| | | { |
| | | long offset = 0; |
| | | List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>(); |
| | | try { |
| | | while(true) |
| | | { |
| | | IndexOutputBuffer indexBuffer = queue.poll(); |
| | | if(indexBuffer != null) |
| | | final IndexOutputBuffer indexBuffer = queue.take(); |
| | | long beginOffset = offset; |
| | | long bufferLen; |
| | | if(!queue.isEmpty()) |
| | | { |
| | | long beginOffset = offset; |
| | | long bufferLen; |
| | | if(!queue.isEmpty()) |
| | | queue.drainTo(l, DRAIN_TO); |
| | | l.add(indexBuffer); |
| | | bufferLen = writeIndexBuffers(l); |
| | | for(IndexOutputBuffer id : l) |
| | | { |
| | | queue.drainTo(l, DRAIN_TO); |
| | | l.add(indexBuffer); |
| | | bufferLen = writeIndexBuffers(l); |
| | | for(IndexOutputBuffer id : l) |
| | | if(!id.isDiscard()) |
| | | { |
| | | if(!id.isDiscard()) |
| | | { |
| | | id.reset(); |
| | | freeBufferQueue.add(id); |
| | | } |
| | | } |
| | | l.clear(); |
| | | } |
| | | else |
| | | { |
| | | if(indexBuffer.isPoison()) |
| | | { |
| | | break; |
| | | } |
| | | bufferLen = writeIndexBuffer(indexBuffer); |
| | | if(!indexBuffer.isDiscard()) |
| | | { |
| | | indexBuffer.reset(); |
| | | freeBufferQueue.add(indexBuffer); |
| | | id.reset(); |
| | | freeBufferQueue.add(id); |
| | | } |
| | | } |
| | | offset += bufferLen; |
| | | |
| | | // Write buffer index information. |
| | | bufferIndexStream.writeLong(beginOffset); |
| | | bufferIndexStream.writeLong(offset); |
| | | |
| | | bufferCount++; |
| | | Importer.this.bufferCount.incrementAndGet(); |
| | | |
| | | if(poisonSeen) |
| | | l.clear(); |
| | | } |
| | | else |
| | | { |
| | | if(indexBuffer.isPoison()) |
| | | { |
| | | break; |
| | | } |
| | | bufferLen = writeIndexBuffer(indexBuffer); |
| | | if(!indexBuffer.isDiscard()) |
| | | { |
| | | indexBuffer.reset(); |
| | | freeBufferQueue.add(indexBuffer); |
| | | } |
| | | } |
| | | offset += bufferLen; |
| | | |
| | | // Write buffer index information. |
| | | bufferIndexStream.writeLong(beginOffset); |
| | | bufferIndexStream.writeLong(offset); |
| | | |
| | | bufferCount++; |
| | | Importer.this.bufferCount.incrementAndGet(); |
| | | |
| | | if(poisonSeen) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } |