From 83325bf8cc3b480c00a01ee4c43391cd0238e041 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 12 Mar 2015 16:32:12 +0000
Subject: [PATCH] Simplified loops in import/rebuild-index code.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 138 +++++++++++++++++++--------------------------
1 files changed, 58 insertions(+), 80 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 520cf1e..e685a1d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1309,7 +1309,8 @@
final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
boolean success = cursor.next();
while (success
- && !importConfiguration.isCancelled() && !isCanceled)
+ && !importConfiguration.isCancelled()
+ && !isCanceled)
{
final ByteString key = cursor.getKey();
if (!includeBranches.contains(key))
@@ -1559,6 +1560,7 @@
}
/** Examine the DN for duplicates and missing parents. */
+ @SuppressWarnings("javadoc")
boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
throws StorageRuntimeException, InterruptedException
{
@@ -1668,17 +1670,12 @@
void flushIndexBuffers() throws InterruptedException, ExecutionException
{
final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
- Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> it = indexBufferMap.entrySet().iterator();
- while (it.hasNext())
+ for (IndexOutputBuffer indexBuffer : indexBufferMap.values())
{
- Map.Entry<IndexKey, IndexOutputBuffer> e = it.next();
- IndexKey indexKey = e.getKey();
- IndexOutputBuffer indexBuffer = e.getValue();
- it.remove();
- indexBuffer.setIndexKey(indexKey);
indexBuffer.discard();
futures.add(bufferSortService.submit(new SortTask(indexBuffer)));
}
+ indexBufferMap.clear();
getAll(futures);
}
@@ -1689,16 +1686,15 @@
IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
if (indexBuffer == null)
{
- indexBuffer = getNewIndexBuffer(sizeNeeded);
+ indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
indexBufferMap.put(indexKey, indexBuffer);
}
else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
{
// complete the current buffer...
- indexBuffer.setIndexKey(indexKey);
bufferSortService.submit(new SortTask(indexBuffer));
// ... and get a new one
- indexBuffer = getNewIndexBuffer(sizeNeeded);
+ indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
indexBufferMap.put(indexKey, indexBuffer);
}
int indexID = getIndexID(container);
@@ -1706,7 +1702,7 @@
return indexID;
}
- IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
+ IndexOutputBuffer getNewIndexBuffer(int size, IndexKey indexKey) throws InterruptedException
{
IndexOutputBuffer indexBuffer;
if (size > bufferSize)
@@ -1726,6 +1722,7 @@
{
throw new InterruptedException("Cancel processing received.");
}
+ indexBuffer.setIndexKey(indexKey);
return indexBuffer;
}
@@ -1951,15 +1948,15 @@
@Override
public Void call() throws Exception
{
- ByteStringBuilder key = null;
- ImportIDSet insertIDSet = null;
- ImportIDSet deleteIDSet = null;
-
if (isCanceled)
{
return null;
}
+ final ByteStringBuilder key = new ByteStringBuilder(BYTE_BUFFER_CAPACITY);
+ ImportIDSet insertIDSet = null;
+ ImportIDSet deleteIDSet = null;
+ Integer indexID = null;
try
{
beginWriteTask();
@@ -1972,25 +1969,18 @@
return null;
}
- Integer indexID = null;
while (!bufferSet.isEmpty())
{
IndexInputBuffer b = bufferSet.pollFirst();
- if (key == null)
+ if (!b.sameKeyAndIndexID(key, indexID))
{
- key = new ByteStringBuilder(b.getKeyLen());
+ if (indexID != null)
+ {
+ // save the previous record
+ addToDB(indexID, insertIDSet, deleteIDSet);
+ }
- indexID = b.getIndexID();
- b.fetchKey(key);
-
- insertIDSet = newImportIDSet(key, indexID);
- deleteIDSet = newImportIDSet(key, indexID);
- }
- else if (b.compare(key, indexID) != 0)
- {
- addToDB(indexID, insertIDSet, deleteIDSet);
- keyCount.incrementAndGet();
-
+ // this is a new record, reinitialize all
indexID = b.getIndexID();
b.fetchKey(key);
@@ -1998,6 +1988,7 @@
deleteIDSet = newImportIDSet(key, indexID);
}
+ // merge all entryIds into the idSets
b.mergeIDSet(insertIDSet);
b.mergeIDSet(deleteIDSet);
@@ -2008,7 +1999,7 @@
}
}
- if (key != null)
+ if (indexID != null)
{
addToDB(indexID, insertIDSet, deleteIDSet);
}
@@ -2039,6 +2030,7 @@
private void addToDB(int indexID, ImportIDSet insertSet, ImportIDSet deleteSet) throws DirectoryException
{
+ keyCount.incrementAndGet();
if (indexMgr.isDN2ID())
{
addDN2ID(indexID, insertSet);
@@ -2414,29 +2406,29 @@
private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
{
- indexBuffer.setPosition(-1);
- resetStreams();
-
long bufferLen = 0;
final int numberKeys = indexBuffer.getNumberKeys();
for (int i = 0; i < numberKeys; i++)
{
- if (indexBuffer.getPosition() == -1)
+ if (i == 0)
{
- indexBuffer.setPosition(i);
- insertOrDeleteKey(indexBuffer, i);
- continue;
- }
- if (!indexBuffer.byteArraysEqual(i))
- {
- bufferLen += writeRecord(indexBuffer);
+ // first record, initialize all
indexBuffer.setPosition(i);
resetStreams();
}
- insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
+ else if (!indexBuffer.sameKeyAndIndexID(i))
+ {
+ // this is a new record, save previous record ...
+ bufferLen += writeRecord(indexBuffer);
+ // ... and reinitialize all
+ indexBuffer.setPosition(i);
+ resetStreams();
+ }
+ appendNextEntryIDToStream(indexBuffer, i);
}
- if (indexBuffer.getPosition() != -1)
+ if (numberKeys > 0)
{
+ // save the last record
bufferLen += writeRecord(indexBuffer);
}
return bufferLen;
@@ -2466,24 +2458,21 @@
while (!indexSortedSet.isEmpty())
{
final IndexOutputBuffer b = indexSortedSet.pollFirst();
- if (saveKey == null)
+ if (!b.sameKeyAndIndexID(saveKey, saveIndexID))
{
+ if (saveKey != null)
+ {
+ // save the previous record
+ bufferLen += writeRecord(saveKey, saveIndexID);
+ resetStreams();
+ }
+ // this is a new record, reinitialize all
saveKey = b.getKey();
saveIndexID = b.getIndexID();
- insertOrDeleteKey(b, b.getPosition());
}
- else if (!b.recordsEqual(saveKey, saveIndexID))
- {
- bufferLen += writeRecord(saveKey, saveIndexID);
- resetStreams();
- saveKey = b.getKey();
- saveIndexID = b.getIndexID();
- insertOrDeleteKey(b, b.getPosition());
- }
- else
- {
- insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
- }
+
+ appendNextEntryIDToStream(b, b.getPosition());
+
if (b.hasMoreData())
{
b.nextRecord();
@@ -2505,28 +2494,16 @@
deleteKeyCount = 0;
}
- private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int position)
+ private void appendNextEntryIDToStream(IndexOutputBuffer indexBuffer, int position)
{
if (indexBuffer.isInsertRecord(position))
{
- indexBuffer.writeEntryID(insertByteStream, position);
- insertKeyCount++;
- }
- else
- {
- indexBuffer.writeEntryID(deleteByteStream, position);
- deleteKeyCount++;
- }
- }
-
- private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int position)
- {
- if (indexBuffer.isInsertRecord(position))
- {
- if (insertKeyCount++ <= indexMgr.getLimit())
+ if (insertKeyCount++ <= indexMgr.getIndexEntryLimit())
{
indexBuffer.writeEntryID(insertByteStream, position);
}
+ // else do not bother appending, this value will not be read.
+ // instead, a special value will be written to show the index entry limit is exceeded
}
else
{
@@ -2537,8 +2514,9 @@
private int writeByteStreams() throws IOException
{
- if (insertKeyCount > indexMgr.getLimit())
+ if (insertKeyCount > indexMgr.getIndexEntryLimit())
{
+ // special handling when index entry limit has been exceeded
insertKeyCount = 1;
insertByteStream.reset();
insertByteStream.write(-1);
@@ -2671,21 +2649,21 @@
private final String bufferFileName;
private final File bufferIndexFile;
private final boolean isDN2ID;
- private final int limit;
+ private final int indexEntryLimit;
private int numberOfBuffers;
private long bufferFileSize;
private long totalDNs;
private volatile IndexDBWriteTask writer;
- private IndexManager(String fileName, boolean isDN2ID, int limit)
+ private IndexManager(String fileName, boolean isDN2ID, int indexEntryLimit)
{
this.bufferFileName = fileName;
this.bufferFile = new File(tempDir, bufferFileName);
this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
this.isDN2ID = isDN2ID;
- this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
+ this.indexEntryLimit = indexEntryLimit > 0 ? indexEntryLimit : Integer.MAX_VALUE;
}
private void setIndexDBWriteTask(IndexDBWriteTask writer)
@@ -2761,9 +2739,9 @@
return bufferFileName;
}
- private int getLimit()
+ private int getIndexEntryLimit()
{
- return limit;
+ return indexEntryLimit;
}
/** {@inheritDoc} */
--
Gitblit v1.10.0