mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
03.13.2012 550e2a28adafc893524ac0dab8b3cf42f9e4cff5
Fix OPENDJ-462: Spinning threads in JE backend importer

Use Queue.take() instead of Queue.poll(). Thanks to Radiant Logic for spotting this.
1 files modified
75 ■■■■ changed files
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java 75 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -2683,59 +2683,56 @@
     * {@inheritDoc}
     */
    @Override
    public Void call() throws IOException
    public Void call() throws IOException, InterruptedException
    {
      long offset = 0;
      List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
      try {
        while(true)
        {
          IndexOutputBuffer indexBuffer = queue.poll();
          if(indexBuffer != null)
          final IndexOutputBuffer indexBuffer = queue.take();
          long beginOffset = offset;
          long bufferLen;
          if(!queue.isEmpty())
          {
            long beginOffset = offset;
            long bufferLen;
            if(!queue.isEmpty())
            queue.drainTo(l, DRAIN_TO);
            l.add(indexBuffer);
            bufferLen = writeIndexBuffers(l);
            for(IndexOutputBuffer id : l)
            {
              queue.drainTo(l, DRAIN_TO);
              l.add(indexBuffer);
              bufferLen = writeIndexBuffers(l);
              for(IndexOutputBuffer id : l)
              if(!id.isDiscard())
              {
                if(!id.isDiscard())
                {
                  id.reset();
                  freeBufferQueue.add(id);
                }
              }
              l.clear();
            }
            else
            {
              if(indexBuffer.isPoison())
              {
                break;
              }
              bufferLen = writeIndexBuffer(indexBuffer);
              if(!indexBuffer.isDiscard())
              {
                indexBuffer.reset();
                freeBufferQueue.add(indexBuffer);
                id.reset();
                freeBufferQueue.add(id);
              }
            }
            offset += bufferLen;
            // Write buffer index information.
            bufferIndexStream.writeLong(beginOffset);
            bufferIndexStream.writeLong(offset);
            bufferCount++;
            Importer.this.bufferCount.incrementAndGet();
            if(poisonSeen)
            l.clear();
          }
          else
          {
            if(indexBuffer.isPoison())
            {
              break;
            }
            bufferLen = writeIndexBuffer(indexBuffer);
            if(!indexBuffer.isDiscard())
            {
              indexBuffer.reset();
              freeBufferQueue.add(indexBuffer);
            }
          }
          offset += bufferLen;
          // Write buffer index information.
          bufferIndexStream.writeLong(beginOffset);
          bufferIndexStream.writeLong(offset);
          bufferCount++;
          Importer.this.bufferCount.incrementAndGet();
          if(poisonSeen)
          {
            break;
          }
        }
      }