Actual merge of the complete changelog.file package, which contains the implementation
of the file-based changelog
14 files added
3 files modified
| | |
| | | import org.opends.server.core.ModifyDNOperation; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; |
| | | import org.opends.server.types.AttributeType; |
| | |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | /** |
| | | * Returns the instance of the changelog backend. |
| | | * <p> |
| | | * This method is currently a placeholder: it has no implementation and |
| | | * always throws. |
| | | * |
| | | * @return never returns normally |
| | | * @throws UnsupportedOperationException |
| | | * Always, because the method is not implemented. |
| | | */ |
| | | public static ChangelogBackend getInstance() |
| | | { |
| | | // UnsupportedOperationException is a RuntimeException, so callers catching |
| | | // RuntimeException keep working while the failure is more explicit. |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | /** |
| | | * Notifies that a cookie entry was added for the provided base DN and |
| | | * update message. |
| | | * <p> |
| | | * This method is currently a placeholder: it has no implementation and |
| | | * always throws. |
| | | * |
| | | * @param baseDN |
| | | * The base DN |
| | | * @param updateMsg |
| | | * The update msg |
| | | * @throws UnsupportedOperationException |
| | | * Always, because the method is not implemented. |
| | | */ |
| | | public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) |
| | | { |
| | | // UnsupportedOperationException is a RuntimeException, so callers catching |
| | | // RuntimeException keep working while the failure is more explicit. |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.EOFException; |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A log reader with binary search support. |
| | | * <p> |
| | | * The log file contains record offsets at fixed block size : given a block size N, |
| | | * an offset is written at every N bytes. The offset contains the number of bytes to |
| | | * reach the beginning of previous record (or next record if offset equals 0). |
| | | * <p> |
| | | * The reader provides both sequential access, using the {@code readRecord()} method, |
| | | * and reasonably fast random access, using the |
| | | * {@code seekToRecord(K, KeyMatchingStrategy, PositionStrategy)} method. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | class BlockLogReader<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | /** Number of bytes taken by the record offset written at each block start. */ |
| | | static final int SIZE_OF_BLOCK_OFFSET = 4; |
| | | |
| | | /** Number of bytes taken by the length written before each record. */ |
| | | static final int SIZE_OF_RECORD_SIZE = 4; |
| | | |
| | | /** |
| | | * Size of a block, after which an offset to the nearest record is written. |
| | | * <p> |
| | | * This value has been fixed based on performance tests. See |
| | | * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472"> |
| | | * OPENDJ-1472</a> for details. |
| | | */ |
| | | static final int BLOCK_SIZE = 256; |
| | | |
| | | /** Size in bytes of each block of the log file read by this reader. */ |
| | | private final int blockSize; |
| | | |
| | | /** Parser used to decode the bytes of a record. */ |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | /** Random access reader on the log file. */ |
| | | private final RandomAccessFile reader; |
| | | |
| | | /** The log file, used to report its path in error messages. */ |
| | | private final File file; |
| | | |
| | | /** |
| | | * Creates a reader for the provided file, file reader and parser. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param file |
| | | * The log file to read. |
| | | * @param reader |
| | | * The random access reader on the log file. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | * @return a new log reader |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader( |
| | | final File file, final RandomAccessFile reader, final RecordParser<K, V> parser) |
| | | { |
| | | return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE); |
| | | } |
| | | |
| | | /** |
| | | * Creates a reader for the provided file, file reader, parser and block size. |
| | | * <p> |
| | | * This method is intended for tests only, to allow tuning of the block size. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param file |
| | | * The log file to read. |
| | | * @param reader |
| | | * The random access reader on the log file. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | * @param blockSize |
| | | * The size of each block, or frequency at which the record offset is |
| | | * present in the log file. Must be greater than |
| | | * {@code SIZE_OF_BLOCK_OFFSET}. |
| | | * @return a new log reader |
| | | * @throws IllegalArgumentException |
| | | * If the block size is not greater than {@code SIZE_OF_BLOCK_OFFSET}. |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests( |
| | | final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize) |
| | | { |
| | | return new BlockLogReader<K, V>(file, reader, parser, blockSize); |
| | | } |
| | | |
| | | /** |
| | | * Creates the reader. |
| | | * |
| | | * @param file |
| | | * The log file to read. |
| | | * @param reader |
| | | * The random access reader on the log file. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | * @param blockSize |
| | | * The size of each block. |
| | | * @throws IllegalArgumentException |
| | | * If the block size is not greater than {@code SIZE_OF_BLOCK_OFFSET}. |
| | | */ |
| | | private BlockLogReader( |
| | | final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize) |
| | | { |
| | | // A block must hold its offset plus at least one byte of data, otherwise |
| | | // the loop reading a record over several blocks would never progress. |
| | | if (blockSize <= SIZE_OF_BLOCK_OFFSET) |
| | | { |
| | | throw new IllegalArgumentException( |
| | | "Block size must be greater than " + SIZE_OF_BLOCK_OFFSET + " bytes but was " + blockSize); |
| | | } |
| | | this.file = file; |
| | | this.reader = reader; |
| | | this.parser = parser; |
| | | this.blockSize = blockSize; |
| | | } |
| | | |
| | | /** |
| | | * Position the reader to the record corresponding to the provided key and |
| | | * matching and positioning strategies. Returns the last record read. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position. Key must not be {@code null}. |
| | | * @param matchStrategy |
| | | * The key matching strategy. |
| | | * @param positionStrategy |
| | | * The positioning strategy. |
| | | * @return The pair (key_found, last_record_read). key_found is a boolean |
| | | * indicating if reader is successfully positioned. last_record_read |
| | | * is the last record that was read. When key_found is equal to |
| | | * {@code false}, then last_record_read is always {@code null}. When |
| | | * key_found is equal to {@code true}, last_record_read can be valued |
| | | * or be {@code null} |
| | | * @throws ChangelogException |
| | | * If an error occurs when seeking the key. |
| | | */ |
| | | public Pair<Boolean, Record<K,V>> seekToRecord( |
| | | final K key, |
| | | final KeyMatchingStrategy matchStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | Reject.ifNull(key); |
| | | // First narrow down to a block using binary search, then scan sequentially |
| | | final long markerPosition = searchClosestBlockStartToKey(key); |
| | | if (markerPosition >= 0) |
| | | { |
| | | return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy); |
| | | } |
| | | return Pair.of(false, null); |
| | | } |
| | | |
| | | /** |
| | | * Position the reader to the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * offset from the beginning of the file, in bytes. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | public void seekToPosition(final long filePosition) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | reader.seek(filePosition); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Read a record from current position. |
| | | * |
| | | * @return the record read, or {@code null} if no complete record is available |
| | | * @throws ChangelogException |
| | | * If an error occurs during read. |
| | | */ |
| | | public Record<K,V> readRecord() throws ChangelogException |
| | | { |
| | | return readRecord(-1); |
| | | } |
| | | |
| | | /** |
| | | * Returns the file position for this reader. |
| | | * |
| | | * @return the position of reader on the log file |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | public long getFilePosition() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return reader.getFilePointer(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() throws IOException |
| | | { |
| | | reader.close(); |
| | | } |
| | | |
| | | /** |
| | | * Read a record, either from the provided start of block position or from |
| | | * the current position. |
| | | * |
| | | * @param blockStartPosition |
| | | * The position of start of block, where a record offset is written. |
| | | * If provided value is -1, then record is read from current position instead. |
| | | * @return the record read, or {@code null} if no complete record is available |
| | | * @throws ChangelogException |
| | | * If an error occurs during read. |
| | | */ |
| | | private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | if (blockStartPosition != -1) |
| | | { |
| | | positionToRecordFromBlockStart(blockStartPosition); |
| | | } |
| | | final ByteString recordData = readNextRecord(); |
| | | return recordData != null ? parser.decodeRecord(recordData) : null; |
| | | } |
| | | catch (Exception io) |
| | | { |
| | | // Report the path of the log file, as done by the other error messages |
| | | // of this class, rather than the identity string of the reader object. |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(file.getPath()), io); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Position to the record given by the offset read from provided block |
| | | * start. |
| | | * <p> |
| | | * No offset is read when the provided position is the beginning of the file. |
| | | * |
| | | * @param blockStartPosition |
| | | * Position of read pointer in the file, expected to be the start of |
| | | * a block where a record offset is written. |
| | | * @throws IOException |
| | | * If an error occurs during read. |
| | | */ |
| | | private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException |
| | | { |
| | | reader.seek(blockStartPosition); |
| | | if (blockStartPosition > 0) |
| | | { |
| | | final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET]; |
| | | reader.readFully(offsetData); |
| | | final int offsetToRecord = ByteString.wrap(offsetData).toInt(); |
| | | if (offsetToRecord > 0) |
| | | { |
| | | reader.seek(blockStartPosition - offsetToRecord); |
| | | } // if offset is zero, reader is already well positioned |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next record, skipping the block offsets found inside the record. |
| | | * |
| | | * @return the bytes of the next record, or {@code null} if no record is available |
| | | * or if end of file is reached before the record is complete |
| | | * @throws IOException |
| | | * If an error occurs while reading. |
| | | */ |
| | | private ByteString readNextRecord() throws IOException |
| | | { |
| | | try |
| | | { |
| | | // read length of record |
| | | final long filePosition = reader.getFilePointer(); |
| | | int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize); |
| | | final int recordLength = readRecordLength(distanceToBlockStart); |
| | | |
| | | // read the record |
| | | distanceToBlockStart = getDistanceToNextBlockStart(reader.getFilePointer(), blockSize); |
| | | final ByteStringBuilder recordBytes = |
| | | new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart)); |
| | | int remainingBytesToRead = recordLength; |
| | | while (distanceToBlockStart < remainingBytesToRead) |
| | | { |
| | | if (distanceToBlockStart != 0) |
| | | { |
| | | recordBytes.append(reader, distanceToBlockStart); |
| | | } |
| | | // skip the offset |
| | | reader.skipBytes(SIZE_OF_BLOCK_OFFSET); |
| | | |
| | | // next step: a full block of data is available after the offset |
| | | remainingBytesToRead -= distanceToBlockStart; |
| | | distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET; |
| | | } |
| | | if (remainingBytesToRead > 0) |
| | | { |
| | | // last bytes of the record |
| | | recordBytes.append(reader, remainingBytesToRead); |
| | | } |
| | | return recordBytes.toByteString(); |
| | | } |
| | | catch (EOFException e) |
| | | { |
| | | // end of stream, no record or incomplete record |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the total length in bytes taken by a record when stored in log file, |
| | | * including size taken by block offsets. |
| | | * |
| | | * @param recordLength |
| | | * The length of record to write. |
| | | * @param distanceToBlockStart |
| | | * Distance before the next block start. |
| | | * @return the length in bytes necessary to store the record in the log |
| | | */ |
| | | int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart) |
| | | { |
| | | int totalLength = recordLength; |
| | | if (recordLength > distanceToBlockStart) |
| | | { |
| | | // one offset for the first block start crossed, then one per additional block |
| | | totalLength += SIZE_OF_BLOCK_OFFSET; |
| | | final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET); |
| | | totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET; |
| | | } |
| | | return totalLength; |
| | | } |
| | | |
| | | /** |
| | | * Read the length of a record, skipping the block offset if the length is |
| | | * located at a block start or is split by a block start. |
| | | * |
| | | * @param distanceToBlockStart |
| | | * Distance in bytes from current position to the next block start. |
| | | * @return the length of the record |
| | | * @throws IOException |
| | | * If an error occurs while reading. |
| | | */ |
| | | private int readRecordLength(final int distanceToBlockStart) throws IOException |
| | | { |
| | | final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE); |
| | | if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE) |
| | | { |
| | | // the length is split in two parts by a block offset |
| | | lengthBytes.append(reader, distanceToBlockStart); |
| | | // skip the offset |
| | | reader.skipBytes(SIZE_OF_BLOCK_OFFSET); |
| | | lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart); |
| | | return lengthBytes.toByteString().toInt(); |
| | | } |
| | | else |
| | | { |
| | | if (distanceToBlockStart == 0) |
| | | { |
| | | // skip the offset |
| | | reader.skipBytes(SIZE_OF_BLOCK_OFFSET); |
| | | } |
| | | lengthBytes.append(reader, SIZE_OF_RECORD_SIZE); |
| | | return lengthBytes.toByteString().toInt(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Search the closest block start to the provided key, using binary search. |
| | | * <p> |
| | | * Note that position of reader is modified by this method. |
| | | * |
| | | * @param key |
| | | * The key to search |
| | | * @return the file position of block start that must be used to find the given key, |
| | | * or a negative number if no position could be found. |
| | | * @throws ChangelogException |
| | | * if a problem occurs |
| | | */ |
| | | long searchClosestBlockStartToKey(K key) throws ChangelogException |
| | | { |
| | | final long maxPos = getFileLength() - 1; |
| | | long lowPos = 0L; |
| | | long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos); |
| | | |
| | | while (lowPos <= highPos) |
| | | { |
| | | final long middlePos = Math.min((lowPos + highPos) / 2, maxPos); |
| | | final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos); |
| | | final Record<K, V> middleRecord = readRecord(middleBlockStartPos); |
| | | if (middleRecord == null) |
| | | { |
| | | return -1; |
| | | } |
| | | |
| | | final int keyComparison = middleRecord.getKey().compareTo(key); |
| | | if (keyComparison < 0) |
| | | { |
| | | // the search interval can not be reduced any more |
| | | if (middleBlockStartPos <= lowPos) |
| | | { |
| | | return lowPos; |
| | | } |
| | | lowPos = middleBlockStartPos; |
| | | } |
| | | else if (keyComparison > 0) |
| | | { |
| | | // the search interval can not be reduced any more |
| | | if (middleBlockStartPos >= highPos) |
| | | { |
| | | return highPos; |
| | | } |
| | | highPos = middleBlockStartPos; |
| | | } |
| | | else |
| | | { |
| | | return middleBlockStartPos; |
| | | } |
| | | } |
| | | // Unable to find a position where key can be found |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | | * Returns the length of the log file. |
| | | * |
| | | * @return the length of the file, in bytes |
| | | * @throws ChangelogException |
| | | * If the length can not be retrieved. |
| | | */ |
| | | private long getFileLength() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return reader.length(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Position before, at or after provided key, starting from provided block |
| | | * start position and reading sequentially until key is found according to |
| | | * matching and positioning strategies. |
| | | * |
| | | * @param blockStartPosition |
| | | * Position of read pointer in the file, expected to be the start of |
| | | * a block where a record offset is written |
| | | * @param key |
| | | * The key to find |
| | | * @param matchStrategy |
| | | * The key matching strategy |
| | | * @param positionStrategy |
| | | * The positioning strategy |
| | | * @return The pair ({@code true}, selected record) if reader is successfully |
| | | * positioned (selected record may be null if end of file is reached), |
| | | * ({@code false}, null) otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key, |
| | | final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | Record<K,V> record = readRecord(blockStartPosition); |
| | | Record<K,V> previousRecord = null; |
| | | long previousPosition = blockStartPosition; |
| | | boolean matchingKeyIsLowerThanAnyRecord = true; |
| | | while (record != null) |
| | | { |
| | | final int keysComparison = record.getKey().compareTo(key); |
| | | if (keysComparison <= 0) |
| | | { |
| | | matchingKeyIsLowerThanAnyRecord = false; |
| | | } |
| | | if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY) |
| | | || (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY)) |
| | | { |
| | | return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord, |
| | | record, previousRecord, previousPosition); |
| | | } |
| | | // keep track of the record just read and of the position following it |
| | | previousRecord = record; |
| | | previousPosition = getFilePosition(); |
| | | record = readRecord(); |
| | | } |
| | | |
| | | // end of file reached without finding a record to stop on |
| | | if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition); |
| | | } |
| | | return Pair.of(false, null); |
| | | } |
| | | |
| | | /** |
| | | * Returns the result of the search once the sequential read has stopped on a |
| | | * record, according to the matching and positioning strategies. |
| | | * |
| | | * @param matchStrategy |
| | | * The key matching strategy |
| | | * @param positionStrategy |
| | | * The positioning strategy |
| | | * @param keysComparison |
| | | * Result of the comparison of the key of current record with the |
| | | * searched key |
| | | * @param matchKeyIsLowerThanAnyRecord |
| | | * {@code true} if no record read so far has a key lower than or |
| | | * equal to the searched key |
| | | * @param currentRecord |
| | | * The record on which the sequential read has stopped |
| | | * @param previousRecord |
| | | * The record read before current record, may be {@code null} |
| | | * @param previousPosition |
| | | * The file position saved by the caller before reading current record |
| | | * @return the pair (key_found, record) |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy, |
| | | PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord, |
| | | Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition) |
| | | throws ChangelogException |
| | | { |
| | | Record<K, V> record = currentRecord; |
| | | |
| | | if (positionStrategy == AFTER_MATCHING_KEY) |
| | | { |
| | | if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord) |
| | | { |
| | | return Pair.of(false, null); |
| | | } |
| | | if (keysComparison == 0) |
| | | { |
| | | // skip matching key |
| | | record = readRecord(); |
| | | } |
| | | } |
| | | else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0) |
| | | { |
| | | // current record has a greater key: go back to the previous position |
| | | seekToPosition(previousPosition); |
| | | return Pair.of(previousRecord != null, previousRecord); |
| | | } |
| | | return Pair.of(true, record); |
| | | } |
| | | |
| | | /** |
| | | * Returns the result of the search for the {@code LESS_THAN_OR_EQUAL_TO_KEY} |
| | | * matching strategy when end of file has been reached without stopping on |
| | | * a record. |
| | | * |
| | | * @param positionStrategy |
| | | * The positioning strategy |
| | | * @param previousRecord |
| | | * The last record read, may be {@code null} |
| | | * @param previousPosition |
| | | * The file position saved by the caller before the last read |
| | | * @return the pair (key_found, record) |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy( |
| | | final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition) |
| | | throws ChangelogException |
| | | { |
| | | if (positionStrategy == ON_MATCHING_KEY) |
| | | { |
| | | seekToPosition(previousPosition); |
| | | return Pair.of(previousRecord != null, previousRecord); |
| | | } |
| | | else |
| | | { |
| | | return Pair.of(true, null); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the closest start of block which has a position lower than or equal |
| | | * to the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * The position of reader on file. |
| | | * @return the file position of the block start. |
| | | */ |
| | | long getClosestBlockStartBeforeOrAtPosition(final long filePosition) |
| | | { |
| | | final int dist = getDistanceToNextBlockStart(filePosition, blockSize); |
| | | return dist == 0 ? filePosition : filePosition + dist - blockSize; |
| | | } |
| | | |
| | | /** |
| | | * Returns the closest start of block which has a position strictly |
| | | * higher than the provided file position. |
| | | * |
| | | * @param filePosition |
| | | * The position of reader on file. |
| | | * @return the file position of the block start. |
| | | */ |
| | | long getClosestBlockStartStrictlyAfterPosition(final long filePosition) |
| | | { |
| | | final int dist = getDistanceToNextBlockStart(filePosition, blockSize); |
| | | return dist == 0 ? filePosition + blockSize : filePosition + dist; |
| | | } |
| | | |
| | | /** |
| | | * Returns the distance to next block for the provided file position. |
| | | * <p> |
| | | * The distance is zero when the position is exactly at a block start, except |
| | | * for the beginning of the file where the distance is the size of a block. |
| | | * |
| | | * @param filePosition |
| | | * offset from the beginning of the file, in bytes. |
| | | * @param blockSize |
| | | * Size of each block in bytes. |
| | | * @return the distance to next block in bytes |
| | | */ |
| | | static int getDistanceToNextBlockStart(final long filePosition, final int blockSize) |
| | | { |
| | | if (filePosition == 0) |
| | | { |
| | | return blockSize; |
| | | } |
| | | final int distance = (int) (filePosition % blockSize); |
| | | return distance == 0 ? 0 : blockSize - distance; |
| | | } |
| | | |
| | | /** |
| | | * Check if the log file is valid, by reading the latest records at the end of |
| | | * the log file. |
| | | * <p> |
| | | * The intent of this method is to allow self-recovery in case a partial |
| | | * record has been written at the end of file (eg, after a server crash). |
| | | * <p> |
| | | * Any unexpected exception is considered as a severe error where |
| | | * self-recovery is not appropriate and thus will lead to a |
| | | * ChangelogException. |
| | | * |
| | | * @return -1 if log is valid, or a non-negative number if log is invalid, where |
| | | * the number represents the last valid position in the log file. |
| | | * @throws ChangelogException |
| | | * if an error occurs while checking the log |
| | | */ |
| | | long checkLogIsValid() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | final long fileSize = getFileLength(); |
| | | // Select the last block start which is followed by a complete offset: |
| | | // starting from the file size itself would lead to read the offset at or |
| | | // past the end of file when the file ends exactly on a block start. |
| | | final long lastBlockStart = |
| | | getClosestBlockStartBeforeOrAtPosition(Math.max(0L, fileSize - SIZE_OF_BLOCK_OFFSET)); |
| | | positionToRecordFromBlockStart(lastBlockStart); |
| | | |
| | | // The record reached from the offset may begin before the block start: |
| | | // if this record is incomplete, the log is valid only up to its beginning. |
| | | long lastValidPosition = Math.min(lastBlockStart, reader.getFilePointer()); |
| | | for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) { |
| | | parser.decodeRecord(recordData); |
| | | lastValidPosition = reader.getFilePointer(); |
| | | } |
| | | |
| | | final boolean isFileValid = lastValidPosition == fileSize; |
| | | return isFileValid ? -1 : lastValidPosition; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // keep the original exception as the cause, in addition to the message |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get( |
| | | file.getPath(), |
| | | StaticUtils.stackTraceToSingleLineString(e)), e); |
| | | } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.file.BlockLogReader.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.IOException; |
| | | import java.io.SyncFailedException; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | |
| | | /** |
| | | * A log writer, using fixed-size blocks to allow fast retrieval when reading. |
| | | * <p> |
| | | * The log file contains record offsets at fixed block size : given block size N, |
| | | * an offset is written at every N bytes. The offset contains the number of bytes to |
| | | * reach the beginning of previous record (or next record if offset equals 0). |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | class BlockLogWriter<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | /** Size in bytes of each block of the log file written by this writer. */ |
| | | private final int blockSize; |
| | | |
| | | /** Parser used to encode the records. */ |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | /** Underlying writer on the log file. */ |
| | | private final LogWriter writer; |
| | | |
| | | /** |
| | | * Creates a writer for the provided log writer and parser. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param writer |
| | | * The writer on the log file. |
| | | * @param parser |
| | | * The parser to encode the records. |
| | | * @return a new log writer |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter( |
| | | final LogWriter writer, final RecordParser<K, V> parser) |
| | | { |
| | | return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE); |
| | | } |
| | | |
| | | /** |
| | | * Creates a writer for the provided log writer, parser and size for blocks. |
| | | * <p> |
| | | * This method is intended for tests only, to allow tuning of the block size. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param writer |
| | | * The writer on the log file. |
| | | * @param parser |
| | | * The parser to encode the records. |
| | | * @param blockSize |
| | | * The size of each block, or frequency at which the record offset is |
| | | * present in the log file. Must be greater than |
| | | * {@code SIZE_OF_BLOCK_OFFSET}. |
| | | * @return a new log writer |
| | | * @throws IllegalArgumentException |
| | | * If the block size is not greater than {@code SIZE_OF_BLOCK_OFFSET}. |
| | | */ |
| | | static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests( |
| | | final LogWriter writer, final RecordParser<K, V> parser, final int blockSize) |
| | | { |
| | | return new BlockLogWriter<K, V>(writer, parser, blockSize); |
| | | } |
| | | |
| | | /** |
| | | * Creates the writer with an underlying writer, a parser and a size for blocks. |
| | | * |
| | | * @param writer |
| | | * The writer to the log file. |
| | | * @param parser |
| | | * The parser to encode the records. |
| | | * @param blockSize |
| | | * The size of each block. |
| | | * @throws IllegalArgumentException |
| | | * If the block size is not greater than {@code SIZE_OF_BLOCK_OFFSET}. |
| | | */ |
| | | private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize) |
| | | { |
| | | Reject.ifNull(writer, parser); |
| | | // A block must hold its offset plus at least one byte of data, otherwise |
| | | // the loop splitting a record over several blocks would never terminate. |
| | | if (blockSize <= SIZE_OF_BLOCK_OFFSET) |
| | | { |
| | | throw new IllegalArgumentException( |
| | | "Block size must be greater than " + SIZE_OF_BLOCK_OFFSET + " bytes but was " + blockSize); |
| | | } |
| | | this.writer = writer; |
| | | this.parser = parser; |
| | | this.blockSize = blockSize; |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided record to the log file, then flushes the underlying writer. |
| | | * |
| | | * @param record |
| | | * The record to write. |
| | | * @throws ChangelogException |
| | | * If a problem occurs during write. |
| | | */ |
| | | public void write(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | write(parser.encodeRecord(record)); |
| | | writer.flush(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), |
| | | writer.getFile().getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of bytes written in the log file. |
| | | * |
| | | * @return the number of bytes |
| | | */ |
| | | public long getBytesWritten() |
| | | { |
| | | return writer.getBytesWritten(); |
| | | } |
| | | |
| | | /** |
| | | * Synchronize all modifications to the log file to the underlying device. |
| | | * |
| | | * @throws SyncFailedException |
| | | * If synchronization fails. |
| | | */ |
| | | public void sync() throws SyncFailedException |
| | | { |
| | | writer.sync(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | writer.close(); |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided byte string to the log file, preceded by its length |
| | | * and split by a record offset at each block start. |
| | | * |
| | | * @param record |
| | | * The value to write. |
| | | * @throws IOException |
| | | * If an error occurs while writing |
| | | */ |
| | | private void write(final ByteString record) throws IOException |
| | | { |
| | | // Add length of record before writing |
| | | ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()). |
| | | append(record.length()). |
| | | append(record). |
| | | toByteString(); |
| | | |
| | | int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize); |
| | | // number of bytes between the beginning of the record and the next offset to write |
| | | int cumulatedDistanceToBeginning = distanceToBlockStart; |
| | | int dataPosition = 0; |
| | | int dataRemaining = data.length(); |
| | | final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET; |
| | | |
| | | while (distanceToBlockStart < dataRemaining) |
| | | { |
| | | if (distanceToBlockStart > 0) |
| | | { |
| | | // append part of record |
| | | final int dataEndPosition = dataPosition + distanceToBlockStart; |
| | | writer.write(data.subSequence(dataPosition, dataEndPosition)); |
| | | dataPosition = dataEndPosition; |
| | | dataRemaining -= distanceToBlockStart; |
| | | } |
| | | // append the offset to the record |
| | | writer.write(ByteString.valueOf(cumulatedDistanceToBeginning)); |
| | | |
| | | // next step |
| | | distanceToBlockStart = dataSizeForOneBlock; |
| | | cumulatedDistanceToBeginning += blockSize; |
| | | } |
| | | // append the remaining bytes to finish the record |
| | | writer.write(data.subSequence(dataPosition, data.length())); |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | |
| | | /** |
| | | * Exception thrown when a record can't be decoded properly. |
| | | * <p> |
| | | * It is a {@link ChangelogException}: each constructor only forwards its |
| | | * arguments to the corresponding constructor of the parent class. |
| | | */ |
| | | public class DecodingException extends ChangelogException |
| | | { |
| | | private static final long serialVersionUID = 5629692522662643737L; |
| | | |
| | | /** |
| | | * Creates a new decoding exception with the provided message. |
| | | * |
| | | * @param message |
| | | * The message that explains the problem that occurred. |
| | | */ |
| | | public DecodingException(LocalizableMessage message) |
| | | { |
| | | super(message); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new decoding exception with the provided cause. |
| | | * |
| | | * @param cause |
| | | * The underlying cause that triggered this exception. |
| | | */ |
| | | public DecodingException(Throwable cause) |
| | | { |
| | | super(cause); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new decoding exception with the provided message and cause. |
| | | * |
| | | * @param message |
| | | * The message that explains the problem that occurred. |
| | | * @param cause |
| | | * The underlying cause that triggered this exception. |
| | | */ |
| | | public DecodingException(LocalizableMessage message, Throwable cause) |
| | | { |
| | | super(message, cause); |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteSequenceReader; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * Implementation of a ChangeNumberIndexDB with a log. |
| | | * <p> |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | | */ |
| | | class FileChangeNumberIndexDB implements ChangeNumberIndexDB |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private static final int NO_KEY = 0; |
| | | |
| | | /** The parser of records stored in this ChangeNumberIndexDB. */ |
| | | static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser(); |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final Log<Long, ChangeNumberIndexRecord> log; |
| | | |
| | | /** |
| | | * The newest changenumber stored in the DB. It is used to avoid purging the |
| | | * record with the newest changenumber. The newest record in the changenumber |
| | | * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is |
| | | * then retrieved on server startup. |
| | | */ |
| | | private volatile long newestChangeNumber = NO_KEY; |
| | | |
| | | /** |
| | | * The last generated value for the change number. It is kept separate from |
| | | * the {@link #newestChangeNumber} because there is an opportunity for a race |
| | | * condition between: |
| | | * <ol> |
| | | * <li>this atomic long being incremented for a new record ('recordB')</li> |
| | | * <li>the current newest record ('recordA') being purged from the DB</li> |
| | | * <li>'recordB' failing to be inserted in the DB</li> |
| | | * </ol> |
| | | */ |
| | | private final AtomicLong lastGeneratedChangeNumber; |
| | | |
| | | private final DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | |
| | | /** |
| | | * Creates a new FileChangeNumberIndexDB backed by the change number index log |
| | | * of the provided replication environment. |
| | | * <p> |
| | | * The newest change number and the last generated change number are both |
| | | * initialized from the newest record found in the log, or to zero when there |
| | | * is no such record. The monitor provider of this DB is then registered. |
| | | * |
| | | * @param replicationEnv |
| | | * the replication environment from which the log is obtained or created |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | log = replicationEnv.getOrCreateCNIndexDB(); |
| | | |
| | | // Seed both counters from the newest persisted record: |
| | | // no record in the log => NO_KEY, i.e. change numbers restart from 0 |
| | | newestChangeNumber = getChangeNumber(readLastRecord()); |
| | | lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber); |
| | | |
| | | // Monitoring: deregister first, then register this DB's monitor provider |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** Returns the value of the newest record of the log, or null when the log returns no newest record. */ |
| | | private ChangeNumberIndexRecord readLastRecord() throws ChangelogException |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> newest = log.getNewestRecord(); |
| | | if (newest == null) |
| | | { |
| | | return null; |
| | | } |
| | | return newest.getValue(); |
| | | } |
| | | |
| | | /** Returns the value of the oldest record of the log, or null when the log returns no oldest record. */ |
| | | private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> oldest = log.getOldestRecord(); |
| | | if (oldest == null) |
| | | { |
| | | return null; |
| | | } |
| | | return oldest.getValue(); |
| | | } |
| | | |
| | | /** Returns the change number of the provided record, or NO_KEY when the record is null. */ |
| | | private long getChangeNumber(final ChangeNumberIndexRecord record) throws ChangelogException |
| | | { |
| | | return record == null ? NO_KEY : record.getChangeNumber(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long addRecord(final ChangeNumberIndexRecord record) throws ChangelogException |
| | | { |
| | | // The change number of the provided record is not used: a new one is generated |
| | | final long generatedChangeNumber = nextChangeNumber(); |
| | | final ChangeNumberIndexRecord recordToStore = new ChangeNumberIndexRecord( |
| | | generatedChangeNumber, record.getBaseDN(), record.getCSN()); |
| | | |
| | | // The log is keyed by the change number of the stored record |
| | | log.append(Record.from(recordToStore.getChangeNumber(), recordToStore)); |
| | | newestChangeNumber = generatedChangeNumber; |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace("In FileChangeNumberIndexDB.addRecord, added: " + recordToStore); |
| | | } |
| | | return generatedChangeNumber; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException |
| | | { |
| | | // Value of the oldest record of the underlying log, null when there is none |
| | | final ChangeNumberIndexRecord oldest = readFirstRecord(); |
| | | return oldest; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException |
| | | { |
| | | // Value of the newest record of the underlying log, null when there is none |
| | | final ChangeNumberIndexRecord newest = readLastRecord(); |
| | | return newest; |
| | | } |
| | | |
| | | private long nextChangeNumber() |
| | | { |
| | | return lastGeneratedChangeNumber.incrementAndGet(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getLastGeneratedChangeNumber() |
| | | { |
| | | // Current value of the counter, read without modifying it |
| | | final long lastGenerated = lastGeneratedChangeNumber.get(); |
| | | return lastGenerated; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of changes. |
| | | * |
| | | * @return Returns the number of changes. |
| | | * @throws ChangelogException |
| | | * If a problem occurs. |
| | | */ |
| | | long count() throws ChangelogException |
| | | { |
| | | return log.getNumberOfRecords(); |
| | | } |
| | | |
| | | /** |
| | | * Returns whether this database is empty. |
| | | * |
| | | * @return <code>true</code> if this database is empty, <code>false</code> |
| | | * otherwise |
| | | * @throws ChangelogException |
| | | * if a problem occurs. |
| | | */ |
| | | boolean isEmpty() throws ChangelogException |
| | | { |
| | | return getNewestRecord() == null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException |
| | | { |
| | | // Cursor on the raw log records, wrapped so that callers only see the record values |
| | | final DBCursor<Record<Long, ChangeNumberIndexRecord>> logCursor = log.getCursor(startChangeNumber); |
| | | return new FileChangeNumberIndexDBCursor(logCursor); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this DB. |
| | | */ |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | | log.close(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Synchronously purges the change number index DB up to and excluding the |
| | | * provided timestamp. |
| | | * <p> |
| | | * Nothing is purged and null is returned when no purge CSN is provided or |
| | | * when this DB is empty. |
| | | * |
| | | * @param purgeCSN |
| | | * the timestamp up to which purging must happen, may be null |
| | | * @return the oldest non purged CSN, or null when nothing was purged or when |
| | | * the log purge returned no record. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | CSN purgeUpTo(final CSN purgeCSN) throws ChangelogException |
| | | { |
| | | // Check the argument first: a null purge CSN must not cost a read of the log, |
| | | // nor fail because of that read |
| | | if (purgeCSN == null || isEmpty()) |
| | | { |
| | | return null; |
| | | } |
| | | // NOTE(review): the log is keyed by change number (see addRecord), yet the limit |
| | | // passed here is the time of the CSN - confirm this is what Log.purgeUpTo() expects |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime()); |
| | | return record != null ? record.getValue().getCSN() : null; |
| | | } |
| | | |
| | | /** |
| | | * Monitor provider publishing the first and last change numbers and the number |
| | | * of records of the enclosing FileChangeNumberIndexDB. |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | // Order of the attributes: first change number, last change number, count |
| | | final List<Attribute> monitorData = new ArrayList<Attribute>(); |
| | | monitorData.add(createChangeNumberAttribute(true)); |
| | | monitorData.add(createChangeNumberAttribute(false)); |
| | | monitorData.add(Attributes.create("count", Long.toString(countOrZero()))); |
| | | return monitorData; |
| | | } |
| | | |
| | | /** Returns the number of records of the DB, or zero when it cannot be read. */ |
| | | private long countOrZero() |
| | | { |
| | | try |
| | | { |
| | | return count(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // Monitoring is best effort: the failure is only traced |
| | | logger.traceException(e); |
| | | return 0; |
| | | } |
| | | } |
| | | |
| | | /** Builds the attribute holding either the first or the last change number of the DB. */ |
| | | private Attribute createChangeNumberAttribute(final boolean isFirst) |
| | | { |
| | | final String attributeName; |
| | | if (isFirst) |
| | | { |
| | | attributeName = "first-draft-changenumber"; |
| | | } |
| | | else |
| | | { |
| | | attributeName = "last-draft-changenumber"; |
| | | } |
| | | final long changeNumber = readChangeNumber(isFirst); |
| | | return Attributes.create(attributeName, String.valueOf(changeNumber)); |
| | | } |
| | | |
| | | /** Reads the first or last change number from the log, NO_KEY when it cannot be read. */ |
| | | private long readChangeNumber(final boolean isFirst) |
| | | { |
| | | try |
| | | { |
| | | final ChangeNumberIndexRecord record = isFirst ? readFirstRecord() : readLastRecord(); |
| | | return getChangeNumber(record); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | logger.traceException(e); |
| | | return NO_KEY; |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "ChangeNumber Index Database"; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException, InitializationException |
| | | { |
| | | // Nothing to do for now |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | // Simple class name followed by the newest change number known by this DB |
| | | final String className = getClass().getSimpleName(); |
| | | return className + ", newestChangeNumber=" + newestChangeNumber; |
| | | } |
| | | |
| | | /** |
| | | * Clear the changes from this DB (from both memory cache and DB storage). |
| | | * <p> |
| | | * The underlying log is cleared and the newest change number is reset to zero. |
| | | * The last generated change number is not modified by this method. |
| | | * |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | log.clear(); |
| | | // No record is left in the log, hence no newest change number |
| | | newestChangeNumber = NO_KEY; |
| | | } |
| | | |
| | | /** |
| | | * Parser of records persisted in the FileChangeNumberIndex log. |
| | | * <p> |
| | | * A record is encoded as: the change number (as a long), the base DN as a string, |
| | | * a zero byte used as string separator, then the bytes of the CSN. |
| | | */ |
| | | private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord> |
| | | { |
| | | private static final byte STRING_SEPARATOR = 0; |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record) |
| | | { |
| | | final ChangeNumberIndexRecord cnIndexRecord = record.getValue(); |
| | | // The key is explicitly unboxed so that the primitive long overload of append() is |
| | | // selected whatever other overloads exist: decodeRecord() reads the key back with getLong() |
| | | final long changeNumber = record.getKey().longValue(); |
| | | return new ByteStringBuilder() |
| | | .append(changeNumber) |
| | | .append(cnIndexRecord.getBaseDN().toString()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(cnIndexRecord.getCSN().toByteString()).toByteString(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<Long, ChangeNumberIndexRecord> decodeRecord(final ByteString data) throws DecodingException |
| | | { |
| | | try |
| | | { |
| | | ByteSequenceReader reader = data.asReader(); |
| | | final long changeNumber = reader.getLong(); |
| | | final DN baseDN = DN.valueOf(reader.getString(getNextStringLength(reader))); |
| | | // skip the string separator |
| | | reader.skip(1); |
| | | // all the remaining bytes belong to the CSN |
| | | final CSN csn = CSN.valueOf(reader.getByteString(reader.remaining())); |
| | | |
| | | return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, baseDN, csn)); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Any failure while reading the data is reported as a decoding problem |
| | | throw new DecodingException(e); |
| | | } |
| | | } |
| | | |
| | | /** Returns the length of next string by looking for the zero byte used as separator. */ |
| | | private int getNextStringLength(ByteSequenceReader reader) |
| | | { |
| | | int length = 0; |
| | | while (reader.peek(length) != STRING_SEPARATOR) |
| | | { |
| | | length++; |
| | | } |
| | | return length; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Long decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return Long.valueOf(key); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String encodeKeyToString(Long key) |
| | | { |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Long getMaxKey() |
| | | { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | |
| | | /** |
| | | * A cursor on ChangeNumberIndexDB. |
| | | * |
| | | * \@NotThreadSafe |
| | | */ |
| | | class FileChangeNumberIndexDBCursor implements DBCursor<ChangeNumberIndexRecord> |
| | | { |
| | | |
| | | /** The underlying cursor. */ |
| | | private final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor; |
| | | |
| | | /** |
| | | * Creates the cursor from provided cursor. |
| | | * |
| | | * @param cursor |
| | | * The underlying cursor to read log. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor) |
| | | throws ChangelogException |
| | | { |
| | | this.cursor = cursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexRecord getRecord() |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> record = cursor.getRecord(); |
| | | return record != null ? record.getValue() : null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | return cursor.next(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | cursor.close(); |
| | | } |
| | | |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessageBuilder; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.ChangelogBackend; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | | import org.opends.server.replication.server.changelog.je.DomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.ReplicaCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * File-based implementation of the ChangelogDB interface. |
| | | * Log file implementation of the ChangelogDB interface. |
| | | */ |
| | | public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB |
| | | { |
| | | |
| | | /** |
| | | * Creates the changelog DB. |
| | | * |
| | | * @param replicationServer |
| | | * replication server |
| | | * @param cfg |
| | | * configuration |
| | | */ |
| | | public FileChangelogDB(ReplicationServer replicationServer, |
| | | ReplicationServerCfg cfg) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainOldestCSNs(DN baseDN) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainNewestCSNs(DN baseDN) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void removeDomain(DN baseDN) throws ChangelogException |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | * <p> |
| | | * When removing a domainMap, code: |
| | | * <ol> |
| | | * <li>first get the domainMap</li> |
| | | * <li>synchronized on the domainMap</li> |
| | | * <li>remove the domainMap</li> |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a replicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); |
| | | private ReplicationEnvironment replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | | |
| | | /** |
| | | * The handler of the changelog database, the database stores the relation |
| | | * between a change number and the associated cookie. |
| | | * <p> |
| | | * @GuardedBy("cnIndexDBLock") |
| | | */ |
| | | private FileChangeNumberIndexDB cnIndexDB; |
| | | private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>(); |
| | | |
| | | /** Used for protecting {@link ChangeNumberIndexDB} related state. */ |
| | | private final Object cnIndexDBLock = new Object(); |
| | | |
| | | /** |
| | | * The purge delay (in milliseconds). Records in the changelog DB that are |
| | | * older than this delay might be removed. |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY); |
| | | |
| | | /** |
| | | * Creates a new changelog DB. |
| | | * |
| | | * @param replicationServer |
| | | * the local replication server. |
| | | * @param config |
| | | * the replication server configuration |
| | | * @throws ConfigException |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) |
| | | throws ConfigException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, |
| | | KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | private File makeDir(final String dbDirName) throws ConfigException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // Check that this path exists or create it. |
| | | final File dbDirectory = getFileForPath(dbDirName); |
| | | try |
| | | { |
| | | if (!dbDirectory.exists()) |
| | | { |
| | | dbDirectory.mkdir(); |
| | | } |
| | | return dbDirectory; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(e.getLocalizedMessage()).append(" ") |
| | | .append(String.valueOf(dbDirectory)); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, |
| | | KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, |
| | | Set<DN> excludedDomainDns) throws ChangelogException |
| | | private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | return domainMap; |
| | | } |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, |
| | | KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, |
| | | CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | /** |
| | | * Returns a {@link FileReplicaDB}, possibly creating it. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN for which to create a ReplicaDB |
| | | * @param serverId |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId, |
| | | final ReplicationServer server) throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | while (!shutdown.get()) |
| | | { |
| | | final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN); |
| | | final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | if (result != null) |
| | | { |
| | | final Boolean dbWasCreated = result.getSecond(); |
| | | if (dbWasCreated) |
| | | { // new replicaDB => update all cursors with it |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors != null && !cursors.isEmpty()) |
| | | { |
| | | for (DomainDBCursor cursor : cursors) |
| | | { |
| | | cursor.addReplicaDB(serverId, null); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return result; |
| | | } |
| | | } |
| | | throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void unregisterCursor(DBCursor<?> cursor) |
| | | private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // happy path: the domainMap already exists |
| | | final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); |
| | | if (currentValue != null) |
| | | { |
| | | return currentValue; |
| | | } |
| | | |
| | | // unlucky, the domainMap does not exist: take the hit and create the |
| | | // newValue, even though the same could be done concurrently by another thread |
| | | final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>(); |
| | | final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | if (previousValue != null) |
| | | { |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | |
| | | // we just created a new domain => update all cursors |
| | | for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) |
| | | { |
| | | cursor.addDomain(baseDN, null); |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap, |
| | | final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | // Returns the replicaDB for serverId paired with a flag telling whether it was created by |
| | | // this call, or null when domainMap is no longer the map registered for baseDN. |
| | | // happy path: the replicaDB already exists |
| | | FileReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) |
| | | throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // unlucky, the replicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | | // re-read under the lock: another thread may have created the replicaDB in the meantime |
| | | currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | if (domainToReplicaDBs.get(baseDN) != domainMap) |
| | | { |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the replicaDB |
| | | // NOTE(review): this is the only null return; callers must check for it before dereferencing. |
| | | return null; |
| | | } |
| | | |
| | | final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv); |
| | | domainMap.put(serverId, newDB); |
| | | return Pair.of(newDB, true); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeDB() |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // Opens the replication environment under the configured directory, restores the state it |
| | | // reports, optionally starts the change number indexer, then applies the purge delay. |
| | | try |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | // the indexer is only started when change number computation is enabled in the config |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | | startIndexer(changelogState); |
| | | } |
| | | setPurgeDelay(replicationServer.getPurgeDelay()); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // the failure is logged, not rethrown: this method does not declare ChangelogException |
| | | logger.traceException(e); |
| | | // NOTE(review): the message uses the dbDirectory field rather than the local dbDir - confirm both name the same path |
| | | logger.error(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage())); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setPurgeDelay(long delayInMillis) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setComputeChangeNumber(boolean computeChangeNumber) |
| | | private void initializeToChangelogState(final ChangelogState changelogState) |
| | | throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // First pass: hand each domain its recorded generation id |
| | | // (getReplicationServerDomain is called with 'true', presumably meaning create-if-missing - confirm). |
| | | for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet()) |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); |
| | | } |
| | | // Second pass: get or create one replicaDB per (domain, serverId) pair listed in the state. |
| | | for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | | getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void shutdownChangeNumberIndexDB() throws ChangelogException |
| | | { |
| | | // Shuts the change number index DB down, if one has been created, while holding its lock. |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | | if (cnIndexDB == null) |
| | | { // nothing was ever opened |
| | | return; |
| | | } |
| | | cnIndexDB.shutdown(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void shutdownDB() throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // Stops the indexer and purger threads, then shuts down the change number index DB, |
| | | // every replicaDB and finally the replication environment. Only the first caller does the work. |
| | | if (!this.shutdown.compareAndSet(false, true)) |
| | | { // shutdown has already been initiated |
| | | return; |
| | | } |
| | | |
| | | // Remember the first exception because : |
| | | // - we want to try to remove everything we want to remove |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | // detach both threads from their holders so nobody else can reach them, then ask them to stop |
| | | final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | } |
| | | final ChangelogDBPurger purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | } |
| | | |
| | | // wait for shutdown of the threads holding cursors |
| | | try |
| | | { |
| | | if (indexer != null) |
| | | { |
| | | indexer.join(); |
| | | } |
| | | if (purger != null) |
| | | { |
| | | purger.join(); |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // do nothing: we are already shutting down |
| | | // NOTE(review): the interrupt status is not restored here - confirm that is intended |
| | | } |
| | | |
| | | // now we can safely shutdown all DBs |
| | | try |
| | | { |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | firstException = e; |
| | | } |
| | | for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it = |
| | | this.domainToReplicaDBs.values().iterator(); it.hasNext();) |
| | | { |
| | | final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next(); |
| | | // same monitor as the replicaDB creation path, which re-checks the map registration under it |
| | | synchronized (domainMap) |
| | | { |
| | | it.remove(); |
| | | for (FileReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | replicaDB.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | if (replicationEnv != null) |
| | | { |
| | | replicationEnv.shutdown(); |
| | | } |
| | | |
| | | if (firstException != null) |
| | | { |
| | | throw firstException; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears all records from the changelog (does not remove the changelog itself). |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs when clearing the changelog. |
| | | */ |
| | | public void clearDB() throws ChangelogException |
| | | { |
| | | // nothing to clear when the changelog directory was never created |
| | | if (!dbDirectory.exists()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // Remember the first exception because : |
| | | // - we want to try to remove everything we want to remove |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | for (DN baseDN : this.domainToReplicaDBs.keySet()) |
| | | { |
| | | try |
| | | { |
| | | removeDomain(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // one failing domain must not prevent the other domains and the |
| | | // change number index DB from being cleared |
| | | if (firstException == null) |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | | if (cnIndexDB != null) |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB.clear(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // do not overwrite a failure already recorded while removing the domains |
| | | if (firstException == null) |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | |
| | | try |
| | | { |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (firstException == null) |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | |
| | | // drop the reference so that getChangeNumberIndexDB() creates a fresh instance |
| | | cnIndexDB = null; |
| | | } |
| | | } |
| | | |
| | | if (firstException != null) |
| | | { |
| | | throw firstException; |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void removeDB() throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // shut everything down first, then delete the changelog directory tree |
| | | shutdownDB(); |
| | | StaticUtils.recursiveDelete(dbDirectory); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) |
| | | public ServerState getDomainOldestCSNs(DN baseDN) |
| | | { |
| | | // Feed the oldest CSN of every replicaDB of the domain into a fresh ServerState. |
| | | final ServerState oldestCSNs = new ServerState(); |
| | | final Map<Integer, FileReplicaDB> replicaDBs = getDomainMap(baseDN); |
| | | for (FileReplicaDB db : replicaDBs.values()) |
| | | { |
| | | final CSN oldest = db.getOldestCSN(); |
| | | oldestCSNs.update(oldest); |
| | | } |
| | | return oldestCSNs; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainNewestCSNs(DN baseDN) |
| | | { |
| | | // Feed the newest CSN of every replicaDB of the domain into a fresh ServerState. |
| | | final ServerState newestCSNs = new ServerState(); |
| | | final Map<Integer, FileReplicaDB> replicaDBs = getDomainMap(baseDN); |
| | | for (FileReplicaDB db : replicaDBs.values()) |
| | | { |
| | | final CSN newest = db.getNewestCSN(); |
| | | newestCSNs.update(newest); |
| | | } |
| | | return newestCSNs; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void removeDomain(DN baseDN) throws ChangelogException |
| | | { |
| | | // Remember the first exception because : |
| | | // - we want to try to remove everything we want to remove |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | // 1- clear the replica DBs |
| | | Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | final ChangeNumberIndexer indexer = this.cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.clear(baseDN); |
| | | } |
| | | synchronized (domainMap) |
| | | { |
| | | domainMap = domainToReplicaDBs.remove(baseDN); |
| | | // the mapping may have been removed concurrently between the get() above and this remove() |
| | | if (domainMap != null) |
| | | { |
| | | for (FileReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | try |
| | | { |
| | | replicaDB.clear(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // keep the FIRST failure, later ones are only traced |
| | | if (firstException == null) |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | // shut the replicaDB down even when clearing it failed |
| | | replicaDB.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | // 2- clear the changelogstate DB |
| | | try |
| | | { |
| | | replicationEnv.clearGenerationId(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (firstException == null) |
| | | { |
| | | firstException = e; |
| | | } |
| | | else if (logger.isTraceEnabled()) |
| | | { |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | |
| | | if (firstException != null) |
| | | { |
| | | throw firstException; |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setPurgeDelay(final long purgeDelayInMillis) |
| | | { |
| | | this.purgeDelayInMillis = purgeDelayInMillis; |
| | | final ChangelogDBPurger purger; |
| | | if (purgeDelayInMillis > 0) |
| | | { |
| | | purger = new ChangelogDBPurger(); |
| | | if (cnPurger.compareAndSet(null, purger)) |
| | | { |
| | | purger.start(); |
| | | } // otherwise a purger was already running |
| | | } |
| | | else |
| | | { |
| | | purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setComputeChangeNumber(final boolean computeChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // enabling starts an indexer from the current changelog state; |
| | | // disabling detaches the current indexer, if any, and asks it to stop |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(replicationEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | | final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void startIndexer(final ChangelogState changelogState) |
| | | { |
| | | // Build a candidate indexer and start it only if no indexer is installed yet. |
| | | final ChangeNumberIndexer candidate = new ChangeNumberIndexer(this, changelogState); |
| | | final boolean installed = cnIndexer.compareAndSet(null, candidate); |
| | | if (installed) |
| | | { |
| | | candidate.start(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDB getChangeNumberIndexDB() |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // Creates the change number index DB on first use, under cnIndexDBLock. |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | | if (cnIndexDB == null) |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new FileChangeNumberIndexDB(replicationEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // creation failure is only logged: cnIndexDB stays null and null is returned below |
| | | logger.traceException(e); |
| | | logger.error(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage())); |
| | | } |
| | | } |
| | | return cnIndexDB; |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ReplicationDomainDB getReplicationDomainDB() |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | // this object itself is returned as the ReplicationDomainDB |
| | | return this; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | // delegate to the four-argument overload with no excluded domain |
| | | return getCursorFrom(startState, matchingStrategy, positionStrategy, Collections.<DN> emptySet()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, |
| | | final Set<DN> excludedDomainDns) |
| | | throws ChangelogException |
| | | { |
| | | // register the cursor first, then add every known domain that is not excluded |
| | | final MultiDomainDBCursor multiDomainCursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); |
| | | registeredMultiDomainCursors.add(multiDomainCursor); |
| | | for (DN domainDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | if (excludedDomainDns.contains(domainDN)) |
| | | { |
| | | continue; |
| | | } |
| | | multiDomainCursor.addDomain(domainDN, startState.getServerState(domainDN)); |
| | | } |
| | | return multiDomainCursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor domainCursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy); |
| | | for (int replicaId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // start after the last CSN already sent by that server, or from null when no state is given |
| | | CSN lastSentCSN = null; |
| | | if (startState != null) |
| | | { |
| | | lastSentCSN = startState.getCSN(replicaId); |
| | | } |
| | | domainCursor.addReplicaDB(replicaId, lastSentCSN); |
| | | } |
| | | return domainCursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | // creation and registration both happen under the registry's monitor |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor created = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); |
| | | List<DomainDBCursor> perDomain = registeredDomainCursors.get(baseDN); |
| | | if (perDomain == null) |
| | | { // first cursor for this domain: install its list |
| | | perDomain = new ArrayList<DomainDBCursor>(); |
| | | registeredDomainCursors.put(baseDN, perDomain); |
| | | } |
| | | perDomain.add(created); |
| | | return created; |
| | | } |
| | | } |
| | | |
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | // Returns the recorded offline CSN of the replica, or null when there is none or when |
| | | // startAfterCSN is not older than it. |
| | | final CSN offlineCSN = replicationEnv.getChangelogState().getOfflineReplicas().getCSN(baseDN, serverId); |
| | | if (offlineCSN == null) |
| | | { |
| | | return null; |
| | | } |
| | | if (startAfterCSN != null && !startAfterCSN.isOlderThan(offlineCSN)) |
| | | { |
| | | return null; |
| | | } |
| | | return offlineCSN; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB == null) |
| | | { // unknown replica: hand back the shared empty cursor |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | | |
| | | // wrap the raw DB cursor so that it also carries the replica's offline CSN |
| | | final DBCursor<UpdateMsg> dbCursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor wrapped = new ReplicaCursor(dbCursor, offlineCSN, replicaID, this); |
| | | |
| | | // register the wrapper under its replica id |
| | | synchronized (replicaCursors) |
| | | { |
| | | List<ReplicaCursor> perReplica = replicaCursors.get(replicaID); |
| | | if (perReplica == null) |
| | | { |
| | | perReplica = new ArrayList<ReplicaCursor>(); |
| | | replicaCursors.put(replicaID, perReplica); |
| | | } |
| | | perReplica.add(wrapped); |
| | | } |
| | | return wrapped; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void unregisterCursor(final DBCursor<?> cursor) |
| | | { |
| | | // each kind of cursor is tracked in its own registry: remove it from the matching one |
| | | if (cursor instanceof MultiDomainDBCursor) |
| | | { |
| | | registeredMultiDomainCursors.remove(cursor); |
| | | } |
| | | else if (cursor instanceof DomainDBCursor) |
| | | { |
| | | final DomainDBCursor domainCursor = (DomainDBCursor) cursor; |
| | | // lock the same monitor newDomainDBCursor() holds when it mutates these lists, |
| | | // otherwise registration and removal can race on the same list |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | else if (cursor instanceof ReplicaCursor) |
| | | { |
| | | final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; |
| | | synchronized (replicaCursors) |
| | | { |
| | | final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | // any other cursor type is not tracked here and is ignored |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final int serverId = csn.getServerId(); |
| | | |
| | | // persist the change in the replicaDB of the originating server, creating it if needed |
| | | final Pair<FileReplicaDB, Boolean> dbAndCreated = getOrCreateReplicaDB(baseDN, serverId, replicationServer); |
| | | dbAndCreated.getFirst().add(updateMsg); |
| | | |
| | | ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg); |
| | | |
| | | // forward to the indexer only when one is running |
| | | final ChangeNumberIndexer runningIndexer = cnIndexer.get(); |
| | | if (runningIndexer != null) |
| | | { |
| | | notifyReplicaOnline(runningIndexer, baseDN, serverId); |
| | | runningIndexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | |
| | | // true when the replica DB was created by this call |
| | | return dbAndCreated.getSecond(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer runningIndexer = cnIndexer.get(); |
| | | if (runningIndexer == null) |
| | | { // no indexer running: heartbeats are not used |
| | | return; |
| | | } |
| | | notifyReplicaOnline(runningIndexer, baseDN, heartbeatCSN.getServerId()); |
| | | runningIndexer.publishHeartbeat(baseDN, heartbeatCSN); |
| | | } |
| | | |
| | | private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) |
| | | throws ChangelogException |
| | | { |
| | | if (indexer.isReplicaOffline(baseDN, serverId)) |
| | | { |
| | | replicationEnv.notifyReplicaOnline(baseDN, serverId); |
| | | } |
| | | updateCursorsWithOfflineCSN(baseDN, serverId, null); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | // 1- the environment, 2- the indexer when one is running, 3- the registered replica cursors |
| | | replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); |
| | | |
| | | final ChangeNumberIndexer runningIndexer = cnIndexer.get(); |
| | | if (runningIndexer != null) |
| | | { |
| | | runningIndexer.replicaOffline(baseDN, offlineCSN); |
| | | } |
| | | |
| | | final int offlineServerId = offlineCSN.getServerId(); |
| | | updateCursorsWithOfflineCSN(baseDN, offlineServerId, offlineCSN); |
| | | } |
| | | |
| | | private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) |
| | | { |
| | | // Pushes offlineCSN (possibly null) to every cursor registered for the given replica. |
| | | synchronized (replicaCursors) |
| | | { |
| | | final List<ReplicaCursor> perReplica = replicaCursors.get(Pair.of(baseDN, serverId)); |
| | | if (perReplica == null) |
| | | { // no cursor was ever registered for this replica |
| | | return; |
| | | } |
| | | for (ReplicaCursor replicaCursor : perReplica) |
| | | { |
| | | replicaCursor.setOfflineCSN(offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clear the database. |
| | | * The thread purging the changelogDB on a regular interval. Records are |
| | | * purged from the changelogDB if they are older than the purge delay, which |
| | | * is held in milliseconds. The purge process works in two steps: |
| | | * <ol> |
| | | * <li>first purge the changeNumberIndexDB and retrieve information to drive |
| | | * replicaDBs purging</li> |
| | | * <li>proceed to purge each replicaDBs based on the information collected |
| | | * when purging the changeNumberIndexDB</li> |
| | | * </ol> |
| | | */ |
| | | public void clearDB() |
| | | private final class ChangelogDBPurger extends DirectoryThread |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | private static final int DEFAULT_SLEEP = 500; |
| | | |
| | | protected ChangelogDBPurger() |
| | | { |
| | | // the label is handed to the DirectoryThread constructor (presumably used as the thread name - confirm) |
| | | super("changelog DB purger"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | // Purge loop: runs until shutdown is initiated. Each pass computes the purge CSN from the |
| | | // current time and the purge delay, purges the change number index DB when change numbers |
| | | // are computed, purges every replicaDB, then sleeps until the next purge is due. |
| | | // initialize CNIndexDB |
| | | getChangeNumberIndexDB(); |
| | | while (!isShutdownInitiated()) |
| | | { |
| | | try |
| | | { |
| | | final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; |
| | | final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); |
| | | final CSN oldestNotPurgedCSN; |
| | | |
| | | // next code assumes that the compute-change-number config |
| | | // never changes during the life time of an RS |
| | | if (!config.isComputeChangeNumber()) |
| | | { |
| | | // no change number index to consult: replicaDBs are purged straight up to purgeCSN |
| | | oldestNotPurgedCSN = purgeCSN; |
| | | } |
| | | else |
| | | { |
| | | final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB; |
| | | if (localCNIndexDB == null) |
| | | { // shutdown has been initiated |
| | | return; |
| | | } |
| | | |
| | | oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); |
| | | if (oldestNotPurgedCSN == null) |
| | | { // shutdown may have been initiated... |
| | | // ... or the change number index DB is empty, |
| | | // wait for new changes to come in. |
| | | |
| | | // Note we cannot sleep for as long as the purge delay |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | jeFriendlySleep(DEFAULT_SLEEP); |
| | | continue; |
| | | } |
| | | } |
| | | |
| | | for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | { |
| | | for (final FileReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | replicaDB.purgeUpTo(oldestNotPurgedCSN); |
| | | } |
| | | } |
| | | |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // shutdown initiated? |
| | | // nothing is done here: the while condition above re-checks the shutdown flag |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // any other failure is logged and the whole replication server is asked to shut down |
| | | logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e))); |
| | | if (replicationServer != null) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method implements a sleep() that is friendly to Berkeley JE. |
| | | * <p> |
| | | * Originally, {@link Thread#sleep(long)} was used, but waking up a |
| | | * sleeping thread required calling {@link Thread#interrupt()}, and JE |
| | | * threw exceptions when invoked on interrupted threads. |
| | | * <p> |
| | | * The solution is to replace: |
| | | * <ol> |
| | | * <li> {@link Thread#sleep(long)} with {@link Object#wait(long)}</li> |
| | | * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> |
| | | * </ol> |
| | | * Nothing is waited for when shutdown has already been initiated. |
| | | * |
| | | * @param millis |
| | | * the maximum time to wait, in milliseconds, as passed to |
| | | * {@link Object#wait(long)} |
| | | * @throws InterruptedException |
| | | * if the thread is interrupted while waiting |
| | | */ |
| | | private void jeFriendlySleep(long millis) throws InterruptedException |
| | | { |
| | | // cheap check first, so the monitor is not taken when shutdown is already under way |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | // re-check under the monitor that initiateShutdown() notifies on |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | wait(millis); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) |
| | | { |
| | | // Time to wait until notPurgedCSN falls behind the purge horizon (now minus the purge |
| | | // delay); when it is already behind, just wait the default pause before purging more. |
| | | final long oldestRemainingTime = notPurgedCSN.getTime(); |
| | | final long purgeHorizon = TimeThread.getTime() - purgeDelayInMillis; |
| | | return purgeHorizon <= oldestRemainingTime |
| | | ? oldestRemainingTime - purgeHorizon |
| | | : DEFAULT_SLEEP; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initiateShutdown() |
| | | { |
| | | // raise the shutdown flag first, so that a purger woken up below sees it |
| | | super.initiateShutdown(); |
| | | // notify on the purger's own monitor, the one jeFriendlySleep() waits on |
| | | synchronized (this) |
| | | { |
| | | notify(); // wake up the purger thread for faster shutdown |
| | | } |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.List; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * Represents a replication server database for one server in the topology. |
| | | * <p> |
| | | * It is responsible for efficiently saving the updates that is received from |
| | | * each master server into stable storage. |
| | | * <p> |
| | | * It is also able to generate a {@link DBCursor} that can be used to |
| | | * read all changes from a given {@link CSN}. |
| | | * <p> |
| | | * It publishes some monitoring information below cn=monitor. |
| | | */ |
| | | class FileReplicaDB |
| | | { |
| | | |
| | | /** The parser of records stored in Replica DB. */ |
| | | static final RecordParser<CSN, UpdateMsg> RECORD_PARSER = new ReplicaDBParser(); |
| | | |
| | | /** |
| | | * Immutable holder of the oldest and newest CSNs, so that both values can be |
| | | * replaced together through a single reference assignment, without |
| | | * synchronization. |
| | | * |
| | | * @Immutable |
| | | */ |
| | | private static final class CSNLimits |
| | | { |
| | | /** Oldest CSN, may be {@code null}. */ |
| | | private final CSN oldestCSN; |
| | | /** Newest CSN, may be {@code null}. */ |
| | | private final CSN newestCSN; |
| | | |
| | | public CSNLimits(CSN oldest, CSN newest) |
| | | { |
| | | oldestCSN = oldest; |
| | | newestCSN = newest; |
| | | } |
| | | } |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final Log<CSN, UpdateMsg> log; |
| | | |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | | * |
| | | * @NonNull |
| | | */ |
| | | private volatile CSNLimits csnLimits; |
| | | private final int serverId; |
| | | private final DN baseDN; |
| | | private final DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private final ReplicationServer replicationServer; |
| | | private final ReplicationEnvironment replicationEnv; |
| | | |
| | | /** |
| | | * Creates a new ReplicaDB associated to a given LDAP server. |
| | | * |
| | | * @param serverId |
| | | * Id of this server. |
| | | * @param baseDN |
| | | * the replication domain baseDN. |
| | | * @param replicationServer |
| | | * The ReplicationServer that creates this ReplicaDB. |
| | | * @param replicationEnv |
| | | * the Database Env to use to create the ReplicationServer DB. server |
| | | * for this domain. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | FileReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer, |
| | | final ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | this.replicationServer = replicationServer; |
| | | this.replicationEnv = replicationEnv; |
| | | // must come after the assignments above: createLog() reads baseDN, serverId and replicationServer |
| | | this.log = createLog(replicationEnv); |
| | | // seed the cached limits from the oldest and newest records currently in the log |
| | | this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN()); |
| | | |
| | | // NOTE(review): presumably deregisters first so a provider left over from a previous instance is replaced - confirm |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | private CSN readOldestCSN() throws ChangelogException |
| | | { |
| | | // key of the oldest record in the log, or null when the log has no record |
| | | final Record<CSN, UpdateMsg> oldest = log.getOldestRecord(); |
| | | if (oldest == null) |
| | | { |
| | | return null; |
| | | } |
| | | return oldest.getKey(); |
| | | } |
| | | |
| | | private CSN readNewestCSN() throws ChangelogException |
| | | { |
| | | // key of the newest record in the log, or null when the log has no record |
| | | final Record<CSN, UpdateMsg> newest = log.getNewestRecord(); |
| | | if (newest == null) |
| | | { |
| | | return null; |
| | | } |
| | | return newest.getKey(); |
| | | } |
| | | |
| | | private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | // the log is obtained from the environment using the generation id of this replicaDB's domain |
| | | final long generationId = replicationServer.getReplicationServerDomain(baseDN, true).getGenerationId(); |
| | | return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, generationId); |
| | | } |
| | | |
| | | /** |
| | | * Adds a new message. |
| | | * |
| | | * @param updateMsg |
| | | * The update message to add. |
| | | * @throws ChangelogException |
| | | * If an error occurs when trying to add the message. |
| | | */ |
| | | void add(final UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | // refuse new changes once shutdown has started |
| | | if (shutdown.get()) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg |
| | | .toString(), String.valueOf(baseDN), String.valueOf(serverId))); |
| | | } |
| | | |
| | | final CSN csn = updateMsg.getCSN(); |
| | | log.append(Record.from(csn, updateMsg)); |
| | | |
| | | // refresh the cached limits: the newest moves forward with each newer CSN, |
| | | // the oldest is only set when it was unknown |
| | | final CSNLimits current = csnLimits; |
| | | CSN oldest = current.oldestCSN; |
| | | CSN newest = current.newestCSN; |
| | | boolean changed = false; |
| | | if (newest == null || newest.isOlderThan(csn)) |
| | | { |
| | | newest = csn; |
| | | changed = true; |
| | | } |
| | | if (oldest == null) |
| | | { |
| | | oldest = csn; |
| | | changed = true; |
| | | } |
| | | if (changed) |
| | | { |
| | | csnLimits = new CSNLimits(oldest, newest); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the oldest CSN that has not been purged yet. |
| | | * |
| | | * @return the oldest CSN that has not been purged yet. |
| | | */ |
| | | CSN getOldestCSN() |
| | | { |
| | | return csnLimits.oldestCSN; |
| | | } |
| | | |
| | | /** |
| | | * Get the newest CSN that has not been purged yet. |
| | | * |
| | | * @return the newest CSN that has not been purged yet. |
| | | */ |
| | | CSN getNewestCSN() |
| | | { |
| | | return csnLimits.newestCSN; |
| | | } |
| | | |
| | | /** |
| | |  * Creates a cursor over the update messages of this DB. |
| | |  * <p> |
| | |  * A cursor is first obtained from the underlying log with the provided CSN |
| | |  * and strategies, then wrapped in a {@link FileReplicaDBCursor}. |
| | |  * |
| | |  * @param startCSN |
| | |  *          The position where the cursor must start. If null, start from the |
| | |  *          oldest CSN |
| | |  * @param matchingStrategy |
| | |  *          Cursor key matching strategy |
| | |  * @param positionStrategy |
| | |  *          Cursor position strategy |
| | |  * @return a new {@link DBCursor} to retrieve update messages. |
| | |  * @throws ChangelogException |
| | |  *           if a database problem happened |
| | |  */ |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy, |
| | |     final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | |   return new FileReplicaDBCursor( |
| | |       log.getCursor(startCSN, matchingStrategy, positionStrategy), startCSN, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | |  * Shutdown this ReplicaDB: closes the log and deregisters the monitor provider. |
| | |  * Only the first call has an effect. |
| | |  */ |
| | | void shutdown() |
| | | { |
| | |   if (!shutdown.compareAndSet(false, true)) |
| | |   { |
| | |     // Shutdown was already requested |
| | |     return; |
| | |   } |
| | |   log.close(); |
| | |   DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | |  * Synchronously purge changes older than purgeCSN from this replicaDB. |
| | |  * <p> |
| | |  * When the log reports an oldest remaining record, its key becomes the new |
| | |  * oldest CSN of this replicaDB. |
| | |  * |
| | |  * @param purgeCSN |
| | |  *          The CSN up to which changes can be purged. No purging happens when |
| | |  *          it is {@code null}. |
| | |  * @throws ChangelogException |
| | |  *           In case of database problem. |
| | |  */ |
| | | void purgeUpTo(final CSN purgeCSN) throws ChangelogException |
| | | { |
| | |   if (purgeCSN != null) |
| | |   { |
| | |     final Record<CSN, UpdateMsg> oldestRemaining = log.purgeUpTo(purgeCSN); |
| | |     if (oldestRemaining != null) |
| | |     { |
| | |       csnLimits = new CSNLimits(oldestRemaining.getKey(), csnLimits.newestCSN); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Monitor provider exposing the server id, the domain name and the oldest |
| | |  * and newest CSNs of the enclosing ReplicaDB. |
| | |  */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | |       throws ConfigException, InitializationException |
| | |   { |
| | |     // Nothing to do for now |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public String getMonitorInstanceName() |
| | |   { |
| | |     final ReplicationServerDomain rsDomain = replicationServer.getReplicationServerDomain(baseDN); |
| | |     return "Changelog for DS(" + serverId + "),cn=" + rsDomain.getMonitorInstanceName(); |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public List<Attribute> getMonitorData() |
| | |   { |
| | |     final List<Attribute> monitorData = new ArrayList<Attribute>(); |
| | |     monitorData.add(Attributes.create("replicationServer-database", String.valueOf(serverId))); |
| | |     monitorData.add(Attributes.create("domain-name", baseDN.toNormalizedString())); |
| | |     // Read the limits once so that both bounds come from the same snapshot |
| | |     final CSNLimits snapshot = csnLimits; |
| | |     addChangeIfPresent(monitorData, "first-change", snapshot.oldestCSN); |
| | |     addChangeIfPresent(monitorData, "last-change", snapshot.newestCSN); |
| | |     return monitorData; |
| | |   } |
| | | |
| | |   /** Adds an attribute made of the CSN followed by its time as a date, unless the CSN is null. */ |
| | |   private void addChangeIfPresent(final List<Attribute> monitorData, final String name, final CSN csn) |
| | |   { |
| | |     if (csn != null) |
| | |     { |
| | |       monitorData.add(Attributes.create(name, csn + " " + new Date(csn.getTime()))); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   // Single read of the limits so that both CSNs belong to the same snapshot |
| | |   final CSNLimits snapshot = csnLimits; |
| | |   final StringBuilder text = new StringBuilder(); |
| | |   text.append(getClass().getSimpleName()); |
| | |   text.append(" ").append(baseDN); |
| | |   text.append(" ").append(serverId); |
| | |   text.append(" ").append(snapshot.oldestCSN); |
| | |   text.append(" ").append(snapshot.newestCSN); |
| | |   return text.toString(); |
| | | } |
| | | |
| | | /** |
| | | * Clear the changes from this DB, from both memory cache and persistent |
| | | * storage. |
| | | * <p> |
| | | * The generation id of the base DN is also reset in the replication |
| | | * environment, and the cached oldest and newest CSNs are set to {@code null}. |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs while removing the changes from the DB. |
| | | */ |
| | | void clear() throws ChangelogException |
| | | { |
| | | // Remove all persisted data and reset generationId to default value |
| | | log.clear(); |
| | | replicationEnv.resetGenerationId(baseDN); |
| | | |
| | | // Only reached when the two calls above did not throw: forget both CSN bounds |
| | | csnLimits = new CSNLimits(null, null); |
| | | } |
| | | |
| | | /** |
| | | * Return the number of records of this replicaDB, as reported by the |
| | | * underlying log. |
| | | * <p> |
| | | * For test purpose. |
| | | * |
| | | * @return The number of records of this replicaDB. |
| | | * @throws ChangelogException |
| | | * If an error occurs |
| | | */ |
| | | long getNumberRecords() throws ChangelogException |
| | | { |
| | | return log.getNumberOfRecords(); |
| | | } |
| | | |
| | | /** |
| | | * Dump this DB as text files, intended for debugging purpose only. |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs during dump |
| | | */ |
| | | void dumpAsTextFiles() throws ChangelogException { |
| | | // The log's own directory is passed as the dump location |
| | | // (presumably the text files are written there - confirm in Log.dumpAsTextFile) |
| | | log.dumpAsTextFile(log.getPath()); |
| | | } |
| | | |
| | | /** |
| | |  * Parser of the records persisted in the ReplicaDB log: keys are CSNs and |
| | |  * values are update messages, stored as the bytes of the message. |
| | |  */ |
| | | private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg> |
| | | { |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public CSN getMaxKey() |
| | |   { |
| | |     return CSN.MAX_CSN_VALUE; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public String encodeKeyToString(CSN key) |
| | |   { |
| | |     return key.toString(); |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public CSN decodeKeyFromString(String key) throws ChangelogException |
| | |   { |
| | |     return new CSN(key); |
| | |   } |
| | | |
| | |   /** Only the value is encoded: the key is read back from the decoded message. */ |
| | |   @Override |
| | |   public ByteString encodeRecord(final Record<CSN, UpdateMsg> record) |
| | |   { |
| | |     return ByteString.wrap(record.getValue().getBytes()); |
| | |   } |
| | | |
| | |   /** Rebuilds the record from the message bytes; any failure is reported as a DecodingException. */ |
| | |   @Override |
| | |   public Record<CSN, UpdateMsg> decodeRecord(final ByteString data) throws DecodingException |
| | |   { |
| | |     try |
| | |     { |
| | |       final UpdateMsg decoded = |
| | |           (UpdateMsg) UpdateMsg.generateMsg(data.toByteArray(), ProtocolVersion.REPLICATION_PROTOCOL_V7); |
| | |       final CSN key = decoded.getCSN(); |
| | |       return Record.from(key, decoded); |
| | |     } |
| | |     catch (Exception e) |
| | |     { |
| | |       throw new DecodingException(e); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | |
| | | /** |
| | | * A cursor on ReplicaDB, which can re-initialize itself after exhaustion. |
| | | * <p> |
| | | * The cursor provides a java.sql.ResultSet like API : |
| | | * <pre> |
| | | * FileReplicaDBCursor cursor = ...; |
| | | * try { |
| | | * while (cursor.next()) { |
| | | * Record record = cursor.getRecord(); |
| | | * // ... can call cursor.getRecord() again: it will return the same result |
| | | * } |
| | | * } |
| | | * finally { |
| | | * close(cursor); |
| | | * } |
| | | * } |
| | | * </pre> |
| | | * <p> |
| | | * The cursor automatically re-initializes itself if it is exhausted: if a |
| | | * record is newly available, a subsequent call to the {@code next()} method will |
| | | * return {@code true} and the record will be available by calling {@code getRecord()} |
| | | * method. |
| | | * |
| | | * \@NotThreadSafe |
| | | */ |
| | | class FileReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | /** The underlying cursor. */ |
| | | private final RepositionableCursor<CSN, UpdateMsg> cursor; |
| | | |
| | | /** The next record to return. */ |
| | | private Record<CSN, UpdateMsg> nextRecord; |
| | | |
| | | /** The CSN to re-start with in case the cursor is exhausted. */ |
| | | private CSN lastNonNullCurrentCSN; |
| | | |
| | | private PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | | * Creates the cursor from provided log cursor and start CSN. |
| | | * |
| | | * @param cursor |
| | | * The underlying log cursor to read log. |
| | | * @param startCSN |
| | | * The CSN to use as a start point (excluded from cursor, the lowest |
| | | * CSN higher than this CSN is used as the real start point). |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to choose if cursor must |
| | | * start from the provided CSN or just after the provided CSN. |
| | | */ |
| | | FileReplicaDBCursor( |
| | | final RepositionableCursor<CSN, UpdateMsg> cursor, |
| | | final CSN startCSN, |
| | | final PositionStrategy positionStrategy) { |
| | | this.cursor = cursor; |
| | | this.lastNonNullCurrentCSN = startCSN; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return nextRecord == null ? null : nextRecord.getValue(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (cursor.next()) |
| | | { |
| | | nextRecord = cursor.getRecord(); |
| | | final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN); |
| | | if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY)) |
| | | { |
| | | // start CSN is found, switch to position strategy that always find the next |
| | | lastNonNullCurrentCSN = nextRecord.getKey(); |
| | | positionStrategy = AFTER_MATCHING_KEY; |
| | | return true; |
| | | } |
| | | } |
| | | // either cursor is exhausted or we still have not reached the start CSN |
| | | return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned(); |
| | | } |
| | | |
| | | /** Re-initialize the cursor after the last non null CSN. */ |
| | | private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException |
| | | { |
| | | final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | if (found && cursor.next()) |
| | | { |
| | | nextRecord = cursor.getRecord(); |
| | | lastNonNullCurrentCSN = nextRecord.getKey(); |
| | | return true; |
| | | } |
| | | nextRecord = null; |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | cursor.close(); |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | | import java.io.FileFilter; |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReadWriteLock; |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.Utils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A multi-file log that features appending key-value records and reading them |
| | | * using a {@code DBCursor}. |
| | | * The records must be appended to the log in ascending order of the keys. |
| | | * <p> |
| | | * A log is defined for a path - the log path - and contains one to many log files: |
| | | * <ul> |
| | | * <li>it has always at least one log file, the head log file, named "head.log", |
| | | * which is used to append the records.</li> |
| | | * <li>it may have from zero to many read-only log files, which are named after |
| | | * the pattern '[lowkey]_[highkey].log' where [lowkey] and [highkey] are respectively |
| | | * the string representation of lowest and highest key present in the log file.</li> |
| | | * </ul> |
| | | * A read-only log file is created each time the head log file has reached the |
| | | * maximum size limit. The head log file is then rotated to the read-only file and a |
| | | * new empty head log file is opened. There is no limit on the number of read-only |
| | | * files, but they can be purged. |
| | | * <p> |
| | | * A log is obtained using the {@code Log.openLog()} method and must always be |
| | | * released using the {@code close()} method. |
| | | * <p> |
| | | * Usage example: |
| | | * <pre> |
| | | * {@code |
| | | * Log<K, V> log = null; |
| | | * try |
| | | * { |
| | | * log = Log.openLog(logPath, parser, maxFileSize); |
| | | * log.append(Record.from(key, value)); |
| | | * RepositionableCursor<K, V> cursor = log.getCursor(someKey); |
| | | * // use cursor, then close cursor |
| | | * } |
| | | * finally |
| | | * { |
| | | * log.close(); |
| | | * } |
| | | * } |
| | | * </pre> |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | final class Log<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | /** Logger of this class. */ |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** Suffix of the name of every log file. */ |
| | | private static final String LOG_FILE_SUFFIX = ".log"; |
| | | |
| | | /** Name of the head log file. */ |
| | | static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX; |
| | | |
| | | /** Separator used in log file names (presumably between the lowest and highest key of a read-only file). */ |
| | | private static final String LOG_FILE_NAME_SEPARATOR = "_"; |
| | | |
| | | /** Accepts regular files whose name has the log suffix, except the head log file. */ |
| | | private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | | public boolean accept(File file) |
| | | { |
| | | return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) && |
| | | !file.getName().equals(HEAD_LOG_FILE_NAME); |
| | | } |
| | | }; |
| | | |
| | | /** |
| | | * Map that holds the unique log instance for each log path. It is read and |
| | | * updated by the static synchronized methods openLog() and releaseLog(). |
| | | */ |
| | | private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>(); |
| | | |
| | | /** |
| | | * The number of references on this log instance. It starts at 1 and is |
| | | * incremented each time a log is opened on the same log path. The log is |
| | | * effectively closed only when a reference is released while this value is |
| | | * at most 1. |
| | | */ |
| | | private int referenceCount; |
| | | |
| | | /** The path of directory for this log, where log files are stored. */ |
| | | private final File logPath; |
| | | |
| | | /** The parser used for encoding/decoding of records. */ |
| | | private final RecordParser<K, V> recordParser; |
| | | |
| | | /** |
| | | * Indicates if this log is closed. When the log is closed, appending and |
| | | * purging return immediately with no effect, and requesting a cursor returns |
| | | * an empty cursor. |
| | | */ |
| | | private boolean isClosed; |
| | | |
| | | /** |
| | | * The log files contained in this log, ordered by key. |
| | | * <p> |
| | | * The head log file is always present and is associated with the maximum |
| | | * possible key, given by the record parser. |
| | | * <p> |
| | | * The read-only log files are associated with the highest key they contain. |
| | | */ |
| | | private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>(); |
| | | |
| | | /** |
| | | * The last key appended to the log. In order to keep the ordering of the keys |
| | | * in the log, any attempt to append a record with a key lower or equal to |
| | | * this key is rejected (no error but an event is logged). It is {@code null} |
| | | * until a record has been appended, in which case no record is rejected. |
| | | */ |
| | | private K lastAppendedKey; |
| | | |
| | | /** |
| | | * The list of non-empty cursors opened on this log. Opened cursors may have |
| | | * to be updated when rotating the head log file. |
| | | */ |
| | | private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>(); |
| | | |
| | | /** |
| | | * A log file is rotated once it has exceeded this size limit. The check is |
| | | * done before appending a record, so the log file can have a size much larger |
| | | * than this limit if the last record written has a huge size. |
| | | * |
| | | * TODO : to be replaced later by a (or a list of) configurable Rotation policy |
| | | * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>(); |
| | | */ |
| | | private final long sizeLimitPerLogFileInBytes; |
| | | |
| | | /** |
| | | * The exclusive lock used for writes and lifecycle operations on this log: |
| | | * initialize, clear, sync and close. It is the write lock of the read-write |
| | | * lock created in the constructor. |
| | | */ |
| | | private final Lock exclusiveLock; |
| | | |
| | | /** |
| | | * The shared lock used for reads and cursor operations on this log. It is |
| | | * the read lock of the read-write lock created in the constructor. |
| | | */ |
| | | private final Lock sharedLock; |
| | | |
| | | /** |
| | | * Open a log with the provided log path, record parser and maximum size per |
| | | * log file. |
| | | * <p> |
| | | * If no log exists for the provided path, a new one is created. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param logPath |
| | | * Path of the log. |
| | | * @param parser |
| | | * Parser for encoding/decoding of records. |
| | | * @param sizeLimitPerFileInBytes |
| | | * Limit in bytes before rotating the head log file of the log. |
| | | * @return a log |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath, |
| | | final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException |
| | | { |
| | | Reject.ifNull(logPath, parser); |
| | | @SuppressWarnings("unchecked") |
| | | Log<K, V> log = (Log<K, V>) logsCache.get(logPath); |
| | | if (log == null) |
| | | { |
| | | log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes); |
| | | logsCache.put(logPath, log); |
| | | } |
| | | else |
| | | { |
| | | // TODO : check that parser and size limit are compatible with the existing one, |
| | | // and issue a warning if it is not the case |
| | | log.referenceCount++; |
| | | } |
| | | return log; |
| | | } |
| | | |
| | | /** |
| | | * Release a reference to the log corresponding to provided path. The log is |
| | | * closed if this is the last reference. |
| | | */ |
| | | private static synchronized void releaseLog(final File logPath) |
| | | { |
| | | Log<?, ?> log = logsCache.get(logPath); |
| | | if (log == null) |
| | | { |
| | | // this should never happen |
| | | logger.error(ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath())); |
| | | return; |
| | | } |
| | | if (log.referenceCount > 1) |
| | | { |
| | | log.referenceCount--; |
| | | } |
| | | else |
| | | { |
| | | log.doClose(); |
| | | logsCache.remove(logPath); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Builds a log on the provided directory, with a reference count of one, and |
| | |  * creates or opens its log files. |
| | |  * |
| | |  * @param logPath |
| | |  *          The directory path of the log. |
| | |  * @param parser |
| | |  *          Parser of records. |
| | |  * @param sizeLimitPerFile |
| | |  *          Limit in bytes before rotating a log file. |
| | |  * @throws ChangelogException |
| | |  *           If a problem occurs during initialization. |
| | |  */ |
| | | private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile) |
| | |     throws ChangelogException |
| | | { |
| | |   // Both locks are the two sides of a single non-fair read-write lock |
| | |   final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false); |
| | |   this.sharedLock = readWriteLock.readLock(); |
| | |   this.exclusiveLock = readWriteLock.writeLock(); |
| | | |
| | |   this.referenceCount = 1; |
| | |   this.sizeLimitPerLogFileInBytes = sizeLimitPerFile; |
| | |   this.recordParser = parser; |
| | |   this.logPath = logPath; |
| | | |
| | |   // Must come last: it relies on the fields initialized above |
| | |   createOrOpenLogFiles(); |
| | | } |
| | | |
| | | /** |
| | | * Create or open log files used by this log. |
| | | * <p> |
| | | * Under the exclusive lock: creates the log directory if needed, opens the |
| | | * head log file, then opens every read-only log file found in the directory, |
| | | * and finally marks the log as not closed. |
| | | * |
| | | * @throws ChangelogException |
| | | * If any of these steps fails; the original error is kept as the cause. |
| | | */ |
| | | private void createOrOpenLogFiles() throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | createRootDirIfNotExists(); |
| | | openHeadLogFile(); |
| | | for (final File file : getReadOnlyLogFiles()) |
| | | { |
| | | openReadOnlyLogFile(file); |
| | | } |
| | | isClosed = false; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // ensure all log files opened at this point are closed |
| | | // NOTE(review): this runs from the constructor, before openLog() has put the |
| | | // log in logsCache; if close() goes through releaseLog() it will not find |
| | | // this log there - confirm the files are still closed in that case |
| | | close(); |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private File[] getReadOnlyLogFiles() throws ChangelogException |
| | | { |
| | | File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER); |
| | | if (files == null) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath())); |
| | | } |
| | | return files; |
| | | } |
| | | |
| | | private void createRootDirIfNotExists() throws ChangelogException |
| | | { |
| | | if (!logPath.exists()) |
| | | { |
| | | if (!logPath.mkdirs()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the path of this log, which is the directory where its log files |
| | | * are stored. |
| | | * |
| | | * @return the path of this log directory |
| | | */ |
| | | public File getPath() |
| | | { |
| | | return logPath; |
| | | } |
| | | |
| | | /** |
| | |  * Add the provided record at the end of this log. |
| | |  * <p> |
| | |  * The record must have a key strictly higher than the key |
| | |  * of the last record added. If it is not the case, the record is not |
| | |  * appended and the method returns immediately. The method also returns |
| | |  * immediately, without appending, when the log is closed. |
| | |  * <p> |
| | |  * If the head log file has exceeded the size limit, it is rotated first and |
| | |  * the record is appended to the new head log file. |
| | |  * <p> |
| | |  * In order to ensure that record is written out of buffers and persisted |
| | |  * to file system, it is necessary to explicitly call the |
| | |  * {@code syncToFileSystem()} method. |
| | |  * |
| | |  * @param record |
| | |  *          The record to add. |
| | |  * @throws ChangelogException |
| | |  *           If an error occurs while adding the record to the log. |
| | |  */ |
| | | public void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | |   // If this exclusive lock happens to be a bottleneck : |
| | |   // 1. use a shared lock for appending the record first |
| | |   // 2. switch to an exclusive lock only if rotation is needed |
| | |   // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail |
| | |   exclusiveLock.lock(); |
| | |   try |
| | |   { |
| | |     if (isClosed) |
| | |     { |
| | |       return; |
| | |     } |
| | |     if (recordIsBreakingKeyOrdering(record)) |
| | |     { |
| | |       logger.info(LocalizableMessage.raw( |
| | |           "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record, |
| | |           lastAppendedKey != null ? lastAppendedKey : "null")); |
| | |       return; |
| | |     } |
| | |     LogFile<K, V> headLogFile = getHeadLogFile(); |
| | |     if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes) |
| | |     { |
| | |       // Rotation is a normal event: report it as information, not as an error |
| | |       logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | |
| | |       rotateHeadLogFile(); |
| | |       // Rotation installs a new head log file: fetch it again |
| | |       headLogFile = getHeadLogFile(); |
| | |     } |
| | |     headLogFile.append(record); |
| | |     lastAppendedKey = record.getKey(); |
| | |   } |
| | |   finally |
| | |   { |
| | |     exclusiveLock.unlock(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Tells whether appending the provided record would break the ascending |
| | |  * order of keys, i.e. whether its key is lower than or equal to the last |
| | |  * appended key. Never the case while no key has been appended. |
| | |  */ |
| | | private boolean recordIsBreakingKeyOrdering(final Record<K, V> record) |
| | | { |
| | |   if (lastAppendedKey == null) |
| | |   { |
| | |     return false; |
| | |   } |
| | |   return record.getKey().compareTo(lastAppendedKey) <= 0; |
| | | } |
| | | |
| | | /** |
| | |  * Synchronize all records added with the file system, ensuring that records |
| | |  * are effectively persisted. |
| | |  * <p> |
| | |  * The synchronization is delegated to the head log file, under the |
| | |  * exclusive lock. After a successful call to this method, it is guaranteed |
| | |  * that all records added to the log are persisted to the file system. |
| | |  * |
| | |  * @throws ChangelogException |
| | |  *           If the synchronization fails. |
| | |  */ |
| | | public void syncToFileSystem() throws ChangelogException |
| | | { |
| | |   exclusiveLock.lock(); |
| | |   try |
| | |   { |
| | |     final LogFile<K, V> head = getHeadLogFile(); |
| | |     head.syncToFileSystem(); |
| | |   } |
| | |   finally |
| | |   { |
| | |     exclusiveLock.unlock(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Returns a cursor that allows to retrieve the records from this log, |
| | |  * starting at the first position. |
| | |  * <p> |
| | |  * An empty cursor is returned when the log is closed. |
| | |  * |
| | |  * @return a cursor on the log records, which is never {@code null} |
| | |  * @throws ChangelogException |
| | |  *           If the cursor can't be created. |
| | |  */ |
| | | public RepositionableCursor<K, V> getCursor() throws ChangelogException |
| | | { |
| | |   LogCursor<K, V> newCursor = null; |
| | |   sharedLock.lock(); |
| | |   try |
| | |   { |
| | |     if (!isClosed) |
| | |     { |
| | |       newCursor = new LogCursor<K, V>(this); |
| | |       // No key and no strategy: position at the beginning of the log |
| | |       newCursor.positionTo(null, null, null); |
| | |       registerCursor(newCursor); |
| | |       return newCursor; |
| | |     } |
| | |     return new EmptyLogCursor<K, V>(); |
| | |   } |
| | |   catch (ChangelogException e) |
| | |   { |
| | |     // Do not leak a cursor that could not be positioned or registered |
| | |     StaticUtils.close(newCursor); |
| | |     throw e; |
| | |   } |
| | |   finally |
| | |   { |
| | |     sharedLock.unlock(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the provided key. |
| | | * <p> |
| | | * The cursor is requested with the {@code EQUAL_TO_KEY} matching strategy |
| | | * and the {@code ON_MATCHING_KEY} position strategy. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | |  * Returns a cursor that allows to retrieve the records from this log. The |
| | |  * starting position is defined by the provided key, cursor matching strategy |
| | |  * and cursor positioning strategy. |
| | |  * <p> |
| | |  * An empty cursor is returned when the log is closed, or when the cursor |
| | |  * can't be positioned and the matching strategy is not |
| | |  * {@code GREATER_THAN_OR_EQUAL_TO_KEY}. |
| | |  * |
| | |  * @param key |
| | |  *          Key to use as a start position for the cursor. If key is |
| | |  *          {@code null}, cursor will point at the first record of the log. |
| | |  * @param matchingStrategy |
| | |  *          Cursor key matching strategy. |
| | |  * @param positionStrategy |
| | |  *          The cursor positioning strategy. |
| | |  * @return a cursor on the log records, which is never {@code null} |
| | |  * @throws ChangelogException |
| | |  *           If the cursor can't be created. |
| | |  */ |
| | | public RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy, |
| | |     final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | |   if (key == null) |
| | |   { |
| | |     return getCursor(); |
| | |   } |
| | |   LogCursor<K, V> newCursor = null; |
| | |   sharedLock.lock(); |
| | |   try |
| | |   { |
| | |     if (isClosed) |
| | |     { |
| | |       return new EmptyLogCursor<K, V>(); |
| | |     } |
| | |     newCursor = new LogCursor<K, V>(this); |
| | |     final boolean positioned = newCursor.positionTo(key, matchingStrategy, positionStrategy); |
| | |     // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy |
| | |     final boolean keepCursor = positioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY; |
| | |     if (!keepCursor) |
| | |     { |
| | |       StaticUtils.close(newCursor); |
| | |       return new EmptyLogCursor<K, V>(); |
| | |     } |
| | |     registerCursor(newCursor); |
| | |     return newCursor; |
| | |   } |
| | |   catch (ChangelogException e) |
| | |   { |
| | |     // Do not leak a cursor that could not be positioned or registered |
| | |     StaticUtils.close(newCursor); |
| | |     throw e; |
| | |   } |
| | |   finally |
| | |   { |
| | |     sharedLock.unlock(); |
| | |   } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Returns the oldest (first) record from this log, which is the oldest |
| | | * record of the oldest log file. |
| | | * |
| | | * @return the oldest record, which may be {@code null} if there is no record |
| | | * in the log. |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | public Record<K, V> getOldestRecord() throws ChangelogException |
| | | { |
| | | // NOTE(review): unlike the cursor methods, this read neither takes the |
| | | // shared lock nor checks isClosed - confirm this is intended |
| | | return getOldestLogFile().getOldestRecord(); |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Retrieves the newest (last) record of this log, as reported by the |
| | |  * head log file. |
| | |  * |
| | |  * @return the newest record, which may be {@code null} |
| | |  * @throws ChangelogException |
| | |  *           If an error occurs while retrieving the record. |
| | |  */ |
| | | public Record<K, V> getNewestRecord() throws ChangelogException |
| | | { |
| | |   final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | |   return headLogFile.getNewestRecord(); |
| | | } |
| | | |
| | | /** |
| | |  * Counts the records of the log, by summing the number of records reported |
| | |  * by each of its log files. |
| | |  * |
| | |  * @return the number of records |
| | |  * @throws ChangelogException |
| | |  *           If a problem occurs. |
| | |  */ |
| | | public long getNumberOfRecords() throws ChangelogException |
| | | { |
| | |   long total = 0; |
| | |   final Iterator<LogFile<K, V>> files = logFiles.values().iterator(); |
| | |   while (files.hasNext()) |
| | |   { |
| | |     total += files.next().getNumberOfRecords(); |
| | |   } |
| | |   return total; |
| | | } |
| | | |
| | | /** |
| | |  * Purge the log up to and excluding the provided key. |
| | |  * <p> |
| | |  * Every log file registered under a key strictly lower than the purge key is |
| | |  * closed, deleted and removed from the log. Files that can't be deleted stay |
| | |  * registered and are reported together in a single exception. |
| | |  * |
| | |  * @param purgeKey |
| | |  *          the key up to which purging must happen |
| | |  * @return the oldest non purged record, or {@code null} if the log is closed |
| | |  *         or if there was no log file to purge |
| | |  * @throws ChangelogException |
| | |  *           if a database problem occurs. |
| | |  */ |
| | | public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException |
| | | { |
| | |   exclusiveLock.lock(); |
| | |   try |
| | |   { |
| | |     if (isClosed) |
| | |     { |
| | |       return null; |
| | |     } |
| | |     // headMap() is a view restricted to keys strictly lower than the purge key |
| | |     final SortedMap<K, LogFile<K, V>> candidates = logFiles.headMap(purgeKey); |
| | |     if (candidates.isEmpty()) |
| | |     { |
| | |       return null; |
| | |     } |
| | | |
| | |     final List<String> failedPaths = new ArrayList<String>(); |
| | |     for (final Iterator<Entry<K, LogFile<K, V>>> it = candidates.entrySet().iterator(); it.hasNext();) |
| | |     { |
| | |       final LogFile<K, V> candidate = it.next().getValue(); |
| | |       try |
| | |       { |
| | |         candidate.close(); |
| | |         candidate.delete(); |
| | |         // Only forget the log file once it is actually gone |
| | |         it.remove(); |
| | |       } |
| | |       catch (ChangelogException e) |
| | |       { |
| | |         // The deletion of log file on file system has failed |
| | |         failedPaths.add(candidate.getFile().getPath()); |
| | |       } |
| | |     } |
| | |     if (failedPaths.isEmpty()) |
| | |     { |
| | |       return getOldestRecord(); |
| | |     } |
| | |     throw new ChangelogException( |
| | |         ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get( |
| | |             Utils.joinAsString(", ", failedPaths))); |
| | |   } |
| | |   finally |
| | |   { |
| | |     exclusiveLock.unlock(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Empties the log, discarding all records it contains. |
| | |  * <p> |
| | |  * Cursors still opened on this log are not an error: they are switched into |
| | |  * empty cursors before the log files are deleted. Once every log file has |
| | |  * been deleted, a new empty head log file is opened. Nothing is done if the |
| | |  * log is already closed. |
| | |  * |
| | |  * @throws ChangelogException |
| | |  *           If a problem occurs during clearing operation, in particular if |
| | |  *           some log files can't be deleted. The original failure is |
| | |  *           available as the cause of the exception. |
| | |  */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | |   exclusiveLock.lock(); |
| | |   try |
| | |   { |
| | |     if (isClosed) |
| | |     { |
| | |       return; |
| | |     } |
| | |     if (!openCursors.isEmpty()) |
| | |     { |
| | |       // Allow opened cursors at this point, but turn them into empty cursors. |
| | |       // This behavior is needed by the change number indexer thread. |
| | |       switchCursorsOpenedIntoEmptyCursors(); |
| | |     } |
| | | |
| | |     // delete all log files |
| | |     final List<String> undeletableFiles = new ArrayList<String>(); |
| | |     for (LogFile<K, V> logFile : logFiles.values()) |
| | |     { |
| | |       try |
| | |       { |
| | |         logFile.close(); |
| | |         logFile.delete(); |
| | |       } |
| | |       catch (ChangelogException e) |
| | |       { |
| | |         undeletableFiles.add(logFile.getFile().getPath()); |
| | |       } |
| | |     } |
| | |     if (!undeletableFiles.isEmpty()) |
| | |     { |
| | |       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get( |
| | |           Utils.joinAsString(", ", undeletableFiles))); |
| | |     } |
| | |     logFiles.clear(); |
| | | |
| | |     // recreate an empty head log file |
| | |     openHeadLogFile(); |
| | |   } |
| | |   catch (Exception e) |
| | |   { |
| | |     // Any failure (including the "undeletable files" one above) is reported as a clearing |
| | |     // error; the original exception is chained as the cause so that it is not lost. |
| | |     throw new ChangelogException( |
| | |         ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)), e); |
| | |   } |
| | |   finally |
| | |   { |
| | |     exclusiveLock.unlock(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Dump this log as a text files, intended for debugging purpose only. |
| | | * |
| | | * @param dumpDirectory |
| | | * Directory that will contains log files with text format |
| | | * and ".txt" extensions |
| | | * @throws ChangelogException |
| | | * If an error occurs during dump |
| | | */ |
| | | void dumpAsTextFile(File dumpDirectory) throws ChangelogException |
| | | { |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt")); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | |   // Closing is delegated to releaseLog() for the path of this log |
| | |   releaseLog(this.logPath); |
| | | } |
| | | |
| | | /** Effectively close this log. */ |
| | | private void doClose() |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | if (!openCursors.isEmpty()) |
| | | { |
| | | logger.error(ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size())); |
| | | } |
| | | StaticUtils.close(logFiles.values()); |
| | | isClosed = true; |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private LogFile<K, V> getHeadLogFile() |
| | | { |
| | | return logFiles.lastEntry().getValue(); |
| | | } |
| | | |
| | | private LogFile<K, V> getOldestLogFile() |
| | | { |
| | | return logFiles.firstEntry().getValue(); |
| | | } |
| | | |
| | | /** |
| | | * Rotate the head log file to a read-only log file, and open a new empty head |
| | | * log file to write in. |
| | | * <p> |
| | | * Cursors positioned on the head log file are temporarily disabled (closing their |
| | | * underlying cursor) and then re-initialized with their previous state. |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs during the rotation. |
| | | */ |
| | | private void rotateHeadLogFile() throws ChangelogException |
| | | { |
| | | // Temporarily disable cursors opened on head, saving their state |
| | | final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead(); |
| | | |
| | | final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | // The read-only name is built from the keys of the oldest and newest records of the |
| | | // head file; it is computed here, while the head file is still open. |
| | | final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile)); |
| | | headLogFile.close(); |
| | | renameHeadLogFileTo(readOnlyLogFile); |
| | | |
| | | // Register a new head file under the max key, then the rotated file under its highest key |
| | | openHeadLogFile(); |
| | | openReadOnlyLogFile(readOnlyLogFile); |
| | | |
| | | // Re-enable cursors previously opened on head, with the saved state |
| | | updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead); |
| | | } |
| | | |
| | | /** |
| | |  * Renames the head log file of this log to the provided file. |
| | |  * |
| | |  * @param rotatedLogFile |
| | |  *          The target file of the rename operation |
| | |  * @throws ChangelogException |
| | |  *           If the file can't be renamed |
| | |  */ |
| | | private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException |
| | | { |
| | |   final File currentHead = new File(logPath, HEAD_LOG_FILE_NAME); |
| | |   try |
| | |   { |
| | |     StaticUtils.renameFile(currentHead, rotatedLogFile); |
| | |   } |
| | |   catch (IOException e) |
| | |   { |
| | |     final String sourcePath = currentHead.getPath(); |
| | |     final String targetPath = rotatedLogFile.getPath(); |
| | |     throw new ChangelogException( |
| | |         ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(sourcePath, targetPath), e); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Returns the key bounds for the provided log file. |
| | |  * <p> |
| | |  * The bounds are decoded from the name of the file: the log file suffix is |
| | |  * stripped, then the remaining name is split on the log file name separator, |
| | |  * the first part being the lowest key and the second part the highest key. |
| | |  * |
| | |  * @param logFile |
| | |  *          The log file to retrieve the bounds from |
| | |  * @return the pair of (lowest key, highest key) that correspond to records |
| | |  *         stored in the corresponding log file. |
| | |  * @throws ChangelogException |
| | |  *           if an error occurs while retrieving the keys |
| | |  */ |
| | | private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException |
| | | { |
| | |   try |
| | |   { |
| | |     final String fileName = logFile.getFile().getName(); |
| | |     final int suffixStart = fileName.length() - Log.LOG_FILE_SUFFIX.length(); |
| | |     final String[] encodedKeys = fileName.substring(0, suffixStart).split(LOG_FILE_NAME_SEPARATOR); |
| | |     final K lowestKey = recordParser.decodeKeyFromString(encodedKeys[0]); |
| | |     final K highestKey = recordParser.decodeKeyFromString(encodedKeys[1]); |
| | |     return Pair.of(lowestKey, highestKey); |
| | |   } |
| | |   catch (Exception e) |
| | |   { |
| | |     // Any failure (unexpected name format, undecodable key) is reported the same way |
| | |     throw new ChangelogException( |
| | |         ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Builds the file name to use for the read-only version of the provided |
| | |  * log file. |
| | |  * <p> |
| | |  * The name is made of the encoded key of the oldest record, the log file |
| | |  * name separator, the encoded key of the newest record and the log file suffix. |
| | |  * |
| | |  * @param logFile |
| | |  *          The log file to generate a name for |
| | |  * @return the name to use for the read-only version of the log file |
| | |  * @throws ChangelogException |
| | |  *           If an error occurs. |
| | |  */ |
| | | private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException |
| | | { |
| | |   final K firstKey = logFile.getOldestRecord().getKey(); |
| | |   final K lastKey = logFile.getNewestRecord().getKey(); |
| | |   final StringBuilder fileName = new StringBuilder(); |
| | |   fileName.append(recordParser.encodeKeyToString(firstKey)); |
| | |   fileName.append(LOG_FILE_NAME_SEPARATOR); |
| | |   fileName.append(recordParser.encodeKeyToString(lastKey)); |
| | |   fileName.append(LOG_FILE_SUFFIX); |
| | |   return fileName.toString(); |
| | | } |
| | | |
| | | /** Update the cursors that were pointing to head after a rotation of the head log file. */ |
| | | private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors) |
| | | throws ChangelogException |
| | | { |
| | | for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors) |
| | | { |
| | | final CursorState<K, V> cursorState = pair.getSecond(); |
| | | |
| | | // Need to update the cursor only if it is pointing to the head log file |
| | | if (isHeadLogFile(cursorState.logFile)) |
| | | { |
| | | final K previousKey = logFiles.lowerKey(recordParser.getMaxKey()); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey); |
| | | final LogCursor<K, V> cursor = pair.getFirst(); |
| | | cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException |
| | | { |
| | | for (LogCursor<K, V> cursor : openCursors) |
| | | { |
| | | cursor.actAsEmptyCursor(); |
| | | } |
| | | openCursors.clear(); |
| | | } |
| | | |
| | | /** |
| | |  * Disable the cursors opened on the head log file, by closing their underlying cursor. |
| | |  * <p> |
| | |  * The state of each cursor is captured just before the close operation. Cursors |
| | |  * positioned on another log file are left untouched and are not returned. |
| | |  * |
| | |  * @return the pairs (cursor, cursor state) for each cursor pointing to head log file. |
| | |  * @throws ChangelogException |
| | |  *           If an error occurs. |
| | |  */ |
| | | private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException |
| | | { |
| | |   final List<Pair<LogCursor<K, V>, CursorState<K, V>>> savedStates = |
| | |       new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>(); |
| | |   for (LogCursor<K, V> cursor : openCursors) |
| | |   { |
| | |     if (!isHeadLogFile(cursor.currentLogFile)) |
| | |     { |
| | |       continue; |
| | |     } |
| | |     final CursorState<K, V> stateBeforeClose = cursor.getState(); |
| | |     savedStates.add(Pair.of(cursor, stateBeforeClose)); |
| | |     cursor.closeUnderlyingCursor(); |
| | |   } |
| | |   return savedStates; |
| | | } |
| | | |
| | | /** |
| | |  * Opens the head log file in append mode and registers it under the max key. |
| | |  * <p> |
| | |  * The last appended key is reset to the key of the newest record of the head |
| | |  * log file, or to {@code null} if the head log file has no record. |
| | |  */ |
| | | private void openHeadLogFile() throws ChangelogException |
| | | { |
| | |   final File headFile = new File(logPath, HEAD_LOG_FILE_NAME); |
| | |   final LogFile<K, V> head = LogFile.newAppendableLogFile(headFile, recordParser); |
| | |   final Record<K,V> newest = head.getNewestRecord(); |
| | |   if (newest != null) |
| | |   { |
| | |     lastAppendedKey = newest.getKey(); |
| | |   } |
| | |   else |
| | |   { |
| | |     lastAppendedKey = null; |
| | |   } |
| | |   logFiles.put(recordParser.getMaxKey(), head); |
| | | } |
| | | |
| | | /** |
| | |  * Opens the provided file as a read-only log file and registers it under the |
| | |  * highest key decoded from its name. |
| | |  */ |
| | | private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException |
| | | { |
| | |   final LogFile<K, V> readOnlyLogFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser); |
| | |   final K highestKey = getKeyBounds(readOnlyLogFile).getSecond(); |
| | |   logFiles.put(highestKey, readOnlyLogFile); |
| | | } |
| | | |
| | | /** Adds the provided cursor to the cursors opened on this log. */ |
| | | private void registerCursor(final LogCursor<K, V> cursor) |
| | | { |
| | |   this.openCursors.add(cursor); |
| | | } |
| | | |
| | | /** Removes the provided cursor from the cursors opened on this log. */ |
| | | private void unregisterCursor(final LogCursor<K, V> cursor) |
| | | { |
| | |   this.openCursors.remove(cursor); |
| | | } |
| | | |
| | | /** |
| | |  * Returns the log file that is just after the provided log file wrt the order |
| | |  * defined on keys, or {@code null} if the provided log file is the last one |
| | |  * (the head log file). |
| | |  * <p> |
| | |  * The next log file is the one registered under the first key strictly higher |
| | |  * than the highest key of the provided log file. |
| | |  */ |
| | | private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException |
| | | { |
| | |   if (!isHeadLogFile(currentLogFile)) |
| | |   { |
| | |     final K highestKey = getKeyBounds(currentLogFile).getSecond(); |
| | |     return logFiles.higherEntry(highestKey).getValue(); |
| | |   } |
| | |   return null; |
| | | } |
| | | |
| | | private boolean isHeadLogFile(final LogFile<K, V> logFile) |
| | | { |
| | | return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME); |
| | | } |
| | | |
| | | /** Returns the log file that should contain the provided key. */ |
| | | private LogFile<K, V> findLogFileFor(final K key) |
| | | { |
| | | if (key == null || logFiles.lowerKey(key) == null) |
| | | { |
| | | return getOldestLogFile(); |
| | | } |
| | | return logFiles.ceilingEntry(key).getValue(); |
| | | } |
| | | |
| | | /** |
| | |  * Represents a DB Cursor that can be repositioned on a given key. |
| | |  * <p> |
| | |  * Note that as a DBCursor, it provides a java.sql.ResultSet like API. |
| | |  * |
| | |  * @param <K> |
| | |  *          Type of the key of a record, which must be comparable. |
| | |  * @param <V> |
| | |  *          Type of the value of a record. |
| | |  */ |
| | | interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>> |
| | | { |
| | |   /** |
| | |    * Moves the cursor to the record designated by the provided key, using the |
| | |    * provided matching and positioning strategies. |
| | |    * |
| | |    * @param key |
| | |    *          Key to use as a start position for the cursor. If key is |
| | |    *          {@code null}, use the key of the first record instead. |
| | |    * @param matchStrategy |
| | |    *          The cursor key matching strategy. |
| | |    * @param positionStrategy |
| | |    *          The cursor positioning strategy. |
| | |    * @return {@code true} if cursor is successfully positioned, or |
| | |    *         {@code false} otherwise. |
| | |    * @throws ChangelogException |
| | |    *           If an error occurs when positioning cursor. |
| | |    */ |
| | |   boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy) |
| | |       throws ChangelogException; |
| | | } |
| | | |
| | | /** |
| | | * Implements a cursor on the log. |
| | | * <p> |
| | | * The cursor uses the log shared lock to ensure reads are not done during a rotation. |
| | | * <p> |
| | | * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()} |
| | | * method. |
| | | */ |
| | | private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V> |
| | | { |
| | | private final Log<K, V> log; |
| | | |
| | | private LogFile<K, V> currentLogFile; |
| | | private LogFileCursor<K, V> currentCursor; |
| | | private boolean actAsEmptyCursor; |
| | | |
| | | /** |
| | | * Creates a cursor on the provided log. |
| | | * |
| | | * @param log |
| | | * The log on which the cursor read records. |
| | | * @throws ChangelogException |
| | | * If an error occurs when creating the cursor. |
| | | */ |
| | | private LogCursor(final Log<K, V> log) throws ChangelogException |
| | | { |
| | | this.log = log; |
| | | this.actAsEmptyCursor = false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K, V> getRecord() |
| | | { |
| | | return currentCursor != null ? currentCursor.getRecord() : null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | { |
| | | return false; |
| | | } |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | final boolean hasNext = currentCursor.next(); |
| | | if (hasNext) |
| | | { |
| | | return true; |
| | | } |
| | | final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile); |
| | | if (logFile != null) |
| | | { |
| | | switchToLogFile(logFile); |
| | | return currentCursor.next(); |
| | | } |
| | | return false; |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | log.unregisterCursor(this); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo( |
| | | final K key, |
| | | final KeyMatchingStrategy matchStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | { |
| | | return false; |
| | | } |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | final LogFile<K, V> logFile = log.findLogFileFor(key); |
| | | if (logFile != currentLogFile) |
| | | { |
| | | switchToLogFile(logFile); |
| | | } |
| | | return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** Returns the state of this cursor. */ |
| | | private CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | return !actAsEmptyCursor ? |
| | | new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null; |
| | | } |
| | | |
| | | private void closeUnderlyingCursor() |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | } |
| | | |
| | | /** Reinitialize this cursor to the provided state. */ |
| | | private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | if (!actAsEmptyCursor) |
| | | { |
| | | currentLogFile = cursorState.logFile; |
| | | currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition); |
| | | } |
| | | } |
| | | |
| | | /** Turn this cursor into an empty cursor, with no actual resource used. */ |
| | | private void actAsEmptyCursor() |
| | | { |
| | | currentLogFile = null; |
| | | currentCursor = null; |
| | | actAsEmptyCursor = true; |
| | | } |
| | | |
| | | /** Switch the cursor to the provided log file. */ |
| | | private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | currentLogFile = logFile; |
| | | currentCursor = currentLogFile.getCursor(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | public String toString() |
| | | { |
| | | return actAsEmptyCursor ? |
| | | String.format("Cursor on log : %s, acting as empty cursor", log.logPath) : |
| | | String.format("Cursor on log : %s, current log file: %s, current cursor: %s", |
| | | log.logPath, currentLogFile.getFile().getName(), currentCursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * An empty cursor, that always returns {@code null} records and {@code false} |
| | |  * to the {@code next()} and {@code positionTo()} methods. |
| | |  */ |
| | | static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V> |
| | | { |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public boolean next() |
| | |   { |
| | |     // there is never a next record |
| | |     return false; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public Record<K,V> getRecord() |
| | |   { |
| | |     // there is never a current record |
| | |     return null; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException |
| | |   { |
| | |     // positioning never succeeds |
| | |     return false; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public void close() |
| | |   { |
| | |     // nothing to do |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public String toString() |
| | |   { |
| | |     return "EmptyLogCursor"; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Snapshot of the state of a cursor: a log file, a position in that file and a record. |
| | |  * <p> |
| | |  * The state is used to update a cursor when rotating the head log file : the |
| | |  * state of cursor on head log file must be reported to the new read-only log |
| | |  * file that is created when rotating. |
| | |  */ |
| | | private static class CursorState<K extends Comparable<K>, V> |
| | | { |
| | |   /** The record the cursor is pointing to. */ |
| | |   private final Record<K,V> record; |
| | | |
| | |   /** |
| | |    * The position of the reader on the log file. It is the offset from the |
| | |    * beginning of the file, in bytes, at which the next read occurs. |
| | |    */ |
| | |   private final long filePosition; |
| | | |
| | |   /** The log file. */ |
| | |   private final LogFile<K, V> logFile; |
| | | |
| | |   /** Creates a state from the provided log file, position in file and record. */ |
| | |   private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record) |
| | |   { |
| | |     this.record = record; |
| | |     this.filePosition = filePosition; |
| | |     this.logFile = logFile; |
| | |   } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import java.io.BufferedWriter; |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | | import java.io.FileWriter; |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A log file, containing part of a {@code Log}. The log file may be: |
| | | * <ul> |
| | | * <li>write-enabled : allowing to append key-value records and read records |
| | | * from cursors,</li> |
| | | * <li>read-only : allowing to read records from cursors.</li> |
| | | * </ul> |
| | | * <p> |
| | | * A log file is NOT intended to be used directly, but only as part of a |
| | | * {@code Log}. In particular, there is no concurrency management and no checks |
| | | * to ensure that log is not closed when performing any operation on it. Those |
| | | * are managed at the {@code Log} level. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | final class LogFile<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | /** The logger of this class. */ |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** The file containing the records. */ |
| | | private final File logfile; |
| | | |
| | | /** The pool to obtain a reader on the log. */ |
| | | private final LogReaderPool<K, V> readerPool; |
| | | |
| | | /** |
| | | * The writer on the log file, which is {@code null} when the log file is not |
| | | * write-enabled: the constructor only creates a writer for write-enabled log files. |
| | | */ |
| | | private final BlockLogWriter<K, V> writer; |
| | | |
| | | /** Indicates if log is enabled for write. The value is fixed at construction. */ |
| | | private final boolean isWriteEnabled; |
| | | |
| | | /** |
| | |  * Creates a new log file. |
| | |  * <p> |
| | |  * The underlying file is created if it does not exist yet. For a write-enabled |
| | |  * log file, the content of the file is validated before the writer is created. |
| | |  * |
| | |  * @param logFilePath |
| | |  *          Path of the log file. |
| | |  * @param parser |
| | |  *          Parser of records. |
| | |  * @param isWriteEnabled |
| | |  *          {@code true} if this changelog is write-enabled, {@code false} |
| | |  *          otherwise. |
| | |  * @throws ChangelogException |
| | |  *           If a problem occurs during initialization. |
| | |  */ |
| | | private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled) |
| | |     throws ChangelogException |
| | | { |
| | |   Reject.ifNull(logFilePath, parser); |
| | |   this.logfile = logFilePath; |
| | |   this.isWriteEnabled = isWriteEnabled; |
| | | |
| | |   createLogFileIfNotExists(); |
| | |   if (!isWriteEnabled) |
| | |   { |
| | |     // read-only log file: no writer at all |
| | |     writer = null; |
| | |   } |
| | |   else |
| | |   { |
| | |     ensureLogFileIsValid(parser); |
| | |     writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser); |
| | |   } |
| | |   readerPool = new LogReaderPool<K, V>(logfile, parser); |
| | | } |
| | | |
| | | /** |
| | | * Creates a read-only log file with the provided root path and record parser. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param logFilePath |
| | | * Path of the log file. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @return a read-only log file |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath, |
| | | final RecordParser<K, V> parser) throws ChangelogException |
| | | { |
| | | return new LogFile<K, V>(logFilePath, parser, false); |
| | | } |
| | | |
| | | /** |
| | | * Creates a write-enabled log file that appends records to the end of file, |
| | | * with the provided root path and record parser. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param logFilePath |
| | | * Path of the log file. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @return a write-enabled log file |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath, |
| | | final RecordParser<K, V> parser) throws ChangelogException |
| | | { |
| | | return new LogFile<K, V>(logFilePath, parser, true); |
| | | } |
| | | |
| | | /** |
| | |  * Gives access to the file holding the records of this log file. |
| | |  * |
| | |  * @return the file |
| | |  */ |
| | | File getFile() |
| | | { |
| | |   return this.logfile; |
| | | } |
| | | |
| | | /** |
| | |  * Ensures that this log file accepts write operations. |
| | |  * |
| | |  * @throws ChangelogException |
| | |  *           If this log file is not write-enabled |
| | |  */ |
| | | private void checkLogIsEnabledForWrite() throws ChangelogException |
| | | { |
| | |   if (isWriteEnabled) |
| | |   { |
| | |     return; |
| | |   } |
| | |   throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath())); |
| | | } |
| | | |
| | | /** |
| | |  * Creates the file of this log file on the file system, unless it already exists. |
| | |  * |
| | |  * @throws ChangelogException |
| | |  *           If the file can't be created |
| | |  */ |
| | | private void createLogFileIfNotExists() throws ChangelogException |
| | | { |
| | |   if (logfile.exists()) |
| | |   { |
| | |     return; |
| | |   } |
| | |   try |
| | |   { |
| | |     logfile.createNewFile(); |
| | |   } |
| | |   catch (IOException e) |
| | |   { |
| | |     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Ensure that log file is not corrupted, by checking it is valid and cleaning |
| | |  * the end of file if necessary, to remove a partially written record. |
| | |  * <p> |
| | |  * If log file is cleaned to remove a partially written record, then a message |
| | |  * is logged for information. |
| | |  * |
| | |  * @param parser |
| | |  *          Parser of records, used to read the log file |
| | |  * @throws ChangelogException |
| | |  *           If an error occurs or if log file is corrupted and can't be |
| | |  *           cleaned |
| | |  */ |
| | | private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException |
| | | { |
| | |   RandomAccessFile readerWriter = null; |
| | |   BlockLogReader<K, V> reader = null; |
| | |   try |
| | |   { |
| | |     readerWriter = new RandomAccessFile(logfile, "rws"); |
| | |     reader = BlockLogReader.newReader(logfile, readerWriter, parser); |
| | |     final long lastValidPosition = reader.checkLogIsValid(); |
| | |     if (lastValidPosition != -1) |
| | |     { |
| | |       // truncate the file to point where last valid record has been read |
| | |       readerWriter.setLength(lastValidPosition); |
| | |       // NOTE(review): an INFO_ message is logged at error level here - confirm this is intended |
| | |       logger.error(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath())); |
| | |     } |
| | |   } |
| | |   catch (IOException e) |
| | |   { |
| | |     // Keep the original exception as the cause, in addition to the stack trace in the message |
| | |     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get( |
| | |         logfile.getPath(), |
| | |         StaticUtils.stackTraceToSingleLineString(e)), e); |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(reader); |
| | |     // Also close the file itself: it would leak if the reader could not be created. |
| | |     // Closing a RandomAccessFile that is already closed has no effect. |
| | |     StaticUtils.close(readerWriter); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Writes the provided record at the end of this log file. |
| | |  * <p> |
| | |  * In order to ensure that record is written out of buffers and persisted |
| | |  * to file system, it is necessary to explicitly call the |
| | |  * {@code syncToFileSystem()} method. |
| | |  * |
| | |  * @param record |
| | |  *          The record to add. |
| | |  * @throws ChangelogException |
| | |  *           If the record can't be added to the log, in particular if this |
| | |  *           log file is not write-enabled. |
| | |  */ |
| | | void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | |   // fails for a read-only log file, before the (null) writer is used |
| | |   checkLogIsEnabledForWrite(); |
| | |   this.writer.write(record); |
| | | } |
| | | |
| | | /** |
| | |  * Dump this log file as a text file, intended for debugging purpose only. |
| | |  * <p> |
| | |  * Each record is written on its own line, as {@code key=<key> | value=<value>}. |
| | |  * |
| | |  * @param dumpFile |
| | |  *          File that will contain log records using a human-readable format |
| | |  * @throws ChangelogException |
| | |  *           If an error occurs during dump |
| | |  */ |
| | | void dumpAsTextFile(File dumpFile) throws ChangelogException |
| | | { |
| | |   DBCursor<Record<K, V>> cursor = null; |
| | |   BufferedWriter textWriter = null; |
| | |   try |
| | |   { |
| | |     cursor = getCursor(); |
| | |     textWriter = new BufferedWriter(new FileWriter(dumpFile)); |
| | | |
| | |     // A new cursor may not point to a record until next() is called (see getOldestRecord()): |
| | |     // advance it once in that case, otherwise nothing would be dumped. |
| | |     Record<K, V> record = cursor.getRecord(); |
| | |     if (record == null && cursor.next()) |
| | |     { |
| | |       record = cursor.getRecord(); |
| | |     } |
| | |     while (record != null) |
| | |     { |
| | |       textWriter.write("key=" + record.getKey()); |
| | |       textWriter.write(" | "); |
| | |       textWriter.write("value=" + record.getValue()); |
| | |       textWriter.write('\n'); |
| | |       record = cursor.next() ? cursor.getRecord() : null; |
| | |     } |
| | |   } |
| | |   catch (IOException e) |
| | |   { |
| | |     // No I18N needed, used for debugging purpose only |
| | |     throw new ChangelogException( |
| | |         LocalizableMessage.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), |
| | |         e); |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(textWriter); |
| | |     // The cursor must be released as well, whatever the outcome of the dump |
| | |     StaticUtils.close(cursor); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Synchronize all records added with the file system, ensuring that records |
| | | * are effectively persisted. |
| | | * <p> |
| | | * After a successful call to this method, it is guaranteed that all records |
| | | * added to the log are persisted to the file system. |
| | | * |
| | | * @throws ChangelogException |
| | | * If the synchronization fails. |
| | | */ |
| | | void syncToFileSystem() throws ChangelogException |
| | | { |
| | | checkLogIsEnabledForWrite(); |
| | | try |
| | | { |
| | | writer.sync(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the first position. |
| | | * |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | LogFileCursor<K, V> getCursor() throws ChangelogException |
| | | { |
| | | return new LogFileCursor<K, V>(this); |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor initialised to the provided record and position in file. |
| | | * |
| | | * @param record |
| | | * The initial record this cursor points on |
| | | * @param position |
| | | * The file position this cursor points on |
| | | * @return the cursor |
| | | * @throws ChangelogException |
| | | * If a problem occurs while creating the cursor. |
| | | */ |
| | | LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException |
| | | { |
| | | return new LogFileCursor<K, V>(this, record, position); |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the first record of this log, which is the oldest one. |
| | | * |
| | | * @return the oldest record, or {@code null} if the log contains no record |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | Record<K, V> getOldestRecord() throws ChangelogException |
| | | { |
| | | DBCursor<Record<K, V>> firstRecordCursor = null; |
| | | try |
| | | { |
| | | firstRecordCursor = getCursor(); |
| | | // A single step is enough: the first record read is the oldest one |
| | | if (firstRecordCursor.next()) |
| | | { |
| | | return firstRecordCursor.getRecord(); |
| | | } |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(firstRecordCursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the last record of this log, which is the newest one. |
| | | * <p> |
| | | * The whole log is traversed to reach the last record. |
| | | * |
| | | * @return the newest record, or {@code null} if the log contains no record |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | Record<K, V> getNewestRecord() throws ChangelogException |
| | | { |
| | | // TODO : need a more efficient way to retrieve it |
| | | DBCursor<Record<K, V>> scanCursor = null; |
| | | try |
| | | { |
| | | scanCursor = getCursor(); |
| | | Record<K, V> lastSeen = null; |
| | | while (true) |
| | | { |
| | | if (!scanCursor.next()) |
| | | { |
| | | // End of log reached: the last record seen, if any, is the newest one |
| | | return lastSeen; |
| | | } |
| | | lastSeen = scanCursor.getRecord(); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(scanCursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Counts the records of this log. |
| | | * <p> |
| | | * The whole log is traversed to compute the count. |
| | | * |
| | | * @return the number of records in the log |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | long getNumberOfRecords() throws ChangelogException |
| | | { |
| | | // TODO : need a more efficient way to retrieve it |
| | | DBCursor<Record<K, V>> countingCursor = null; |
| | | try |
| | | { |
| | | countingCursor = getCursor(); |
| | | long total = 0L; |
| | | // One successful step of the cursor corresponds to one record |
| | | while (countingCursor.next()) |
| | | { |
| | | total += 1; |
| | | } |
| | | return total; |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(countingCursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Closes this log file. |
| | | * <p> |
| | | * When the log is enabled for write, the records are synchronized with the file |
| | | * system and the writer is closed. A synchronization failure is only traced. |
| | | * The pool of readers is shut down in both cases. |
| | | */ |
| | | public void close() |
| | | { |
| | | final boolean hasWriter = isWriteEnabled; |
| | | if (hasWriter) |
| | | { |
| | | try |
| | | { |
| | | syncToFileSystem(); |
| | | } |
| | | catch (ChangelogException syncFailure) |
| | | { |
| | | // Best effort: the failure is traced and the close goes on |
| | | logger.traceException(syncFailure); |
| | | } |
| | | writer.close(); |
| | | } |
| | | readerPool.shutdown(); |
| | | } |
| | | |
| | | /** |
| | | * Physically removes this log file from the file system. |
| | | * <p> |
| | | * This method should be called only once the log file is closed. |
| | | * |
| | | * @throws ChangelogException |
| | | * If the log file can't be deleted. |
| | | */ |
| | | void delete() throws ChangelogException |
| | | { |
| | | if (logfile.delete()) |
| | | { |
| | | return; |
| | | } |
| | | // The file system refused the deletion: report it with the path of the file |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath())); |
| | | } |
| | | |
| | | /** |
| | | * Return the size of this log file in bytes. |
| | | * |
| | | * @return the size of log file |
| | | */ |
| | | long getSizeInBytes() |
| | | { |
| | | return writer.getBytesWritten(); |
| | | } |
| | | |
| | | /** The path of this log file as a String. */ |
| | | private String getPath() |
| | | { |
| | | return logfile.getPath(); |
| | | } |
| | | |
| | | /** |
| | | * Returns a reader for this log. |
| | | * <p> |
| | | * Assumes that calling methods ensure that log is not closed. |
| | | */ |
| | | private BlockLogReader<K, V> getReader() throws ChangelogException |
| | | { |
| | | return readerPool.get(); |
| | | } |
| | | |
| | | /** Gives the provided reader back to the pool of readers. */ |
| | | private void releaseReader(BlockLogReader<K, V> reader) |
| | | { |
| | | readerPool.release(reader); |
| | | } |
| | | |
| | | /** |
| | | * Returns the hash code of the underlying file, consistently with {@code equals()}. |
| | | * |
| | | * @return the hash code of the underlying file |
| | | */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | final int fileHash = logfile.hashCode(); |
| | | return fileHash; |
| | | } |
| | | |
| | | /** |
| | | * Indicates whether the provided object is a log file on the same underlying file. |
| | | * |
| | | * @param that |
| | | * The object to compare with this log file. |
| | | * @return {@code true} if the provided object is this log file or another |
| | | * {@code LogFile} whose underlying file is equal to the one of this log file |
| | | */ |
| | | @Override |
| | | public boolean equals(Object that) |
| | | { |
| | | // Same instance, or another LogFile backed by an equal file |
| | | return this == that |
| | | || (that instanceof LogFile && logfile.equals(((LogFile<?, ?>) that).logfile)); |
| | | } |
| | | |
| | | /** Implements a repositionable cursor on the log file. */ |
| | | static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V> |
| | | { |
| | | /** The underlying log on which entries are read. */ |
| | | private final LogFile<K, V> logFile; |
| | | |
| | | /** To read the records. */ |
| | | private final BlockLogReader<K, V> reader; |
| | | |
| | | /** The current available record, may be {@code null}. */ |
| | | private Record<K,V> currentRecord; |
| | | |
| | | /** |
| | | * The initial record when starting from a given key. It must be |
| | | * stored because it is read in advance. |
| | | */ |
| | | private Record<K,V> initialRecord; |
| | | |
| | | /** |
| | | * Creates a cursor on the provided log. |
| | | * |
| | | * @param logFile |
| | | * The log on which the cursor read records. |
| | | * @throws ChangelogException |
| | | * If an error occurs when creating the cursor. |
| | | */ |
| | | private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException |
| | | { |
| | | this.logFile = logFile; |
| | | this.reader = logFile.getReader(); |
| | | } |
| | | |
| | | /** |
| | | * Creates a cursor on the provided log, initialised to the provided record and |
| | | * pointing to the provided file position. |
| | | * <p> |
| | | * Note: there is no check to ensure that provided record and file position are |
| | | * consistent. It is the responsability of the caller of this method. |
| | | */ |
| | | private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException |
| | | { |
| | | this.logFile = logFile; |
| | | this.reader = logFile.getReader(); |
| | | this.currentRecord = record; |
| | | reader.seekToPosition(filePosition); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (initialRecord != null) |
| | | { |
| | | // initial record is used only once |
| | | currentRecord = initialRecord; |
| | | initialRecord = null; |
| | | return true; |
| | | } |
| | | currentRecord = reader.readRecord(); |
| | | return currentRecord != null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | { |
| | | return currentRecord; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos) |
| | | throws ChangelogException { |
| | | final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos); |
| | | final boolean found = result.getFirst(); |
| | | initialRecord = found ? result.getSecond() : null; |
| | | return found; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | logFile.releaseReader(reader); |
| | | } |
| | | |
| | | /** |
| | | * Returns the file position this cursor is pointing at. |
| | | * |
| | | * @return the position of reader on the log file |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | long getFilePosition() throws ChangelogException |
| | | { |
| | | return reader.getFilePosition(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | public String toString() |
| | | { |
| | | return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord); |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * A Pool of readers to a log file. |
| | | * <p> |
| | | * In the current implementation no reader is actually pooled: each call to |
| | | * {@code get()} opens a new reader and each call to {@code release()} closes |
| | | * the provided reader. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | // TODO : implement a real pool - reusing readers instead of opening-closing them each time |
| | | class LogReaderPool<K extends Comparable<K>, V> |
| | | { |
| | | /** The file to read. */ |
| | | private final File file; |
| | | |
| | | /** The parser provided to each reader to decode the records read. */ |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | /** |
| | | * Creates a pool of readers for provided file. |
| | | * |
| | | * @param file |
| | | * The file to read. |
| | | * @param parser |
| | | * The parser to decode the records read. |
| | | */ |
| | | LogReaderPool(File file, RecordParser<K, V> parser) |
| | | { |
| | | this.file = file; |
| | | this.parser = parser; |
| | | } |
| | | |
| | | /** |
| | | * Returns a random access reader on the file of this pool. |
| | | * <p> |
| | | * The acquired reader must be released with the {@code release()} |
| | | * method. |
| | | * |
| | | * @return a random access reader |
| | | * @throws ChangelogException |
| | | * If the file can't be found or read. |
| | | */ |
| | | BlockLogReader<K, V> get() throws ChangelogException |
| | | { |
| | | return getReader(file); |
| | | } |
| | | |
| | | /** |
| | | * Release the provided reader. |
| | | * <p> |
| | | * Once released, this reader must not be used any more. |
| | | * |
| | | * @param reader |
| | | * The random access reader to a file previously acquired with this |
| | | * pool. |
| | | */ |
| | | void release(BlockLogReader<K, V> reader) |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | |
| | | /** |
| | | * Opens the provided file in read-only mode and returns a reader on it. |
| | | * <p> |
| | | * If the reader can't be created, the file handle opened for it is closed |
| | | * before the error is reported. |
| | | */ |
| | | private BlockLogReader<K, V> getReader(File file) throws ChangelogException |
| | | { |
| | | RandomAccessFile readOnlyFile = null; |
| | | try |
| | | { |
| | | readOnlyFile = new RandomAccessFile(file, "r"); |
| | | return BlockLogReader.newReader(file, readOnlyFile, parser); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // No reader took ownership of the file handle: close it here to avoid leaking it |
| | | StaticUtils.close(readOnlyFile); |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_READER_ON_LOG_FILE.get(file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this pool, releasing all files handles opened |
| | | * on the file. |
| | | */ |
| | | void shutdown() |
| | | { |
| | | // Nothing to do yet as no file handle is kept opened. |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.io.FileDescriptor; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | import java.io.SyncFailedException; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.loggers.MeteredStream; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * A writer on a log file. |
| | | * <p> |
| | | * The file is opened in append mode and all writes are delegated to a stream |
| | | * that counts the bytes written. |
| | | */ |
| | | class LogWriter extends OutputStream |
| | | { |
| | | /** The file to write in. */ |
| | | private final File file; |
| | | |
| | | /** The stream to write data in the file, capable of counting bytes written. */ |
| | | private final MeteredStream stream; |
| | | |
| | | /** The file descriptor on the file. */ |
| | | private final FileDescriptor fileDescriptor; |
| | | |
| | | /** |
| | | * Creates a writer on the provided file. |
| | | * <p> |
| | | * The byte counter of the stream is initialised with the current length of the file. |
| | | * |
| | | * @param file |
| | | * The file to write. |
| | | * @throws ChangelogException |
| | | * If a problem occurs at creation. |
| | | */ |
| | | public LogWriter(final File file) throws ChangelogException |
| | | { |
| | | this.file = file; |
| | | FileOutputStream fos = null; |
| | | try |
| | | { |
| | | fos = new FileOutputStream(file, true); |
| | | this.stream = new MeteredStream(fos, file.length()); |
| | | this.fileDescriptor = fos.getFD(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // The writer is not created: close the file stream if it was opened, to avoid leaking it |
| | | StaticUtils.close(fos); |
| | | // Keep the original exception as the cause so that the reason of the failure is not lost |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the file used by this writer. |
| | | * |
| | | * @return the file |
| | | */ |
| | | File getFile() |
| | | { |
| | | return file; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void write(int b) throws IOException |
| | | { |
| | | stream.write(b); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void write(byte[] b) throws IOException |
| | | { |
| | | stream.write(b); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void write(byte[] b, int off, int len) throws IOException |
| | | { |
| | | stream.write(b, off, len); |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided byte string to the underlying output stream of this writer. |
| | | * |
| | | * @param bs |
| | | * The byte string to write. |
| | | * @throws IOException |
| | | * if an I/O error occurs. In particular, an IOException may be |
| | | * thrown if the output stream has been closed. |
| | | */ |
| | | public void write(ByteString bs) throws IOException |
| | | { |
| | | bs.copyTo(stream); |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of bytes written, as counted by the underlying stream. |
| | | * |
| | | * @return the number of bytes |
| | | */ |
| | | public long getBytesWritten() |
| | | { |
| | | return stream.getBytesWritten(); |
| | | } |
| | | |
| | | /** |
| | | * Synchronize all modifications to the file to the underlying device. |
| | | * |
| | | * @throws SyncFailedException |
| | | * If synchronization fails. |
| | | */ |
| | | void sync() throws SyncFailedException { |
| | | fileDescriptor.sync(); |
| | | } |
| | | |
| | | /** Closes the underlying stream of this writer. */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | StaticUtils.close(stream); |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | /** |
| | | * Represents a record, which is a pair of key-value. |
| | | * |
| | | * @param <K> |
| | | * The type of a key. |
| | | * @param <V> |
| | | * The type of a value. |
| | | */ |
| | | class Record<K, V> |
| | | { |
| | | private final K key; |
| | | private final V value; |
| | | |
| | | /** |
| | | * Creates a record from provided key and value. |
| | | * |
| | | * @param key |
| | | * The key. |
| | | * @param value |
| | | * The value. |
| | | */ |
| | | private Record(final K key, final V value) |
| | | { |
| | | this.key = key; |
| | | this.value = value; |
| | | } |
| | | |
| | | /** |
| | | * Create a record from provided key and value. |
| | | * |
| | | * @param <K> |
| | | * The type of the key. |
| | | * @param <V> |
| | | * The type of the value. |
| | | * @param key |
| | | * The key. |
| | | * @param value |
| | | * The value. |
| | | * @return a record |
| | | */ |
| | | static <K, V> Record<K, V> from(final K key, final V value) { |
| | | return new Record<K, V>(key, value); |
| | | } |
| | | |
| | | /** |
| | | * Returns the key of this record. |
| | | * |
| | | * @return the key |
| | | */ |
| | | K getKey() |
| | | { |
| | | return key; |
| | | } |
| | | |
| | | /** |
| | | * Returns the value of this record. |
| | | * |
| | | * @return the value |
| | | */ |
| | | V getValue() |
| | | { |
| | | return value; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | final int prime = 31; |
| | | int result = 1; |
| | | result = prime * result + ((key == null) ? 0 : key.hashCode()); |
| | | result = prime * result + ((value == null) ? 0 : value.hashCode()); |
| | | return result; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean equals(Object that) |
| | | { |
| | | if (this == that) |
| | | { |
| | | return true; |
| | | } |
| | | if (!(that instanceof Record)) |
| | | { |
| | | return false; |
| | | } |
| | | Record<?, ?> other = (Record<?, ?>) that; |
| | | final boolean keyEquals = key == null ? other.key == null : key.equals(other.key); |
| | | if (!keyEquals) |
| | | { |
| | | return false; |
| | | } |
| | | return value == null ? other.value == null : value.equals(other.value); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() { |
| | | return "Record [" + key + ":" + value + "]"; |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | |
| | | /** |
| | | * Parser of a log record. |
| | | * <p> |
| | | * The parser allows to convert from an object to its binary representation |
| | | * and to convert back from the binary representation to the object. It also |
| | | * converts a key to and from a string representation. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of the record. |
| | | * @param <V> |
| | | * Type of the value of the record. |
| | | */ |
| | | interface RecordParser<K, V> |
| | | { |
| | | /** |
| | | * Decode a record from the provided byte string. |
| | | * <p> |
| | | * The record is expected to have been encoded using the {@code encodeRecord()} |
| | | * method. |
| | | * |
| | | * @param data |
| | | * The raw data to read the record from. |
| | | * @return the decoded record, or {@code null} if there is no more record to |
| | | * read, or only an incomplete record |
| | | * @throws DecodingException |
| | | * If an error occurs while decoding the record. |
| | | */ |
| | | Record<K, V> decodeRecord(ByteString data) throws DecodingException; |
| | | |
| | | /** |
| | | * Encode the provided record to a byte string. |
| | | * <p> |
| | | * The returned byte string is intended to be stored as provided in the log file. |
| | | * |
| | | * @param record |
| | | * The record to encode. |
| | | * @return the byte string representing the (key,value) record |
| | | */ |
| | | ByteString encodeRecord(Record<K, V> record); |
| | | |
| | | /** |
| | | * Read the key from the provided string. |
| | | * |
| | | * @param key |
| | | * The string representation of key, suitable for use in a filename, |
| | | * as written by the {@code encodeKeyToString()} method. |
| | | * @return the key |
| | | * @throws ChangelogException |
| | | * If key can't be read from the string. |
| | | */ |
| | | K decodeKeyFromString(String key) throws ChangelogException; |
| | | |
| | | /** |
| | | * Returns the provided key as a string that is suitable to be used in a |
| | | * filename. |
| | | * |
| | | * @param key |
| | | * The key of a record. |
| | | * @return a string encoding the key, unambiguously decodable to the original |
| | | * key, and suitable for use in a filename. The string should contain |
| | | * only ASCII characters and no space. |
| | | */ |
| | | String encodeKeyToString(K key); |
| | | |
| | | /** |
| | | * Returns a key that is guaranteed to be always higher than any other key. |
| | | * |
| | | * @return the highest possible key |
| | | */ |
| | | K getMaxKey(); |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.BufferedReader; |
| | | import java.io.BufferedWriter; |
| | | import java.io.File; |
| | | import java.io.FileFilter; |
| | | import java.io.FileInputStream; |
| | | import java.io.FileNotFoundException; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.InputStreamReader; |
| | | import java.io.OutputStreamWriter; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.io.Writer; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * Represents the replication environment, which allows to manage the lifecycle |
| | | * of the replication changelog. |
| | | * <p> |
| | | * A changelog has a root directory, under which the following directories and files are |
| | | * created : |
| | | * <ul> |
| | | * <li>A "changenumberindex" directory containing the log files for |
| | | * ChangeNumberIndexDB</li> |
| | | * <li>A "domains.state" file containing a mapping of each domain DN to an id. The |
| | | * id is used to name the corresponding domain directory.</li> |
| | | * <li>One directory per domain, named after "[id].domain" where [id] is the id |
| | | * assigned to the domain, as specified in the "domains.state" file.</li> |
| | | * </ul> |
| | | * <p> |
| | | * Each domain directory contains the following directories and files : |
| | | * <ul> |
| | | * <li>A "generation_[id].id" file, where [id] is the generation id</li> |
| | | * <li>One directory per server id, named after "[id].server" where [id] is the |
| | | * id of the server.</li> |
| | | * </ul> |
| | | * Each server id directory contains the following files : |
| | | * <ul> |
| | | * <li>The "head.log" file, which is the most recent log file, where records are appended.</li> |
| | | * <li>Zero to many read-only log files named after the lowest key |
| | | * and highest key present in the log file (they all end with the ".log" suffix).</li> |
| | | * <li>Optionally, an "offline.state" file that indicates that this particular server id |
| | | * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li> |
| | | * </ul> |
| | | * See {@code Log} class for details on the log files. |
| | | * |
| | | * <p> |
| | | * Layout example with two domains "o=test1" and "o=test2", each having server |
| | | * ids 22 and 33, with server id 33 for domain "o=test1" being offline : |
| | | * |
| | | * <pre> |
| | | * +---changelog |
| | | * | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"] |
| | | * | \---changenumberindex |
| | | * | \--- head.log [contains last records written] |
| | | * | \--- 1_50.log [contains records with keys in interval [1, 50]] |
| | | * | \---1.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---head.log [contains last records written] |
| | | * | \---33.server |
| | | * | \---head.log [contains last records written] |
| | | * \---offline.state |
| | | * | \---2.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---head.log [contains last records written] |
| | | * | \---33.server |
| | | * | \---head.log [contains last records written] |
| | | * </pre> |
| | | */ |
| | | class ReplicationEnvironment |
| | | { |
| | | /** Logger of this class. */ |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** A size of 10 MiB; presumably the size limit of a single log file (usage not visible here, to be confirmed). */ |
| | | private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024; |
| | | |
| | | /** Marker value; presumably used when no generation id is available (to be confirmed against its usages). */ |
| | | private static final int NO_GENERATION_ID = -1; |
| | | |
| | | /** Name of the directory containing the log files for the ChangeNumberIndexDB. */ |
| | | private static final String CN_INDEX_DB_DIRNAME = "changenumberindex"; |
| | | |
| | | /** Name of the file containing the mapping of each domain DN to an id. */ |
| | | private static final String DOMAINS_STATE_FILENAME = "domains.state"; |
| | | |
| | | /** Name of the file indicating that a server id of a domain is offline. */ |
| | | static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state"; |
| | | |
| | | /** Presumably the separator between the id and the DN of a domain in the domains state file (to be confirmed). */ |
| | | private static final String DOMAIN_STATE_SEPARATOR = ":"; |
| | | |
| | | /** Suffix of the name of a domain directory. */ |
| | | private static final String DOMAIN_SUFFIX = ".domain"; |
| | | |
| | | /** Suffix of the name of a server id directory. */ |
| | | private static final String SERVER_ID_SUFFIX = ".server"; |
| | | |
| | | /** Prefix of the name of a generation id file. */ |
| | | private static final String GENERATION_ID_FILE_PREFIX = "generation"; |
| | | |
| | | /** Suffix of the name of a generation id file. */ |
| | | private static final String GENERATION_ID_FILE_SUFFIX = ".id"; |
| | | |
| | | /** Name of the UTF-8 character encoding. */ |
| | | private static final String UTF8_ENCODING = "UTF-8"; |
| | | |
| | | /** Accepts the directories whose name ends with the domain suffix. */ |
| | | private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | | public boolean accept(File file) |
| | | { |
| | | if (!file.isDirectory()) |
| | | { |
| | | return false; |
| | | } |
| | | return file.getName().endsWith(DOMAIN_SUFFIX); |
| | | } |
| | | }; |
| | | |
| | | /** Accepts the directories whose name ends with the server id suffix. */ |
| | | private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | | public boolean accept(File file) |
| | | { |
| | | if (!file.isDirectory()) |
| | | { |
| | | return false; |
| | | } |
| | | return file.getName().endsWith(SERVER_ID_SUFFIX); |
| | | } |
| | | }; |
| | | |
| | | /** Accepts the regular files whose name has both the generation id prefix and suffix. */ |
| | | private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | | public boolean accept(File file) |
| | | { |
| | | if (!file.isFile()) |
| | | { |
| | | return false; |
| | | } |
| | | final String name = file.getName(); |
| | | return name.startsWith(GENERATION_ID_FILE_PREFIX) && name.endsWith(GENERATION_ID_FILE_SUFFIX); |
| | | } |
| | | }; |
| | | |
| | | /** Root path where the replication log is stored. */ |
| | | private final String replicationRootPath; |
| | | /** |
| | | * The current changelogState. This is the in-memory version of the state |
| | | * that is read from disk when the environment is created. It improves |
| | | * performances in case the changelogState is read often. |
| | | * |
| | | * @GuardedBy("domainsLock") |
| | | */ |
| | | private final ChangelogState changelogState; |
| | | |
| | | /** The list of logs that are in use. */ |
| | | private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>(); |
| | | |
| | | /** |
| | | * Maps each domain DN to a domain id that is used to name directory in file system. |
| | | * |
| | | * @GuardedBy("domainsLock") |
| | | */ |
| | | private final Map<DN, String> domains = new HashMap<DN, String>(); |
| | | |
| | | /** |
| | | * Exclusive lock to synchronize: |
| | | * <ul> |
| | | * <li>the domains mapping</li> |
| | | * <li>changes to the in-memory changelogState</li> |
| | | * <li>changes to the on-disk state of a domain</li> |
| | | * </ul> |
| | | */ |
| | | private final Object domainsLock = new Object(); |
| | | |
| | | /** The underlying replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | |
| | | /** Initially {@code false}; presumably set once shutdown of this environment starts (usage not visible here). */ |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | | |
| | | /** |
| | | * Creates the replication environment. |
| | | * |
| | | * @param rootPath |
| | | * Root path where replication log is stored. |
| | | * @param replicationServer |
| | | * The underlying replication server. |
| | | * @throws ChangelogException |
| | | * If an error occurs during initialization. |
| | | */ |
| | | ReplicationEnvironment(final String rootPath, |
| | | final ReplicationServer replicationServer) throws ChangelogException |
| | | { |
| | | this.replicationRootPath = rootPath; |
| | | this.replicationServer = replicationServer; |
| | | this.changelogState = readOnDiskChangelogState(); |
| | | } |
| | | |
| | | /** |
| | | * Builds a fresh {@link ChangelogState} from the files found under the |
| | | * replication root path. |
| | | * <p> |
| | | * The domains mapping is loaded from the domains state file, checked |
| | | * against the domain directories, then the state of each known domain is |
| | | * read. |
| | | * |
| | | * @return the {@link ChangelogState} read from disk |
| | | * @throws ChangelogException |
| | | * if the on-disk state cannot be read or is inconsistent |
| | | */ |
| | | ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | final ChangelogState onDiskState = new ChangelogState(); |
| | | final File rootDirectory = new File(replicationRootPath); |
| | | synchronized (domainsLock) |
| | | { |
| | | readDomainsStateFile(); |
| | | checkDomainDirectories(rootDirectory); |
| | | for (final Entry<DN, String> dnToId : domains.entrySet()) |
| | | { |
| | | readStateForDomain(dnToId, onDiskState); |
| | | } |
| | | } |
| | | return onDiskState; |
| | | } |
| | | |
| | | /** |
| | | * Gives access to the in-memory state of the replication changelog held by |
| | | * this environment. |
| | | * |
| | | * @return the current {@link ChangelogState} |
| | | */ |
| | | ChangelogState getChangelogState() |
| | | { |
| | | return this.changelogState; |
| | | } |
| | | |
| | | /** |
| | | * Returns the log storing the changes of the replica identified by the |
| | | * given domain DN and server id, creating the needed directories and |
| | | * generation id file when they do not exist yet. |
| | | * <p> |
| | | * A domain id is allocated for the domain if it is not known yet. The |
| | | * in-memory changelog state is updated with the server id and the |
| | | * generation id. |
| | | * |
| | | * @param domainDN |
| | | * The DN that identifies the domain. |
| | | * @param serverId |
| | | * The server id that identifies the server. |
| | | * @param generationId |
| | | * The generationId associated to this domain. |
| | | * @return the log. |
| | | * @throws ChangelogException |
| | | * if an error occurs. |
| | | */ |
| | | Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId) |
| | | throws ChangelogException |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace("ReplicationEnvironment.getOrCreateReplicaDB(%s, %s, %s)", domainDN, serverId, generationId); |
| | | } |
| | | |
| | | try |
| | | { |
| | | ensureRootDirectoryExists(); |
| | | |
| | | synchronized (domainsLock) |
| | | { |
| | | String id = domains.get(domainDN); |
| | | if (id == null) |
| | | { |
| | | id = createDomainId(domainDN); |
| | | } |
| | | |
| | | // Replica directory first, then the matching in-memory state |
| | | final File replicaDirectory = getServerIdPath(id, serverId); |
| | | ensureServerIdDirectoryExists(replicaDirectory); |
| | | changelogState.addServerIdToDomain(serverId, domainDN); |
| | | |
| | | // Generation id file next, then the matching in-memory state |
| | | final File generationIdFile = getGenerationIdPath(id, generationId); |
| | | ensureGenerationIdFileExists(generationIdFile); |
| | | changelogState.setDomainGenerationId(domainDN, generationId); |
| | | |
| | | return openLog(replicaDirectory, FileReplicaDB.RECORD_PARSER); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_REPLICA_DB.get(domainDN.toString(), serverId, generationId), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Find or create the log to manage integer change number associated to |
| | | * multidomain server state. |
| | | * <p> |
| | | * TODO: ECL how to manage compatibility of this db |
| | | * with new domains added or removed ? |
| | | * |
| | | * @return the log. |
| | | * @throws ChangelogException |
| | | * when a problem occurs. |
| | | */ |
| | | Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException |
| | | { |
| | | final File path = getCNIndexDBPath(); |
| | | try |
| | | { |
| | | return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_CN_INDEX_DB.get(replicationRootPath, path.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the environment. |
| | | * <p> |
| | | * The log DBs are not closed by this method. It assumes they are already |
| | | * closed. |
| | | */ |
| | | void shutdown() |
| | | { |
| | | if (isShuttingDown.compareAndSet(false, true)) |
| | | { |
| | | logs.clear(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Removes the generation id file of the provided domain, if any, and |
| | | * records the "no generation id" value in the in-memory changelog state. |
| | | * <p> |
| | | * An unknown domain or a missing generation id file is not an error. |
| | | * |
| | | * @param domainDN |
| | | * The domain DN for which the generationID must be cleared. |
| | | * @throws ChangelogException |
| | | * If the generation id file cannot be deleted. |
| | | */ |
| | | void clearGenerationId(final DN domainDN) throws ChangelogException |
| | | { |
| | | synchronized (domainsLock) |
| | | { |
| | | final String id = domains.get(domainDN); |
| | | if (id == null) |
| | | { |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File generationIdFile = retrieveGenerationIdFile(getDomainPath(id)); |
| | | if (generationIdFile != null && !generationIdFile.delete()) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(generationIdFile.getPath(), domainDN.toString())); |
| | | } |
| | | changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Replaces the generation id of the provided domain by the default value |
| | | * used when there is no generation id: the existing generation id file is |
| | | * cleared, then a file named after the default value is created. |
| | | * |
| | | * @param baseDN |
| | | * The baseDN for which the generationID must be reset. |
| | | * @throws ChangelogException |
| | | * If a problem occurs during reset. |
| | | */ |
| | | void resetGenerationId(final DN baseDN) throws ChangelogException |
| | | { |
| | | synchronized (domainsLock) |
| | | { |
| | | clearGenerationId(baseDN); |
| | | final String id = domains.get(baseDN); |
| | | if (id != null) |
| | | { |
| | | ensureGenerationIdFileExists(getGenerationIdPath(id, NO_GENERATION_ID)); |
| | | changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID); |
| | | } |
| | | // else unknown domain => no-op |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Notify that the replica corresponding to provided domain and provided CSN |
| | | * is offline. |
| | | * <p> |
| | | * The offline CSN is written to the offline state file of the replica |
| | | * directory, overwriting any previous content. The in-memory changelog |
| | | * state is updated only after the CSN has been flushed to the file. |
| | | * <p> |
| | | * Nothing is done if the domain is unknown or if the replica directory does |
| | | * not exist. |
| | | * |
| | | * @param domainDN |
| | | * the domain of the offline replica |
| | | * @param offlineCSN |
| | | * the offline replica serverId and offline timestamp |
| | | * @throws ChangelogException |
| | | * if the offline state file cannot be written |
| | | */ |
| | | void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException |
| | | { |
| | | synchronized (domainsLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId()); |
| | | if (!serverIdPath.exists()) |
| | | { |
| | | return; // no serverId anymore => no-op |
| | | } |
| | | final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | | Writer writer = null; |
| | | try |
| | | { |
| | | // Overwrite file, only the last sent offline CSN is kept |
| | | writer = newFileWriter(offlineFile); |
| | | writer.write(offlineCSN.toString()); |
| | | // The writer is buffered: flush here so that a write failure is raised |
| | | // inside this try block and reported, rather than surfacing only when |
| | | // the writer is closed in the finally block. |
| | | // NOTE(review): StaticUtils.close presumably ignores close failures - confirm. |
| | | writer.flush(); |
| | | changelogState.addOfflineReplica(domainDN, offlineCSN); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get( |
| | | domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Notify that the replica corresponding to provided domain and server id |
| | | * is online: its offline state file is deleted if present, and the replica |
| | | * is removed from the offline replicas of the in-memory changelog state. |
| | | * <p> |
| | | * Nothing is done if the domain is unknown. |
| | | * |
| | | * @param domainDN |
| | | * the domain of the replica |
| | | * @param serverId |
| | | * the replica serverId |
| | | * @throws ChangelogException |
| | | * if the offline state file cannot be deleted |
| | | */ |
| | | void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException |
| | | { |
| | | synchronized (domainsLock) |
| | | { |
| | | final String id = domains.get(domainDN); |
| | | if (id == null) |
| | | { |
| | | return; // unknown domain => no-op |
| | | } |
| | | final File replicaDirectory = getServerIdPath(id, serverId); |
| | | final File offlineStateFile = new File(replicaDirectory, REPLICA_OFFLINE_STATE_FILENAME); |
| | | if (offlineStateFile.exists() && !offlineStateFile.delete()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get( |
| | | offlineStateFile.getPath(), domainDN.toString(), serverId)); |
| | | } |
| | | changelogState.removeOfflineReplica(domainDN, serverId); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Loads the domains mapping from the domains state file, if that file exists. |
| | | * <p> |
| | | * Each line holds a domain id, the separator, then the domain DN. |
| | | * |
| | | * @throws ChangelogException |
| | | * if the file cannot be read or if a DN cannot be decoded |
| | | */ |
| | | private void readDomainsStateFile() throws ChangelogException |
| | | { |
| | | final File stateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME); |
| | | if (!stateFile.exists()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | BufferedReader input = null; |
| | | // Declared outside the try block to be reported when a DN cannot be decoded |
| | | String currentLine = null; |
| | | try |
| | | { |
| | | input = newFileReader(stateFile); |
| | | for (currentLine = input.readLine(); currentLine != null; currentLine = input.readLine()) |
| | | { |
| | | final int separatorIndex = currentLine.indexOf(DOMAIN_STATE_SEPARATOR); |
| | | final String id = currentLine.substring(0, separatorIndex); |
| | | final DN dn = DN.valueOf(currentLine.substring(separatorIndex + 1)); |
| | | domains.put(dn, id); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get( |
| | | stateFile.getPath(), currentLine), e); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get( |
| | | stateFile.getPath()), e); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(input); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Verifies that the domain directories found under the provided path match |
| | | * exactly the domain ids of the domains mapping. |
| | | * <p> |
| | | * No check is done when the directories cannot be listed. |
| | | * |
| | | * @throws ChangelogException |
| | | * if the domain ids from the file system differ from the mapping |
| | | */ |
| | | private void checkDomainDirectories(final File changelogPath) throws ChangelogException |
| | | { |
| | | final File[] domainDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER); |
| | | if (domainDirectories == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | final Set<String> idsOnDisk = new HashSet<String>(); |
| | | for (final File directory : domainDirectories) |
| | | { |
| | | // The domain id is the directory name without the domain suffix |
| | | final String name = directory.getName(); |
| | | idsOnDisk.add(name.substring(0, name.length() - DOMAIN_SUFFIX.length())); |
| | | } |
| | | |
| | | final Set<String> idsInMapping = new HashSet<String>(domains.values()); |
| | | if (!idsOnDisk.equals(idsInMapping)) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(), |
| | | idsOnDisk.toString())); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Fills the provided changelog state with the generation id (when a |
| | | * generation id file exists) and the replicas found in the directory of the |
| | | * provided domain. |
| | | * |
| | | * @throws ChangelogException |
| | | * if the domain directory cannot be listed or its content is invalid |
| | | */ |
| | | private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state) |
| | | throws ChangelogException |
| | | { |
| | | final DN dn = domainEntry.getKey(); |
| | | final File directory = getDomainPath(domainEntry.getValue()); |
| | | |
| | | final String rawGenerationId = retrieveGenerationId(directory); |
| | | if (rawGenerationId != null) |
| | | { |
| | | state.setDomainGenerationId(dn, toGenerationId(rawGenerationId)); |
| | | } |
| | | |
| | | final File[] replicaDirectories = directory.listFiles(SERVER_ID_FILE_FILTER); |
| | | if (replicaDirectories == null) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY.get( |
| | | replicationRootPath, directory.getPath())); |
| | | } |
| | | for (final File replicaDirectory : replicaDirectories) |
| | | { |
| | | readStateForServerId(dn, replicaDirectory, state); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Registers in the provided changelog state the replica stored in the |
| | | * provided server id directory, together with its offline CSN when an |
| | | * offline state file is present. |
| | | */ |
| | | private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException |
| | | { |
| | | final int id = toServerId(serverIdPath.getName()); |
| | | state.addServerIdToDomain(id, domainDN); |
| | | |
| | | final File offlineStateFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | | if (!offlineStateFile.exists()) |
| | | { |
| | | return; |
| | | } |
| | | state.addOfflineReplica(domainDN, readOfflineStateFile(offlineStateFile, domainDN)); |
| | | } |
| | | |
| | | private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException |
| | | { |
| | | BufferedReader reader = null; |
| | | try |
| | | { |
| | | reader = newFileReader(offlineFile); |
| | | String line = reader.readLine(); |
| | | if (line == null || reader.readLine() != null) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get( |
| | | domainDN.toString(), offlineFile.getPath())); |
| | | } |
| | | return new CSN(line); |
| | | } |
| | | catch(IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get( |
| | | domainDN.toString(), offlineFile.getPath()), e); |
| | | } |
| | | finally { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Allocates a new domain id for the provided domain DN, adds it to the |
| | | * domains mapping and rewrites the domains state file with the whole |
| | | * mapping. |
| | | * <p> |
| | | * If the file cannot be written, the new entry is removed from the |
| | | * in-memory mapping so that it does not reference a domain id that was |
| | | * never persisted. |
| | | * |
| | | * @param domainDN |
| | | * the DN of the domain to register |
| | | * @return the domain id allocated to the domain |
| | | * @throws ChangelogException |
| | | * if the domains state file cannot be updated |
| | | */ |
| | | private String createDomainId(final DN domainDN) throws ChangelogException |
| | | { |
| | | final String nextDomainId = findNextDomainId(); |
| | | domains.put(domainDN, nextDomainId); |
| | | final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME); |
| | | Writer writer = null; |
| | | try |
| | | { |
| | | writer = newFileWriter(domainsStateFile); |
| | | for (final Entry<DN, String> entry : domains.entrySet()) |
| | | { |
| | | writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey())); |
| | | } |
| | | // The writer is buffered: flush here so that a write failure is raised |
| | | // inside this try block instead of surfacing only at close time. |
| | | writer.flush(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // Keep the in-memory mapping consistent with what could be persisted |
| | | domains.remove(domainDN); |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId, |
| | | domainDN.toString(), domainsStateFile.getPath()), e); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writer); |
| | | } |
| | | return nextDomainId; |
| | | } |
| | | |
| | | /** |
| | | * Computes the next domain id to use: the lowest integer, starting from 1, |
| | | * that is higher than all ids of the domains mapping. |
| | | * |
| | | * @return the next domain id, as a string |
| | | */ |
| | | private String findNextDomainId() |
| | | { |
| | | int candidate = 1; |
| | | for (final String existingId : domains.values()) |
| | | { |
| | | final int existing = Integer.parseInt(existingId); |
| | | if (existing >= candidate) |
| | | { |
| | | candidate = existing + 1; |
| | | } |
| | | } |
| | | return String.valueOf(candidate); |
| | | } |
| | | |
| | | /** |
| | | * Opens a log on the provided path with the provided record parser and |
| | | * registers it in the list of logs in use. |
| | | * <p> |
| | | * The shutdown flag is checked both before and after opening, so that a log |
| | | * opened while shutdown is requested is closed again and not returned. |
| | | * |
| | | * @throws ChangelogException |
| | | * if shutdown was requested or the log cannot be opened |
| | | */ |
| | | private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser) |
| | | throws ChangelogException |
| | | { |
| | | checkShutDownBeforeOpening(serverIdPath); |
| | | final Log<K, V> openedLog = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES); |
| | | checkShutDownAfterOpening(serverIdPath, openedLog); |
| | | |
| | | logs.add(openedLog); |
| | | return openedLog; |
| | | } |
| | | |
| | | /** |
| | | * Closes the provided log and fails if shutdown has been requested while |
| | | * the log was being opened. |
| | | */ |
| | | private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException |
| | | { |
| | | if (!isShuttingDown.get()) |
| | | { |
| | | return; |
| | | } |
| | | closeLog(log); |
| | | throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get( |
| | | serverIdPath.getPath(), replicationServer.getServerId())); |
| | | } |
| | | |
| | | /** Fails if shutdown has already been requested, before any attempt to open a log on the provided path. */ |
| | | private void checkShutDownBeforeOpening(final File serverIdPath) throws ChangelogException |
| | | { |
| | | if (!isShuttingDown.get()) |
| | | { |
| | | return; |
| | | } |
| | | throw new ChangelogException( |
| | | WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get( |
| | | serverIdPath.getPath(), replicationServer.getServerId())); |
| | | } |
| | | |
| | | /** |
| | | * Extracts the generation id from the name of the generation id file found |
| | | * in the provided directory, i.e. the part of the file name between the |
| | | * generation id prefix and suffix. |
| | | * |
| | | * @return the generation id or {@code null} if the corresponding file can't |
| | | * be found |
| | | */ |
| | | private String retrieveGenerationId(final File directory) |
| | | { |
| | | final File generationIdFile = retrieveGenerationIdFile(directory); |
| | | if (generationIdFile == null) |
| | | { |
| | | return null; |
| | | } |
| | | final String name = generationIdFile.getName(); |
| | | final int start = GENERATION_ID_FILE_PREFIX.length(); |
| | | final int end = name.length() - GENERATION_ID_FILE_SUFFIX.length(); |
| | | return name.substring(start, end); |
| | | } |
| | | |
| | | /** |
| | | * Retrieve the file named after the generation id from the provided |
| | | * directory. |
| | | * |
| | | * @return the generation id file or {@code null} if the corresponding file |
| | | * can't be found |
| | | */ |
| | | private File retrieveGenerationIdFile(final File directory) |
| | | { |
| | | File[] generationIds = directory.listFiles(GENERATION_ID_FILE_FILTER); |
| | | return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null; |
| | | } |
| | | |
| | | private File getDomainPath(final String domainId) |
| | | { |
| | | return new File(replicationRootPath, domainId + DOMAIN_SUFFIX); |
| | | } |
| | | |
| | | /** |
| | | * Return the path for the provided domain id and server id. |
| | | * Package private to be usable in tests. |
| | | * |
| | | * @param domainId |
| | | * The id corresponding to a domain DN |
| | | * @param serverId |
| | | * The server id to retrieve |
| | | * @return the path |
| | | */ |
| | | File getServerIdPath(final String domainId, final int serverId) |
| | | { |
| | | return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX); |
| | | } |
| | | |
| | | private File getGenerationIdPath(final String domainId, final long generationId) |
| | | { |
| | | return new File(getDomainPath(domainId), GENERATION_ID_FILE_PREFIX + generationId + GENERATION_ID_FILE_SUFFIX); |
| | | } |
| | | |
| | | private File getCNIndexDBPath() |
| | | { |
| | | return new File(replicationRootPath, CN_INDEX_DB_DIRNAME); |
| | | } |
| | | |
| | | /** Removes the provided log from the list of logs in use, then closes it. */ |
| | | private void closeLog(final Log<?, ?> log) |
| | | { |
| | | // Unregister first: the log is no longer tracked even if close() fails |
| | | logs.remove(log); |
| | | log.close(); |
| | | } |
| | | |
| | | /** |
| | | * Creates the replication root directory, with any missing parent, when it |
| | | * does not exist yet. |
| | | * |
| | | * @throws ChangelogException |
| | | * if the directory cannot be created |
| | | */ |
| | | private void ensureRootDirectoryExists() throws ChangelogException |
| | | { |
| | | final File rootDirectory = new File(replicationRootPath); |
| | | if (!rootDirectory.exists() && !rootDirectory.mkdirs()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(replicationRootPath)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates the provided server id directory, with any missing parent, when |
| | | * it does not exist yet. |
| | | * <p> |
| | | * An exception raised while creating the directory is kept as the cause of |
| | | * the reported error instead of being discarded. |
| | | * |
| | | * @param serverIdPath |
| | | * the directory to create if missing |
| | | * @throws ChangelogException |
| | | * if the directory cannot be created |
| | | */ |
| | | private void ensureServerIdDirectoryExists(final File serverIdPath) throws ChangelogException |
| | | { |
| | | if (serverIdPath.exists()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | boolean created; |
| | | try |
| | | { |
| | | created = serverIdPath.mkdirs(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // e.g. a SecurityException: report it with its cause |
| | | // NOTE(review): the second message argument is a literal 0 as in the original - confirm what it should be. |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY.get(serverIdPath.getPath(), 0), e); |
| | | } |
| | | |
| | | if (!created) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY.get(serverIdPath.getPath(), 0)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates the provided generation id file as an empty file when it does not |
| | | * exist yet. |
| | | * |
| | | * @param generationIdPath |
| | | * the generation id file to create if missing |
| | | * @throws ChangelogException |
| | | * if the file cannot be created; an underlying I/O error is kept |
| | | * as the cause |
| | | */ |
| | | private void ensureGenerationIdFileExists(final File generationIdPath) |
| | | throws ChangelogException |
| | | { |
| | | if (!generationIdPath.exists()) |
| | | { |
| | | try |
| | | { |
| | | boolean isCreated = generationIdPath.createNewFile(); |
| | | if (!isCreated) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath())); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // Keep the I/O error as the cause so that the failure can be diagnosed |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()), e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void debug(String message) |
| | | { |
| | | // Replication server may be null when testing |
| | | String monitorInstanceName = replicationServer != null ? replicationServer.getMonitorInstanceName() : |
| | | "no monitor [test]"; |
| | | logger.trace("In %s, %s", monitorInstanceName, message); |
| | | } |
| | | |
| | | private int toServerId(final String serverIdName) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | String serverId = serverIdName.substring(0, serverIdName.length() - SERVER_ID_SUFFIX.length()); |
| | | return Integer.parseInt(serverId); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogException(ERR_CHANGELOG_SERVER_ID_FILENAME_WRONG_FORMAT.get(serverIdName), e); |
| | | } |
| | | } |
| | | |
| | | private long toGenerationId(final String data) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return Long.parseLong(data); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e); |
| | | } |
| | | } |
| | | |
| | | /** Returns a buffered writer on the provided file. */ |
| | | private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException |
| | | { |
| | | return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING)); |
| | | } |
| | | |
| | | /** Returns a buffered reader on the provided file. */ |
| | | private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException |
| | | { |
| | | return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING)); |
| | | } |
| | | } |
| | |
| | | * @param changelogState |
| | | * the changelog state used for initialization |
| | | */ |
| | | ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) |
| | | public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) |
| | | { |
| | | this(changelogDB, changelogState, new ECLEnabledDomainPredicate()); |
| | | } |