| | |
| | | |
| | | import static org.opends.messages.JebMessages.*; |
| | | import static org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType.*; |
| | | import static org.opends.server.backends.jeb.IndexOutputBuffer.*; |
| | | import static org.opends.server.util.DynamicConstants.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | |
| | | import java.io.FileNotFoundException; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | import java.io.RandomAccessFile; |
| | | import java.nio.ByteBuffer; |
| | | import java.util.ArrayList; |
| | |
| | | new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>(); |
| | | |
| | | /** Map of DB containers to index managers. Used to start phase 2. */ |
| | | private final List<IndexManager> indexMgrList = |
| | | new LinkedList<IndexManager>(); |
| | | private final List<IndexManager> indexMgrList = new LinkedList<IndexManager>(); |
| | | /** Map of DB containers to DN-based index managers. Used to start phase 2. */ |
| | | private final List<IndexManager> DNIndexMgrList = |
| | | new LinkedList<IndexManager>(); |
| | | private final List<IndexManager> DNIndexMgrList = new LinkedList<IndexManager>(); |
| | | |
| | | /** |
| | | * Futures used to indicate when the index file writers are done flushing |
| | |
| | | { |
| | | if (index != null) |
| | | { |
| | | int id = System.identityHashCode(index); |
| | | idContainerMap.putIfAbsent(id, index); |
| | | idContainerMap.putIfAbsent(getIndexID(index), index); |
| | | } |
| | | } |
| | | |
| | | private static int getIndexID(DatabaseContainer index) |
| | | { |
| | | return System.identityHashCode(index); |
| | | } |
| | | |
| | | private Suffix getSuffix(EntryContainer entryContainer) |
| | | throws ConfigException, InitializationException |
| | | { |
| | |
| | | indexBuffer = getNewIndexBuffer(sizeNeeded); |
| | | indexBufferMap.put(indexKey, indexBuffer); |
| | | } |
| | | int id = System.identityHashCode(container); |
| | | indexBuffer.add(key, entryID, id, insert); |
| | | return id; |
| | | int indexID = getIndexID(container); |
| | | indexBuffer.add(key, entryID, indexID, insert); |
| | | return indexID; |
| | | } |
| | | |
| | | IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException |
| | |
| | | ByteBuffer key = null; |
| | | ImportIDSet insertIDSet = null; |
| | | ImportIDSet deleteIDSet = null; |
| | | Integer indexID = null; |
| | | |
| | | if (isCanceled) |
| | | { |
| | |
| | | return null; |
| | | } |
| | | |
| | | Integer indexID = null; |
| | | while (!bufferSet.isEmpty()) |
| | | { |
| | | IndexInputBuffer b = bufferSet.pollFirst(); |
| | |
| | | |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | key.flip(); |
| | | b.getKey(key); |
| | | b.fetchKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | } |
| | | key.flip(); |
| | | b.getKey(key); |
| | | b.fetchKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | |
| | | |
| | | if (b.hasMoreData()) |
| | | { |
| | | b.getNextRecord(); |
| | | b.fetchNextRecord(); |
| | | bufferSet.add(b); |
| | | } |
| | | } |
| | |
| | | |
| | | private void id2child(EntryID childID) throws DirectoryException |
| | | { |
| | | ImportIDSet idSet; |
| | | if (parentID != null) |
| | | { |
| | | if (!id2childTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | ImportIDSet idSet; |
| | | byte[] parentIDBytes = parentID.getDatabaseEntry().getData(); |
| | | if (!id2childTree.containsKey(parentIDBytes)) |
| | | { |
| | | idSet = new ImportIDSet(1, childLimit, childDoCount); |
| | | id2childTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | id2childTree.put(parentIDBytes, idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2childTree.get(parentID.getDatabaseEntry().getData()); |
| | | idSet = id2childTree.get(parentIDBytes); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | if (id2childTree.size() > DN_STATE_CACHE_SIZE) |
| | |
| | | if (parentID != null) |
| | | { |
| | | ImportIDSet idSet; |
| | | if (!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | byte[] parentIDBytes = parentID.getDatabaseEntry().getData(); |
| | | if (!id2subtreeTree.containsKey(parentIDBytes)) |
| | | { |
| | | idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount); |
| | | id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | id2subtreeTree.put(parentIDBytes, idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData()); |
| | | idSet = id2subtreeTree.get(parentIDBytes); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | // TODO: |
| | |
| | | // Just ignore. |
| | | break; |
| | | } |
| | | if (!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData())) |
| | | |
| | | byte[] nodeIDBytes = nodeID.getDatabaseEntry().getData(); |
| | | if (!id2subtreeTree.containsKey(nodeIDBytes)) |
| | | { |
| | | idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount); |
| | | id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet); |
| | | id2subtreeTree.put(nodeIDBytes, idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData()); |
| | | idSet = id2subtreeTree.get(nodeIDBytes); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | } |
| | |
| | | { |
| | | resetStreams(); |
| | | |
| | | long id = 0; |
| | | long bufferID = 0; |
| | | long bufferLen = 0; |
| | | for (IndexOutputBuffer b : buffers) |
| | | { |
| | |
| | | else |
| | | { |
| | | b.setPosition(0); |
| | | b.setID(id++); |
| | | b.setBufferID(bufferID++); |
| | | indexSortedSet.add(b); |
| | | } |
| | | } |
| | |
| | | { |
| | | insertKeyCount = 1; |
| | | insertByteStream.reset(); |
| | | PackedInteger.writeInt(tmpArray, 0, -1); |
| | | insertByteStream.write(tmpArray, 0, 1); |
| | | writePackedInt(insertByteStream, -1); |
| | | } |
| | | int insertSize = PackedInteger.getWriteIntLength(insertKeyCount); |
| | | PackedInteger.writeInt(tmpArray, 0, insertKeyCount); |
| | | bufferStream.write(tmpArray, 0, insertSize); |
| | | |
| | | int insertSize = writePackedInt(bufferStream, insertKeyCount); |
| | | if (insertByteStream.size() > 0) |
| | | { |
| | | insertByteStream.writeTo(bufferStream); |
| | | } |
| | | int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount); |
| | | PackedInteger.writeInt(tmpArray, 0, deleteKeyCount); |
| | | bufferStream.write(tmpArray, 0, deleteSize); |
| | | |
| | | int deleteSize = writePackedInt(bufferStream, deleteKeyCount); |
| | | if (deleteByteStream.size() > 0) |
| | | { |
| | | deleteByteStream.writeTo(bufferStream); |
| | | } |
| | | return insertSize + deleteSize; |
| | | return insertSize + insertByteStream.size() + deleteSize + deleteByteStream.size(); |
| | | } |
| | | |
| | | private int writeHeader(int indexID, int keySize) throws IOException |
| | | { |
| | | bufferStream.writeInt(indexID); |
| | | int packedSize = PackedInteger.getWriteIntLength(keySize); |
| | | PackedInteger.writeInt(tmpArray, 0, keySize); |
| | | bufferStream.write(tmpArray, 0, packedSize); |
| | | return packedSize; |
| | | return INT_SIZE + writePackedInt(bufferStream, keySize); |
| | | } |
| | | |
| | | private int writeRecord(IndexOutputBuffer b) throws IOException |
| | | { |
| | | int keySize = b.getKeySize(); |
| | | int packedSize = writeHeader(b.getIndexID(), keySize); |
| | | int headerSize = writeHeader(b.getIndexID(), keySize); |
| | | b.writeKey(bufferStream); |
| | | packedSize += writeByteStreams(); |
| | | return packedSize + keySize + insertByteStream.size() + deleteByteStream.size() + 4; |
| | | int bodySize = writeByteStreams(); |
| | | return headerSize + keySize + bodySize; |
| | | } |
| | | |
| | | private int writeRecord(byte[] k, int indexID) throws IOException |
| | | { |
| | | int packedSize = writeHeader(indexID, k.length); |
| | | int keySize = k.length; |
| | | int headerSize = writeHeader(indexID, keySize); |
| | | bufferStream.write(k); |
| | | packedSize += writeByteStreams(); |
| | | return packedSize + k.length + insertByteStream.size() + deleteByteStream.size() + 4; |
| | | int bodySize = writeByteStreams(); |
| | | return headerSize + keySize + bodySize; |
| | | } |
| | | |
| | | private int writePackedInt(OutputStream stream, int value) throws IOException |
| | | { |
| | | int writeSize = PackedInteger.getWriteIntLength(value); |
| | | PackedInteger.writeInt(tmpArray, 0, value); |
| | | stream.write(tmpArray, 0, writeSize); |
| | | return writeSize; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + "(" + indexMgr.getBufferFileName() + ": " + indexMgr.getBufferFile() + ")"; |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | return numberOfBuffers; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + "(" + bufferFileName + ": " + bufferFile + ")"; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | package org.opends.server.backends.jeb; |
| | | |
| | | import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_BUFFER_IO_ERROR; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import static org.opends.messages.JebMessages.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.nio.ByteBuffer; |
| | | import java.nio.channels.FileChannel; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.backends.jeb.Importer.IndexManager; |
| | | |
| | | import com.sleepycat.util.PackedInteger; |
| | |
| | | final class IndexInputBuffer implements Comparable<IndexInputBuffer> |
| | | { |
| | | |
| | | /** Possible states while reading a record. */ |
| | | private static enum RecordState |
| | | { |
| | | START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private final IndexManager indexMgr; |
| | | private final FileChannel channel; |
| | | private final long begin; |
| | | private final long end; |
| | | private final int id; |
| | | private final int bufferID; |
| | | |
| | | private long offset; |
| | | private final ByteBuffer cache; |
| | | |
| | | // Next fields are the fetched record data |
| | | private Integer indexID; |
| | | private ByteBuffer keyBuf = ByteBuffer.allocate(128); |
| | | |
| | | |
| | | |
| | | /** |
| | | * Possible states while reading a record. |
| | | */ |
| | | private enum RecordState |
| | | { |
| | | START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET |
| | | } |
| | | |
| | | |
| | | |
| | | private RecordState recordState = RecordState.START; |
| | | |
| | | |
| | | |
| | | /** |
| | | * Creates a new index input buffer. |
| | | * |
| | |
| | | * The position of the start of the buffer in the scratch file. |
| | | * @param end |
| | | * The position of the end of the buffer in the scratch file. |
| | | * @param id |
| | | * The index ID. |
| | | * @param bufferID |
| | | * The buffer ID. |
| | | * @param cacheSize |
| | | * The cache size. |
| | | * @throws IOException |
| | | * If an IO error occurred when priming the cache. |
| | | */ |
| | | public IndexInputBuffer(IndexManager indexMgr, FileChannel channel, |
| | | long begin, long end, int id, int cacheSize) throws IOException |
| | | long begin, long end, int bufferID, int cacheSize) throws IOException |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | this.channel = channel; |
| | | this.begin = begin; |
| | | this.end = end; |
| | | this.offset = 0; |
| | | this.id = id; |
| | | this.bufferID = bufferID; |
| | | this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256)); |
| | | |
| | | loadCache(); |
| | |
| | | keyBuf.flip(); |
| | | } |
| | | |
| | | |
| | | |
| | | private void loadCache() throws IOException |
| | | { |
| | | channel.position(begin + offset); |
| | |
| | | indexMgr.addBytesRead(bytesRead); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns {@code true} if this buffer has more data. |
| | | * |
| | |
| | | return cache.remaining() != 0 || hasMore; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the length of the next key. |
| | | * |
| | |
| | | return keyBuf.limit(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the next key. |
| | | * Fetches the next key into the provided byte buffer. |
| | | * |
| | | * @param b |
| | | * A buffer into which the key should be added. |
| | | * A buffer where to fetch the key |
| | | */ |
| | | public void getKey(ByteBuffer b) |
| | | public void fetchKey(ByteBuffer b) |
| | | { |
| | | keyBuf.get(b.array(), 0, keyBuf.limit()); |
| | | b.limit(keyBuf.limit()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the index ID of the next record. |
| | | * |
| | |
| | | { |
| | | try |
| | | { |
| | | getNextRecord(); |
| | | fetchNextRecord(); |
| | | } |
| | | catch (IOException ex) |
| | | { |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr |
| | | .getBufferFileName()); |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr.getBufferFileName()); |
| | | throw new RuntimeException(ex); |
| | | } |
| | | } |
| | | return indexID; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reads the next record from the buffer, skipping any remaining data in the |
| | | * current record. |
| | |
| | | * @throws IOException |
| | | * If an IO error occurred. |
| | | */ |
| | | public void getNextRecord() throws IOException |
| | | public void fetchNextRecord() throws IOException |
| | | { |
| | | switch (recordState) |
| | | { |
| | |
| | | recordState = RecordState.NEED_INSERT_ID_SET; |
| | | } |
| | | |
| | | |
| | | |
| | | private int getInt() throws IOException |
| | | { |
| | | ensureData(4); |
| | | return cache.getInt(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reads the next ID set from the record and merges it with the provided ID |
| | | * set. |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private boolean ensureData(int len) throws IOException |
| | | { |
| | | boolean ret = false; |
| | | if (cache.remaining() == 0) |
| | | { |
| | | cache.clear(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | return true; |
| | | } |
| | | else if (cache.remaining() < len) |
| | | { |
| | | cache.compact(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | return true; |
| | | } |
| | | return ret; |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Compares this buffer with the provided key and index ID. |
| | | * |
| | |
| | | */ |
| | | int compare(ByteBuffer cKey, Integer cIndexID) |
| | | { |
| | | int returnCode, rc; |
| | | if (keyBuf.limit() == 0) |
| | | ensureRecordFetched(); |
| | | int cmp = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), cKey.array(), cKey.limit()); |
| | | if (cmp == 0) |
| | | { |
| | | getIndexID(); |
| | | return (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | | rc = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), |
| | | cKey.array(), cKey.limit()); |
| | | if (rc != 0) |
| | | { |
| | | returnCode = 1; |
| | | } |
| | | else |
| | | { |
| | | returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | | return returnCode; |
| | | return 1; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int compareTo(IndexInputBuffer o) |
| | | { |
| | | // used in remove. |
| | |
| | | return 0; |
| | | } |
| | | |
| | | ensureRecordFetched(); |
| | | o.ensureRecordFetched(); |
| | | |
| | | byte[] oKey = o.keyBuf.array(); |
| | | int oLen = o.keyBuf.limit(); |
| | | int cmp = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), oKey, oLen); |
| | | if (cmp == 0) |
| | | { |
| | | cmp = indexID.intValue() - o.getIndexID().intValue(); |
| | | if (cmp == 0) |
| | | { |
| | | return bufferID - o.bufferID; |
| | | } |
| | | } |
| | | return cmp; |
| | | } |
| | | |
| | | private void ensureRecordFetched() |
| | | { |
| | | if (keyBuf.limit() == 0) |
| | | { |
| | | getIndexID(); |
| | | } |
| | | |
| | | if (o.keyBuf.limit() == 0) |
| | | { |
| | | o.getIndexID(); |
| | | } |
| | | |
| | | byte[] oKey = o.keyBuf.array(); |
| | | int oLen = o.keyBuf.limit(); |
| | | int returnCode = Importer.indexComparator.compare(keyBuf.array(), 0, |
| | | keyBuf.limit(), oKey, oLen); |
| | | if (returnCode == 0) |
| | | { |
| | | returnCode = indexID.intValue() - o.getIndexID().intValue(); |
| | | if (returnCode == 0) |
| | | { |
| | | returnCode = id - o.id; |
| | | } |
| | | } |
| | | return returnCode; |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** The size of a Java int. A Java int is 32 bits, i.e. 4 bytes. */ |
| | | private static final int INT_SIZE = 4; |
| | | static final int INT_SIZE = 4; |
| | | |
| | | /** |
| | | * The record overhead. In addition to entryID, key length and key bytes, the |
| | |
| | | * Used to break a tie (keys equal) when the buffers are being merged |
| | | * for writing to the index scratch file. |
| | | */ |
| | | private long id; |
| | | |
| | | /** Temporary buffer used to store integer values. */ |
| | | private final byte[] intBytes = new byte[INT_SIZE]; |
| | | private long bufferID; |
| | | |
| | | /** OffSet where next key is written. */ |
| | | private int keyOffset; |
| | |
| | | /** |
| | | * Set the ID of a buffer to the specified value. |
| | | * |
| | | * @param id The value to set the ID to. |
| | | * @param bufferID The value to set the buffer ID to. |
| | | */ |
| | | public void setID(long id) |
| | | public void setBufferID(long bufferID) |
| | | { |
| | | this.id = id; |
| | | this.bufferID = bufferID; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return the ID of a buffer. |
| | | * |
| | |
| | | */ |
| | | private long getBufferID() |
| | | { |
| | | return this.id; |
| | | return this.bufferID; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Determines if a buffer is a poison buffer. A poison buffer is used to |
| | | * shutdown work queues when import/rebuild index phase one is completed. |
| | |
| | | * specified byte array in the buffer. It returns {@code false} otherwise. |
| | | * |
| | | * @param kBytes The byte array to check space against. |
| | | * @param id The id value to check space against. |
| | | * @param entryID The entryID value to check space against. |
| | | * @return {@code true} if there is space to write the byte array in a |
| | | * buffer, or {@code false} otherwise. |
| | | */ |
| | | public boolean isSpaceAvailable(byte[] kBytes, long id) { |
| | | return getRequiredSize(kBytes.length, id) < bytesLeft; |
| | | public boolean isSpaceAvailable(byte[] kBytes, long entryID) { |
| | | return getRequiredSize(kBytes.length, entryID) < bytesLeft; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param indexID The index ID the record belongs. |
| | | * @param insert <CODE>True</CODE> if key is an insert, false otherwise. |
| | | */ |
| | | public void add(byte[] keyBytes, EntryID entryID, int indexID, |
| | | boolean insert) { |
| | | public void add(byte[] keyBytes, EntryID entryID, int indexID, boolean insert) { |
| | | // write the record data, but leave the space to write the record size just |
| | | // before it |
| | | recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert); |
| | | // then write the returned record size |
| | | System.arraycopy(getIntBytes(recordOffset), 0, buffer, keyOffset, INT_SIZE); |
| | | keyOffset += INT_SIZE; |
| | | keyOffset += writeIntBytes(buffer, keyOffset, recordOffset); |
| | | bytesLeft = recordOffset - keyOffset; |
| | | keys++; |
| | | } |
| | |
| | | // write the INS/DEL bit |
| | | buffer[offSet++] = insert ? INS : DEL; |
| | | // write the indexID |
| | | System.arraycopy(getIntBytes(indexID), 0, buffer, offSet, INT_SIZE); |
| | | offSet += INT_SIZE; |
| | | offSet += writeIntBytes(buffer, offSet, indexID); |
| | | // write the entryID |
| | | offSet = PackedInteger.writeLong(buffer, offSet, id); |
| | | // write the key length |
| | |
| | | * Computes the full size of the record. |
| | | * |
| | | * @param keyLen The length of the key of index |
| | | * @param id The entry id |
| | | * @param entryID The entry id |
| | | * @return The size that such record would take in the IndexOutputBuffer |
| | | */ |
| | | public static int getRequiredSize(int keyLen, long id) |
| | | public static int getRequiredSize(int keyLen, long entryID) |
| | | { |
| | | // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit |
| | | // and finally the space needed to store the record size |
| | | return getRecordSize(keyLen, id) + INT_SIZE; |
| | | return getRecordSize(keyLen, entryID) + INT_SIZE; |
| | | } |
| | | |
| | | private static int getRecordSize(int keyLen, long id) |
| | | private static int getRecordSize(int keyLen, long entryID) |
| | | { |
| | | // Adds up the key length + key bytes + ... |
| | | return PackedInteger.getWriteIntLength(keyLen) + keyLen + |
| | | // ... entryID + (indexID + INS/DEL bit). |
| | | PackedInteger.getWriteLongLength(id) + REC_OVERHEAD; |
| | | PackedInteger.getWriteLongLength(entryID) + REC_OVERHEAD; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Write record at specified index to the specified output stream. Used when |
| | | * when writing the index scratch files. |
| | | |
| | | * |
| | | * @param stream The stream to write the record at the index to. |
| | | * @param index The index of the record to write. |
| | | */ |
| | | public void writeID(ByteArrayOutputStream stream, int index) |
| | | { |
| | | int offSet = getIntegerValue(index * INT_SIZE); |
| | | int offSet = getOffset(index); |
| | | int len = PackedInteger.getReadLongLength(buffer, offSet + REC_OVERHEAD); |
| | | stream.write(buffer, offSet + REC_OVERHEAD, len); |
| | | } |
| | |
| | | */ |
| | | public boolean isInsertRecord(int index) |
| | | { |
| | | int recOffset = getIntegerValue(index * INT_SIZE); |
| | | return buffer[recOffset] != DEL; |
| | | int recOffset = getOffset(index); |
| | | return buffer[recOffset] == INS; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return the size of the key part of the record. |
| | | * |
| | |
| | | */ |
| | | public int getKeySize() |
| | | { |
| | | int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(position) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | return PackedInteger.readInt(buffer, offSet); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return the key value part of a record indicated by the current buffer |
| | | * position. |
| | |
| | | private ByteBuffer getKeyBuf(int x) |
| | | { |
| | | keyBuffer.clear(); |
| | | int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(x) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | int keyLen = PackedInteger.readInt(buffer, offSet); |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | |
| | | */ |
| | | private byte[] getKey(int x) |
| | | { |
| | | int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(x) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | int keyLen = PackedInteger.readInt(buffer, offSet); |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | |
| | | return key; |
| | | } |
| | | |
| | | |
| | | private int getIndexID(int x) |
| | | private int getOffset(int position) |
| | | { |
| | | return getIntegerValue(getIntegerValue(x * INT_SIZE) + 1); |
| | | return getIntegerValue(position * INT_SIZE); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return index id associated with the current position's record. |
| | | * |
| | |
| | | */ |
| | | public int getIndexID() |
| | | { |
| | | return getIntegerValue(getIntegerValue(position * INT_SIZE) + 1); |
| | | return getIndexID(position); |
| | | } |
| | | |
| | | |
| | | private boolean is(int x, int y, CompareOp op) |
| | | private int getIndexID(int position) |
| | | { |
| | | int xoffSet = getIntegerValue(x * INT_SIZE); |
| | | int xIndexID = getIntegerValue(xoffSet + 1); |
| | | return getIndexIDFromOffset(getOffset(position)); |
| | | } |
| | | |
| | | private int getIndexIDFromOffset(int offset) |
| | | { |
| | | return getIntegerValue(offset + 1); |
| | | } |
| | | |
| | | private boolean is(CompareOp op, int x, int y) |
| | | { |
| | | int xoffSet = getOffset(x); |
| | | int xIndexID = getIndexIDFromOffset(xoffSet); |
| | | xoffSet += REC_OVERHEAD; |
| | | xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet); |
| | | int xKeyLen = PackedInteger.readInt(buffer, xoffSet); |
| | | int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet; |
| | | int yoffSet = getIntegerValue(y * INT_SIZE); |
| | | int yIndexID = getIntegerValue(yoffSet + 1); |
| | | int yoffSet = getOffset(y); |
| | | int yIndexID = getIndexIDFromOffset(yoffSet); |
| | | yoffSet += REC_OVERHEAD; |
| | | yoffSet += PackedInteger.getReadIntLength(buffer, yoffSet); |
| | | int yKeyLen = PackedInteger.readInt(buffer, yoffSet); |
| | | int yKey = PackedInteger.getReadIntLength(buffer, yoffSet) + yoffSet; |
| | | return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen, |
| | | xIndexID, yKey, yKeyLen, yIndexID), op); |
| | | int cmp = comparator.compare(buffer, xKey, xKeyLen, xIndexID, yKey, yKeyLen, yIndexID); |
| | | return evaluateReturnCode(cmp, op); |
| | | } |
| | | |
| | | |
| | | private boolean is(int x, byte[] yKey, CompareOp op, int yIndexID) |
| | | private boolean is(CompareOp op, int x, byte[] yKey, int yIndexID) |
| | | { |
| | | int xoffSet = getIntegerValue(x * INT_SIZE); |
| | | int xIndexID = getIntegerValue(xoffSet + 1); |
| | | int xoffSet = getOffset(x); |
| | | int xIndexID = getIndexIDFromOffset(xoffSet); |
| | | xoffSet += REC_OVERHEAD; |
| | | xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet); |
| | | int xKeyLen = PackedInteger.readInt(buffer, xoffSet); |
| | | int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet; |
| | | return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen, |
| | | xIndexID, yKey, yKey.length, yIndexID), op); |
| | | int cmp = comparator.compare(buffer, xKey, xKeyLen, xIndexID, yKey, yKey.length, yIndexID); |
| | | return evaluateReturnCode(cmp, op); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Compare the byte array at the current position with the specified one and |
| | | * using the specified index id. It will return {@code true} if the byte |
| | |
| | | */ |
| | | public boolean compare(byte[]b, int bIndexID) |
| | | { |
| | | int offset = getIntegerValue(position * INT_SIZE); |
| | | int indexID = getIntegerValue(offset + 1); |
| | | int offset = getOffset(position); |
| | | int indexID = getIndexIDFromOffset(offset); |
| | | offset += REC_OVERHEAD; |
| | | offset += PackedInteger.getReadIntLength(buffer, offset); |
| | | int keyLen = PackedInteger.readInt(buffer, offset); |
| | |
| | | public int compareTo(IndexOutputBuffer b) |
| | | { |
| | | final ByteBuffer keyBuf = b.getKeyBuf(b.position); |
| | | int offset = getIntegerValue(position * INT_SIZE); |
| | | int indexID = getIntegerValue(offset + 1); |
| | | int offset = getOffset(position); |
| | | int indexID = getIndexIDFromOffset(offset); |
| | | offset += REC_OVERHEAD; |
| | | offset += PackedInteger.getReadIntLength(buffer, offset); |
| | | int keyLen = PackedInteger.readInt(buffer, offset); |
| | |
| | | if (indexID == bIndexID) |
| | | { |
| | | // This is tested in a tree set remove when a buffer is removed from the tree set. |
| | | return compare(this.id, b.getBufferID()); |
| | | return compare(this.bufferID, b.getBufferID()); |
| | | } |
| | | else if (indexID < bIndexID) |
| | | { |
| | |
| | | */ |
| | | public void writeKey(DataOutputStream dataStream) throws IOException |
| | | { |
| | | int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(position) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | int keyLen = PackedInteger.readInt(buffer, offSet); |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | |
| | | */ |
| | | public boolean compare(int i) |
| | | { |
| | | return is(i, position, CompareOp.EQ); |
| | | return is(CompareOp.EQ, i, position); |
| | | } |
| | | |
| | | /** |
| | |
| | | position++; |
| | | } |
| | | |
| | | private byte[] getIntBytes(int val) |
| | | private int writeIntBytes(byte[] buffer, int offset, int val) |
| | | { |
| | | for (int i = 3; i >= 0; i--) { |
| | | intBytes[i] = (byte) (val & 0xff); |
| | | for (int i = offset + INT_SIZE - 1; i >= offset; i--) { |
| | | buffer[i] = (byte) (val & 0xff); |
| | | val >>>= 8; |
| | | } |
| | | return intBytes; |
| | | return INT_SIZE; |
| | | } |
| | | |
| | | |
| | | private int getIntegerValue(int index) |
| | | { |
| | | int answer = 0; |
| | |
| | | return answer; |
| | | } |
| | | |
| | | |
| | | private int med3(int a, int b, int c) |
| | | { |
| | | return is(a, b, CompareOp.LT) ? |
| | | (is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) : |
| | | (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a); |
| | | return is(CompareOp.LT, a, b) ? |
| | | (is(CompareOp.LT,b,c) ? b : is(CompareOp.LT,a,c) ? c : a) : |
| | | (is(CompareOp.GT,b,c) ? b : is(CompareOp.GT,a,c) ? c : a); |
| | | } |
| | | |
| | | |
| | | private void sort(int off, int len) |
| | | { |
| | | if (len < 7) { |
| | | for (int i=off; i<len+off; i++) |
| | | { |
| | | for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--) |
| | | for (int j=i; j>off && is(CompareOp.GT, j-1, j); j--) |
| | | { |
| | | swap(j, j-1); |
| | | } |
| | |
| | | int a = off, b = a, c = off + len - 1, d = c; |
| | | while(true) |
| | | { |
| | | while (b <= c && is(b, mKey, CompareOp.LE, mIndexID)) |
| | | while (b <= c && is(CompareOp.LE, b, mKey, mIndexID)) |
| | | { |
| | | if (is(b, mKey, CompareOp.EQ, mIndexID)) |
| | | if (is(CompareOp.EQ, b, mKey, mIndexID)) |
| | | { |
| | | swap(a++, b); |
| | | } |
| | | b++; |
| | | } |
| | | while (c >= b && is(c, mKey, CompareOp.GE, mIndexID)) |
| | | while (c >= b && is(CompareOp.GE, c, mKey, mIndexID)) |
| | | { |
| | | if (is(c, mKey, CompareOp.EQ, mIndexID)) |
| | | if (is(CompareOp.EQ, c, mKey, mIndexID)) |
| | | { |
| | | swap(c, d--); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | private void swap(int a, int b) |
| | | { |
| | | int aOffset = a * INT_SIZE; |
| | | int bOffset = b * INT_SIZE; |
| | | int bVal = getIntegerValue(bOffset); |
| | | System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE); |
| | | System.arraycopy(getIntBytes(bVal), 0, buffer, aOffset, INT_SIZE); |
| | | writeIntBytes(buffer, aOffset, bVal); |
| | | } |
| | | |
| | | |
| | | private void vectorSwap(int a, int b, int n) |
| | | { |
| | | for (int i=0; i<n; i++, a++, b++) |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | private boolean evaluateReturnCode(int rc, CompareOp op) |
| | | { |
| | | switch(op) { |
| | |
| | | new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>(); |
| | | |
| | | /** Map of DB containers to index managers. Used to start phase 2. */ |
| | | private final List<IndexManager> indexMgrList = |
| | | new LinkedList<IndexManager>(); |
| | | private final List<IndexManager> indexMgrList = new LinkedList<IndexManager>(); |
| | | /** Map of DB containers to DN-based index managers. Used to start phase 2. */ |
| | | private final List<IndexManager> DNIndexMgrList = |
| | | new LinkedList<IndexManager>(); |
| | | private final List<IndexManager> DNIndexMgrList = new LinkedList<IndexManager>(); |
| | | |
| | | /** |
| | | * Futures used to indicate when the index file writers are done flushing |
| | |
| | | { |
| | | if (index != null) |
| | | { |
| | | int id = System.identityHashCode(index); |
| | | idContainerMap.putIfAbsent(id, index); |
| | | idContainerMap.putIfAbsent(getIndexID(index), index); |
| | | } |
| | | } |
| | | |
| | | private static int getIndexID(DatabaseContainer index) |
| | | { |
| | | return System.identityHashCode(index); |
| | | } |
| | | |
| | | private Suffix getSuffix(WriteableStorage txn, EntryContainer entryContainer) |
| | | throws ConfigException |
| | | { |
| | |
| | | indexBuffer = getNewIndexBuffer(sizeNeeded); |
| | | indexBufferMap.put(indexKey, indexBuffer); |
| | | } |
| | | int id = System.identityHashCode(container); |
| | | indexBuffer.add(key, entryID, id, insert); |
| | | return id; |
| | | int indexID = getIndexID(container); |
| | | indexBuffer.add(key, entryID, indexID, insert); |
| | | return indexID; |
| | | } |
| | | |
| | | IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException |
| | |
| | | ByteBuffer key = null; |
| | | ImportIDSet insertIDSet = null; |
| | | ImportIDSet deleteIDSet = null; |
| | | Integer indexID = null; |
| | | |
| | | if (isCanceled) |
| | | { |
| | |
| | | return null; |
| | | } |
| | | |
| | | Integer indexID = null; |
| | | while (!bufferSet.isEmpty()) |
| | | { |
| | | IndexInputBuffer b = bufferSet.pollFirst(); |
| | |
| | | |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | key.flip(); |
| | | b.getKey(key); |
| | | b.fetchKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | |
| | | key = ByteBuffer.allocate(b.getKeyLen()); |
| | | } |
| | | key.flip(); |
| | | b.getKey(key); |
| | | b.fetchKey(key); |
| | | |
| | | b.mergeIDSet(insertIDSet); |
| | | b.mergeIDSet(deleteIDSet); |
| | |
| | | |
| | | if (b.hasMoreData()) |
| | | { |
| | | b.getNextRecord(); |
| | | b.fetchNextRecord(); |
| | | bufferSet.add(b); |
| | | } |
| | | } |
| | |
| | | |
| | | private void id2child(EntryID childID) throws DirectoryException |
| | | { |
| | | ImportIDSet idSet; |
| | | if (parentID != null) |
| | | { |
| | | if (!id2childTree.containsKey(parentID.toByteString())) |
| | | ImportIDSet idSet; |
| | | final ByteString parentIDBytes = parentID.toByteString(); |
| | | if (!id2childTree.containsKey(parentIDBytes)) |
| | | { |
| | | idSet = new ImportIDSet(1, childLimit, childDoCount); |
| | | id2childTree.put(parentID.toByteString(), idSet); |
| | | id2childTree.put(parentIDBytes, idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2childTree.get(parentID.toByteString()); |
| | | idSet = id2childTree.get(parentIDBytes); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | if (id2childTree.size() > DN_STATE_CACHE_SIZE) |
| | |
| | | if (parentID != null) |
| | | { |
| | | ImportIDSet idSet; |
| | | if (!id2subtreeTree.containsKey(parentID.toByteString())) |
| | | final ByteString parentIDBytes = parentID.toByteString(); |
| | | if (!id2subtreeTree.containsKey(parentIDBytes)) |
| | | { |
| | | idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount); |
| | | id2subtreeTree.put(parentID.toByteString(), idSet); |
| | | id2subtreeTree.put(parentIDBytes, idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(parentID.toByteString()); |
| | | idSet = id2subtreeTree.get(parentIDBytes); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | // TODO: |
| | |
| | | // Just ignore. |
| | | break; |
| | | } |
| | | if (!id2subtreeTree.containsKey(nodeID.toByteString())) |
| | | |
| | | final ByteString nodeIDBytes = nodeID.toByteString(); |
| | | if (!id2subtreeTree.containsKey(nodeIDBytes)) |
| | | { |
| | | idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount); |
| | | id2subtreeTree.put(nodeID.toByteString(), idSet); |
| | | id2subtreeTree.put(nodeIDBytes, idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(nodeID.toByteString()); |
| | | idSet = id2subtreeTree.get(nodeIDBytes); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | } |
| | |
| | | { |
| | | resetStreams(); |
| | | |
| | | long id = 0; |
| | | long bufferID = 0; |
| | | long bufferLen = 0; |
| | | for (IndexOutputBuffer b : buffers) |
| | | { |
| | |
| | | else |
| | | { |
| | | b.setPosition(0); |
| | | b.setID(id++); |
| | | b.setBufferID(bufferID++); |
| | | indexSortedSet.add(b); |
| | | } |
| | | } |
| | |
| | | insertByteStream.reset(); |
| | | insertByteStream.write(-1); |
| | | } |
| | | bufferStream.writeInt(insertKeyCount); |
| | | |
| | | int insertSize = 4; |
| | | bufferStream.writeInt(insertKeyCount); |
| | | if (insertByteStream.size() > 0) |
| | | { |
| | | insertByteStream.writeTo(bufferStream); |
| | | } |
| | | bufferStream.write(deleteKeyCount); |
| | | |
| | | int deleteSize = 4; |
| | | bufferStream.write(deleteKeyCount); |
| | | if (deleteByteStream.size() > 0) |
| | | { |
| | | deleteByteStream.writeTo(bufferStream); |
| | | } |
| | | return insertSize + deleteSize; |
| | | return insertSize + insertByteStream.size() + deleteSize + deleteByteStream.size(); |
| | | } |
| | | |
| | | private int writeHeader(int indexID, int keySize) throws IOException |
| | | { |
| | | bufferStream.writeInt(indexID); |
| | | bufferStream.writeInt(keySize); |
| | | return 4; |
| | | return 2 * INT_SIZE; |
| | | } |
| | | |
| | | private int writeRecord(IndexOutputBuffer b) throws IOException |
| | | { |
| | | int keySize = b.getKeySize(); |
| | | int packedSize = writeHeader(b.getIndexID(), keySize); |
| | | int headerSize = writeHeader(b.getIndexID(), keySize); |
| | | b.writeKey(bufferStream); |
| | | packedSize += writeByteStreams(); |
| | | return packedSize + keySize + insertByteStream.size() + deleteByteStream.size() + 4; |
| | | int bodySize = writeByteStreams(); |
| | | return headerSize + bodySize + keySize; |
| | | } |
| | | |
| | | private int writeRecord(byte[] k, int indexID) throws IOException |
| | | { |
| | | int packedSize = writeHeader(indexID, k.length); |
| | | int keySize = k.length; |
| | | int headerSize = writeHeader(indexID, keySize); |
| | | bufferStream.write(k); |
| | | packedSize += writeByteStreams(); |
| | | return packedSize + k.length + insertByteStream.size() + deleteByteStream.size() + 4; |
| | | int bodySize = writeByteStreams(); |
| | | return headerSize + bodySize + keySize; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + "(" + indexMgr.getBufferFileName() + ": " + indexMgr.getBufferFile() + ")"; |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | return numberOfBuffers; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + "(" + bufferFileName + ": " + bufferFile + ")"; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | final class IndexInputBuffer implements Comparable<IndexInputBuffer> |
| | | { |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private final IndexManager indexMgr; |
| | | private final FileChannel channel; |
| | | private final long begin; |
| | | private final long end; |
| | | private final int id; |
| | | |
| | | private long offset; |
| | | private final ByteBuffer cache; |
| | | private Integer indexID; |
| | | private ByteBuffer keyBuf = ByteBuffer.allocate(128); |
| | | |
| | | /** Possible states while reading a record. */ |
| | | private static enum RecordState |
| | | { |
| | | START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private final IndexManager indexMgr; |
| | | private final FileChannel channel; |
| | | private final long begin; |
| | | private final long end; |
| | | private final int bufferID; |
| | | |
| | | private long offset; |
| | | private final ByteBuffer cache; |
| | | |
| | | // Next fields are the fetched record data |
| | | private Integer indexID; |
| | | private ByteBuffer keyBuf = ByteBuffer.allocate(128); |
| | | private RecordState recordState = RecordState.START; |
| | | |
| | | /** |
| | |
| | | * The position of the start of the buffer in the scratch file. |
| | | * @param end |
| | | * The position of the end of the buffer in the scratch file. |
| | | * @param id |
| | | * The index ID. |
| | | * @param bufferID |
| | | * The buffer ID. |
| | | * @param cacheSize |
| | | * The cache size. |
| | | * @throws IOException |
| | | * If an IO error occurred when priming the cache. |
| | | */ |
| | | public IndexInputBuffer(IndexManager indexMgr, FileChannel channel, |
| | | long begin, long end, int id, int cacheSize) throws IOException |
| | | long begin, long end, int bufferID, int cacheSize) throws IOException |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | this.channel = channel; |
| | | this.begin = begin; |
| | | this.end = end; |
| | | this.offset = 0; |
| | | this.id = id; |
| | | this.bufferID = bufferID; |
| | | this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256)); |
| | | |
| | | loadCache(); |
| | |
| | | keyBuf.flip(); |
| | | } |
| | | |
| | | |
| | | |
| | | private void loadCache() throws IOException |
| | | { |
| | | channel.position(begin + offset); |
| | |
| | | indexMgr.addBytesRead(bytesRead); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns {@code true} if this buffer has more data. |
| | | * |
| | |
| | | return cache.remaining() != 0 || hasMore; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the length of the next key. |
| | | * |
| | |
| | | return keyBuf.limit(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the next key. |
| | | * Fetches the next key into the provided byte buffer. |
| | | * |
| | | * @param b |
| | | * A buffer into which the key should be added. |
| | | * A buffer where to fetch the key |
| | | */ |
| | | public void getKey(ByteBuffer b) |
| | | public void fetchKey(ByteBuffer b) |
| | | { |
| | | keyBuf.get(b.array(), 0, keyBuf.limit()); |
| | | b.limit(keyBuf.limit()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the index ID of the next record. |
| | | * |
| | |
| | | { |
| | | try |
| | | { |
| | | getNextRecord(); |
| | | fetchNextRecord(); |
| | | } |
| | | catch (IOException ex) |
| | | { |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr |
| | | .getBufferFileName()); |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr.getBufferFileName()); |
| | | throw new RuntimeException(ex); |
| | | } |
| | | } |
| | | return indexID; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reads the next record from the buffer, skipping any remaining data in the |
| | | * current record. |
| | |
| | | * @throws IOException |
| | | * If an IO error occurred. |
| | | */ |
| | | public void getNextRecord() throws IOException |
| | | public void fetchNextRecord() throws IOException |
| | | { |
| | | switch (recordState) |
| | | { |
| | |
| | | recordState = RecordState.NEED_INSERT_ID_SET; |
| | | } |
| | | |
| | | |
| | | |
| | | private int getInt() throws IOException |
| | | { |
| | | ensureData(4); |
| | | return cache.getInt(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reads the next ID set from the record and merges it with the provided ID |
| | | * set. |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private boolean ensureData(int len) throws IOException |
| | | { |
| | | boolean ret = false; |
| | | if (cache.remaining() == 0) |
| | | { |
| | | cache.clear(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | return true; |
| | | } |
| | | else if (cache.remaining() < len) |
| | | { |
| | | cache.compact(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | return true; |
| | | } |
| | | return ret; |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Compares this buffer with the provided key and index ID. |
| | | * |
| | |
| | | */ |
| | | int compare(ByteBuffer cKey, Integer cIndexID) |
| | | { |
| | | if (keyBuf.limit() == 0) |
| | | { |
| | | getIndexID(); |
| | | } |
| | | final int rc = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), cKey.array(), cKey.limit()); |
| | | if (rc == 0) |
| | | ensureRecordFetched(); |
| | | final int cmp = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), cKey.array(), cKey.limit()); |
| | | if (cmp == 0) |
| | | { |
| | | return (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | |
| | | return 0; |
| | | } |
| | | |
| | | if (keyBuf.limit() == 0) |
| | | { |
| | | getIndexID(); |
| | | } |
| | | |
| | | if (o.keyBuf.limit() == 0) |
| | | { |
| | | o.getIndexID(); |
| | | } |
| | | ensureRecordFetched(); |
| | | o.ensureRecordFetched(); |
| | | |
| | | byte[] oKey = o.keyBuf.array(); |
| | | int oLen = o.keyBuf.limit(); |
| | | int returnCode = Importer.indexComparator.compare(keyBuf.array(), 0, |
| | | keyBuf.limit(), oKey, oLen); |
| | | if (returnCode == 0) |
| | | int cmp = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), oKey, oLen); |
| | | if (cmp == 0) |
| | | { |
| | | returnCode = indexID.intValue() - o.getIndexID().intValue(); |
| | | if (returnCode == 0) |
| | | cmp = indexID.intValue() - o.getIndexID().intValue(); |
| | | if (cmp == 0) |
| | | { |
| | | returnCode = id - o.id; |
| | | return bufferID - o.bufferID; |
| | | } |
| | | } |
| | | return returnCode; |
| | | return cmp; |
| | | } |
| | | |
| | | private void ensureRecordFetched() |
| | | { |
| | | if (keyBuf.limit() == 0) |
| | | { |
| | | try |
| | | { |
| | | fetchNextRecord(); |
| | | } |
| | | catch (IOException ex) |
| | | { |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr.getBufferFileName()); |
| | | throw new RuntimeException(ex); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | * Used to break a tie (keys equal) when the buffers are being merged |
| | | * for writing to the index scratch file. |
| | | */ |
| | | private long id; |
| | | private long bufferID; |
| | | |
| | | /** OffSet where next key is written. */ |
| | | private int keyOffset; |
| | |
| | | /** |
| | | * Set the ID of a buffer to the specified value. |
| | | * |
| | | * @param id The value to set the ID to. |
| | | * @param bufferID The value to set the buffer ID to. |
| | | */ |
| | | public void setID(long id) |
| | | public void setBufferID(long bufferID) |
| | | { |
| | | this.id = id; |
| | | this.bufferID = bufferID; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return the ID of a buffer. |
| | | * |
| | |
| | | */ |
| | | private long getBufferID() |
| | | { |
| | | return this.id; |
| | | return this.bufferID; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Determines if a buffer is a poison buffer. A poison buffer is used to |
| | | * shutdown work queues when import/rebuild index phase one is completed. |
| | |
| | | * specified byte array in the buffer. It returns {@code false} otherwise. |
| | | * |
| | | * @param kBytes The byte array to check space against. |
| | | * @param id The id value to check space against. |
| | | * @param entryID The entryID value to check space against. |
| | | * @return {@code true} if there is space to write the byte array in a |
| | | * buffer, or {@code false} otherwise. |
| | | */ |
| | | public boolean isSpaceAvailable(ByteSequence kBytes, long id) { |
| | | return getRequiredSize(kBytes.length(), id) < bytesLeft; |
| | | public boolean isSpaceAvailable(ByteSequence kBytes, long entryID) { |
| | | return getRequiredSize(kBytes.length(), entryID) < bytesLeft; |
| | | } |
| | | |
| | | /** |
| | |
| | | // before it |
| | | recordOffset = addRecord(keyBytes, entryID.longValue(), indexID, insert); |
| | | // then write the returned record size |
| | | writeIntBytes(recordOffset, buffer, keyOffset); |
| | | keyOffset += INT_SIZE; |
| | | keyOffset += writeIntBytes(buffer, keyOffset, recordOffset); |
| | | bytesLeft = recordOffset - keyOffset; |
| | | keys++; |
| | | } |
| | |
| | | /** |
| | | * Writes the full record minus the record size itself. |
| | | */ |
| | | private int addRecord(ByteSequence key, long id, int indexID, boolean insert) |
| | | private int addRecord(ByteSequence key, long entryID, int indexID, boolean insert) |
| | | { |
| | | int retOffset = recordOffset - getRecordSize(key.length(), id); |
| | | int retOffset = recordOffset - getRecordSize(key.length(), entryID); |
| | | int offSet = retOffset; |
| | | |
| | | // write the INS/DEL bit |
| | | buffer[offSet++] = insert ? INS : DEL; |
| | | // write the indexID |
| | | writeIntBytes(indexID, buffer, offSet); |
| | | offSet += INT_SIZE; |
| | | offSet += writeIntBytes(buffer, offSet, indexID); |
| | | // write the entryID |
| | | offSet = PackedInteger.writeLong(buffer, offSet, id); |
| | | offSet = PackedInteger.writeLong(buffer, offSet, entryID); |
| | | // write the key length |
| | | offSet = PackedInteger.writeInt(buffer, offSet, key.length()); |
| | | // write the key bytes |
| | |
| | | * Computes the full size of the record. |
| | | * |
| | | * @param keyLen The length of the key of index |
| | | * @param id The entry id |
| | | * @param entryID The entry id |
| | | * @return The size that such record would take in the IndexOutputBuffer |
| | | */ |
| | | public static int getRequiredSize(int keyLen, long id) |
| | | public static int getRequiredSize(int keyLen, long entryID) |
| | | { |
| | | // Adds up the key length + key bytes + entryID + indexID + the INS/DEL bit |
| | | // and finally the space needed to store the record size |
| | | return getRecordSize(keyLen, id) + INT_SIZE; |
| | | return getRecordSize(keyLen, entryID) + INT_SIZE; |
| | | } |
| | | |
| | | private static int getRecordSize(int keyLen, long id) |
| | | private static int getRecordSize(int keyLen, long entryID) |
| | | { |
| | | // Adds up the key length + key bytes + ... |
| | | return PackedInteger.getWriteIntLength(keyLen) + keyLen + |
| | | // ... entryID + (indexID + INS/DEL bit). |
| | | PackedInteger.getWriteLongLength(id) + REC_OVERHEAD; |
| | | PackedInteger.getWriteLongLength(entryID) + REC_OVERHEAD; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Write record at specified index to the specified output stream. Used when |
| | | * when writing the index scratch files. |
| | | |
| | | * |
| | | * @param stream The stream to write the record at the index to. |
| | | * @param index The index of the record to write. |
| | | */ |
| | | public void writeID(ByteArrayOutputStream stream, int index) |
| | | { |
| | | int offSet = getIntegerValue(index * INT_SIZE); |
| | | int offSet = getOffset(index); |
| | | int len = PackedInteger.getReadLongLength(buffer, offSet + REC_OVERHEAD); |
| | | stream.write(buffer, offSet + REC_OVERHEAD, len); |
| | | } |
| | |
| | | */ |
| | | public boolean isInsertRecord(int index) |
| | | { |
| | | int recOffset = getIntegerValue(index * INT_SIZE); |
| | | return buffer[recOffset] != DEL; |
| | | int recOffset = getOffset(index); |
| | | return buffer[recOffset] == INS; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public int getKeySize() |
| | | { |
| | | int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(position) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | return PackedInteger.readInt(buffer, offSet); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return the key value part of a record indicated by the current buffer |
| | | * position. |
| | |
| | | private ByteBuffer getKeyBuf(int x) |
| | | { |
| | | keyBuffer.clear(); |
| | | int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(x) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | int keyLen = PackedInteger.readInt(buffer, offSet); |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | |
| | | */ |
| | | private byte[] getKey(int x) |
| | | { |
| | | int offSet = getIntegerValue(x * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(x) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | int keyLen = PackedInteger.readInt(buffer, offSet); |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | |
| | | return key; |
| | | } |
| | | |
| | | |
| | | private int getIndexID(int x) |
| | | private int getOffset(int position) |
| | | { |
| | | return getIntegerValue(getIntegerValue(x * INT_SIZE) + 1); |
| | | return getIntegerValue(position * INT_SIZE); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Return index id associated with the current position's record. |
| | | * |
| | |
| | | */ |
| | | public int getIndexID() |
| | | { |
| | | return getIntegerValue(getIntegerValue(position * INT_SIZE) + 1); |
| | | return getIndexID(position); |
| | | } |
| | | |
| | | |
| | | private boolean is(int x, int y, CompareOp op) |
| | | private int getIndexID(int position) |
| | | { |
| | | int xoffSet = getIntegerValue(x * INT_SIZE); |
| | | int xIndexID = getIntegerValue(xoffSet + 1); |
| | | return getIndexIDFromOffset(getOffset(position)); |
| | | } |
| | | |
| | | private int getIndexIDFromOffset(int offset) |
| | | { |
| | | return getIntegerValue(offset + 1); |
| | | } |
| | | |
| | | private boolean is(CompareOp op, int x, int y) |
| | | { |
| | | int xoffSet = getOffset(x); |
| | | int xIndexID = getIndexIDFromOffset(xoffSet); |
| | | xoffSet += REC_OVERHEAD; |
| | | xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet); |
| | | int xKeyLen = PackedInteger.readInt(buffer, xoffSet); |
| | | int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet; |
| | | int yoffSet = getIntegerValue(y * INT_SIZE); |
| | | int yIndexID = getIntegerValue(yoffSet + 1); |
| | | int yoffSet = getOffset(y); |
| | | int yIndexID = getIndexIDFromOffset(yoffSet); |
| | | yoffSet += REC_OVERHEAD; |
| | | yoffSet += PackedInteger.getReadIntLength(buffer, yoffSet); |
| | | int yKeyLen = PackedInteger.readInt(buffer, yoffSet); |
| | | int yKey = PackedInteger.getReadIntLength(buffer, yoffSet) + yoffSet; |
| | | return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen, |
| | | xIndexID, yKey, yKeyLen, yIndexID), op); |
| | | int cmp = comparator.compare(buffer, xKey, xKeyLen, xIndexID, yKey, yKeyLen, yIndexID); |
| | | return evaluateReturnCode(cmp, op); |
| | | } |
| | | |
| | | |
| | | private boolean is(int x, byte[] yKey, CompareOp op, int yIndexID) |
| | | private boolean is(CompareOp op, int x, byte[] yKey, int yIndexID) |
| | | { |
| | | int xoffSet = getIntegerValue(x * INT_SIZE); |
| | | int xIndexID = getIntegerValue(xoffSet + 1); |
| | | int xoffSet = getOffset(x); |
| | | int xIndexID = getIndexIDFromOffset(xoffSet); |
| | | xoffSet += REC_OVERHEAD; |
| | | xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet); |
| | | int xKeyLen = PackedInteger.readInt(buffer, xoffSet); |
| | | int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet; |
| | | return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen, |
| | | xIndexID, yKey, yKey.length, yIndexID), op); |
| | | int cmp = comparator.compare(buffer, xKey, xKeyLen, xIndexID, yKey, yKey.length, yIndexID); |
| | | return evaluateReturnCode(cmp, op); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Compare the byte array at the current position with the specified one and |
| | | * using the specified index id. It will return {@code true} if the byte |
| | |
| | | */ |
| | | public boolean compare(byte[]b, int bIndexID) |
| | | { |
| | | int offset = getIntegerValue(position * INT_SIZE); |
| | | int indexID = getIntegerValue(offset + 1); |
| | | int offset = getOffset(position); |
| | | int indexID = getIndexIDFromOffset(offset); |
| | | offset += REC_OVERHEAD; |
| | | offset += PackedInteger.getReadIntLength(buffer, offset); |
| | | int keyLen = PackedInteger.readInt(buffer, offset); |
| | |
| | | public int compareTo(IndexOutputBuffer b) |
| | | { |
| | | final ByteBuffer keyBuf = b.getKeyBuf(b.position); |
| | | int offset = getIntegerValue(position * INT_SIZE); |
| | | int indexID = getIntegerValue(offset + 1); |
| | | int offset = getOffset(position); |
| | | int indexID = getIndexIDFromOffset(offset); |
| | | offset += REC_OVERHEAD; |
| | | offset += PackedInteger.getReadIntLength(buffer, offset); |
| | | int keyLen = PackedInteger.readInt(buffer, offset); |
| | |
| | | if (indexID == bIndexID) |
| | | { |
| | | // This is tested in a tree set remove when a buffer is removed from the tree set. |
| | | return compare(this.id, b.getBufferID()); |
| | | return compare(this.bufferID, b.getBufferID()); |
| | | } |
| | | else if (indexID < bIndexID) |
| | | { |
| | |
| | | */ |
| | | public void writeKey(DataOutputStream dataStream) throws IOException |
| | | { |
| | | int offSet = getIntegerValue(position * INT_SIZE) + REC_OVERHEAD; |
| | | int offSet = getOffset(position) + REC_OVERHEAD; |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | | int keyLen = PackedInteger.readInt(buffer, offSet); |
| | | offSet += PackedInteger.getReadIntLength(buffer, offSet); |
| | |
| | | */ |
| | | public boolean compare(int i) |
| | | { |
| | | return is(i, position, CompareOp.EQ); |
| | | return is(CompareOp.EQ, i, position); |
| | | } |
| | | |
| | | /** |
| | |
| | | position++; |
| | | } |
| | | |
| | | private void writeIntBytes(int val, byte[] b, int offset) |
| | | private int writeIntBytes(byte[] b, int offset, int val) |
| | | { |
| | | for (int i = offset + 3; i >= 0; i--) { |
| | | for (int i = offset + INT_SIZE -1; i >= offset; i--) { |
| | | b[i] = (byte) (val & 0xff); |
| | | val >>>= 8; |
| | | } |
| | | return INT_SIZE; |
| | | } |
| | | |
| | | |
| | | private int getIntegerValue(int index) |
| | | { |
| | | int answer = 0; |
| | |
| | | return answer; |
| | | } |
| | | |
| | | |
| | | private int med3(int a, int b, int c) |
| | | { |
| | | return is(a, b, CompareOp.LT) ? |
| | | (is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) : |
| | | (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a); |
| | | return is(CompareOp.LT, a, b) ? |
| | | (is(CompareOp.LT,b,c) ? b : is(CompareOp.LT,a,c) ? c : a) : |
| | | (is(CompareOp.GT,b,c) ? b : is(CompareOp.GT,a,c) ? c : a); |
| | | } |
| | | |
| | | |
| | | private void sort(int off, int len) |
| | | { |
| | | if (len < 7) { |
| | | for (int i=off; i<len+off; i++) |
| | | { |
| | | for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--) |
| | | for (int j=i; j>off && is(CompareOp.GT, j-1, j); j--) |
| | | { |
| | | swap(j, j-1); |
| | | } |
| | |
| | | int a = off, b = a, c = off + len - 1, d = c; |
| | | while(true) |
| | | { |
| | | while (b <= c && is(b, mKey, CompareOp.LE, mIndexID)) |
| | | while (b <= c && is(CompareOp.LE, b, mKey, mIndexID)) |
| | | { |
| | | if (is(b, mKey, CompareOp.EQ, mIndexID)) |
| | | if (is(CompareOp.EQ, b, mKey, mIndexID)) |
| | | { |
| | | swap(a++, b); |
| | | } |
| | | b++; |
| | | } |
| | | while (c >= b && is(c, mKey, CompareOp.GE, mIndexID)) |
| | | while (c >= b && is(CompareOp.GE, c, mKey, mIndexID)) |
| | | { |
| | | if (is(c, mKey, CompareOp.EQ, mIndexID)) |
| | | if (is(CompareOp.EQ, c, mKey, mIndexID)) |
| | | { |
| | | swap(c, d--); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | private void swap(int a, int b) |
| | | { |
| | | int aOffset = a * INT_SIZE; |
| | | int bOffset = b * INT_SIZE; |
| | | int bVal = getIntegerValue(bOffset); |
| | | System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE); |
| | | writeIntBytes(bVal, buffer, aOffset); |
| | | writeIntBytes(buffer, aOffset, bVal); |
| | | } |
| | | |
| | | |
| | | private void vectorSwap(int a, int b, int n) |
| | | { |
| | | for (int i=0; i<n; i++, a++, b++) |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | private boolean evaluateReturnCode(int rc, CompareOp op) |
| | | { |
| | | switch(op) { |