From 3c931b0f1ba72ce655f1fe03295aff77b4bfcf38 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 12 Jun 2014 15:08:37 +0000
Subject: [PATCH] OPENDJ-1472 : File based changelog : optimize random seek in each log file CR-3727
---
opends/src/server/org/opends/server/types/ByteStringBuilder.java | 31 +
opends/src/server/org/opends/server/util/StaticUtils.java | 32
opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java | 27
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java | 490 ++++++++++++++++++
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java | 536 ++++++++++++++++++++
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 155 -----
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java | 37
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 2
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java | 10
opends/src/messages/messages/replication.properties | 4
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java | 214 ++++++++
11 files changed, 1,364 insertions(+), 174 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index cb27792..844c5c1 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -612,4 +612,6 @@
SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
replica offline state file '%s' for domain %s
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
- offline state file '%s' for domain %s and server id %d
\ No newline at end of file
+ offline state file '%s' for domain %s and server id %d
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \
+ file '%s'
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
new file mode 100644
index 0000000..5aedc91
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -0,0 +1,536 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A log reader with binary search support.
+ * <p>
+ * The log file contains record offsets at fixed block size : given a block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ * <p>
+ * The reader provides both sequential access, using the {@code readRecord()} method,
+ * and reasonably fast random access, using the {@code seek(K, boolean)} method.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+class BlockLogReader<K extends Comparable<K>, V> implements Closeable
+{
+ static final int SIZE_OF_BLOCK_OFFSET = 4;
+
+ static final int SIZE_OF_RECORD_SIZE = 4;
+
+ /**
+ * Size of a block, after which an offset to the nearest record is written.
+ * <p>
+ * This value has been fixed based on performance tests. See
+ * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
+ * OPENDJ-1472</a> for details.
+ */
+ static final int BLOCK_SIZE = 256;
+
+ private final int blockSize;
+
+ private final RecordParser<K, V> parser;
+
+ private final RandomAccessFile reader;
+
+ private final File file;
+
+ /**
+ * Creates a reader for the provided file, file reader and parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param file
+ * The log file to read.
+ * @param reader
+ * The random access reader on the log file.
+ * @param parser
+ * The parser to decode the records read.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
+ final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
+ {
+ return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
+ }
+
+ /**
+ * Creates a reader for the provided file, file reader, parser and block size.
+ * <p>
+ * This method is intended for tests only, to allow tuning of the block size.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param file
+ * The log file to read.
+ * @param reader
+ * The random access reader on the log file.
+ * @param parser
+ * The parser to decode the records read.
+ * @param blockSize
+ * The size of each block, or frequency at which the record offset is
+ * present in the log file.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
+ final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
+ {
+ return new BlockLogReader<K, V>(file, reader, parser, blockSize);
+ }
+
+ private BlockLogReader(
+ final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
+ {
+ this.file = file;
+ this.reader = reader;
+ this.parser = parser;
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * Position the reader to the record corresponding to the provided key or to
+ * the nearest key (the lowest key higher than the provided key), and returns
+ * the last record read.
+ *
+ * @param key
+ * Key to use as a start position. Key must not be {@code null}.
+ * @param findNextRecord
+ * If {@code true}, start position is the lowest key that is higher
+ * than the provided key, otherwise start position is the provided
+ * key.
+ * @return The pair (key_found, last_record_read). key_found is a boolean
+ * indicating if reader is successfully positioned to the key or the
+ * nearest key. last_record_read is the last record that was read.
+ * When key_found is equals to {@code false}, then last_record_read is
+ * always {@code null}. When key_found is equals to {@code true},
+ * last_record_read can be valued or be {@code null}
+ * @throws ChangelogException
+ * If an error occurs when seeking the key.
+ */
+ public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException
+ {
+ final long markerPosition = searchClosestBlockStartToKey(key);
+ if (markerPosition >= 0)
+ {
+ return positionToKeySequentially(markerPosition, key, findNextRecord);
+ }
+ return Pair.of(false, null);
+ }
+
+ /**
+ * Position the reader to the provided file position.
+ *
+ * @param filePosition
+ * offset from the beginning of the file, in bytes.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ public void seekToPosition(final long filePosition) throws ChangelogException
+ {
+ try
+ {
+ reader.seek(filePosition);
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
+ }
+ }
+
+ /**
+ * Read a record from current position.
+ *
+ * @return the record read
+ * @throws ChangelogException
+ * If an error occurs during read.
+ */
+ public Record<K,V> readRecord() throws ChangelogException
+ {
+ return readRecord(-1);
+ }
+
+ /**
+ * Returns the file position for this reader.
+ *
+ * @return the position of reader on the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ public long getFilePosition() throws ChangelogException
+ {
+ try
+ {
+ return reader.getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws IOException
+ {
+ reader.close();
+ }
+
+ /**
+ * Read a record, either from the provided start of block position or from
+ * the current position.
+ *
+ * @param blockStartPosition
+ * The position of start of block, where a record offset is written.
+ * If provided value is -1, then record is read from current position instead.
+ * @return the record read
+ * @throws ChangelogException
+ * If an error occurs during read.
+ */
+ private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
+ {
+ try
+ {
+ final ByteString recordData = blockStartPosition == -1 ?
+ readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
+ return recordData != null ? parser.decodeRecord(recordData) : null;
+ }
+ catch (IOException io)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
+ }
+ catch (DecodingException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
+ }
+ }
+
+ /**
+ * Reads the first record found after the provided file position of block
+ * start.
+ *
+ * @param blockStartPosition
+ * Position of read pointer in the file, expected to be the start of
+ * a block where a record offset is written.
+ * @return the record read
+ * @throws IOException
+ * If an error occurs during read.
+ */
+ private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
+ {
+ reader.seek(blockStartPosition);
+ if (blockStartPosition > 0)
+ {
+ final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
+ reader.readFully(offsetData);
+ final int offsetToRecord = ByteString.wrap(offsetData).toInt();
+ if (offsetToRecord > 0)
+ {
+ reader.seek(blockStartPosition - offsetToRecord);
+ } // if offset is zero, reader is already well positioned
+ }
+ return readNextRecord();
+ }
+
+ /**
+ * Reads the next record.
+ *
+ * @return the bytes of the next record, or {@code null} if no record is available
+ * @throws IOException
+ * If an error occurs while reading.
+ */
+ private ByteString readNextRecord() throws IOException
+ {
+ try
+ {
+ // read length of record
+ final long filePosition = reader.getFilePointer();
+ int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
+ final int recordLength = readRecordLength(distanceToBlockStart);
+
+ // read the record
+ long currentPosition = reader.getFilePointer();
+ distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
+ final ByteStringBuilder recordBytes =
+ new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
+ int remainingBytesToRead = recordLength;
+ while (distanceToBlockStart < remainingBytesToRead)
+ {
+ if (distanceToBlockStart != 0)
+ {
+ recordBytes.append(reader, distanceToBlockStart);
+ }
+ // skip the offset
+ reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+
+ // next step
+ currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
+ remainingBytesToRead -= distanceToBlockStart;
+ distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
+ }
+ if (remainingBytesToRead > 0)
+ {
+ // last bytes of the record
+ recordBytes.append(reader, remainingBytesToRead);
+ }
+ return recordBytes.toByteString();
+ }
+ catch(EOFException e)
+ {
+ // end of stream, no record or uncomplete record
+ return null;
+ }
+ }
+
+ /**
+ * Returns the total length in bytes taken by a record when stored in log file,
+ * including size taken by block offsets.
+ *
+ * @param recordLength
+ * The length of record to write.
+ * @param distanceToBlockStart
+ * Distance before the next block start.
+ * @return the length in bytes necessary to store the record in the log
+ */
+ int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
+ {
+ int totalLength = recordLength;
+ if (recordLength > distanceToBlockStart)
+ {
+ totalLength += SIZE_OF_BLOCK_OFFSET;
+ final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
+ totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
+ }
+ return totalLength;
+ }
+
+ /** Read the length of a record. */
+ private int readRecordLength(final int distanceToBlockStart) throws IOException
+ {
+ final ByteStringBuilder lengthBytes = new ByteStringBuilder();
+ if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
+ {
+ lengthBytes.append(reader, distanceToBlockStart);
+ // skip the offset
+ reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+ lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
+ return lengthBytes.toByteString().toInt();
+ }
+ else
+ {
+ if (distanceToBlockStart == 0)
+ {
+ // skip the offset
+ reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+ }
+ lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
+ return lengthBytes.toByteString().toInt();
+ }
+ }
+
+ /**
+ * Search the closest block start to the provided key, using binary search.
+ * <p>
+ * Note that position of reader is modified by this method.
+ *
+ * @param key
+ * The key to search
+ * @return the file position of block start that must be used to find the given key,
+ * or a negative number if no position could be found.
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ long searchClosestBlockStartToKey(K key) throws ChangelogException
+ {
+ final long maxPos = getLastPositionInFile();
+ long lowPos = 0L;
+ long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
+
+ while (lowPos <= highPos)
+ {
+ final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
+ final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
+ final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
+ if (middleRecord == null)
+ {
+ return -1;
+ }
+
+ final int keyComparison = middleRecord.getKey().compareTo(key);
+ if (keyComparison < 0)
+ {
+ if (middleBlockStartPos <= lowPos)
+ {
+ return lowPos;
+ }
+ lowPos = middleBlockStartPos;
+ }
+ else if (keyComparison > 0)
+ {
+ if (middleBlockStartPos >= highPos)
+ {
+ return highPos;
+ }
+ highPos = middleBlockStartPos;
+ }
+ else
+ {
+ return middleBlockStartPos;
+ }
+ }
+ // Unable to find a position where key can be found
+ return -1;
+ }
+
+ private long getLastPositionInFile() throws ChangelogException
+ {
+ try
+ {
+ return reader.length() - 1;
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
+ }
+ }
+
+ /**
+ * Position to provided key, starting from provided block start position and
+ * reading sequentially until key is found.
+ *
+ * @param blockStartPosition
+ * Position of read pointer in the file, expected to be the start of
+ * a block where a record offset is written.
+ * @param key
+ * The key to find.
+ * @param findNextRecord
+ * If {@code true}, position at the end of this method is the lowest
+ * key that is higher than the provided key, otherwise position is
+ * the provided key.
+ * @return The pair ({@code true}, last record read) if reader is successfully
+ * positioned to the key or the nearest key (last record may be null
+ * if end of file is reached), ({@code false}, null) otherwise.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ Pair<Boolean, Record<K,V>> positionToKeySequentially(
+ final long blockStartPosition,
+ final K key,
+ final boolean findNextRecord)
+ throws ChangelogException {
+ Record<K,V> record = readRecord(blockStartPosition);
+ do {
+ if (record != null)
+ {
+ final int keysComparison = record.getKey().compareTo(key);
+ final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key);
+ if (matches)
+ {
+ if (findNextRecord && keysComparison == 0)
+ {
+ // skip key in order to position on lowest higher key
+ record = readRecord();
+ }
+ return Pair.of(true, record);
+ }
+ }
+ record = readRecord();
+ }
+ while (record != null);
+ return Pair.of(false, null);
+ }
+
+ /**
+ * Returns the closest start of block which has a position lower than or equal
+ * to the provided file position.
+ *
+ * @param filePosition
+ * The position of reader on file.
+ * @return the file position of the block start.
+ */
+ long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
+ {
+ final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+ return dist == 0 ? filePosition : filePosition + dist - blockSize;
+ }
+
+ /**
+ * Returns the closest start of block which has a position strictly
+ * higher than the provided file position.
+ *
+ * @param filePosition
+ * The position of reader on file.
+ * @return the file position of the block start.
+ */
+ long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
+ {
+ final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+ return dist == 0 ? filePosition + blockSize : filePosition + dist;
+ }
+
+ /**
+ * Returns the distance to next block for the provided file position.
+ *
+ * @param filePosition
+ * offset from the beginning of the file, in bytes.
+ * @param blockSize
+ * Size of each block in bytes.
+ * @return the distance to next block in bytes
+ */
+ static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
+ {
+ if (filePosition == 0)
+ {
+ return blockSize;
+ }
+ final int distance = (int) (filePosition % blockSize);
+ return distance == 0 ? 0 : blockSize - distance;
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
new file mode 100644
index 0000000..6fef3f8
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
@@ -0,0 +1,214 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+/**
+ * A log writer, using fixed-size blocks to allow fast retrieval when reading.
+ * <p>
+ * The log file contains record offsets at fixed block size : given block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+class BlockLogWriter<K extends Comparable<K>, V> implements Closeable
+{
+ private final int blockSize;
+
+ private final RecordParser<K, V> parser;
+
+ private final LogWriter writer;
+
+ /**
+ * Creates a writer for the provided log writer and parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param writer
+ * The writer on the log file.
+ * @param parser
+ * The parser to encode the records.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter(
+ final LogWriter writer, final RecordParser<K, V> parser)
+ {
+ return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE);
+ }
+
+ /**
+ * Creates a writer for the provided log writer, parser and size for blocks.
+ * <p>
+ * This method is intended for tests only, to allow tuning of the block size.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param writer
+ * The writer on the log file.
+ * @param parser
+ * The parser to encode the records.
+ * @param blockSize
+ * The size of each block, or frequency at which the record offset is
+ * present in the log file.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests(
+ final LogWriter writer, final RecordParser<K, V> parser, final int blockSize)
+ {
+ return new BlockLogWriter<K, V>(writer, parser, blockSize);
+ }
+
+ /**
+ * Creates the writer with an underlying writer, a parser and a size for blocks.
+ *
+ * @param writer
+ * The writer to the log file.
+ * @param parser
+ * The parser to encode the records.
+ * @param blockSize
+ * The size of each block.
+ */
+ private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
+ {
+ Reject.ifNull(writer, parser);
+ this.writer = writer;
+ this.parser = parser;
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * Writes the provided record to the log file.
+ *
+ * @param record
+ * The record to write.
+ * @throws ChangelogException
+ * If a problem occurs during write.
+ */
+ public void write(final Record<K, V> record) throws ChangelogException
+ {
+ try
+ {
+ write(parser.encodeRecord(record));
+ writer.flush();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(),
+ writer.getFile().getPath()), e);
+ }
+ }
+
+ /**
+ * Returns the number of bytes written in the log file.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesWritten()
+ {
+ return writer.getBytesWritten();
+ }
+
+ /**
+ * Synchronize all modifications to the log file to the underlying device.
+ *
+ * @throws SyncFailedException
+ * If synchronization fails.
+ */
+ public void sync() throws SyncFailedException
+ {
+ writer.sync();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ writer.close();
+ }
+
+ /**
+ * Writes the provided byte string to the log file.
+ *
+ * @param record
+ * The value to write.
+ * @throws IOException
+ * If an error occurs while writing
+ */
+ private void write(final ByteString record) throws IOException
+ {
+ // Add length of record before writing
+ ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()).
+ append(record.length()).
+ append(record).
+ toByteString();
+
+ int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize);
+ int cumulatedDistanceToBeginning = distanceToBlockStart;
+ int dataPosition = 0;
+ int dataRemaining = data.length();
+ final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET;
+
+ while (distanceToBlockStart < dataRemaining)
+ {
+ if (distanceToBlockStart > 0)
+ {
+ // append part of record
+ final int dataEndPosition = dataPosition + distanceToBlockStart;
+ writer.write(data.subSequence(dataPosition, dataEndPosition));
+ dataPosition = dataEndPosition;
+ dataRemaining -= distanceToBlockStart;
+ }
+ // append the offset to the record
+ writer.write(ByteString.valueOf(cumulatedDistanceToBeginning));
+
+ // next step
+ distanceToBlockStart = dataSizeForOneBlock;
+ cumulatedDistanceToBeginning += blockSize;
+ }
+ // append the remaining bytes to finish the record
+ writer.write(data.subSequence(dataPosition, data.length()));
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 20522bd..28f0366 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@
/**
* The last key appended to the log. In order to keep the ordering of the keys
* in the log, any attempt to append a record with a key lower or equal to
- * this is rejected (no error but an event is logged).
+ * this key is rejected (no error but an event is logged).
*/
private K lastAppendedKey;
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index f0bb7e9..b27c78e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -30,11 +30,9 @@
import java.io.BufferedWriter;
import java.io.Closeable;
-import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
@@ -42,11 +40,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
+import com.forgerock.opendj.util.Pair;
+
/**
* A log file, containing part of a {@code Log}. The log file may be:
* <ul>
@@ -72,17 +70,14 @@
/** The file containing the records. */
private final File logfile;
- /** The parser of records, to convert bytes to record and record to bytes. */
- private final RecordParser<K, V> parser;
-
/** The pool to obtain a reader on the log. */
- private LogReaderPool readerPool;
+ private final LogReaderPool<K, V> readerPool;
/**
* The writer on the log file, which may be {@code null} if log file is not
- * write-enabled
+ * write-enabled.
*/
- private LogWriter writer;
+ private final BlockLogWriter<K, V> writer;
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
@@ -105,10 +100,11 @@
{
Reject.ifNull(logFilePath, parser);
this.logfile = logFilePath;
- this.parser = parser;
this.isWriteEnabled = isWriteEnabled;
- initialize();
+ createLogFileIfNotExists();
+ writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
+ readerPool = new LogReaderPool<K, V>(logfile, parser);
}
/**
@@ -155,25 +151,6 @@
}
/**
- * Initialize this log.
- * <p>
- * Create directories and file if necessary, and create a writer
- * and pool of readers.
- *
- * @throws ChangelogException
- * If an errors occurs during initialization.
- */
- private void initialize() throws ChangelogException
- {
- createLogFileIfNotExists();
- if (isWriteEnabled)
- {
- writer = new LogWriter(logfile);
- }
- readerPool = new LogReaderPool(logfile);
- }
-
- /**
* Returns the file containing the records.
*
* @return the file
@@ -221,23 +198,7 @@
void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
- try
- {
- writer.write(encodeRecord(record));
- }
- catch (IOException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
- }
- }
-
- private ByteString encodeRecord(final Record<K, V> record)
- {
- final ByteString data = parser.encodeRecord(record);
- return new ByteStringBuilder()
- .append(data.length())
- .append(data)
- .toByteString();
+ writer.write(record);
}
/**
@@ -530,73 +491,21 @@
return logfile.getPath();
}
- /** Read a record from the provided reader. */
- private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
- {
- try
- {
- final ByteString recordData = readEncodedRecord(reader);
- return recordData != null ? parser.decodeRecord(recordData) : null;
- }
- catch(DecodingException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
- }
- }
-
- private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
- {
- try
- {
- final byte[] lengthData = new byte[4];
- reader.readFully(lengthData);
- int recordLength = ByteString.wrap(lengthData).toInt();
-
- final byte[] recordData = new byte[recordLength];
- reader.readFully(recordData);
- return ByteString.wrap(recordData);
- }
- catch(EOFException e)
- {
- // end of stream, no record or uncomplete record
- return null;
- }
- catch (IOException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
- }
- }
-
- /** Seek to given position on the provided reader. */
- private void seek(RandomAccessFile reader, long position) throws ChangelogException
- {
- try
- {
- reader.seek(position);
- }
- catch (IOException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
- }
- }
-
/**
- * Returns a random access file to read this log.
+ * Returns a reader for this log.
* <p>
* Assumes that calling methods ensure that log is not closed.
*/
- private RandomAccessFile getReader() throws ChangelogException
+ private BlockLogReader<K, V> getReader() throws ChangelogException
{
return readerPool.get();
}
/** Release the provided reader. */
- private void releaseReader(RandomAccessFile reader) {
+ private void releaseReader(BlockLogReader<K, V> reader) {
readerPool.release(reader);
}
-
-
/** {@inheritDoc} */
@Override
public int hashCode()
@@ -633,7 +542,7 @@
private final LogFile<K, V> logFile;
/** To read the records. */
- private final RandomAccessFile reader;
+ private final BlockLogReader<K, V> reader;
/** The current available record, may be {@code null}. */
private Record<K,V> currentRecord;
@@ -674,14 +583,14 @@
this.logFile = logFile;
this.reader = logFile.getReader();
this.currentRecord = record;
- logFile.seek(reader, filePosition);
+ reader.seekToPosition(filePosition);
}
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
- currentRecord = logFile.readRecord(reader);
+ currentRecord = reader.readRecord();
return currentRecord != null;
}
@@ -695,26 +604,10 @@
/** {@inheritDoc} */
@Override
public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
- do
- {
- if (currentRecord != null)
- {
- final boolean matches = findNearest ?
- currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
- if (matches)
- {
- if (findNearest && currentRecord.getKey().equals(key))
- {
- // skip key in order to position on lowest higher key
- next();
- }
- return true;
- }
- }
- next();
- }
- while (currentRecord != null);
- return false;
+ final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
+ final boolean found = result.getFirst();
+ currentRecord = found ? result.getSecond() : null;
+ return found;
}
/** {@inheritDoc} */
@@ -733,15 +626,7 @@
*/
long getFilePosition() throws ChangelogException
{
- try
- {
- return reader.getFilePointer();
- }
- catch (IOException e)
- {
- throw new ChangelogException(
- ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
- }
+ return reader.getFilePosition();
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
index b5aed97..5300ffd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
@@ -35,22 +35,33 @@
/**
* A Pool of readers to a log file.
+
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
*/
// TODO : implement a real pool - reusing readers instead of opening-closing them each time
-class LogReaderPool
+class LogReaderPool<K extends Comparable<K>, V>
{
/** The file to read. */
private final File file;
+ private final RecordParser<K, V> parser;
+
/**
* Creates a pool of readers for provided file.
*
* @param file
- * The file to read.
+ * The file to read.
+ * @param parser
+ * The parser to decode the records read.
*/
- LogReaderPool(File file)
+ LogReaderPool(File file, RecordParser<K, V> parser)
{
this.file = file;
+ this.parser = parser;
}
/**
@@ -63,9 +74,9 @@
* @throws ChangelogException
* If the file can't be found or read.
*/
- RandomAccessFile get() throws ChangelogException
+ BlockLogReader<K, V> get() throws ChangelogException
{
- return getRandomAccess(file);
+ return getReader(file);
}
/**
@@ -77,17 +88,17 @@
* The random access reader to a file previously acquired with this
* pool.
*/
- void release(RandomAccessFile reader)
+ void release(BlockLogReader<K, V> reader)
{
StaticUtils.close(reader);
}
/** Returns a random access file to read this log. */
- private RandomAccessFile getRandomAccess(File file) throws ChangelogException
+ private BlockLogReader<K, V> getReader(File file) throws ChangelogException
{
try
{
- return new RandomAccessFile(file, "r");
+ return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ;
}
catch (Exception e)
{
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
index 44ce685..5ec5f2b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -76,6 +76,16 @@
}
}
+ /**
+ * Returns the file used by this writer.
+ *
+ * @return the file
+ */
+ File getFile()
+ {
+ return file;
+ }
+
/** {@inheritDoc} */
@Override
public void write(int b) throws IOException
diff --git a/opends/src/server/org/opends/server/types/ByteStringBuilder.java b/opends/src/server/org/opends/server/types/ByteStringBuilder.java
index e6010a3..fd18874 100644
--- a/opends/src/server/org/opends/server/types/ByteStringBuilder.java
+++ b/opends/src/server/org/opends/server/types/ByteStringBuilder.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2014 ForgeRock AS.
*/
package org.opends.server.types;
@@ -30,6 +30,8 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
+import java.io.DataInput;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -566,7 +568,34 @@
return bytesRead;
}
+ /**
+ * Appends the provided {@code DataInput} to this byte string
+ * builder.
+ *
+ * @param stream
+ * The data input stream to be appended to this byte string
+ * builder.
+ * @param length
+ * The maximum number of bytes to be appended from {@code
+ * input}.
+ * @throws IndexOutOfBoundsException
+ * If {@code length} is less than zero.
+ * @throws EOFException
+ * If this stream reaches the end before reading all the bytes.
+ * @throws IOException
+ * If an I/O error occurs.
+ */
+ public void append(DataInput stream, int length) throws IndexOutOfBoundsException, EOFException, IOException
+ {
+ if (length < 0)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ ensureAdditionalCapacity(length);
+ stream.readFully(buffer, this.length, length);
+ this.length += length;
+ }
/**
* Appends the provided {@code ReadableByteChannel} to this byte string
diff --git a/opends/src/server/org/opends/server/util/StaticUtils.java b/opends/src/server/org/opends/server/util/StaticUtils.java
index bc27bed..b3cfa18 100644
--- a/opends/src/server/org/opends/server/util/StaticUtils.java
+++ b/opends/src/server/org/opends/server/util/StaticUtils.java
@@ -3463,32 +3463,36 @@
/**
- * Attempts to delete the specified file or directory. If it is a directory,
+ * Attempts to delete the specified file or directory. If it is a directory,
* then any files or subdirectories that it contains will be recursively
* deleted as well.
*
- * @param file The file or directory to be removed.
- *
- * @return <CODE>true</CODE> if the specified file and any subordinates are
- * all successfully removed, or <CODE>false</CODE> if at least one
- * element in the subtree could not be removed.
+ * @param file
+ * The file or directory to be removed.
+ * @return {@code true} if the specified file and any subordinates are all
+ * successfully removed, or {@code false} if at least one element in
+ * the subtree could not be removed or file does not exists.
*/
public static boolean recursiveDelete(File file)
{
- boolean successful = true;
- if (file.isDirectory())
+ if (file.exists())
{
- File[] childList = file.listFiles();
- if (childList != null)
+ boolean successful = true;
+ if (file.isDirectory())
{
- for (File f : childList)
+ File[] childList = file.listFiles();
+ if (childList != null)
{
- successful &= recursiveDelete(f);
+ for (File f : childList)
+ {
+ successful &= recursiveDelete(f);
+ }
}
}
- }
- return (successful & file.delete());
+ return (successful & file.delete());
+ }
+ return false;
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
new file mode 100644
index 0000000..7561613
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
@@ -0,0 +1,490 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.Pair;
+
+@SuppressWarnings("javadoc")
+public class BlockLogReaderWriterTest extends DirectoryServerTestCase
+{
+ private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+ private static final File TEST_FILE = new File(TEST_DIRECTORY, "file");
+
+ private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser();
+
+ private static final int INT_RECORD_SIZE = 12;
+
+ @BeforeClass
+ void createTestDirectory()
+ {
+ TEST_DIRECTORY.mkdirs();
+ }
+
+ @BeforeMethod
+ void ensureTestFileIsEmpty() throws Exception
+ {
+ StaticUtils.recursiveDelete(TEST_FILE);
+ }
+
+ @AfterClass
+ void cleanTestDirectory()
+ {
+ StaticUtils.recursiveDelete(TEST_DIRECTORY);
+ }
+
+ @DataProvider(name = "recordsData")
+ Object[][] recordsData()
+ {
+ return new Object[][]
+ {
+ // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes
+
+ // size of block, expected size of file after all records are written, records
+ { 12, 12, records(1) }, // zero block marker
+ { 10, 16, records(1) }, // one block marker
+ { 8, 16, records(1) }, // one block marker
+ { 7, 20, records(1) }, // two block markers
+ { 6, 24, records(1) }, // three block markers
+ { 5, 40, records(1) }, // seven block markers
+
+ { 16, 28, records(1,2) }, // one block marker
+ { 12, 32, records(1,2) }, // two block markers
+ { 10, 36, records(1,2) }, // three block markers
+ };
+ }
+
+ /**
+ * Tests that records can be written then read correctly for different block sizes.
+ */
+ @Test(dataProvider="recordsData")
+ public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records)
+ throws Exception
+ {
+ writeRecords(blockSize, records);
+
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+ for (int i = 0; i < records.size(); i++)
+ {
+ Record<Integer, Integer> record = reader.readRecord();
+ assertThat(record).isEqualTo(records.get(i));
+ }
+ assertThat(reader.readRecord()).isNull();
+ assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile);
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ @DataProvider(name = "recordsForSeek")
+ Object[][] recordsForSeek()
+ {
+ Object[][] data = new Object[][] {
+ // records, key, findNearest, expectedRecord, expectedFound
+
+ // no record
+ { records(), 1, false, null, false },
+
+ // 1 record exact find
+ { records(1), 1, false, record(1), true },
+
+ // 1 record nearest find
+ { records(1), 1, true, null, true },
+
+ // 2 records
+ { records(1,2), 1, false, record(1), true },
+ { records(1,2), 2, false, record(2), true },
+ { records(1,2), 1, true, record(2), true },
+ { records(1,2), 2, true, null, true },
+
+ // 3 records exact find
+ { records(1,2,3), 0, false, null, false },
+ { records(1,2,3), 1, false, record(1), true },
+ { records(1,2,3), 2, false, record(2), true },
+ { records(1,2,3), 3, false, record(3), true },
+ { records(1,2,3), 4, false, null, false },
+
+ // 3 records nearest find
+ { records(1,2,3), 0, true, record(1), true },
+ { records(1,2,3), 1, true, record(2), true },
+ { records(1,2,3), 2, true, record(3), true },
+ { records(1,2,3), 3, true, null, true },
+ { records(1,2,3), 4, true, null, false },
+
+ // 10 records exact find
+ { records(1,2,3,4,5,6,7,8,9,10), 0, false, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, false, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, false, record(5), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, false, record(10), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, false, null, false },
+
+ // 10 records nearest find
+ { records(1,2,3,4,5,6,7,8,9,10), 0, true, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, true, record(2), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, true, record(6), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, true, null, true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, true, null, false },
+ };
+
+ // For each test case, do a test with various block sizes to ensure algorithm is not broken
+ // on a given size
+ int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 };
+ Object[][] finalData = new Object[sizes.length*data.length][6];
+ for (int i = 0; i < data.length; i++)
+ {
+ for (int j = 0; j < sizes.length; j++)
+ {
+ Object[] a = data[i];
+ // add the block size at beginning of each test case
+ finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4]};
+ }
+ }
+ return finalData;
+ }
+
+ @Test(dataProvider="recordsForSeek")
+ public void testSeek(int blockSize, List<Record<Integer, Integer>> records, int key, boolean findNearest,
+ Record<Integer, Integer> expectedRecord, boolean expectedFound) throws Exception
+ {
+ writeRecords(blockSize, records);
+
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+ Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest);
+
+ assertThat(result.getFirst()).isEqualTo(expectedFound);
+ assertThat(result.getSecond()).isEqualTo(expectedRecord);
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ @Test
+ public void testGetClosestMarkerBeforeOrAtPosition() throws Exception
+ {
+ final int blockSize = 10;
+ BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20);
+ }
+
+ @Test
+ public void testGetClosestMarkerStrictlyAfterPosition() throws Exception
+ {
+ final int blockSize = 10;
+ BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30);
+ }
+
+ @Test
+ public void testSearchClosestMarkerToKey() throws Exception
+ {
+ int blockSize = 20;
+ writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20));
+
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+
+ assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0);
+ assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0);
+ assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20);
+ assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20);
+ assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40);
+ assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60);
+ assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80);
+ assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80);
+ assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100);
+ assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120);
+ assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140);
+ assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260);
+ assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280);
+ // out of reach keys
+ assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280);
+ assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280);
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ @Test
+ public void testLengthOfStoredRecord() throws Exception
+ {
+ final int blockSize = 100;
+ BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+ int recordLength = 10;
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+
+ recordLength = 150;
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+
+ recordLength = 200;
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+ }
+
+ /**
+ * This test is intended to be run only manually to check the performance between binary search
+ * and sequential access.
+ * Note that sequential run may be extremely long when using high values.
+ */
+ @Test(enabled=false)
+ public void seekPerformanceComparison() throws Exception
+ {
+ // You may change these values
+ long fileSizeInBytes = 100*1024*1024;
+ int numberOfValuesToSeek = 50000;
+ int blockSize = 256;
+
+ writeRecordsToReachFileSize(blockSize, fileSizeInBytes);
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+ List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek);
+ System.out.println("File size: " + TEST_FILE.length() + " bytes");
+
+ System.out.println("\n---- BINARY SEARCH");
+ long minTime = Long.MAX_VALUE;
+ long maxTime = Long.MIN_VALUE;
+ final long t0 = System.nanoTime();
+ for (Integer key : keysToSeek)
+ {
+ final long ts = System.nanoTime();
+ Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false);
+ final long te = System.nanoTime() - ts;
+ if (te < minTime) minTime = te;
+ if (te > maxTime) maxTime = te;
+ // show time for seeks that last more than N microseconds (tune as needed)
+ if (te/1000 > 1000)
+ {
+ System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds");
+ }
+ assertThat(result.getSecond()).isEqualTo(record(key));
+ }
+ System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds");
+ System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+ System.out.println("Max time for a search: " + maxTime/1000 + " microseconds");
+ System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds");
+
+ System.out.println("\n---- SEQUENTIAL SEARCH");
+ minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
+ final long t1 = System.nanoTime();
+ for (Integer val : keysToSeek)
+ {
+ long ts = System.nanoTime();
+ Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, val, false);
+ assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
+ long te = System.nanoTime() - ts;
+ if (te < minTime) minTime = te;
+ if (te > maxTime) maxTime = te;
+ }
+ System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds");
+ System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+ System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds");
+ System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds");
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ /** Write provided records. */
+ private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException
+ {
+ BlockLogWriter<Integer, Integer> writer = null;
+ try
+ {
+ writer = newWriter(blockSize);
+ for (Record<Integer, Integer> record : records)
+ {
+ writer.write(record);
+ }
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ }
+
+ /** Write as many records as needed to reach provided file size. Records goes from 1 up to N. */
+ private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception
+ {
+ final int numberOfValues = (int) sizeInBytes / INT_RECORD_SIZE;
+ final int[] values = new int[numberOfValues];
+ for (int i = 0; i < numberOfValues; i++)
+ {
+ values[i] = i+1;
+ }
+ writeRecords(blockSize, records(values));
+ }
+
+ /** Returns provided number of keys to seek in random order, for a file of provided size. */
+ private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys)
+ {
+ final int numberOfValues = (int) fileSizeInBytes / INT_RECORD_SIZE;
+ final List<Integer> values = new ArrayList<Integer>(numberOfValues);
+ for (int i = 0; i < numberOfValues; i++)
+ {
+ values.add(i+1);
+ }
+ Collections.shuffle(values);
+ return values.subList(0, numberOfKeys);
+ }
+
+ private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException
+ {
+ return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock);
+ }
+
+ private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException
+ {
+ return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"),
+ RECORD_PARSER, blockSize);
+ }
+
+ private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException
+ {
+ return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize);
+ }
+
+ /** Helper to build a list of records. */
+ private List<Record<Integer, Integer>> records(int...keys)
+ {
+ List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>();
+ for (int key : keys)
+ {
+ records.add(Record.from(key, key));
+ }
+ return records;
+ }
+
+ /** Helper to build a record. */
+ private Record<Integer, Integer> record(int key)
+ {
+ return Record.from(key, key);
+ }
+
+ /**
+ * Record parser implementation for records with keys and values as integers to be used in tests.
+ * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value),
+ * which is useful for some tests.
+ */
+ private static class IntRecordParser implements RecordParser<Integer, Integer>
+ {
+ public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException
+ {
+ ByteSequenceReader reader = data.asReader();
+ int key = reader.getInt();
+ int value = reader.getInt();
+ return Record.from(key, value);
+ }
+
+ public ByteString encodeRecord(Record<Integer, Integer> record)
+ {
+ return new ByteStringBuilder().append(record.getKey()).append(record.getValue()).toByteString();
+ }
+
+ @Override
+ public Integer decodeKeyFromString(String key) throws ChangelogException
+ {
+ return Integer.valueOf(key);
+ }
+
+ @Override
+ public String encodeKeyToString(Integer key)
+ {
+ return String.valueOf(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Integer getMaxKey()
+ {
+ return Integer.MAX_VALUE;
+ }
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
index 524d95a..2af0e8c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -58,6 +58,15 @@
}
};
+ static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() {
+ @Override
+ public ByteString encodeRecord(Record<String, String> record)
+ {
+ // write the key only, to corrupt the log file
+ return new ByteStringBuilder().append(record.getKey()).toByteString();
+ }
+ };
+
@BeforeClass
public void createTestDirectory()
{
@@ -78,7 +87,7 @@
for (int i = 1; i <= 10; i++)
{
- logFile.append(Record.from("key"+i, "value"+i));
+ logFile.append(Record.from(String.format("key%02d", i), "value"+i));
}
logFile.close();
}
@@ -106,7 +115,7 @@
try {
cursor = changelog.getCursor();
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
assertThatCursorCanBeFullyRead(cursor, 2, 10);
}
finally {
@@ -120,9 +129,9 @@
LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
DBCursor<Record<String, String>> cursor = null;
try {
- cursor = changelog.getCursor("key5");
+ cursor = changelog.getCursor("key05");
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key5", "value5"));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
assertThatCursorCanBeFullyRead(cursor, 6, 10);
}
finally {
@@ -156,7 +165,7 @@
cursor = changelog.getCursor(null);
// should start from start
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
assertThatCursorCanBeFullyRead(cursor, 2, 10);
}
finally {
@@ -170,10 +179,10 @@
LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
DBCursor<Record<String, String>> cursor = null;
try {
- cursor = changelog.getNearestCursor("key1");
+ cursor = changelog.getNearestCursor("key01");
// lowest higher key is key2
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key2", "value2"));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key02", "value2"));
assertThatCursorCanBeFullyRead(cursor, 3, 10);
}
finally {
@@ -187,10 +196,10 @@
LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
DBCursor<Record<String, String>> cursor = null;
try {
- cursor = changelog.getNearestCursor("key0");
+ cursor = changelog.getNearestCursor("key00");
// lowest higher key is key1
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
assertThatCursorCanBeFullyRead(cursor, 2, 10);
}
finally {
@@ -207,7 +216,7 @@
cursor = changelog.getNearestCursor(null);
// should start from start
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
assertThatCursorCanBeFullyRead(cursor, 2, 10);
}
finally {
@@ -235,7 +244,7 @@
{
Record<String, String> record = changelog.getOldestRecord();
- assertThat(record).isEqualTo(Record.from("key1", "value1"));
+ assertThat(record).isEqualTo(Record.from("key01", "value1"));
}
finally {
StaticUtils.close(changelog);
@@ -275,9 +284,9 @@
Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
writeLog.append(record);
assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
- assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
+ assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
- assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key1", "value1"));
+ assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1"));
}
}
finally
@@ -296,7 +305,7 @@
for (int i = fromIndex; i <= endIndex; i++)
{
assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
+ assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i));
}
assertThatCursorIsExhausted(cursor);
}
--
Gitblit v1.10.0