From db6c2473d37efac054dd9d0d4c097da3942e4dff Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 26 Jun 2013 13:01:46 +0000
Subject: [PATCH] OPENDJ-746 ArrayIndexOutOfBoundsException during OnlineImport / Replication Initialization
---
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java | 140 ++++++++++++++++++----------
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 104 +++++++++++---------
2 files changed, 148 insertions(+), 96 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index efc5095..7241925 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -25,15 +25,13 @@
* Copyright 2008-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2013 ForgeRock AS
*/
-
package org.opends.server.backends.jeb.importLDIF;
import static org.opends.messages.JebMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.util.DynamicConstants.BUILD_ID;
-import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
-import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.*;
import java.io.*;
import java.nio.ByteBuffer;
@@ -87,122 +85,133 @@
+ MAX_DB_LOG_SIZE;
private static final int BYTE_BUFFER_CAPACITY = 128;
- //Min and MAX sizes of phase one buffer.
+ /** Max size of phase one buffer. */
private static final int MAX_BUFFER_SIZE = 2 * MB;
+ /** Min size of phase one buffer. */
private static final int MIN_BUFFER_SIZE = 4 * KB;
- //Min size of phase two read-ahead cache.
+ /** Min size of phase two read-ahead cache. */
private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
- //Small heap threshold used to give more memory to JVM to attempt OOM errors.
+ /**
+ * Small heap threshold used to give more memory to JVM to attempt OOM errors.
+ */
private static final int SMALL_HEAP_SIZE = 256 * MB;
- //The DN attribute type.
+ /** The DN attribute type. */
private static AttributeType dnType;
static final IndexOutputBuffer.IndexComparator indexComparator =
new IndexOutputBuffer.IndexComparator();
- //Phase one buffer and imported entries counts.
+ /** Phase one buffer count. */
private final AtomicInteger bufferCount = new AtomicInteger(0);
+ /** Phase one imported entries count. */
private final AtomicLong importCount = new AtomicLong(0);
- //Phase one buffer size in bytes.
+ /** Phase one buffer size in bytes. */
private int bufferSize;
- //Temp scratch directory.
+ /** Temp scratch directory. */
private final File tempDir;
- //Index and thread counts.
+ /** Index count. */
private final int indexCount;
+ /** Thread count. */
private int threadCount;
- //Set to true when validation is skipped.
+ /** Set to true when validation is skipped. */
private final boolean skipDNValidation;
- //Temporary environment used when DN validation is done in first phase.
+ /** Temporary environment used when DN validation is done in first phase. */
private final TmpEnv tmpEnv;
- //Root container.
+ /** Root container. */
private RootContainer rootContainer;
- //Import configuration.
+ /** Import configuration. */
private final LDIFImportConfig importConfiguration;
- //Backend configuration.
+ /** Backend configuration. */
private final LocalDBBackendCfg backendConfiguration;
- //LDIF reader.
+ /** LDIF reader. */
private LDIFReader reader;
- //Migrated entry count.
+ /** Migrated entry count. */
private int migratedCount;
- // Size in bytes of temporary env.
+ /** Size in bytes of temporary env. */
private long tmpEnvCacheSize;
- // Available memory at the start of the import.
+ /** Available memory at the start of the import. */
private long availableMemory;
- // Size in bytes of DB cache.
+ /** Size in bytes of DB cache. */
private long dbCacheSize;
- //The executor service used for the buffer sort tasks.
+ /** The executor service used for the buffer sort tasks. */
private ExecutorService bufferSortService;
- //The executor service used for the scratch file processing tasks.
+ /** The executor service used for the scratch file processing tasks. */
private ExecutorService scratchFileWriterService;
- //Queue of free index buffers -- used to re-cycle index buffers;
+ /** Queue of free index buffers -- used to re-cycle index buffers. */
private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
new LinkedBlockingQueue<IndexOutputBuffer>();
- //Map of index keys to index buffers. Used to allocate sorted
- //index buffers to a index writer thread.
+ /**
+ * Map of index keys to index buffers. Used to allocate sorted index buffers
+ * to a index writer thread.
+ */
private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
- //Map of DB containers to index managers. Used to start phase 2.
+ /** Map of DB containers to index managers. Used to start phase 2. */
private final List<IndexManager> indexMgrList =
new LinkedList<IndexManager>();
- //Map of DB containers to DN-based index managers. Used to start phase 2.
+ /** Map of DB containers to DN-based index managers. Used to start phase 2. */
private final List<IndexManager> DNIndexMgrList =
new LinkedList<IndexManager>();
- //Futures used to indicate when the index file writers are done flushing
- //their work queues and have exited. End of phase one.
+ /**
+ * Futures used to indicate when the index file writers are done flushing
+ * their work queues and have exited. End of phase one.
+ */
private final List<Future<?>> scratchFileWriterFutures;
- //List of index file writer tasks. Used to signal stopScratchFileWriters to
- //the index file writer tasks when the LDIF file has been done.
+ /**
+ * List of index file writer tasks. Used to signal stopScratchFileWriters to
+ * the index file writer tasks when the LDIF file has been done.
+ */
private final List<ScratchFileWriterTask> scratchFileWriterList;
- //Map of DNs to Suffix objects.
+ /** Map of DNs to Suffix objects. */
private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
- //Map of container ids to database containers.
+ /** Map of container ids to database containers. */
private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
new ConcurrentHashMap<Integer, DatabaseContainer>();
- //Map of container ids to entry containers
+ /** Map of container ids to entry containers. */
private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
new ConcurrentHashMap<Integer, EntryContainer>();
- //Used to synchronize when a scratch file index writer is first setup.
+ /** Used to synchronize when a scratch file index writer is first setup. */
private final Object synObj = new Object();
- //Rebuild index manager used when rebuilding indexes.
+ /** Rebuild index manager used when rebuilding indexes. */
private final RebuildIndexManager rebuildManager;
- //Set to true if the backend was cleared.
+ /** Set to true if the backend was cleared. */
private boolean clearedBackend = false;
- //Used to shutdown import if an error occurs in phase one.
+ /** Used to shutdown import if an error occurs in phase one. */
private volatile boolean isCanceled = false;
private volatile boolean isPhaseOneDone = false;
- //Number of phase one buffers
+ /** Number of phase one buffers. */
private int phaseOneBufferCount;
static
@@ -624,8 +633,10 @@
}
}
- //Mainly used to support multiple suffixes. Each index in each suffix gets
- //an unique ID to identify which DB it needs to go to in phase two processing.
+ /**
+ * Mainly used to support multiple suffixes. Each index in each suffix gets an
+ * unique ID to identify which DB it needs to go to in phase two processing.
+ */
private void generateIndexID(Suffix suffix)
{
for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
@@ -728,7 +739,6 @@
* There are no branches in the explicitly defined include list under
* this base DN. Skip this base DN all together.
*/
-
return null;
}
@@ -1792,9 +1802,11 @@
}
else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
{
+ // complete the current buffer...
indexBuffer.setComparator(comparator);
indexBuffer.setIndexKey(indexKey);
bufferSortService.submit(new SortTask(indexBuffer));
+ // ... and get a new one
indexBuffer = getNewIndexBuffer(sizeNeeded);
indexBufferMap.put(indexKey, indexBuffer);
}
@@ -4102,7 +4114,7 @@
}
catch (DatabaseException e)
{
-
+ // Unlikely to happen and not critical.
}
previousProcessed = entriesProcessed;
previousTime = latestTime;
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
index 1b46232..fb15fa2 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
@@ -25,92 +25,116 @@
* Copyright 2009-2010 Sun Microsystems, Inc.
* Portions Copyright 2013 ForgeRock AS.
*/
-
-
package org.opends.server.backends.jeb.importLDIF;
+import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
-import org.opends.server.backends.jeb.*;
-import com.sleepycat.util.PackedInteger;
+import org.opends.server.backends.jeb.EntryID;
+import com.sleepycat.util.PackedInteger;
/**
* This class represents a index buffer used to store the keys and entry IDs
* processed from the LDIF file during phase one of an import, or rebuild index
* process. Each key and ID is stored in a record in the buffer.
- *
+ * <p>
* The records in the buffer are eventually sorted, based on the key, when the
* maximum size of the buffer is reached and no more records will fit into the
- * buffer. The buffer is the scheduled to be flushed to an indexes scratch file
- * and then re-cycled by the import, or rebuild-index process.
+ * buffer. The buffer is scheduled to be flushed to an index scratch file and
+ * then re-cycled by the import, or rebuild-index process.
+ * </p>
+ * <p>
+ * The structure of a record in the buffer is the following:
*
- * The records are packed as much as possible, to optimize the buffer space.
+ * <pre>
+ * +-------------+-------------+---------+---------+------------+-----------+
+ * | record size | INS/DEL bit | indexID | entryID | key length | key bytes |
+ * +-------------+-------------+---------+---------+------------+-----------+
+ * </pre>
+ *
+ * The record size is used for fast seeks to quickly "jump" over records.
+ * </p>
+ * <p>
+ * The records are packed as much as possible, to optimize the buffer space.<br>
* This class is not thread safe.
- *
+ * </p>
*/
public final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
/**
- * Enumeration used when sorting a buffer.
- */
+ * Enumeration used when sorting a buffer.
+ */
private enum CompareOp {
LT, GT, LE, GE, EQ
}
- //The record over head.
- private static final int REC_OVERHEAD = 5;
-
- //The size of int.
+ /** The size of a Java int. A Java int is 32 bits, i.e. 4 bytes. */
private static final int INT_SIZE = 4;
- //Buffer records are either insert records or delete records.
+ /**
+ * The record overhead. In addition to entryID, key length and key bytes, the
+ * record overhead includes the indexID + INS/DEL bit
+ */
+ private static final int REC_OVERHEAD = INT_SIZE + 1;
+
+ /** Buffer records are either insert records or delete records. */
private static final byte DEL = 0, INS = 1;
- //The size of a buffer.
+ /** The size of a buffer. */
private final int size;
- //Byte array holding the actual buffer data.
+ /** Byte array holding the actual buffer data. */
private final byte buffer[];
- //id is used to break a tie (keys equal) when the buffers are being merged
- //for writing to the index scratch file.
+ /**
+ * id is used to break a tie (keys equal) when the buffers are being merged
+ * for writing to the index scratch file.
+ */
private long id;
- //Temporary buffer used to store integer values.
+ /** Temporary buffer used to store integer values. */
private final byte[] intBytes = new byte[INT_SIZE];
- /*
- keyOffset - offSet where next key is written
- recordOffset- offSet where next value record is written
- bytesLeft - amount of bytes left in the buffer
- */
- private int keyOffset =0, recordOffset=0, bytesLeft = 0;
+ /** keyOffset - offSet where next key is written. */
+ private int keyOffset = 0;
+ /** recordOffset- offSet where next value record is written. */
+ private int recordOffset = 0;
+ /** bytesLeft - amount of bytes left in the buffer. */
+ private int bytesLeft = 0;
- //keys - number of keys in the buffer
- //position - used to iterate over the buffer when writing to a scratch file.
- private int keys = 0, position = 0;
+ /** keys - number of keys in the buffer. */
+ private int keys = 0;
+ /**
+ * position - used to iterate over the buffer when writing to a scratch file.
+ */
+ private int position = 0;
- //The comparator to use sort the keys.
+ /** The comparator to use sort the keys. */
private ComparatorBuffer<byte[]> comparator;
- //This is used to make sure that an instance of this class is put on the
- //correct scratch file writer work queue for processing.
+ /**
+ * This is used to make sure that an instance of this class is put on the
+ * correct scratch file writer work queue for processing.
+ */
private Importer.IndexKey indexKey;
- //Initial capacity of re-usable buffer used in key compares.
+ /** Initial capacity of re-usable buffer used in key compares. */
private static final int CAP = 32;
- //This buffer is reused during key compares. It's main purpose is to keep
- //memory footprint as small as possible.
+ /**
+ * This buffer is reused during key compares. It's main purpose is to keep
+ * memory footprint as small as possible.
+ */
private ByteBuffer keyBuffer = ByteBuffer.allocate(CAP);
- //Set to {@code true} if the buffer should not be recycled. Used when the
- //importer/rebuild index process is doing phase one cleanup and flushing
- //buffers not completed.
+ /**
+ * Set to {@code true} if the buffer should not be recycled. Used when the
+ * importer/rebuild index process is doing phase one cleanup and flushing
+ * buffers not completed.
+ */
private boolean discard = false;
@@ -173,7 +197,7 @@
*/
public boolean isPoison()
{
- return (size == 0);
+ return size == 0;
}
@@ -208,10 +232,9 @@
* buffer, or {@code false} otherwise.
*/
public boolean isSpaceAvailable(byte[] kBytes, long id) {
- return (getRecordSize(kBytes.length, id) + INT_SIZE) < bytesLeft;
+ return getRequiredSize(kBytes.length, id) < bytesLeft;
}
-
/**
* Set the comparator to be used in the buffer processing to the specified
* comparator.
@@ -262,10 +285,12 @@
* @param indexID The index ID the record belongs.
* @param insert <CODE>True</CODE> if key is an insert, false otherwise.
*/
-
public void add(byte[] keyBytes, EntryID entryID, int indexID,
boolean insert) {
+ // write the record data, but leave the space to write the record size just
+ // before it
recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert);
+ // then write the returned record size
System.arraycopy(getIntBytes(recordOffset), 0, buffer, keyOffset, INT_SIZE);
keyOffset += INT_SIZE;
bytesLeft = recordOffset - keyOffset;
@@ -273,16 +298,24 @@
}
+ /**
+ * Writes the full record minus the record size itself.
+ */
private int addRecord(byte[]key, long id, int indexID, boolean insert)
{
int retOffset = recordOffset - getRecordSize(key.length, id);
int offSet = retOffset;
+ // write the INS/DEL bit
buffer[offSet++] = insert ? INS : DEL;
+ // write the indexID
System.arraycopy(getIntBytes(indexID), 0, buffer, offSet, INT_SIZE);
offSet += INT_SIZE;
+ // write the entryID
offSet = PackedInteger.writeLong(buffer, offSet, id);
+ // write the key length
offSet = PackedInteger.writeInt(buffer, offSet, key.length);
+ // write the key bytes
System.arraycopy(key, 0, buffer, offSet, key.length);
return retOffset;
}
@@ -297,13 +330,16 @@
*/
public static int getRequiredSize(int keyLen, long id)
{
- return PackedInteger.getWriteIntLength(keyLen) + keyLen +
- PackedInteger.getWriteLongLength(id) + REC_OVERHEAD + INT_SIZE;
+ // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit
+ // and finally the space needed to store the record size
+ return getRecordSize(keyLen, id) + INT_SIZE;
}
- private int getRecordSize(int keyLen, long id)
+ private static int getRecordSize(int keyLen, long id)
{
- return PackedInteger.getWriteIntLength(keyLen) + keyLen +
+ // Adds up the key length + key bytes + ...
+ return PackedInteger.getWriteIntLength(keyLen) + keyLen +
+ // ... entryID + (indexID + INS/DEL bit).
PackedInteger.getWriteLongLength(id) + REC_OVERHEAD;
}
@@ -363,7 +399,7 @@
return getKey(position);
}
- //Used to minimized memory usage when comparing keys.
+ /** Used to minimized memory usage when comparing keys. */
private ByteBuffer getKeyBuf(int x)
{
keyBuffer.clear();
@@ -489,6 +525,7 @@
* @return 0 if the buffers are equal, -1 if the current buffer is less
* than the specified buffer, or 1 if it is greater.
*/
+ @Override
public int compareTo(IndexOutputBuffer b)
{
ByteBuffer keyBuf = b.getKeyBuf(b.position);
@@ -693,7 +730,7 @@
private void swap(int a, int b)
{
- int aOffset = a * INT_SIZE;
+ int aOffset = a * INT_SIZE;
int bOffset = b * INT_SIZE;
int bVal = getIntegerValue(bOffset);
System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE);
@@ -818,6 +855,7 @@
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second.
*/
+ @Override
public int compare(byte[] b, int offset, int length, int indexID,
int otherOffset, int otherLength, int otherIndexID)
{
@@ -876,6 +914,7 @@
* offset value is less than, equal to, or greater than the second
* byte array.
*/
+ @Override
public int compare(byte[] b, int offset, int length, int indexID,
byte[] other, int otherLength, int otherIndexID)
{
@@ -931,6 +970,7 @@
* offset value is less than, equal to, or greater than the second
* byte array.
*/
+ @Override
public int compare(byte[] b, int offset, int length, byte[] other,
int otherLength)
{
--
Gitblit v1.10.0