/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
|
package org.opends.server.replication.server.changelog.file;
|
|
import static com.forgerock.opendj.util.Validator.*;
|
|
import static org.opends.messages.ReplicationMessages.*;
|
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
|
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
|
|
import java.io.Closeable;
|
import java.io.EOFException;
|
import java.io.File;
|
import java.io.IOException;
|
import java.io.RandomAccessFile;
|
|
import org.opends.server.replication.server.changelog.api.ChangelogException;
|
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
|
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
|
import org.opends.server.types.ByteString;
|
import org.opends.server.types.ByteStringBuilder;
|
import org.opends.server.util.StaticUtils;
|
|
import com.forgerock.opendj.util.Pair;
|
|
/**
 * A log reader with binary search support.
 * <p>
 * The log file contains record offsets at a fixed block size: given a block size N,
 * an offset is written every N bytes. The offset contains the number of bytes to
 * go back from the block start to reach the beginning of the previous record, or
 * zero if the next record starts right after the offset.
 * <p>
 * The reader provides both sequential access, using the {@code readRecord()} method,
 * and reasonably fast random access, using the
 * {@code seekToRecord(K, KeyMatchingStrategy, PositionStrategy)} method.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
|
class BlockLogReader<K extends Comparable<K>, V> implements Closeable
|
{
|
static final int SIZE_OF_BLOCK_OFFSET = 4;
|
|
static final int SIZE_OF_RECORD_SIZE = 4;
|
|
/**
|
* Size of a block, after which an offset to the nearest record is written.
|
* <p>
|
* This value has been fixed based on performance tests. See
|
* <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
|
* OPENDJ-1472</a> for details.
|
*/
|
static final int BLOCK_SIZE = 256;
|
|
private final int blockSize;
|
|
private final RecordParser<K, V> parser;
|
|
private final RandomAccessFile reader;
|
|
private final File file;
|
|
/**
|
* Creates a reader for the provided file, file reader and parser.
|
*
|
* @param <K>
|
* Type of the key of a record, which must be comparable.
|
* @param <V>
|
* Type of the value of a record.
|
* @param file
|
* The log file to read.
|
* @param reader
|
* The random access reader on the log file.
|
* @param parser
|
* The parser to decode the records read.
|
* @return a new log reader
|
*/
|
static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
|
final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
|
{
|
return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
|
}
|
|
/**
|
* Creates a reader for the provided file, file reader, parser and block size.
|
* <p>
|
* This method is intended for tests only, to allow tuning of the block size.
|
*
|
* @param <K>
|
* Type of the key of a record, which must be comparable.
|
* @param <V>
|
* Type of the value of a record.
|
* @param file
|
* The log file to read.
|
* @param reader
|
* The random access reader on the log file.
|
* @param parser
|
* The parser to decode the records read.
|
* @param blockSize
|
* The size of each block, or frequency at which the record offset is
|
* present in the log file.
|
* @return a new log reader
|
*/
|
static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
|
final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
|
{
|
return new BlockLogReader<K, V>(file, reader, parser, blockSize);
|
}
|
|
/**
 * Stores the collaborators of this reader; instances are obtained through
 * the static factory methods.
 *
 * @param file
 *          The log file to read.
 * @param reader
 *          The random access reader on the log file.
 * @param parser
 *          The parser to decode the records read.
 * @param blockSize
 *          The size of each block in bytes.
 */
private BlockLogReader(
    final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
{
  this.blockSize = blockSize;
  this.parser = parser;
  this.reader = reader;
  this.file = file;
}
|
|
/**
|
* Position the reader to the record corresponding to the provided key and
|
* matching and positioning strategies. Returns the last record read.
|
*
|
* @param key
|
* Key to use as a start position. Key must not be {@code null}.
|
* @param matchStrategy
|
* The key matching strategy.
|
* @param positionStrategy
|
* The positioning strategy.
|
* @return The pair (key_found, last_record_read). key_found is a boolean
|
* indicating if reader is successfully positioned. last_record_read
|
* is the last record that was read. When key_found is equals to
|
* {@code false}, then last_record_read is always {@code null}. When
|
* key_found is equals to {@code true}, last_record_read can be valued
|
* or be {@code null}
|
* @throws ChangelogException
|
* If an error occurs when seeking the key.
|
*/
|
public Pair<Boolean, Record<K,V>> seekToRecord(
|
final K key,
|
final KeyMatchingStrategy matchStrategy,
|
final PositionStrategy positionStrategy)
|
throws ChangelogException
|
{
|
ensureNotNull(key);
|
final long markerPosition = searchClosestBlockStartToKey(key);
|
if (markerPosition >= 0)
|
{
|
return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
|
}
|
return Pair.of(false, null);
|
}
|
|
/**
|
* Position the reader to the provided file position.
|
*
|
* @param filePosition
|
* offset from the beginning of the file, in bytes.
|
* @throws ChangelogException
|
* If an error occurs.
|
*/
|
public void seekToPosition(final long filePosition) throws ChangelogException
|
{
|
try
|
{
|
reader.seek(filePosition);
|
}
|
catch (IOException e)
|
{
|
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
|
}
|
}
|
|
/**
|
* Read a record from current position.
|
*
|
* @return the record read
|
* @throws ChangelogException
|
* If an error occurs during read.
|
*/
|
public Record<K,V> readRecord() throws ChangelogException
|
{
|
return readRecord(-1);
|
}
|
|
/**
|
* Returns the file position for this reader.
|
*
|
* @return the position of reader on the log file
|
* @throws ChangelogException
|
* If an error occurs.
|
*/
|
public long getFilePosition() throws ChangelogException
|
{
|
try
|
{
|
return reader.getFilePointer();
|
}
|
catch (IOException e)
|
{
|
throw new ChangelogException(
|
ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
|
}
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void close() throws IOException
|
{
|
reader.close();
|
}
|
|
/**
|
* Read a record, either from the provided start of block position or from
|
* the current position.
|
*
|
* @param blockStartPosition
|
* The position of start of block, where a record offset is written.
|
* If provided value is -1, then record is read from current position instead.
|
* @return the record read
|
* @throws ChangelogException
|
* If an error occurs during read.
|
*/
|
private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
|
{
|
try
|
{
|
if (blockStartPosition != -1)
|
{
|
positionToRecordFromBlockStart(blockStartPosition);
|
}
|
final ByteString recordData = readNextRecord();
|
return recordData != null ? parser.decodeRecord(recordData) : null;
|
}
|
catch (Exception io)
|
{
|
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
|
}
|
}
|
|
/**
|
* Position to the record given by the offset read from provided block
|
* start.
|
*
|
* @param blockStartPosition
|
* Position of read pointer in the file, expected to be the start of
|
* a block where a record offset is written.
|
* @throws IOException
|
* If an error occurs during read.
|
*/
|
private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
|
{
|
reader.seek(blockStartPosition);
|
if (blockStartPosition > 0)
|
{
|
final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
|
reader.readFully(offsetData);
|
final int offsetToRecord = ByteString.wrap(offsetData).toInt();
|
if (offsetToRecord > 0)
|
{
|
reader.seek(blockStartPosition - offsetToRecord);
|
} // if offset is zero, reader is already well positioned
|
}
|
}
|
|
/**
|
* Reads the next record.
|
*
|
* @return the bytes of the next record, or {@code null} if no record is available
|
* @throws IOException
|
* If an error occurs while reading.
|
*/
|
private ByteString readNextRecord() throws IOException
|
{
|
try
|
{
|
// read length of record
|
final long filePosition = reader.getFilePointer();
|
int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
|
final int recordLength = readRecordLength(distanceToBlockStart);
|
|
// read the record
|
long currentPosition = reader.getFilePointer();
|
distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
|
final ByteStringBuilder recordBytes =
|
new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
|
int remainingBytesToRead = recordLength;
|
while (distanceToBlockStart < remainingBytesToRead)
|
{
|
if (distanceToBlockStart != 0)
|
{
|
recordBytes.append(reader, distanceToBlockStart);
|
}
|
// skip the offset
|
reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
|
|
// next step
|
currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
|
remainingBytesToRead -= distanceToBlockStart;
|
distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
|
}
|
if (remainingBytesToRead > 0)
|
{
|
// last bytes of the record
|
recordBytes.append(reader, remainingBytesToRead);
|
}
|
return recordBytes.toByteString();
|
}
|
catch (EOFException e)
|
{
|
// end of stream, no record or uncomplete record
|
return null;
|
}
|
}
|
|
/**
|
* Returns the total length in bytes taken by a record when stored in log file,
|
* including size taken by block offsets.
|
*
|
* @param recordLength
|
* The length of record to write.
|
* @param distanceToBlockStart
|
* Distance before the next block start.
|
* @return the length in bytes necessary to store the record in the log
|
*/
|
int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
|
{
|
int totalLength = recordLength;
|
if (recordLength > distanceToBlockStart)
|
{
|
totalLength += SIZE_OF_BLOCK_OFFSET;
|
final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
|
totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
|
}
|
return totalLength;
|
}
|
|
/** Read the length of a record. */
|
private int readRecordLength(final int distanceToBlockStart) throws IOException
|
{
|
final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
|
if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
|
{
|
lengthBytes.append(reader, distanceToBlockStart);
|
// skip the offset
|
reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
|
lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
|
return lengthBytes.toByteString().toInt();
|
}
|
else
|
{
|
if (distanceToBlockStart == 0)
|
{
|
// skip the offset
|
reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
|
}
|
lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
|
return lengthBytes.toByteString().toInt();
|
}
|
}
|
|
/**
|
* Search the closest block start to the provided key, using binary search.
|
* <p>
|
* Note that position of reader is modified by this method.
|
*
|
* @param key
|
* The key to search
|
* @return the file position of block start that must be used to find the given key,
|
* or a negative number if no position could be found.
|
* @throws ChangelogException
|
* if a problem occurs
|
*/
|
long searchClosestBlockStartToKey(K key) throws ChangelogException
|
{
|
final long maxPos = getFileLength() - 1;
|
long lowPos = 0L;
|
long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
|
|
while (lowPos <= highPos)
|
{
|
final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
|
final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
|
final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
|
if (middleRecord == null)
|
{
|
return -1;
|
}
|
|
final int keyComparison = middleRecord.getKey().compareTo(key);
|
if (keyComparison < 0)
|
{
|
if (middleBlockStartPos <= lowPos)
|
{
|
return lowPos;
|
}
|
lowPos = middleBlockStartPos;
|
}
|
else if (keyComparison > 0)
|
{
|
if (middleBlockStartPos >= highPos)
|
{
|
return highPos;
|
}
|
highPos = middleBlockStartPos;
|
}
|
else
|
{
|
return middleBlockStartPos;
|
}
|
}
|
// Unable to find a position where key can be found
|
return -1;
|
}
|
|
private long getFileLength() throws ChangelogException
|
{
|
try
|
{
|
return reader.length();
|
}
|
catch (IOException e)
|
{
|
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
|
}
|
}
|
|
/**
|
* Position before, at or after provided key, starting from provided block
|
* start position and reading sequentially until key is found according to
|
* matching and positioning strategies.
|
*
|
* @param blockStartPosition
|
* Position of read pointer in the file, expected to be the start of
|
* a block where a record offset is written
|
* @param key
|
* The key to find
|
* @param matchStrategy
|
* The key matching strategy
|
* @param positionStrategy
|
* The positioning strategy
|
* @return The pair ({@code true}, selected record) if reader is successfully
|
* positioned (selected record may be null if end of file is reached),
|
* ({@code false}, null) otherwise.
|
* @throws ChangelogException
|
* If an error occurs.
|
*/
|
Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key,
|
final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException
|
{
|
Record<K,V> record = readRecord(blockStartPosition);
|
Record<K,V> previousRecord = null;
|
long previousPosition = blockStartPosition;
|
boolean matchingKeyIsLowerThanAnyRecord = true;
|
while (record != null)
|
{
|
final int keysComparison = record.getKey().compareTo(key);
|
if (keysComparison <= 0)
|
{
|
matchingKeyIsLowerThanAnyRecord = false;
|
}
|
if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY)
|
|| (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY))
|
{
|
return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord,
|
record, previousRecord, previousPosition);
|
}
|
previousRecord = record;
|
previousPosition = getFilePosition();
|
record = readRecord();
|
}
|
|
if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
|
{
|
return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition);
|
}
|
return Pair.of(false, null);
|
}
|
|
private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy,
|
PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord,
|
Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition)
|
throws ChangelogException
|
{
|
Record<K, V> record = currentRecord;
|
|
if (positionStrategy == AFTER_MATCHING_KEY)
|
{
|
if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord)
|
{
|
return Pair.of(false, null);
|
}
|
if (keysComparison == 0)
|
{
|
// skip matching key
|
record = readRecord();
|
}
|
}
|
else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0)
|
{
|
seekToPosition(previousPosition);
|
return Pair.of(previousRecord != null, previousRecord);
|
}
|
return Pair.of(true, record);
|
}
|
|
private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy(
|
final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition)
|
throws ChangelogException
|
{
|
if (positionStrategy == ON_MATCHING_KEY)
|
{
|
seekToPosition(previousPosition);
|
return Pair.of(previousRecord != null, previousRecord);
|
}
|
else
|
{
|
return Pair.of(true, null);
|
}
|
}
|
|
/**
|
* Returns the closest start of block which has a position lower than or equal
|
* to the provided file position.
|
*
|
* @param filePosition
|
* The position of reader on file.
|
* @return the file position of the block start.
|
*/
|
long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
|
{
|
final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
|
return dist == 0 ? filePosition : filePosition + dist - blockSize;
|
}
|
|
/**
|
* Returns the closest start of block which has a position strictly
|
* higher than the provided file position.
|
*
|
* @param filePosition
|
* The position of reader on file.
|
* @return the file position of the block start.
|
*/
|
long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
|
{
|
final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
|
return dist == 0 ? filePosition + blockSize : filePosition + dist;
|
}
|
|
/**
|
* Returns the distance to next block for the provided file position.
|
*
|
* @param filePosition
|
* offset from the beginning of the file, in bytes.
|
* @param blockSize
|
* Size of each block in bytes.
|
* @return the distance to next block in bytes
|
*/
|
static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
|
{
|
if (filePosition == 0)
|
{
|
return blockSize;
|
}
|
final int distance = (int) (filePosition % blockSize);
|
return distance == 0 ? 0 : blockSize - distance;
|
}
|
|
/**
|
* Check if the log file is valid, by reading the latest records at the end of
|
* the log file.
|
* <p>
|
* The intent of this method is to allow self-recovery in case a partial
|
* record as been written at the end of file (eg, after a server crash).
|
* <p>
|
* Any unexpected exception is considered as a severe error where
|
* self-recovery is not appropriate and thus will lead to a
|
* ChangelogException.
|
*
|
* @return -1 if log is valid, or a positive number if log is invalid, where
|
* the number represents the last valid position in the log file.
|
* @throws ChangelogException
|
* if an error occurs while checking the log
|
*/
|
long checkLogIsValid() throws ChangelogException
|
{
|
try
|
{
|
final long fileSize = getFileLength();
|
final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
|
positionToRecordFromBlockStart(lastBlockStart);
|
|
long lastValidPosition = lastBlockStart;
|
for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
|
parser.decodeRecord(recordData);
|
lastValidPosition = reader.getFilePointer();
|
}
|
|
final boolean isFileValid = lastValidPosition == fileSize;
|
return isFileValid ? -1 : lastValidPosition;
|
}
|
catch (Exception e)
|
{
|
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
|
file.getPath(),
|
StaticUtils.stackTraceToSingleLineString(e)));
|
}
|
}
|
|
}
|