/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2013-2015 ForgeRock AS. */ package org.opends.server.backends.pluggable; import java.io.ByteArrayOutputStream; import org.forgerock.opendj.ldap.ByteSequence; /** * This class represents a index buffer used to store the keys and entry IDs * processed from the LDIF file during phase one of an import, or rebuild index * process. Each key and ID is stored in a record in the buffer. *

 * <p>
 * The records in the buffer are eventually sorted, based on the key, when the
 * maximum size of the buffer is reached and no more records will fit into the
 * buffer. The buffer is scheduled to be flushed to an index scratch file and
 * then re-cycled by the import, or rebuild-index process.
 * <p>
 * The structure of a record in the buffer is the following:
 *
 * <pre>
 * +-------------+-------------+---------+---------+------------+-----------+
 * | record size | INS/DEL bit | indexID | entryID | key length | key bytes |
 * +-------------+-------------+---------+---------+------------+-----------+
 * </pre>
 *
 * The record size is used for fast seeks to quickly "jump" over records.
 * <p>
 * The records are packed as much as possible, to optimize the buffer space.
 * <p>
 * This class is not thread safe.

*/
final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
  /** The number of bytes of a Java int. */
  static final int INT_SIZE = 4;

  /** The number of bytes of a Java long. */
  static final int LONG_SIZE = 8;

  /**
   * The record overhead. In addition to entryID, key length and key bytes, the
   * record overhead includes the INS/DEL bit + indexID.
   */
  private static final int REC_OVERHEAD = 1 + INT_SIZE;

  /** Buffer records are either insert records or delete records. */
  private static final byte DEL = 0, INS = 1;

  /** The size of a buffer. */
  private final int size;

  /** Byte array holding the actual buffer data. */
  private final byte[] buffer;

  /**
   * Used to break a tie (keys equal) when the buffers are being merged
   * for writing to the index scratch file.
   */
  private long bufferID;

  /** OffSet where next key is written. */
  private int keyOffset;

  /** OffSet where next value record is written. */
  private int recordOffset;

  /** Amount of bytes left in the buffer. */
  private int bytesLeft;

  /** Number of keys in the buffer. */
  private int keys;

  /** Used to iterate over the buffer when writing to a scratch file. */
  private int position;

  /**
   * Used to make sure that an instance of this class is put on the
   * correct scratch file writer work queue for processing.
   */
  private Importer.IndexKey indexKey;

  /**
   * Set to {@code true} if the buffer should not be recycled. Used when the
   * importer/rebuild index process is doing phase one cleanup and flushing
   * buffers not completed.
   */
  private boolean discarded;

  /**
   * Record found at the current {@code position}. It is refreshed whenever the
   * position is set and cleared when the buffer is reset.
   */
  private ImportRecord currentRecord;

  /**
   * Create an instance of an IndexBuffer using the specified size.
   * The whole buffer is initially free and the record offset starts at the
   * last byte of the underlying array.
   *
   * @param size The size of the underlying byte array.
   */
  public IndexOutputBuffer(int size) {
    this.size = size;
    this.buffer = new byte[size];
    this.bytesLeft = size;
    this.recordOffset = size - 1;
  }

  /**
   * Reset an IndexBuffer so it can be re-cycled.
*/
  public void reset() {
    this.position = 0;
    this.keys = 0;
    this.keyOffset = 0;
    this.recordOffset = this.size - 1;
    this.bytesLeft = this.size;
    this.currentRecord = null;
    this.indexKey = null;
  }

  /**
   * Builds a poison buffer, i.e. a buffer of size zero. Poison buffers are
   * used to stop the processing of import tasks.
   *
   * @return a new poison buffer
   */
  public static IndexOutputBuffer poison() {
    final IndexOutputBuffer poisonBuffer = new IndexOutputBuffer(0);
    return poisonBuffer;
  }

  /**
   * Assigns the ID of this buffer.
   *
   * @param bufferID The value to set the buffer ID to.
   */
  public void setBufferID(long bufferID) {
    this.bufferID = bufferID;
  }

  /**
   * Gives the ID of this buffer.
   *
   * @return The value of a buffer's ID.
   */
  private long getBufferID() {
    return bufferID;
  }

  /**
   * Tells whether this buffer is a poison buffer. A poison buffer is used to
   * shutdown work queues when import/rebuild index phase one is completed.
   * A poison buffer has a 0 size.
   *
   * @return {@code true} if a buffer is a poison buffer, or {@code false}
   *         otherwise.
   */
  public boolean isPoison() {
    return 0 == size;
  }

  /**
   * Tells whether this buffer has been flagged as discarded.
   *
   * @return {@code true} if {@link #discard()} has been called on this buffer,
   *         meaning it should not be recycled through {@link #reset()}, or
   *         {@code false} otherwise.
   */
  boolean isDiscarded() {
    return this.discarded;
  }

  /**
   * Flags this buffer as discarded.
   */
  public void discard() {
    this.discarded = true;
  }

  /**
   * Checks that the bytes still free in the buffer strictly exceed the space
   * a record holding the specified key and entryID would require.
   *
   * @param kBytes The byte array to check space against.
   * @param entryID The entryID value to check space against.
   * @return {@code true} if there is space to write the byte array in a
   *         buffer, or {@code false} otherwise.
   */
  public boolean isSpaceAvailable(ByteSequence kBytes, long entryID) {
    final int needed = getRequiredSize(kBytes.length(), entryID);
    return bytesLeft > needed;
  }

  /**
   * Return a buffer's current position value.
   *
   * @return The buffer's current position value.
*/ public int getPosition() { return position; } /** * Set a buffer's position value to the specified position. * * @param position The value to set the position to. */ public void setPosition(int position) { this.position = position; this.currentRecord = toRecord(position); } /** * Sort the buffer. */ public void sort() { sort(0, keys); } /** * Add the specified key byte array and EntryID to the buffer. * * @param keyBytes The key byte array. * @param entryID The EntryID. * @param indexID The index ID the record belongs. * @param insert True if key is an insert, false otherwise. */ public void add(ByteSequence keyBytes, EntryID entryID, int indexID, boolean insert) { // write the record data, but leave the space to write the record size just // before it recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert); // then write the returned record size keyOffset = writeInt(buffer, keyOffset, recordOffset); bytesLeft = recordOffset - keyOffset; keys++; } /** * Writes the full record minus the record size itself. */ private int addRecord(ByteSequence key, long entryID, int indexID, boolean insert) { int retOffset = recordOffset - getRecordSize(key.length()); int offSet = retOffset; // write the INS/DEL bit buffer[offSet++] = insert ? INS : DEL; // write the indexID offSet = writeInt(buffer, offSet, indexID); // write the entryID offSet = writeLong(buffer, offSet, entryID); // write the key length offSet = writeInt(buffer, offSet, key.length()); // write the key bytes key.copyTo(buffer, offSet); return retOffset; } /** * Computes the full size of the record. 
* * @param keyLen The length of the key of index * @param entryID The entry id * @return The size that such record would take in the IndexOutputBuffer */ public static int getRequiredSize(int keyLen, long entryID) { // also add up the space needed to store the record size return getRecordSize(keyLen) + INT_SIZE; } private static int getRecordSize(int keyLen) { // Adds up (INS/DEL bit + indexID) + entryID + key length + key bytes return REC_OVERHEAD + LONG_SIZE + INT_SIZE + keyLen; } /** * Write the entryID at the specified position to the specified output stream. * Used when writing the index scratch files. * * @param stream The stream to write the record at the index to. * @param position The position of the record to write. */ public void writeEntryID(ByteArrayOutputStream stream, int position) { int offSet = getOffset(position); stream.write(buffer, offSet + REC_OVERHEAD, LONG_SIZE); } /** * Return {@code true} if the record specified by the position is an insert * record, or {@code false} if it a delete record. * * @param position The position of the record. * @return {@code true} if the record is an insert record, or {@code false} * if it is a delete record. */ public boolean isInsertRecord(int position) { int recOffset = getOffset(position); return buffer[recOffset] == INS; } private int getOffset(int position) { return readInt(position * INT_SIZE); } private int compare(int xPosition, int yPosition) { return toRecord(xPosition).compareTo(toRecord(yPosition)); } private ImportRecord toRecord(int position) { return ImportRecord.fromBufferAndPosition(buffer, position); } ImportRecord currentRecord() { return currentRecord; } /** * Compare current IndexBuffer to the specified index buffer using both the * comparator and index ID of both buffers. * * The key at the value of position in both buffers are used in the compare. * * @param b The IndexBuffer to compare to. 
* @return 0 if the buffers are equal, -1 if the current buffer is less * than the specified buffer, or 1 if it is greater. */ @Override public int compareTo(IndexOutputBuffer b) { int cmp = currentRecord().compareTo(b.currentRecord()); if (cmp == 0) { // This is tested in a tree set remove when a buffer is removed from the tree set. return compareLongs(bufferID, b.getBufferID()); } return cmp; } private static int compareLongs(long l1, long l2) { if (l1 == l2) { return 0; } else if (l1 < l2) { return -1; } else { return 1; } } /** * Compare the byte array at the current position with the byte array at the * provided position. * * @param position The index pointing to the byte array to compare. * @return {@code true} if the byte arrays are equal, or {@code false} otherwise */ public boolean sameKeyAndIndexID(int position) { return currentRecord().equals(toRecord(position)); } /** * Return the current number of keys. * * @return The number of keys currently in an index buffer. */ public int getNumberKeys() { return keys; } /** * Return {@code true} if the buffer has more data to process, or * {@code false} otherwise. Used when iterating over the buffer writing the * scratch index file. * * @return {@code true} if a buffer has more data to process, or * {@code false} otherwise. */ public boolean hasMoreData() { return position + 1 < keys; } /** * Advance the position pointer to the next record in the buffer. Used when * iterating over the buffer examining keys. 
*/ public void nextRecord() { setPosition(position + 1); } private int writeInt(byte[] buffer, int offset, int val) { final int endOffset = offset + INT_SIZE; for (int i = endOffset - 1; i >= offset; i--) { buffer[i] = (byte) (val & 0xff); val >>>= 8; } return endOffset; } private int writeLong(byte[] buffer, int offset, long val) { final int endOffset = offset + LONG_SIZE; for (int i = endOffset - 1; i >= offset; i--) { buffer[i] = (byte) (val & 0xff); val >>>= 8; } return endOffset; } private int readInt(int index) { int answer = 0; for (int i = 0; i < INT_SIZE; i++) { byte b = buffer[index + i]; answer <<= 8; answer |= (b & 0xff); } return answer; } private int med3(int a, int b, int c) { ImportRecord ra = toRecord(a); ImportRecord rb = toRecord(b); ImportRecord rc = toRecord(c); return ra.compareTo(rb) < 0 ? (rb.compareTo(rc) < 0 ? b : ra.compareTo(rc) < 0 ? c : a) : (rb.compareTo(rc) > 0 ? b : ra.compareTo(rc) > 0 ? c : a); } private void sort(int off, int len) { if (len < 7) { for (int i=off; ioff && compare(j-1, j)>0; j--) { swap(j, j-1); } } return; } int m = off + (len >> 1); if (len > 7) { int l = off; int n = off + len - 1; if (len > 40) { int s = len/8; l = med3(l, l+s, l+2*s); m = med3(m-s, m, m+s); n = med3(n-2*s, n-s, n); } m = med3(l, m, n); } int a = off, b = a, c = off + len - 1, d = c; while(true) { ImportRecord rm = toRecord(m); while (b <= c) { int cmp = toRecord(b).compareTo(rm); if (cmp > 0) { break; } else if (cmp == 0) { swap(a++, b); } b++; } while (c >= b) { int cmp = toRecord(c).compareTo(rm); if (cmp < 0) { break; } else if (cmp == 0) { swap(c, d--); } c--; } if (b > c) { break; } swap(b++, c--); } // Swap partition elements back to middle int s, n = off + len; s = Math.min(a-off, b-a ); vectorSwap(off, b-s, s); s = Math.min(d-c, n-d-1); vectorSwap(b, n-s, s); s = b - a; // Recursively sort non-partition-elements if (s > 1) { sort(off, s); } s = d - c; if (s > 1) { sort(n-s, s); } } private void swap(int a, int b) { int aOffset = a * 
INT_SIZE; int bOffset = b * INT_SIZE; int tmp = readInt(bOffset); System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE); writeInt(buffer, aOffset, tmp); } private void vectorSwap(int a, int b, int n) { for (int i=0; i