From 550e2a28adafc893524ac0dab8b3cf42f9e4cff5 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 03 Apr 2012 09:13:03 +0000
Subject: [PATCH] Fix OPENDJ-462: Spinning threads in JE backend importer
---
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 75 ++++++++++++++++++-------------------
1 files changed, 36 insertions(+), 39 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 4a02da5..8cc703f 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -2683,59 +2683,56 @@
* {@inheritDoc}
*/
@Override
- public Void call() throws IOException
+ public Void call() throws IOException, InterruptedException
{
long offset = 0;
List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
try {
while(true)
{
- IndexOutputBuffer indexBuffer = queue.poll();
- if(indexBuffer != null)
+ final IndexOutputBuffer indexBuffer = queue.take();
+ long beginOffset = offset;
+ long bufferLen;
+ if(!queue.isEmpty())
{
- long beginOffset = offset;
- long bufferLen;
- if(!queue.isEmpty())
+ queue.drainTo(l, DRAIN_TO);
+ l.add(indexBuffer);
+ bufferLen = writeIndexBuffers(l);
+ for(IndexOutputBuffer id : l)
{
- queue.drainTo(l, DRAIN_TO);
- l.add(indexBuffer);
- bufferLen = writeIndexBuffers(l);
- for(IndexOutputBuffer id : l)
+ if(!id.isDiscard())
{
- if(!id.isDiscard())
- {
- id.reset();
- freeBufferQueue.add(id);
- }
- }
- l.clear();
- }
- else
- {
- if(indexBuffer.isPoison())
- {
- break;
- }
- bufferLen = writeIndexBuffer(indexBuffer);
- if(!indexBuffer.isDiscard())
- {
- indexBuffer.reset();
- freeBufferQueue.add(indexBuffer);
+ id.reset();
+ freeBufferQueue.add(id);
}
}
- offset += bufferLen;
-
- // Write buffer index information.
- bufferIndexStream.writeLong(beginOffset);
- bufferIndexStream.writeLong(offset);
-
- bufferCount++;
- Importer.this.bufferCount.incrementAndGet();
-
- if(poisonSeen)
+ l.clear();
+ }
+ else
+ {
+ if(indexBuffer.isPoison())
{
break;
}
+ bufferLen = writeIndexBuffer(indexBuffer);
+ if(!indexBuffer.isDiscard())
+ {
+ indexBuffer.reset();
+ freeBufferQueue.add(indexBuffer);
+ }
+ }
+ offset += bufferLen;
+
+ // Write buffer index information.
+ bufferIndexStream.writeLong(beginOffset);
+ bufferIndexStream.writeLong(offset);
+
+ bufferCount++;
+ Importer.this.bufferCount.incrementAndGet();
+
+ if(poisonSeen)
+ {
+ break;
}
}
}
--
Gitblit v1.10.0