OPENDJ-1472 : File based changelog : optimize random seek in each log file
CR-3727
Implements read and write with blocks, to enable binary search.
* New BlockLogReader class: read records sequentially or using
binary search provided a key
* New BlockLogWriter class: write records by blocks
* New BlockLogReaderWriterTest class : tests read/write with blocks
* Update LogFile : delegate responsibility for read/write to BlockLogXXX
* Add method ByteStringBuilder#append(DataInput, int) to avoid
byte array copy
* Minor changes in other files
3 files added
8 files modified
| | |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \ |
| | | replica offline state file '%s' for domain %s |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \ |
| | | offline state file '%s' for domain %s and server id %d |
| | | offline state file '%s' for domain %s and server id %d |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \ |
| | | file '%s' |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.EOFException; |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A log reader with binary search support. |
| | | * <p> |
| | | * The log file contains record offsets at fixed block size : given a block size N, |
| | | * an offset is written at every N bytes. The offset contains the number of bytes to |
| | | * reach the beginning of previous record (or next record if offset equals 0). |
| | | * <p> |
| | | * The reader provides both sequential access, using the {@code readRecord()} method, |
| | | * and reasonably fast random access, using the {@code seek(K, boolean)} method. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | class BlockLogReader<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | static final int SIZE_OF_BLOCK_OFFSET = 4; |
| | | |
| | | static final int SIZE_OF_RECORD_SIZE = 4; |
| | | |
| | | /** |
| | | * Size of a block, after which an offset to the nearest record is written. |
| | | * <p> |
| | | * This value has been fixed based on performance tests. See |
| | | * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472"> |
| | | * OPENDJ-1472</a> for details. |
| | | */ |
| | | static final int BLOCK_SIZE = 256; |
| | | |
| | | private final int blockSize; |
| | | |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | private final RandomAccessFile reader; |
| | | |
| | | private final File file; |
| | | |
| | | /** |
| | | * Creates a reader for the provided file, file reader and parser. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param file |
| | | * The log file to read. |
| | | * @param reader |
| | | * The random access reader on the log file. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | * @return a new log reader |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader( |
| | | final File file, final RandomAccessFile reader, final RecordParser<K, V> parser) |
| | | { |
| | | return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE); |
| | | } |
| | | |
| | | /** |
| | | * Creates a reader for the provided file, file reader, parser and block size. |
| | | * <p> |
| | | * This method is intended for tests only, to allow tuning of the block size. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param file |
| | | * The log file to read. |
| | | * @param reader |
| | | * The random access reader on the log file. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | * @param blockSize |
| | | * The size of each block, or frequency at which the record offset is |
| | | * present in the log file. |
| | | * @return a new log reader |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests( |
| | | final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize) |
| | | { |
| | | return new BlockLogReader<K, V>(file, reader, parser, blockSize); |
| | | } |
| | | |
private BlockLogReader(
    final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
{
  // Plain field initialization: nothing is read from the file at construction time.
  this.blockSize = blockSize;
  this.parser = parser;
  this.reader = reader;
  this.file = file;
}
| | | |
| | | /** |
| | | * Position the reader to the record corresponding to the provided key or to |
| | | * the nearest key (the lowest key higher than the provided key), and returns |
| | | * the last record read. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position. Key must not be {@code null}. |
| | | * @param findNextRecord |
| | | * If {@code true}, start position is the lowest key that is higher |
| | | * than the provided key, otherwise start position is the provided |
| | | * key. |
| | | * @return The pair (key_found, last_record_read). key_found is a boolean |
| | | * indicating if reader is successfully positioned to the key or the |
| | | * nearest key. last_record_read is the last record that was read. |
| | | * When key_found is {@code false}, then last_record_read is |
| | | * always {@code null}. When key_found is {@code true}, |
| | | * last_record_read may be a record or {@code null}. |
| | | * @throws ChangelogException |
| | | * If an error occurs when seeking the key. |
| | | */ |
| | | public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException |
| | | { |
| | | final long markerPosition = searchClosestBlockStartToKey(key); |
| | | if (markerPosition >= 0) |
| | | { |
| | | return positionToKeySequentially(markerPosition, key, findNextRecord); |
| | | } |
| | | return Pair.of(false, null); |
| | | } |
| | | |
| | | /** |
| | | * Position the reader to the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * offset from the beginning of the file, in bytes. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | public void seekToPosition(final long filePosition) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | reader.seek(filePosition); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Read a record from current position. |
| | | * |
| | | * @return the record read |
| | | * @throws ChangelogException |
| | | * If an error occurs during read. |
| | | */ |
| | | public Record<K,V> readRecord() throws ChangelogException |
| | | { |
| | | return readRecord(-1); |
| | | } |
| | | |
| | | /** |
| | | * Returns the file position for this reader. |
| | | * |
| | | * @return the position of reader on the log file |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | public long getFilePosition() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return reader.getFilePointer(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() throws IOException |
| | | { |
| | | reader.close(); |
| | | } |
| | | |
| | | /** |
| | | * Read a record, either from the provided start of block position or from |
| | | * the current position. |
| | | * |
| | | * @param blockStartPosition |
| | | * The position of start of block, where a record offset is written. |
| | | * If provided value is -1, then record is read from current position instead. |
| | | * @return the record read |
| | | * @throws ChangelogException |
| | | * If an error occurs during read. |
| | | */ |
| | | private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | final ByteString recordData = blockStartPosition == -1 ? |
| | | readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition); |
| | | return recordData != null ? parser.decodeRecord(recordData) : null; |
| | | } |
| | | catch (IOException io) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io); |
| | | } |
| | | catch (DecodingException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the first record found after the provided file position of block |
| | | * start. |
| | | * |
| | | * @param blockStartPosition |
| | | * Position of read pointer in the file, expected to be the start of |
| | | * a block where a record offset is written. |
| | | * @return the record read |
| | | * @throws IOException |
| | | * If an error occurs during read. |
| | | */ |
| | | private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException |
| | | { |
| | | reader.seek(blockStartPosition); |
| | | if (blockStartPosition > 0) |
| | | { |
| | | final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET]; |
| | | reader.readFully(offsetData); |
| | | final int offsetToRecord = ByteString.wrap(offsetData).toInt(); |
| | | if (offsetToRecord > 0) |
| | | { |
| | | reader.seek(blockStartPosition - offsetToRecord); |
| | | } // if offset is zero, reader is already well positioned |
| | | } |
| | | return readNextRecord(); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next record. |
| | | * |
| | | * @return the bytes of the next record, or {@code null} if no record is available |
| | | * @throws IOException |
| | | * If an error occurs while reading. |
| | | */ |
| | | private ByteString readNextRecord() throws IOException |
| | | { |
| | | try |
| | | { |
| | | // read length of record |
| | | final long filePosition = reader.getFilePointer(); |
| | | int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize); |
| | | final int recordLength = readRecordLength(distanceToBlockStart); |
| | | |
| | | // read the record |
| | | long currentPosition = reader.getFilePointer(); |
| | | distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize); |
| | | final ByteStringBuilder recordBytes = |
| | | new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart)); |
| | | int remainingBytesToRead = recordLength; |
| | | while (distanceToBlockStart < remainingBytesToRead) |
| | | { |
| | | if (distanceToBlockStart != 0) |
| | | { |
| | | recordBytes.append(reader, distanceToBlockStart); |
| | | } |
| | | // skip the offset |
| | | reader.skipBytes(SIZE_OF_BLOCK_OFFSET); |
| | | |
| | | // next step |
| | | currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET; |
| | | remainingBytesToRead -= distanceToBlockStart; |
| | | distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET; |
| | | } |
| | | if (remainingBytesToRead > 0) |
| | | { |
| | | // last bytes of the record |
| | | recordBytes.append(reader, remainingBytesToRead); |
| | | } |
| | | return recordBytes.toByteString(); |
| | | } |
| | | catch(EOFException e) |
| | | { |
| | | // end of stream, no record or uncomplete record |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the total length in bytes taken by a record when stored in log file, |
| | | * including size taken by block offsets. |
| | | * |
| | | * @param recordLength |
| | | * The length of record to write. |
| | | * @param distanceToBlockStart |
| | | * Distance before the next block start. |
| | | * @return the length in bytes necessary to store the record in the log |
| | | */ |
| | | int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart) |
| | | { |
| | | int totalLength = recordLength; |
| | | if (recordLength > distanceToBlockStart) |
| | | { |
| | | totalLength += SIZE_OF_BLOCK_OFFSET; |
| | | final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET); |
| | | totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET; |
| | | } |
| | | return totalLength; |
| | | } |
| | | |
| | | /** Read the length of a record. */ |
| | | private int readRecordLength(final int distanceToBlockStart) throws IOException |
| | | { |
| | | final ByteStringBuilder lengthBytes = new ByteStringBuilder(); |
| | | if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE) |
| | | { |
| | | lengthBytes.append(reader, distanceToBlockStart); |
| | | // skip the offset |
| | | reader.skipBytes(SIZE_OF_BLOCK_OFFSET); |
| | | lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart); |
| | | return lengthBytes.toByteString().toInt(); |
| | | } |
| | | else |
| | | { |
| | | if (distanceToBlockStart == 0) |
| | | { |
| | | // skip the offset |
| | | reader.skipBytes(SIZE_OF_BLOCK_OFFSET); |
| | | } |
| | | lengthBytes.append(reader, SIZE_OF_RECORD_SIZE); |
| | | return lengthBytes.toByteString().toInt(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Search the closest block start to the provided key, using binary search. |
| | | * <p> |
| | | * Note that position of reader is modified by this method. |
| | | * |
| | | * @param key |
| | | * The key to search |
| | | * @return the file position of block start that must be used to find the given key, |
| | | * or a negative number if no position could be found. |
| | | * @throws ChangelogException |
| | | * if a problem occurs |
| | | */ |
| | | long searchClosestBlockStartToKey(K key) throws ChangelogException |
| | | { |
| | | final long maxPos = getLastPositionInFile(); |
| | | long lowPos = 0L; |
| | | long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos); |
| | | |
| | | while (lowPos <= highPos) |
| | | { |
| | | final long middlePos = Math.min((lowPos + highPos) / 2, maxPos); |
| | | final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos); |
| | | final Record<K, V> middleRecord = readRecord(middleBlockStartPos); |
| | | if (middleRecord == null) |
| | | { |
| | | return -1; |
| | | } |
| | | |
| | | final int keyComparison = middleRecord.getKey().compareTo(key); |
| | | if (keyComparison < 0) |
| | | { |
| | | if (middleBlockStartPos <= lowPos) |
| | | { |
| | | return lowPos; |
| | | } |
| | | lowPos = middleBlockStartPos; |
| | | } |
| | | else if (keyComparison > 0) |
| | | { |
| | | if (middleBlockStartPos >= highPos) |
| | | { |
| | | return highPos; |
| | | } |
| | | highPos = middleBlockStartPos; |
| | | } |
| | | else |
| | | { |
| | | return middleBlockStartPos; |
| | | } |
| | | } |
| | | // Unable to find a position where key can be found |
| | | return -1; |
| | | } |
| | | |
| | | private long getLastPositionInFile() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return reader.length() - 1; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Position to provided key, starting from provided block start position and |
| | | * reading sequentially until key is found. |
| | | * |
| | | * @param blockStartPosition |
| | | * Position of read pointer in the file, expected to be the start of |
| | | * a block where a record offset is written. |
| | | * @param key |
| | | * The key to find. |
| | | * @param findNextRecord |
| | | * If {@code true}, position at the end of this method is the lowest |
| | | * key that is higher than the provided key, otherwise position is |
| | | * the provided key. |
| | | * @return The pair ({@code true}, last record read) if reader is successfully |
| | | * positioned to the key or the nearest key (last record may be null |
| | | * if end of file is reached), ({@code false}, null) otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | Pair<Boolean, Record<K,V>> positionToKeySequentially( |
| | | final long blockStartPosition, |
| | | final K key, |
| | | final boolean findNextRecord) |
| | | throws ChangelogException { |
| | | Record<K,V> record = readRecord(blockStartPosition); |
| | | do { |
| | | if (record != null) |
| | | { |
| | | final int keysComparison = record.getKey().compareTo(key); |
| | | final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key); |
| | | if (matches) |
| | | { |
| | | if (findNextRecord && keysComparison == 0) |
| | | { |
| | | // skip key in order to position on lowest higher key |
| | | record = readRecord(); |
| | | } |
| | | return Pair.of(true, record); |
| | | } |
| | | } |
| | | record = readRecord(); |
| | | } |
| | | while (record != null); |
| | | return Pair.of(false, null); |
| | | } |
| | | |
| | | /** |
| | | * Returns the closest start of block which has a position lower than or equal |
| | | * to the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * The position of reader on file. |
| | | * @return the file position of the block start. |
| | | */ |
| | | long getClosestBlockStartBeforeOrAtPosition(final long filePosition) |
| | | { |
| | | final int dist = getDistanceToNextBlockStart(filePosition, blockSize); |
| | | return dist == 0 ? filePosition : filePosition + dist - blockSize; |
| | | } |
| | | |
| | | /** |
| | | * Returns the closest start of block which has a position strictly |
| | | * higher than the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * The position of reader on file. |
| | | * @return the file position of the block start. |
| | | */ |
| | | long getClosestBlockStartStrictlyAfterPosition(final long filePosition) |
| | | { |
| | | final int dist = getDistanceToNextBlockStart(filePosition, blockSize); |
| | | return dist == 0 ? filePosition + blockSize : filePosition + dist; |
| | | } |
| | | |
| | | /** |
| | | * Returns the distance to next block for the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * offset from the beginning of the file, in bytes. |
| | | * @param blockSize |
| | | * Size of each block in bytes. |
| | | * @return the distance to next block in bytes |
| | | */ |
| | | static int getDistanceToNextBlockStart(final long filePosition, final int blockSize) |
| | | { |
| | | if (filePosition == 0) |
| | | { |
| | | return blockSize; |
| | | } |
| | | final int distance = (int) (filePosition % blockSize); |
| | | return distance == 0 ? 0 : blockSize - distance; |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.file.BlockLogReader.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.IOException; |
| | | import java.io.SyncFailedException; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | /** |
| | | * A log writer, using fixed-size blocks to allow fast retrieval when reading. |
| | | * <p> |
| | | * The log file contains record offsets at fixed block size : given block size N, |
| | | * an offset is written at every N bytes. The offset contains the number of bytes to |
| | | * reach the beginning of previous record (or next record if offset equals 0). |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | class BlockLogWriter<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | private final int blockSize; |
| | | |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | private final LogWriter writer; |
| | | |
| | | /** |
| | | * Creates a writer for the provided log writer and parser. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param writer |
| | | * The writer on the log file. |
| | | * @param parser |
| | | * The parser to encode the records. |
| | | * @return a new log writer |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter( |
| | | final LogWriter writer, final RecordParser<K, V> parser) |
| | | { |
| | | return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE); |
| | | } |
| | | |
| | | /** |
| | | * Creates a writer for the provided log writer, parser and size for blocks. |
| | | * <p> |
| | | * This method is intended for tests only, to allow tuning of the block size. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param writer |
| | | * The writer on the log file. |
| | | * @param parser |
| | | * The parser to encode the records. |
| | | * @param blockSize |
| | | * The size of each block, or frequency at which the record offset is |
| | | * present in the log file. |
| | | * @return a new log writer |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests( |
| | | final LogWriter writer, final RecordParser<K, V> parser, final int blockSize) |
| | | { |
| | | return new BlockLogWriter<K, V>(writer, parser, blockSize); |
| | | } |
| | | |
| | | /** |
| | | * Creates the writer with an underlying writer, a parser and a size for blocks. |
| | | * |
| | | * @param writer |
| | | * The writer to the log file. |
| | | * @param parser |
| | | * The parser to encode the records. |
| | | * @param blockSize |
| | | * The size of each block. |
| | | */ |
private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
{
  // Both collaborators are mandatory.
  Reject.ifNull(writer, parser);
  this.blockSize = blockSize;
  this.parser = parser;
  this.writer = writer;
}
| | | |
| | | /** |
| | | * Writes the provided record to the log file. |
| | | * |
| | | * @param record |
| | | * The record to write. |
| | | * @throws ChangelogException |
| | | * If a problem occurs during write. |
| | | */ |
| | | public void write(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | write(parser.encodeRecord(record)); |
| | | writer.flush(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), |
| | | writer.getFile().getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of bytes written in the log file. |
| | | * |
| | | * @return the number of bytes |
| | | */ |
| | | public long getBytesWritten() |
| | | { |
| | | return writer.getBytesWritten(); |
| | | } |
| | | |
| | | /** |
| | | * Synchronize all modifications to the log file to the underlying device. |
| | | * |
| | | * @throws SyncFailedException |
| | | * If synchronization fails. |
| | | */ |
| | | public void sync() throws SyncFailedException |
| | | { |
| | | writer.sync(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | writer.close(); |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided byte string to the log file. |
| | | * |
| | | * @param record |
| | | * The value to write. |
| | | * @throws IOException |
| | | * If an error occurs while writing |
| | | */ |
| | | private void write(final ByteString record) throws IOException |
| | | { |
| | | // Add length of record before writing |
| | | ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()). |
| | | append(record.length()). |
| | | append(record). |
| | | toByteString(); |
| | | |
| | | int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize); |
| | | int cumulatedDistanceToBeginning = distanceToBlockStart; |
| | | int dataPosition = 0; |
| | | int dataRemaining = data.length(); |
| | | final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET; |
| | | |
| | | while (distanceToBlockStart < dataRemaining) |
| | | { |
| | | if (distanceToBlockStart > 0) |
| | | { |
| | | // append part of record |
| | | final int dataEndPosition = dataPosition + distanceToBlockStart; |
| | | writer.write(data.subSequence(dataPosition, dataEndPosition)); |
| | | dataPosition = dataEndPosition; |
| | | dataRemaining -= distanceToBlockStart; |
| | | } |
| | | // append the offset to the record |
| | | writer.write(ByteString.valueOf(cumulatedDistanceToBeginning)); |
| | | |
| | | // next step |
| | | distanceToBlockStart = dataSizeForOneBlock; |
| | | cumulatedDistanceToBeginning += blockSize; |
| | | } |
| | | // append the remaining bytes to finish the record |
| | | writer.write(data.subSequence(dataPosition, data.length())); |
| | | } |
| | | |
| | | } |
| | |
| | | /** |
| | | * The last key appended to the log. In order to keep the ordering of the keys |
| | | * in the log, any attempt to append a record with a key lower or equal to |
| | | * this is rejected (no error but an event is logged). |
| | | * this key is rejected (no error but an event is logged). |
| | | */ |
| | | private K lastAppendedKey; |
| | | |
| | |
| | | |
| | | import java.io.BufferedWriter; |
| | | import java.io.Closeable; |
| | | import java.io.EOFException; |
| | | import java.io.File; |
| | | import java.io.FileWriter; |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A log file, containing part of a {@code Log}. The log file may be: |
| | | * <ul> |
| | |
| | | /** The file containing the records. */ |
| | | private final File logfile; |
| | | |
| | | /** The parser of records, to convert bytes to record and record to bytes. */ |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | /** The pool to obtain a reader on the log. */ |
| | | private LogReaderPool readerPool; |
| | | private final LogReaderPool<K, V> readerPool; |
| | | |
| | | /** |
| | | * The writer on the log file, which may be {@code null} if log file is not |
| | | * write-enabled |
| | | * write-enabled. |
| | | */ |
| | | private LogWriter writer; |
| | | private final BlockLogWriter<K, V> writer; |
| | | |
| | | /** Indicates if log is enabled for write. */ |
| | | private final boolean isWriteEnabled; |
| | |
| | | { |
| | | Reject.ifNull(logFilePath, parser); |
| | | this.logfile = logFilePath; |
| | | this.parser = parser; |
| | | this.isWriteEnabled = isWriteEnabled; |
| | | |
| | | initialize(); |
| | | createLogFileIfNotExists(); |
| | | writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null; |
| | | readerPool = new LogReaderPool<K, V>(logfile, parser); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Initialize this log. |
| | | * <p> |
| | | * Create directories and file if necessary, and create a writer |
| | | * and pool of readers. |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs during initialization. |
| | | */ |
| | | private void initialize() throws ChangelogException |
| | | { |
| | | createLogFileIfNotExists(); |
| | | if (isWriteEnabled) |
| | | { |
| | | writer = new LogWriter(logfile); |
| | | } |
| | | readerPool = new LogReaderPool(logfile); |
| | | } |
| | | |
| | | /** |
| | | * Returns the file containing the records. |
| | | * |
| | | * @return the file |
| | |
| | | void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | checkLogIsEnabledForWrite(); |
| | | try |
| | | { |
| | | writer.write(encodeRecord(record)); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e); |
| | | } |
| | | } |
| | | |
| | | private ByteString encodeRecord(final Record<K, V> record) |
| | | { |
| | | final ByteString data = parser.encodeRecord(record); |
| | | return new ByteStringBuilder() |
| | | .append(data.length()) |
| | | .append(data) |
| | | .toByteString(); |
| | | writer.write(record); |
| | | } |
| | | |
| | | /** |
| | |
| | | return logfile.getPath(); |
| | | } |
| | | |
| | | /** Read a record from the provided reader. */ |
| | | private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | final ByteString recordData = readEncodedRecord(reader); |
| | | return recordData != null ? parser.decodeRecord(recordData) : null; |
| | | } |
| | | catch(DecodingException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | final byte[] lengthData = new byte[4]; |
| | | reader.readFully(lengthData); |
| | | int recordLength = ByteString.wrap(lengthData).toInt(); |
| | | |
| | | final byte[] recordData = new byte[recordLength]; |
| | | reader.readFully(recordData); |
| | | return ByteString.wrap(recordData); |
| | | } |
| | | catch(EOFException e) |
| | | { |
| | | // end of stream, no record or uncomplete record |
| | | return null; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** Seek to given position on the provided reader. */ |
| | | private void seek(RandomAccessFile reader, long position) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | reader.seek(position); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a random access file to read this log. |
| | | * Returns a reader for this log. |
| | | * <p> |
| | | * Assumes that calling methods ensure that log is not closed. |
| | | */ |
| | | private RandomAccessFile getReader() throws ChangelogException |
| | | private BlockLogReader<K, V> getReader() throws ChangelogException |
| | | { |
| | | return readerPool.get(); |
| | | } |
| | | |
| | | /** Release the provided reader. */ |
| | | private void releaseReader(RandomAccessFile reader) { |
| | | private void releaseReader(BlockLogReader<K, V> reader) { |
| | | readerPool.release(reader); |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | |
| | | private final LogFile<K, V> logFile; |
| | | |
| | | /** To read the records. */ |
| | | private final RandomAccessFile reader; |
| | | private final BlockLogReader<K, V> reader; |
| | | |
| | | /** The current available record, may be {@code null}. */ |
| | | private Record<K,V> currentRecord; |
| | |
| | | this.logFile = logFile; |
| | | this.reader = logFile.getReader(); |
| | | this.currentRecord = record; |
| | | logFile.seek(reader, filePosition); |
| | | reader.seekToPosition(filePosition); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | currentRecord = logFile.readRecord(reader); |
| | | currentRecord = reader.readRecord(); |
| | | return currentRecord != null; |
| | | } |
| | | |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(final K key, boolean findNearest) throws ChangelogException { |
| | | do |
| | | { |
| | | if (currentRecord != null) |
| | | { |
| | | final boolean matches = findNearest ? |
| | | currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key); |
| | | if (matches) |
| | | { |
| | | if (findNearest && currentRecord.getKey().equals(key)) |
| | | { |
| | | // skip key in order to position on lowest higher key |
| | | next(); |
| | | } |
| | | return true; |
| | | } |
| | | } |
| | | next(); |
| | | } |
| | | while (currentRecord != null); |
| | | return false; |
| | | final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest); |
| | | final boolean found = result.getFirst(); |
| | | currentRecord = found ? result.getSecond() : null; |
| | | return found; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | */ |
| | | long getFilePosition() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return reader.getFilePointer(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e); |
| | | } |
| | | return reader.getFilePosition(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | |
| | | /** |
| | | * A Pool of readers to a log file. |
| | | |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | // TODO : implement a real pool - reusing readers instead of opening-closing them each time |
| | | class LogReaderPool |
| | | class LogReaderPool<K extends Comparable<K>, V> |
| | | { |
| | | /** The file to read. */ |
| | | private final File file; |
| | | |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | /** |
| | | * Creates a pool of readers for provided file. |
| | | * |
| | | * @param file |
| | | * The file to read. |
| | | * The file to read. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | */ |
| | | LogReaderPool(File file) |
| | | LogReaderPool(File file, RecordParser<K, V> parser) |
| | | { |
| | | this.file = file; |
| | | this.parser = parser; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * If the file can't be found or read. |
| | | */ |
| | | RandomAccessFile get() throws ChangelogException |
| | | BlockLogReader<K, V> get() throws ChangelogException |
| | | { |
| | | return getRandomAccess(file); |
| | | return getReader(file); |
| | | } |
| | | |
| | | /** |
| | |
| | | * The random access reader to a file previously acquired with this |
| | | * pool. |
| | | */ |
| | | void release(RandomAccessFile reader) |
| | | void release(BlockLogReader<K, V> reader) |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | |
| | | /** Returns a random access file to read this log. */ |
| | | private RandomAccessFile getRandomAccess(File file) throws ChangelogException |
| | | private BlockLogReader<K, V> getReader(File file) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return new RandomAccessFile(file, "r"); |
| | | return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the file used by this writer. |
| | | * |
| | | * @return the file |
| | | */ |
| | | File getFile() |
| | | { |
| | | return file; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void write(int b) throws IOException |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2012 ForgeRock AS. |
| | | * Portions copyright 2012-2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.types; |
| | | |
| | |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.io.DataInput; |
| | | import java.io.EOFException; |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | |
| | | return bytesRead; |
| | | } |
| | | |
| | | /** |
| | | * Appends exactly {@code length} bytes read from the provided |
| | | * {@code DataInput} to this byte string builder. |
| | | * <p> |
| | | * The bytes are read with {@link DataInput#readFully(byte[], int, int)} |
| | | * straight into this builder's buffer, starting at the current end of |
| | | * the content. |
| | | * |
| | | * @param stream |
| | | * The data input to read the bytes from. |
| | | * @param length |
| | | * The exact number of bytes to read from {@code stream} and |
| | | * append to this byte string builder. |
| | | * @throws IndexOutOfBoundsException |
| | | * If {@code length} is less than zero. |
| | | * @throws EOFException |
| | | * If {@code stream} reaches its end before {@code length} |
| | | * bytes have been read. |
| | | * @throws IOException |
| | | * If an I/O error occurs. |
| | | */ |
| | | public void append(DataInput stream, int length) throws IndexOutOfBoundsException, EOFException, IOException |
| | | { |
| | | if (length < 0) |
| | | { |
| | | throw new IndexOutOfBoundsException(); |
| | | } |
| | | |
| | | // make room first so that readFully can write directly after the existing content |
| | | ensureAdditionalCapacity(length); |
| | | stream.readFully(buffer, this.length, length); |
| | | // the length is only advanced once the read succeeded: if readFully throws, |
| | | // the recorded length of this builder is left unchanged |
| | | this.length += length; |
| | | } |
| | | |
| | | /** |
| | | * Appends the provided {@code ReadableByteChannel} to this byte string |
| | |
| | | |
| | | |
| | | /** |
| | | * Attempts to delete the specified file or directory. If it is a directory, |
| | | * Attempts to delete the specified file or directory. If it is a directory, |
| | | * then any files or subdirectories that it contains will be recursively |
| | | * deleted as well. |
| | | * |
| | | * @param file The file or directory to be removed. |
| | | * |
| | | * @return <CODE>true</CODE> if the specified file and any subordinates are |
| | | * all successfully removed, or <CODE>false</CODE> if at least one |
| | | * element in the subtree could not be removed. |
| | | * @param file |
| | | * The file or directory to be removed. |
| | | * @return {@code true} if the specified file and any subordinates are all |
| | | * successfully removed, or {@code false} if at least one element in |
| | | * the subtree could not be removed or the file does not exist. |
| | | */ |
| | | public static boolean recursiveDelete(File file) |
| | | { |
| | | boolean successful = true; |
| | | if (file.isDirectory()) |
| | | if (file.exists()) |
| | | { |
| | | File[] childList = file.listFiles(); |
| | | if (childList != null) |
| | | boolean successful = true; |
| | | if (file.isDirectory()) |
| | | { |
| | | for (File f : childList) |
| | | File[] childList = file.listFiles(); |
| | | if (childList != null) |
| | | { |
| | | successful &= recursiveDelete(f); |
| | | for (File f : childList) |
| | | { |
| | | successful &= recursiveDelete(f); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return (successful & file.delete()); |
| | | return (successful & file.delete()); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.BlockLogReader.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.FileNotFoundException; |
| | | import java.io.RandomAccessFile; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class BlockLogReaderWriterTest extends DirectoryServerTestCase |
| | | { |
| | | private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | private static final File TEST_FILE = new File(TEST_DIRECTORY, "file"); |
| | | |
| | | private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser(); |
| | | |
| | | private static final int INT_RECORD_SIZE = 12; |
| | | |
| | | @BeforeClass |
| | | void createTestDirectory() |
| | | { |
| | | TEST_DIRECTORY.mkdirs(); |
| | | } |
| | | |
| | | @BeforeMethod |
| | | void ensureTestFileIsEmpty() throws Exception |
| | | { |
| | | StaticUtils.recursiveDelete(TEST_FILE); |
| | | } |
| | | |
| | | @AfterClass |
| | | void cleanTestDirectory() |
| | | { |
| | | StaticUtils.recursiveDelete(TEST_DIRECTORY); |
| | | } |
| | | |
| | | @DataProvider(name = "recordsData") |
| | | Object[][] recordsData() |
| | | { |
| | | return new Object[][] |
| | | { |
| | | // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes |
| | | |
| | | // size of block, expected size of file after all records are written, records |
| | | { 12, 12, records(1) }, // zero block marker |
| | | { 10, 16, records(1) }, // one block marker |
| | | { 8, 16, records(1) }, // one block marker |
| | | { 7, 20, records(1) }, // two block markers |
| | | { 6, 24, records(1) }, // three block markers |
| | | { 5, 40, records(1) }, // seven block markers |
| | | |
| | | { 16, 28, records(1,2) }, // one block marker |
| | | { 12, 32, records(1,2) }, // two block markers |
| | | { 10, 36, records(1,2) }, // three block markers |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided records using the given block size, then reads them back |
| | | * sequentially and checks that they are returned unchanged and in order, that no |
| | | * record follows the last one, and that the reader ends at the expected file position. |
| | | */ |
| | | @Test(dataProvider="recordsData") |
| | | public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records) |
| | | throws Exception |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | | BlockLogReader<Integer, Integer> logReader = null; |
| | | try |
| | | { |
| | | logReader = newReader(blockSize); |
| | | // each read must return the next record in write order |
| | | for (Record<Integer, Integer> expected : records) |
| | | { |
| | | assertThat(logReader.readRecord()).isEqualTo(expected); |
| | | } |
| | | // nothing left to read once all written records have been consumed |
| | | assertThat(logReader.readRecord()).isNull(); |
| | | assertThat(logReader.getFilePosition()).isEqualTo(expectedSizeOfFile); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logReader); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "recordsForSeek") |
| | | Object[][] recordsForSeek() |
| | | { |
| | | Object[][] data = new Object[][] { |
| | | // records, key, findNearest, expectedRecord, expectedFound |
| | | |
| | | // no record |
| | | { records(), 1, false, null, false }, |
| | | |
| | | // 1 record exact find |
| | | { records(1), 1, false, record(1), true }, |
| | | |
| | | // 1 record nearest find |
| | | { records(1), 1, true, null, true }, |
| | | |
| | | // 2 records |
| | | { records(1,2), 1, false, record(1), true }, |
| | | { records(1,2), 2, false, record(2), true }, |
| | | { records(1,2), 1, true, record(2), true }, |
| | | { records(1,2), 2, true, null, true }, |
| | | |
| | | // 3 records exact find |
| | | { records(1,2,3), 0, false, null, false }, |
| | | { records(1,2,3), 1, false, record(1), true }, |
| | | { records(1,2,3), 2, false, record(2), true }, |
| | | { records(1,2,3), 3, false, record(3), true }, |
| | | { records(1,2,3), 4, false, null, false }, |
| | | |
| | | // 3 records nearest find |
| | | { records(1,2,3), 0, true, record(1), true }, |
| | | { records(1,2,3), 1, true, record(2), true }, |
| | | { records(1,2,3), 2, true, record(3), true }, |
| | | { records(1,2,3), 3, true, null, true }, |
| | | { records(1,2,3), 4, true, null, false }, |
| | | |
| | | // 10 records exact find |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, false, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, false, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, false, record(5), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, false, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, false, null, false }, |
| | | |
| | | // 10 records nearest find |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, true, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, true, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, true, record(6), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, true, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, true, null, false }, |
| | | }; |
| | | |
| | | // For each test case, do a test with various block sizes to ensure algorithm is not broken |
| | | // on a given size |
| | | int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 }; |
| | | Object[][] finalData = new Object[sizes.length*data.length][6]; |
| | | for (int i = 0; i < data.length; i++) |
| | | { |
| | | for (int j = 0; j < sizes.length; j++) |
| | | { |
| | | Object[] a = data[i]; |
| | | // add the block size at beginning of each test case |
| | | finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4]}; |
| | | } |
| | | } |
| | | return finalData; |
| | | } |
| | | |
| | | /** |
| | | * Tests that seeking a key, either exactly or with nearest match, returns the expected |
| | | * "found" indicator and the expected record, for the provided block size. |
| | | */ |
| | | @Test(dataProvider="recordsForSeek") |
| | | public void testSeek(int blockSize, List<Record<Integer, Integer>> records, int key, boolean findNearest, |
| | | Record<Integer, Integer> expectedRecord, boolean expectedFound) throws Exception |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest); |
| | | |
| | | // first element of the pair is the "found" indicator, second is the record (may be null) |
| | | assertThat(result.getFirst()).isEqualTo(expectedFound); |
| | | assertThat(result.getSecond()).isEqualTo(expectedRecord); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetClosestMarkerBeforeOrAtPosition() throws Exception |
| | | { |
| | | final int blockSize = 10; |
| | | BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize); |
| | | |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0); |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0); |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0); |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10); |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10); |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20); |
| | | } |
| | | |
| | | @Test |
| | | public void testGetClosestMarkerStrictlyAfterPosition() throws Exception |
| | | { |
| | | final int blockSize = 10; |
| | | BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize); |
| | | |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10); |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10); |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20); |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20); |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20); |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30); |
| | | } |
| | | |
| | | @Test |
| | | public void testSearchClosestMarkerToKey() throws Exception |
| | | { |
| | | int blockSize = 20; |
| | | writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | |
| | | assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0); |
| | | assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0); |
| | | assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20); |
| | | assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20); |
| | | assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40); |
| | | assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60); |
| | | assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80); |
| | | assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80); |
| | | assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100); |
| | | assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120); |
| | | assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140); |
| | | assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260); |
| | | assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280); |
| | | // out of reach keys |
| | | assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280); |
| | | assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testLengthOfStoredRecord() throws Exception |
| | | { |
| | | final int blockSize = 100; |
| | | BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize); |
| | | |
| | | int recordLength = 10; |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | |
| | | recordLength = 150; |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | |
| | | recordLength = 200; |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET); |
| | | } |
| | | |
| | | /** |
| | | * This test is intended to be run only manually to check the performance between binary search |
| | | * and sequential access. |
| | | * <p> |
| | | * Note that sequential run may be extremely long when using high values. |
| | | * <p> |
| | | * For both strategies only the seek itself is timed: the assertion on the result is done |
| | | * outside of the measured interval, so that both sets of figures are comparable. |
| | | */ |
| | | @Test(enabled=false) |
| | | public void seekPerformanceComparison() throws Exception |
| | | { |
| | | // You may change these values |
| | | // (product computed as long so that it can not overflow int when tuned) |
| | | long fileSizeInBytes = 100L*1024*1024; |
| | | int numberOfValuesToSeek = 50000; |
| | | int blockSize = 256; |
| | | |
| | | writeRecordsToReachFileSize(blockSize, fileSizeInBytes); |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek); |
| | | System.out.println("File size: " + TEST_FILE.length() + " bytes"); |
| | | |
| | | System.out.println("\n---- BINARY SEARCH"); |
| | | long minTime = Long.MAX_VALUE; |
| | | long maxTime = Long.MIN_VALUE; |
| | | final long t0 = System.nanoTime(); |
| | | for (Integer key : keysToSeek) |
| | | { |
| | | final long ts = System.nanoTime(); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false); |
| | | final long te = System.nanoTime() - ts; |
| | | if (te < minTime) minTime = te; |
| | | if (te > maxTime) maxTime = te; |
| | | // show time for seeks that last more than N microseconds (tune as needed) |
| | | if (te/1000 > 1000) |
| | | { |
| | | // print the searched key rather than the key of the returned record: the record has not |
| | | // been checked yet and dereferencing it could fail before the assertion below reports it |
| | | System.out.println("TIME! key:" + key + ", time=" + te/1000 + " microseconds"); |
| | | } |
| | | assertThat(result.getSecond()).isEqualTo(record(key)); |
| | | } |
| | | System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds"); |
| | | System.out.println("Min time for a search: " + minTime/1000 + " microseconds"); |
| | | System.out.println("Max time for a search: " + maxTime/1000 + " microseconds"); |
| | | System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds"); |
| | | |
| | | System.out.println("\n---- SEQUENTIAL SEARCH"); |
| | | minTime = Long.MAX_VALUE; |
| | | maxTime = Long.MIN_VALUE; |
| | | final long t1 = System.nanoTime(); |
| | | for (Integer key : keysToSeek) |
| | | { |
| | | final long ts = System.nanoTime(); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, key, false); |
| | | // stop the clock before asserting, as done for the binary search |
| | | final long te = System.nanoTime() - ts; |
| | | if (te < minTime) minTime = te; |
| | | if (te > maxTime) maxTime = te; |
| | | assertThat(result.getSecond()).isEqualTo(record(key)); |
| | | } |
| | | System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds"); |
| | | System.out.println("Min time for a search: " + minTime/1000 + " microseconds"); |
| | | System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds"); |
| | | System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds"); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided records, in iteration order, to the test file using a writer |
| | | * created with the provided block size. The writer is closed before returning, |
| | | * whether or not a write failed. |
| | | * |
| | | * @param blockSize |
| | | * The size of block used by the writer. |
| | | * @param records |
| | | * The records to write. |
| | | * @throws ChangelogException |
| | | * If the writer can not be created or a record can not be written. |
| | | */ |
| | | private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException |
| | | { |
| | | BlockLogWriter<Integer, Integer> writer = null; |
| | | try |
| | | { |
| | | writer = newWriter(blockSize); |
| | | for (Record<Integer, Integer> record : records) |
| | | { |
| | | writer.write(record); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // writer is still null here if newWriter() failed |
| | | // NOTE(review): presumably StaticUtils.close() tolerates a null argument - confirm |
| | | StaticUtils.close(writer); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Writes as many records as needed to reach the provided file size. Record keys and |
| | | * values go from 1 up to N, where N is {@code sizeInBytes / INT_RECORD_SIZE}. |
| | | * |
| | | * @param blockSize |
| | | * The size of block used by the writer. |
| | | * @param sizeInBytes |
| | | * The size used to compute the number of records to write. |
| | | */ |
| | | private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception |
| | | { |
| | | // divide as long first, then narrow: a cast binds tighter than '/', so casting |
| | | // sizeInBytes alone would truncate it to int before the division |
| | | final int numberOfValues = (int) (sizeInBytes / INT_RECORD_SIZE); |
| | | final int[] values = new int[numberOfValues]; |
| | | for (int i = 0; i < numberOfValues; i++) |
| | | { |
| | | values[i] = i+1; |
| | | } |
| | | writeRecords(blockSize, records(values)); |
| | | } |
| | | |
| | | /** |
| | | * Returns the provided number of keys to seek, in random order, for a file of the provided size. |
| | | * <p> |
| | | * Candidate keys go from 1 up to {@code fileSizeInBytes / INT_RECORD_SIZE}; they are shuffled |
| | | * and the first {@code numberOfKeys} of them are returned. |
| | | * |
| | | * @param fileSizeInBytes |
| | | * The size used to compute the number of candidate keys. |
| | | * @param numberOfKeys |
| | | * The number of keys to return, which must not exceed the number of candidate keys. |
| | | * @return the keys to seek, as a view on the shuffled list of candidate keys |
| | | */ |
| | | private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys) |
| | | { |
| | | // divide as long first, then narrow: a cast binds tighter than '/', so casting |
| | | // fileSizeInBytes alone would truncate it to int before the division |
| | | final int numberOfValues = (int) (fileSizeInBytes / INT_RECORD_SIZE); |
| | | final List<Integer> values = new ArrayList<Integer>(numberOfValues); |
| | | for (int i = 0; i < numberOfValues; i++) |
| | | { |
| | | values.add(i+1); |
| | | } |
| | | Collections.shuffle(values); |
| | | return values.subList(0, numberOfKeys); |
| | | } |
| | | |
| | | private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException |
| | | { |
| | | return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock); |
| | | } |
| | | |
| | | private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException |
| | | { |
| | | return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"), |
| | | RECORD_PARSER, blockSize); |
| | | } |
| | | |
| | | private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException |
| | | { |
| | | return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize); |
| | | } |
| | | |
| | | /** Helper to build a list of records. */ |
| | | private List<Record<Integer, Integer>> records(int...keys) |
| | | { |
| | | List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>(); |
| | | for (int key : keys) |
| | | { |
| | | records.add(Record.from(key, key)); |
| | | } |
| | | return records; |
| | | } |
| | | |
| | | /** Helper to build a record. */ |
| | | private Record<Integer, Integer> record(int key) |
| | | { |
| | | return Record.from(key, key); |
| | | } |
| | | |
| | | /** |
| | | * Record parser implementation for records with keys and values as integers to be used in tests. |
| | | * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value), |
| | | * which is useful for some tests. |
| | | */ |
| | | private static class IntRecordParser implements RecordParser<Integer, Integer> |
| | | { |
| | | public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException |
| | | { |
| | | ByteSequenceReader reader = data.asReader(); |
| | | int key = reader.getInt(); |
| | | int value = reader.getInt(); |
| | | return Record.from(key, value); |
| | | } |
| | | |
| | | public ByteString encodeRecord(Record<Integer, Integer> record) |
| | | { |
| | | return new ByteStringBuilder().append(record.getKey()).append(record.getValue()).toByteString(); |
| | | } |
| | | |
| | | @Override |
| | | public Integer decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | return Integer.valueOf(key); |
| | | } |
| | | |
| | | @Override |
| | | public String encodeKeyToString(Integer key) |
| | | { |
| | | return String.valueOf(key); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Integer getMaxKey() |
| | | { |
| | | return Integer.MAX_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | } |
| | | }; |
| | | |
| | | static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() { |
| | | @Override |
| | | public ByteString encodeRecord(Record<String, String> record) |
| | | { |
| | | // write the key only, to corrupt the log file |
| | | return new ByteStringBuilder().append(record.getKey()).toByteString(); |
| | | } |
| | | }; |
| | | |
| | | @BeforeClass |
| | | public void createTestDirectory() |
| | | { |
| | |
| | | |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | logFile.append(Record.from("key"+i, "value"+i)); |
| | | logFile.append(Record.from(String.format("key%02d", i), "value"+i)); |
| | | } |
| | | logFile.close(); |
| | | } |
| | |
| | | try { |
| | | cursor = changelog.getCursor(); |
| | | |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getCursor("key5"); |
| | | cursor = changelog.getCursor("key05"); |
| | | |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key5", "value5")); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5")); |
| | | assertThatCursorCanBeFullyRead(cursor, 6, 10); |
| | | } |
| | | finally { |
| | |
| | | cursor = changelog.getCursor(null); |
| | | |
| | | // should start from start |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getNearestCursor("key1"); |
| | | cursor = changelog.getNearestCursor("key01"); |
| | | |
| | | // lowest higher key is key2 |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key2", "value2")); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key02", "value2")); |
| | | assertThatCursorCanBeFullyRead(cursor, 3, 10); |
| | | } |
| | | finally { |
| | |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getNearestCursor("key0"); |
| | | cursor = changelog.getNearestCursor("key00"); |
| | | |
| | | // lowest higher key is key1 |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | |
| | | cursor = changelog.getNearestCursor(null); |
| | | |
| | | // should start from start |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | |
| | | { |
| | | Record<String, String> record = changelog.getOldestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(record).isEqualTo(Record.from("key01", "value1")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(changelog); |
| | |
| | | Record<String, String> record = Record.from("newkey" + i, "newvalue" + i); |
| | | writeLog.append(record); |
| | | assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record); |
| | | assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1")); |
| | | assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record); |
| | | assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1")); |
| | | } |
| | | } |
| | | finally |
| | |
| | | for (int i = fromIndex; i <= endIndex; i++) |
| | | { |
| | | assertThat(cursor.next()).as("next() value when i=" + i).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i)); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i)); |
| | | } |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |