mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.01.2013 db6c2473d37efac054dd9d0d4c097da3942e4dff
OPENDJ-746 ArrayIndexOutOfBoundsException during OnlineImport / Replication Initialization

Added comments + turned comments into javadocs.
2 files modified
244 ■■■■■ changed files
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java 104 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java 140 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -25,15 +25,13 @@
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.backends.jeb.importLDIF;
import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.util.DynamicConstants.BUILD_ID;
import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.*;
import java.io.*;
import java.nio.ByteBuffer;
@@ -87,122 +85,133 @@
      + MAX_DB_LOG_SIZE;
  private static final int BYTE_BUFFER_CAPACITY = 128;
  //Min and MAX sizes of phase one buffer.
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  /** Min size of phase one buffer. */
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  //Min size of phase two read-ahead cache.
  /** Min size of phase two read-ahead cache. */
  private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
  //Small heap threshold used to give more memory to JVM to attempt OOM errors.
  /**
   * Small heap threshold used to give more memory to JVM to attempt OOM errors.
   */
  private static final int SMALL_HEAP_SIZE = 256 * MB;
  //The DN attribute type.
  /** The DN attribute type. */
  private static AttributeType dnType;
  static final IndexOutputBuffer.IndexComparator indexComparator =
      new IndexOutputBuffer.IndexComparator();
  //Phase one buffer and imported entries counts.
  /** Phase one buffer count. */
  private final AtomicInteger bufferCount = new AtomicInteger(0);
  /** Phase one imported entries count. */
  private final AtomicLong importCount = new AtomicLong(0);
  //Phase one buffer size in bytes.
  /** Phase one buffer size in bytes. */
  private int bufferSize;
  //Temp scratch directory.
  /** Temp scratch directory. */
  private final File tempDir;
  //Index and thread counts.
  /** Index count. */
  private final int indexCount;
  /** Thread count. */
  private int threadCount;
  //Set to true when validation is skipped.
  /** Set to true when validation is skipped. */
  private final boolean skipDNValidation;
  //Temporary environment used when DN validation is done in first phase.
  /** Temporary environment used when DN validation is done in first phase. */
  private final TmpEnv tmpEnv;
  //Root container.
  /** Root container. */
  private RootContainer rootContainer;
  //Import configuration.
  /** Import configuration. */
  private final LDIFImportConfig importConfiguration;
  //Backend configuration.
  /** Backend configuration. */
  private final LocalDBBackendCfg backendConfiguration;
  //LDIF reader.
  /** LDIF reader. */
  private LDIFReader reader;
  //Migrated entry count.
  /** Migrated entry count. */
  private int migratedCount;
  // Size in bytes of temporary env.
  /** Size in bytes of temporary env. */
  private long tmpEnvCacheSize;
  // Available memory at the start of the import.
  /** Available memory at the start of the import. */
  private long availableMemory;
  // Size in bytes of DB cache.
  /** Size in bytes of DB cache. */
  private long dbCacheSize;
  //The executor service used for the buffer sort tasks.
  /** The executor service used for the buffer sort tasks. */
  private ExecutorService bufferSortService;
  //The executor service used for the scratch file processing tasks.
  /** The executor service used for the scratch file processing tasks. */
  private ExecutorService scratchFileWriterService;
  //Queue of free index buffers -- used to re-cycle index buffers;
  /** Queue of free index buffers -- used to re-cycle index buffers. */
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
      new LinkedBlockingQueue<IndexOutputBuffer>();
  //Map of index keys to index buffers.  Used to allocate sorted
  //index buffers to a index writer thread.
  /**
   * Map of index keys to index buffers. Used to allocate sorted index buffers
   * to a index writer thread.
   */
  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
      new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
  //Map of DB containers to index managers. Used to start phase 2.
  /** Map of DB containers to index managers. Used to start phase 2. */
  private final List<IndexManager> indexMgrList =
      new LinkedList<IndexManager>();
  //Map of DB containers to DN-based index managers. Used to start phase 2.
  /** Map of DB containers to DN-based index managers. Used to start phase 2. */
  private final List<IndexManager> DNIndexMgrList =
      new LinkedList<IndexManager>();
  //Futures used to indicate when the index file writers are done flushing
  //their work queues and have exited. End of phase one.
  /**
   * Futures used to indicate when the index file writers are done flushing
   * their work queues and have exited. End of phase one.
   */
  private final List<Future<?>> scratchFileWriterFutures;
  //List of index file writer tasks. Used to signal stopScratchFileWriters to
  //the index file writer tasks when the LDIF file has been done.
  /**
   * List of index file writer tasks. Used to signal stopScratchFileWriters to
   * the index file writer tasks when the LDIF file has been done.
   */
  private final List<ScratchFileWriterTask> scratchFileWriterList;
  //Map of DNs to Suffix objects.
  /** Map of DNs to Suffix objects. */
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
  //Map of container ids to database containers.
  /** Map of container ids to database containers. */
  private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
      new ConcurrentHashMap<Integer, DatabaseContainer>();
  //Map of container ids to entry containers
  /** Map of container ids to entry containers. */
  private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
      new ConcurrentHashMap<Integer, EntryContainer>();
  //Used to synchronize when a scratch file index writer is first setup.
  /** Used to synchronize when a scratch file index writer is first setup. */
  private final Object synObj = new Object();
  //Rebuild index manager used when rebuilding indexes.
  /** Rebuild index manager used when rebuilding indexes. */
  private final RebuildIndexManager rebuildManager;
  //Set to true if the backend was cleared.
  /** Set to true if the backend was cleared. */
  private boolean clearedBackend = false;
  //Used to shutdown import if an error occurs in phase one.
  /** Used to shutdown import if an error occurs in phase one. */
  private volatile boolean isCanceled = false;
  private volatile boolean isPhaseOneDone = false;
  //Number of phase one buffers
  /** Number of phase one buffers. */
  private int phaseOneBufferCount;
  static
@@ -624,8 +633,10 @@
    }
  }
  //Mainly used to support multiple suffixes. Each index in each suffix gets
  //an unique ID to identify which DB it needs to go to in phase two processing.
  /**
   * Mainly used to support multiple suffixes. Each index in each suffix gets an
   * unique ID to identify which DB it needs to go to in phase two processing.
   */
  private void generateIndexID(Suffix suffix)
  {
    for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
@@ -728,7 +739,6 @@
           * There are no branches in the explicitly defined include list under
           * this base DN. Skip this base DN all together.
           */
          return null;
        }
@@ -1792,9 +1802,11 @@
      }
      else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      {
        // complete the current buffer...
        indexBuffer.setComparator(comparator);
        indexBuffer.setIndexKey(indexKey);
        bufferSortService.submit(new SortTask(indexBuffer));
        // ... and get a new one
        indexBuffer = getNewIndexBuffer(sizeNeeded);
        indexBufferMap.put(indexKey, indexBuffer);
      }
@@ -4102,7 +4114,7 @@
      }
      catch (DatabaseException e)
      {
        // Unlikely to happen and not critical.
      }
      previousProcessed = entriesProcessed;
      previousTime = latestTime;
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
@@ -25,92 +25,116 @@
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 */
package org.opends.server.backends.jeb.importLDIF;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.opends.server.backends.jeb.*;
import com.sleepycat.util.PackedInteger;
import org.opends.server.backends.jeb.EntryID;
import com.sleepycat.util.PackedInteger;
/**
 * This class represents a index buffer used to store the keys and entry IDs
 * processed from the LDIF file during phase one of an import, or rebuild index
 * process. Each key and ID is stored in a record in the buffer.
 *
 * <p>
 * The records in the buffer are eventually sorted, based on the key, when the
 * maximum size of the buffer is reached and no more records will fit into the
 * buffer. The buffer is the scheduled to be flushed to an indexes scratch file
 * and then re-cycled by the import, or rebuild-index process.
 * buffer. The buffer is scheduled to be flushed to an index scratch file and
 * then re-cycled by the import, or rebuild-index process.
 * </p>
 * <p>
 * The structure of a record in the buffer is the following:
 *
 * The records are packed as much as possible, to optimize the buffer space.
 * <pre>
 * +-------------+-------------+---------+---------+------------+-----------+
 * | record size | INS/DEL bit | indexID | entryID | key length | key bytes |
 * +-------------+-------------+---------+---------+------------+-----------+
 * </pre>
 *
 * The record size is used for fast seeks to quickly "jump" over records.
 * </p>
 * <p>
 * The records are packed as much as possible, to optimize the buffer space.<br>
 * This class is not thread safe.
 *
 * </p>
 */
public final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
  /**
  * Enumeration used when sorting a buffer.
  */
   * Enumeration used when sorting a buffer.
   */
  private enum CompareOp {
    LT, GT, LE, GE, EQ
  }
  //The record over head.
  private static final int REC_OVERHEAD = 5;
  //The size of int.
  /** The size of a Java int. A Java int is 32 bits, i.e. 4 bytes. */
  private static final int INT_SIZE = 4;
  //Buffer records are either insert records or delete records.
  /**
   * The record overhead. In addition to entryID, key length and key bytes, the
   * record overhead includes the indexID + INS/DEL bit
   */
  private static final int REC_OVERHEAD = INT_SIZE + 1;
  /** Buffer records are either insert records or delete records. */
  private static final byte DEL = 0, INS = 1;
  //The size of a buffer.
  /** The size of a buffer. */
  private final int size;
  //Byte array holding the actual buffer data.
  /** Byte array holding the actual buffer data. */
  private final byte buffer[];
  //id is used to break a tie (keys equal) when the buffers are being merged
  //for writing to the index scratch file.
  /**
   * id is used to break a tie (keys equal) when the buffers are being merged
   * for writing to the index scratch file.
   */
  private long id;
  //Temporary buffer used to store integer values.
  /** Temporary buffer used to store integer values. */
  private final byte[] intBytes = new byte[INT_SIZE];
  /*
    keyOffset - offSet where next key is written
    recordOffset- offSet where next value record is written
    bytesLeft - amount of bytes left in the buffer
  */
  private int keyOffset =0, recordOffset=0, bytesLeft = 0;
  /** keyOffset - offSet where next key is written. */
  private int keyOffset = 0;
  /** recordOffset- offSet where next value record is written. */
  private int recordOffset = 0;
  /** bytesLeft - amount of bytes left in the buffer. */
  private int bytesLeft = 0;
  //keys - number of keys in the buffer
  //position - used to iterate over the buffer when writing to a scratch file.
  private int keys = 0, position = 0;
  /** keys - number of keys in the buffer. */
  private int keys = 0;
  /**
   * position - used to iterate over the buffer when writing to a scratch file.
   */
  private int position = 0;
  //The comparator to use sort the keys.
  /** The comparator to use sort the keys. */
  private ComparatorBuffer<byte[]> comparator;
  //This is used to make sure that an instance of this class is put on the
  //correct scratch file writer work queue for processing.
  /**
   * This is used to make sure that an instance of this class is put on the
   * correct scratch file writer work queue for processing.
   */
  private Importer.IndexKey indexKey;
  //Initial capacity of re-usable buffer used in key compares.
  /** Initial capacity of re-usable buffer used in key compares. */
  private static final int CAP = 32;
  //This buffer is reused during key compares. It's main purpose is to keep
  //memory footprint as small as possible.
  /**
   * This buffer is reused during key compares. It's main purpose is to keep
   * memory footprint as small as possible.
   */
  private ByteBuffer keyBuffer = ByteBuffer.allocate(CAP);
  //Set to {@code true} if the buffer should not be recycled. Used when the
  //importer/rebuild index process is doing phase one cleanup and flushing
  //buffers not completed.
  /**
   * Set to {@code true} if the buffer should not be recycled. Used when the
   * importer/rebuild index process is doing phase one cleanup and flushing
   * buffers not completed.
   */
  private boolean discard = false;
@@ -173,7 +197,7 @@
   */
  public boolean isPoison()
  {
    return (size == 0);
    return size == 0;
  }
@@ -208,10 +232,9 @@
   *         buffer, or {@code false} otherwise.
   */
  public boolean isSpaceAvailable(byte[] kBytes, long id) {
    return (getRecordSize(kBytes.length, id) + INT_SIZE) < bytesLeft;
    return getRequiredSize(kBytes.length, id) < bytesLeft;
  }
  /**
   * Set the comparator to be used in the buffer processing to the specified
   * comparator.
@@ -262,10 +285,12 @@
   * @param indexID The index ID the record belongs.
   * @param insert <CODE>True</CODE> if key is an insert, false otherwise.
   */
  public void add(byte[] keyBytes, EntryID entryID, int indexID,
                  boolean insert) {
    // write the record data, but leave the space to write the record size just
    // before it
    recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert);
    // then write the returned record size
    System.arraycopy(getIntBytes(recordOffset), 0, buffer, keyOffset, INT_SIZE);
    keyOffset += INT_SIZE;
    bytesLeft = recordOffset - keyOffset;
@@ -273,16 +298,24 @@
  }
  /**
   * Writes the full record minus the record size itself.
   */
  private int addRecord(byte[]key, long id, int indexID, boolean insert)
  {
     int retOffset = recordOffset - getRecordSize(key.length, id);
     int offSet = retOffset;
     // write the INS/DEL bit
     buffer[offSet++] = insert ? INS : DEL;
     // write the indexID
     System.arraycopy(getIntBytes(indexID), 0, buffer, offSet, INT_SIZE);
     offSet += INT_SIZE;
     // write the entryID
     offSet = PackedInteger.writeLong(buffer, offSet, id);
     // write the key length
     offSet = PackedInteger.writeInt(buffer, offSet, key.length);
     // write the key bytes
     System.arraycopy(key, 0, buffer, offSet, key.length);
     return retOffset;
  }
@@ -297,13 +330,16 @@
   */
  public static int getRequiredSize(int keyLen, long id)
  {
    return PackedInteger.getWriteIntLength(keyLen) +  keyLen +
        PackedInteger.getWriteLongLength(id) + REC_OVERHEAD + INT_SIZE;
    // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit
    // and finally the space needed to store the record size
    return getRecordSize(keyLen, id) + INT_SIZE;
  }
  private int getRecordSize(int keyLen, long id)
  private static int getRecordSize(int keyLen, long id)
  {
     return PackedInteger.getWriteIntLength(keyLen) +  keyLen +
     // Adds up the key length + key bytes + ...
     return PackedInteger.getWriteIntLength(keyLen) + keyLen +
            // ... entryID + (indexID + INS/DEL bit).
            PackedInteger.getWriteLongLength(id) + REC_OVERHEAD;
  }
@@ -363,7 +399,7 @@
    return getKey(position);
  }
  //Used to minimized memory usage when comparing keys.
  /** Used to minimized memory usage when comparing keys. */
  private ByteBuffer getKeyBuf(int x)
  {
    keyBuffer.clear();
@@ -489,6 +525,7 @@
   * @return  0 if the buffers are equal, -1 if the current buffer is less
   *          than the specified buffer, or 1 if it is greater.
   */
  @Override
  public int compareTo(IndexOutputBuffer b)
  {
    ByteBuffer keyBuf = b.getKeyBuf(b.position);
@@ -693,7 +730,7 @@
  private void swap(int a, int b)
  {
    int aOffset = a * INT_SIZE;
     int aOffset = a * INT_SIZE;
     int bOffset = b * INT_SIZE;
     int bVal = getIntegerValue(bOffset);
     System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE);
@@ -818,6 +855,7 @@
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the second.
     */
    @Override
    public int compare(byte[] b, int offset, int length, int indexID,
                       int otherOffset, int otherLength, int otherIndexID)
    {
@@ -876,6 +914,7 @@
     *         offset value is less than, equal to, or greater than the second
     *         byte array.
     */
    @Override
    public int compare(byte[] b, int offset, int length, int indexID,
                       byte[] other, int otherLength, int otherIndexID)
    {
@@ -931,6 +970,7 @@
     *         offset value is less than, equal to, or greater than the second
     *         byte array.
     */
    @Override
    public int compare(byte[] b, int offset, int length, byte[] other,
                       int otherLength)
    {