From 7bdd54b23c2ba2f1facb885f312f6d55ce9549e6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Jan 2015 13:32:42 +0000
Subject: [PATCH] Code cleanup in jeb importer: - extracted methods - renamed methods - cleaned up comments/javadocs
---
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 578 ++++++++++++++++++-------------------
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java | 281 ++++++++----------
2 files changed, 411 insertions(+), 448 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 6610e83..d569381 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -174,7 +174,7 @@
* Map of index keys to index buffers. Used to allocate sorted index buffers
* to a index writer thread.
*/
- private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
+ private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap =
new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
/** Map of DB containers to index managers. Used to start phase 2. */
@@ -1009,16 +1009,18 @@
private void phaseOne() throws InterruptedException, ExecutionException
{
initializeIndexBuffers();
- FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
- ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+
+ final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
bufferSortService = Executors.newFixedThreadPool(threadCount);
- ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+ final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
+
+ final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
tasks.add(new MigrateExistingTask());
getAll(execService.invokeAll(tasks));
tasks.clear();
+
if (importConfiguration.appendToExistingData()
&& importConfiguration.replaceExistingEntries())
{
@@ -1036,49 +1038,60 @@
}
getAll(execService.invokeAll(tasks));
tasks.clear();
+
tasks.add(new MigrateExcludedTask());
getAll(execService.invokeAll(tasks));
+
stopScratchFileWriters();
getAll(scratchFileWriterFutures);
- // Shutdown the executor services
- timerService.shutdown();
- timerService.awaitTermination(30, TimeUnit.SECONDS);
- execService.shutdown();
- execService.awaitTermination(30, TimeUnit.SECONDS);
- bufferSortService.shutdown();
- bufferSortService.awaitTermination(30, TimeUnit.SECONDS);
- scratchFileWriterService.shutdown();
- scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
+ shutdownAll(timerService, execService, bufferSortService, scratchFileWriterService);
// Try to clear as much memory as possible.
- scratchFileWriterList.clear();
- scratchFileWriterFutures.clear();
- indexKeyQueMap.clear();
- freeBufferQueue.clear();
+ clearAll(scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
+ indexKeyQueueMap.clear();
+ }
+
+ private void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+ {
+ timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ private void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+ {
+ for (ExecutorService executorService : executorServices)
+ {
+ executorService.shutdown();
+ }
+ for (ExecutorService executorService : executorServices)
+ {
+ executorService.awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+
+ private void clearAll(Collection<?>... cols)
+ {
+ for (Collection<?> col : cols)
+ {
+ col.clear();
+ }
}
private void phaseTwo() throws InterruptedException, ExecutionException
{
- SecondPhaseProgressTask progress2Task =
- new SecondPhaseProgressTask(reader.getEntriesRead());
- ScheduledThreadPoolExecutor timerService =
- new ScheduledThreadPoolExecutor(1);
- timerService.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL,
- TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+ ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
try
{
processIndexFiles();
}
finally
{
- timerService.shutdown();
- timerService.awaitTermination(30, TimeUnit.SECONDS);
+ shutdownAll(timerService);
}
}
- private void processIndexFiles() throws InterruptedException,
- ExecutionException
+ private void processIndexFiles() throws InterruptedException, ExecutionException
{
if (bufferCount.get() == 0)
{
@@ -1096,15 +1109,15 @@
int buffers;
while (true)
{
- final List<IndexManager> totList = new ArrayList<IndexManager>(DNIndexMgrList);
- totList.addAll(indexMgrList);
- Collections.sort(totList, Collections.reverseOrder());
+ final List<IndexManager> allIndexMgrs = new ArrayList<IndexManager>(DNIndexMgrList);
+ allIndexMgrs.addAll(indexMgrList);
+ Collections.sort(allIndexMgrs, Collections.reverseOrder());
buffers = 0;
- final int limit = Math.min(dbThreads, totList.size());
+ final int limit = Math.min(dbThreads, allIndexMgrs.size());
for (int i = 0; i < limit; i++)
{
- buffers += totList.get(i).numberOfBuffers;
+ buffers += allIndexMgrs.get(i).numberOfBuffers;
}
readAheadSize = (int) (usableMemory / buffers);
@@ -1135,7 +1148,7 @@
}
}
- // Ensure that there are always two threads available for parallel
+ // Ensure that there are minimum two threads available for parallel
// processing of smaller indexes.
dbThreads = Math.max(2, dbThreads);
@@ -1147,17 +1160,19 @@
Semaphore permits = new Semaphore(buffers);
// Start DN processing first.
- for (IndexManager dnMgr : DNIndexMgrList)
- {
- futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits, buffers, readAheadSize)));
- }
- for (IndexManager mgr : indexMgrList)
- {
- futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers, readAheadSize)));
- }
+ submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
getAll(futures);
+ shutdownAll(dbService);
+ }
- dbService.shutdown();
+ private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService, Semaphore permits,
+ int buffers, int readAheadSize, List<Future<Void>> futures)
+ {
+ for (IndexManager indexMgr : indexMgrs)
+ {
+ futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, permits, buffers, readAheadSize)));
+ }
}
private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
@@ -1170,16 +1185,14 @@
private void stopScratchFileWriters()
{
- IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
+ final IndexOutputBuffer stopProcessing = IndexOutputBuffer.poison();
for (ScratchFileWriterTask task : scratchFileWriterList)
{
- task.queue.add(indexBuffer);
+ task.queue.add(stopProcessing);
}
}
- /**
- * Task used to migrate excluded branch.
- */
+ /** Task used to migrate excluded branch. */
private final class MigrateExcludedTask extends ImportTask
{
@@ -1244,9 +1257,7 @@
}
}
- /**
- * Task to migrate existing entries.
- */
+ /** Task to migrate existing entries. */
private final class MigrateExistingTask extends ImportTask
{
@@ -1363,8 +1374,7 @@
{
if (importConfiguration.isCancelled() || isCanceled)
{
- IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
- freeBufferQueue.add(indexBuffer);
+ freeBufferQueue.add(IndexOutputBuffer.poison());
return null;
}
oldEntry = null;
@@ -1480,8 +1490,7 @@
{
if (importConfiguration.isCancelled() || isCanceled)
{
- IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
- freeBufferQueue.add(indexBuffer);
+ freeBufferQueue.add(IndexOutputBuffer.poison());
return null;
}
Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
@@ -1639,7 +1648,7 @@
setIterator.remove();
indexBuffer.setComparator(indexComparator);
indexBuffer.setIndexKey(indexKey);
- indexBuffer.setDiscard();
+ indexBuffer.discard();
Future<Void> future = bufferSortService.submit(new SortTask(indexBuffer));
future.get();
}
@@ -1677,7 +1686,7 @@
if (size > bufferSize)
{
indexBuffer = new IndexOutputBuffer(size);
- indexBuffer.setDiscard();
+ indexBuffer.discard();
}
else
{
@@ -2378,16 +2387,14 @@
private final int DRAIN_TO = 3;
private final IndexManager indexMgr;
private final BlockingQueue<IndexOutputBuffer> queue;
- private final ByteArrayOutputStream insertByteStream =
- new ByteArrayOutputStream(2 * bufferSize);
- private final ByteArrayOutputStream deleteByteStream =
- new ByteArrayOutputStream(2 * bufferSize);
+ private final ByteArrayOutputStream insertByteStream = new ByteArrayOutputStream(2 * bufferSize);
+ private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize);
private final DataOutputStream bufferStream;
private final DataOutputStream bufferIndexStream;
private final byte[] tmpArray = new byte[8];
+ private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<IndexOutputBuffer>();
private int insertKeyCount, deleteKeyCount;
private int bufferCount;
- private final SortedSet<IndexOutputBuffer> indexSortedSet;
private boolean poisonSeen;
public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
@@ -2395,13 +2402,13 @@
{
this.queue = queue;
this.indexMgr = indexMgr;
- this.bufferStream =
- new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
- indexMgr.getBufferFile()), READER_WRITER_BUFFER_SIZE));
- this.bufferIndexStream =
- new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
- indexMgr.getBufferIndexFile()), READER_WRITER_BUFFER_SIZE));
- this.indexSortedSet = new TreeSet<IndexOutputBuffer>();
+ this.bufferStream = newDataOutputStream(indexMgr.getBufferFile());
+ this.bufferIndexStream = newDataOutputStream(indexMgr.getBufferIndexFile());
+ }
+
+ private DataOutputStream newDataOutputStream(File file) throws FileNotFoundException
+ {
+ return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), READER_WRITER_BUFFER_SIZE));
}
/** {@inheritDoc} */
@@ -2424,7 +2431,7 @@
bufferLen = writeIndexBuffers(l);
for (IndexOutputBuffer id : l)
{
- if (!id.isDiscard())
+ if (!id.isDiscarded())
{
id.reset();
freeBufferQueue.add(id);
@@ -2439,7 +2446,7 @@
break;
}
bufferLen = writeIndexBuffer(indexBuffer);
- if (!indexBuffer.isDiscard())
+ if (!indexBuffer.isDiscarded())
{
indexBuffer.reset();
freeBufferQueue.add(indexBuffer);
@@ -2475,54 +2482,28 @@
return null;
}
- private long writeIndexBuffer(IndexOutputBuffer indexBuffer)
- throws IOException
+ private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
{
- int numberKeys = indexBuffer.getNumberKeys();
indexBuffer.setPosition(-1);
+ resetStreams();
+
long bufferLen = 0;
- insertByteStream.reset();
- insertKeyCount = 0;
- deleteByteStream.reset();
- deleteKeyCount = 0;
+ final int numberKeys = indexBuffer.getNumberKeys();
for (int i = 0; i < numberKeys; i++)
{
if (indexBuffer.getPosition() == -1)
{
indexBuffer.setPosition(i);
- if (indexBuffer.isInsert(i))
- {
- indexBuffer.writeID(insertByteStream, i);
- insertKeyCount++;
- }
- else
- {
- indexBuffer.writeID(deleteByteStream, i);
- deleteKeyCount++;
- }
+ insertOrDeleteKey(indexBuffer, i);
continue;
}
if (!indexBuffer.compare(i))
{
bufferLen += writeRecord(indexBuffer);
indexBuffer.setPosition(i);
- insertByteStream.reset();
- insertKeyCount = 0;
- deleteByteStream.reset();
- deleteKeyCount = 0;
+ resetStreams();
}
- if (indexBuffer.isInsert(i))
- {
- if (insertKeyCount++ <= indexMgr.getLimit())
- {
- indexBuffer.writeID(insertByteStream, i);
- }
- }
- else
- {
- indexBuffer.writeID(deleteByteStream, i);
- deleteKeyCount++;
- }
+ insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
}
if (indexBuffer.getPosition() != -1)
{
@@ -2531,15 +2512,12 @@
return bufferLen;
}
- private long writeIndexBuffers(List<IndexOutputBuffer> buffers)
- throws IOException
+ private long writeIndexBuffers(List<IndexOutputBuffer> buffers) throws IOException
{
+ resetStreams();
+
long id = 0;
long bufferLen = 0;
- insertByteStream.reset();
- insertKeyCount = 0;
- deleteByteStream.reset();
- deleteKeyCount = 0;
for (IndexOutputBuffer b : buffers)
{
if (b.isPoison())
@@ -2557,58 +2535,28 @@
int saveIndexID = 0;
while (!indexSortedSet.isEmpty())
{
- IndexOutputBuffer b = indexSortedSet.first();
- indexSortedSet.remove(b);
+ final IndexOutputBuffer b = indexSortedSet.pollFirst();
if (saveKey == null)
{
saveKey = b.getKey();
saveIndexID = b.getIndexID();
- if (b.isInsert(b.getPosition()))
- {
- b.writeID(insertByteStream, b.getPosition());
- insertKeyCount++;
- }
- else
- {
- b.writeID(deleteByteStream, b.getPosition());
- deleteKeyCount++;
- }
+ insertOrDeleteKey(b, b.getPosition());
}
else if (!b.compare(saveKey, saveIndexID))
{
bufferLen += writeRecord(saveKey, saveIndexID);
- insertByteStream.reset();
- deleteByteStream.reset();
- insertKeyCount = 0;
- deleteKeyCount = 0;
+ resetStreams();
saveKey = b.getKey();
saveIndexID = b.getIndexID();
- if (b.isInsert(b.getPosition()))
- {
- b.writeID(insertByteStream, b.getPosition());
- insertKeyCount++;
- }
- else
- {
- b.writeID(deleteByteStream, b.getPosition());
- deleteKeyCount++;
- }
- }
- else if (b.isInsert(b.getPosition()))
- {
- if (insertKeyCount++ <= indexMgr.getLimit())
- {
- b.writeID(insertByteStream, b.getPosition());
- }
+ insertOrDeleteKey(b, b.getPosition());
}
else
{
- b.writeID(deleteByteStream, b.getPosition());
- deleteKeyCount++;
+ insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
}
if (b.hasMoreData())
{
- b.getNextRecord();
+ b.nextRecord();
indexSortedSet.add(b);
}
}
@@ -2619,6 +2567,44 @@
return bufferLen;
}
+ private void resetStreams()
+ {
+ insertByteStream.reset();
+ insertKeyCount = 0;
+ deleteByteStream.reset();
+ deleteKeyCount = 0;
+ }
+
+ private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int i)
+ {
+ if (indexBuffer.isInsertRecord(i))
+ {
+ indexBuffer.writeID(insertByteStream, i);
+ insertKeyCount++;
+ }
+ else
+ {
+ indexBuffer.writeID(deleteByteStream, i);
+ deleteKeyCount++;
+ }
+ }
+
+ private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int i)
+ {
+ if (indexBuffer.isInsertRecord(i))
+ {
+ if (insertKeyCount++ <= indexMgr.getLimit())
+ {
+ indexBuffer.writeID(insertByteStream, i);
+ }
+ }
+ else
+ {
+ indexBuffer.writeID(deleteByteStream, i);
+ deleteKeyCount++;
+ }
+ }
+
private int writeByteStreams() throws IOException
{
if (insertKeyCount > indexMgr.getLimit())
@@ -2699,12 +2685,11 @@
return null;
}
indexBuffer.sort();
- if (!indexKeyQueMap.containsKey(indexBuffer.getIndexKey()))
+ if (!indexKeyQueueMap.containsKey(indexBuffer.getIndexKey()))
{
createIndexWriterTask(indexBuffer.getIndexKey());
}
- BlockingQueue<IndexOutputBuffer> q = indexKeyQueMap.get(indexBuffer.getIndexKey());
- q.add(indexBuffer);
+ indexKeyQueueMap.get(indexBuffer.getIndexKey()).add(indexBuffer);
return null;
}
@@ -2712,14 +2697,13 @@
{
synchronized (synObj)
{
- if (indexKeyQueMap.containsKey(indexKey))
+ if (indexKeyQueueMap.containsKey(indexKey))
{
return;
}
- boolean isDN = ImportIndexType.DN.equals(indexKey.getIndexType());
- IndexManager indexMgr = new IndexManager(
- indexKey.getName(), isDN, indexKey.getEntryLimit());
- if (isDN)
+ boolean isDN2ID = ImportIndexType.DN.equals(indexKey.getIndexType());
+ IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN2ID, indexKey.getEntryLimit());
+ if (isDN2ID)
{
DNIndexMgrList.add(indexMgr);
}
@@ -2727,44 +2711,44 @@
{
indexMgrList.add(indexMgr);
}
- BlockingQueue<IndexOutputBuffer> newQue =
+ BlockingQueue<IndexOutputBuffer> newQueue =
new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
- ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQue, indexMgr);
+ ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
scratchFileWriterList.add(indexWriter);
scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
- indexKeyQueMap.put(indexKey, newQue);
+ indexKeyQueueMap.put(indexKey, newQueue);
}
}
}
/**
- * The index manager class has several functions: 1. It used to carry
- * information about index processing created in phase one to phase two. 2. It
- * collects statistics about phase two processing for each index. 3. It
- * manages opening and closing the scratch index files.
+ * The index manager class has several functions:
+ * <ol>
+ * <li>It is used to carry information about index processing created in phase one to phase two</li>
+ * <li>It collects statistics about phase two processing for each index</li>
+ * <li>It manages opening and closing the scratch index files</li>
+ * </ol>
*/
final class IndexManager implements Comparable<IndexManager>
{
private final File bufferFile;
private final String bufferFileName;
private final File bufferIndexFile;
- private final String bufferIndexFileName;
- private long bufferFileSize;
- private long totalDNS;
- private final boolean isDN;
+ private final boolean isDN2ID;
private final int limit;
+
private int numberOfBuffers;
+ private long bufferFileSize;
+ private long totalDNs;
private volatile IndexDBWriteTask writer;
- private IndexManager(String fileName, boolean isDN, int limit)
+ private IndexManager(String fileName, boolean isDN2ID, int limit)
{
this.bufferFileName = fileName;
- this.bufferIndexFileName = fileName + ".index";
-
this.bufferFile = new File(tempDir, bufferFileName);
- this.bufferIndexFile = new File(tempDir, bufferIndexFileName);
+ this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
- this.isDN = isDN;
+ this.isDN2ID = isDN2ID;
this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
}
@@ -2810,17 +2794,17 @@
private void addTotDNCount(int delta)
{
- totalDNS += delta;
+ totalDNs += delta;
}
private long getDNCount()
{
- return totalDNS;
+ return totalDNs;
}
private boolean isDN2ID()
{
- return isDN;
+ return isDN2ID;
}
private void printStats(long deltaTime)
@@ -3302,13 +3286,10 @@
ExecutionException
{
initializeIndexBuffers();
- RebuildFirstPhaseProgressTask progressTask = new RebuildFirstPhaseProgressTask();
- Timer timer = new Timer();
- timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
+ Timer timer = scheduleAtFixedRate(new RebuildFirstPhaseProgressTask());
scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
bufferSortService = Executors.newFixedThreadPool(threadCount);
- ExecutorService rebuildIndexService =
- Executors.newFixedThreadPool(threadCount);
+ ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
for (int i = 0; i < threadCount; i++)
{
@@ -3320,29 +3301,31 @@
getAll(scratchFileWriterFutures);
// Try to clear as much memory as possible.
- rebuildIndexService.shutdown();
- rebuildIndexService.awaitTermination(30, TimeUnit.SECONDS);
- bufferSortService.shutdown();
- bufferSortService.awaitTermination(30, TimeUnit.SECONDS);
- scratchFileWriterService.shutdown();
- scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
+ shutdownAll(rebuildIndexService, bufferSortService, scratchFileWriterService);
timer.cancel();
- tasks.clear();
- results.clear();
- scratchFileWriterList.clear();
- scratchFileWriterFutures.clear();
- indexKeyQueMap.clear();
- freeBufferQueue.clear();
+ clearAll(tasks, results, scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
+ indexKeyQueueMap.clear();
}
private void phaseTwo() throws InterruptedException, ExecutionException
{
- SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask(entriesProcessed.get());
- Timer timer2 = new Timer();
- timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
- processIndexFiles();
- timer2.cancel();
+ final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
+ try
+ {
+ processIndexFiles();
+ }
+ finally
+ {
+ timer.cancel();
+ }
+ }
+
+ private Timer scheduleAtFixedRate(TimerTask task)
+ {
+ final Timer timer = new Timer();
+ timer.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL);
+ return timer;
}
private int getIndexCount() throws ConfigException, JebException,
@@ -3365,126 +3348,127 @@
private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
throws JebException, ConfigException, InitializationException
{
- int indexCount = 0;
- List<String> rebuildList = rebuildConfig.getRebuildList();
- if (!rebuildList.isEmpty())
+ final List<String> rebuildList = rebuildConfig.getRebuildList();
+ if (rebuildList.isEmpty())
{
- for (String index : rebuildList)
+ return 0;
+ }
+
+ int indexCount = 0;
+ for (String index : rebuildList)
+ {
+ final String lowerName = index.toLowerCase();
+ if ("dn2id".equals(lowerName))
{
- String lowerName = index.toLowerCase();
- if ("dn2id".equals(lowerName))
+ indexCount += 3;
+ }
+ else if ("dn2uri".equals(lowerName))
+ {
+ indexCount++;
+ }
+ else if (lowerName.startsWith("vlv."))
+ {
+ if (lowerName.length() < 5)
{
- indexCount += 3;
+ throw new JebException(ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName));
}
- else if ("dn2uri".equals(lowerName))
- {
- indexCount++;
- }
- else if (lowerName.startsWith("vlv."))
- {
- if (lowerName.length() < 5)
- {
- LocalizableMessage msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
- throw new JebException(msg);
- }
- indexCount++;
- }
- else if ("id2subtree".equals(lowerName)
- || "id2children".equals(lowerName))
+ indexCount++;
+ }
+ else if ("id2subtree".equals(lowerName)
+ || "id2children".equals(lowerName))
+ {
+ LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new InitializationException(msg);
+ }
+ else
+ {
+ String[] attrIndexParts = lowerName.split("\\.");
+ if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
{
LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
throw new InitializationException(msg);
}
- else
+ AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
+ if (attrType == null)
{
- String[] attrIndexParts = lowerName.split("\\.");
- if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
+ LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new InitializationException(msg);
+ }
+ if (attrIndexParts.length != 1)
+ {
+ String indexType = attrIndexParts[1];
+ if (attrIndexParts.length == 2)
{
- LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new InitializationException(msg);
- }
- AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
- if (attrType == null)
- {
- LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new InitializationException(msg);
- }
- if (attrIndexParts.length != 1)
- {
- String indexType = attrIndexParts[1];
- if (attrIndexParts.length == 2)
+ if ("presence".equals(indexType)
+ || "equality".equals(indexType)
+ || "substring".equals(indexType)
+ || "ordering".equals(indexType)
+ || "approximate".equals(indexType))
{
- if ("presence".equals(indexType)
- || "equality".equals(indexType)
- || "substring".equals(indexType)
- || "ordering".equals(indexType)
- || "approximate".equals(indexType))
- {
- indexCount++;
- }
- else
- {
- LocalizableMessage msg =
- ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new InitializationException(msg);
- }
- }
- else
- {
- if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
- {
- LocalizableMessage msg =
- ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new InitializationException(msg);
- }
indexCount++;
}
- }
- else
- {
- boolean found = false;
- for (final String idx : cfg.listLocalDBIndexes())
- {
- if (!idx.equalsIgnoreCase(index))
- {
- continue;
- }
- found = true;
- LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
- SortedSet<IndexType> indexType = indexCfg.getIndexType();
- if (indexType.contains(EQUALITY)
- || indexType.contains(ORDERING)
- || indexType.contains(PRESENCE)
- || indexType.contains(SUBSTRING)
- || indexType.contains(APPROXIMATE))
- {
- indexCount++;
- }
- if (indexType.contains(EXTENSIBLE))
- {
- Set<String> extensibleRules = indexCfg.getIndexExtensibleMatchingRule();
- boolean shared = false;
- for (final String exRule : extensibleRules)
- {
- if (exRule.endsWith(".sub"))
- {
- indexCount++;
- }
- else if (!shared)
- {
- shared = true;
- indexCount++;
- }
- }
- }
- }
- if (!found)
+ else
{
LocalizableMessage msg =
ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
throw new InitializationException(msg);
}
}
+ else
+ {
+ if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
+ {
+ LocalizableMessage msg =
+ ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new InitializationException(msg);
+ }
+ indexCount++;
+ }
+ }
+ else
+ {
+ boolean found = false;
+ for (final String idx : cfg.listLocalDBIndexes())
+ {
+ if (!idx.equalsIgnoreCase(index))
+ {
+ continue;
+ }
+ found = true;
+ LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
+ SortedSet<IndexType> indexType = indexCfg.getIndexType();
+ if (indexType.contains(EQUALITY)
+ || indexType.contains(ORDERING)
+ || indexType.contains(PRESENCE)
+ || indexType.contains(SUBSTRING)
+ || indexType.contains(APPROXIMATE))
+ {
+ indexCount++;
+ }
+ if (indexType.contains(EXTENSIBLE))
+ {
+ Set<String> extensibleRules = indexCfg.getIndexExtensibleMatchingRule();
+ boolean shared = false;
+ for (final String exRule : extensibleRules)
+ {
+ if (exRule.endsWith(".sub"))
+ {
+ indexCount++;
+ }
+ else if (!shared)
+ {
+ shared = true;
+ indexCount++;
+ }
+ }
+ }
+ }
+ if (!found)
+ {
+ LocalizableMessage msg =
+ ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new InitializationException(msg);
+ }
}
}
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
index 4c251cb..e5fdcd8 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2015 ForgeRock AS.
*/
package org.opends.server.backends.jeb.importLDIF;
@@ -63,9 +63,7 @@
*/
public final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
- /**
- * Enumeration used when sorting a buffer.
- */
+ /** Enumeration used when sorting a buffer. */
private enum CompareOp {
LT, GT, LE, GE, EQ
}
@@ -84,12 +82,11 @@
/** The size of a buffer. */
private final int size;
-
/** Byte array holding the actual buffer data. */
private final byte buffer[];
/**
- * id is used to break a tie (keys equal) when the buffers are being merged
+ * Used to break a tie (keys equal) when the buffers are being merged
* for writing to the index scratch file.
*/
private long id;
@@ -97,25 +94,22 @@
/** Temporary buffer used to store integer values. */
private final byte[] intBytes = new byte[INT_SIZE];
- /** keyOffset - offSet where next key is written. */
- private int keyOffset = 0;
- /** recordOffset- offSet where next value record is written. */
- private int recordOffset = 0;
- /** bytesLeft - amount of bytes left in the buffer. */
- private int bytesLeft = 0;
-
- /** keys - number of keys in the buffer. */
- private int keys = 0;
- /**
- * position - used to iterate over the buffer when writing to a scratch file.
- */
- private int position = 0;
+ /** OffSet where next key is written. */
+ private int keyOffset;
+ /** OffSet where next value record is written. */
+ private int recordOffset;
+ /** Amount of bytes left in the buffer. */
+ private int bytesLeft;
+ /** Number of keys in the buffer. */
+ private int keys;
+ /** Used to iterate over the buffer when writing to a scratch file. */
+ private int position;
/** The comparator to use sort the keys. */
private ComparatorBuffer<byte[]> comparator;
/**
- * This is used to make sure that an instance of this class is put on the
+ * Used to make sure that an instance of this class is put on the
* correct scratch file writer work queue for processing.
*/
private Importer.IndexKey indexKey;
@@ -134,7 +128,7 @@
* importer/rebuild index process is doing phase one cleanup and flushing
* buffers not completed.
*/
- private boolean discard = false;
+ private boolean discarded;
/**
@@ -163,6 +157,15 @@
indexKey = null;
}
+ /**
+ * Creates a new poison buffer. Poison buffers are used to stop the processing of import tasks.
+ *
+ * @return a new poison buffer
+ */
+ public static IndexOutputBuffer poison()
+ {
+ return new IndexOutputBuffer(0);
+ }
/**
* Set the ID of a buffer to the specified value.
@@ -199,28 +202,25 @@
return size == 0;
}
-
/**
- * Determines of a buffer should be re-cycled.
+ * Determines if buffer should be re-cycled by calling {@link #reset()}.
*
* @return {@code true} if buffer should be recycled, or {@code false} if it
* should not.
*/
- public boolean isDiscard()
+ public boolean isDiscarded()
{
- return discard;
+ return discarded;
}
-
/**
- * Set the discard flag to {@code true}.
+ * Sets the discarded flag to {@code true}.
*/
- public void setDiscard()
+ public void discard()
{
- discard = true;
+ discarded = true;
}
-
/**
* Returns {@code true} if there is enough space available to write the
* specified byte array in the buffer. It returns {@code false} otherwise.
@@ -367,7 +367,7 @@
* @return {@code true} if the record is an insert record, or {@code false}
* if it is a delete record.
*/
- public boolean isInsert(int index)
+ public boolean isInsertRecord(int index)
{
int recOffset = getIntegerValue(index * INT_SIZE);
return buffer[recOffset] != DEL;
@@ -503,14 +503,8 @@
offset += PackedInteger.getReadIntLength(buffer, offset);
int keyLen = PackedInteger.readInt(buffer, offset);
int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
- if( comparator.compare(buffer, key, keyLen, b, b.length) == 0)
- {
- if(indexID == bIndexID)
- {
- return true;
- }
- }
- return false;
+ return comparator.compare(buffer, key, keyLen, b, b.length) == 0
+ && indexID == bIndexID;
}
@@ -527,46 +521,50 @@
@Override
public int compareTo(IndexOutputBuffer b)
{
- ByteBuffer keyBuf = b.getKeyBuf(b.position);
+ final ByteBuffer keyBuf = b.getKeyBuf(b.position);
int offset = getIntegerValue(position * INT_SIZE);
int indexID = getIntegerValue(offset + 1);
offset += REC_OVERHEAD;
offset += PackedInteger.getReadIntLength(buffer, offset);
int keyLen = PackedInteger.readInt(buffer, offset);
int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
- int returnCode = comparator.compare(buffer, key, keyLen, keyBuf.array(),
- keyBuf.limit());
- if(returnCode == 0)
+
+ final int cmp = comparator.compare(buffer, key, keyLen, keyBuf.array(), keyBuf.limit());
+ if (cmp != 0)
{
- int bIndexID = b.getIndexID();
- if(indexID == bIndexID)
- {
- long otherBufferID = b.getBufferID();
- //This is tested in a tree set remove when a buffer is removed from
- //the tree set.
- if(this.id == otherBufferID)
- {
- returnCode = 0;
- }
- else if(this.id < otherBufferID)
- {
- returnCode = -1;
- }
- else
- {
- returnCode = 1;
- }
- }
- else if(indexID < bIndexID)
- {
- returnCode = -1;
- }
- else
- {
- returnCode = 1;
- }
+ return cmp;
}
- return returnCode;
+
+ final int bIndexID = b.getIndexID();
+ if (indexID == bIndexID)
+ {
+ // This is tested in a tree set remove when a buffer is removed from the tree set.
+ return compare(this.id, b.getBufferID());
+ }
+ else if (indexID < bIndexID)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+
+ private int compare(long l1, long l2)
+ {
+ if (l1 == l2)
+ {
+ return 0;
+ }
+ else if (l1 < l2)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
}
@@ -587,21 +585,18 @@
dataStream.write(buffer, offSet, keyLen);
}
-
- /**
+ /**
* Compare the byte array at the current position with the byte array at the
* specified index.
*
* @param i The index pointing to the byte array to compare.
- * @return {@code true} if the byte arrays are equal, or {@code false}
- * otherwise.
+ * @return {@code true} if the byte arrays are equal, or {@code false} otherwise
*/
public boolean compare(int i)
{
- return is(i, position, CompareOp.EQ);
+ return is(i, position, CompareOp.EQ);
}
-
/**
* Return the current number of keys.
*
@@ -612,7 +607,6 @@
return keys;
}
-
/**
* Return {@code true} if the buffer has more data to process, or
* {@code false} otherwise. Used when iterating over the buffer writing the
@@ -622,20 +616,18 @@
* {@code false} otherwise.
*/
public boolean hasMoreData()
- {
- return (position + 1) < keys;
- }
-
+ {
+ return position + 1 < keys;
+ }
/**
* Advance the position pointer to the next record in the buffer. Used when
* iterating over the buffer examining keys.
*/
- public void getNextRecord()
- {
- position++;
- }
-
+ public void nextRecord()
+ {
+ position++;
+ }
private byte[] getIntBytes(int val)
{
@@ -661,9 +653,9 @@
private int med3(int a, int b, int c)
{
- return (is(a,b, CompareOp.LT) ?
+ return is(a, b, CompareOp.LT) ?
(is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) :
- (is(b,c,CompareOp.GT) ? b :is(a,c,CompareOp.GT) ? c : a));
+ (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a);
}
@@ -671,8 +663,12 @@
{
if (len < 7) {
for (int i=off; i<len+off; i++)
+ {
for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--)
+ {
swap(j, j-1);
+ }
+ }
return;
}
@@ -695,20 +691,26 @@
int a = off, b = a, c = off + len - 1, d = c;
while(true)
{
- while ((b <= c) && is(b, mKey, CompareOp.LE, mIndexID))
+ while (b <= c && is(b, mKey, CompareOp.LE, mIndexID))
{
if (is(b, mKey, CompareOp.EQ, mIndexID))
+ {
swap(a++, b);
+ }
b++;
}
while (c >= b && is(c, mKey, CompareOp.GE, mIndexID))
{
if (is(c, mKey, CompareOp.EQ, mIndexID))
+ {
swap(c, d--);
+ }
c--;
}
if (b > c)
+ {
break;
+ }
swap(b++, c--);
}
@@ -719,11 +721,17 @@
s = Math.min(d-c, n-d-1);
vectorSwap(b, n-s, s);
+ s = b - a;
// Recursively sort non-partition-elements
- if ((s = b-a) > 1)
+ if (s > 1)
+ {
sort(off, s);
- if ((s = d-c) > 1)
+ }
+ s = d - c;
+ if (s > 1)
+ {
sort(n-s, s);
+ }
}
@@ -740,31 +748,28 @@
private void vectorSwap(int a, int b, int n)
{
for (int i=0; i<n; i++, a++, b++)
+ {
swap(a, b);
+ }
}
private boolean evaluateReturnCode(int rc, CompareOp op)
{
- boolean returnCode = false;
switch(op) {
case LT:
- returnCode = rc < 0;
- break;
+ return rc < 0;
case GT:
- returnCode = rc > 0;
- break;
+ return rc > 0;
case LE:
- returnCode = rc <= 0;
- break;
+ return rc <= 0;
case GE:
- returnCode = rc >= 0;
- break;
+ return rc >= 0;
case EQ:
- returnCode = rc == 0;
- break;
+ return rc == 0;
+ default:
+ return false;
}
- return returnCode;
}
@@ -835,8 +840,7 @@
* Implementation of ComparatorBuffer interface. Used to compare keys when
* they are non-DN indexes.
*/
- public static
- class IndexComparator implements IndexOutputBuffer.ComparatorBuffer<byte[]>
+ public static class IndexComparator implements IndexOutputBuffer.ComparatorBuffer<byte[]>
{
/**
@@ -869,34 +873,9 @@
return -1;
}
}
- //The arrays are equal, make sure they are in the same index since
- //multiple suffixes might have the same key.
- if(length == otherLength)
- {
- if(indexID == otherIndexID)
- {
- return 0;
- }
- else if(indexID > otherIndexID)
- {
- return 1;
- }
- else
- {
- return -1;
- }
- }
- if (length > otherLength)
- {
- return 1;
- }
- else
- {
- return -1;
- }
+ return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
}
-
/**
* Compare an offset in an byte array with the specified byte array,
* using the DN compare algorithm. The specified index ID is used in the
@@ -928,24 +907,20 @@
return -1;
}
}
- //The arrays are equal, make sure they are in the same index since
- //multiple suffixes might have the same key.
- if(length == otherLength)
+ return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
+ }
+
+ /**
+ * The arrays are equal, make sure they are in the same index
+ * since multiple suffixes might have the same key.
+ */
+ private int compareLengthThenIndexID(int length, int indexID, int otherLength, int otherIndexID)
+ {
+ if (length == otherLength)
{
- if(indexID == otherIndexID)
- {
- return 0;
- }
- else if(indexID > otherIndexID)
- {
- return 1;
- }
- else
- {
- return -1;
- }
+ return compare(indexID, otherIndexID);
}
- if (length > otherLength)
+ else if (length > otherLength)
{
return 1;
}
@@ -955,7 +930,6 @@
}
}
-
/**
* Compare an offset in an byte array with the specified byte array,
* using the DN compare algorithm.
@@ -984,11 +958,16 @@
return -1;
}
}
- if(length == otherLength)
+ return compare(length, otherLength);
+ }
+
+ private int compare(int i1, int i2)
+ {
+ if (i1 == i2)
{
return 0;
}
- if (length > otherLength)
+ else if (i1 > i2)
{
return 1;
}
--
Gitblit v1.10.0