| | |
| | | */ |
| | | package org.opends.server.backends.jeb; |
| | | |
| | | import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_BUFFER_IO_ERROR; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import static org.opends.messages.JebMessages.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.nio.ByteBuffer; |
| | | import java.nio.channels.FileChannel; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.backends.jeb.Importer.IndexManager; |
| | | |
| | | import com.sleepycat.util.PackedInteger; |
| | |
| | | final class IndexInputBuffer implements Comparable<IndexInputBuffer> |
| | | { |
| | | |
| | | /** Possible states while reading a record. */ |
| | | private static enum RecordState |
| | | { |
| | | START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private final IndexManager indexMgr; |
| | | private final FileChannel channel; |
| | | private final long begin; |
| | | private final long end; |
| | | private final int id; |
| | | private final int bufferID; |
| | | |
| | | private long offset; |
| | | private final ByteBuffer cache; |
| | | |
| | | // Next fields are the fetched record data |
| | | private Integer indexID; |
| | | private ByteBuffer keyBuf = ByteBuffer.allocate(128); |
| | | |
| | | |
| | | |
| | | /** |
| | | * Possible states while reading a record. |
| | | */ |
| | | private enum RecordState |
| | | { |
| | | START, NEED_INSERT_ID_SET, NEED_DELETE_ID_SET |
| | | } |
| | | |
| | | |
| | | |
| | | private RecordState recordState = RecordState.START; |
| | | |
| | | |
| | | |
| | | /** |
| | | * Creates a new index input buffer. |
| | | * |
| | |
| | | * The position of the start of the buffer in the scratch file. |
| | | * @param end |
| | | * The position of the end of the buffer in the scratch file. |
| | | * @param id |
| | | * The index ID. |
| | | * @param bufferID |
| | | * The buffer ID. |
| | | * @param cacheSize |
| | | * The cache size. |
| | | * @throws IOException |
| | | * If an IO error occurred when priming the cache. |
| | | */ |
| | | public IndexInputBuffer(IndexManager indexMgr, FileChannel channel, |
| | | long begin, long end, int id, int cacheSize) throws IOException |
| | | long begin, long end, int bufferID, int cacheSize) throws IOException |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | this.channel = channel; |
| | | this.begin = begin; |
| | | this.end = end; |
| | | this.offset = 0; |
| | | this.id = id; |
| | | this.bufferID = bufferID; |
| | | this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256)); |
| | | |
| | | loadCache(); |
| | |
| | | keyBuf.flip(); |
| | | } |
| | | |
| | | |
| | | |
| | | private void loadCache() throws IOException |
| | | { |
| | | channel.position(begin + offset); |
| | |
| | | indexMgr.addBytesRead(bytesRead); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns {@code true} if this buffer has more data. |
| | | * |
| | |
| | | return cache.remaining() != 0 || hasMore; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the length of the next key. |
| | | * |
| | |
| | | return keyBuf.limit(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the next key. |
| | | * Fetches the next key into the provided byte buffer. |
| | | * |
| | | * @param b |
| | | * A buffer into which the key should be added. |
| | | * A buffer where to fetch the key |
| | | */ |
| | | public void getKey(ByteBuffer b) |
| | | public void fetchKey(ByteBuffer b) |
| | | { |
| | | keyBuf.get(b.array(), 0, keyBuf.limit()); |
| | | b.limit(keyBuf.limit()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the index ID of the next record. |
| | | * |
| | |
| | | { |
| | | try |
| | | { |
| | | getNextRecord(); |
| | | fetchNextRecord(); |
| | | } |
| | | catch (IOException ex) |
| | | { |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr |
| | | .getBufferFileName()); |
| | | logger.error(ERR_JEB_IMPORT_BUFFER_IO_ERROR, indexMgr.getBufferFileName()); |
| | | throw new RuntimeException(ex); |
| | | } |
| | | } |
| | | return indexID; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reads the next record from the buffer, skipping any remaining data in the |
| | | * current record. |
| | |
| | | * @throws IOException |
| | | * If an IO error occurred. |
| | | */ |
| | | public void getNextRecord() throws IOException |
| | | public void fetchNextRecord() throws IOException |
| | | { |
| | | switch (recordState) |
| | | { |
| | |
| | | recordState = RecordState.NEED_INSERT_ID_SET; |
| | | } |
| | | |
| | | |
| | | |
| | | private int getInt() throws IOException |
| | | { |
| | | ensureData(4); |
| | | return cache.getInt(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Reads the next ID set from the record and merges it with the provided ID |
| | | * set. |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private boolean ensureData(int len) throws IOException |
| | | { |
| | | boolean ret = false; |
| | | if (cache.remaining() == 0) |
| | | { |
| | | cache.clear(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | return true; |
| | | } |
| | | else if (cache.remaining() < len) |
| | | { |
| | | cache.compact(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | return true; |
| | | } |
| | | return ret; |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Compares this buffer with the provided key and index ID. |
| | | * |
| | |
| | | */ |
| | | int compare(ByteBuffer cKey, Integer cIndexID) |
| | | { |
| | | int returnCode, rc; |
| | | if (keyBuf.limit() == 0) |
| | | ensureRecordFetched(); |
| | | int cmp = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), cKey.array(), cKey.limit()); |
| | | if (cmp == 0) |
| | | { |
| | | getIndexID(); |
| | | return (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | | rc = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), |
| | | cKey.array(), cKey.limit()); |
| | | if (rc != 0) |
| | | { |
| | | returnCode = 1; |
| | | } |
| | | else |
| | | { |
| | | returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | | return returnCode; |
| | | return 1; |
| | | } |
| | | |
| | | |
| | | |
/** {@inheritDoc} */
| | | @Override |
| | | public int compareTo(IndexInputBuffer o) |
| | | { |
| | | // used in remove. |
| | |
| | | return 0; |
| | | } |
| | | |
| | | ensureRecordFetched(); |
| | | o.ensureRecordFetched(); |
| | | |
| | | byte[] oKey = o.keyBuf.array(); |
| | | int oLen = o.keyBuf.limit(); |
| | | int cmp = Importer.indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), oKey, oLen); |
| | | if (cmp == 0) |
| | | { |
| | | cmp = indexID.intValue() - o.getIndexID().intValue(); |
| | | if (cmp == 0) |
| | | { |
| | | return bufferID - o.bufferID; |
| | | } |
| | | } |
| | | return cmp; |
| | | } |
| | | |
| | | private void ensureRecordFetched() |
| | | { |
| | | if (keyBuf.limit() == 0) |
| | | { |
| | | getIndexID(); |
| | | } |
| | | |
| | | if (o.keyBuf.limit() == 0) |
| | | { |
| | | o.getIndexID(); |
| | | } |
| | | |
| | | byte[] oKey = o.keyBuf.array(); |
| | | int oLen = o.keyBuf.limit(); |
| | | int returnCode = Importer.indexComparator.compare(keyBuf.array(), 0, |
| | | keyBuf.limit(), oKey, oLen); |
| | | if (returnCode == 0) |
| | | { |
| | | returnCode = indexID.intValue() - o.getIndexID().intValue(); |
| | | if (returnCode == 0) |
| | | { |
| | | returnCode = id - o.id; |
| | | } |
| | | } |
| | | return returnCode; |
| | | } |
| | | } |