From 3b08e64ccf908fa4c57e6ee13aa8901efcc53ee2 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 22 Sep 2014 15:35:12 +0000
Subject: [PATCH] Actual merge of complete changelog.file package which contains implementation of file-based changelog
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java | 101
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java | 72
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 2
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java | 632 ++++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 607 +++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java | 214 ++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java | 130 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 860 ++++++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java | 144 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 409 ++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 982 ++++++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java | 151 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java | 1187 +++++++++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java | 118 +
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java | 24
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 394 +++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java | 79
17 files changed, 6,008 insertions(+), 98 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java b/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
index 12d6b8f..2ba1c94 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -36,6 +36,7 @@
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.types.AttributeType;
@@ -285,4 +286,27 @@
throw new RuntimeException("Not implemented");
}
+ /**
+ * Get the instance of backend.
+ *
+ * @return the instance
+ */
+ public static ChangelogBackend getInstance()
+ {
+ throw new RuntimeException("Not implemented");
+ }
+
+ /**
+ * Notify.
+ *
+ * @param baseDN
+ * The base DN
+ * @param updateMsg
+ * The update msg
+ */
+ public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg)
+ {
+ throw new RuntimeException("Not implemented");
+ }
+
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
new file mode 100644
index 0000000..1290261
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -0,0 +1,632 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A log reader with binary search support.
+ * <p>
+ * The log file contains record offsets at fixed block size : given a block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ * <p>
+ * The reader provides both sequential access, using the {@code readRecord()} method,
+ * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+class BlockLogReader<K extends Comparable<K>, V> implements Closeable
+{
+ static final int SIZE_OF_BLOCK_OFFSET = 4;
+
+ static final int SIZE_OF_RECORD_SIZE = 4;
+
+ /**
+ * Size of a block, after which an offset to the nearest record is written.
+ * <p>
+ * This value has been fixed based on performance tests. See
+ * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
+ * OPENDJ-1472</a> for details.
+ */
+ static final int BLOCK_SIZE = 256;
+
+ private final int blockSize;
+
+ private final RecordParser<K, V> parser;
+
+ private final RandomAccessFile reader;
+
+ private final File file;
+
+ /**
+ * Creates a reader for the provided file, file reader and parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param file
+ * The log file to read.
+ * @param reader
+ * The random access reader on the log file.
+ * @param parser
+ * The parser to decode the records read.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
+ final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
+ {
+ return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
+ }
+
+ /**
+ * Creates a reader for the provided file, file reader, parser and block size.
+ * <p>
+ * This method is intended for tests only, to allow tuning of the block size.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param file
+ * The log file to read.
+ * @param reader
+ * The random access reader on the log file.
+ * @param parser
+ * The parser to decode the records read.
+ * @param blockSize
+ * The size of each block, or frequency at which the record offset is
+ * present in the log file.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
+ final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
+ {
+ return new BlockLogReader<K, V>(file, reader, parser, blockSize);
+ }
+
+ private BlockLogReader(
+ final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
+ {
+ this.file = file;
+ this.reader = reader;
+ this.parser = parser;
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * Position the reader to the record corresponding to the provided key and
+ * matching and positioning strategies. Returns the last record read.
+ *
+ * @param key
+ * Key to use as a start position. Key must not be {@code null}.
+ * @param matchStrategy
+ * The key matching strategy.
+ * @param positionStrategy
+ * The positioning strategy.
+ * @return The pair (key_found, last_record_read). key_found is a boolean
+ * indicating if reader is successfully positioned. last_record_read
+ * is the last record that was read. When key_found is equals to
+ * {@code false}, then last_record_read is always {@code null}. When
+ * key_found is equals to {@code true}, last_record_read can be valued
+ * or be {@code null}
+ * @throws ChangelogException
+ * If an error occurs when seeking the key.
+ */
+ public Pair<Boolean, Record<K,V>> seekToRecord(
+ final K key,
+ final KeyMatchingStrategy matchStrategy,
+ final PositionStrategy positionStrategy)
+ throws ChangelogException
+ {
+ Reject.ifNull(key);
+ final long markerPosition = searchClosestBlockStartToKey(key);
+ if (markerPosition >= 0)
+ {
+ return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
+ }
+ return Pair.of(false, null);
+ }
+
+ /**
+ * Position the reader to the provided file position.
+ *
+ * @param filePosition
+ * offset from the beginning of the file, in bytes.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ public void seekToPosition(final long filePosition) throws ChangelogException
+ {
+ try
+ {
+ reader.seek(filePosition);
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
+ }
+ }
+
+ /**
+ * Read a record from current position.
+ *
+ * @return the record read
+ * @throws ChangelogException
+ * If an error occurs during read.
+ */
+ public Record<K,V> readRecord() throws ChangelogException
+ {
+ return readRecord(-1);
+ }
+
+ /**
+ * Returns the file position for this reader.
+ *
+ * @return the position of reader on the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ public long getFilePosition() throws ChangelogException
+ {
+ try
+ {
+ return reader.getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws IOException
+ {
+ reader.close();
+ }
+
+ /**
+ * Read a record, either from the provided start of block position or from
+ * the current position.
+ *
+ * @param blockStartPosition
+ * The position of start of block, where a record offset is written.
+ * If provided value is -1, then record is read from current position instead.
+ * @return the record read
+ * @throws ChangelogException
+ * If an error occurs during read.
+ */
+ private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
+ {
+ try
+ {
+ if (blockStartPosition != -1)
+ {
+ positionToRecordFromBlockStart(blockStartPosition);
+ }
+ final ByteString recordData = readNextRecord();
+ return recordData != null ? parser.decodeRecord(recordData) : null;
+ }
+ catch (Exception io)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
+ }
+ }
+
+ /**
+ * Position to the record given by the offset read from provided block
+ * start.
+ *
+ * @param blockStartPosition
+ * Position of read pointer in the file, expected to be the start of
+ * a block where a record offset is written.
+ * @throws IOException
+ * If an error occurs during read.
+ */
+ private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
+ {
+ reader.seek(blockStartPosition);
+ if (blockStartPosition > 0)
+ {
+ final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
+ reader.readFully(offsetData);
+ final int offsetToRecord = ByteString.wrap(offsetData).toInt();
+ if (offsetToRecord > 0)
+ {
+ reader.seek(blockStartPosition - offsetToRecord);
+ } // if offset is zero, reader is already well positioned
+ }
+ }
+
+ /**
+ * Reads the next record.
+ *
+ * @return the bytes of the next record, or {@code null} if no record is available
+ * @throws IOException
+ * If an error occurs while reading.
+ */
+ private ByteString readNextRecord() throws IOException
+ {
+ try
+ {
+ // read length of record
+ final long filePosition = reader.getFilePointer();
+ int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
+ final int recordLength = readRecordLength(distanceToBlockStart);
+
+ // read the record
+ long currentPosition = reader.getFilePointer();
+ distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
+ final ByteStringBuilder recordBytes =
+ new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
+ int remainingBytesToRead = recordLength;
+ while (distanceToBlockStart < remainingBytesToRead)
+ {
+ if (distanceToBlockStart != 0)
+ {
+ recordBytes.append(reader, distanceToBlockStart);
+ }
+ // skip the offset
+ reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+
+ // next step
+ currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
+ remainingBytesToRead -= distanceToBlockStart;
+ distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
+ }
+ if (remainingBytesToRead > 0)
+ {
+ // last bytes of the record
+ recordBytes.append(reader, remainingBytesToRead);
+ }
+ return recordBytes.toByteString();
+ }
+ catch (EOFException e)
+ {
+ // end of stream, no record or uncomplete record
+ return null;
+ }
+ }
+
+ /**
+ * Returns the total length in bytes taken by a record when stored in log file,
+ * including size taken by block offsets.
+ *
+ * @param recordLength
+ * The length of record to write.
+ * @param distanceToBlockStart
+ * Distance before the next block start.
+ * @return the length in bytes necessary to store the record in the log
+ */
+ int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
+ {
+ int totalLength = recordLength;
+ if (recordLength > distanceToBlockStart)
+ {
+ totalLength += SIZE_OF_BLOCK_OFFSET;
+ final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
+ totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
+ }
+ return totalLength;
+ }
+
+ /** Read the length of a record. */
+ private int readRecordLength(final int distanceToBlockStart) throws IOException
+ {
+ final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
+ if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
+ {
+ lengthBytes.append(reader, distanceToBlockStart);
+ // skip the offset
+ reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+ lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
+ return lengthBytes.toByteString().toInt();
+ }
+ else
+ {
+ if (distanceToBlockStart == 0)
+ {
+ // skip the offset
+ reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+ }
+ lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
+ return lengthBytes.toByteString().toInt();
+ }
+ }
+
+ /**
+ * Search the closest block start to the provided key, using binary search.
+ * <p>
+ * Note that position of reader is modified by this method.
+ *
+ * @param key
+ * The key to search
+ * @return the file position of block start that must be used to find the given key,
+ * or a negative number if no position could be found.
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ long searchClosestBlockStartToKey(K key) throws ChangelogException
+ {
+ final long maxPos = getFileLength() - 1;
+ long lowPos = 0L;
+ long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
+
+ while (lowPos <= highPos)
+ {
+ final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
+ final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
+ final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
+ if (middleRecord == null)
+ {
+ return -1;
+ }
+
+ final int keyComparison = middleRecord.getKey().compareTo(key);
+ if (keyComparison < 0)
+ {
+ if (middleBlockStartPos <= lowPos)
+ {
+ return lowPos;
+ }
+ lowPos = middleBlockStartPos;
+ }
+ else if (keyComparison > 0)
+ {
+ if (middleBlockStartPos >= highPos)
+ {
+ return highPos;
+ }
+ highPos = middleBlockStartPos;
+ }
+ else
+ {
+ return middleBlockStartPos;
+ }
+ }
+ // Unable to find a position where key can be found
+ return -1;
+ }
+
+ private long getFileLength() throws ChangelogException
+ {
+ try
+ {
+ return reader.length();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
+ }
+ }
+
+ /**
+ * Position before, at or after provided key, starting from provided block
+ * start position and reading sequentially until key is found according to
+ * matching and positioning strategies.
+ *
+ * @param blockStartPosition
+ * Position of read pointer in the file, expected to be the start of
+ * a block where a record offset is written
+ * @param key
+ * The key to find
+ * @param matchStrategy
+ * The key matching strategy
+ * @param positionStrategy
+ * The positioning strategy
+ * @return The pair ({@code true}, selected record) if reader is successfully
+ * positioned (selected record may be null if end of file is reached),
+ * ({@code false}, null) otherwise.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key,
+ final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ Record<K,V> record = readRecord(blockStartPosition);
+ Record<K,V> previousRecord = null;
+ long previousPosition = blockStartPosition;
+ boolean matchingKeyIsLowerThanAnyRecord = true;
+ while (record != null)
+ {
+ final int keysComparison = record.getKey().compareTo(key);
+ if (keysComparison <= 0)
+ {
+ matchingKeyIsLowerThanAnyRecord = false;
+ }
+ if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY)
+ || (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY))
+ {
+ return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord,
+ record, previousRecord, previousPosition);
+ }
+ previousRecord = record;
+ previousPosition = getFilePosition();
+ record = readRecord();
+ }
+
+ if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
+ {
+ return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition);
+ }
+ return Pair.of(false, null);
+ }
+
+ private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy,
+ PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord,
+ Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition)
+ throws ChangelogException
+ {
+ Record<K, V> record = currentRecord;
+
+ if (positionStrategy == AFTER_MATCHING_KEY)
+ {
+ if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord)
+ {
+ return Pair.of(false, null);
+ }
+ if (keysComparison == 0)
+ {
+ // skip matching key
+ record = readRecord();
+ }
+ }
+ else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0)
+ {
+ seekToPosition(previousPosition);
+ return Pair.of(previousRecord != null, previousRecord);
+ }
+ return Pair.of(true, record);
+ }
+
+ private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy(
+ final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition)
+ throws ChangelogException
+ {
+ if (positionStrategy == ON_MATCHING_KEY)
+ {
+ seekToPosition(previousPosition);
+ return Pair.of(previousRecord != null, previousRecord);
+ }
+ else
+ {
+ return Pair.of(true, null);
+ }
+ }
+
+ /**
+ * Returns the closest start of block which has a position lower than or equal
+ * to the provided file position.
+ *
+ * @param filePosition
+ * The position of reader on file.
+ * @return the file position of the block start.
+ */
+ long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
+ {
+ final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+ return dist == 0 ? filePosition : filePosition + dist - blockSize;
+ }
+
+ /**
+ * Returns the closest start of block which has a position strictly
+ * higher than the provided file position.
+ *
+ * @param filePosition
+ * The position of reader on file.
+ * @return the file position of the block start.
+ */
+ long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
+ {
+ final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+ return dist == 0 ? filePosition + blockSize : filePosition + dist;
+ }
+
+ /**
+ * Returns the distance to next block for the provided file position.
+ *
+ * @param filePosition
+ * offset from the beginning of the file, in bytes.
+ * @param blockSize
+ * Size of each block in bytes.
+ * @return the distance to next block in bytes
+ */
+ static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
+ {
+ if (filePosition == 0)
+ {
+ return blockSize;
+ }
+ final int distance = (int) (filePosition % blockSize);
+ return distance == 0 ? 0 : blockSize - distance;
+ }
+
+ /**
+ * Check if the log file is valid, by reading the latest records at the end of
+ * the log file.
+ * <p>
+ * The intent of this method is to allow self-recovery in case a partial
+ * record as been written at the end of file (eg, after a server crash).
+ * <p>
+ * Any unexpected exception is considered as a severe error where
+ * self-recovery is not appropriate and thus will lead to a
+ * ChangelogException.
+ *
+ * @return -1 if log is valid, or a positive number if log is invalid, where
+ * the number represents the last valid position in the log file.
+ * @throws ChangelogException
+ * if an error occurs while checking the log
+ */
+ long checkLogIsValid() throws ChangelogException
+ {
+ try
+ {
+ final long fileSize = getFileLength();
+ final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
+ positionToRecordFromBlockStart(lastBlockStart);
+
+ long lastValidPosition = lastBlockStart;
+ for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
+ parser.decodeRecord(recordData);
+ lastValidPosition = reader.getFilePointer();
+ }
+
+ final boolean isFileValid = lastValidPosition == fileSize;
+ return isFileValid ? -1 : lastValidPosition;
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+ file.getPath(),
+ StaticUtils.stackTraceToSingleLineString(e)));
+ }
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
new file mode 100644
index 0000000..d0cd37c
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
@@ -0,0 +1,214 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * A log writer, using fixed-size blocks to allow fast retrieval when reading.
+ * <p>
+ * The log file contains record offsets at fixed block size : given block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+class BlockLogWriter<K extends Comparable<K>, V> implements Closeable
+{
+ private final int blockSize;
+
+ private final RecordParser<K, V> parser;
+
+ private final LogWriter writer;
+
+ /**
+ * Creates a writer for the provided log writer and parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param writer
+ * The writer on the log file.
+ * @param parser
+ * The parser to encode the records.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter(
+ final LogWriter writer, final RecordParser<K, V> parser)
+ {
+ return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE);
+ }
+
+ /**
+ * Creates a writer for the provided log writer, parser and size for blocks.
+ * <p>
+ * This method is intended for tests only, to allow tuning of the block size.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param writer
+ * The writer on the log file.
+ * @param parser
+ * The parser to encode the records.
+ * @param blockSize
+ * The size of each block, or frequency at which the record offset is
+ * present in the log file.
+ * @return a new log reader
+ */
+ static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests(
+ final LogWriter writer, final RecordParser<K, V> parser, final int blockSize)
+ {
+ return new BlockLogWriter<K, V>(writer, parser, blockSize);
+ }
+
+ /**
+ * Creates the writer with an underlying writer, a parser and a size for blocks.
+ *
+ * @param writer
+ * The writer to the log file.
+ * @param parser
+ * The parser to encode the records.
+ * @param blockSize
+ * The size of each block.
+ */
+ private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
+ {
+ Reject.ifNull(writer, parser);
+ this.writer = writer;
+ this.parser = parser;
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * Writes the provided record to the log file.
+ *
+ * @param record
+ * The record to write.
+ * @throws ChangelogException
+ * If a problem occurs during write.
+ */
+ public void write(final Record<K, V> record) throws ChangelogException
+ {
+ try
+ {
+ write(parser.encodeRecord(record));
+ writer.flush();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(),
+ writer.getFile().getPath()), e);
+ }
+ }
+
+ /**
+ * Returns the number of bytes written in the log file.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesWritten()
+ {
+ return writer.getBytesWritten();
+ }
+
+ /**
+ * Synchronize all modifications to the log file to the underlying device.
+ *
+ * @throws SyncFailedException
+ * If synchronization fails.
+ */
+ public void sync() throws SyncFailedException
+ {
+ writer.sync();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ writer.close();
+ }
+
+ /**
+ * Writes the provided byte string to the log file.
+ *
+ * @param record
+ * The value to write.
+ * @throws IOException
+ * If an error occurs while writing
+ */
+ private void write(final ByteString record) throws IOException
+ {
+ // Add length of record before writing
+ ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()).
+ append(record.length()).
+ append(record).
+ toByteString();
+
+ int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize);
+ int cumulatedDistanceToBeginning = distanceToBlockStart;
+ int dataPosition = 0;
+ int dataRemaining = data.length();
+ final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET;
+
+ while (distanceToBlockStart < dataRemaining)
+ {
+ if (distanceToBlockStart > 0)
+ {
+ // append part of record
+ final int dataEndPosition = dataPosition + distanceToBlockStart;
+ writer.write(data.subSequence(dataPosition, dataEndPosition));
+ dataPosition = dataEndPosition;
+ dataRemaining -= distanceToBlockStart;
+ }
+ // append the offset to the record
+ writer.write(ByteString.valueOf(cumulatedDistanceToBeginning));
+
+ // next step
+ distanceToBlockStart = dataSizeForOneBlock;
+ cumulatedDistanceToBeginning += blockSize;
+ }
+ // append the remaining bytes to finish the record
+ writer.write(data.subSequence(dataPosition, data.length()));
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
new file mode 100644
index 0000000..897bb4a
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
@@ -0,0 +1,72 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * Exception thrown when a record can't be decoded properly.
+ */
+public class DecodingException extends ChangelogException
+{
+ private static final long serialVersionUID = 5629692522662643737L;
+
+ /**
+ * Creates a new decoding exception with the provided information.
+ *
+ * @param message
+ * The message that explains the problem that occurred.
+ */
+ public DecodingException(LocalizableMessage message)
+ {
+ super(message);
+ }
+
+ /**
+ * Creates a new decoding exception with the provided information.
+ *
+ * @param cause
+ * The underlying cause that triggered this exception.
+ */
+ public DecodingException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ /**
+ * Creates a new decoding exception with the provided information.
+ *
+ * @param message
+ * The message that explains the problem that occurred.
+ * @param cause
+ * The underlying cause that triggered this exception.
+ */
+ public DecodingException(LocalizableMessage message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
new file mode 100644
index 0000000..e4ebf0e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -0,0 +1,394 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.types.*;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Implementation of a ChangeNumberIndexDB with a log.
+ * <p>
+ * This class publishes some monitoring information below <code>
+ * cn=monitor</code>.
+ */
+class FileChangeNumberIndexDB implements ChangeNumberIndexDB
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private static final int NO_KEY = 0;
+
+ /** The parser of records stored in this ChangeNumberIndexDB. */
+ static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
+
+ /** The log in which records are persisted. */
+ private final Log<Long, ChangeNumberIndexRecord> log;
+
+ /**
+ * The newest changenumber stored in the DB. It is used to avoid purging the
+ * record with the newest changenumber. The newest record in the changenumber
+ * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
+ * then retrieved on server startup.
+ */
+ private volatile long newestChangeNumber = NO_KEY;
+
+ /**
+ * The last generated value for the change number. It is kept separate from
+ * the {@link #newestChangeNumber} because there is an opportunity for a race
+ * condition between:
+ * <ol>
+ * <li>this atomic long being incremented for a new record ('recordB')</li>
+ * <li>the current newest record ('recordA') being purged from the DB</li>
+ * <li>'recordB' failing to be inserted in the DB</li>
+ * </ol>
+ */
+ private final AtomicLong lastGeneratedChangeNumber;
+
+ private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ /**
+ * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
+ *
+ * @param replicationEnv the Database Env to use to create the ReplicationServer DB.
+ * server for this domain.
+ * @throws ChangelogException If a database problem happened
+ */
+ FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
+ {
+ log = replicationEnv.getOrCreateCNIndexDB();
+ final ChangeNumberIndexRecord newestRecord = readLastRecord();
+ newestChangeNumber = getChangeNumber(newestRecord);
+ // initialization of the lastGeneratedChangeNumber from the DB content
+ // if DB is empty => last record does not exist => default to 0
+ lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
+
+ // Monitoring registration
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ DirectoryServer.registerMonitorProvider(dbMonitor);
+ }
+
+ private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
+ return record == null ? null : record.getValue();
+ }
+
+ private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
+ return record == null ? null : record.getValue();
+ }
+
+ private long getChangeNumber(final ChangeNumberIndexRecord record) throws ChangelogException
+ {
+ if (record != null)
+ {
+ return record.getChangeNumber();
+ }
+ return NO_KEY;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long addRecord(final ChangeNumberIndexRecord record) throws ChangelogException
+ {
+ final long changeNumber = nextChangeNumber();
+ final ChangeNumberIndexRecord newRecord =
+ new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
+ log.append(Record.from(newRecord.getChangeNumber(), newRecord));
+ newestChangeNumber = changeNumber;
+
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("In FileChangeNumberIndexDB.addRecord, added: " + newRecord);
+ }
+ return changeNumber;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException
+ {
+ return readFirstRecord();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException
+ {
+ return readLastRecord();
+ }
+
+ private long nextChangeNumber()
+ {
+ return lastGeneratedChangeNumber.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getLastGeneratedChangeNumber()
+ {
+ return lastGeneratedChangeNumber.get();
+ }
+
+ /**
+ * Get the number of changes.
+ *
+ * @return Returns the number of changes.
+ * @throws ChangelogException
+ * If a problem occurs.
+ */
+ long count() throws ChangelogException
+ {
+ return log.getNumberOfRecords();
+ }
+
+ /**
+ * Returns whether this database is empty.
+ *
+ * @return <code>true</code> if this database is empty, <code>false</code>
+ * otherwise
+ * @throws ChangelogException
+ * if a problem occurs.
+ */
+ boolean isEmpty() throws ChangelogException
+ {
+ return getNewestRecord() == null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
+ {
+ return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
+ }
+
+ /**
+ * Shutdown this DB.
+ */
+ void shutdown()
+ {
+ if (shutdown.compareAndSet(false, true))
+ {
+ log.close();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ }
+ }
+
+ /**
+ * Synchronously purges the change number index DB up to and excluding the
+ * provided timestamp.
+ *
+ * @param purgeCSN
+ * the timestamp up to which purging must happen
+ * @return the oldest non purged CSN.
+ * @throws ChangelogException
+ * if a database problem occurs.
+ */
+ CSN purgeUpTo(final CSN purgeCSN) throws ChangelogException
+ {
+ if (isEmpty() || purgeCSN == null)
+ {
+ return null;
+ }
+ final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
+ return record != null ? record.getValue().getCSN() : null;
+ }
+
+ /**
+ * Implements the Monitoring capabilities of the FileChangeNumberIndexDB.
+ */
+ private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public List<Attribute> getMonitorData()
+ {
+ final List<Attribute> attributes = new ArrayList<Attribute>();
+ attributes.add(createChangeNumberAttribute(true));
+ attributes.add(createChangeNumberAttribute(false));
+ long numberOfChanges = 0;
+ try
+ {
+ numberOfChanges = count();
+ }
+ catch (ChangelogException e)
+ {
+ logger.traceException(e);
+ }
+ attributes.add(Attributes.create("count", Long.toString(numberOfChanges)));
+ return attributes;
+ }
+
+ private Attribute createChangeNumberAttribute(final boolean isFirst)
+ {
+ final String attributeName = isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
+ final String changeNumber = String.valueOf(readChangeNumber(isFirst));
+ return Attributes.create(attributeName, changeNumber);
+ }
+
+ private long readChangeNumber(final boolean isFirst)
+ {
+ try
+ {
+ return getChangeNumber(isFirst ? readFirstRecord() : readLastRecord());
+ }
+ catch (ChangelogException e)
+ {
+ logger.traceException(e);
+ return NO_KEY;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getMonitorInstanceName()
+ {
+ return "ChangeNumber Index Database";
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException, InitializationException
+ {
+ // Nothing to do for now
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + ", newestChangeNumber=" + newestChangeNumber;
+ }
+
+ /**
+ * Clear the changes from this DB (from both memory cache and DB storage).
+ *
+ * @throws ChangelogException
+ * if a database problem occurs.
+ */
+ public void clear() throws ChangelogException
+ {
+ log.clear();
+ newestChangeNumber = NO_KEY;
+ }
+
+ /** Parser of records persisted in the FileChangeNumberIndex log. */
+ private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord>
+ {
+ private static final byte STRING_SEPARATOR = 0;
+
+ @Override
+ public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
+ {
+ final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
+ return new ByteStringBuilder()
+ .append(record.getKey())
+ .append(cnIndexRecord.getBaseDN().toString())
+ .append(STRING_SEPARATOR)
+ .append(cnIndexRecord.getCSN().toByteString()).toByteString();
+ }
+
+ @Override
+ public Record<Long, ChangeNumberIndexRecord> decodeRecord(final ByteString data) throws DecodingException
+ {
+ try
+ {
+ ByteSequenceReader reader = data.asReader();
+ final long changeNumber = reader.getLong();
+ final DN baseDN = DN.valueOf(reader.getString(getNextStringLength(reader)));
+ reader.skip(1);
+ final CSN csn = CSN.valueOf(reader.getByteString(reader.remaining()));
+
+ return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, baseDN, csn));
+ }
+ catch (Exception e)
+ {
+ throw new DecodingException(e);
+ }
+ }
+
+ /** Returns the length of next string by looking for the zero byte used as separator. */
+ private int getNextStringLength(ByteSequenceReader reader)
+ {
+ int length = 0;
+ while (reader.peek(length) != STRING_SEPARATOR)
+ {
+ length++;
+ }
+ return length;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Long decodeKeyFromString(String key) throws ChangelogException
+ {
+ try
+ {
+ return Long.valueOf(key);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String encodeKeyToString(Long key)
+ {
+ return key.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Long getMaxKey()
+ {
+ return Long.MAX_VALUE;
+ }
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
new file mode 100644
index 0000000..d492576
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
@@ -0,0 +1,79 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * A cursor on ChangeNumberIndexDB.
+ *
+ * \@NotThreadSafe
+ */
+class FileChangeNumberIndexDBCursor implements DBCursor<ChangeNumberIndexRecord>
+{
+
+ /** The underlying cursor. */
+ private final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor;
+
+ /**
+ * Creates the cursor from provided cursor.
+ *
+ * @param cursor
+ * The underlying cursor to read log.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor)
+ throws ChangelogException
+ {
+ this.cursor = cursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexRecord getRecord()
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = cursor.getRecord();
+ return record != null ? record.getValue() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ return cursor.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index d735b38..4de0aba 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -21,17 +21,36 @@
* CDDL HEADER END
*
*
- * Copyright 2014 ForgeRock AS.
+ * Copyright 2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.file;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.forgerock.i18n.LocalizableMessageBuilder;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.backends.ChangelogBackend;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
@@ -40,43 +59,25 @@
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
+import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicaCursor;
import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
+
+import com.forgerock.opendj.util.Pair;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
/**
- * File-based implementation of the ChangelogDB interface.
+ * Log file implementation of the ChangelogDB interface.
*/
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
-
- /**
- * Creates the changelog DB.
- *
- * @param replicationServer
- * replication server
- * @param cfg
- * configuration
- */
- public FileChangelogDB(ReplicationServer replicationServer,
- ReplicationServerCfg cfg)
- {
- throw new RuntimeException("Not implemented");
- }
-
- /** {@inheritDoc} */
- @Override
- public ServerState getDomainOldestCSNs(DN baseDN)
- {
- throw new RuntimeException("Not implemented");
- }
-
- /** {@inheritDoc} */
- @Override
- public ServerState getDomainNewestCSNs(DN baseDN)
- {
- throw new RuntimeException("Not implemented");
- }
-
/** {@inheritDoc} */
@Override
public long getDomainLatestTrimDate(DN baseDN)
@@ -84,136 +85,923 @@
throw new RuntimeException("Not implemented");
}
- /** {@inheritDoc} */
- @Override
- public void removeDomain(DN baseDN) throws ChangelogException
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ /**
+ * This map contains the List of updates received from each LDAP server.
+ * <p>
+ * When removing a domainMap, code:
+ * <ol>
+ * <li>first get the domainMap</li>
+ * <li>synchronized on the domainMap</li>
+ * <li>remove the domainMap</li>
+ * <li>then check it's not null</li>
+ * <li>then close all inside</li>
+ * </ol>
+ * When creating a replicaDB, synchronize on the domainMap to avoid
+ * concurrent shutdown.
+ */
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+ /**
+ * \@GuardedBy("itself")
+ */
+ private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+ new HashMap<DN, List<DomainDBCursor>>();
+ private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+ new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
+ private ReplicationEnvironment replicationEnv;
+ private final ReplicationServerCfg config;
+ private final File dbDirectory;
+
+ /**
+ * The handler of the changelog database, the database stores the relation
+ * between a change number and the associated cookie.
+ * <p>
+ * @GuardedBy("cnIndexDBLock")
+ */
+ private FileChangeNumberIndexDB cnIndexDB;
+ private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>();
+
+ /** Used for protecting {@link ChangeNumberIndexDB} related state. */
+ private final Object cnIndexDBLock = new Object();
+
+ /**
+ * The purge delay (in milliseconds). Records in the changelog DB that are
+ * older than this delay might be removed.
+ */
+ private long purgeDelayInMillis;
+ private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
+
+ /** The local replication server. */
+ private final ReplicationServer replicationServer;
+ private final AtomicBoolean shutdown = new AtomicBoolean();
+
+ private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
+ new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
+
+ /**
+ * Creates a new changelog DB.
+ *
+ * @param replicationServer
+ * the local replication server.
+ * @param config
+ * the replication server configuration
+ * @throws ConfigException
+ * if a problem occurs opening the supplied directory
+ */
+ public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+ throws ConfigException
{
- throw new RuntimeException("Not implemented");
+ this.config = config;
+ this.replicationServer = replicationServer;
+ this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
- /** {@inheritDoc} */
- @Override
- public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
- KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
- throws ChangelogException
+ private File makeDir(final String dbDirName) throws ConfigException
{
- throw new RuntimeException("Not implemented");
+ // Check that this path exists or create it.
+ final File dbDirectory = getFileForPath(dbDirName);
+ try
+ {
+ if (!dbDirectory.exists())
+ {
+ dbDirectory.mkdir();
+ }
+ return dbDirectory;
+ }
+ catch (Exception e)
+ {
+ final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(e.getLocalizedMessage()).append(" ")
+ .append(String.valueOf(dbDirectory));
+ throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
+ }
}
- /** {@inheritDoc} */
- @Override
- public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
- KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
- Set<DN> excludedDomainDns) throws ChangelogException
+ private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
{
- throw new RuntimeException("Not implemented");
+ final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
+ {
+ return domainMap;
+ }
+ return Collections.emptyMap();
}
- /** {@inheritDoc} */
- @Override
- public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
- KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
- throws ChangelogException
+ private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
{
- throw new RuntimeException("Not implemented");
+ return getDomainMap(baseDN).get(serverId);
}
- /** {@inheritDoc} */
- @Override
- public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
- CSN startCSN, KeyMatchingStrategy matchingStrategy,
- PositionStrategy positionStrategy) throws ChangelogException
+ /**
+ * Returns a {@link FileReplicaDB}, possibly creating it.
+ *
+ * @param baseDN
+ * the baseDN for which to create a ReplicaDB
+ * @param serverId
+ * the serverId for which to create a ReplicaDB
+ * @param server
+ * the ReplicationServer
+ * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
+ * @throws ChangelogException
+ * if a problem occurred with the database
+ */
+ Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+ final ReplicationServer server) throws ChangelogException
{
- throw new RuntimeException("Not implemented");
+ while (!shutdown.get())
+ {
+ final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+ final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+ if (result != null)
+ {
+ final Boolean dbWasCreated = result.getSecond();
+ if (dbWasCreated)
+ { // new replicaDB => update all cursors with it
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (DomainDBCursor cursor : cursors)
+ {
+ cursor.addReplicaDB(serverId, null);
+ }
+ }
+ }
+
+ return result;
+ }
+ }
+ throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
- /** {@inheritDoc} */
- @Override
- public void unregisterCursor(DBCursor<?> cursor)
+ private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
{
- throw new RuntimeException("Not implemented");
+ // happy path: the domainMap already exists
+ final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
+ if (currentValue != null)
+ {
+ return currentValue;
+ }
+
+ // unlucky, the domainMap does not exist: take the hit and create the
+ // newValue, even though the same could be done concurrently by another thread
+ final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>();
+ final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+ if (previousValue != null)
+ {
+ // there was already a value associated to the key, let's use it
+ return previousValue;
+ }
+
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
+ return newValue;
}
- /** {@inheritDoc} */
- @Override
- public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
- throws ChangelogException
+ private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
+ final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
{
- throw new RuntimeException("Not implemented");
- }
+ // happy path: the replicaDB already exists
+ FileReplicaDB currentValue = domainMap.get(serverId);
+ if (currentValue != null)
+ {
+ return Pair.of(currentValue, false);
+ }
- /** {@inheritDoc} */
- @Override
- public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
- throws ChangelogException
- {
- throw new RuntimeException("Not implemented");
+ // unlucky, the replicaDB does not exist: take the hit and synchronize
+ // on the domainMap to create a new ReplicaDB
+ synchronized (domainMap)
+ {
+ currentValue = domainMap.get(serverId);
+ if (currentValue != null)
+ {
+ return Pair.of(currentValue, false);
+ }
+
+ if (domainToReplicaDBs.get(baseDN) != domainMap)
+ {
+ // The domainMap could have been concurrently removed because
+ // 1) a shutdown was initiated or 2) an initialize was called.
+ // Return will allow the code to:
+ // 1) shutdown properly or 2) lazily recreate the replicaDB
+ return null;
+ }
+
+ final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv);
+ domainMap.put(serverId, newDB);
+ return Pair.of(newDB, true);
+ }
}
/** {@inheritDoc} */
@Override
public void initializeDB()
{
- throw new RuntimeException("Not implemented");
+ try
+ {
+ final File dbDir = getFileForPath(config.getReplicationDBDirectory());
+ replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
+ final ChangelogState changelogState = replicationEnv.getChangelogState();
+ initializeToChangelogState(changelogState);
+ if (config.isComputeChangeNumber())
+ {
+ startIndexer(changelogState);
+ }
+ setPurgeDelay(replicationServer.getPurgeDelay());
+ }
+ catch (ChangelogException e)
+ {
+ logger.traceException(e);
+ logger.error(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
+ }
}
- /** {@inheritDoc} */
- @Override
- public void setPurgeDelay(long delayInMillis)
- {
- throw new RuntimeException("Not implemented");
- }
-
- /** {@inheritDoc} */
- @Override
- public void setComputeChangeNumber(boolean computeChangeNumber)
+ private void initializeToChangelogState(final ChangelogState changelogState)
throws ChangelogException
{
- throw new RuntimeException("Not implemented");
+ for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
+ {
+ replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
+ }
+ for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+ {
+ for (int serverId : entry.getValue())
+ {
+ getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
+ }
+ }
+ }
+
+ private void shutdownChangeNumberIndexDB() throws ChangelogException
+ {
+ synchronized (cnIndexDBLock)
+ {
+ if (cnIndexDB != null)
+ {
+ cnIndexDB.shutdown();
+ }
+ }
}
/** {@inheritDoc} */
@Override
public void shutdownDB() throws ChangelogException
{
- throw new RuntimeException("Not implemented");
+ if (!this.shutdown.compareAndSet(false, true))
+ { // shutdown has already been initiated
+ return;
+ }
+
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ }
+ final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+
+ // wait for shutdown of the threads holding cursors
+ try
+ {
+ if (indexer != null)
+ {
+ indexer.join();
+ }
+ if (purger != null)
+ {
+ purger.join();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing: we are already shutting down
+ }
+
+ // now we can safely shutdown all DBs
+ try
+ {
+ shutdownChangeNumberIndexDB();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+ for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
+ this.domainToReplicaDBs.values().iterator(); it.hasNext();)
+ {
+ final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
+ synchronized (domainMap)
+ {
+ it.remove();
+ for (FileReplicaDB replicaDB : domainMap.values())
+ {
+ replicaDB.shutdown();
+ }
+ }
+ }
+ if (replicationEnv != null)
+ {
+ replicationEnv.shutdown();
+ }
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
+ }
+
+ /**
+ * Clears all records from the changelog (does not remove the changelog itself).
+ *
+ * @throws ChangelogException
+ * If an error occurs when clearing the changelog.
+ */
+ public void clearDB() throws ChangelogException
+ {
+ if (!dbDirectory.exists())
+ {
+ return;
+ }
+
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
+ for (DN baseDN : this.domainToReplicaDBs.keySet())
+ {
+ removeDomain(baseDN);
+ }
+
+ synchronized (cnIndexDBLock)
+ {
+ if (cnIndexDB != null)
+ {
+ try
+ {
+ cnIndexDB.clear();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+
+ try
+ {
+ shutdownChangeNumberIndexDB();
+ }
+ catch (ChangelogException e)
+ {
+ if (firstException == null)
+ {
+ firstException = e;
+ }
+ else if (logger.isTraceEnabled())
+ {
+ logger.traceException(e);
+ }
+ }
+
+ cnIndexDB = null;
+ }
+ }
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
}
/** {@inheritDoc} */
@Override
public void removeDB() throws ChangelogException
{
- throw new RuntimeException("Not implemented");
+ shutdownDB();
+ StaticUtils.recursiveDelete(dbDirectory);
}
/** {@inheritDoc} */
@Override
- public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
+ public ServerState getDomainOldestCSNs(DN baseDN)
+ {
+ final ServerState result = new ServerState();
+ for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
+ {
+ result.update(replicaDB.getOldestCSN());
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ServerState getDomainNewestCSNs(DN baseDN)
+ {
+ final ServerState result = new ServerState();
+ for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
+ {
+ result.update(replicaDB.getNewestCSN());
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void removeDomain(DN baseDN) throws ChangelogException
+ {
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
+ // 1- clear the replica DBs
+ Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
+ {
+ final ChangeNumberIndexer indexer = this.cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.clear(baseDN);
+ }
+ synchronized (domainMap)
+ {
+ domainMap = domainToReplicaDBs.remove(baseDN);
+ for (FileReplicaDB replicaDB : domainMap.values())
+ {
+ try
+ {
+ replicaDB.clear();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+ replicaDB.shutdown();
+ }
+ }
+ }
+
+
+ // 2- clear the changelogstate DB
+ try
+ {
+ replicationEnv.clearGenerationId(baseDN);
+ }
+ catch (ChangelogException e)
+ {
+ if (firstException == null)
+ {
+ firstException = e;
+ }
+ else if (logger.isTraceEnabled())
+ {
+ logger.traceException(e);
+ }
+ }
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setPurgeDelay(final long purgeDelayInMillis)
+ {
+ this.purgeDelayInMillis = purgeDelayInMillis;
+ final ChangelogDBPurger purger;
+ if (purgeDelayInMillis > 0)
+ {
+ purger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, purger))
+ {
+ purger.start();
+ } // otherwise a purger was already running
+ }
+ else
+ {
+ purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setComputeChangeNumber(final boolean computeChangeNumber)
throws ChangelogException
{
- throw new RuntimeException("Not implemented");
+ if (computeChangeNumber)
+ {
+ startIndexer(replicationEnv.getChangelogState());
+ }
+ else
+ {
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ }
+ }
+ }
+
+ private void startIndexer(final ChangelogState changelogState)
+ {
+ final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+ if (cnIndexer.compareAndSet(null, indexer))
+ {
+ indexer.start();
+ }
}
/** {@inheritDoc} */
@Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
- throw new RuntimeException("Not implemented");
+ synchronized (cnIndexDBLock)
+ {
+ if (cnIndexDB == null)
+ {
+ try
+ {
+ cnIndexDB = new FileChangeNumberIndexDB(replicationEnv);
+ }
+ catch (Exception e)
+ {
+ logger.traceException(e);
+ logger.error(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
+ }
+ }
+ return cnIndexDB;
+ }
}
/** {@inheritDoc} */
@Override
public ReplicationDomainDB getReplicationDomainDB()
{
- throw new RuntimeException("Not implemented");
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ final Set<DN> excludedDomainDns = Collections.emptySet();
+ return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
+ final Set<DN> excludedDomainDns)
+ throws ChangelogException
+ {
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
+ registeredMultiDomainCursors.add(cursor);
+ for (DN baseDN : domainToReplicaDBs.keySet())
+ {
+ if (!excludedDomainDns.contains(baseDN))
+ {
+ cursor.addDomain(baseDN, startState.getServerState(baseDN));
+ }
+ }
+ return cursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy);
+ for (int serverId : getDomainMap(baseDN).keySet())
+ {
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
+ cursor.addReplicaDB(serverId, lastCSN);
+ }
+ return cursor;
+ }
+
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy)
+ {
+ synchronized (registeredDomainCursors)
+ {
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
+ List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+ if (cursors == null)
+ {
+ cursors = new ArrayList<DomainDBCursor>();
+ registeredDomainCursors.put(baseDN, cursors);
+ }
+ cursors.add(cursor);
+ return cursor;
+ }
+ }
+
+ private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+ {
+ final MultiDomainServerState offlineReplicas =
+ replicationEnv.getChangelogState().getOfflineReplicas();
+ final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+ if (offlineCSN != null
+ && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+ {
+ return offlineCSN;
+ }
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ if (replicaDB != null)
+ {
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
+ final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+ final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+ synchronized (replicaCursors)
+ {
+ List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+ if (cursors == null)
+ {
+ cursors = new ArrayList<ReplicaCursor>();
+ replicaCursors.put(replicaID, cursors);
+ }
+ cursors.add(replicaCursor);
+ }
+
+ return replicaCursor;
+ }
+ return EMPTY_CURSOR_REPLICA_DB;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void unregisterCursor(final DBCursor<?> cursor)
+ {
+ if (cursor instanceof MultiDomainDBCursor)
+ {
+ registeredMultiDomainCursors.remove(cursor);
+ }
+ else if (cursor instanceof DomainDBCursor)
+ {
+ final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+ synchronized (registeredMultiDomainCursors)
+ {
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ else if (cursor instanceof ReplicaCursor)
+ {
+ final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+ synchronized (replicaCursors)
+ {
+ final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+ {
+ final CSN csn = updateMsg.getCSN();
+ final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+ csn.getServerId(), replicationServer);
+ final FileReplicaDB replicaDB = pair.getFirst();
+ replicaDB.add(updateMsg);
+
+ ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
+
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ notifyReplicaOnline(indexer, baseDN, csn.getServerId());
+ indexer.publishUpdateMsg(baseDN, updateMsg);
+ }
+ return pair.getSecond(); // replica DB was created
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
+ {
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
+ indexer.publishHeartbeat(baseDN, heartbeatCSN);
+ }
+ }
+
+ private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+ throws ChangelogException
+ {
+ if (indexer.isReplicaOffline(baseDN, serverId))
+ {
+ replicationEnv.notifyReplicaOnline(baseDN, serverId);
+ }
+ updateCursorsWithOfflineCSN(baseDN, serverId, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
+ {
+ replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.replicaOffline(baseDN, offlineCSN);
+ }
+ updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
+ }
+
+ private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
+ {
+ synchronized (replicaCursors)
+ {
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+ if (cursors != null)
+ {
+ for (ReplicaCursor cursor : cursors)
+ {
+ cursor.setOfflineCSN(offlineCSN);
+ }
+ }
+ }
}
/**
- * Clear the database.
+ * The thread purging the changelogDB on a regular interval. Records are
+ * purged from the changelogDB if they are older than a delay specified in
+ * seconds. The purge process works in two steps:
+ * <ol>
+ * <li>first purge the changeNumberIndexDB and retrieve information to drive
+ * replicaDBs purging</li>
+ * <li>proceed to purge each replicaDBs based on the information collected
+ * when purging the changeNumberIndexDB</li>
+ * </ol>
*/
- public void clearDB()
+ private final class ChangelogDBPurger extends DirectoryThread
{
- throw new RuntimeException("Not implemented");
- }
+ private static final int DEFAULT_SLEEP = 500;
+ protected ChangelogDBPurger()
+ {
+ super("changelog DB purger");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run()
+ {
+ // initialize CNIndexDB
+ getChangeNumberIndexDB();
+ while (!isShutdownInitiated())
+ {
+ try
+ {
+ final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+ final CSN oldestNotPurgedCSN;
+
+ // next code assumes that the compute-change-number config
+ // never changes during the life time of an RS
+ if (!config.isComputeChangeNumber())
+ {
+ oldestNotPurgedCSN = purgeCSN;
+ }
+ else
+ {
+ final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been initiated
+ return;
+ }
+
+ oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+ if (oldestNotPurgedCSN == null)
+ { // shutdown may have been initiated...
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
+
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ jeFriendlySleep(DEFAULT_SLEEP);
+ continue;
+ }
+ }
+
+ for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
+ {
+ for (final FileReplicaDB replicaDB : domainMap.values())
+ {
+ replicaDB.purgeUpTo(oldestNotPurgedCSN);
+ }
+ }
+
+ jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown initiated?
+ }
+ catch (Exception e)
+ {
+ logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ }
+ }
+ }
+
+ /**
+ * This method implements a sleep() that is friendly to Berkeley JE.
+ * <p>
+ * Originally, {@link Thread#sleep(long)} was used , but waking up a
+ * sleeping threads required calling {@link Thread#interrupt()}, and JE
+ * threw exceptions when invoked on interrupted threads.
+ * <p>
+ * The solution is to replace:
+ * <ol>
+ * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
+ * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
+ * </ol>
+ */
+ private void jeFriendlySleep(long millis) throws InterruptedException
+ {
+ if (!isShutdownInitiated())
+ {
+ synchronized (this)
+ {
+ if (!isShutdownInitiated())
+ {
+ wait(millis);
+ }
+ }
+ }
+ }
+
+ private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+ {
+ final long nextPurgeTime = notPurgedCSN.getTime();
+ final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+ if (currentPurgeTime <= nextPurgeTime)
+ {
+ // sleep till the next CSN to purge,
+ return nextPurgeTime - currentPurgeTime;
+ }
+ // wait a bit before purging more
+ return DEFAULT_SLEEP;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initiateShutdown()
+ {
+ super.initiateShutdown();
+ synchronized (this)
+ {
+ notify(); // wake up the purger thread for faster shutdown
+ }
+ }
+ }
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
new file mode 100644
index 0000000..69cd5ea
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -0,0 +1,409 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.ReplicationServerDomain;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.InitializationException;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Represents a replication server database for one server in the topology.
+ * <p>
+ * It is responsible for efficiently saving the updates that is received from
+ * each master server into stable storage.
+ * <p>
+ * It is also able to generate a {@link DBCursor} that can be used to
+ * read all changes from a given {@link CSN}.
+ * <p>
+ * It publishes some monitoring information below cn=monitor.
+ */
+class FileReplicaDB
+{
+
+ /** The parser of records stored in Replica DB. */
+ static final RecordParser<CSN, UpdateMsg> RECORD_PARSER = new ReplicaDBParser();
+
+ /**
+ * Class that allows atomically setting oldest and newest CSNs without
+ * synchronization.
+ *
+ * @Immutable
+ */
+ private static final class CSNLimits
+ {
+ private final CSN oldestCSN;
+ private final CSN newestCSN;
+
+ public CSNLimits(CSN oldestCSN, CSN newestCSN)
+ {
+ this.oldestCSN = oldestCSN;
+ this.newestCSN = newestCSN;
+ }
+ }
+
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ /** The log in which records are persisted. */
+ private final Log<CSN, UpdateMsg> log;
+
+ /**
+ * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
+ *
+ * @NonNull
+ */
+ private volatile CSNLimits csnLimits;
+ private final int serverId;
+ private final DN baseDN;
+ private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+ private final ReplicationServer replicationServer;
+ private final ReplicationEnvironment replicationEnv;
+
+ /**
+ * Creates a new ReplicaDB associated to a given LDAP server.
+ *
+ * @param serverId
+ * Id of this server.
+ * @param baseDN
+ * the replication domain baseDN.
+ * @param replicationServer
+ * The ReplicationServer that creates this ReplicaDB.
+ * @param replicationEnv
+ * the Database Env to use to create the ReplicationServer DB. server
+ * for this domain.
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ FileReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
+ final ReplicationEnvironment replicationEnv) throws ChangelogException
+ {
+ this.serverId = serverId;
+ this.baseDN = baseDN;
+ this.replicationServer = replicationServer;
+ this.replicationEnv = replicationEnv;
+ this.log = createLog(replicationEnv);
+ this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
+
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ DirectoryServer.registerMonitorProvider(dbMonitor);
+ }
+
+ private CSN readOldestCSN() throws ChangelogException
+ {
+ final Record<CSN, UpdateMsg> record = log.getOldestRecord();
+ return record == null ? null : record.getKey();
+ }
+
+ private CSN readNewestCSN() throws ChangelogException
+ {
+ final Record<CSN, UpdateMsg> record = log.getNewestRecord();
+ return record == null ? null : record.getKey();
+ }
+
+ private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
+ {
+ final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
+ return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
+ }
+
+ /**
+ * Adds a new message.
+ *
+ * @param updateMsg
+ * The update message to add.
+ * @throws ChangelogException
+ * If an error occurs when trying to add the message.
+ */
+ void add(final UpdateMsg updateMsg) throws ChangelogException
+ {
+ if (shutdown.get())
+ {
+ throw new ChangelogException(
+ ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
+ .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
+ }
+
+ log.append(Record.from(updateMsg.getCSN(), updateMsg));
+
+ final CSNLimits limits = csnLimits;
+ final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+ final boolean updateOld = limits.oldestCSN == null;
+ if (updateOld || updateNew)
+ {
+ csnLimits = new CSNLimits(
+ updateOld ? updateMsg.getCSN() : limits.oldestCSN,
+ updateNew ? updateMsg.getCSN() : limits.newestCSN);
+ }
+ }
+
+ /**
+ * Get the oldest CSN that has not been purged yet.
+ *
+ * @return the oldest CSN that has not been purged yet.
+ */
+ CSN getOldestCSN()
+ {
+ return csnLimits.oldestCSN;
+ }
+
+ /**
+ * Get the newest CSN that has not been purged yet.
+ *
+ * @return the newest CSN that has not been purged yet.
+ */
+ CSN getNewestCSN()
+ {
+ return csnLimits.newestCSN;
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the update messages from this DB.
+ * The actual starting position is defined by the provided CSN, the key
+ * matching strategy and the positioning strategy.
+ *
+ * @param startCSN
+ * The position where the cursor must start. If null, start from the
+ * oldest CSN
+ * @param matchingStrategy
+ * Cursor key matching strategy
+ * @param positionStrategy
+ * Cursor position strategy
+ * @return a new {@link DBCursor} to retrieve update messages.
+ * @throws ChangelogException
+ * if a database problem happened
+ */
+ DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy);
+ return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
+ }
+
+ /**
+ * Shutdown this ReplicaDB.
+ */
+ void shutdown()
+ {
+ if (shutdown.compareAndSet(false, true))
+ {
+ log.close();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ }
+ }
+
+ /**
+ * Synchronously purge changes older than purgeCSN from this replicaDB.
+ *
+ * @param purgeCSN
+ * The CSN up to which changes can be purged. No purging happens when
+ * it is {@code null}.
+ * @throws ChangelogException
+ * In case of database problem.
+ */
+ void purgeUpTo(final CSN purgeCSN) throws ChangelogException
+ {
+ if (purgeCSN == null)
+ {
+ return;
+ }
+ final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
+ if (oldestRecord != null)
+ {
+ csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
+ }
+ }
+
+ /**
+ * Implements monitoring capabilities of the ReplicaDB.
+ */
+ private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public List<Attribute> getMonitorData()
+ {
+ final List<Attribute> attributes = new ArrayList<Attribute>();
+ create(attributes, "replicationServer-database",String.valueOf(serverId));
+ create(attributes, "domain-name", baseDN.toNormalizedString());
+ final CSNLimits limits = csnLimits;
+ if (limits.oldestCSN != null)
+ {
+ create(attributes, "first-change", encode(limits.oldestCSN));
+ }
+ if (limits.newestCSN != null)
+ {
+ create(attributes, "last-change", encode(limits.newestCSN));
+ }
+ return attributes;
+ }
+
+ private void create(final List<Attribute> attributes, final String name, final String value)
+ {
+ attributes.add(Attributes.create(name, value));
+ }
+
+ private String encode(final CSN csn)
+ {
+ return csn + " " + new Date(csn.getTime());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getMonitorInstanceName()
+ {
+ ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
+ return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException, InitializationException
+ {
+ // Nothing to do for now
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ final CSNLimits limits = csnLimits;
+ return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
+ + limits.oldestCSN + " " + limits.newestCSN;
+ }
+
+ /**
+ * Clear the changes from this DB, from both memory cache and persistent
+ * storage.
+ *
+ * @throws ChangelogException
+ * If an error occurs while removing the changes from the DB.
+ */
+ void clear() throws ChangelogException
+ {
+ // Remove all persisted data and reset generationId to default value
+ log.clear();
+ replicationEnv.resetGenerationId(baseDN);
+
+ csnLimits = new CSNLimits(null, null);
+ }
+
+ /**
+ * Return the number of records of this replicaDB.
+ * <p>
+ * For test purpose.
+ *
+ * @return The number of records of this replicaDB.
+ * @throws ChangelogException
+ * If an error occurs
+ */
+ long getNumberRecords() throws ChangelogException
+ {
+ return log.getNumberOfRecords();
+ }
+
+ /**
+ * Dump this DB as text files, intended for debugging purpose only.
+ *
+ * @throws ChangelogException
+ * If an error occurs during dump
+ */
+ void dumpAsTextFiles() throws ChangelogException {
+ log.dumpAsTextFile(log.getPath());
+ }
+
+ /** Parser of records persisted in the ReplicaDB log. */
+ private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
+ {
+
+ @Override
+ public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
+ {
+ final UpdateMsg message = record.getValue();
+ return ByteString.wrap(message.getBytes());
+ }
+
+ @Override
+ public Record<CSN, UpdateMsg> decodeRecord(final ByteString data) throws DecodingException
+ {
+ try
+ {
+ final UpdateMsg msg =
+ (UpdateMsg) UpdateMsg.generateMsg(data.toByteArray(), ProtocolVersion.REPLICATION_PROTOCOL_V7);
+ return Record.from(msg.getCSN(), msg);
+ }
+ catch (Exception e)
+ {
+ throw new DecodingException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN decodeKeyFromString(String key) throws ChangelogException
+ {
+ return new CSN(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String encodeKeyToString(CSN key)
+ {
+ return key.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN getMaxKey()
+ {
+ return CSN.MAX_CSN_VALUE;
+ }
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
new file mode 100644
index 0000000..d48b18e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -0,0 +1,144 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
+
+/**
+ * A cursor on ReplicaDB, which can re-initialize itself after exhaustion.
+ * <p>
+ * The cursor provides a java.sql.ResultSet like API :
+ * <pre>
+ * FileReplicaDBCursor cursor = ...;
+ * try {
+ * while (cursor.next()) {
+ * Record record = cursor.getRecord();
+ * // ... can call cursor.getRecord() again: it will return the same result
+ * }
+ * }
+ * finally {
+ * close(cursor);
+ * }
+ * }
+ * </pre>
+ * <p>
+ * The cursor automatically re-initializes itself if it is exhausted: if a
+ * record is newly available, a subsequent call to the {@code next()} method will
+ * return {@code true} and the record will be available by calling {@code getRecord()}
+ * method.
+ *
+ * \@NotThreadSafe
+ */
+class FileReplicaDBCursor implements DBCursor<UpdateMsg>
+{
+ /** The underlying cursor. */
+ private final RepositionableCursor<CSN, UpdateMsg> cursor;
+
+ /** The next record to return. */
+ private Record<CSN, UpdateMsg> nextRecord;
+
+ /** The CSN to re-start with in case the cursor is exhausted. */
+ private CSN lastNonNullCurrentCSN;
+
+ private PositionStrategy positionStrategy;
+
+ /**
+ * Creates the cursor from provided log cursor and start CSN.
+ *
+ * @param cursor
+ * The underlying log cursor to read log.
+ * @param startCSN
+ * The CSN to use as a start point (excluded from cursor, the lowest
+ * CSN higher than this CSN is used as the real start point).
+ * @param positionStrategy
+ * Cursor position strategy, which allow to choose if cursor must
+ * start from the provided CSN or just after the provided CSN.
+ */
+ FileReplicaDBCursor(
+ final RepositionableCursor<CSN, UpdateMsg> cursor,
+ final CSN startCSN,
+ final PositionStrategy positionStrategy) {
+ this.cursor = cursor;
+ this.lastNonNullCurrentCSN = startCSN;
+ this.positionStrategy = positionStrategy;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return nextRecord == null ? null : nextRecord.getValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ if (cursor.next())
+ {
+ nextRecord = cursor.getRecord();
+ final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN);
+ if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY))
+ {
+ // start CSN is found, switch to position strategy that always find the next
+ lastNonNullCurrentCSN = nextRecord.getKey();
+ positionStrategy = AFTER_MATCHING_KEY;
+ return true;
+ }
+ }
+ // either cursor is exhausted or we still have not reached the start CSN
+ return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
+ }
+
+ /** Re-initialize the cursor after the last non null CSN. */
+ private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
+ {
+ final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+ if (found && cursor.next())
+ {
+ nextRecord = cursor.getRecord();
+ lastNonNullCurrentCSN = nextRecord.getKey();
+ return true;
+ }
+ nextRecord = null;
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
new file mode 100644
index 0000000..3ffc90f
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -0,0 +1,1187 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.Reject;
+import org.forgerock.util.Utils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A multi-file log that features appending key-value records and reading them
+ * using a {@code DBCursor}.
+ * The records must be appended to the log in ascending order of the keys.
+ * <p>
+ * A log is defined for a path - the log path - and contains one to many log files:
+ * <ul>
+ * <li>it has always at least one log file, the head log file, named "head.log",
+ * which is used to append the records.</li>
+ * <li>it may have from zero to many read-only log files, which are named after
+ * the pattern '[lowkey]_[highkey}.log' where [lowkey] and [highkey] are respectively
+ * the string representation of lowest and highest key present in the log file.</li>
+ * </ul>
+ * A read-only log file is created each time the head log file has reached the
+ * maximum size limit. The head log file is then rotated to the read-only file and a
+ * new empty head log file is opened. There is no limit on the number of read-only
+ * files, but they can be purged.
+ * <p>
+ * A log is obtained using the {@code Log.openLog()} method and must always be
+ * released using the {@code close()} method.
+ * <p>
+ * Usage example:
+ * <pre>
+ * {@code
+ * Log<K, V> log = null;
+ * try
+ * {
+ * log = Log.openLog(logPath, parser, maxFileSize);
+ * log.append(key, value);
+ * DBCursor<K, V> cursor = log.getCursor(someKey);
+ * // use cursor, then close cursor
+ * }
+ * finally
+ * {
+ * log.close();
+ * }
+ * }
+ * </pre>
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+final class Log<K extends Comparable<K>, V> implements Closeable
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private static final String LOG_FILE_SUFFIX = ".log";
+
+ static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
+
+ private static final String LOG_FILE_NAME_SEPARATOR = "_";
+
+ private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) &&
+ !file.getName().equals(HEAD_LOG_FILE_NAME);
+ }
+ };
+
+ /** Map that holds the unique log instance for each log path. */
+ private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>();
+
+ /**
+ * The number of references on this log instance. It is incremented each time
+ * a log is opened on the same log path. The log is effectively closed only
+ * when the {@code close()} method is called and this value is at most 1.
+ */
+ private int referenceCount;
+
+ /** The path of directory for this log, where log files are stored. */
+ private final File logPath;
+
+ /** The parser used for encoding/decoding of records. */
+ private final RecordParser<K, V> recordParser;
+
+ /**
+ * Indicates if this log is closed. When the log is closed, all methods return
+ * immediately with no effect.
+ */
+ private boolean isClosed;
+
+ /**
+ * The log files contained in this log, ordered by key.
+ * <p>
+ * The head log file is always present and is associated with the maximum
+ * possible key, given by the record parser.
+ * <p>
+ * The read-only log files are associated with the highest key they contain.
+ */
+ private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
+
+ /**
+ * The last key appended to the log. In order to keep the ordering of the keys
+ * in the log, any attempt to append a record with a key lower or equal to
+ * this key is rejected (no error but an event is logged).
+ */
+ private K lastAppendedKey;
+
+ /**
+ * The list of non-empty cursors opened on this log. Opened cursors may have
+ * to be updated when rotating the head log file.
+ */
+ private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+
+ /**
+ * A log file is rotated once it has exceeded this size limit. The log file can have
+ * a size much larger than this limit if the last record written has a huge size.
+ *
+ * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+ * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
+ */
+ private final long sizeLimitPerLogFileInBytes;
+
+ /**
+ * The exclusive lock used for writes and lifecycle operations on this log:
+ * initialize, clear, sync and close.
+ */
+ private final Lock exclusiveLock;
+
+ /**
+ * The shared lock used for reads and cursor operations on this log.
+ */
+ private final Lock sharedLock;
+
+ /**
+ * Open a log with the provided log path, record parser and maximum size per
+ * log file.
+ * <p>
+ * If no log exists for the provided path, a new one is created.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param logPath
+ * Path of the log.
+ * @param parser
+ * Parser for encoding/decoding of records.
+ * @param sizeLimitPerFileInBytes
+ * Limit in bytes before rotating the head log file of the log.
+ * @return a log
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
+ final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+ {
+ Reject.ifNull(logPath, parser);
+ @SuppressWarnings("unchecked")
+ Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
+ if (log == null)
+ {
+ log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+ logsCache.put(logPath, log);
+ }
+ else
+ {
+ // TODO : check that parser and size limit are compatible with the existing one,
+ // and issue a warning if it is not the case
+ log.referenceCount++;
+ }
+ return log;
+ }
+
+ /**
+ * Release a reference to the log corresponding to provided path. The log is
+ * closed if this is the last reference.
+ */
+ private static synchronized void releaseLog(final File logPath)
+ {
+ Log<?, ?> log = logsCache.get(logPath);
+ if (log == null)
+ {
+ // this should never happen
+ logger.error(ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
+ return;
+ }
+ if (log.referenceCount > 1)
+ {
+ log.referenceCount--;
+ }
+ else
+ {
+ log.doClose();
+ logsCache.remove(logPath);
+ }
+ }
+
+ /**
+ * Creates a new log.
+ *
+ * @param logPath
+ * The directory path of the log.
+ * @param parser
+ * Parser of records.
+ * @param sizeLimitPerFile
+ * Limit in bytes before rotating a log file.
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
+ throws ChangelogException
+ {
+ this.logPath = logPath;
+ this.recordParser = parser;
+ this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+ this.referenceCount = 1;
+
+ final ReadWriteLock lock = new ReentrantReadWriteLock(false);
+ this.exclusiveLock = lock.writeLock();
+ this.sharedLock = lock.readLock();
+
+ createOrOpenLogFiles();
+ }
+
+ /** Create or open log files used by this log. */
+ private void createOrOpenLogFiles() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ createRootDirIfNotExists();
+ openHeadLogFile();
+ for (final File file : getReadOnlyLogFiles())
+ {
+ openReadOnlyLogFile(file);
+ }
+ isClosed = false;
+ }
+ catch (ChangelogException e)
+ {
+ // ensure all log files opened at this point are closed
+ close();
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ private File[] getReadOnlyLogFiles() throws ChangelogException
+ {
+ File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
+ if (files == null)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
+ }
+ return files;
+ }
+
+ private void createRootDirIfNotExists() throws ChangelogException
+ {
+ if (!logPath.exists())
+ {
+ if (!logPath.mkdirs())
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
+ }
+ }
+ }
+
+ /**
+ * Returns the path of this log.
+ *
+ * @return the path of this log directory
+ */
+ public File getPath()
+ {
+ return logPath;
+ }
+
+ /**
+ * Add the provided record at the end of this log.
+ * <p>
+ * The record must have a key strictly higher than the key
+ * of the last record added. If it is not the case, the record is not
+ * appended and the method returns immediately.
+ * <p>
+ * In order to ensure that record is written out of buffers and persisted
+ * to file system, it is necessary to explicitely call the
+ * {@code syncToFileSystem()} method.
+ *
+ * @param record
+ * The record to add.
+ * @throws ChangelogException
+ * If an error occurs while adding the record to the log.
+ */
+ public void append(final Record<K, V> record) throws ChangelogException
+ {
+ // If this exclusive lock happens to be a bottleneck :
+ // 1. use a shared lock for appending the record first
+ // 2. switch to an exclusive lock only if rotation is needed
+ // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ if (recordIsBreakingKeyOrdering(record))
+ {
+ logger.info(LocalizableMessage.raw(
+ "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
+ lastAppendedKey != null ? lastAppendedKey : "null"));
+ return;
+ }
+ LogFile<K, V> headLogFile = getHeadLogFile();
+ if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+ {
+ logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+
+ rotateHeadLogFile();
+ headLogFile = getHeadLogFile();
+ }
+ headLogFile.append(record);
+ lastAppendedKey = record.getKey();
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Indicates if the provided record has a key that would break the key
+ * ordering in the log.
+ */
+ private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
+ {
+ return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
+ }
+
+ /**
+ * Synchronize all records added with the file system, ensuring that records
+ * are effectively persisted.
+ * <p>
+ * After a successful call to this method, it is guaranteed that all records
+ * added to the log are persisted to the file system.
+ *
+ * @throws ChangelogException
+ * If the synchronization fails.
+ */
+ public void syncToFileSystem() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ getHeadLogFile().syncToFileSystem();
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the first position.
+ *
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public RepositionableCursor<K, V> getCursor() throws ChangelogException
+ {
+ LogCursor<K, V> cursor = null;
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return new EmptyLogCursor<K, V>();
+ }
+ cursor = new LogCursor<K, V>(this);
+ cursor.positionTo(null, null, null);
+ registerCursor(cursor);
+ return cursor;
+ }
+ catch (ChangelogException e)
+ {
+ StaticUtils.close(cursor);
+ throw e;
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the position defined by the provided key.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, cursor will point at the first record of the log.
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
+ {
+ return getCursor(key, EQUAL_TO_KEY, ON_MATCHING_KEY);
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log. The
+ * starting position is defined by the provided key, cursor matching strategy
+ * and cursor positioning strategy.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, cursor will point at the first record of the log.
+ * @param matchingStrategy
+ * Cursor key matching strategy.
+ * @param positionStrategy
+ * The cursor positioning strategy.
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ if (key == null)
+ {
+ return getCursor();
+ }
+ LogCursor<K, V> cursor = null;
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return new EmptyLogCursor<K, V>();
+ }
+ cursor = new LogCursor<K, V>(this);
+ final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
+ // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
+ if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
+ {
+ registerCursor(cursor);
+ return cursor;
+ }
+ else
+ {
+ StaticUtils.close(cursor);
+ return new EmptyLogCursor<K, V>();
+ }
+ }
+ catch (ChangelogException e)
+ {
+ StaticUtils.close(cursor);
+ throw e;
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+
+ /**
+ * Returns the oldest (first) record from this log.
+ *
+ * @return the oldest record, which may be {@code null} if there is no record
+ * in the log.
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ public Record<K, V> getOldestRecord() throws ChangelogException
+ {
+ return getOldestLogFile().getOldestRecord();
+ }
+
+
+ /**
+ * Returns the newest (last) record from this log.
+ *
+ * @return the newest record, which may be {@code null}
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ public Record<K, V> getNewestRecord() throws ChangelogException
+ {
+ return getHeadLogFile().getNewestRecord();
+ }
+
+ /**
+ * Returns the number of records in the log.
+ *
+ * @return the number of records
+ * @throws ChangelogException
+ * If a problem occurs.
+ */
+ public long getNumberOfRecords() throws ChangelogException
+ {
+ long count = 0;
+ for (final LogFile<K, V> logFile : logFiles.values())
+ {
+ count += logFile.getNumberOfRecords();
+ }
+ return count;
+ }
+
+ /**
+ * Purge the log up to and excluding the provided key.
+ *
+ * @param purgeKey
+ * the key up to which purging must happen
+ * @return the oldest non purged record, or {@code null}
+ * if no record was purged
+ * @throws ChangelogException
+ * if a database problem occurs.
+ */
+ public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return null;
+ }
+ final SortedMap<K, LogFile<K, V>> logFilesToPurge = logFiles.headMap(purgeKey);
+ if (logFilesToPurge.isEmpty())
+ {
+ return null;
+ }
+ final List<String> undeletableFiles = new ArrayList<String>();
+ final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+ while (entriesToPurge.hasNext())
+ {
+ final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+ try
+ {
+ logFile.close();
+ logFile.delete();
+ entriesToPurge.remove();
+ }
+ catch (ChangelogException e)
+ {
+ // The deletion of log file on file system has failed
+ undeletableFiles.add(logFile.getFile().getPath());
+ }
+ }
+ if (!undeletableFiles.isEmpty())
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
+ Utils.joinAsString(", ", undeletableFiles)));
+ }
+ return getOldestRecord();
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+
+ }
+
+ /**
+ * Empties the log, discarding all records it contains.
+ *
+ * @throws ChangelogException
+ * If cursors are opened on this log, or if a problem occurs during
+ * clearing operation.
+ */
+ public void clear() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ if (!openCursors.isEmpty())
+ {
+ // Allow opened cursors at this point, but turn them into empty cursors.
+ // This behavior is needed by the change number indexer thread.
+ switchCursorsOpenedIntoEmptyCursors();
+ }
+
+ // delete all log files
+ final List<String> undeletableFiles = new ArrayList<String>();
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ try
+ {
+ logFile.close();
+ logFile.delete();
+ }
+ catch (ChangelogException e)
+ {
+ undeletableFiles.add(logFile.getFile().getPath());
+ }
+ }
+ if (!undeletableFiles.isEmpty())
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
+ Utils.joinAsString(", ", undeletableFiles)));
+ }
+ logFiles.clear();
+
+ // recreate an empty head log file
+ openHeadLogFile();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Dump this log as a text files, intended for debugging purpose only.
+ *
+ * @param dumpDirectory
+ * Directory that will contains log files with text format
+ * and ".txt" extensions
+ * @throws ChangelogException
+ * If an error occurs during dump
+ */
+ void dumpAsTextFile(File dumpDirectory) throws ChangelogException
+ {
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ releaseLog(logPath);
+ }
+
+ /** Effectively close this log. */
+ private void doClose()
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ if (!openCursors.isEmpty())
+ {
+ logger.error(ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
+ }
+ StaticUtils.close(logFiles.values());
+ isClosed = true;
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ private LogFile<K, V> getHeadLogFile()
+ {
+ return logFiles.lastEntry().getValue();
+ }
+
+ private LogFile<K, V> getOldestLogFile()
+ {
+ return logFiles.firstEntry().getValue();
+ }
+
+ /**
+ * Rotate the head log file to a read-only log file, and open a new empty head
+ * log file to write in.
+ * <p>
+ * All cursors opened on this log are temporarily disabled (closing underlying resources)
+ * and then re-open with their previous state.
+ */
+ private void rotateHeadLogFile() throws ChangelogException
+ {
+ // Temporarily disable cursors opened on head, saving their state
+ final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+
+ final LogFile<K, V> headLogFile = getHeadLogFile();
+ final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
+ headLogFile.close();
+ renameHeadLogFileTo(readOnlyLogFile);
+
+ openHeadLogFile();
+ openReadOnlyLogFile(readOnlyLogFile);
+
+ // Re-enable cursors previously opened on head, with the saved state
+ updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
+ }
+
+ private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
+ {
+ final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
+ try
+ {
+ StaticUtils.renameFile(headLogFile, rotatedLogFile);
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
+ }
+ }
+
+ /**
+ * Returns the key bounds for the provided log file.
+ *
+ * @return the pair of (lowest key, highest key) that correspond to records
+ * stored in the corresponding log file.
+ * @throws ChangelogException
+ * if an error occurs while retrieving the keys
+ */
+ private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException
+ {
+ try
+ {
+ final String name = logFile.getFile().getName();
+ final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
+ .split(LOG_FILE_NAME_SEPARATOR);
+ return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
+ }
+ }
+
+ /**
+ * Returns the file name to use for the read-only version of the provided
+ * log file.
+ * <p>
+ * The file name is based on the lowest and highest key in the log file.
+ *
+ * @return the name to use for the read-only version of the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException
+ {
+ final K lowestKey = logFile.getOldestRecord().getKey();
+ final K highestKey = logFile.getNewestRecord().getKey();
+ return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
+ + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
+ }
+
+ /** Update the cursors that were pointing to head after a rotation of the head log file. */
+ private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+ throws ChangelogException
+ {
+ for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
+ {
+ final CursorState<K, V> cursorState = pair.getSecond();
+
+ // Need to update the cursor only if it is pointing to the head log file
+ if (isHeadLogFile(cursorState.logFile))
+ {
+ final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
+ final LogFile<K, V> logFile = findLogFileFor(previousKey);
+ final LogCursor<K, V> cursor = pair.getFirst();
+ cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
+ }
+ }
+ }
+
+ private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
+ {
+ for (LogCursor<K, V> cursor : openCursors)
+ {
+ cursor.actAsEmptyCursor();
+ }
+ openCursors.clear();
+ }
+
+ /**
+ * Disable the cursors opened on the head log file log, by closing their underlying cursor.
+ * Returns the state of each cursor just before the close operation.
+ *
+ * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+ {
+ final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
+ new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
+ for (LogCursor<K, V> cursor : openCursors)
+ {
+ if (isHeadLogFile(cursor.currentLogFile))
+ {
+ openCursorsStates.add(Pair.of(cursor, cursor.getState()));
+ cursor.closeUnderlyingCursor();
+ }
+ }
+ return openCursorsStates;
+ }
+
+ private void openHeadLogFile() throws ChangelogException
+ {
+ final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser);
+ final Record<K,V> newestRecord = head.getNewestRecord();
+ lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
+ logFiles.put(recordParser.getMaxKey(), head);
+ }
+
+ private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
+ {
+ final LogFile<K, V> logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
+ final Pair<K, K> bounds = getKeyBounds(logFile);
+ logFiles.put(bounds.getSecond(), logFile);
+ }
+
+ private void registerCursor(final LogCursor<K, V> cursor)
+ {
+ openCursors.add(cursor);
+ }
+
+ private void unregisterCursor(final LogCursor<K, V> cursor)
+ {
+ openCursors.remove(cursor);
+ }
+
+ /**
+ * Returns the log file that is just after the provided log file wrt the order
+ * defined on keys, or {@code null} if the provided log file is the last one
+ * (the head log file).
+ */
+ private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
+ {
+ if (isHeadLogFile(currentLogFile))
+ {
+ return null;
+ }
+ final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+ return logFiles.higherEntry(bounds.getSecond()).getValue();
+ }
+
+ private boolean isHeadLogFile(final LogFile<K, V> logFile)
+ {
+ return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
+ }
+
+ /** Returns the log file that should contain the provided key. */
+ private LogFile<K, V> findLogFileFor(final K key)
+ {
+ if (key == null || logFiles.lowerKey(key) == null)
+ {
+ return getOldestLogFile();
+ }
+ return logFiles.ceilingEntry(key).getValue();
+ }
+
+ /**
+ * Represents a DB Cursor than can be repositioned on a given key.
+ * <p>
+ * Note that as a DBCursor, it provides a java.sql.ResultSet like API.
+ */
+ static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
+ {
+ /**
+ * Position the cursor to the record corresponding to the provided key and
+ * provided matching and positioning strategies.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, use the key of the first record instead.
+ * @param matchStrategy
+ * The cursor key matching strategy.
+ * @param positionStrategy
+ * The cursor positioning strategy.
+ * @return {@code true} if cursor is successfully positioned, or
+ * {@code false} otherwise.
+ * @throws ChangelogException
+ * If an error occurs when positioning cursor.
+ */
+ boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException;
+ }
+
+ /**
+ * Implements a cursor on the log.
+ * <p>
+ * The cursor uses the log shared lock to ensure reads are not done during a rotation.
+ * <p>
+ * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
+ * method.
+ */
+ private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+ {
+ private final Log<K, V> log;
+
+ private LogFile<K, V> currentLogFile;
+ private LogFileCursor<K, V> currentCursor;
+ private boolean actAsEmptyCursor;
+
+ /**
+ * Creates a cursor on the provided log.
+ *
+ * @param log
+ * The log on which the cursor read records.
+ * @throws ChangelogException
+ * If an error occurs when creating the cursor.
+ */
+ private LogCursor(final Log<K, V> log) throws ChangelogException
+ {
+ this.log = log;
+ this.actAsEmptyCursor = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Record<K, V> getRecord()
+ {
+ return currentCursor != null ? currentCursor.getRecord() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ if (actAsEmptyCursor)
+ {
+ return false;
+ }
+ log.sharedLock.lock();
+ try
+ {
+ final boolean hasNext = currentCursor.next();
+ if (hasNext)
+ {
+ return true;
+ }
+ final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+ if (logFile != null)
+ {
+ switchToLogFile(logFile);
+ return currentCursor.next();
+ }
+ return false;
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ log.sharedLock.lock();
+ try
+ {
+ StaticUtils.close(currentCursor);
+ log.unregisterCursor(this);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(
+ final K key,
+ final KeyMatchingStrategy matchStrategy,
+ final PositionStrategy positionStrategy)
+ throws ChangelogException
+ {
+ if (actAsEmptyCursor)
+ {
+ return false;
+ }
+ log.sharedLock.lock();
+ try
+ {
+ final LogFile<K, V> logFile = log.findLogFileFor(key);
+ if (logFile != currentLogFile)
+ {
+ switchToLogFile(logFile);
+ }
+ return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /** Returns the state of this cursor. */
+ private CursorState<K, V> getState() throws ChangelogException
+ {
+ return !actAsEmptyCursor ?
+ new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+ }
+
+ private void closeUnderlyingCursor()
+ {
+ StaticUtils.close(currentCursor);
+ }
+
+ /** Reinitialize this cursor to the provided state. */
+ private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+ {
+ if (!actAsEmptyCursor)
+ {
+ currentLogFile = cursorState.logFile;
+ currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+ }
+ }
+
+ /** Turn this cursor into an empty cursor, with no actual resource used. */
+ private void actAsEmptyCursor()
+ {
+ currentLogFile = null;
+ currentCursor = null;
+ actAsEmptyCursor = true;
+ }
+
+ /** Switch the cursor to the provided log file. */
+ private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException
+ {
+ StaticUtils.close(currentCursor);
+ currentLogFile = logFile;
+ currentCursor = currentLogFile.getCursor();
+ }
+
+ /** {@inheritDoc} */
+ public String toString()
+ {
+ return actAsEmptyCursor ?
+ String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
+ String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+ log.logPath, currentLogFile.getFile().getName(), currentCursor);
+ }
+ }
+
+ /** An empty cursor, that always return null records and false to {@code next()} method. */
+ static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public Record<K,V> getRecord()
+ {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next()
+ {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
+ {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ // nothing to do
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "EmptyLogCursor";
+ }
+ }
+
+ /**
+ * Represents the state of a cursor.
+ * <p>
+ * The state is used to update a cursor when rotating the head log file : the
+ * state of cursor on head log file must be reported to the new read-only log
+ * file that is created when rotating.
+ */
+ private static class CursorState<K extends Comparable<K>, V>
+ {
+ /** The log file. */
+ private final LogFile<K, V> logFile;
+
+ /**
+ * The position of the reader on the log file. It is the offset from the
+ * beginning of the file, in bytes, at which the next read occurs.
+ */
+ private final long filePosition;
+
+ /** The record the cursor is pointing to. */
+ private final Record<K,V> record;
+
+ private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
+ {
+ this.logFile = logFile;
+ this.filePosition = filePosition;
+ this.record = record;
+ }
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
new file mode 100644
index 0000000..9124076
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -0,0 +1,607 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A log file, containing part of a {@code Log}. The log file may be:
+ * <ul>
+ * <li>write-enabled : allowing to append key-value records and read records
+ * from cursors,</li>
+ * <li>read-only : allowing to read records from cursors.</li>
+ * </ul>
+ * <p>
+ * A log file is NOT intended to be used directly, but only has part of a
+ * {@code Log}. In particular, there is no concurrency management and no checks
+ * to ensure that log is not closed when performing any operation on it. Those
+ * are managed at the {@code Log} level.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+final class LogFile<K extends Comparable<K>, V> implements Closeable
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ /** The file containing the records. */
+ private final File logfile;
+
+ /** The pool to obtain a reader on the log. */
+ private final LogReaderPool<K, V> readerPool;
+
+ /**
+ * The writer on the log file, which may be {@code null} if log file is not
+ * write-enabled.
+ */
+ private final BlockLogWriter<K, V> writer;
+
+ /** Indicates if log is enabled for write. */
+ private final boolean isWriteEnabled;
+
+ /**
+ * Creates a new log file.
+ *
+ * @param logFilePath
+ * Path of the log file.
+ * @param parser
+ * Parser of records.
+ * @param isWriteEnabled
+ * {@code true} if this changelog is write-enabled, {@code false}
+ * otherwise.
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
+ throws ChangelogException
+ {
+ Reject.ifNull(logFilePath, parser);
+ this.logfile = logFilePath;
+ this.isWriteEnabled = isWriteEnabled;
+
+ createLogFileIfNotExists();
+ if (isWriteEnabled)
+ {
+ ensureLogFileIsValid(parser);
+ writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser);
+ }
+ else
+ {
+ writer = null;
+ }
+ readerPool = new LogReaderPool<K, V>(logfile, parser);
+ }
+
+ /**
+ * Creates a read-only log file with the provided root path and record parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param logFilePath
+ * Path of the log file.
+ * @param parser
+ * Parser of records.
+ * @return a read-only log file
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
+ final RecordParser<K, V> parser) throws ChangelogException
+ {
+ return new LogFile<K, V>(logFilePath, parser, false);
+ }
+
+ /**
+ * Creates a write-enabled log file that appends records to the end of file,
+ * with the provided root path and record parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param logFilePath
+ * Path of the log file.
+ * @param parser
+ * Parser of records.
+ * @return a write-enabled log file
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
+ final RecordParser<K, V> parser) throws ChangelogException
+ {
+ return new LogFile<K, V>(logFilePath, parser, true);
+ }
+
+ /**
+ * Returns the file containing the records.
+ *
+ * @return the file
+ */
+ File getFile()
+ {
+ return logfile;
+ }
+
+ private void checkLogIsEnabledForWrite() throws ChangelogException
+ {
+ if (!isWriteEnabled)
+ {
+ throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
+ }
+ }
+
+ private void createLogFileIfNotExists() throws ChangelogException
+ {
+ try
+ {
+ if (!logfile.exists())
+ {
+ logfile.createNewFile();
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e);
+ }
+ }
+
+ /**
+ * Ensure that log file is not corrupted, by checking it is valid and cleaning
+ * the end of file if necessary, to remove a partially written record.
+ * <p>
+ * If log file is cleaned to remove a partially written record, then a message
+ * is logged for information.
+ *
+ * @throws ChangelogException
+ * If an error occurs or if log file is corrupted and can't be
+ * cleaned
+ */
+ private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
+ {
+ BlockLogReader<K, V> reader = null;
+ try
+ {
+ final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+ reader = BlockLogReader.newReader(logfile, readerWriter, parser) ;
+ final long lastValidPosition = reader.checkLogIsValid();
+ if (lastValidPosition != -1)
+ {
+ // truncate the file to point where last valid record has been read
+ readerWriter.setLength(lastValidPosition);
+ logger.error(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath()));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+ logfile.getPath(),
+ StaticUtils.stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ /**
+ * Add the provided record at the end of this log.
+ * <p>
+ * In order to ensure that record is written out of buffers and persisted
+ * to file system, it is necessary to explicitely call the
+ * {@code syncToFileSystem()} method.
+ *
+ * @param record
+ * The record to add.
+ * @throws ChangelogException
+ * If the record can't be added to the log.
+ */
+ void append(final Record<K, V> record) throws ChangelogException
+ {
+ checkLogIsEnabledForWrite();
+ writer.write(record);
+ }
+
+ /**
+ * Dump this log file as a text file, intended for debugging purpose only.
+ *
+ * @param dumpFile
+ * File that will contains log records using a human-readable format
+ * @throws ChangelogException
+ * If an error occurs during dump
+ */
+ void dumpAsTextFile(File dumpFile) throws ChangelogException
+ {
+ DBCursor<Record<K, V>> cursor = getCursor();
+ BufferedWriter textWriter = null;
+ try
+ {
+ textWriter = new BufferedWriter(new FileWriter(dumpFile));
+ while (cursor.getRecord() != null)
+ {
+ Record<K, V> record = cursor.getRecord();
+ textWriter.write("key=" + record.getKey());
+ textWriter.write(" | ");
+ textWriter.write("value=" + record.getValue());
+ textWriter.write('\n');
+ cursor.next();
+ }
+ }
+ catch (IOException e)
+ {
+ // No I18N needed, used for debugging purpose only
+ throw new ChangelogException(
+ LocalizableMessage.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile),
+ e);
+ }
+ finally
+ {
+ StaticUtils.close(textWriter);
+ }
+ }
+
+ /**
+ * Synchronize all records added with the file system, ensuring that records
+ * are effectively persisted.
+ * <p>
+ * After a successful call to this method, it is guaranteed that all records
+ * added to the log are persisted to the file system.
+ *
+ * @throws ChangelogException
+ * If the synchronization fails.
+ */
+ void syncToFileSystem() throws ChangelogException
+ {
+ checkLogIsEnabledForWrite();
+ try
+ {
+ writer.sync();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the first position.
+ *
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ LogFileCursor<K, V> getCursor() throws ChangelogException
+ {
+ return new LogFileCursor<K, V>(this);
+ }
+
+ /**
+ * Returns a cursor initialised to the provided record and position in file.
+ *
+ * @param record
+ * The initial record this cursor points on
+ * @param position
+ * The file position this cursor points on
+ * @return the cursor
+ * @throws ChangelogException
+ * If a problem occurs while creating the cursor.
+ */
+ LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
+ {
+ return new LogFileCursor<K, V>(this, record, position);
+ }
+
+ /**
+ * Returns the oldest (first) record from this log.
+ *
+ * @return the oldest record, which may be {@code null} if there is no record
+ * in the log.
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ Record<K, V> getOldestRecord() throws ChangelogException
+ {
+ DBCursor<Record<K, V>> cursor = null;
+ try
+ {
+ cursor = getCursor();
+ return cursor.next() ? cursor.getRecord() : null;
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ /**
+ * Returns the newest (last) record from this log.
+ *
+ * @return the newest record, which may be {@code null}
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ Record<K, V> getNewestRecord() throws ChangelogException
+ {
+ // TODO : need a more efficient way to retrieve it
+ DBCursor<Record<K, V>> cursor = null;
+ try
+ {
+ cursor = getCursor();
+ Record<K, V> record = null;
+ while (cursor.next())
+ {
+ record = cursor.getRecord();
+ }
+ return record;
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ /**
+ * Returns the number of records in the log.
+ *
+ * @return the number of records
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ long getNumberOfRecords() throws ChangelogException
+ {
+ // TODO : need a more efficient way to retrieve it
+ DBCursor<Record<K, V>> cursor = null;
+ try
+ {
+ cursor = getCursor();
+ long counter = 0L;
+ while (cursor.next())
+ {
+ counter++;
+ }
+ return counter;
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close()
+ {
+ if (isWriteEnabled)
+ {
+ try
+ {
+ syncToFileSystem();
+ }
+ catch (ChangelogException e)
+ {
+ logger.traceException(e);
+ }
+ writer.close();
+ }
+ readerPool.shutdown();
+ }
+
+ /**
+ * Delete this log file (file is physically removed). Should be called only
+ * when log file is closed.
+ *
+ * @throws ChangelogException
+ * If log file can't be deleted.
+ */
+ void delete() throws ChangelogException
+ {
+ final boolean isDeleted = logfile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+ }
+ }
+
+ /**
+ * Return the size of this log file in bytes.
+ *
+ * @return the size of log file
+ */
+ long getSizeInBytes()
+ {
+ return writer.getBytesWritten();
+ }
+
+ /** The path of this log file as a String. */
+ private String getPath()
+ {
+ return logfile.getPath();
+ }
+
+ /**
+ * Returns a reader for this log.
+ * <p>
+ * Assumes that calling methods ensure that log is not closed.
+ */
+ private BlockLogReader<K, V> getReader() throws ChangelogException
+ {
+ return readerPool.get();
+ }
+
+ /** Release the provided reader. */
+ private void releaseReader(BlockLogReader<K, V> reader) {
+ readerPool.release(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
+ {
+ return logfile.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object that)
+ {
+ if (this == that)
+ {
+ return true;
+ }
+ if (!(that instanceof LogFile))
+ {
+ return false;
+ }
+ final LogFile<?, ?> other = (LogFile<?, ?>) that;
+ return logfile.equals(other.logfile);
+ }
+
+ /** Implements a repositionable cursor on the log file. */
+ static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+ {
+ /** The underlying log on which entries are read. */
+ private final LogFile<K, V> logFile;
+
+ /** To read the records. */
+ private final BlockLogReader<K, V> reader;
+
+ /** The current available record, may be {@code null}. */
+ private Record<K,V> currentRecord;
+
+ /**
+ * The initial record when starting from a given key. It must be
+ * stored because it is read in advance.
+ */
+ private Record<K,V> initialRecord;
+
+ /**
+ * Creates a cursor on the provided log.
+ *
+ * @param logFile
+ * The log on which the cursor read records.
+ * @throws ChangelogException
+ * If an error occurs when creating the cursor.
+ */
+ private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
+ {
+ this.logFile = logFile;
+ this.reader = logFile.getReader();
+ }
+
+ /**
+ * Creates a cursor on the provided log, initialised to the provided record and
+ * pointing to the provided file position.
+ * <p>
+ * Note: there is no check to ensure that provided record and file position are
+ * consistent. It is the responsability of the caller of this method.
+ */
+ private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
+ {
+ this.logFile = logFile;
+ this.reader = logFile.getReader();
+ this.currentRecord = record;
+ reader.seekToPosition(filePosition);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ if (initialRecord != null)
+ {
+ // initial record is used only once
+ currentRecord = initialRecord;
+ initialRecord = null;
+ return true;
+ }
+ currentRecord = reader.readRecord();
+ return currentRecord != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Record<K,V> getRecord()
+ {
+ return currentRecord;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
+ throws ChangelogException {
+ final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
+ final boolean found = result.getFirst();
+ initialRecord = found ? result.getSecond() : null;
+ return found;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ logFile.releaseReader(reader);
+ }
+
+ /**
+ * Returns the file position this cursor is pointing at.
+ *
+ * @return the position of reader on the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ long getFilePosition() throws ChangelogException
+ {
+ return reader.getFilePosition();
+ }
+
+ /** {@inheritDoc} */
+ public String toString()
+ {
+ return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
+ }
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
new file mode 100644
index 0000000..5300ffd
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
@@ -0,0 +1,118 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A Pool of readers to a log file.
+
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+// TODO : implement a real pool - reusing readers instead of opening-closing them each time
+class LogReaderPool<K extends Comparable<K>, V>
+{
+ /** The file to read. */
+ private final File file;
+
+ private final RecordParser<K, V> parser;
+
+ /**
+ * Creates a pool of readers for provided file.
+ *
+ * @param file
+ * The file to read.
+ * @param parser
+ * The parser to decode the records read.
+ */
+ LogReaderPool(File file, RecordParser<K, V> parser)
+ {
+ this.file = file;
+ this.parser = parser;
+ }
+
+ /**
+ * Returns a random access reader on the provided file.
+ * <p>
+ * The acquired reader must be released with the {@code release()}
+ * method.
+ *
+ * @return a random access reader
+ * @throws ChangelogException
+ * If the file can't be found or read.
+ */
+ BlockLogReader<K, V> get() throws ChangelogException
+ {
+ return getReader(file);
+ }
+
+ /**
+ * Release the provided reader.
+ * <p>
+ * Once released, this reader must not be used any more.
+ *
+ * @param reader
+ * The random access reader to a file previously acquired with this
+ * pool.
+ */
+ void release(BlockLogReader<K, V> reader)
+ {
+ StaticUtils.close(reader);
+ }
+
+ /** Returns a random access file to read this log. */
+ private BlockLogReader<K, V> getReader(File file) throws ChangelogException
+ {
+ try
+ {
+ return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ;
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_READER_ON_LOG_FILE.get(file.getPath()), e);
+ }
+ }
+
+ /**
+ * Shutdown this pool, releasing all files handles opened
+ * on the file.
+ */
+ void shutdown()
+ {
+ // Nothing to do yet as no file handle is kept opened.
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
new file mode 100644
index 0000000..7ff6aa7
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -0,0 +1,151 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.SyncFailedException;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.loggers.MeteredStream;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A writer on a log file.
+ */
+class LogWriter extends OutputStream
+{
+ /** The file to write in. */
+ private final File file;
+
+ /** The stream to write data in the file, capable of counting bytes written. */
+ private final MeteredStream stream;
+
+ /** The file descriptor on the file. */
+ private final FileDescriptor fileDescriptor;
+
+ /**
+ * Creates a writer on the provided file.
+ *
+ * @param file
+ * The file to write.
+ * @throws ChangelogException
+ * If a problem occurs at creation.
+ */
+ public LogWriter(final File file) throws ChangelogException
+ {
+ this.file = file;
+ try
+ {
+ FileOutputStream fos = new FileOutputStream(file, true);
+ this.stream = new MeteredStream(fos, file.length());
+ this.fileDescriptor = fos.getFD();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
+ }
+ }
+
+ /**
+ * Returns the file used by this writer.
+ *
+ * @return the file
+ */
+ File getFile()
+ {
+ return file;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(int b) throws IOException
+ {
+ stream.write(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ stream.write(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ stream.write(b, off, len);
+ }
+
+ /**
+ * Writes the provided byte string to the underlying output stream of this writer.
+ *
+ * @param bs
+ * The byte string to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ public void write(ByteString bs) throws IOException
+ {
+ bs.copyTo(stream);
+ }
+
+ /**
+ * Returns the number of bytes written in the underlying file.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesWritten()
+ {
+ return stream.getBytesWritten();
+ }
+
+ /**
+ * Synchronize all modifications to the file to the underlying device.
+ *
+ * @throws SyncFailedException
+ * If synchronization fails.
+ */
+ void sync() throws SyncFailedException {
+ fileDescriptor.sync();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ StaticUtils.close(stream);
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
new file mode 100644
index 0000000..24b25d2
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
@@ -0,0 +1,130 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+/**
+ * Represents a record, which is a pair of key-value.
+ *
+ * @param <K>
+ * The type of a key.
+ * @param <V>
+ * The type of a value.
+ */
+class Record<K, V>
+{
+ private final K key;
+ private final V value;
+
+ /**
+ * Creates a record from provided key and value.
+ *
+ * @param key
+ * The key.
+ * @param value
+ * The value.
+ */
+ private Record(final K key, final V value)
+ {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Create a record from provided key and value.
+ *
+ * @param <K>
+ * The type of the key.
+ * @param <V>
+ * The type of the value.
+ * @param key
+ * The key.
+ * @param value
+ * The value.
+ * @return a record
+ */
+ static <K, V> Record<K, V> from(final K key, final V value) {
+ return new Record<K, V>(key, value);
+ }
+
+ /**
+ * Returns the key of this record.
+ *
+ * @return the key
+ */
+ K getKey()
+ {
+ return key;
+ }
+
+ /**
+ * Returns the value of this record.
+ *
+ * @return the value
+ */
+ V getValue()
+ {
+ return value;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((key == null) ? 0 : key.hashCode());
+ result = prime * result + ((value == null) ? 0 : value.hashCode());
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object that)
+ {
+ if (this == that)
+ {
+ return true;
+ }
+ if (!(that instanceof Record))
+ {
+ return false;
+ }
+ Record<?, ?> other = (Record<?, ?>) that;
+ final boolean keyEquals = key == null ? other.key == null : key.equals(other.key);
+ if (!keyEquals)
+ {
+ return false;
+ }
+ return value == null ? other.value == null : value.equals(other.value);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return "Record [" + key + ":" + value + "]";
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
new file mode 100644
index 0000000..b62d80d
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
@@ -0,0 +1,101 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * Parser of a log record.
+ * <p>
+ * The parser allows to convert from a object to its binary representation
+ * and to convert back from the binary representation to the object.
+ *
+ * @param <K>
+ * Type of the key of the record.
+ * @param <V>
+ * Type of the value of the record.
+ */
+interface RecordParser<K, V>
+{
+ /**
+ * Decode a record from the provided byte array.
+ * <p>
+ * The record is expected to have been encoded using the {@code writeRecord()}
+ * method.
+ *
+ * @param data
+ * The raw data to read the record from.
+ * @return the decoded record, or {@code null} if there is no more record to
+ * read, or only an incomplete record
+ * @throws DecodingException
+ * If an error occurs while decoding the record.
+ */
+ Record<K, V> decodeRecord(ByteString data) throws DecodingException;
+
+ /**
+ * Encode the provided record to a byte array.
+ * <p>
+ * The returned array is intended to be stored as provided in the log file.
+ *
+ * @param record
+ * The record to encode.
+ * @return the bytes array representing the (key,value) record
+ */
+ ByteString encodeRecord(Record<K, V> record);
+
+ /**
+ * Read the key from the provided string.
+ *
+ * @param key
+ * The string representation of key, suitable for use in a filename,
+ * as written by the {@code encodeKeyToString()} method.
+ * @return the key
+ * @throws ChangelogException
+ * If key can't be read from the string.
+ */
+ K decodeKeyFromString(String key) throws ChangelogException;
+
+ /**
+ * Returns the provided key as a string that is suitable to be used in a
+ * filename.
+ *
+ * @param key
+ * The key of a record.
+ * @return a string encoding the key, unambiguously decodable to the original
+ * key, and suitable for use in a filename. The string should contain
+ * only ASCII characters and no space.
+ */
+ String encodeKeyToString(K key);
+
+ /**
+ * Returns a key that is guaranted to be always higher than any other key.
+ *
+ * @return the highest possible key
+ */
+ K getMaxKey();
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
new file mode 100644
index 0000000..624cda1
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -0,0 +1,860 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Represents the replication environment, which allows to manage the lifecycle
+ * of the replication changelog.
+ * <p>
+ * A changelog has a root directory, under which the following directories and files are
+ * created :
+ * <ul>
+ * <li>A "changenumberindex" directory containing the log files for
+ * ChangeNumberIndexDB</li>
+ * <li>A "domains.state" file containing a mapping of each domain DN to an id. The
+ * id is used to name the corresponding domain directory.</li>
+ * <li>One directory per domain, named after "[id].domain" where [id] is the id
+ * assigned to the domain, as specified in the "domains.state" file.</li>
+ * </ul>
+ * <p>
+ * Each domain directory contains the following directories and files :
+ * <ul>
+ * <li>A "generation_[id].id" file, where [id] is the generation id</li>
+ * <li>One directory per server id, named after "[id].server" where [id] is the
+ * id of the server.</li>
+ * </ul>
+ * Each server id directory contains the following files :
+ * <ul>
+ * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
+ * <li>Zero to many read-only log files named after the lowest key
+ * and highest key present in the log file (they all end with the ".log" suffix.</li>
+ * <li>Optionally, a "offline.state" file that indicates that this particular server id
+ * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
+ * </ul>
+ * See {@code Log} class for details on the log files.
+ *
+ * <p>
+ * Layout example with two domains "o=test1" and "o=test2", each having server
+ * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
+ *
+ * <pre>
+ * +---changelog
+ * | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"]
+ * | \---changenumberindex
+ * | \--- head.log [contains last records written]
+ * | \--- 1_50.log [contains records with keys in interval [1, 50]]
+ * | \---1.domain
+ * | \---generation1.id
+ * | \---22.server
+ * | \---head.log [contains last records written]
+ * | \---33.server
+ * | \---head.log [contains last records written]
+ * \---offline.state
+ * | \---2.domain
+ * | \---generation1.id
+ * | \---22.server
+ * | \---head.log [contains last records written]
+ * | \---33.server
+ * | \---head.log [contains last records written]
+ * </pre>
+ */
+class ReplicationEnvironment
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024;
+
+ private static final int NO_GENERATION_ID = -1;
+
+ private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
+
+ private static final String DOMAINS_STATE_FILENAME = "domains.state";
+
+ static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+
+ private static final String DOMAIN_STATE_SEPARATOR = ":";
+
+ private static final String DOMAIN_SUFFIX = ".domain";
+
+ private static final String SERVER_ID_SUFFIX = ".server";
+
+ private static final String GENERATION_ID_FILE_PREFIX = "generation";
+
+ private static final String GENERATION_ID_FILE_SUFFIX = ".id";
+
+ private static final String UTF8_ENCODING = "UTF-8";
+
+ private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isDirectory() && file.getName().endsWith(DOMAIN_SUFFIX);
+ }
+ };
+
+ private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isDirectory() && file.getName().endsWith(SERVER_ID_SUFFIX);
+ }
+ };
+
+ private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isFile()
+ && file.getName().startsWith(GENERATION_ID_FILE_PREFIX)
+ && file.getName().endsWith(GENERATION_ID_FILE_SUFFIX);
+ }
+ };
+
+ /** Root path where the replication log is stored. */
+ private final String replicationRootPath;
+ /**
+ * The current changelogState. This is in-memory version of what is inside the
+ * on-disk changelogStateDB. It improves performances in case the
+ * changelogState is read often.
+ *
+ * @GuardedBy("domainsLock")
+ */
+ private final ChangelogState changelogState;
+
+ /** The list of logs that are in use. */
+ private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
+
+ /**
+ * Maps each domain DN to a domain id that is used to name directory in file system.
+ *
+ * @GuardedBy("domainsLock")
+ */
+ private final Map<DN, String> domains = new HashMap<DN, String>();
+
+ /**
+ * Exclusive lock to synchronize:
+ * <ul>
+ * <li>the domains mapping</li>
+ * <li>changes to the in-memory changelogState</li>
+ * <li>changes to the on-disk state of a domain</li>
+ */
+ private final Object domainsLock = new Object();
+
+ /** The underlying replication server. */
+ private final ReplicationServer replicationServer;
+
+ private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+
+ /**
+ * Creates the replication environment.
+ *
+ * @param rootPath
+ * Root path where replication log is stored.
+ * @param replicationServer
+ * The underlying replication server.
+ * @throws ChangelogException
+ * If an error occurs during initialization.
+ */
+ ReplicationEnvironment(final String rootPath,
+ final ReplicationServer replicationServer) throws ChangelogException
+ {
+ this.replicationRootPath = rootPath;
+ this.replicationServer = replicationServer;
+ this.changelogState = readOnDiskChangelogState();
+ }
+
+ /**
+ * Returns the state of the replication changelog.
+ *
+ * @return the {@link ChangelogState} read from the changelogState DB
+ * @throws ChangelogException
+ * if a database problem occurs
+ */
+ ChangelogState readOnDiskChangelogState() throws ChangelogException
+ {
+ final ChangelogState state = new ChangelogState();
+ final File changelogPath = new File(replicationRootPath);
+ synchronized (domainsLock)
+ {
+ readDomainsStateFile();
+ checkDomainDirectories(changelogPath);
+ for (final Entry<DN, String> domainEntry : domains.entrySet())
+ {
+ readStateForDomain(domainEntry, state);
+ }
+ }
+ return state;
+ }
+
+ /**
+ * Returns the current state of the replication changelog.
+ *
+ * @return the current {@link ChangelogState}
+ */
+ ChangelogState getChangelogState()
+ {
+ return changelogState;
+ }
+
+ /**
+ * Finds or creates the log used to store changes from the replication server
+ * with the given serverId and the given baseDN.
+ *
+ * @param domainDN
+ * The DN that identifies the domain.
+ * @param serverId
+ * The server id that identifies the server.
+ * @param generationId
+ * The generationId associated to this domain.
+ * @return the log.
+ * @throws ChangelogException
+ * if an error occurs.
+ */
+ Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
+ throws ChangelogException
+ {
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("ReplicationEnvironment.getOrCreateReplicaDB(%s, %s, %s)", domainDN, serverId, generationId);
+ }
+
+ try
+ {
+ ensureRootDirectoryExists();
+
+ String domainId = null;
+ synchronized (domainsLock)
+ {
+ domainId = domains.get(domainDN);
+ if (domainId == null)
+ {
+ domainId = createDomainId(domainDN);
+ }
+
+ final File serverIdPath = getServerIdPath(domainId, serverId);
+ ensureServerIdDirectoryExists(serverIdPath);
+ changelogState.addServerIdToDomain(serverId, domainDN);
+
+ final File generationIdPath = getGenerationIdPath(domainId, generationId);
+ ensureGenerationIdFileExists(generationIdPath);
+ changelogState.setDomainGenerationId(domainDN, generationId);
+
+ return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_REPLICA_DB.get(domainDN.toString(), serverId, generationId), e);
+ }
+ }
+
+ /**
+ * Find or create the log to manage integer change number associated to
+ * multidomain server state.
+ * <p>
+ * TODO: ECL how to manage compatibility of this db
+ * with new domains added or removed ?
+ *
+ * @return the log.
+ * @throws ChangelogException
+ * when a problem occurs.
+ */
+ Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
+ {
+ final File path = getCNIndexDBPath();
+ try
+ {
+ return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_CN_INDEX_DB.get(replicationRootPath, path.getPath()), e);
+ }
+ }
+
+ /**
+ * Shutdown the environment.
+ * <p>
+ * The log DBs are not closed by this method. It assumes they are already
+ * closed.
+ */
+ void shutdown()
+ {
+ if (isShuttingDown.compareAndSet(false, true))
+ {
+ logs.clear();
+ }
+ }
+
+ /**
+ * Clears the generated id associated to the provided domain DN from the state
+ * Db.
+ * <p>
+ * If generation id can't be found, it is not considered as an error, the
+ * method will just return.
+ *
+ * @param domainDN
+ * The domain DN for which the generationID must be cleared.
+ * @throws ChangelogException
+ * If a problem occurs during clearing.
+ */
+ void clearGenerationId(final DN domainDN) throws ChangelogException
+ {
+ synchronized (domainsLock)
+ {
+ final String domainId = domains.get(domainDN);
+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
+ final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
+ if (idFile != null)
+ {
+ final boolean isDeleted = idFile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
+ }
+ }
+ changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
+ }
+ }
+
+ /**
+ * Reset the generationId to the default value used when there is no
+ * generation id.
+ *
+ * @param baseDN
+ * The baseDN for which the generationID must be reset.
+ * @throws ChangelogException
+ * If a problem occurs during reset.
+ */
+ void resetGenerationId(final DN baseDN) throws ChangelogException
+ {
+ synchronized (domainsLock)
+ {
+ clearGenerationId(baseDN);
+ final String domainId = domains.get(baseDN);
+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
+ final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
+ ensureGenerationIdFileExists(generationIdPath);
+ changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
+ }
+ }
+
+ /**
+ * Notify that the replica corresponding to provided domain and provided CSN
+ * is offline.
+ *
+ * @param domainDN
+ * the domain of the offline replica
+ * @param offlineCSN
+ * the offline replica serverId and offline timestamp
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
+ {
+ synchronized (domainsLock)
+ {
+ final String domainId = domains.get(domainDN);
+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
+ final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
+ if (!serverIdPath.exists())
+ {
+ return; // no serverId anymore => no-op
+ }
+ final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+ Writer writer = null;
+ try
+ {
+ // Overwrite file, only the last sent offline CSN is kept
+ writer = newFileWriter(offlineFile);
+ writer.write(offlineCSN.toString());
+ changelogState.addOfflineReplica(domainDN, offlineCSN);
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ }
+ }
+
+ /**
+ * Notify that the replica corresponding to provided domain and server id
+ * is online.
+ *
+ * @param domainDN
+ * the domain of the replica
+ * @param serverId
+ * the replica serverId
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
+ {
+ synchronized (domainsLock)
+ {
+ final String domainId = domains.get(domainDN);
+ if (domainId == null)
+ {
+ return; // unknown domain => no-op
+ }
+ final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
+ if (offlineFile.exists())
+ {
+ final boolean isDeleted = offlineFile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
+ offlineFile.getPath(), domainDN.toString(), serverId));
+ }
+ }
+ changelogState.removeOfflineReplica(domainDN, serverId);
+ }
+ }
+
+ /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
+ private void readDomainsStateFile() throws ChangelogException
+ {
+ final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
+ if (domainsStateFile.exists())
+ {
+ BufferedReader reader = null;
+ String line = null;
+ try
+ {
+ reader = newFileReader(domainsStateFile);
+ while ((line = reader.readLine()) != null)
+ {
+ final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
+ final String domainId = line.substring(0, separatorPos);
+ final DN domainDN = DN.valueOf(line.substring(separatorPos+1));
+ domains.put(domainDN, domainId);
+ }
+ }
+ catch(DirectoryException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get(
+ domainsStateFile.getPath(), line), e);
+ }
+ catch(Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get(
+ domainsStateFile.getPath()), e);
+ }
+ finally {
+ StaticUtils.close(reader);
+ }
+ }
+ }
+
+ /**
+ * Checks that domain directories in file system are consistent with
+ * information from domains mapping.
+ */
+ private void checkDomainDirectories(final File changelogPath) throws ChangelogException
+ {
+ final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
+ if (dnDirectories != null)
+ {
+ final Set<String> domainIdsFromFileSystem = new HashSet<String>();
+ for (final File dnDir : dnDirectories)
+ {
+ final String fileName = dnDir.getName();
+ final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+ domainIdsFromFileSystem.add(domainId);
+ }
+
+ final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+ if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+ domainIdsFromFileSystem.toString()));
+ }
+ }
+ }
+
+ /**
+ * Update the changelog state with the state corresponding to the provided
+ * domain DN.
+ */
+ private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
+ throws ChangelogException
+ {
+ final File domainDirectory = getDomainPath(domainEntry.getValue());
+ final DN domainDN = domainEntry.getKey();
+ final String generationId = retrieveGenerationId(domainDirectory);
+ if (generationId != null)
+ {
+ state.setDomainGenerationId(domainDN, toGenerationId(generationId));
+ }
+
+ final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
+ if (serverIds == null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY.get(
+ replicationRootPath, domainDirectory.getPath()));
+ }
+ for (final File serverId : serverIds)
+ {
+ readStateForServerId(domainDN, serverId, state);
+ }
+ }
+
+ private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
+ {
+ state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
+
+ final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+ if (offlineFile.exists())
+ {
+ final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
+ state.addOfflineReplica(domainDN, offlineCSN);
+ }
+ }
+
+ private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
+ {
+ BufferedReader reader = null;
+ try
+ {
+ reader = newFileReader(offlineFile);
+ String line = reader.readLine();
+ if (line == null || reader.readLine() != null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineFile.getPath()));
+ }
+ return new CSN(line);
+ }
+ catch(IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineFile.getPath()), e);
+ }
+ finally {
+ StaticUtils.close(reader);
+ }
+ }
+
+ private String createDomainId(final DN domainDN) throws ChangelogException
+ {
+ final String nextDomainId = findNextDomainId();
+ domains.put(domainDN, nextDomainId);
+ final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
+ Writer writer = null;
+ try
+ {
+ writer = newFileWriter(domainsStateFile);
+ for (final Entry<DN, String> entry : domains.entrySet())
+ {
+ writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
+ domainDN.toString(), domainsStateFile.getPath()), e);
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ return nextDomainId;
+ }
+
+ /** Find the next domain id to use. This is the lowest integer that is higher than all existing ids. */
+ private String findNextDomainId()
+ {
+ int nextId = 1;
+ for (final String domainId : domains.values())
+ {
+ final Integer id = Integer.valueOf(domainId);
+ if (nextId <= id)
+ {
+ nextId = id + 1;
+ }
+ }
+ return String.valueOf(nextId);
+ }
+
+ /** Open a log from the provided path and record parser. */
+ private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
+ throws ChangelogException
+ {
+ checkShutDownBeforeOpening(serverIdPath);
+
+ final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
+
+ checkShutDownAfterOpening(serverIdPath, log);
+
+ logs.add(log);
+ return log;
+ }
+
+ private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException
+ {
+ if (isShuttingDown.get())
+ {
+ closeLog(log);
+ throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
+ replicationServer.getServerId()));
+ }
+ }
+
+ private void checkShutDownBeforeOpening(final File serverIdPath) throws ChangelogException
+ {
+ if (isShuttingDown.get())
+ {
+ throw new ChangelogException(
+ WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
+ serverIdPath.getPath(), replicationServer.getServerId()));
+ }
+ }
+
+ /**
+ * Retrieve the generation id from the provided directory.
+ *
+ * @return the generation id or {@code null} if the corresponding file can't
+ * be found
+ */
+ private String retrieveGenerationId(final File directory)
+ {
+ final File generationId = retrieveGenerationIdFile(directory);
+ if (generationId != null)
+ {
+ String filename = generationId.getName();
+ return filename.substring(GENERATION_ID_FILE_PREFIX.length(),
+ filename.length() - GENERATION_ID_FILE_SUFFIX.length());
+ }
+ return null;
+ }
+
+ /**
+ * Retrieve the file named after the generation id from the provided
+ * directory.
+ *
+ * @return the generation id file or {@code null} if the corresponding file
+ * can't be found
+ */
+ private File retrieveGenerationIdFile(final File directory)
+ {
+ File[] generationIds = directory.listFiles(GENERATION_ID_FILE_FILTER);
+ return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
+ }
+
+ private File getDomainPath(final String domainId)
+ {
+ return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
+ }
+
+ /**
+ * Return the path for the provided domain id and server id.
+ * Package private to be usable in tests.
+ *
+ * @param domainId
+ * The id corresponding to a domain DN
+ * @param serverId
+ * The server id to retrieve
+ * @return the path
+ */
+ File getServerIdPath(final String domainId, final int serverId)
+ {
+ return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
+ }
+
+ private File getGenerationIdPath(final String domainId, final long generationId)
+ {
+ return new File(getDomainPath(domainId), GENERATION_ID_FILE_PREFIX + generationId + GENERATION_ID_FILE_SUFFIX);
+ }
+
+ private File getCNIndexDBPath()
+ {
+ return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
+ }
+
+ private void closeLog(final Log<?, ?> log)
+ {
+ logs.remove(log);
+ log.close();
+ }
+
+ private void ensureRootDirectoryExists() throws ChangelogException
+ {
+ final File rootDir = new File(replicationRootPath);
+ if (!rootDir.exists())
+ {
+ final boolean created = rootDir.mkdirs();
+ if (!created)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(replicationRootPath));
+ }
+ }
+ }
+
+ private void ensureServerIdDirectoryExists(final File serverIdPath) throws ChangelogException
+ {
+ if (!serverIdPath.exists())
+ {
+ boolean created = false;
+ try
+ {
+ created = serverIdPath.mkdirs();
+ }
+ catch (Exception e)
+ {
+ // nothing to do
+ }
+
+ if (!created)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY.get(serverIdPath.getPath(), 0));
+ }
+ }
+ }
+
+ private void ensureGenerationIdFileExists(final File generationIdPath)
+ throws ChangelogException
+ {
+ if (!generationIdPath.exists())
+ {
+ try
+ {
+ boolean isCreated = generationIdPath.createNewFile();
+ if (!isCreated)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
+ }
+ }
+ }
+
+ private void debug(String message)
+ {
+ // Replication server may be null when testing
+ String monitorInstanceName = replicationServer != null ? replicationServer.getMonitorInstanceName() :
+ "no monitor [test]";
+ logger.trace("In %s, %s", monitorInstanceName, message);
+ }
+
+ private int toServerId(final String serverIdName) throws ChangelogException
+ {
+ try
+ {
+ String serverId = serverIdName.substring(0, serverIdName.length() - SERVER_ID_SUFFIX.length());
+ return Integer.parseInt(serverId);
+ }
+ catch (NumberFormatException e)
+ {
+ // should never happen
+ throw new ChangelogException(ERR_CHANGELOG_SERVER_ID_FILENAME_WRONG_FORMAT.get(serverIdName), e);
+ }
+ }
+
+ private long toGenerationId(final String data) throws ChangelogException
+ {
+ try
+ {
+ return Long.parseLong(data);
+ }
+ catch (NumberFormatException e)
+ {
+ // should never happen
+ throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
+ }
+ }
+
+ /** Returns a buffered writer on the provided file. */
+ private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
+ {
+ return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
+ }
+
+ /** Returns a buffered reader on the provided file. */
+ private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
+ {
+ return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index d29c707..02a107f 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -115,7 +115,7 @@
* @param changelogState
* the changelog state used for initialization
*/
- ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+ public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
{
this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
}
--
Gitblit v1.10.0