OPENDJ-1389 – Add support for replication changelog DB rotation
Implemented a log based on multiple log files
* Add Log class that manages a log as a set of log files:
** it contains at least one log file, the head log file,
where new records are appended
** it contains from zero to multiple read-only log files,
issued from rotation of the head log file when it reaches
a given size
* Update LogFile class to act as part of a Log.
* Add purging feature
* Update other classes from file package with minor changes
* Add unit tests for new Log class and purging feature
* Update MeteredStream class to use it in changelog
* Add new interface RotatableLogFile (to be used later by changelog)
* Update Policy classes to use this new interface
3 files added
24 files modified
| | |
| | | SEVERE_ERR_BAD_HISTORICAL_56=Entry %s was containing some unknown historical \ |
| | | information, This may cause some inconsistency for this entry |
| | | MILD_ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE_57=A conflict was detected but the \ |
| | | conflict information could not be added. Operation: %s, Result: %s |
| | | conflict information could not be added. Operation: %s, Result: %s |
| | | MILD_ERR_CANNOT_RENAME_CONFLICT_ENTRY_58=An error happened trying to \ |
| | | rename a conflicting entry. DN: %s, Operation: %s, Result: %s |
| | | MILD_ERR_EXCEPTION_RENAME_CONFLICT_ENTRY_59=An Exception happened when \ |
| | |
| | | on log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD_254=Could not decode a record from data \ |
| | | read in log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file(s): '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE_256=Could not create log file '%s' |
| | | SEVERE_WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE_257=The changelog '%s' has been opened in \ |
| | | read-only mode, it is not enabled for write |
| | |
| | | Actual domain ids found in file system: '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE_265=Could not create a new domain \ |
| | | id %s for domain DN %s and save it in domain state file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE_266=Could not get reader \ |
| | | position for cursor in log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING_267=Could not decode the key from \ |
| | | string [%s] |
| | | SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG_268=When cleaning log '%s', \ |
| | | found %d cursor(s) still opened on the log |
| | | SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG_269=When closing log '%s', \ |
| | | found %d cursor(s) still opened on the log |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG_270=Could not initialize \ |
| | | the log '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE_271=Could not \ |
| | | retrieve key bounds from log file '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST_272=Could not \ |
| | | retrieve read-only log files from log '%s' |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING_273=While purging log, could not \ |
| | | delete log file(s): '%s' |
| | | SEVERE_ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING_274 =The following log \ |
| | | '%s' must be released but it is not referenced. |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \ |
| | | head log file from '%s' to '%s' |
| | | INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \ |
| | | size of head log file is %d bytes |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.loggers; |
| | | import org.opends.messages.Message; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean rotateFile(MultifileTextWriter writer) |
| | | public boolean rotateFile(RotatableLogFile writer) |
| | | { |
| | | Calendar lastRotationTime = writer.getLastRotationTime(); |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.loggers; |
| | | |
| | |
| | | * (a) forwards all its output to a target stream |
| | | * (b) keeps track of how many bytes have been written. |
| | | */ |
| | | class MeteredStream extends OutputStream |
| | | public final class MeteredStream extends OutputStream |
| | | { |
| | | OutputStream out; |
| | | long written; |
| | |
| | | * @param out The target output stream to keep track of. |
| | | * @param written The number of bytes written to the stream. |
| | | */ |
| | | MeteredStream(OutputStream out, long written) |
| | | public MeteredStream(OutputStream out, long written) |
| | | { |
| | | this.out = out; |
| | | this.written = written; |
| | |
| | | { |
| | | out.close(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of bytes written in this stream. |
| | | * <p> |
| | | * The value returned is the {@code written} counter of this stream, which |
| | | * is seeded with the initial count passed to the constructor (for example |
| | | * the current length of a file that is being appended to). |
| | | * |
| | | * @return the number of bytes |
| | | */ |
| | | public long getBytesWritten() |
| | | { |
| | | return written; |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.loggers; |
| | | |
| | |
| | | * new one named in accordance with a specified FileNamingPolicy. |
| | | */ |
| | | public class MultifileTextWriter |
| | | implements ServerShutdownListener, TextWriter, |
| | | implements ServerShutdownListener, TextWriter, RotatableLogFile, |
| | | ConfigurationChangeListener<SizeLimitLogRotationPolicyCfg> |
| | | { |
| | | /** |
| | |
| | | outputStream = new MeteredStream(stream, file.length()); |
| | | |
| | | OutputStreamWriter osw = new OutputStreamWriter(outputStream, encoding); |
| | | BufferedWriter bw = null; |
| | | if(bufferSize <= 0) |
| | | { |
| | | writer = new BufferedWriter(osw); |
| | |
| | | this.actions = actions; |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the number of bytes written to the current log file. |
| | | * |
| | | * @return The number of bytes written to the current log file. |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getBytesWritten() |
| | | { |
| | | return outputStream.written; |
| | |
| | | return lastCleanCount; |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the last time a log file was rotated in this instance of |
| | | * Directory Server. If a log rotation never |
| | | * occurred, this value will be the time the server started. |
| | | * |
| | | * @return The last time log rotation occurred. |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Calendar getLastRotationTime() |
| | | { |
| | | return lastRotationTime; |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.loggers; |
| | | |
| | | import java.util.Calendar; |
| | | |
| | | /** |
| | | * Represents a file that can be rotated based on size or on time. |
| | | * <p> |
| | | * Rotation policies receive an instance of this interface in their |
| | | * {@code rotateFile(RotatableLogFile)} method and query it to decide |
| | | * whether the file must be rotated. |
| | | */ |
| | | public interface RotatableLogFile |
| | | { |
| | | |
| | | /** |
| | | * Retrieves the number of bytes written to the file. |
| | | * <p> |
| | | * The size-based rotation check reads this value as the current size of |
| | | * the file. |
| | | * |
| | | * @return The number of bytes written to the file. |
| | | */ |
| | | long getBytesWritten(); |
| | | |
| | | /** |
| | | * Retrieves the last time the file was rotated. If a file rotation never |
| | | * occurred, this value will be the time the server started. |
| | | * <p> |
| | | * The time-based rotation checks compare this value with the current time. |
| | | * |
| | | * @return The last time file rotation occurred. |
| | | */ |
| | | Calendar getLastRotationTime(); |
| | | |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.loggers; |
| | | |
| | |
| | | |
| | | |
| | | /** |
| | | * This method indicates if the log file should be |
| | | * rotated or not. |
| | | * This method indicates if the log file should be rotated or not. |
| | | * |
| | | * @param writer The multi file writer writing the file to be |
| | | * checked. |
| | | * @param writer |
| | | * the file writer to be checked. |
| | | * @return true if the log file should be rotated, false otherwise. |
| | | */ |
| | | public boolean rotateFile(MultifileTextWriter writer); |
| | | public boolean rotateFile(RotatableLogFile writer); |
| | | |
| | | |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.loggers; |
| | | import org.opends.messages.Message; |
| | |
| | | * @param writer The multi file text writer writing the log file. |
| | | * @return true if the file needs to be rotated, false otherwise. |
| | | */ |
| | | public boolean rotateFile(MultifileTextWriter writer) |
| | | public boolean rotateFile(RotatableLogFile writer) |
| | | { |
| | | long fileSize = writer.getBytesWritten(); |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.loggers; |
| | | import org.opends.messages.Message; |
| | |
| | | * @param writer The multi file text writer writing the log file. |
| | | * @return true if the file should be rotated, false otherwise. |
| | | */ |
| | | public boolean rotateFile(MultifileTextWriter writer) |
| | | public boolean rotateFile(RotatableLogFile writer) |
| | | { |
| | | long currInterval = TimeThread.getTime() - |
| | | writer.getLastRotationTime().getTimeInMillis(); |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2013 ForgeRock AS. |
| | | * Portions Copyright 2013-2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | |
| | | */ |
| | | public static final int STRING_ENCODING_LENGTH = 28; |
| | | |
| | | /** The maximum possible value for a CSN. */ |
| | | public static final CSN MAX_CSN_VALUE = new CSN(Long.MAX_VALUE, Integer.MAX_VALUE, Short.MAX_VALUE); |
| | | |
| | | private static final long serialVersionUID = -8802722277749190740L; |
| | | private final long timeStamp; |
| | | /** |
| | |
| | | import java.util.List; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | /** |
| | | * Logfile-based implementation of a ChangeNumberIndexDB. |
| | | * Implementation of a ChangeNumberIndexDB with a log. |
| | | * <p> |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | |
| | | static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser(); |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final LogFile<Long, ChangeNumberIndexRecord> logFile; |
| | | private final Log<Long, ChangeNumberIndexRecord> log; |
| | | |
| | | /** |
| | | * The newest changenumber stored in the DB. It is used to avoid purging the |
| | |
| | | */ |
| | | FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | logFile = replicationEnv.getOrCreateCNIndexDB(); |
| | | log = replicationEnv.getOrCreateCNIndexDB(); |
| | | final ChangeNumberIndexRecord newestRecord = readLastRecord(); |
| | | newestChangeNumber = getChangeNumber(newestRecord); |
| | | // initialization of the lastGeneratedChangeNumber from the DB content |
| | |
| | | |
| | | private ChangeNumberIndexRecord readLastRecord() throws ChangelogException |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord(); |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord(); |
| | | return record == null ? null : record.getValue(); |
| | | } |
| | | |
| | | private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord(); |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord(); |
| | | return record == null ? null : record.getValue(); |
| | | } |
| | | |
| | |
| | | final long changeNumber = nextChangeNumber(); |
| | | final ChangeNumberIndexRecord newRecord = |
| | | new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN()); |
| | | logFile.addRecord(newRecord.getChangeNumber(), newRecord); |
| | | log.append(Record.from(newRecord.getChangeNumber(), newRecord)); |
| | | newestChangeNumber = changeNumber; |
| | | |
| | | if (debugEnabled()) |
| | |
| | | */ |
| | | long count() throws ChangelogException |
| | | { |
| | | return logFile.getNumberOfRecords(); |
| | | return log.getNumberOfRecords(); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException |
| | | { |
| | | return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber)); |
| | | return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber)); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | | logFile.close(); |
| | | log.close(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | } |
| | |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | // TODO : no purge implemented yet as implementation is based on a single-file log. |
| | | // The purge must be implemented once we handle a log with multiple files. |
| | | // The purge will only delete whole files. |
| | | return null; |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime()); |
| | | return record != null ? record.getValue().getCSN() : null; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | logFile.clear(); |
| | | log.clear(); |
| | | newestChangeNumber = NO_KEY; |
| | | } |
| | | |
| | |
| | | private static final byte STRING_SEPARATOR = 0; |
| | | |
| | | @Override |
| | | public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record) |
| | | public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record) |
| | | { |
| | | final ChangeNumberIndexRecord cnIndexRecord = record.getValue(); |
| | | return new ByteStringBuilder() |
| | | .append(changeNumber) |
| | | .append(record.getPreviousCookie()) |
| | | .append(record.getKey()) |
| | | .append(cnIndexRecord.getPreviousCookie()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(record.getBaseDN().toString()) |
| | | .append(cnIndexRecord.getBaseDN().toString()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(record.getCSN().toByteString()).toByteString(); |
| | | .append(cnIndexRecord.getCSN().toByteString()).toByteString(); |
| | | } |
| | | |
| | | @Override |
| | |
| | | } |
| | | return length; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * The key is parsed with {@code Long.valueOf(String)}. A string that is not |
| | | * a valid number is reported as a {@code ChangelogException} built from the |
| | | * "unable to decode key" message and the original |
| | | * {@code NumberFormatException}. |
| | | */ |
| | | @Override |
| | | public Long decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | return Long.valueOf(key); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Returns the decimal representation given by {@code Long.toString()}, |
| | | * which {@code decodeKeyFromString(String)} is able to parse back. The |
| | | * provided key must not be {@code null}. |
| | | */ |
| | | @Override |
| | | public String encodeKeyToString(Long key) |
| | | { |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * For change numbers the maximum key is {@code Long.MAX_VALUE}. According |
| | | * to the documentation of the {@code Log} class, the head log file is |
| | | * associated with the maximum possible key given by the record parser. |
| | | */ |
| | | @Override |
| | | public Long getMaxKey() |
| | | { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * Log file implementation of the ChangelogDB interface. |
| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new FileReplicaDBCursor(new LogFile.EmptyLogCursor<CSN, UpdateMsg>(), null); |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null); |
| | | |
| | | /** |
| | | * Creates a new changelog DB. |
| | |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) |
| | | throws ConfigException |
| | | throws ConfigException |
| | | { |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.config = config; |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.ByteString; |
| | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final LogFile<CSN, UpdateMsg> logFile; |
| | | private final Log<CSN, UpdateMsg> log; |
| | | |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | |
| | | this.baseDN = baseDN; |
| | | this.replicationServer = replicationServer; |
| | | this.replicationEnv = replicationEnv; |
| | | this.logFile = createLogFile(replicationEnv); |
| | | this.log = createLog(replicationEnv); |
| | | this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN()); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | |
| | | |
| | | private CSN readOldestCSN() throws ChangelogException |
| | | { |
| | | final Record<CSN, UpdateMsg> record = logFile.getOldestRecord(); |
| | | final Record<CSN, UpdateMsg> record = log.getOldestRecord(); |
| | | return record == null ? null : record.getKey(); |
| | | } |
| | | |
| | | private CSN readNewestCSN() throws ChangelogException |
| | | { |
| | | final Record<CSN, UpdateMsg> record = logFile.getNewestRecord(); |
| | | final Record<CSN, UpdateMsg> record = log.getNewestRecord(); |
| | | return record == null ? null : record.getKey(); |
| | | } |
| | | |
| | | private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException |
| | | { |
| | | final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true); |
| | | return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId()); |
| | |
| | | ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg |
| | | .toString(), String.valueOf(baseDN), String.valueOf(serverId))); |
| | | } |
| | | logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg)); |
| | | log.append(Record.from(updateMsg.getCSN(), updateMsg)); |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateOld = limits.oldestCSN == null; |
| | |
| | | */ |
| | | DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException |
| | | { |
| | | LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN); |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN); |
| | | return new FileReplicaDBCursor(cursor, startAfterCSN); |
| | | } |
| | | |
| | |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | | logFile.close(); |
| | | log.close(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | } |
| | |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // TODO : no purge implemented yet, as we have a single-file log. |
| | | // The purge must be implemented once we handle a log with multiple files. |
| | | // The purge will only delete whole files. |
| | | final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN); |
| | | if (oldestRecord != null) |
| | | { |
| | | csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | void clear() throws ChangelogException |
| | | { |
| | | // Remove all persisted data and reset generationId to default value |
| | | logFile.clear(); |
| | | log.clear(); |
| | | replicationEnv.resetGenerationId(baseDN); |
| | | |
| | | csnLimits = new CSNLimits(null, null); |
| | |
| | | */ |
| | | long getNumberRecords() throws ChangelogException |
| | | { |
| | | return logFile.getNumberOfRecords(); |
| | | return log.getNumberOfRecords(); |
| | | } |
| | | |
| | | /** Parser of records persisted in the ReplicaDB log. */ |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | @Override |
| | | public ByteString encodeRecord(CSN key, UpdateMsg message) |
| | | public ByteString encodeRecord(final Record<CSN, UpdateMsg> record) |
| | | { |
| | | final UpdateMsg message = record.getValue(); |
| | | try |
| | | { |
| | | return ByteString.wrap(message.getBytes()); |
| | |
| | | throw new DecodingException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Delegates the parsing to the {@code CSN(String)} constructor. |
| | | * <p> |
| | | * NOTE(review): no {@code ChangelogException} is thrown explicitly here, |
| | | * whereas the parser of the change number index DB wraps parsing failures |
| | | * in a {@code ChangelogException}. Confirm how the {@code CSN} constructor |
| | | * reports a malformed string and whether it should be wrapped the same way. |
| | | */ |
| | | @Override |
| | | public CSN decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | return new CSN(key); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Returns {@code key.toString()}; the provided key must not be |
| | | * {@code null}. |
| | | * <p> |
| | | * NOTE(review): presumably the string produced by {@code CSN.toString()} |
| | | * is the format accepted by {@code decodeKeyFromString(String)} - confirm. |
| | | */ |
| | | @Override |
| | | public String encodeKeyToString(CSN key) |
| | | { |
| | | return key.toString(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * For CSN keys the maximum key is the constant {@code CSN.MAX_CSN_VALUE}. |
| | | * According to the documentation of the {@code Log} class, the head log |
| | | * file is associated with the maximum possible key given by the record |
| | | * parser. |
| | | */ |
| | | @Override |
| | | public CSN getMaxKey() |
| | | { |
| | | return CSN.MAX_CSN_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | |
| | | /** |
| | | * A cursor on ReplicaDB. |
| | |
| | | { |
| | | |
| | | /** The underlying cursor. */ |
| | | private final LogCursor<CSN, UpdateMsg> cursor; |
| | | private final RepositionableCursor<CSN, UpdateMsg> cursor; |
| | | |
| | | /** The next record to return. */ |
| | | private Record<CSN, UpdateMsg> nextRecord; |
| | |
| | | * The CSN to use as a start point (excluded from cursor, the lowest |
| | | * CSN higher than this CSN is used as the real start point). |
| | | */ |
| | | FileReplicaDBCursor(LogCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) { |
| | | FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) { |
| | | this.cursor = cursor; |
| | | this.lastNonNullCurrentCSN = startAfterCSN; |
| | | } |
| | |
| | | else |
| | | { |
| | | // Exhausted cursor must be able to reinitialize itself |
| | | cursor.rewind(); |
| | | cursor.positionTo(lastNonNullCurrentCSN, true); |
| | | |
| | | nextRecord = cursor.getRecord(); |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | | import java.io.FileFilter; |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReadWriteLock; |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.loggers.ErrorLogger; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A multi-file log that features appending key-value records and reading them |
| | | * using a {@code DBCursor}. |
| | | * The records must be appended to the log in ascending order of the keys. |
| | | * <p> |
| | | * A log is defined for a path - the log path - and contains one to many log files: |
| | | * <ul> |
| | | * <li>it has always at least one log file, the head log file, named "head.log", |
| | | * which is used to append the records.</li> |
| | | * <li>it may have from zero to many read-only log files, which are named after |
| | | * the pattern '[lowkey]_[highkey].log' where [lowkey] and [highkey] are respectively |
| | | * the string representation of lowest and highest key present in the log file.</li> |
| | | * </ul> |
| | | * A read-only log file is created each time the head log file has reached the |
| | | * maximum size limit. The head log file is then rotated to the read-only file and a |
| | | * new empty head log file is opened. There is no limit on the number of read-only |
| | | * files, but they can be purged. |
| | | * <p> |
| | | * A log is obtained using the {@code Log.openLog()} method and must always be |
| | | * released using the {@code close()} method. |
| | | * <p> |
| | | * Usage example: |
| | | * <pre> |
| | | * {@code |
| | | * Log<K, V> log = null; |
| | | * try |
| | | * { |
| | | * log = Log.openLog(logPath, parser, maxFileSize); |
| | | * log.append(Record.from(key, value)); |
| | | * DBCursor<K, V> cursor = log.getCursor(someKey); |
| | | * // use cursor, then close cursor |
| | | * } |
| | | * finally |
| | | * { |
| | | * log.close(); |
| | | * } |
| | | * } |
| | | * </pre> |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | final class Log<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | /** Tracer used for debug logging. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** Suffix shared by the names of all log files of a log. */ |
| | | private static final String LOG_FILE_SUFFIX = ".log"; |
| | | |
| | | /** Name of the head log file, the log file in which new records are appended. */ |
| | | static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX; |
| | | |
| | | /** Separator between the lowest key and the highest key in the name of a read-only log file. */ |
| | | private static final String LOG_FILE_NAME_SEPARATOR = "_"; |
| | | |
| | | /** Accepts regular files whose name ends with the log file suffix, except the head log file. */ |
| | | private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | | public boolean accept(File file) |
| | | { |
| | | return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) && |
| | | !file.getName().equals(HEAD_LOG_FILE_NAME); |
| | | } |
| | | }; |
| | | |
| | | /** |
| | | * Map that holds the unique log instance for each log path. |
| | | * <p> |
| | | * It is read and updated only by the static synchronized methods |
| | | * {@code openLog()} and {@code releaseLog()}. |
| | | */ |
| | | private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>(); |
| | | |
| | | /** |
| | | * The number of references on this log instance. It is incremented each time |
| | | * a log is opened on the same log path. The log is effectively closed only |
| | | * when the {@code close()} method is called and this value is at most 1. |
| | | */ |
| | | private int referenceCount; |
| | | |
| | | /** The path of directory for this log, where log files are stored. */ |
| | | private final File logPath; |
| | | |
| | | /** The parser used for encoding/decoding of records. */ |
| | | private final RecordParser<K, V> recordParser; |
| | | |
| | | /** |
| | | * Indicates if this log is closed. When the log is closed, all methods return |
| | | * immediately with no effect. |
| | | */ |
| | | private boolean isClosed; |
| | | |
| | | /** |
| | | * The log files contained in this log, ordered by key. |
| | | * <p> |
| | | * The head log file is always present and is associated with the maximum |
| | | * possible key, given by the record parser. |
| | | * <p> |
| | | * The read-only log files are associated with the highest key they contain. |
| | | */ |
| | | private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>(); |
| | | |
| | | /** |
| | | * The list of non-empty cursors opened on this log. Opened cursors may have |
| | | * to be updated when rotating the head log file. |
| | | */ |
| | | private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>(); |
| | | |
| | | /** |
| | | * A log file is rotated once it has exceeded this size limit. The log file can have |
| | | * a size much larger than this limit if the last record written has a huge size. |
| | | * |
| | | * TODO : to be replaced later by a (or a list of) configurable Rotation policy |
| | | * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>(); |
| | | */ |
| | | private final long sizeLimitPerLogFileInBytes; |
| | | |
| | | /** |
| | | * The exclusive lock used for writes and lifecycle operations on this log: |
| | | * initialize, clear, sync and close. |
| | | */ |
| | | private final Lock exclusiveLock; |
| | | |
| | | /** |
| | | * The shared lock used for reads and cursor operations on this log. |
| | | */ |
| | | private final Lock sharedLock; |
| | | |
| | | /** |
| | | * Open a log with the provided log path, record parser and maximum size per |
| | | * log file. |
| | | * <p> |
| | | * If no log exists for the provided path, a new one is created. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param logPath |
| | | * Path of the log. |
| | | * @param parser |
| | | * Parser for encoding/decoding of records. |
| | | * @param sizeLimitPerFileInBytes |
| | | * Limit in bytes before rotating the head log file of the log. |
| | | * @return a log |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath, |
| | | final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException |
| | | { |
| | | Reject.ifNull(logPath, parser); |
| | | @SuppressWarnings("unchecked") |
| | | Log<K, V> log = (Log<K, V>) logsCache.get(logPath); |
| | | if (log == null) |
| | | { |
| | | log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes); |
| | | logsCache.put(logPath, log); |
| | | } |
| | | else |
| | | { |
| | | // TODO : check that parser and size limit are compatible with the existing one, |
| | | // and issue a warning if it is not the case |
| | | log.referenceCount++; |
| | | } |
| | | return log; |
| | | } |
| | | |
| | | /** |
| | | * Release a reference to the log corresponding to provided path. The log is |
| | | * closed if this is the last reference. |
| | | */ |
| | | private static synchronized void releaseLog(final File logPath) |
| | | { |
| | | Log<?, ?> log = logsCache.get(logPath); |
| | | if (log == null) |
| | | { |
| | | // this should never happen |
| | | ErrorLogger.logError( |
| | | ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath())); |
| | | return; |
| | | } |
| | | if (log.referenceCount > 1) |
| | | { |
| | | log.referenceCount--; |
| | | } |
| | | else |
| | | { |
| | | log.doClose(); |
| | | logsCache.remove(logPath); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new log. |
| | | * |
| | | * @param logPath |
| | | * The directory path of the log. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @param sizeLimitPerFile |
| | | * Limit in bytes before rotating a log file. |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile) |
| | | throws ChangelogException |
| | | { |
| | | // Both locks come from a single non-fair read-write lock. |
| | | final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false); |
| | | this.sharedLock = readWriteLock.readLock(); |
| | | this.exclusiveLock = readWriteLock.writeLock(); |
| | | |
| | | this.referenceCount = 1; |
| | | this.sizeLimitPerLogFileInBytes = sizeLimitPerFile; |
| | | this.recordParser = parser; |
| | | this.logPath = logPath; |
| | | |
| | | // Must come last: it relies on the fields initialized above. |
| | | createOrOpenLogFiles(); |
| | | } |
| | | |
| | | /** |
| | | * Creates the log directory if it does not exist, then opens the head log |
| | | * file and every read-only log file found in the directory. |
| | | * <p> |
| | | * If any step fails, the log files opened so far are closed before the |
| | | * failure is reported. |
| | | * |
| | | * @throws ChangelogException |
| | | * If the directory can't be created or a log file can't be opened. |
| | | */ |
| | | private void createOrOpenLogFiles() throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | createRootDirIfNotExists(); |
| | | openHeadLogFile(); |
| | | for (final File file : getReadOnlyLogFiles()) |
| | | { |
| | | openReadOnlyLogFile(file); |
| | | } |
| | | isClosed = false; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // Ensure all log files opened at this point are closed. |
| | | // This method runs from the constructor, before openLog() has stored this |
| | | // instance in logsCache: close() goes through releaseLog(), which would not |
| | | // find the log in the cache and would return without closing anything. |
| | | // Close directly instead. |
| | | doClose(); |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private File[] getReadOnlyLogFiles() throws ChangelogException |
| | | { |
| | | File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER); |
| | | if (files == null) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath())); |
| | | } |
| | | return files; |
| | | } |
| | | |
| | | private void createRootDirIfNotExists() throws ChangelogException |
| | | { |
| | | if (!logPath.exists()) |
| | | { |
| | | if (!logPath.mkdirs()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gives the directory in which the log files of this log are stored. |
| | | * |
| | | * @return the path of this log directory |
| | | */ |
| | | public File getPath() |
| | | { |
| | | return this.logPath; |
| | | } |
| | | |
| | | /** |
| | | * Add the provided record at the end of this log. |
| | | * <p> |
| | | * In order to ensure that record is written out of buffers and persisted |
| | | * to file system, it is necessary to explicitly call the |
| | | * {@code syncToFileSystem()} method. |
| | | * |
| | | * @param record |
| | | * The record to add. |
| | | * @throws ChangelogException |
| | | * If the record can't be added to the log. |
| | | */ |
| | | public void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | // If this exclusive lock happens to be a bottleneck : |
| | | // 1. use a shared lock for appending the record first |
| | | // 2. switch to an exclusive lock only if rotation is needed |
| | | // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (!isClosed) |
| | | { |
| | | if (getHeadLogFile().getSizeInBytes() > sizeLimitPerLogFileInBytes) |
| | | { |
| | | ErrorLogger.logError( |
| | | INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), getHeadLogFile().getSizeInBytes())); |
| | | |
| | | rotateHeadLogFile(); |
| | | } |
| | | // after a rotation, this is the new empty head log file |
| | | getHeadLogFile().append(record); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Synchronize all records added with the file system, ensuring that records |
| | | * are effectively persisted. |
| | | * <p> |
| | | * After a successful call to this method, it is guaranteed that all records |
| | | * added to the log are persisted to the file system. |
| | | * <p> |
| | | * If the log is closed, this method returns immediately with no effect, like |
| | | * the other write and lifecycle operations of the log. |
| | | * |
| | | * @throws ChangelogException |
| | | * If the synchronization fails. |
| | | */ |
| | | public void syncToFileSystem() throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | // the head log file is already closed, there is nothing left to sync |
| | | return; |
| | | } |
| | | getHeadLogFile().syncToFileSystem(); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the first position. |
| | | * <p> |
| | | * The returned cursor initially points to record corresponding to the first |
| | | * key, that is {@code cursor.getRecord()} is equals to the record |
| | | * corresponding to the first key before any call to {@code cursor.next()} |
| | | * method. |
| | | * |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public RepositionableCursor<K, V> getCursor() throws ChangelogException |
| | | { |
| | | LogCursor<K, V> newCursor = null; |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (!isClosed) |
| | | { |
| | | newCursor = new LogCursor<K, V>(this); |
| | | // a null key positions the cursor on the oldest log file |
| | | newCursor.positionTo(null, false); |
| | | registerCursor(newCursor); |
| | | return newCursor; |
| | | } |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // do not leave a half-initialized cursor opened |
| | | StaticUtils.close(newCursor); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the provided key. |
| | | * <p> |
| | | * The returned cursor initially points to record corresponding to the key, |
| | | * that is {@code cursor.getRecord()} is equals to the record corresponding to |
| | | * the key before any call to {@code cursor.next()} method. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | // exact match requested: do not fall back to the nearest key |
| | | final boolean findNearest = false; |
| | | return getCursor(key, findNearest); |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the smallest key that is higher than |
| | | * the provided key. |
| | | * <p> |
| | | * The returned cursor initially points to record corresponding to the key |
| | | * found, that is {@code cursor.getRecord()} is equals to the record |
| | | * corresponding to the key found before any call to {@code cursor.next()} |
| | | * method. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException |
| | | { |
| | | // nearest match requested |
| | | final boolean findNearest = true; |
| | | return getCursor(key, findNearest); |
| | | } |
| | | |
| | | /** |
| | | * Builds a cursor starting from the provided key. |
| | | * <p> |
| | | * When {@code findNearest} is {@code false} and the key can't be found, an |
| | | * empty cursor is returned. A {@code null} key is handled by {@code getCursor()}. |
| | | */ |
| | | private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException |
| | | { |
| | | if (key == null) |
| | | { |
| | | return getCursor(); |
| | | } |
| | | LogCursor<K, V> newCursor = null; |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | newCursor = new LogCursor<K, V>(this); |
| | | final boolean isFound = newCursor.positionTo(key, findNearest); |
| | | if (!isFound && !findNearest) |
| | | { |
| | | // exact key requested but not found: nothing to iterate on |
| | | StaticUtils.close(newCursor); |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | // for nearest case, it is ok if the target is not found |
| | | registerCursor(newCursor); |
| | | return newCursor; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | StaticUtils.close(newCursor); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Returns the oldest (first) record from this log. |
| | | * |
| | | * @return the oldest record, which may be {@code null} if there is no record |
| | | * in the log. |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | public Record<K, V> getOldestRecord() throws ChangelogException |
| | | { |
| | | // the oldest record of the log is the oldest record of its first log file |
| | | final LogFile<K, V> oldestLogFile = getOldestLogFile(); |
| | | return oldestLogFile.getOldestRecord(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Returns the newest (last) record from this log. |
| | | * |
| | | * @return the newest record, which may be {@code null} |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | public Record<K, V> getNewestRecord() throws ChangelogException |
| | | { |
| | | // only the head log file is consulted |
| | | final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | return headLogFile.getNewestRecord(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of records in the log. |
| | | * |
| | | * @return the number of records |
| | | * @throws ChangelogException |
| | | * If a problem occurs. |
| | | */ |
| | | public long getNumberOfRecords() throws ChangelogException |
| | | { |
| | | // sum of the record counts of every log file, head included |
| | | long total = 0; |
| | | final Iterator<LogFile<K, V>> allLogFiles = logFiles.values().iterator(); |
| | | while (allLogFiles.hasNext()) |
| | | { |
| | | total += allLogFiles.next().getNumberOfRecords(); |
| | | } |
| | | return total; |
| | | } |
| | | |
| | | /** |
| | | * Purge the log up to and excluding the provided key. |
| | | * |
| | | * @param purgeKey |
| | | * the key up to which purging must happen |
| | | * @return the oldest non purged record, or {@code null} |
| | | * if no record was purged |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | } |
| | | // Log files are keyed by the highest key they contain: the head map holds |
| | | // the log files whose records are all strictly below the purge key. |
| | | final SortedMap<K, LogFile<K, V>> purgeable = logFiles.headMap(purgeKey); |
| | | if (purgeable.isEmpty()) |
| | | { |
| | | return null; |
| | | } |
| | | final List<String> failedPaths = new ArrayList<String>(); |
| | | for (final Iterator<Entry<K, LogFile<K, V>>> it = purgeable.entrySet().iterator(); it.hasNext();) |
| | | { |
| | | final LogFile<K, V> obsoleteLogFile = it.next().getValue(); |
| | | try |
| | | { |
| | | obsoleteLogFile.close(); |
| | | obsoleteLogFile.delete(); |
| | | // removing from the view also removes from the underlying map of log files |
| | | it.remove(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // The deletion of log file on file system has failed |
| | | failedPaths.add(obsoleteLogFile.getFile().getPath()); |
| | | } |
| | | } |
| | | if (failedPaths.isEmpty()) |
| | | { |
| | | return getOldestRecord(); |
| | | } |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get( |
| | | StaticUtils.listToString(failedPaths, ", "))); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Empties the log, discarding all records it contains. |
| | | * <p> |
| | | * All log files are closed and deleted, then a new empty head log file is |
| | | * created. If cursors are still opened on this log, an error is logged and |
| | | * the clearing proceeds anyway. |
| | | * |
| | | * @throws ChangelogException |
| | | * If a problem occurs during clearing operation, for instance if |
| | | * some log files can't be deleted. The original failure is |
| | | * available as the cause of the exception. |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | if (!openCursors.isEmpty()) |
| | | { |
| | | // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because |
| | | // there is one open cursor when clearing. |
| | | // Switch to logging until this issue is solved |
| | | // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it", |
| | | // logPath.getPath(), openCursors.size())); |
| | | ErrorLogger.logError( |
| | | ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size())); |
| | | } |
| | | |
| | | // delete all log files, collecting the ones that can't be deleted |
| | | final List<String> undeletableFiles = new ArrayList<String>(); |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | try |
| | | { |
| | | logFile.close(); |
| | | logFile.delete(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | undeletableFiles.add(logFile.getFile().getPath()); |
| | | } |
| | | } |
| | | if (!undeletableFiles.isEmpty()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get( |
| | | StaticUtils.listToString(undeletableFiles, ", "))); |
| | | } |
| | | logFiles.clear(); |
| | | |
| | | // recreate an empty head log file |
| | | openHeadLogFile(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // keep the original exception as the cause so that it is not lost to callers |
| | | throw new ChangelogException( |
| | | ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)), e); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Releases one reference on this log. |
| | | * <p> |
| | | * The log is effectively closed only when the last reference is released. |
| | | */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | Log.releaseLog(logPath); |
| | | } |
| | | |
| | | /** |
| | | * Closes all the log files of this log and marks the log as closed. |
| | | * <p> |
| | | * Cursors still opened at that time are reported in the error log. Does |
| | | * nothing if the log is already closed. |
| | | */ |
| | | private void doClose() |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (!isClosed) |
| | | { |
| | | if (!openCursors.isEmpty()) |
| | | { |
| | | ErrorLogger.logError( |
| | | ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size())); |
| | | } |
| | | StaticUtils.close(logFiles.values()); |
| | | isClosed = true; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private LogFile<K, V> getHeadLogFile() |
| | | { |
| | | return logFiles.lastEntry().getValue(); |
| | | } |
| | | |
| | | private LogFile<K, V> getOldestLogFile() |
| | | { |
| | | return logFiles.firstEntry().getValue(); |
| | | } |
| | | |
| | | /** |
| | | * Rotate the head log file to a read-only log file, and open a new empty head log file to write in. |
| | | * <p> |
| | | * The steps are order-dependent. This method is called by {@code append()} |
| | | * while it holds the exclusive lock. |
| | | * |
| | | * @throws ChangelogException |
| | | * If the head log file can't be renamed, or if one of the log files |
| | | * can't be opened. |
| | | */ |
| | | private void rotateHeadLogFile() throws ChangelogException |
| | | { |
| | | final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | // The read-only name is built from the oldest and newest records of the head |
| | | // log file, so it is computed before the head log file is closed. |
| | | final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile)); |
| | | headLogFile.close(); |
| | | renameHeadLogFileTo(readOnlyLogFile); |
| | | |
| | | // Registers a new head log file under the max key, replacing the entry of the |
| | | // previous head log file in the map of log files. |
| | | openHeadLogFile(); |
| | | // Registers the renamed file under the highest key encoded in its name. |
| | | openReadOnlyLogFile(readOnlyLogFile); |
| | | // Cursors that were reading the previous head log file are moved to the read-only file. |
| | | updateCursorsOpenedOnHead(); |
| | | } |
| | | |
| | | private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException |
| | | { |
| | | // the head log file always has the same name inside the log directory |
| | | final File currentHeadFile = new File(logPath, HEAD_LOG_FILE_NAME); |
| | | try |
| | | { |
| | | StaticUtils.renameFile(currentHeadFile, rotatedLogFile); |
| | | } |
| | | catch (IOException ioe) |
| | | { |
| | | final String targetPath = rotatedLogFile.getPath(); |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, targetPath), ioe); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the key bounds for the provided log file. |
| | | * |
| | | * @return the pair of (lowest key, highest key) that correspond to records |
| | | * stored in the corresponding log file. |
| | | * @throws ChangelogException |
| | | * if an error occurs while retrieving the keys |
| | | */ |
| | | private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | // a read-only file name is: lowest key, separator, highest key, suffix |
| | | final String fileName = logFile.getFile().getName(); |
| | | final int suffixStart = fileName.length() - Log.LOG_FILE_SUFFIX.length(); |
| | | final String[] encodedKeys = fileName.substring(0, suffixStart).split(LOG_FILE_NAME_SEPARATOR); |
| | | final K lowestKey = recordParser.decodeKeyFromString(encodedKeys[0]); |
| | | final K highestKey = recordParser.decodeKeyFromString(encodedKeys[1]); |
| | | return Pair.of(lowestKey, highestKey); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // covers malformed names (missing key) as well as decoding failures |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the file name to use for the read-only version of the provided |
| | | * log file. |
| | | * <p> |
| | | * The file name is based on the lowest and highest key in the log file. |
| | | * |
| | | * @return the name to use for the read-only version of the log file |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException |
| | | { |
| | | final K lowestKey = logFile.getOldestRecord().getKey(); |
| | | final K highestKey = logFile.getNewestRecord().getKey(); |
| | | // name layout: lowest key, separator, highest key, suffix |
| | | final StringBuilder fileName = new StringBuilder(); |
| | | fileName.append(recordParser.encodeKeyToString(lowestKey)); |
| | | fileName.append(LOG_FILE_NAME_SEPARATOR); |
| | | fileName.append(recordParser.encodeKeyToString(highestKey)); |
| | | fileName.append(LOG_FILE_SUFFIX); |
| | | return fileName.toString(); |
| | | } |
| | | |
| | | /** |
| | | * Moves the cursors that were reading the head log file before a rotation to |
| | | * the read-only log file created by that rotation. Other cursors are left untouched. |
| | | */ |
| | | private void updateCursorsOpenedOnHead() throws ChangelogException |
| | | { |
| | | for (final LogCursor<K, V> openCursor : openCursors) |
| | | { |
| | | final CursorState<K, V> currentState = openCursor.getState(); |
| | | if (!isHeadLogFile(currentState.logFile)) |
| | | { |
| | | // cursors on read-only log files are not impacted by the rotation |
| | | continue; |
| | | } |
| | | updateOpenCursor(openCursor, currentState); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the provided open cursor with the provided state. |
| | | * <p> |
| | | * The cursor must report the previous state on the head log file to the same |
| | | * state (position in file, current record) in the read-only log file just created. |
| | | */ |
| | | private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException |
| | | { |
| | | final K previousKey = logFiles.lowerKey(recordParser.getMaxKey()); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey); |
| | | cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record)); |
| | | } |
| | | |
| | | private void openHeadLogFile() throws ChangelogException |
| | | { |
| | | logFiles.put(recordParser.getMaxKey(), |
| | | LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser)); |
| | | } |
| | | |
| | | private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException |
| | | { |
| | | final LogFile<K, V> readOnlyLogFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser); |
| | | // a read-only log file is registered under the highest key it contains |
| | | final K highestKey = getKeyBounds(readOnlyLogFile).getSecond(); |
| | | logFiles.put(highestKey, readOnlyLogFile); |
| | | } |
| | | |
| | | /** Adds the provided cursor to the list of cursors opened on this log. */ |
| | | private void registerCursor(final LogCursor<K, V> cursor) |
| | | { |
| | | openCursors.add(cursor); |
| | | } |
| | | |
| | | /** Removes the provided cursor from the list of cursors opened on this log. */ |
| | | private void unregisterCursor(final LogCursor<K, V> cursor) |
| | | { |
| | | openCursors.remove(cursor); |
| | | } |
| | | |
| | | /** |
| | | * Returns the log file that is just after the provided log file wrt the order |
| | | * defined on keys, or {@code null} if the provided log file is the last one |
| | | * (the head log file). |
| | | */ |
| | | private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException |
| | | { |
| | | if (!isHeadLogFile(currentLogFile)) |
| | | { |
| | | // the next log file is the first one registered under a key higher than |
| | | // the highest key of the current log file |
| | | final K highestKey = getKeyBounds(currentLogFile).getSecond(); |
| | | return logFiles.higherEntry(highestKey).getValue(); |
| | | } |
| | | // nothing comes after the head log file |
| | | return null; |
| | | } |
| | | |
| | | private boolean isHeadLogFile(final LogFile<K, V> logFile) |
| | | { |
| | | return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME); |
| | | } |
| | | |
| | | /** |
| | | * Finds the log file that should contain the provided key. |
| | | * <p> |
| | | * The oldest log file is returned when the key is {@code null} or when no |
| | | * log file is registered under a key strictly lower than the provided key. |
| | | */ |
| | | private LogFile<K, V> findLogFileFor(final K key) |
| | | { |
| | | if (key != null && logFiles.lowerKey(key) != null) |
| | | { |
| | | // first log file whose highest key is at least the provided key |
| | | return logFiles.ceilingEntry(key).getValue(); |
| | | } |
| | | return getOldestLogFile(); |
| | | } |
| | | |
| | | /** |
| | | * Represents a cursor that can be repositioned on a given key. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | */ |
| | | static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>> |
| | | { |
| | | /** |
| | | * Position the cursor to the record corresponding to the provided key or to |
| | | * the nearest key (the lowest key higher than the provided key). |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, use the key of the first record instead. |
| | | * @param findNearest |
| | | * If {@code true}, start position is the lowest key that is higher |
| | | * than the provided key, otherwise start position is the provided |
| | | * key. |
| | | * @return {@code true} if cursor is successfully positioned to the key or |
| | | * the nearest key, {@code false} otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs when positioning cursor. |
| | | */ |
| | | boolean positionTo(K key, boolean findNearest) throws ChangelogException; |
| | | } |
| | | |
| | | /** |
| | | * A cursor over the records of a log, moving from one log file to the next |
| | | * one when the current log file is exhausted. |
| | | * <p> |
| | | * The cursor initially points to a record, that is {@code cursor.getRecord()} |
| | | * is equal to the first record available from the cursor before any call to |
| | | * the {@code cursor.next()} method. |
| | | * <p> |
| | | * The cursor uses the log shared lock to ensure reads are not done during a rotation. |
| | | */ |
| | | private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V> |
| | | { |
| | | /** The log this cursor reads from. */ |
| | | private final Log<K, V> log; |
| | | |
| | | /** The log file currently read by this cursor. */ |
| | | private LogFile<K, V> currentLogFile; |
| | | /** The cursor on the log file currently read. */ |
| | | private LogFileCursor<K, V> currentCursor; |
| | | |
| | | /** |
| | | * Creates a cursor on the provided log. The cursor is not positioned on any |
| | | * log file until {@code positionTo()} is called. |
| | | * |
| | | * @param log |
| | | * The log on which the cursor read records. |
| | | * @throws ChangelogException |
| | | * If an error occurs when creating the cursor. |
| | | */ |
| | | private LogCursor(final Log<K, V> log) throws ChangelogException |
| | | { |
| | | this.log = log; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K, V> getRecord() |
| | | { |
| | | return currentCursor.getRecord(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | if (currentCursor.next()) |
| | | { |
| | | return true; |
| | | } |
| | | // current log file is exhausted, try the following one |
| | | final LogFile<K, V> followingLogFile = log.getNextLogFile(currentLogFile); |
| | | if (followingLogFile == null) |
| | | { |
| | | return false; |
| | | } |
| | | switchToLogFile(followingLogFile); |
| | | return true; |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | log.unregisterCursor(this); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | final LogFile<K, V> targetLogFile = log.findLogFileFor(key); |
| | | if (targetLogFile != currentLogFile) |
| | | { |
| | | switchToLogFile(targetLogFile); |
| | | } |
| | | if (key == null) |
| | | { |
| | | // no key: stay at the start of the log file |
| | | return true; |
| | | } |
| | | final boolean isFound = currentCursor.positionTo(key, findNearest); |
| | | if (isFound && getRecord() == null) |
| | | { |
| | | // The key to position to may be in the next file, force the switch |
| | | return next(); |
| | | } |
| | | return isFound; |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** Captures the log file, file position and record this cursor currently points to. */ |
| | | private CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | final long position = currentCursor.getFilePosition(); |
| | | final Record<K, V> record = currentCursor.getRecord(); |
| | | return new CursorState<K, V>(currentLogFile, position, record); |
| | | } |
| | | |
| | | /** Closes the current log file cursor and reopens this cursor on the provided state. */ |
| | | private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | currentLogFile = cursorState.logFile; |
| | | currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition); |
| | | } |
| | | |
| | | /** Closes the current log file cursor and opens a new one on the provided log file. */ |
| | | private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | currentLogFile = logFile; |
| | | currentCursor = logFile.getCursor(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * An empty cursor, that always returns a {@code null} record and {@code false} |
| | | * to the {@code next()} and {@code positionTo()} methods. |
| | | * <p> |
| | | * It holds no resource, hence closing it has no effect. |
| | | */ |
| | | static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V> |
| | | { |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "EmptyLogCursor"; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Represents the state of a cursor. |
| | | * <p> |
| | | * The state is used to update a cursor when rotating the head log file : the |
| | | * state of cursor on head log file must be reported to the new read-only log |
| | | * file that is created when rotating. |
| | | * <p> |
| | | * All fields are final: a state is a fixed snapshot. |
| | | */ |
| | | private static class CursorState<K extends Comparable<K>, V> |
| | | { |
| | | /** The log file. */ |
| | | private final LogFile<K, V> logFile; |
| | | |
| | | /** |
| | | * The position of the reader on the log file. It is the offset from the |
| | | * beginning of the file, in bytes, at which the next read occurs. |
| | | */ |
| | | private final long filePosition; |
| | | |
| | | /** The record the cursor is pointing to. */ |
| | | private final Record<K,V> record; |
| | | |
| | | /** |
| | | * Creates a cursor state. |
| | | * |
| | | * @param logFile |
| | | * The log file the cursor is reading. |
| | | * @param filePosition |
| | | * The position of the reader on the log file. |
| | | * @param record |
| | | * The record the cursor is pointing to. |
| | | */ |
| | | private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record) |
| | | { |
| | | this.logFile = logFile; |
| | | this.filePosition = filePosition; |
| | | this.record = record; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.BufferedWriter; |
| | | import java.io.Closeable; |
| | |
| | | import java.io.FileWriter; |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReadWriteLock; |
| | | import java.util.concurrent.locks.ReentrantReadWriteLock; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * A file-based log that allow to append key-value records and |
| | | * read them using a {@code DBCursor}. |
| | | * A log file, containing part of a {@code Log}. The log file may be: |
| | | * <ul> |
| | | * <li>write-enabled : allowing to append key-value records and read records |
| | | * from cursors,</li> |
| | | * <li>read-only : allowing to read records from cursors.</li> |
| | | * </ul> |
| | | * <p> |
| | | * A log file is NOT intended to be used directly, but only as part of a |
| | | * {@code Log}. In particular, there is no concurrency management and no checks |
| | | * to ensure that log is not closed when performing any operation on it. Those |
| | | * are managed at the {@code Log} level. |
| | | * |
| | | * @param <K> |
| | | * Type of the key of a record, which must be comparable. |
| | |
| | | */ |
| | | final class LogFile<K extends Comparable<K>, V> implements Closeable |
| | | { |
| | | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // Non private for use in tests |
| | | static final String LOG_FILE_NAME = "current.log"; |
| | | |
| | | /** The path of directory that contains the log file. */ |
| | | private final File rootPath; |
| | | |
| | | /** The log file containing the records. */ |
| | | /** The file containing the records. */ |
| | | private final File logfile; |
| | | |
| | | /** The parser of records, to convert bytes to record and record to bytes. */ |
| | |
| | | /** The pool to obtain a reader on the log. */ |
| | | private LogReaderPool readerPool; |
| | | |
| | | /** The writer on the log, which may be {@code null} if log is not write-enabled */ |
| | | /** |
| | | * The writer on the log file, which may be {@code null} if log file is not |
| | | * write-enabled |
| | | */ |
| | | private LogWriter writer; |
| | | |
| | | /** Indicates if log is enabled for write. */ |
| | | private final boolean isWriteEnabled; |
| | | |
| | | /** Indicates if the log is closed. */ |
| | | private volatile boolean isClosed; |
| | | |
| | | /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */ |
| | | private final Lock exclusiveLock; |
| | | |
| | | /** |
| | | * The shared lock used for read, write and flush operations on this log file. |
| | | * Write and flush operations can be shared because they're synchronized in the underlying writer. |
| | | * Reads can be done safely when writing because partially written records are handled. |
| | | */ |
| | | private final Lock sharedLock; |
| | | |
| | | /** |
| | | * Creates a new log file. |
| | | * |
| | | * @param rootPath |
| | | * Path of root directory that contains the log file. |
| | | * @param logFilePath |
| | | * Path of the log file. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @param isWriteEnabled |
| | |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled) |
| | | private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled) |
| | | throws ChangelogException |
| | | { |
| | | this.rootPath = rootPath; |
| | | Reject.ifNull(logFilePath, parser); |
| | | this.logfile = logFilePath; |
| | | this.parser = parser; |
| | | this.isWriteEnabled = isWriteEnabled; |
| | | this.logfile = new File(rootPath, LOG_FILE_NAME); |
| | | |
| | | final ReadWriteLock lock = new ReentrantReadWriteLock(false); |
| | | this.exclusiveLock = lock.writeLock(); |
| | | this.sharedLock = lock.readLock(); |
| | | |
| | | initialize(); |
| | | } |
| | |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param rootPath |
| | | * Path of root directory that contains the log file. |
| | | * @param logFilePath |
| | | * Path of the log file. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @return a read-only log file |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath, |
| | | static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath, |
| | | final RecordParser<K, V> parser) throws ChangelogException |
| | | { |
| | | return new LogFile<K, V>(rootPath, parser, false); |
| | | return new LogFile<K, V>(logFilePath, parser, false); |
| | | } |
| | | |
| | | /** |
| | |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param rootPath |
| | | * Path of root directory that contains the log file. |
| | | * @param logFilePath |
| | | * Path of the log file. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @return a write-enabled log file |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath, |
| | | static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath, |
| | | final RecordParser<K, V> parser) throws ChangelogException |
| | | { |
| | | return new LogFile<K, V>(rootPath, parser, true); |
| | | return new LogFile<K, V>(logFilePath, parser, true); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | private void initialize() throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | createLogFileIfNotExists(); |
| | | if (isWriteEnabled) |
| | | { |
| | | createRootDirIfNotExists(); |
| | | createLogFileIfNotExists(); |
| | | isClosed = false; |
| | | if (isWriteEnabled) |
| | | { |
| | | writer = LogWriter.acquireWriter(logfile); |
| | | } |
| | | readerPool = new LogReaderPool(logfile); |
| | | writer = new LogWriter(logfile); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | readerPool = new LogReaderPool(logfile); |
| | | } |
| | | |
| | | /** |
| | | * Returns the name of this log. |
| | | * Returns the file containing the records. |
| | | * |
| | | * @return the name, which corresponds to the directory containing the log |
| | | * @return the file |
| | | */ |
| | | public String getName() |
| | | File getFile() |
| | | { |
| | | return logfile.getParent().toString(); |
| | | } |
| | | |
| | | /** |
| | | * Empties the log, discarding all records it contains. |
| | | * <p> |
| | | * This method should not be called with open cursors or |
| | | * when multiple instances of the log are opened. |
| | | * |
| | | * @throws ChangelogException |
| | | * If a problem occurs. |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | checkLogIsEnabledForWrite(); |
| | | |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | close(); |
| | | final boolean isDeleted = logfile.delete(); |
| | | if (!isDeleted) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath())); |
| | | } |
| | | initialize(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | } |
| | | return logfile; |
| | | } |
| | | |
| | | private void checkLogIsEnabledForWrite() throws ChangelogException |
| | | { |
| | | if (!isWriteEnabled) |
| | | { |
| | | throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath())); |
| | | } |
| | | } |
| | | |
| | | private void createRootDirIfNotExists() throws ChangelogException |
| | | { |
| | | if (!rootPath.exists()) |
| | | { |
| | | final boolean created = rootPath.mkdirs(); |
| | | if (!created) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath())); |
| | | } |
| | | throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath())); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add a record at the end of this log from the provided key and value. |
| | | * <p> |
| | | * In order to ensure that record is written out of buffers and persisted |
| | | * to file system, it is necessary to explicitly call the |
| | | * {@code syncToFileSystem()} method. |
| | | * |
| | | * @param key |
| | | * The key of the record. |
| | | * @param value |
| | | * The value of the record. |
| | | * @throws ChangelogException |
| | | * If the record can't be added to the log. |
| | | */ |
| | | public void addRecord(final K key, final V value) throws ChangelogException |
| | | { |
| | | addRecord(Record.from(key, value)); |
| | | } |
| | | |
| | | /** |
| | | * Add the provided record at the end of this log. |
| | | * <p> |
| | | * In order to ensure that record is written out of buffers and persisted |
| | |
| | | * @throws ChangelogException |
| | | * If the record can't be added to the log. |
| | | */ |
| | | public void addRecord(final Record<K, V> record) throws ChangelogException |
| | | void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | checkLogIsEnabledForWrite(); |
| | | |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | writer.write(encodeRecord(record)); |
| | | writer.flush(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e); |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e); |
| | | } |
| | | } |
| | | |
| | | private ByteString encodeRecord(final Record<K, V> record) |
| | | { |
| | | final ByteString data = parser.encodeRecord(record.getKey(), record.getValue()); |
| | | final ByteString data = parser.encodeRecord(record); |
| | | return new ByteStringBuilder() |
| | | .append(data.length()) |
| | | .append(data) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Dump this log as text file, intended for debugging purpose only. |
| | | * Dump this log file as a text file, intended for debugging purpose only. |
| | | * |
| | | * @param dumpFile |
| | | * File that will contain log records using a human-readable format |
| | | * @throws ChangelogException |
| | | * If an error occurs during dump |
| | | */ |
| | | public void dumpAsTextFile(File dumpFile) throws ChangelogException |
| | | void dumpAsTextFile(File dumpFile) throws ChangelogException |
| | | { |
| | | DBCursor<Record<K, V>> cursor = getCursor(); |
| | | BufferedWriter textWriter = null; |
| | |
| | | { |
| | | Record<K, V> record = cursor.getRecord(); |
| | | textWriter.write("key=" + record.getKey()); |
| | | textWriter.write(" -- "); |
| | | textWriter.write(" | "); |
| | | textWriter.write("value=" + record.getValue()); |
| | | textWriter.write('\n'); |
| | | cursor.next(); |
| | |
| | | { |
| | | // No I18N needed, used for debugging purpose only |
| | | throw new ChangelogException( |
| | | Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e); |
| | | Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e); |
| | | } |
| | | finally |
| | | { |
| | |
| | | * @throws ChangelogException |
| | | * If the synchronization fails. |
| | | */ |
| | | public void syncToFileSystem() throws ChangelogException |
| | | void syncToFileSystem() throws ChangelogException |
| | | { |
| | | exclusiveLock.lock(); |
| | | checkLogIsEnabledForWrite(); |
| | | try |
| | | { |
| | | writer.sync(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e); |
| | | } |
| | | finally |
| | | { |
| | | exclusiveLock.unlock(); |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e); |
| | | } |
| | | } |
| | | |
| | |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public LogCursor<K, V> getCursor() throws ChangelogException |
| | | LogFileCursor<K, V> getCursor() throws ChangelogException |
| | | { |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | return new LogFileCursor<K, V>(this); |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | return new LogFileCursor<K, V>(this); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public LogCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | LogFileCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, false); |
| | | } |
| | |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException |
| | | LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, true); |
| | | } |
| | | |
| | | /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */ |
| | | private LogCursor<K, V> getCursor(final K key, boolean findNearest) |
| | | private LogFileCursor<K, V> getCursor(final K key, boolean findNearest) |
| | | throws ChangelogException |
| | | { |
| | | if (key == null) |
| | |
| | | return getCursor(); |
| | | } |
| | | LogFileCursor<K, V> cursor = null; |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | cursor = new LogFileCursor<K, V>(this); |
| | | cursor.positionTo(key, findNearest); |
| | | // if target is not found, cursor is positioned at end of stream |
| | |
| | | StaticUtils.close(cursor); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor initialised to the provided record and position in file. |
| | | * |
| | | * @param record |
| | | * The initial record this cursor points on |
| | | * @param position |
| | | * The file position this cursor points on |
| | | * @return the cursor |
| | | * @throws ChangelogException |
| | | * If a problem occurs while creating the cursor. |
| | | */ |
| | | LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException |
| | | { |
| | | return new LogFileCursor<K, V>(this, record, position); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | public Record<K, V> getOldestRecord() throws ChangelogException |
| | | Record<K, V> getOldestRecord() throws ChangelogException |
| | | { |
| | | DBCursor<Record<K, V>> cursor = null; |
| | | try |
| | |
| | | * @throws ChangelogException |
| | | * If an error occurs while retrieving the record. |
| | | */ |
| | | public Record<K, V> getNewestRecord() throws ChangelogException |
| | | Record<K, V> getNewestRecord() throws ChangelogException |
| | | { |
| | | // TODO : need a more efficient way to retrieve it |
| | | DBCursor<Record<K, V>> cursor = null; |
| | |
| | | /** {@inheritDoc} */ |
| | | public void close() |
| | | { |
| | | exclusiveLock.lock(); |
| | | try |
| | | if (isWriteEnabled) |
| | | { |
| | | if (isClosed) |
| | | try |
| | | { |
| | | return; |
| | | syncToFileSystem(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | writer.close(); |
| | | } |
| | | readerPool.shutdown(); |
| | | } |
| | | |
| | | if (isWriteEnabled) |
| | | { |
| | | try |
| | | { |
| | | syncToFileSystem(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | writer.close(); |
| | | } |
| | | readerPool.shutdown(); |
| | | isClosed = true; |
| | | } |
| | | finally |
| | | /** |
| | | * Delete this log file (file is physically removed). Should be called only |
| | | * when log file is closed. |
| | | * |
| | | * @throws ChangelogException |
| | | * If log file can't be deleted. |
| | | */ |
| | | void delete() throws ChangelogException |
| | | { |
| | | final boolean isDeleted = logfile.delete(); |
| | | if (!isDeleted) |
| | | { |
| | | exclusiveLock.unlock(); |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath())); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Return the size of this log file in bytes. |
| | | * <p> |
| | | * For a write-enabled log file the size is the byte count tracked by the |
| | | * writer. A log file that is not write-enabled has no writer |
| | | * ({@code writer} is {@code null}), so the size is read from the file |
| | | * itself instead of failing with a {@code NullPointerException}. |
| | | * |
| | | * @return the size of log file |
| | | */ |
| | | long getSizeInBytes() |
| | | { |
| | | if (writer == null) |
| | | { |
| | | // No writer exists when the log file is not write-enabled: ask the file system. |
| | | return logfile.length(); |
| | | } |
| | | return writer.getBytesWritten(); |
| | | } |
| | | |
| | | /** The path of this log file as a String. */ |
| | | private String getPath() |
| | | { |
| | | return logfile.getPath(); |
| | | } |
| | | |
| | | /** Read a record from the provided reader. */ |
| | | private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException |
| | | { |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return null; |
| | | } |
| | | final ByteString recordData = readEncodedRecord(reader); |
| | | return recordData != null ? parser.decodeRecord(recordData) : null; |
| | | } |
| | |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException |
| | |
| | | } |
| | | } |
| | | |
| | | /** Seek to provided position on the provided reader. */ |
| | | /** Seek to given position on the provided reader. */ |
| | | private void seek(RandomAccessFile reader, long position) throws ChangelogException |
| | | { |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | reader.seek(position); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e); |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** Release the provided reader. */ |
| | | private void releaseReader(RandomAccessFile reader) { |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | readerPool.release(reader); |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | readerPool.release(reader); |
| | | } |
| | | |
| | | /** |
| | | * A cursor on the log. |
| | | */ |
| | | static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>> |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | /** |
| | | * Position the cursor to the record corresponding to the provided key or to |
| | | * the nearest key (the lowest key higher than the provided key). |
| | | * <p> |
| | | * The record is only searched forward. To search backward, it is first |
| | | * necessary to call the {@code rewind()} method to start from beginning of |
| | | * log file. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, use the key of the first record instead. |
| | | * @param findNearest |
| | | * If {@code true}, start position is the lowest key that is higher |
| | | * than the provided key, otherwise start position is the provided |
| | | * key. |
| | | * @return {@code true} if cursor is successfully positioned to the key or |
| | | * to the nearest key, {@code false} otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs when positioning cursor. |
| | | */ |
| | | boolean positionTo(K key, boolean findNearest) throws ChangelogException; |
| | | return logfile.hashCode(); |
| | | } |
| | | |
| | | /** |
| | | * Rewind the cursor, positioning it to the beginning of the log file, |
| | | * pointing to no record initially. |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs when rewinding to zero. |
| | | */ |
| | | void rewind() throws ChangelogException; |
| | | /** |
| | | * Compares this log file to another object: they are equal when the other |
| | | * object is this instance, or is a {@code LogFile} whose underlying file |
| | | * is equal to the file of this log file. |
| | | * |
| | | * @param that |
| | | * The object to compare with. |
| | | * @return {@code true} if both objects are equal, {@code false} otherwise |
| | | */ |
| | | @Override |
| | | public boolean equals(Object that) |
| | | { |
| | | if (this != that) |
| | | { |
| | | return that instanceof LogFile && logfile.equals(((LogFile<?, ?>) that).logfile); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Implements a cursor on the log. |
| | | * Implements a repositionable cursor on the log file. |
| | | * <p> |
| | | * The cursor initially points to a record, that is {@code cursor.getRecord()} |
| | | * is equals to the first record available from the cursor before any call to |
| | | * {@code cursor.next()} method. |
| | | */ |
| | | private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V> |
| | | static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V> |
| | | { |
| | | /** The underlying log on which entries are read. */ |
| | | private final LogFile<K, V> logFile; |
| | |
| | | * @throws ChangelogException |
| | | * If an error occurs when creating the cursor. |
| | | */ |
| | | LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException |
| | | private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException |
| | | { |
| | | this.logFile = logFile; |
| | | this.reader = logFile.getReader(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | public String toString() |
| | | /** |
| | | * Creates a cursor on the provided log, initialised to the provided record and |
| | | * pointing to the provided file position. |
| | | * <p> |
| | | * Note: there is no check to ensure that provided record and file position are |
| | | * consistent. It is the responsibility of the caller of this method. |
| | | */ |
| | | private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException |
| | | { |
| | | return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | { |
| | | return currentRecord; |
| | | this.logFile = logFile; |
| | | this.reader = logFile.getReader(); |
| | | this.currentRecord = record; |
| | | logFile.seek(reader, filePosition); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | { |
| | | return currentRecord; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(final K key, boolean findNearest) throws ChangelogException { |
| | | do |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void rewind() throws ChangelogException |
| | | { |
| | | // Move the reader back to offset 0, the beginning of the log file. |
| | | logFile.seek(reader, 0); |
| | | // Reached only if the seek did not throw: the cursor now points to no record. |
| | | currentRecord = null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // Hand the reader back to the log file, which releases it to its reader pool. |
| | | logFile.releaseReader(reader); |
| | | } |
| | | } |
| | | |
| | | /** An empty cursor, that always returns null records and false to {@code next()} method. */ |
| | | static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V> |
| | | { |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | /** |
| | | * Returns the file position this cursor is pointing at. |
| | | * |
| | | * @return the position of reader on the log file |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | long getFilePosition() throws ChangelogException |
| | | { |
| | | return null; |
| | | try |
| | | { |
| | | return reader.getFilePointer(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void rewind() throws ChangelogException |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "EmptyLogCursor"; |
| | | return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord); |
| | | } |
| | | |
| | | } |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.io.BufferedOutputStream; |
| | | import java.io.File; |
| | | import java.io.FileDescriptor; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | import java.io.SyncFailedException; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.loggers.MeteredStream; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | |
| | | /** |
| | | * A writer on a log file. |
| | | * <p> |
| | | * The writer is cached in order to have a single writer per file in the JVM. |
| | | */ |
| | | class LogWriter extends OutputStream |
| | | { |
| | | /** The cache of log writers. There is a single writer per file in the JVM. */ |
| | | private static final Map<File, LogWriter> logWritersCache = new HashMap<File, LogWriter>(); |
| | | |
| | | /** The exclusive lock used to acquire or close a log writer. */ |
| | | private static final Object lock = new Object(); |
| | | |
| | | /** The file to write in. */ |
| | | private final File file; |
| | | |
| | | /** The stream to write data in the file. */ |
| | | private final BufferedOutputStream stream; |
| | | /** The stream to write data in the file, capable of counting bytes written. */ |
| | | private final MeteredStream stream; |
| | | |
| | | /** The file descriptor on the file. */ |
| | | private final FileDescriptor fileDescriptor; |
| | | |
| | | /** The number of references on this writer. */ |
| | | private int referenceCount; |
| | | |
| | | /** |
| | | * Creates a writer on the provided file. |
| | | * |
| | | * @param file |
| | | * The file to write. |
| | | * @param stream |
| | | * The stream to write in the file. |
| | | * @param fileDescriptor |
| | | * The descriptor on the file. |
| | | * @throws ChangelogException |
| | | * If a problem occurs at creation. |
| | | */ |
| | | private LogWriter(final File file, BufferedOutputStream stream, FileDescriptor fileDescriptor) |
| | | throws ChangelogException |
| | | public LogWriter(final File file) throws ChangelogException |
| | | { |
| | | this.file = file; |
| | | this.stream = stream; |
| | | this.fileDescriptor = fileDescriptor; |
| | | this.referenceCount = 1; |
| | | } |
| | | |
| | | /** |
| | | * Returns a log writer on the provided file, creating it if necessary. |
| | | * |
| | | * @param file |
| | | * The log file to write in. |
| | | * @return the log writer |
| | | * @throws ChangelogException |
| | | * If a problem occurs. |
| | | */ |
| | | public static LogWriter acquireWriter(File file) throws ChangelogException |
| | | { |
| | | synchronized (lock) |
| | | try |
| | | { |
| | | LogWriter logWriter = logWritersCache.get(file); |
| | | if (logWriter == null) |
| | | { |
| | | try |
| | | { |
| | | final FileOutputStream stream = new FileOutputStream(file, true); |
| | | logWriter = new LogWriter(file, new BufferedOutputStream(stream), stream.getFD()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath())); |
| | | } |
| | | logWritersCache.put(file, logWriter); |
| | | } |
| | | else |
| | | { |
| | | logWriter.incrementRefCounter(); |
| | | } |
| | | return logWriter; |
| | | FileOutputStream fos = new FileOutputStream(file, true); |
| | | this.stream = new MeteredStream(fos, file.length()); |
| | | this.fileDescriptor = fos.getFD(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath())); |
| | | } |
| | | } |
| | | |
| | |
| | | bs.copyTo(stream); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void flush() throws IOException |
| | | /** |
| | | * Returns the number of bytes written in the underlying file. |
| | | * |
| | | * @return the number of bytes |
| | | */ |
| | | public long getBytesWritten() |
| | | { |
| | | stream.flush(); |
| | | return stream.getBytesWritten(); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public void close() |
| | | { |
| | | synchronized (lock) |
| | | { |
| | | LogWriter writer = logWritersCache.get(file); |
| | | if (writer == null) |
| | | { |
| | | // writer is already closed |
| | | return; |
| | | } |
| | | // counter == 0 should never happen |
| | | if (referenceCount == 0 || referenceCount == 1) |
| | | { |
| | | StaticUtils.close(stream); |
| | | logWritersCache.remove(file); |
| | | referenceCount = 0; |
| | | } |
| | | else |
| | | { |
| | | referenceCount--; |
| | | } |
| | | } |
| | | StaticUtils.close(stream); |
| | | } |
| | | |
| | | private void incrementRefCounter() |
| | | { |
| | | referenceCount++; |
| | | } |
| | | |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | |
| | | /** |
| | |
| | | */ |
| | | interface RecordParser<K, V> |
| | | { |
| | | |
| | | /** |
| | | * Decode a record from the provided byte array. |
| | | * <p> |
| | |
| | | Record<K, V> decodeRecord(ByteString data) throws DecodingException; |
| | | |
| | | /** |
| | | * Encode the provided key and value to a byte array. |
| | | * Encode the provided record to a byte array. |
| | | * <p> |
| | | * The returned array is intended to be stored as provided in the log file. |
| | | * |
| | | * @param key |
| | | * The key of the record. |
| | | * @param value |
| | | * The value of the record. |
| | | * @param record |
| | | * The record to encode. |
| | | * @return the bytes array representing the (key,value) record |
| | | */ |
| | | ByteString encodeRecord(K key, V value); |
| | | ByteString encodeRecord(Record<K, V> record); |
| | | |
| | | /** |
| | | * Read the key from the provided string. |
| | | * |
| | | * @param key |
| | | * The string representation of key, suitable for use in a filename, |
| | | * as written by the {@code encodeKeyToString()} method. |
| | | * @return the key |
| | | * @throws ChangelogException |
| | | * If key can't be read from the string. |
| | | */ |
| | | K decodeKeyFromString(String key) throws ChangelogException; |
| | | |
| | | /** |
| | | * Returns the provided key as a string that is suitable to be used in a |
| | | * filename. |
| | | * |
| | | * @param key |
| | | * The key of a record. |
| | | * @return a string encoding the key, unambiguously decodable to the original |
| | | * key, and suitable for use in a filename. The string should contain |
| | | * only ASCII characters and no space. |
| | | */ |
| | | String encodeKeyToString(K key); |
| | | |
| | | /** |
| | | * Returns a key that is guaranteed to always be higher than any other key. |
| | | * |
| | | * @return the highest possible key |
| | | */ |
| | | K getMaxKey(); |
| | | |
| | | } |
| | |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | |
| | | * id of the server. Each directory contains the log files for the given server |
| | | * id.</li> |
| | | * </ul> |
| | | * All log files end with the ".log" suffix. |
| | | * All log files end with the ".log" suffix. Log files always include the "head.log" |
| | | * file and optionally zero to many read-only log files named after the lowest key |
| | | * and highest key present in the log file. |
| | | * <p> |
| | | * Layout example with two domains "o=test1" and "o=test2", each having server |
| | | * ids 22 and 33 : |
| | |
| | | * +---changelog |
| | | * | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"] |
| | | * | \---changenumberindex |
| | | * | \--- current.log |
| | | * | \--- head.log [contains last records written] |
| | | * | \--- 1_50.log [contains records with keys in interval [1, 50]] |
| | | * | \---1.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---current.log |
| | | * | \---head.log |
| | | * | \---33.server |
| | | * | \---current.log |
| | | * | \---head.log |
| | | * | \---2.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | | * | \---current.log |
| | | * | \---head.log |
| | | * | \---33.server |
| | | * | \---current.log |
| | | * | \---head.log |
| | | * </pre> |
| | | */ |
| | | class ReplicationEnvironment |
| | | { |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // TODO : to replace by configurable value |
| | | private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024; |
| | | |
| | | private static final int NO_GENERATION_ID = -1; |
| | | |
| | | private static final String CN_INDEX_DB_DIRNAME = "changenumberindex"; |
| | |
| | | private final String replicationRootPath; |
| | | |
| | | /** The list of logs that are in use. */ |
| | | private final List<LogFile<?, ?>> logs = new CopyOnWriteArrayList<LogFile<?, ?>>(); |
| | | private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>(); |
| | | |
| | | /** Maps each domain DN to a domain id that is used to name directory in file system. */ |
| | | private final Map<DN, String> domains = new HashMap<DN, String>(); |
| | |
| | | * @throws ChangelogException |
| | | * if an error occurs. |
| | | */ |
| | | LogFile<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId) |
| | | Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId) |
| | | throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | * @throws ChangelogException |
| | | * when a problem occurs. |
| | | */ |
| | | LogFile<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException |
| | | Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException |
| | | { |
| | | final File path = getCNIndexDBPath(); |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears the content of replication database. |
| | | * |
| | | * @param log |
| | | * The log to clear. |
| | | */ |
| | | void clearDB(final LogFile<?, ?> log) |
| | | { |
| | | try |
| | | { |
| | | log.clear(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | logError(ERR_ERROR_CLEARING_DB.get(log.getName(), stackTraceToSingleLineString(e))); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the generated id associated to the provided domain DN from the state |
| | | * Db. |
| | | * <p> |
| | |
| | | } |
| | | |
| | | /** Open a log from the provided path and record parser. */ |
| | | private <K extends Comparable<K>, V> LogFile<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser) |
| | | private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser) |
| | | throws ChangelogException |
| | | { |
| | | checkShutDownBeforeOpening(serverIdPath); |
| | | |
| | | final LogFile<K, V> log = LogFile.newAppendableLogFile(serverIdPath, parser); |
| | | final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES); |
| | | |
| | | checkShutDownAfterOpening(serverIdPath, log); |
| | | |
| | |
| | | return log; |
| | | } |
| | | |
| | | private void checkShutDownAfterOpening(final File serverIdPath, final LogFile<?, ?> log) throws ChangelogException |
| | | private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException |
| | | { |
| | | if (isShuttingDown.get()) |
| | | { |
| | | closeDB(log); |
| | | closeLog(log); |
| | | throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(), |
| | | replicationServer.getServerId())); |
| | | } |
| | |
| | | return new File(replicationRootPath, CN_INDEX_DB_DIRNAME); |
| | | } |
| | | |
| | | private void closeDB(final LogFile<?, ?> log) |
| | | private void closeLog(final Log<?, ?> log) |
| | | { |
| | | logs.remove(log); |
| | | log.close(); |
| | |
| | | return new File(testResourceDir, filename); |
| | | } |
| | | |
| | | public static File getUnitTestRootPath() |
| | | { |
| | | final String buildRoot = System.getProperty(PROPERTY_BUILD_ROOT); |
| | | final String path = System.getProperty(PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); |
| | | return new File(path, "unit-tests"); |
| | | } |
| | | |
| | | /** |
| | | * Prevent instantiation. |
| | | */ |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Portions copyright 2013 ForgeRock AS |
| | | * Portions Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | |
| | | + "cn: Replication Server\n" |
| | | + "ds-cfg-replication-port: " + replServerPort + "\n" |
| | | + "ds-cfg-replication-db-directory: ChangeNumberControlDbTest\n" |
| | | + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n" |
| | | + "ds-cfg-replication-server-id: 103\n"; |
| | | |
| | | // suffix synchronized |
| | |
| | | * |
| | | * |
| | | * Copyright 2007-2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2013 ForgeRock AS |
| | | * Portions Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | |
| | | + "cn: Replication Server\n" |
| | | + "ds-cfg-replication-port:" + replServerPort + "\n" |
| | | + "ds-cfg-replication-db-directory: ReSyncTest\n" |
| | | + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n" |
| | | + "ds-cfg-replication-server-id: 104\n"; |
| | | |
| | | // suffix synchronized |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | |
| | | + "cn: replication Server\n" |
| | | + "ds-cfg-replication-port: " + replServerPort + "\n" |
| | | + "ds-cfg-replication-db-directory: HistoricalTest\n" |
| | | + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n" |
| | | + "ds-cfg-replication-server-id: 102\n"; |
| | | |
| | | // The suffix to be synchronized. |
| | |
| | | |
| | | addEntriesWithHistorical(1, entryCnt); |
| | | // leave a little delay between adding/modifying test entries |
| | | // and configuring the purge delay. |
| | | // and configuring the purge delay. |
| | | Thread.sleep(10); |
| | | |
| | | // set the purge delay to 1 minute |
| | |
| | | { |
| | | RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER; |
| | | |
| | | ByteString data = parser.encodeRecord(msg.getChangeNumber(), msg); |
| | | ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg)); |
| | | Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data); |
| | | |
| | | assertThat(record).isNotNull(); |
| | |
| | | * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li> |
| | | * </ol> |
| | | */ |
| | | // TODO :: enable when purge is implemented with multi-files log |
| | | // TODO : this works only if we ensure that there is a rotation of ahead log file |
| | | // at the right place. First two records are 37 and 76 bytes long, |
| | | // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file |
| | | // Re-enable this test when max file size is customizable for log |
| | | @Test(enabled=false) |
| | | public void testPurge() throws Exception |
| | | { |
| | |
| | | { |
| | | RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER; |
| | | |
| | | ByteString data = parser.encodeRecord(msg.getCSN(), msg); |
| | | ByteString data = parser.encodeRecord(Record.from(msg.getCSN(), msg)); |
| | | Record<CSN, UpdateMsg> record = parser.decodeRecord(data); |
| | | |
| | | assertThat(record).isNotNull(); |
| | |
| | | } |
| | | } |
| | | |
| | | // TODO : enable when purge is enabled with multi-files log implementation |
| | | // TODO : this works only if we ensure that there is a rotation of ahead log file |
| | | // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have |
| | | // the last record alone in the ahead log file |
| | | // Re-enable this test when max file size is customizable for log |
| | | @Test(enabled=false) |
| | | public void testPurge() throws Exception |
| | | { |
| | |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.Test; |
| | | |
| | |
| | | @Test(sequential=true) |
| | | public class LogFileTest extends DirectoryServerTestCase |
| | | { |
| | | private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog"; |
| | | private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog"; |
| | | |
| | | private static final StringRecordParser RECORD_PARSER = new StringRecordParser(); |
| | | static final StringRecordParser RECORD_PARSER = new StringRecordParser(); |
| | | |
| | | private static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() { |
| | | static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() { |
| | | @Override |
| | | public Record<String, String> decodeRecord(ByteString data) throws DecodingException |
| | | { |
| | |
| | | } |
| | | }; |
| | | |
| | | @BeforeClass |
| | | public void createTestDirectory() |
| | | { |
| | | File logDir = new File(TEST_DIRECTORY_CHANGELOG); |
| | | logDir.mkdirs(); |
| | | } |
| | | |
| | | @BeforeMethod |
| | | /** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */ |
| | | public void initialize() throws Exception |
| | | { |
| | | File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, LogFile.LOG_FILE_NAME); |
| | | File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME); |
| | | if (theLogFile.exists()) |
| | | { |
| | | theLogFile.delete(); |
| | |
| | | |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | logFile.addRecord("key"+i, "value"+i); |
| | | logFile.append(Record.from("key"+i, "value"+i)); |
| | | } |
| | | logFile.close(); |
| | | } |
| | | |
| | | @AfterMethod |
| | | @AfterClass |
| | | public void cleanTestChangelogDirectory() |
| | | { |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | |
| | | |
| | | private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException |
| | | { |
| | | LogFile<String, String> logFile = LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG), parser); |
| | | return logFile; |
| | | return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser); |
| | | } |
| | | |
| | | @Test |
| | |
| | | for (int i = 1; i <= 100; i++) |
| | | { |
| | | Record<String, String> record = Record.from("newkey" + i, "newvalue" + i); |
| | | writeLog.addRecord(record); |
| | | writeLog.append(record); |
| | | assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record); |
| | | assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1")); |
| | | assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record); |
| | |
| | | } |
| | | } |
| | | |
| | | @Test() |
| | | public void testTwoConcurrentWrite() throws Exception |
| | | { |
| | | final LogFile<String, String> writeLog1 = getLogFile(RECORD_PARSER); |
| | | final LogFile<String, String> writeLog2 = getLogFile(RECORD_PARSER); |
| | | try |
| | | { |
| | | writeLog1.addRecord(Record.from("startkey", "startvalue")); |
| | | Thread write1 = getWriteLogThread(writeLog1, "a"); |
| | | Thread write2 = getWriteLogThread(writeLog2, "b"); |
| | | write1.run(); |
| | | write2.run(); |
| | | |
| | | write1.join(); |
| | | write2.join(); |
| | | writeLog1.syncToFileSystem(); |
| | | DBCursor<Record<String, String>> cursor = writeLog1.getCursor("startkey"); |
| | | for (int i = 1; i <= 200; i++) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | } |
| | | assertThat(cursor.getRecord()).isIn(Record.from("k-b100", "v-b100"), Record.from("k-a100", "v-a100")); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog1, writeLog2); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value |
| | | * endIndex, using (keyN, valueN) where N is the index. |
| | |
| | | assertThat(cursor.next()).as("next() value when i=" + i).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i)); |
| | | } |
| | | assertThat(cursor.next()).isFalse(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | |
| | | /** Returns a thread that write 100 records to the provided log. */ |
| | | private Thread getWriteLogThread(final LogFile<String, String> writeLog, final String recordPrefix) |
| | | private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception |
| | | { |
| | | return new Thread() { |
| | | public void run() |
| | | { |
| | | for (int i = 1; i <= 100; i++) |
| | | { |
| | | Record<String, String> record = Record.from("k-" + recordPrefix + i, "v-" + recordPrefix + i); |
| | | try |
| | | { |
| | | writeLog.addRecord(record); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | assertThat(cursor.next()).isFalse(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | } |
| | | |
| | | /** |
| | |
| | | return length; |
| | | } |
| | | |
| | | public ByteString encodeRecord(String key, String value) |
| | | public ByteString encodeRecord(Record<String, String> record) |
| | | { |
| | | return new ByteStringBuilder() |
| | | .append(key).append(STRING_SEPARATOR) |
| | | .append(value).append(STRING_SEPARATOR).toByteString(); |
| | | .append(record.getKey()).append(STRING_SEPARATOR) |
| | | .append(record.getValue()).append(STRING_SEPARATOR).toByteString(); |
| | | } |
| | | |
| | | @Override |
| | | public String decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | return key; |
| | | } |
| | | |
| | | @Override |
| | | public String encodeKeyToString(String key) |
| | | { |
| | | return key; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getMaxKey() |
| | | { |
| | | // '~' character has the highest ASCII value |
| | | return "~~~~"; |
| | | } |
| | | } |
| | | |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.LogFileTest.*; |
| | | |
| | | import java.io.File; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | @Test(sequential=true) |
| | | public class LogTest extends DirectoryServerTestCase |
| | | { |
| | | // Use a directory dedicated to this test class |
| | | private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | @BeforeMethod |
| | | public void initialize() throws Exception |
| | | { |
| | | // Delete any previous log |
| | | if (LOG_DIRECTORY.exists()) |
| | | { |
| | | StaticUtils.recursiveDelete(LOG_DIRECTORY); |
| | | } |
| | | |
| | | // Build a log with 10 records with String keys and String values |
| | | // Keys use the format keyNNN where each N is a digit |
| | | // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | log.append(Record.from(String.format("key%03d", i), "value" + i)); |
| | | } |
| | | log.close(); |
| | | } |
| | | |
| | | private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException |
| | | { |
| | | // Each string record has a length of approximately 18 bytes |
| | | // This size is set in order to have 2 records per log file before the rotation happens |
| | | // This ensures that the rotation mechanism is thoroughly tested |
| | | // Some tests rely on having 2 records per log file (especially the purge tests), so take care |
| | | // if this value has to be changed |
| | | int sizeLimitPerFileInBytes = 30; |
| | | |
| | | return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes); |
| | | } |
| | | |
| | | @Test |
| | | public void testCursor() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(); |
| | | |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnExistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor("key005"); |
| | | |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key005", "value5")); |
| | | assertThatCursorCanBeFullyRead(cursor, 6, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | // key is between key005 and key006 |
| | | cursor = log.getCursor("key005000"); |
| | | |
| | | assertThat(cursor).isNotNull(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThat(cursor.next()).isFalse(); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(null); |
| | | |
| | | // should start from first record |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnExistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null; |
| | | try { |
| | | // this key is the first key of the log file "key001_key002.log" |
| | | cursor1 = log.getNearestCursor("key001"); |
| | | // lowest higher key is key2 |
| | | assertThat(cursor1.getRecord()).isEqualTo(Record.from("key002", "value2")); |
| | | assertThatCursorCanBeFullyRead(cursor1, 3, 10); |
| | | |
| | | // this key is the last key of the log file "key003_key004.log" |
| | | cursor2 = log.getNearestCursor("key004"); |
| | | // lowest higher key is key5 |
| | | assertThat(cursor2.getRecord()).isEqualTo(Record.from("key005", "value5")); |
| | | assertThatCursorCanBeFullyRead(cursor2, 6, 10); |
| | | |
| | | cursor3 = log.getNearestCursor("key009"); |
| | | // lowest higher key is key10 |
| | | assertThat(cursor3.getRecord()).isEqualTo(Record.from("key010", "value10")); |
| | | assertThatCursorIsExhausted(cursor3); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor1, cursor2, cursor3, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnExistingKey_KeyIsTheLastOne() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getNearestCursor("key010"); |
| | | |
| | | // lowest higher key does not exist |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | // key is between key005 and key006 |
| | | cursor = log.getNearestCursor("key005000"); |
| | | |
| | | // lowest higher key is key006 |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key006", "value6")); |
| | | assertThatCursorCanBeFullyRead(cursor, 7, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getNearestCursor(null); |
| | | |
| | | // should start from start |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1")); |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testCursorWhenParserFailsToRead() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ); |
| | | try { |
| | | log.getCursor("key"); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | { |
| | | Record<String, String> record = log.getOldestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key001", "value1")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetNewestRecord() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | { |
| | | Record<String, String> record = log.getNewestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key010", "value10")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test that changes are visible immediately to a reader after a write. |
| | | */ |
| | | @Test |
| | | public void testWriteAndReadOnSameLog() throws Exception |
| | | { |
| | | Log<String, String> writeLog = null; |
| | | Log<String, String> readLog = null; |
| | | try |
| | | { |
| | | writeLog = openLog(LogFileTest.RECORD_PARSER); |
| | | readLog = openLog(LogFileTest.RECORD_PARSER); |
| | | |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i); |
| | | writeLog.append(record); |
| | | assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record); |
| | | assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1")); |
| | | assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record); |
| | | assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1")); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog, readLog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testTwoConcurrentWrite() throws Exception |
| | | { |
| | | Log<String, String> writeLog1 = null; |
| | | Log<String, String> writeLog2 = null; |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | { |
| | | writeLog1 = openLog(LogFileTest.RECORD_PARSER); |
| | | writeLog2 = openLog(LogFileTest.RECORD_PARSER); |
| | | writeLog1.append(Record.from("key020", "starting record")); |
| | | AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>(); |
| | | Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef); |
| | | Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef); |
| | | write1.run(); |
| | | write2.run(); |
| | | |
| | | write1.join(); |
| | | write2.join(); |
| | | if (exceptionRef.get() != null) |
| | | { |
| | | throw exceptionRef.get(); |
| | | } |
| | | writeLog1.syncToFileSystem(); |
| | | cursor = writeLog1.getCursor("key020"); |
| | | for (int i = 1; i <= 60; i++) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | } |
| | | assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30")); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, writeLog1, writeLog2); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This test should be disabled. |
| | | * Enable it locally when you need to have a rough idea of write performance. |
| | | */ |
| | | @Test(enabled=false) |
| | | public void logWriteSpeed() throws Exception |
| | | { |
| | | Log<String, String> writeLog = null; |
| | | try |
| | | { |
| | | long sizeOf1MB = 1024*1024; |
| | | writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB); |
| | | |
| | | for (int i = 1; i < 1000000; i++) |
| | | { |
| | | writeLog.append(Record.from(String.format("key%010d", i), "value" + i)); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor = log.getCursor(); |
| | | // advance cursor to last record to ensure it is pointing to ahead log file |
| | | advanceCursorFromFirstRecordTo(cursor, 10); |
| | | |
| | | // add new records to ensure the ahead log file is rotated |
| | | for (int i = 11; i <= 20; i++) |
| | | { |
| | | log.append(Record.from(String.format("key%03d", i), "value" + i)); |
| | | } |
| | | |
| | | // check that cursor can fully read the new records |
| | | assertThatCursorCanBeFullyRead(cursor, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Opens four cursors positioned on records 1, 4, 9 and 10, appends records 11 to 20 to the log, |
| | | * then checks that each cursor can read every remaining record up to record 20. |
| | | */ |
| | | @Test |
| | | public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | Log<String, String> changelog = null; |
| | | DBCursor<Record<String, String>> readerAt1 = null; |
| | | DBCursor<Record<String, String>> readerAt4 = null; |
| | | DBCursor<Record<String, String>> readerAt9 = null; |
| | | DBCursor<Record<String, String>> readerAt10 = null; |
| | | try |
| | | { |
| | | changelog = openLog(LogFileTest.RECORD_PARSER); |
| | | |
| | | // each cursor is opened then immediately advanced to its own position |
| | | readerAt1 = changelog.getCursor(); |
| | | advanceCursorFromFirstRecordTo(readerAt1, 1); |
| | | |
| | | readerAt4 = changelog.getCursor(); |
| | | advanceCursorFromFirstRecordTo(readerAt4, 4); |
| | | |
| | | readerAt9 = changelog.getCursor(); |
| | | advanceCursorFromFirstRecordTo(readerAt9, 9); |
| | | |
| | | readerAt10 = changelog.getCursor(); |
| | | advanceCursorFromFirstRecordTo(readerAt10, 10); |
| | | |
| | | // append key011..key020; these writes are meant to trigger a rotation of the ahead log file |
| | | int index = 11; |
| | | while (index <= 20) |
| | | { |
| | | final String key = String.format("key%03d", index); |
| | | changelog.append(Record.from(key, "value" + index)); |
| | | index++; |
| | | } |
| | | |
| | | // every cursor resumes right after its current position and reads up to record 20 |
| | | assertThatCursorCanBeFullyRead(readerAt1, 2, 20); |
| | | assertThatCursorCanBeFullyRead(readerAt4, 5, 20); |
| | | assertThatCursorCanBeFullyRead(readerAt9, 10, 20); |
| | | assertThatCursorCanBeFullyRead(readerAt10, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(readerAt1, readerAt4, readerAt9, readerAt10, changelog); |
| | | } |
| | | } |
| | | |
| | | /** Clears the log, then checks that a cursor opened afterwards has nothing to read. */ |
| | | @Test |
| | | public void testClear() throws Exception |
| | | { |
| | | Log<String, String> changelog = null; |
| | | DBCursor<Record<String, String>> reader = null; |
| | | try |
| | | { |
| | | changelog = openLog(LogFileTest.RECORD_PARSER); |
| | | changelog.clear(); |
| | | |
| | | // the cursor is opened only once the log has been cleared |
| | | reader = changelog.getCursor(); |
| | | assertThatCursorIsExhausted(reader); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader, changelog); |
| | | } |
| | | } |
| | | |
| | | // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved |
| | | @Test(enabled=false, expectedExceptions=ChangelogException.class) |
| | | public void testClearWhenCursorIsOpened() throws Exception |
| | | { |
| | | Log<String, String> changelog = null; |
| | | DBCursor<Record<String, String>> openedCursor = null; |
| | | try |
| | | { |
| | | changelog = openLog(LogFileTest.RECORD_PARSER); |
| | | |
| | | // a cursor is still opened on the log when clear() is called: |
| | | // the test expects a ChangelogException to be thrown |
| | | openedCursor = changelog.getCursor(); |
| | | changelog.clear(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(openedCursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "purgeKeys") |
| | | Object[][] purgeKeys() |
| | | { |
| | | // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor |
| | | return new Object[][] |
| | | { |
| | | // lowest key of the read-only log file "key005_key006.log" |
| | | { "key005", Record.from("key005", "value5"), 6, 10}, |
| | | // key that is not the lowest of the read-only log file "key005_key006.log" |
| | | { "key006", Record.from("key005", "value5"), 6, 10}, |
| | | // lowest key of the ahead log file "ahead.log" |
| | | { "key009", Record.from("key009", "value9"), 10, 10}, |
| | | // key that is not the lowest of the ahead log file "ahead.log" |
| | | { "key010", Record.from("key009", "value9"), 10, 10}, |
| | | |
| | | // key not present in log, which is between key005 and key006 |
| | | { "key005a", Record.from("key005", "value5"), 6, 10}, |
| | | // key not present in log, which is between key006 and key007 |
| | | { "key006a", Record.from("key007", "value7"), 8, 10}, |
| | | // key not present in log, which is lower than oldest key key001 |
| | | { "key000", Record.from("key001", "value1"), 2, 10}, |
| | | // key not present in log, which is higher than newest key key010 |
| | | // should return the lowest key present in ahead log |
| | | { "key011", Record.from("key009", "value9"), 10, 10}, |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Purges the log up to the provided key, then opens a new cursor and checks that it points to |
| | | * the expected first record and can be read until exhaustion, from the provided start index |
| | | * to the provided end index. |
| | | */ |
| | | @Test(dataProvider="purgeKeys") |
| | | public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge, |
| | | int cursorStartIndex, int cursorEndIndex) throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> reader = null; |
| | | Log<String, String> changelog = null; |
| | | try |
| | | { |
| | | changelog = openLog(LogFileTest.RECORD_PARSER); |
| | | changelog.purgeUpTo(purgeKey); |
| | | |
| | | // the cursor is opened after the purge, so it only sees what the purge left in the log |
| | | reader = changelog.getCursor(); |
| | | assertThat(reader.getRecord()).isEqualTo(firstRecordExpectedAfterPurge); |
| | | assertThatCursorCanBeFullyRead(reader, cursorStartIndex, cursorEndIndex); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader, changelog); |
| | | } |
| | | } |
| | | |
| | | private void advanceCursorFromFirstRecordTo(DBCursor<Record<String, String>> cursor, int endIndex) |
| | | throws Exception |
| | | { |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1")); |
| | | advanceCursorUpTo(cursor, 2, endIndex); |
| | | } |
| | | |
| | | private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | | for (int i = fromIndex; i <= endIndex; i++) |
| | | { |
| | | assertThat(cursor.next()).as("next() value when i=" + i).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the cursor until it has no more record, checking that the records read are |
| | | * (keyN, valueN) for N going from fromIndex to endIndex, and that nothing is left afterwards. |
| | | * |
| | | * @param cursor the cursor to read, which gets advanced by this method |
| | | * @param fromIndex index of the first record expected from the next call to next() |
| | | * @param endIndex index of the last record expected before the cursor is exhausted |
| | | */ |
| | | private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | | // first consume the expected records ... |
| | | advanceCursorUpTo(cursor, fromIndex, endIndex); |
| | | // ... then make sure that there is no more record to read |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | |
| | | private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception |
| | | { |
| | | assertThat(cursor.next()).isFalse(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | } |
| | | |
| | | /** |
| | | * Returns a thread (not started) that appends 30 records to the provided log when run. |
| | | * <p> |
| | | * The record prefix is embedded in both the key and the value of each record. When appending |
| | | * fails with a ChangelogException, the thread goes on with the next record and only the first |
| | | * exception raised is stored in the provided reference. |
| | | */ |
| | | private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix, |
| | | final AtomicReference<ChangelogException> exceptionRef) |
| | | { |
| | | return new Thread() { |
| | | @Override |
| | | public void run() |
| | | { |
| | | int index = 1; |
| | | while (index <= 30) |
| | | { |
| | | final String key = String.format("nk%s%03d", recordPrefix, index); |
| | | final String value = "v" + recordPrefix + index; |
| | | final Record<String, String> toAppend = Record.from(key, value); |
| | | try |
| | | { |
| | | writeLog.append(toAppend); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // the reference is set only if still empty: later exceptions are dropped |
| | | exceptionRef.compareAndSet(null, e); |
| | | } |
| | | index++; |
| | | } |
| | | } |
| | | }; |
| | | } |
| | | |
| | | } |
| | |
| | | final DN domainDN = DN.decode(DN1_AS_STRING); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); |
| | | LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); |
| | | LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); |
| | | Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); |
| | | Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); |
| | | StaticUtils.close(cnDB, replicaDB, replicaDB2); |
| | | |
| | | ChangelogState state = environment.readChangelogState(); |
| | | |
| | | assertThat(state.getDomainToServerIds()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(1, 2))); |
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDN); |
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L)); |
| | | } |
| | | |
| | |
| | | List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING)); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); |
| | | List<LogFile<CSN,UpdateMsg>> replicaDBs = new ArrayList<LogFile<CSN,UpdateMsg>>(); |
| | | Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB(); |
| | | List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>(); |
| | | for (int i = 0; i <= 2 ; i++) |
| | | { |
| | | for (int j = 1; j <= 10; j++) |
| | |
| | | File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | DN domainDN = DN.decode(DN1_AS_STRING); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); |
| | | LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); |
| | | Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1); |
| | | Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1); |
| | | StaticUtils.close(replicaDB, replicaDB2); |
| | | |
| | | // delete the domain directory created for the 2 replica DBs to break the |