From 01eb7d07467b57c61868c73e9a94bff1d0b2dcd1 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java | 5
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 510 +++-------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java | 107 -
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java | 17
opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java | 43
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 10
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 69 +
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 1091 +++++++++++++++++++++++
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 55
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java | 3
opends/src/messages/messages/replication.properties | 26
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java | 571 ++++++++++++
opends/src/server/org/opends/server/loggers/MeteredStream.java | 15
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java | 7
opends/src/server/org/opends/server/replication/common/CSN.java | 5
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 7
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java | 7
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 7
opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java | 3
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 54
opends/src/server/org/opends/server/loggers/RotatableLogFile.java | 51 +
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java | 112 -
opends/src/server/org/opends/server/loggers/MultifileTextWriter.java | 20
opends/src/server/org/opends/server/loggers/RotationPolicy.java | 10
opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java | 3
opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java | 3
27 files changed, 2,198 insertions(+), 616 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 40600ff..e030d37 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -118,7 +118,7 @@
SEVERE_ERR_BAD_HISTORICAL_56=Entry %s was containing some unknown historical \
information, This may cause some inconsistency for this entry
MILD_ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE_57=A conflict was detected but the \
- conflict information could not be added. Operation: %s, Result: %s
+ conflict information could not be added. Operation: %s, Result: %s
MILD_ERR_CANNOT_RENAME_CONFLICT_ENTRY_58=An error happened trying to \
rename a conflicting entry. DN: %s, Operation: %s, Result: %s
MILD_ERR_EXCEPTION_RENAME_CONFLICT_ENTRY_59=An Exception happened when \
@@ -562,7 +562,7 @@
on log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD_254=Could not decode a record from data \
read in log file '%s'
-SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file(s): '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE_256=Could not create log file '%s'
SEVERE_WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE_257=The changelog '%s' has been opened in \
read-only mode, it is not enabled for write
@@ -583,3 +583,25 @@
Actual domain ids found in file system: '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE_265=Could not create a new domain \
id %s for domain DN %s and save it in domain state file '%s"
+SEVERE_ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE_266=Could not get reader \
+ position for cursor in log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING_267=Could not decode the key from \
+ string [%s]
+SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG_268=When cleaning log '%s', \
+ found %d cursor(s) still opened on the log
+SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG_269=When closing log '%s', \
+ found %d cursor(s) still opened on the log
+SEVERE_ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG_270=Could not initialize \
+ the log '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE_271=Could not \
+ retrieve key bounds from log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST_272=Could not \
+ retrieve read-only log files from log '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING_273=While purging log, could not \
+ delete log file(s): '%s'
+SEVERE_ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING_274 =The following log \
+ '%s' must be released but it is not referenced."
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
+ head log file from '%s' to '%s'
+INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
+ size of head log file is %d bytes
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java b/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
index 96a8f6d..159863e 100644
--- a/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.loggers;
import org.opends.messages.Message;
@@ -112,7 +113,7 @@
/**
* {@inheritDoc}
*/
- public boolean rotateFile(MultifileTextWriter writer)
+ public boolean rotateFile(RotatableLogFile writer)
{
Calendar lastRotationTime = writer.getLastRotationTime();
diff --git a/opends/src/server/org/opends/server/loggers/MeteredStream.java b/opends/src/server/org/opends/server/loggers/MeteredStream.java
index 59c053d..803284b 100644
--- a/opends/src/server/org/opends/server/loggers/MeteredStream.java
+++ b/opends/src/server/org/opends/server/loggers/MeteredStream.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.loggers;
@@ -33,7 +34,7 @@
* (a) forwards all its output to a target stream
* (b) keeps track of how many bytes have been written.
*/
-class MeteredStream extends OutputStream
+public final class MeteredStream extends OutputStream
{
OutputStream out;
long written;
@@ -45,7 +46,7 @@
* @param out The target output stream to keep track of.
* @param written The number of bytes written to the stream.
*/
- MeteredStream(OutputStream out, long written)
+ public MeteredStream(OutputStream out, long written)
{
this.out = out;
this.written = written;
@@ -111,5 +112,15 @@
{
out.close();
}
+
+ /**
+ * Returns the number of bytes written in this stream.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesWritten()
+ {
+ return written;
+ }
}
diff --git a/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java b/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
index bfa7f2a..7fd6ba7 100644
--- a/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
+++ b/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.loggers;
@@ -57,7 +58,7 @@
* new one named in accordance with a specified FileNamingPolicy.
*/
public class MultifileTextWriter
- implements ServerShutdownListener, TextWriter,
+ implements ServerShutdownListener, TextWriter, RotatableLogFile,
ConfigurationChangeListener<SizeLimitLogRotationPolicyCfg>
{
/**
@@ -180,7 +181,6 @@
outputStream = new MeteredStream(stream, file.length());
OutputStreamWriter osw = new OutputStreamWriter(outputStream, encoding);
- BufferedWriter bw = null;
if(bufferSize <= 0)
{
writer = new BufferedWriter(osw);
@@ -687,11 +687,8 @@
this.actions = actions;
}
- /**
- * Retrieves the number of bytes written to the current log file.
- *
- * @return The number of bytes written to the current log file.
- */
+ /** {@inheritDoc} */
+ @Override
public long getBytesWritten()
{
return outputStream.written;
@@ -719,13 +716,8 @@
return lastCleanCount;
}
- /**
- * Retrieves the last time a log file was rotated in this instance of
- * Directory Server. If a log rotation never
- * occurred, this value will be the time the server started.
- *
- * @return The last time log rotation occurred.
- */
+ /** {@inheritDoc} */
+ @Override
public Calendar getLastRotationTime()
{
return lastRotationTime;
diff --git a/opends/src/server/org/opends/server/loggers/RotatableLogFile.java b/opends/src/server/org/opends/server/loggers/RotatableLogFile.java
new file mode 100644
index 0000000..eaf33db
--- /dev/null
+++ b/opends/src/server/org/opends/server/loggers/RotatableLogFile.java
@@ -0,0 +1,51 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.loggers;
+
+import java.util.Calendar;
+
+/**
+ * Represents a file that can be rotated based on size or on time.
+ */
+public interface RotatableLogFile
+{
+
+ /**
+ * Retrieves the number of bytes written to the file.
+ *
+ * @return The number of bytes written to the file.
+ */
+ long getBytesWritten();
+
+ /**
+ * Retrieves the last time the file was rotated. If a file rotation never
+ * occurred, this value will be the time the server started.
+ *
+ * @return The last time file rotation occurred.
+ */
+ Calendar getLastRotationTime();
+
+}
diff --git a/opends/src/server/org/opends/server/loggers/RotationPolicy.java b/opends/src/server/org/opends/server/loggers/RotationPolicy.java
index dcbc8cc..af45956 100644
--- a/opends/src/server/org/opends/server/loggers/RotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/RotationPolicy.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.loggers;
@@ -60,14 +61,13 @@
/**
- * This method indicates if the log file should be
- * rotated or not.
+ * This method indicates if the log file should be rotated or not.
*
- * @param writer The multi file writer writing the file to be
- * checked.
+ * @param writer
+ * the file writer to be checked.
* @return true if the log file should be rotated, false otherwise.
*/
- public boolean rotateFile(MultifileTextWriter writer);
+ public boolean rotateFile(RotatableLogFile writer);
}
diff --git a/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java b/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
index fc8a213..198bb6d 100644
--- a/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.loggers;
import org.opends.messages.Message;
@@ -97,7 +98,7 @@
* @param writer The multi file text writer writing the log file.
* @return true if the file needs to be rotated, false otherwise.
*/
- public boolean rotateFile(MultifileTextWriter writer)
+ public boolean rotateFile(RotatableLogFile writer)
{
long fileSize = writer.getBytesWritten();
diff --git a/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java b/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
index 3d14e63..a7b50c2 100644
--- a/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
@@ -22,6 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.loggers;
import org.opends.messages.Message;
@@ -90,7 +91,7 @@
* @param writer The mutli file text writer written the log file.
* @return true if the file should be rotated, false otherwise.
*/
- public boolean rotateFile(MultifileTextWriter writer)
+ public boolean rotateFile(RotatableLogFile writer)
{
long currInterval = TimeThread.getTime() -
writer.getLastRotationTime().getTimeInMillis();
diff --git a/opends/src/server/org/opends/server/replication/common/CSN.java b/opends/src/server/org/opends/server/replication/common/CSN.java
index 0460b7e..f31e8f0 100644
--- a/opends/src/server/org/opends/server/replication/common/CSN.java
+++ b/opends/src/server/org/opends/server/replication/common/CSN.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.common;
@@ -61,6 +61,9 @@
*/
public static final int STRING_ENCODING_LENGTH = 28;
+ /** The maximum possible value for a CSN. */
+ public static final CSN MAX_CSN_VALUE = new CSN(Long.MAX_VALUE, Integer.MAX_VALUE, Short.MAX_VALUE);
+
private static final long serialVersionUID = -8802722277749190740L;
private final long timeStamp;
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 53ee20e..9a91ff9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
@@ -38,10 +39,11 @@
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
- * Logfile-based implementation of a ChangeNumberIndexDB.
+ * Implementation of a ChangeNumberIndexDB with a log.
* <p>
* This class publishes some monitoring information below <code>
* cn=monitor</code>.
@@ -57,7 +59,7 @@
static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
/** The log in which records are persisted. */
- private final LogFile<Long, ChangeNumberIndexRecord> logFile;
+ private final Log<Long, ChangeNumberIndexRecord> log;
/**
* The newest changenumber stored in the DB. It is used to avoid purging the
@@ -92,7 +94,7 @@
*/
FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
{
- logFile = replicationEnv.getOrCreateCNIndexDB();
+ log = replicationEnv.getOrCreateCNIndexDB();
final ChangeNumberIndexRecord newestRecord = readLastRecord();
newestChangeNumber = getChangeNumber(newestRecord);
// initialization of the lastGeneratedChangeNumber from the DB content
@@ -106,13 +108,13 @@
private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
{
- final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord();
+ final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
return record == null ? null : record.getValue();
}
private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
{
- final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord();
+ final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
return record == null ? null : record.getValue();
}
@@ -132,7 +134,7 @@
final long changeNumber = nextChangeNumber();
final ChangeNumberIndexRecord newRecord =
new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN());
- logFile.addRecord(newRecord.getChangeNumber(), newRecord);
+ log.append(Record.from(newRecord.getChangeNumber(), newRecord));
newestChangeNumber = changeNumber;
if (debugEnabled())
@@ -177,7 +179,7 @@
*/
long count() throws ChangelogException
{
- return logFile.getNumberOfRecords();
+ return log.getNumberOfRecords();
}
/**
@@ -197,7 +199,7 @@
@Override
public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
{
- return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber));
+ return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
}
/**
@@ -207,7 +209,7 @@
{
if (shutdown.compareAndSet(false, true))
{
- logFile.close();
+ log.close();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -228,11 +230,8 @@
{
return null;
}
-
- // TODO : no purge implemented yet as implementation is based on a single-file log.
- // The purge must be implemented once we handle a log with multiple files.
- // The purge will only delete whole files.
- return null;
+ final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
+ return record != null ? record.getValue().getCSN() : null;
}
/**
@@ -321,7 +320,7 @@
*/
public void clear() throws ChangelogException
{
- logFile.clear();
+ log.clear();
newestChangeNumber = NO_KEY;
}
@@ -331,15 +330,16 @@
private static final byte STRING_SEPARATOR = 0;
@Override
- public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record)
+ public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
{
+ final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
return new ByteStringBuilder()
- .append(changeNumber)
- .append(record.getPreviousCookie())
+ .append(record.getKey())
+ .append(cnIndexRecord.getPreviousCookie())
.append(STRING_SEPARATOR)
- .append(record.getBaseDN().toString())
+ .append(cnIndexRecord.getBaseDN().toString())
.append(STRING_SEPARATOR)
- .append(record.getCSN().toByteString()).toByteString();
+ .append(cnIndexRecord.getCSN().toByteString()).toByteString();
}
@Override
@@ -378,6 +378,35 @@
}
return length;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Long decodeKeyFromString(String key) throws ChangelogException
+ {
+ try
+ {
+ return Long.valueOf(key);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String encodeKeyToString(Long key)
+ {
+ return key.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Long getMaxKey()
+ {
+ return Long.MAX_VALUE;
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index e2b4f08..36e8a7a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,13 +49,12 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
-
import com.forgerock.opendj.util.Pair;
-import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
+import static org.opends.messages.ReplicationMessages.*;
/**
* Log file implementation of the ChangelogDB interface.
@@ -106,10 +105,11 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
+
private final AtomicBoolean shutdown = new AtomicBoolean();
static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
- new FileReplicaDBCursor(new LogFile.EmptyLogCursor<CSN, UpdateMsg>(), null);
+ new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null);
/**
* Creates a new changelog DB.
@@ -122,10 +122,10 @@
* if a problem occurs opening the supplied directory
*/
public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
- throws ConfigException
+ throws ConfigException
{
- this.config = config;
this.replicationServer = replicationServer;
+ this.config = config;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 1526830..0525996 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -43,7 +43,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
@@ -92,7 +92,7 @@
private final AtomicBoolean shutdown = new AtomicBoolean(false);
/** The log in which records are persisted. */
- private final LogFile<CSN, UpdateMsg> logFile;
+ private final Log<CSN, UpdateMsg> log;
/**
* Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -133,7 +133,7 @@
this.baseDN = baseDN;
this.replicationServer = replicationServer;
this.replicationEnv = replicationEnv;
- this.logFile = createLogFile(replicationEnv);
+ this.log = createLog(replicationEnv);
this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -142,17 +142,17 @@
private CSN readOldestCSN() throws ChangelogException
{
- final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
+ final Record<CSN, UpdateMsg> record = log.getOldestRecord();
return record == null ? null : record.getKey();
}
private CSN readNewestCSN() throws ChangelogException
{
- final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
+ final Record<CSN, UpdateMsg> record = log.getNewestRecord();
return record == null ? null : record.getKey();
}
- private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
+ private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
{
final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
@@ -174,7 +174,7 @@
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
.toString(), String.valueOf(baseDN), String.valueOf(serverId)));
}
- logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
+ log.append(Record.from(updateMsg.getCSN(), updateMsg));
final CSNLimits limits = csnLimits;
final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
final boolean updateOld = limits.oldestCSN == null;
@@ -239,7 +239,7 @@
*/
DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
{
- LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
+ RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
return new FileReplicaDBCursor(cursor, startAfterCSN);
}
@@ -250,7 +250,7 @@
{
if (shutdown.compareAndSet(false, true))
{
- logFile.close();
+ log.close();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -270,10 +270,11 @@
{
return;
}
-
- // TODO : no purge implemented yet, as we have a single-file log.
- // The purge must be implemented once we handle a log with multiple files.
- // The purge will only delete whole files.
+ final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
+ if (oldestRecord != null)
+ {
+ csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
+ }
}
/**
@@ -346,7 +347,7 @@
void clear() throws ChangelogException
{
// Remove all persisted data and reset generationId to default value
- logFile.clear();
+ log.clear();
replicationEnv.resetGenerationId(baseDN);
csnLimits = new CSNLimits(null, null);
@@ -363,7 +364,7 @@
*/
long getNumberRecords() throws ChangelogException
{
- return logFile.getNumberOfRecords();
+ return log.getNumberOfRecords();
}
/** Parser of records persisted in the ReplicaDB log. */
@@ -372,8 +373,9 @@
private static final DebugTracer TRACER = getTracer();
@Override
- public ByteString encodeRecord(CSN key, UpdateMsg message)
+ public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
{
+ final UpdateMsg message = record.getValue();
try
{
return ByteString.wrap(message.getBytes());
@@ -400,6 +402,27 @@
throw new DecodingException(e);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN decodeKeyFromString(String key) throws ChangelogException
+ {
+ return new CSN(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String encodeKeyToString(CSN key)
+ {
+ return key.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN getMaxKey()
+ {
+ return CSN.MAX_CSN_VALUE;
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
index 3f3a2fa..e067b6a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -29,7 +29,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
/**
* A cursor on ReplicaDB.
@@ -49,7 +49,7 @@
{
/** The underlying cursor. */
- private final LogCursor<CSN, UpdateMsg> cursor;
+ private final RepositionableCursor<CSN, UpdateMsg> cursor;
/** The next record to return. */
private Record<CSN, UpdateMsg> nextRecord;
@@ -66,7 +66,7 @@
* The CSN to use as a start point (excluded from cursor, the lowest
* CSN higher than this CSN is used as the real start point).
*/
- FileReplicaDBCursor(LogCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
+ FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
this.cursor = cursor;
this.lastNonNullCurrentCSN = startAfterCSN;
}
@@ -90,7 +90,6 @@
else
{
// Exhausted cursor must be able to reinitialize itself
- cursor.rewind();
cursor.positionTo(lastNonNullCurrentCSN, true);
nextRecord = cursor.getRecord();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
new file mode 100644
index 0000000..9667a56
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -0,0 +1,1091 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.opends.messages.ReplicationMessages.*;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.forgerock.util.Reject;
+import org.opends.server.loggers.ErrorLogger;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A multi-file log that features appending key-value records and reading them
+ * using a {@code DBCursor}.
+ * The records must be appended to the log in ascending order of the keys.
+ * <p>
+ * A log is defined for a path - the log path - and contains one to many log files:
+ * <ul>
+ * <li>it has always at least one log file, the head log file, named "head.log",
+ * which is used to append the records.</li>
+ * <li>it may have from zero to many read-only log files, which are named after
+ * the pattern '[lowkey]_[highkey}.log' where [lowkey] and [highkey] are respectively
+ * the string representation of lowest and highest key present in the log file.</li>
+ * </ul>
+ * A read-only log file is created each time the head log file has reached the
+ * maximum size limit. The head log file is then rotated to the read-only file and a
+ * new empty head log file is opened. There is no limit on the number of read-only
+ * files, but they can be purged.
+ * <p>
+ * A log is obtained using the {@code Log.openLog()} method and must always be
+ * released using the {@code close()} method.
+ * <p>
+ * Usage example:
+ * <pre>
+ * {@code
+ * Log<K, V> log = null;
+ * try
+ * {
+ * log = Log.openLog(logPath, parser, maxFileSize);
+ * log.append(key, value);
+ * DBCursor<K, V> cursor = log.getCursor(someKey);
+ * // use cursor, then close cursor
+ * }
+ * finally
+ * {
+ * log.close();
+ * }
+ * }
+ * </pre>
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+final class Log<K extends Comparable<K>, V> implements Closeable
+{
+ private static final DebugTracer TRACER = getTracer();
+
+ private static final String LOG_FILE_SUFFIX = ".log";
+
+ static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
+
+ private static final String LOG_FILE_NAME_SEPARATOR = "_";
+
+ private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) &&
+ !file.getName().equals(HEAD_LOG_FILE_NAME);
+ }
+ };
+
+ /** Map that holds the unique log instance for each log path. */
+ private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>();
+
+ /**
+ * The number of references on this log instance. It is incremented each time
+ * a log is opened on the same log path. The log is effectively closed only
+ * when the {@code close()} method is called and this value is at most 1.
+ */
+ private int referenceCount;
+
+ /** The path of directory for this log, where log files are stored. */
+ private final File logPath;
+
+ /** The parser used for encoding/decoding of records. */
+ private final RecordParser<K, V> recordParser;
+
+ /**
+ * Indicates if this log is closed. When the log is closed, all methods return
+ * immediately with no effect.
+ */
+ private boolean isClosed;
+
+ /**
+ * The log files contained in this log, ordered by key.
+ * <p>
+ * The head log file is always present and is associated with the maximum
+ * possible key, given by the record parser.
+ * <p>
+ * The read-only log files are associated with the highest key they contain.
+ */
+ private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
+
+ /**
+ * The list of non-empty cursors opened on this log. Opened cursors may have
+ * to be updated when rotating the head log file.
+ */
+ private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+
+ /**
+ * A log file is rotated once it has exceeded this size limit. The log file can have
+ * a size much larger than this limit if the last record written has a huge size.
+ *
+ * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+ * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
+ */
+ private final long sizeLimitPerLogFileInBytes;
+
+ /**
+ * The exclusive lock used for writes and lifecycle operations on this log:
+ * initialize, clear, sync and close.
+ */
+ private final Lock exclusiveLock;
+
+ /**
+ * The shared lock used for reads and cursor operations on this log.
+ */
+ private final Lock sharedLock;
+
+ /**
+ * Open a log with the provided log path, record parser and maximum size per
+ * log file.
+ * <p>
+ * If no log exists for the provided path, a new one is created.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param logPath
+ * Path of the log.
+ * @param parser
+ * Parser for encoding/decoding of records.
+ * @param sizeLimitPerFileInBytes
+ * Limit in bytes before rotating the head log file of the log.
+ * @return a log
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
+ final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+ {
+ Reject.ifNull(logPath, parser);
+ @SuppressWarnings("unchecked")
+ Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
+ if (log == null)
+ {
+ log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+ logsCache.put(logPath, log);
+ }
+ else
+ {
+ // TODO : check that parser and size limit are compatible with the existing one,
+ // and issue a warning if it is not the case
+ log.referenceCount++;
+ }
+ return log;
+ }
+
+ /**
+ * Release a reference to the log corresponding to provided path. The log is
+ * closed if this is the last reference.
+ */
+ private static synchronized void releaseLog(final File logPath)
+ {
+ Log<?, ?> log = logsCache.get(logPath);
+ if (log == null)
+ {
+ // this should never happen
+ ErrorLogger.logError(
+ ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
+ return;
+ }
+ if (log.referenceCount > 1)
+ {
+ log.referenceCount--;
+ }
+ else
+ {
+ log.doClose();
+ logsCache.remove(logPath);
+ }
+ }
+
+ /**
+ * Creates a new log.
+ *
+ * @param logPath
+ * The directory path of the log.
+ * @param parser
+ * Parser of records.
+ * @param sizeLimitPerFile
+ * Limit in bytes before rotating a log file.
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
+ throws ChangelogException
+ {
+ this.logPath = logPath;
+ this.recordParser = parser;
+ this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+ this.referenceCount = 1;
+
+ final ReadWriteLock lock = new ReentrantReadWriteLock(false);
+ this.exclusiveLock = lock.writeLock();
+ this.sharedLock = lock.readLock();
+
+ createOrOpenLogFiles();
+ }
+
+ /** Create or open log files used by this log. */
+ private void createOrOpenLogFiles() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ createRootDirIfNotExists();
+ openHeadLogFile();
+ for (final File file : getReadOnlyLogFiles())
+ {
+ openReadOnlyLogFile(file);
+ }
+ isClosed = false;
+ }
+ catch (ChangelogException e)
+ {
+ // ensure all log files opened at this point are closed
+ close();
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ private File[] getReadOnlyLogFiles() throws ChangelogException
+ {
+ File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
+ if (files == null)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
+ }
+ return files;
+ }
+
+ private void createRootDirIfNotExists() throws ChangelogException
+ {
+ if (!logPath.exists())
+ {
+ if (!logPath.mkdirs())
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
+ }
+ }
+ }
+
+ /**
+ * Returns the path of this log.
+ *
+ * @return the path of this log directory
+ */
+ public File getPath()
+ {
+ return logPath;
+ }
+
+ /**
+ * Add the provided record at the end of this log.
+ * <p>
+ * In order to ensure that record is written out of buffers and persisted
+ * to file system, it is necessary to explicitely call the
+ * {@code syncToFileSystem()} method.
+ *
+ * @param record
+ * The record to add.
+ * @throws ChangelogException
+ * If the record can't be added to the log.
+ */
+ public void append(final Record<K, V> record) throws ChangelogException
+ {
+ // If this exclusive lock happens to be a bottleneck :
+ // 1. use a shared lock for appending the record first
+ // 2. switch to an exclusive lock only if rotation is needed
+ // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ LogFile<K, V> headLogFile = getHeadLogFile();
+ if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+ {
+ ErrorLogger.logError(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+
+ rotateHeadLogFile();
+ headLogFile = getHeadLogFile();
+ }
+ headLogFile.append(record);
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Synchronize all records added with the file system, ensuring that records
+ * are effectively persisted.
+ * <p>
+ * After a successful call to this method, it is guaranteed that all records
+ * added to the log are persisted to the file system.
+ *
+ * @throws ChangelogException
+ * If the synchronization fails.
+ */
+ public void syncToFileSystem() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ getHeadLogFile().syncToFileSystem();
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the first position.
+ * <p>
+ * The returned cursor initially points to record corresponding to the first
+ * key, that is {@code cursor.getRecord()} is equals to the record
+ * corresponding to the first key before any call to {@code cursor.next()}
+ * method.
+ *
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public RepositionableCursor<K, V> getCursor() throws ChangelogException
+ {
+ LogCursor<K, V> cursor = null;
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return new EmptyLogCursor<K, V>();
+ }
+ cursor = new LogCursor<K, V>(this);
+ cursor.positionTo(null, false);
+ registerCursor(cursor);
+ return cursor;
+ }
+ catch (ChangelogException e)
+ {
+ StaticUtils.close(cursor);
+ throw e;
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the position defined by the provided key.
+ * <p>
+ * The returned cursor initially points to record corresponding to the key,
+ * that is {@code cursor.getRecord()} is equals to the record corresponding to
+ * the key before any call to {@code cursor.next()} method.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, cursor will point at the first record of the log.
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
+ {
+ return getCursor(key, false);
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the position defined by the smallest key that is higher than
+ * the provided key.
+ * <p>
+ * The returned cursor initially points to record corresponding to the key
+ * found, that is {@code cursor.getRecord()} is equals to the record
+ * corresponding to the key found before any call to {@code cursor.next()}
+ * method.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, cursor will point at the first record of the log.
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+ {
+ return getCursor(key, true);
+ }
+
+ /** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */
+ private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException
+ {
+ if (key == null)
+ {
+ return getCursor();
+ }
+ LogCursor<K, V> cursor = null;
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return new EmptyLogCursor<K, V>();
+ }
+ cursor = new LogCursor<K, V>(this);
+ final boolean isFound = cursor.positionTo(key, findNearest);
+ // for nearest case, it is ok if the target is not found
+ if (isFound || findNearest)
+ {
+ registerCursor(cursor);
+ return cursor;
+ }
+ else
+ {
+ StaticUtils.close(cursor);
+ return new EmptyLogCursor<K, V>();
+ }
+ }
+ catch (ChangelogException e)
+ {
+ StaticUtils.close(cursor);
+ throw e;
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+
+ /**
+ * Returns the oldest (first) record from this log.
+ *
+ * @return the oldest record, which may be {@code null} if there is no record
+ * in the log.
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ public Record<K, V> getOldestRecord() throws ChangelogException
+ {
+ return getOldestLogFile().getOldestRecord();
+ }
+
+
+ /**
+ * Returns the newest (last) record from this log.
+ *
+ * @return the newest record, which may be {@code null}
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ public Record<K, V> getNewestRecord() throws ChangelogException
+ {
+ return getHeadLogFile().getNewestRecord();
+ }
+
+ /**
+ * Returns the number of records in the log.
+ *
+ * @return the number of records
+ * @throws ChangelogException
+ * If a problem occurs.
+ */
+ public long getNumberOfRecords() throws ChangelogException
+ {
+ long count = 0;
+ for (final LogFile<K, V> logFile : logFiles.values())
+ {
+ count += logFile.getNumberOfRecords();
+ }
+ return count;
+ }
+
+ /**
+ * Purge the log up to and excluding the provided key.
+ *
+ * @param purgeKey
+ * the key up to which purging must happen
+ * @return the oldest non purged record, or {@code null}
+ * if no record was purged
+ * @throws ChangelogException
+ * if a database problem occurs.
+ */
+ public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return null;
+ }
+ final SortedMap<K, LogFile<K, V>> logFilesToPurge = logFiles.headMap(purgeKey);
+ if (logFilesToPurge.isEmpty())
+ {
+ return null;
+ }
+ final List<String> undeletableFiles = new ArrayList<String>();
+ final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+ while (entriesToPurge.hasNext())
+ {
+ final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+ try
+ {
+ logFile.close();
+ logFile.delete();
+ entriesToPurge.remove();
+ }
+ catch (ChangelogException e)
+ {
+ // The deletion of log file on file system has failed
+ undeletableFiles.add(logFile.getFile().getPath());
+ }
+ }
+ if (!undeletableFiles.isEmpty())
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
+ StaticUtils.listToString(undeletableFiles, ", ")));
+ }
+ return getOldestRecord();
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+
+ }
+
+ /**
+ * Empties the log, discarding all records it contains.
+ *
+ * @throws ChangelogException
+ * If cursors are opened on this log, or if a problem occurs during
+ * clearing operation.
+ */
+ public void clear() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ if (!openCursors.isEmpty())
+ {
+ // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
+ // there is one open cursor when clearing.
+ // Switch to logging until this issue is solved
+ // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
+ // logPath.getPath(), openCursors.size()));
+ ErrorLogger.logError(
+ ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
+ }
+
+ // delete all log files
+ final List<String> undeletableFiles = new ArrayList<String>();
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ try
+ {
+ logFile.close();
+ logFile.delete();
+ }
+ catch (ChangelogException e)
+ {
+ undeletableFiles.add(logFile.getFile().getPath());
+ }
+ }
+ if (!undeletableFiles.isEmpty())
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
+ StaticUtils.listToString(undeletableFiles, ", ")));
+ }
+ logFiles.clear();
+
+ // recreate an empty head log file
+ openHeadLogFile();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ releaseLog(logPath);
+ }
+
+ /** Effectively close this log. */
+ private void doClose()
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ if (!openCursors.isEmpty())
+ {
+ ErrorLogger.logError(
+ ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
+ }
+ StaticUtils.close(logFiles.values());
+ isClosed = true;
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ private LogFile<K, V> getHeadLogFile()
+ {
+ return logFiles.lastEntry().getValue();
+ }
+
+ private LogFile<K, V> getOldestLogFile()
+ {
+ return logFiles.firstEntry().getValue();
+ }
+
+ /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
+ private void rotateHeadLogFile() throws ChangelogException
+ {
+ final LogFile<K, V> headLogFile = getHeadLogFile();
+ final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
+ headLogFile.close();
+ renameHeadLogFileTo(readOnlyLogFile);
+
+ openHeadLogFile();
+ openReadOnlyLogFile(readOnlyLogFile);
+ updateCursorsOpenedOnHead();
+ }
+
+ private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
+ {
+ final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
+ try
+ {
+ StaticUtils.renameFile(headLogFile, rotatedLogFile);
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
+ }
+ }
+
+ /**
+ * Returns the key bounds for the provided log file.
+ *
+ * @return the pair of (lowest key, highest key) that correspond to records
+ * stored in the corresponding log file.
+ * @throws ChangelogException
+ * if an error occurs while retrieving the keys
+ */
+ private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException
+ {
+ try
+ {
+ final String name = logFile.getFile().getName();
+ final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
+ .split(LOG_FILE_NAME_SEPARATOR);
+ return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
+ }
+ }
+
+ /**
+ * Returns the file name to use for the read-only version of the provided
+ * log file.
+ * <p>
+ * The file name is based on the lowest and highest key in the log file.
+ *
+ * @return the name to use for the read-only version of the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException
+ {
+ final K lowestKey = logFile.getOldestRecord().getKey();
+ final K highestKey = logFile.getNewestRecord().getKey();
+ return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
+ + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
+ }
+
+ /** Update the open cursors after a rotation of the head log file. */
+ private void updateCursorsOpenedOnHead() throws ChangelogException
+ {
+ for (LogCursor<K, V> cursor : openCursors)
+ {
+ final CursorState<K, V> state = cursor.getState();
+ // Need to update the cursor only if it is pointing to the head log file
+ if (isHeadLogFile(state.logFile))
+ {
+ updateOpenCursor(cursor, state);
+ }
+ }
+ }
+
+ /**
+ * Update the provided open cursor with the provided state.
+ * <p>
+ * The cursor must report the previous state on the head log file to the same
+ * state (position in file, current record) in the read-only log file just created.
+ */
+ private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException
+ {
+ final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
+ final LogFile<K, V> logFile = findLogFileFor(previousKey);
+ cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record));
+ }
+
+ private void openHeadLogFile() throws ChangelogException
+ {
+ logFiles.put(recordParser.getMaxKey(),
+ LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser));
+ }
+
+ private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
+ {
+ final LogFile<K, V> logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
+ final Pair<K, K> bounds = getKeyBounds(logFile);
+ logFiles.put(bounds.getSecond(), logFile);
+ }
+
+ private void registerCursor(final LogCursor<K, V> cursor)
+ {
+ openCursors.add(cursor);
+ }
+
+ private void unregisterCursor(final LogCursor<K, V> cursor)
+ {
+ openCursors.remove(cursor);
+ }
+
+ /**
+ * Returns the log file that is just after the provided log file wrt the order
+ * defined on keys, or {@code null} if the provided log file is the last one
+ * (the head log file).
+ */
+ private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
+ {
+ if (isHeadLogFile(currentLogFile))
+ {
+ return null;
+ }
+ final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+ return logFiles.higherEntry(bounds.getSecond()).getValue();
+ }
+
+ private boolean isHeadLogFile(final LogFile<K, V> logFile)
+ {
+ return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
+ }
+
+ /** Returns the log file that should contain the provided key. */
+ private LogFile<K, V> findLogFileFor(final K key)
+ {
+ if (key == null || logFiles.lowerKey(key) == null)
+ {
+ return getOldestLogFile();
+ }
+ return logFiles.ceilingEntry(key).getValue();
+ }
+
+ /**
+ * Represents a cursor than can be repositioned on a given key.
+ */
+ static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+ {
+ /**
+ * Position the cursor to the record corresponding to the provided key or to
+ * the nearest key (the lowest key higher than the provided key).
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, use the key of the first record instead.
+ * @param findNearest
+ * If {@code true}, start position is the lowest key that is higher
+ * than the provided key, otherwise start position is the provided
+ * key.
+ * @return {@code true} if cursor is successfully positionned to the key or
+ * the nearest key, {@code false} otherwise.
+ * @throws ChangelogException
+ * If an error occurs when positioning cursor.
+ */
+ boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+ }
+
+ /**
+ * Implements a cursor on the log.
+ * <p>
+ * The cursor initially points to a record, that is {@code cursor.getRecord()}
+ * is equals to the first record available from the cursor before any call to
+ * {@code cursor.next()} method.
+ * <p>
+ * The cursor uses the log shared lock to ensure reads are not done during a rotation.
+ */
+ private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+ {
+ private final Log<K, V> log;
+
+ private LogFile<K, V> currentLogFile;
+ private LogFileCursor<K, V> currentCursor;
+
+ /**
+ * Creates a cursor on the provided log.
+ *
+ * @param log
+ * The log on which the cursor read records.
+ * @throws ChangelogException
+ * If an error occurs when creating the cursor.
+ */
+ private LogCursor(final Log<K, V> log) throws ChangelogException
+ {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Record<K, V> getRecord()
+ {
+ return currentCursor.getRecord();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ log.sharedLock.lock();
+ try
+ {
+ final boolean hasNext = currentCursor.next();
+ if (hasNext)
+ {
+ return true;
+ }
+ final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+ if (logFile != null)
+ {
+ switchToLogFile(logFile);
+ return true;
+ }
+ return false;
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ log.sharedLock.lock();
+ try
+ {
+ StaticUtils.close(currentCursor);
+ log.unregisterCursor(this);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
+ {
+ log.sharedLock.lock();
+ try
+ {
+ final LogFile<K, V> logFile = log.findLogFileFor(key);
+ if (logFile != currentLogFile)
+ {
+ switchToLogFile(logFile);
+ }
+ if (key != null)
+ {
+ boolean isFound = currentCursor.positionTo(key, findNearest);
+ if (isFound && getRecord() == null)
+ {
+ // The key to position to may be in the next file, force the switch
+ isFound = next();
+ }
+ return isFound;
+ }
+ return true;
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /** Returns the state of this cursor. */
+ private CursorState<K, V> getState() throws ChangelogException
+ {
+ return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+ }
+
+ /** Reinitialize this cursor to the provided state. */
+ private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+ {
+ StaticUtils.close(currentCursor);
+ currentLogFile = cursorState.logFile;
+ currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+ }
+
+ /** Switch the cursor to the provided log file. */
+ private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException
+ {
+ StaticUtils.close(currentCursor);
+ currentLogFile = logFile;
+ currentCursor = currentLogFile.getCursor();
+ }
+ }
+
+ /** An empty cursor, that always return null records and false to {@code next()} method. */
+ static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public Record<K,V> getRecord()
+ {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next()
+ {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
+ {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ // nothing to do
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "EmptyLogCursor";
+ }
+ }
+
+ /**
+ * Represents the state of a cursor.
+ * <p>
+ * The state is used to update a cursor when rotating the head log file : the
+ * state of cursor on head log file must be reported to the new read-only log
+ * file that is created when rotating.
+ */
+ private static class CursorState<K extends Comparable<K>, V>
+ {
+ /** The log file. */
+ private final LogFile<K, V> logFile;
+
+ /**
+ * The position of the reader on the log file. It is the offset from the
+ * beginning of the file, in bytes, at which the next read occurs.
+ */
+ private final long filePosition;
+
+ /** The record the cursor is pointing to. */
+ private final Record<K,V> record;
+
+ private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
+ {
+ this.logFile = logFile;
+ this.filePosition = filePosition;
+ this.record = record;
+ }
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index 005c8c7..f0bb7e9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -25,8 +25,8 @@
*/
package org.opends.server.replication.server.changelog.file;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -35,24 +35,30 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.forgerock.util.Reject;
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
-import static org.opends.messages.ReplicationMessages.*;
-
/**
- * A file-based log that allow to append key-value records and
- * read them using a {@code DBCursor}.
+ * A log file, containing part of a {@code Log}. The log file may be:
+ * <ul>
+ * <li>write-enabled : allowing to append key-value records and read records
+ * from cursors,</li>
+ * <li>read-only : allowing to read records from cursors.</li>
+ * </ul>
+ * <p>
+ * A log file is NOT intended to be used directly, but only has part of a
+ * {@code Log}. In particular, there is no concurrency management and no checks
+ * to ensure that log is not closed when performing any operation on it. Those
+ * are managed at the {@code Log} level.
*
* @param <K>
* Type of the key of a record, which must be comparable.
@@ -61,16 +67,9 @@
*/
final class LogFile<K extends Comparable<K>, V> implements Closeable
{
-
private static final DebugTracer TRACER = getTracer();
- // Non private for use in tests
- static final String LOG_FILE_NAME = "current.log";
-
- /** The path of directory that contains the log file. */
- private final File rootPath;
-
- /** The log file containing the records. */
+ /** The file containing the records. */
private final File logfile;
/** The parser of records, to convert bytes to record and record to bytes. */
@@ -79,30 +78,20 @@
/** The pool to obtain a reader on the log. */
private LogReaderPool readerPool;
- /** The writer on the log, which may be {@code null} if log is not write-enabled */
+ /**
+ * The writer on the log file, which may be {@code null} if log file is not
+ * write-enabled
+ */
private LogWriter writer;
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
- /** Indicates if the log is closed. */
- private volatile boolean isClosed;
-
- /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */
- private final Lock exclusiveLock;
-
- /**
- * The shared lock used for read, write and flush operations on this log file.
- * Write and flush operations can be shared because they're synchronized in the underlying writer.
- * Reads can be done safely when writing because partially written records are handled.
- */
- private final Lock sharedLock;
-
/**
* Creates a new log file.
*
- * @param rootPath
- * Path of root directory that contains the log file.
+ * @param logFilePath
+ * Path of the log file.
* @param parser
* Parser of records.
* @param isWriteEnabled
@@ -111,17 +100,13 @@
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled)
+ private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
throws ChangelogException
{
- this.rootPath = rootPath;
+ Reject.ifNull(logFilePath, parser);
+ this.logfile = logFilePath;
this.parser = parser;
this.isWriteEnabled = isWriteEnabled;
- this.logfile = new File(rootPath, LOG_FILE_NAME);
-
- final ReadWriteLock lock = new ReentrantReadWriteLock(false);
- this.exclusiveLock = lock.writeLock();
- this.sharedLock = lock.readLock();
initialize();
}
@@ -133,18 +118,18 @@
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
- * @param rootPath
- * Path of root directory that contains the log file.
+ * @param logFilePath
+ * Path of the log file.
* @param parser
* Parser of records.
* @return a read-only log file
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath,
+ static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
final RecordParser<K, V> parser) throws ChangelogException
{
- return new LogFile<K, V>(rootPath, parser, false);
+ return new LogFile<K, V>(logFilePath, parser, false);
}
/**
@@ -155,18 +140,18 @@
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
- * @param rootPath
- * Path of root directory that contains the log file.
+ * @param logFilePath
+ * Path of the log file.
* @param parser
* Parser of records.
* @return a write-enabled log file
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath,
+ static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
final RecordParser<K, V> parser) throws ChangelogException
{
- return new LogFile<K, V>(rootPath, parser, true);
+ return new LogFile<K, V>(logFilePath, parser, true);
}
/**
@@ -180,89 +165,29 @@
*/
private void initialize() throws ChangelogException
{
- exclusiveLock.lock();
- try
+ createLogFileIfNotExists();
+ if (isWriteEnabled)
{
- createRootDirIfNotExists();
- createLogFileIfNotExists();
- isClosed = false;
- if (isWriteEnabled)
- {
- writer = LogWriter.acquireWriter(logfile);
- }
- readerPool = new LogReaderPool(logfile);
+ writer = new LogWriter(logfile);
}
- finally
- {
- exclusiveLock.unlock();
- }
+ readerPool = new LogReaderPool(logfile);
}
/**
- * Returns the name of this log.
+ * Returns the file containing the records.
*
- * @return the name, which corresponds to the directory containing the log
+ * @return the file
*/
- public String getName()
+ File getFile()
{
- return logfile.getParent().toString();
- }
-
- /**
- * Empties the log, discarding all records it contains.
- * <p>
- * This method should not be called with open cursors or
- * when multiple instances of the log are opened.
- *
- * @throws ChangelogException
- * If a problem occurs.
- */
- public void clear() throws ChangelogException
- {
- checkLogIsEnabledForWrite();
-
- exclusiveLock.lock();
- try
- {
- if (isClosed)
- {
- return;
- }
- close();
- final boolean isDeleted = logfile.delete();
- if (!isDeleted)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath()));
- }
- initialize();
- }
- catch (Exception e)
- {
- throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e)));
- }
- finally
- {
- exclusiveLock.unlock();
- }
+ return logfile;
}
private void checkLogIsEnabledForWrite() throws ChangelogException
{
if (!isWriteEnabled)
{
- throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath()));
- }
- }
-
- private void createRootDirIfNotExists() throws ChangelogException
- {
- if (!rootPath.exists())
- {
- final boolean created = rootPath.mkdirs();
- if (!created)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath()));
- }
+ throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
}
}
@@ -282,25 +207,6 @@
}
/**
- * Add a record at the end of this log from the provided key and value.
- * <p>
- * In order to ensure that record is written out of buffers and persisted
- * to file system, it is necessary to explicitely call the
- * {@code syncToFileSystem()} method.
- *
- * @param key
- * The key of the record.
- * @param value
- * The value of the record.
- * @throws ChangelogException
- * If the record can't be added to the log.
- */
- public void addRecord(final K key, final V value) throws ChangelogException
- {
- addRecord(Record.from(key, value));
- }
-
- /**
* Add the provided record at the end of this log.
* <p>
* In order to ensure that record is written out of buffers and persisted
@@ -312,33 +218,22 @@
* @throws ChangelogException
* If the record can't be added to the log.
*/
- public void addRecord(final Record<K, V> record) throws ChangelogException
+ void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
-
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return;
- }
writer.write(encodeRecord(record));
- writer.flush();
}
catch (IOException e)
{
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e);
- }
- finally
- {
- sharedLock.unlock();
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
}
}
private ByteString encodeRecord(final Record<K, V> record)
{
- final ByteString data = parser.encodeRecord(record.getKey(), record.getValue());
+ final ByteString data = parser.encodeRecord(record);
return new ByteStringBuilder()
.append(data.length())
.append(data)
@@ -346,14 +241,14 @@
}
/**
- * Dump this log as text file, intended for debugging purpose only.
+ * Dump this log file as a text file, intended for debugging purpose only.
*
* @param dumpFile
* File that will contains log records using a human-readable format
* @throws ChangelogException
* If an error occurs during dump
*/
- public void dumpAsTextFile(File dumpFile) throws ChangelogException
+ void dumpAsTextFile(File dumpFile) throws ChangelogException
{
DBCursor<Record<K, V>> cursor = getCursor();
BufferedWriter textWriter = null;
@@ -364,7 +259,7 @@
{
Record<K, V> record = cursor.getRecord();
textWriter.write("key=" + record.getKey());
- textWriter.write(" -- ");
+ textWriter.write(" | ");
textWriter.write("value=" + record.getValue());
textWriter.write('\n');
cursor.next();
@@ -374,7 +269,7 @@
{
// No I18N needed, used for debugging purpose only
throw new ChangelogException(
- Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e);
+ Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e);
}
finally
{
@@ -392,20 +287,16 @@
* @throws ChangelogException
* If the synchronization fails.
*/
- public void syncToFileSystem() throws ChangelogException
+ void syncToFileSystem() throws ChangelogException
{
- exclusiveLock.lock();
+ checkLogIsEnabledForWrite();
try
{
writer.sync();
}
catch (Exception e)
{
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e);
- }
- finally
- {
- exclusiveLock.unlock();
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
}
}
@@ -422,21 +313,9 @@
* @throws ChangelogException
* If the cursor can't be created.
*/
- public LogCursor<K, V> getCursor() throws ChangelogException
+ LogFileCursor<K, V> getCursor() throws ChangelogException
{
- sharedLock.lock();
- try
- {
- if (isClosed)
- {
- return new EmptyLogCursor<K, V>();
- }
- return new LogFileCursor<K, V>(this);
- }
- finally
- {
- sharedLock.unlock();
- }
+ return new LogFileCursor<K, V>(this);
}
/**
@@ -454,7 +333,7 @@
* @throws ChangelogException
* If the cursor can't be created.
*/
- public LogCursor<K, V> getCursor(final K key) throws ChangelogException
+ LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
{
return getCursor(key, false);
}
@@ -476,13 +355,13 @@
* @throws ChangelogException
* If the cursor can't be created.
*/
- public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+ LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
{
return getCursor(key, true);
}
/** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
- private LogCursor<K, V> getCursor(final K key, boolean findNearest)
+ private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
throws ChangelogException
{
if (key == null)
@@ -490,13 +369,8 @@
return getCursor();
}
LogFileCursor<K, V> cursor = null;
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return new EmptyLogCursor<K, V>();
- }
cursor = new LogFileCursor<K, V>(this);
cursor.positionTo(key, findNearest);
// if target is not found, cursor is positioned at end of stream
@@ -506,10 +380,22 @@
StaticUtils.close(cursor);
throw e;
}
- finally
- {
- sharedLock.unlock();
- }
+ }
+
+ /**
+ * Returns a cursor initialised to the provided record and position in file.
+ *
+ * @param record
+ * The initial record this cursor points on
+ * @param position
+ * The file position this cursor points on
+ * @return the cursor
+ * @throws ChangelogException
+ * If a problem occurs while creating the cursor.
+ */
+ LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
+ {
+ return new LogFileCursor<K, V>(this, record, position);
}
/**
@@ -520,7 +406,7 @@
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
- public Record<K, V> getOldestRecord() throws ChangelogException
+ Record<K, V> getOldestRecord() throws ChangelogException
{
DBCursor<Record<K, V>> cursor = null;
try
@@ -541,7 +427,7 @@
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
- public Record<K, V> getNewestRecord() throws ChangelogException
+ Record<K, V> getNewestRecord() throws ChangelogException
{
// TODO : need a more efficient way to retrieve it
DBCursor<Record<K, V>> cursor = null;
@@ -597,45 +483,58 @@
/** {@inheritDoc} */
public void close()
{
- exclusiveLock.lock();
- try
+ if (isWriteEnabled)
{
- if (isClosed)
+ try
{
- return;
+ syncToFileSystem();
}
+ catch (ChangelogException e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ writer.close();
+ }
+ readerPool.shutdown();
+ }
- if (isWriteEnabled)
- {
- try
- {
- syncToFileSystem();
- }
- catch (ChangelogException e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- writer.close();
- }
- readerPool.shutdown();
- isClosed = true;
- }
- finally
+ /**
+ * Delete this log file (file is physically removed). Should be called only
+ * when log file is closed.
+ *
+ * @throws ChangelogException
+ * If log file can't be deleted.
+ */
+ void delete() throws ChangelogException
+ {
+ final boolean isDeleted = logfile.delete();
+ if (!isDeleted)
{
- exclusiveLock.unlock();
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
}
}
+ /**
+ * Return the size of this log file in bytes.
+ *
+ * @return the size of log file
+ */
+ long getSizeInBytes()
+ {
+ return writer.getBytesWritten();
+ }
+
+ /** The path of this log file as a String. */
+ private String getPath()
+ {
+ return logfile.getPath();
+ }
+
/** Read a record from the provided reader. */
private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
{
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return null;
- }
final ByteString recordData = readEncodedRecord(reader);
return recordData != null ? parser.decodeRecord(recordData) : null;
}
@@ -643,10 +542,6 @@
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
}
- finally
- {
- sharedLock.unlock();
- }
}
private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
@@ -672,26 +567,17 @@
}
}
- /** Seek to provided position on the provided reader. */
+ /** Seek to given position on the provided reader. */
private void seek(RandomAccessFile reader, long position) throws ChangelogException
{
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return;
- }
reader.seek(position);
}
catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
}
- finally
- {
- sharedLock.unlock();
- }
}
/**
@@ -706,66 +592,42 @@
/** Release the provided reader. */
private void releaseReader(RandomAccessFile reader) {
- sharedLock.lock();
- try
- {
- if (isClosed)
- {
- return;
- }
- readerPool.release(reader);
- }
- finally
- {
- sharedLock.unlock();
- }
+ readerPool.release(reader);
}
- /**
- * A cursor on the log.
- */
- static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
{
- /**
- * Position the cursor to the record corresponding to the provided key or to
- * the nearest key (the lowest key higher than the provided key).
- * <p>
- * The record is only searched forward. To search backward, it is first
- * necessary to call the {@code rewind()} method to start from beginning of
- * log file.
- *
- * @param key
- * Key to use as a start position for the cursor. If key is
- * {@code null}, use the key of the first record instead.
- * @param findNearest
- * If {@code true}, start position is the lowest key that is higher
- * than the provided key, otherwise start position is the provided
- * key.
- * @return {@code true} if cursor is successfully positionned to the key or
- * the the nearest key, {@code false} otherwise.
- * @throws ChangelogException
- * If an error occurs when positioning cursor.
- */
- boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+ return logfile.hashCode();
+ }
- /**
- * Rewind the cursor, positioning it to the beginning of the log file,
- * pointing to no record initially.
- *
- * @throws ChangelogException
- * If an error occurs when rewinding to zero.
- */
- void rewind() throws ChangelogException;
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object that)
+ {
+ if (this == that)
+ {
+ return true;
+ }
+ if (!(that instanceof LogFile))
+ {
+ return false;
+ }
+ final LogFile<?, ?> other = (LogFile<?, ?>) that;
+ return logfile.equals(other.logfile);
}
/**
- * Implements a cursor on the log.
+ * Implements a repositionable cursor on the log file.
* <p>
* The cursor initially points to a record, that is {@code cursor.getRecord()}
* is equals to the first record available from the cursor before any call to
* {@code cursor.next()} method.
*/
- private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
+ static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
{
/** The underlying log on which entries are read. */
private final LogFile<K, V> logFile;
@@ -784,7 +646,7 @@
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
- LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
+ private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
{
this.logFile = logFile;
this.reader = logFile.getReader();
@@ -800,17 +662,19 @@
}
}
- /** {@inheritDoc} */
- public String toString()
+ /**
+ * Creates a cursor on the provided log, initialised to the provided record and
+ * pointing to the provided file position.
+ * <p>
+ * Note: there is no check to ensure that provided record and file position are
+ * consistent. It is the responsability of the caller of this method.
+ */
+ private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
{
- return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
- }
-
- /** {@inheritDoc} */
- @Override
- public Record<K,V> getRecord()
- {
- return currentRecord;
+ this.logFile = logFile;
+ this.reader = logFile.getReader();
+ this.currentRecord = record;
+ logFile.seek(reader, filePosition);
}
/** {@inheritDoc} */
@@ -823,6 +687,13 @@
/** {@inheritDoc} */
@Override
+ public Record<K,V> getRecord()
+ {
+ return currentRecord;
+ }
+
+ /** {@inheritDoc} */
+ @Override
public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
do
{
@@ -848,64 +719,35 @@
/** {@inheritDoc} */
@Override
- public void rewind() throws ChangelogException
- {
- logFile.seek(reader, 0);
- currentRecord = null;
- }
-
- /** {@inheritDoc} */
- @Override
public void close()
{
logFile.releaseReader(reader);
}
- }
- /** An empty cursor, that always return null records and false to {@code next()} method. */
- static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
- {
- /** {@inheritDoc} */
- @Override
- public Record<K,V> getRecord()
+ /**
+ * Returns the file position this cursor is pointing at.
+ *
+ * @return the position of reader on the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ long getFilePosition() throws ChangelogException
{
- return null;
+ try
+ {
+ return reader.getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
+ }
}
/** {@inheritDoc} */
- @Override
- public boolean next()
- {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
- {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public void rewind() throws ChangelogException
- {
- // nothing to do
- }
-
- /** {@inheritDoc} */
- @Override
- public void close()
- {
- // nothing to do
- }
-
- /** {@inheritDoc} */
- @Override
public String toString()
{
- return "EmptyLogCursor";
+ return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
}
-
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
index 037c048..44ce685 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -25,18 +25,14 @@
*/
package org.opends.server.replication.server.changelog.file;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.SyncFailedException;
-import java.util.HashMap;
-import java.util.Map;
+import org.opends.server.loggers.MeteredStream;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.util.StaticUtils;
@@ -45,80 +41,38 @@
/**
* A writer on a log file.
- * <p>
- * The writer is cached in order to have a single writer per file in the JVM.
*/
class LogWriter extends OutputStream
{
- /** The cache of log writers. There is a single writer per file in the JVM. */
- private static final Map<File, LogWriter> logWritersCache = new HashMap<File, LogWriter>();
-
- /** The exclusive lock used to acquire or close a log writer. */
- private static final Object lock = new Object();
-
/** The file to write in. */
private final File file;
- /** The stream to write data in the file. */
- private final BufferedOutputStream stream;
+ /** The stream to write data in the file, capable of counting bytes written. */
+ private final MeteredStream stream;
/** The file descriptor on the file. */
private final FileDescriptor fileDescriptor;
- /** The number of references on this writer. */
- private int referenceCount;
-
/**
* Creates a writer on the provided file.
*
* @param file
* The file to write.
- * @param stream
- * The stream to write in the file.
- * @param fileDescriptor
- * The descriptor on the file.
+ * @throws ChangelogException
+ * If a problem occurs at creation.
*/
- private LogWriter(final File file, BufferedOutputStream stream, FileDescriptor fileDescriptor)
- throws ChangelogException
+ public LogWriter(final File file) throws ChangelogException
{
this.file = file;
- this.stream = stream;
- this.fileDescriptor = fileDescriptor;
- this.referenceCount = 1;
- }
-
- /**
- * Returns a log writer on the provided file, creating it if necessary.
- *
- * @param file
- * The log file to write in.
- * @return the log writer
- * @throws ChangelogException
- * If a problem occurs.
- */
- public static LogWriter acquireWriter(File file) throws ChangelogException
- {
- synchronized (lock)
+ try
{
- LogWriter logWriter = logWritersCache.get(file);
- if (logWriter == null)
- {
- try
- {
- final FileOutputStream stream = new FileOutputStream(file, true);
- logWriter = new LogWriter(file, new BufferedOutputStream(stream), stream.getFD());
- }
- catch (Exception e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
- }
- logWritersCache.put(file, logWriter);
- }
- else
- {
- logWriter.incrementRefCounter();
- }
- return logWriter;
+ FileOutputStream fos = new FileOutputStream(file, true);
+ this.stream = new MeteredStream(fos, file.length());
+ this.fileDescriptor = fos.getFD();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
}
}
@@ -157,11 +111,14 @@
bs.copyTo(stream);
}
- /** {@inheritDoc} */
- @Override
- public void flush() throws IOException
+ /**
+ * Returns the number of bytes written in the underlying file.
+ *
+ * @return the number of bytes
+ */
+ public long getBytesWritten()
{
- stream.flush();
+ return stream.getBytesWritten();
}
/**
@@ -178,32 +135,7 @@
@Override
public void close()
{
- synchronized (lock)
- {
- LogWriter writer = logWritersCache.get(file);
- if (writer == null)
- {
- // writer is already closed
- return;
- }
- // counter == 0 should never happen
- if (referenceCount == 0 || referenceCount == 1)
- {
- StaticUtils.close(stream);
- logWritersCache.remove(file);
- referenceCount = 0;
- }
- else
- {
- referenceCount--;
- }
- }
+ StaticUtils.close(stream);
}
- private void incrementRefCounter()
- {
- referenceCount++;
- }
-
-
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java b/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
index 94e65de..7009636 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
@@ -25,6 +25,7 @@
*/
package org.opends.server.replication.server.changelog.file;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
/**
@@ -40,7 +41,6 @@
*/
interface RecordParser<K, V>
{
-
/**
* Decode a record from the provided byte array.
* <p>
@@ -57,16 +57,45 @@
Record<K, V> decodeRecord(ByteString data) throws DecodingException;
/**
- * Encode the provided key and value to a byte array.
+ * Encode the provided record to a byte array.
* <p>
* The returned array is intended to be stored as provided in the log file.
*
- * @param key
- * The key of the record.
- * @param value
- * The value of the record.
+ * @param record
+ * The record to encode.
* @return the bytes array representing the (key,value) record
*/
- ByteString encodeRecord(K key, V value);
+ ByteString encodeRecord(Record<K, V> record);
+
+ /**
+ * Read the key from the provided string.
+ *
+ * @param key
+ * The string representation of key, suitable for use in a filename,
+ * as written by the {@code encodeKeyToString()} method.
+ * @return the key
+ * @throws ChangelogException
+ * If key can't be read from the string.
+ */
+ K decodeKeyFromString(String key) throws ChangelogException;
+
+ /**
+ * Returns the provided key as a string that is suitable to be used in a
+ * filename.
+ *
+ * @param key
+ * The key of a record.
+ * @return a string encoding the key, unambiguously decodable to the original
+ * key, and suitable for use in a filename. The string should contain
+ * only ASCII characters and no space.
+ */
+ String encodeKeyToString(K key);
+
+ /**
+ * Returns a key that is guaranted to be always higher than any other key.
+ *
+ * @return the highest possible key
+ */
+ K getMaxKey();
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 78d4686..ee69dd7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -55,9 +55,7 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
-import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
import static org.opends.messages.ReplicationMessages.*;
/**
@@ -82,7 +80,9 @@
* id of the server. Each directory contains the log files for the given server
* id.</li>
* </ul>
- * All log files end with the ".log" suffix.
+ * All log files end with the ".log" suffix. Log files always include the "head.log"
+ * file and optionally zero to many read-only log files named after the lowest key
+ * and highest key present in the log file.
* <p>
* Layout example with two domains "o=test1" and "o=test2", each having server
* ids 22 and 33 :
@@ -91,25 +91,29 @@
* +---changelog
* | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"]
* | \---changenumberindex
- * | \--- current.log
+ * | \--- head.log [contains last records written]
+ * | \--- 1_50.log [contains records with keys in interval [1, 50]]
* | \---1.domain
* | \---generation1.id
* | \---22.server
- * | \---current.log
+ * | \---head.log
* | \---33.server
- * | \---current.log
+ * | \---head.log
* | \---2.domain
* | \---generation1.id
* | \---22.server
- * | \---current.log
+ * | \---head.log
* | \---33.server
- * | \---current.log
+ * | \---head.log
* </pre>
*/
class ReplicationEnvironment
{
private static final DebugTracer TRACER = getTracer();
+ // TODO : to replace by configurable value
+ private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024;
+
private static final int NO_GENERATION_ID = -1;
private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
@@ -159,7 +163,7 @@
private final String replicationRootPath;
/** The list of logs that are in use. */
- private final List<LogFile<?, ?>> logs = new CopyOnWriteArrayList<LogFile<?, ?>>();
+ private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
/** Maps each domain DN to a domain id that is used to name directory in file system. */
private final Map<DN, String> domains = new HashMap<DN, String>();
@@ -228,7 +232,7 @@
* @throws ChangelogException
* if an error occurs.
*/
- LogFile<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
+ Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
throws ChangelogException
{
if (debugEnabled())
@@ -276,7 +280,7 @@
* @throws ChangelogException
* when a problem occurs.
*/
- LogFile<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
+ Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
{
final File path = getCNIndexDBPath();
try
@@ -305,24 +309,6 @@
}
/**
- * Clears the content of replication database.
- *
- * @param log
- * The log to clear.
- */
- void clearDB(final LogFile<?, ?> log)
- {
- try
- {
- log.clear();
- }
- catch (ChangelogException e)
- {
- logError(ERR_ERROR_CLEARING_DB.get(log.getName(), stackTraceToSingleLineString(e)));
- }
- }
-
- /**
* Clears the generated id associated to the provided domain DN from the state
* Db.
* <p>
@@ -506,12 +492,12 @@
}
/** Open a log from the provided path and record parser. */
- private <K extends Comparable<K>, V> LogFile<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
+ private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
throws ChangelogException
{
checkShutDownBeforeOpening(serverIdPath);
- final LogFile<K, V> log = LogFile.newAppendableLogFile(serverIdPath, parser);
+ final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
checkShutDownAfterOpening(serverIdPath, log);
@@ -519,11 +505,11 @@
return log;
}
- private void checkShutDownAfterOpening(final File serverIdPath, final LogFile<?, ?> log) throws ChangelogException
+ private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException
{
if (isShuttingDown.get())
{
- closeDB(log);
+ closeLog(log);
throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
replicationServer.getServerId()));
}
@@ -590,7 +576,7 @@
return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
}
- private void closeDB(final LogFile<?, ?> log)
+ private void closeLog(final Log<?, ?> log)
{
logs.remove(log);
log.close();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index bc3aee3..54ea0c8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1101,6 +1101,13 @@
return new File(testResourceDir, filename);
}
+ public static File getUnitTestRootPath()
+ {
+ final String buildRoot = System.getProperty(PROPERTY_BUILD_ROOT);
+ final String path = System.getProperty(PROPERTY_BUILD_DIR, buildRoot + File.separator + "build");
+ return new File(path, "unit-tests");
+ }
+
/**
* Prevent instantiation.
*/
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
index 5fc8214..c56c37f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication;
@@ -79,6 +79,7 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: ChangeNumberControlDbTest\n"
+ + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 103\n";
// suffix synchronized
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
index bb657c8..96043fe 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2007-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication;
@@ -100,6 +100,7 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port:" + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: ReSyncTest\n"
+ + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 104\n";
// suffix synchronized
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
index 16926d6..30d4c87 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -81,6 +81,7 @@
+ "cn: replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: HistoricalTest\n"
+ + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 102\n";
// The suffix to be synchronized.
@@ -489,7 +490,7 @@
addEntriesWithHistorical(1, entryCnt);
// leave a little delay between adding/modifying test entries
- // and configuring the purge delay.
+ // and configuring the purge delay.
Thread.sleep(10);
// set the purge delay to 1 minute
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
index 2873fd4..fe5428e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -80,7 +80,7 @@
{
RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
- ByteString data = parser.encodeRecord(msg.getChangeNumber(), msg);
+ ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg));
Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
assertThat(record).isNotNull();
@@ -169,7 +169,10 @@
* in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
* </ol>
*/
- // TODO :: enable when purge is implemented with multi-files log
+ // TODO : this works only if we ensure that there is a rotation of ahead log file
+ // at the right place. First two records are 37 and 76 bytes long,
+ // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
+ // Re-enable this test when max file size is customizable for log
@Test(enabled=false)
public void testPurge() throws Exception
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index 9f7eca7..e70966b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -95,7 +95,7 @@
{
RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
- ByteString data = parser.encodeRecord(msg.getCSN(), msg);
+ ByteString data = parser.encodeRecord(Record.from(msg.getCSN(), msg));
Record<CSN, UpdateMsg> record = parser.decodeRecord(data);
assertThat(record).isNotNull();
@@ -264,7 +264,10 @@
}
}
- // TODO : enable when purge is enabled with multi-files log implementation
+ // TODO : this works only if we ensure that there is a rotation of ahead log file
+ // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have
+ // the last record alone in the ahead log file
+ // Re-enable this test when max file size is customizable for log
@Test(enabled=false)
public void testPurge() throws Exception
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
index 0a0c37c..524d95a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -35,7 +35,8 @@
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -45,11 +46,11 @@
@Test(sequential=true)
public class LogFileTest extends DirectoryServerTestCase
{
- private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
+ private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog";
- private static final StringRecordParser RECORD_PARSER = new StringRecordParser();
+ static final StringRecordParser RECORD_PARSER = new StringRecordParser();
- private static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
+ static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
@Override
public Record<String, String> decodeRecord(ByteString data) throws DecodingException
{
@@ -57,11 +58,18 @@
}
};
+ @BeforeClass
+ public void createTestDirectory()
+ {
+ File logDir = new File(TEST_DIRECTORY_CHANGELOG);
+ logDir.mkdirs();
+ }
+
@BeforeMethod
/** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */
public void initialize() throws Exception
{
- File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, LogFile.LOG_FILE_NAME);
+ File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME);
if (theLogFile.exists())
{
theLogFile.delete();
@@ -70,12 +78,12 @@
for (int i = 1; i <= 10; i++)
{
- logFile.addRecord("key"+i, "value"+i);
+ logFile.append(Record.from("key"+i, "value"+i));
}
logFile.close();
}
- @AfterMethod
+ @AfterClass
public void cleanTestChangelogDirectory()
{
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
@@ -87,8 +95,7 @@
private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
{
- LogFile<String, String> logFile = LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG), parser);
- return logFile;
+ return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser);
}
@Test
@@ -266,7 +273,7 @@
for (int i = 1; i <= 100; i++)
{
Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
- writeLog.addRecord(record);
+ writeLog.append(record);
assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
@@ -279,35 +286,6 @@
}
}
- @Test()
- public void testTwoConcurrentWrite() throws Exception
- {
- final LogFile<String, String> writeLog1 = getLogFile(RECORD_PARSER);
- final LogFile<String, String> writeLog2 = getLogFile(RECORD_PARSER);
- try
- {
- writeLog1.addRecord(Record.from("startkey", "startvalue"));
- Thread write1 = getWriteLogThread(writeLog1, "a");
- Thread write2 = getWriteLogThread(writeLog2, "b");
- write1.run();
- write2.run();
-
- write1.join();
- write2.join();
- writeLog1.syncToFileSystem();
- DBCursor<Record<String, String>> cursor = writeLog1.getCursor("startkey");
- for (int i = 1; i <= 200; i++)
- {
- assertThat(cursor.next()).isTrue();
- }
- assertThat(cursor.getRecord()).isIn(Record.from("k-b100", "v-b100"), Record.from("k-a100", "v-a100"));
- }
- finally
- {
- StaticUtils.close(writeLog1, writeLog2);
- }
- }
-
/**
* Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
* endIndex, using (keyN, valueN) where N is the index.
@@ -320,30 +298,13 @@
assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
}
- assertThat(cursor.next()).isFalse();
- assertThat(cursor.getRecord()).isNull();
+ assertThatCursorIsExhausted(cursor);
}
- /** Returns a thread that write 100 records to the provided log. */
- private Thread getWriteLogThread(final LogFile<String, String> writeLog, final String recordPrefix)
+ private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
{
- return new Thread() {
- public void run()
- {
- for (int i = 1; i <= 100; i++)
- {
- Record<String, String> record = Record.from("k-" + recordPrefix + i, "v-" + recordPrefix + i);
- try
- {
- writeLog.addRecord(record);
- }
- catch (ChangelogException e)
- {
- e.printStackTrace();
- }
- }
- }
- };
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
}
/**
@@ -374,11 +335,31 @@
return length;
}
- public ByteString encodeRecord(String key, String value)
+ public ByteString encodeRecord(Record<String, String> record)
{
return new ByteStringBuilder()
- .append(key).append(STRING_SEPARATOR)
- .append(value).append(STRING_SEPARATOR).toByteString();
+ .append(record.getKey()).append(STRING_SEPARATOR)
+ .append(record.getValue()).append(STRING_SEPARATOR).toByteString();
+ }
+
+ @Override
+ public String decodeKeyFromString(String key) throws ChangelogException
+ {
+ return key;
+ }
+
+ @Override
+ public String encodeKeyToString(String key)
+ {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getMaxKey()
+ {
+ // '~' character has the highest ASCII value
+ return "~~~~";
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
new file mode 100644
index 0000000..321d256
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -0,0 +1,571 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogTest extends DirectoryServerTestCase
+{
+ // Use a directory dedicated to this test class
+ private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+ @BeforeMethod
+ public void initialize() throws Exception
+ {
+ // Delete any previous log
+ if (LOG_DIRECTORY.exists())
+ {
+ StaticUtils.recursiveDelete(LOG_DIRECTORY);
+ }
+
+ // Build a log with 10 records with String keys and String values
+ // Keys are using the format keyNNN where N is a figure
+ // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly
+ Log<String, String> log = openLog(RECORD_PARSER);
+ for (int i = 1; i <= 10; i++)
+ {
+ log.append(Record.from(String.format("key%03d", i), "value" + i));
+ }
+ log.close();
+ }
+
+ private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException
+ {
+ // Each string record has a length of approximately 18 bytes
+ // This size is set in order to have 2 records per log file before the rotation happens
+ // This allow to ensure rotation mechanism is thoroughly tested
+ // Some tests rely on having 2 records per log file (especially the purge tests), so take care
+ // if this value has to be changed
+ int sizeLimitPerFileInBytes = 30;
+
+ return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
+ }
+
+ @Test
+ public void testCursor() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = log.getCursor();
+
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testCursorWhenGivenAnExistingKey() throws Exception
+ {
+ Log<String, String> log = openLog(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = log.getCursor("key005");
+
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key005", "value5"));
+ assertThatCursorCanBeFullyRead(cursor, 6, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testCursorWhenGivenAnUnexistingKey() throws Exception
+ {
+ Log<String, String> log = openLog(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ // key is between key005 and key006
+ cursor = log.getCursor("key005000");
+
+ assertThat(cursor).isNotNull();
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isFalse();
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testCursorWhenGivenANullKey() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = log.getCursor(null);
+
+ // should start from first record
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenAnExistingKey() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null;
+ try {
+ // this key is the first key of the log file "key1_key2.log"
+ cursor1 = log.getNearestCursor("key001");
+ // lowest higher key is key2
+ assertThat(cursor1.getRecord()).isEqualTo(Record.from("key002", "value2"));
+ assertThatCursorCanBeFullyRead(cursor1, 3, 10);
+
+ // this key is the last key of the log file "key3_key4.log"
+ cursor2 = log.getNearestCursor("key004");
+ // lowest higher key is key5
+ assertThat(cursor2.getRecord()).isEqualTo(Record.from("key005", "value5"));
+ assertThatCursorCanBeFullyRead(cursor2, 6, 10);
+
+ cursor3 = log.getNearestCursor("key009");
+ // lowest higher key is key10
+ assertThat(cursor3.getRecord()).isEqualTo(Record.from("key010", "value10"));
+ assertThatCursorIsExhausted(cursor3);
+ }
+ finally {
+ StaticUtils.close(cursor1, cursor2, cursor3, log);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenAnExistingKey_KeyIsTheLastOne() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = log.getNearestCursor("key010");
+
+ // lowest higher key does not exist
+ assertThatCursorIsExhausted(cursor);
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ // key is between key005 and key006
+ cursor = log.getNearestCursor("key005000");
+
+ // lowest higher key is key006
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key006", "value6"));
+ assertThatCursorCanBeFullyRead(cursor, 7, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenANullKey() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = log.getNearestCursor(null);
+
+ // should start from start
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test(expectedExceptions=ChangelogException.class)
+ public void testCursorWhenParserFailsToRead() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ);
+ try {
+ log.getCursor("key");
+ }
+ finally {
+ StaticUtils.close(log);
+ }
+ }
+
+ @Test
+ public void testGetOldestRecord() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ try
+ {
+ Record<String, String> record = log.getOldestRecord();
+
+ assertThat(record).isEqualTo(Record.from("key001", "value1"));
+ }
+ finally {
+ StaticUtils.close(log);
+ }
+ }
+
+ @Test
+ public void testGetNewestRecord() throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ try
+ {
+ Record<String, String> record = log.getNewestRecord();
+
+ assertThat(record).isEqualTo(Record.from("key010", "value10"));
+ }
+ finally {
+ StaticUtils.close(log);
+ }
+ }
+
+ /**
+ * Test that changes are visible immediately to a reader after a write.
+ */
+ @Test
+ public void testWriteAndReadOnSameLog() throws Exception
+ {
+ Log<String, String> writeLog = null;
+ Log<String, String> readLog = null;
+ try
+ {
+ writeLog = openLog(LogFileTest.RECORD_PARSER);
+ readLog = openLog(LogFileTest.RECORD_PARSER);
+
+ for (int i = 1; i <= 10; i++)
+ {
+ Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
+ writeLog.append(record);
+ assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+ assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1"));
+ assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+ assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
+ }
+ }
+ finally
+ {
+ StaticUtils.close(writeLog, readLog);
+ }
+ }
+
+ @Test
+ public void testTwoConcurrentWrite() throws Exception
+ {
+ Log<String, String> writeLog1 = null;
+ Log<String, String> writeLog2 = null;
+ DBCursor<Record<String, String>> cursor = null;
+ try
+ {
+ writeLog1 = openLog(LogFileTest.RECORD_PARSER);
+ writeLog2 = openLog(LogFileTest.RECORD_PARSER);
+ writeLog1.append(Record.from("key020", "starting record"));
+ AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
+ Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
+ Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
+ write1.run();
+ write2.run();
+
+ write1.join();
+ write2.join();
+ if (exceptionRef.get() != null)
+ {
+ throw exceptionRef.get();
+ }
+ writeLog1.syncToFileSystem();
+ cursor = writeLog1.getCursor("key020");
+ for (int i = 1; i <= 60; i++)
+ {
+ assertThat(cursor.next()).isTrue();
+ }
+ assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
+ }
+ finally
+ {
+ StaticUtils.close(cursor, writeLog1, writeLog2);
+ }
+ }
+
+ /**
+ * This test should be disabled.
+ * Enable it locally when you need to have an rough idea of write performance.
+ */
+ @Test(enabled=false)
+ public void logWriteSpeed() throws Exception
+ {
+ Log<String, String> writeLog = null;
+ try
+ {
+ long sizeOf1MB = 1024*1024;
+ writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
+
+ for (int i = 1; i < 1000000; i++)
+ {
+ writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
+ }
+ }
+ finally
+ {
+ StaticUtils.close(writeLog);
+ }
+ }
+
+ @Test
+ public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
+ {
+ DBCursor<Record<String, String>> cursor = null;
+ Log<String, String> log = null;
+ try
+ {
+ log = openLog(LogFileTest.RECORD_PARSER);
+ cursor = log.getCursor();
+ // advance cursor to last record to ensure it is pointing to ahead log file
+ advanceCursorFromFirstRecordTo(cursor, 10);
+
+ // add new records to ensure the ahead log file is rotated
+ for (int i = 11; i <= 20; i++)
+ {
+ log.append(Record.from(String.format("key%03d", i), "value" + i));
+ }
+
+ // check that cursor can fully read the new records
+ assertThatCursorCanBeFullyRead(cursor, 11, 20);
+ }
+ finally
+ {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @Test
+ public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
+ {
+ DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
+ Log<String, String> log = null;
+ try
+ {
+ log = openLog(LogFileTest.RECORD_PARSER);
+ cursor1 = log.getCursor();
+ advanceCursorFromFirstRecordTo(cursor1, 1);
+ cursor2 = log.getCursor();
+ advanceCursorFromFirstRecordTo(cursor2, 4);
+ cursor3 = log.getCursor();
+ advanceCursorFromFirstRecordTo(cursor3, 9);
+ cursor4 = log.getCursor();
+ advanceCursorFromFirstRecordTo(cursor4, 10);
+
+ // add new records to ensure the ahead log file is rotated
+ for (int i = 11; i <= 20; i++)
+ {
+ log.append(Record.from(String.format("key%03d", i), "value" + i));
+ }
+
+ // check that cursors can fully read the new records
+ assertThatCursorCanBeFullyRead(cursor1, 2, 20);
+ assertThatCursorCanBeFullyRead(cursor2, 5, 20);
+ assertThatCursorCanBeFullyRead(cursor3, 10, 20);
+ assertThatCursorCanBeFullyRead(cursor4, 11, 20);
+ }
+ finally
+ {
+ StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
+ }
+ }
+
+ @Test
+ public void testClear() throws Exception
+ {
+ DBCursor<Record<String, String>> cursor = null;
+ Log<String, String> log = null;
+ try
+ {
+ log = openLog(LogFileTest.RECORD_PARSER);
+ log.clear();
+
+ cursor = log.getCursor();
+ assertThatCursorIsExhausted(cursor);
+ }
+ finally
+ {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved
+ @Test(enabled=false, expectedExceptions=ChangelogException.class)
+ public void testClearWhenCursorIsOpened() throws Exception
+ {
+ DBCursor<Record<String, String>> cursor = null;
+ Log<String, String> log = null;
+ try
+ {
+ log = openLog(LogFileTest.RECORD_PARSER);
+ cursor = log.getCursor();
+ log.clear();
+ }
+ finally
+ {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ @DataProvider(name = "purgeKeys")
+ Object[][] purgeKeys()
+ {
+ // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor
+ return new Object[][]
+ {
+ // lowest key of the read-only log file "key005_key006.log"
+ { "key005", Record.from("key005", "value5"), 6, 10},
+ // key that is not the lowest of the read-only log file "key005_key006.log"
+ { "key006", Record.from("key005", "value5"), 6, 10},
+ // lowest key of the ahead log file "ahead.log"
+ { "key009", Record.from("key009", "value9"), 10, 10},
+ // key that is not the lowest of the ahead log file "ahead.log"
+ { "key010", Record.from("key009", "value9"), 10, 10},
+
+ // key not present in log, which is between key005 and key006
+ { "key005a", Record.from("key005", "value5"), 6, 10},
+ // key not present in log, which is between key006 and key007
+ { "key006a", Record.from("key007", "value7"), 8, 10},
+ // key not present in log, which is lower than oldest key key001
+ { "key000", Record.from("key001", "value1"), 2, 10},
+ // key not present in log, which is higher than newest key key010
+ // should return the lowest key present in ahead log
+ { "key011", Record.from("key009", "value9"), 10, 10},
+ };
+ }
+
+ /**
+ * Given a purge key, after purge is done, expects a new cursor to point on first record provided and
+ * then to be fully read starting at provided start index and finishing at provided end index.
+ */
+ @Test(dataProvider="purgeKeys")
+ public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+ int cursorStartIndex, int cursorEndIndex) throws Exception
+ {
+ Log<String, String> log = null;
+ DBCursor<Record<String, String>> cursor = null;
+ try
+ {
+ log = openLog(LogFileTest.RECORD_PARSER);
+
+ log.purgeUpTo(purgeKey);
+
+ cursor = log.getCursor();
+ assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+ assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+ }
+ finally
+ {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ private void advanceCursorFromFirstRecordTo(DBCursor<Record<String, String>> cursor, int endIndex)
+ throws Exception
+ {
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+ advanceCursorUpTo(cursor, 2, endIndex);
+ }
+
+ private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+ throws Exception
+ {
+ for (int i = fromIndex; i <= endIndex; i++)
+ {
+ assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
+ assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i));
+ }
+ }
+
+ /**
+ * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
+ * endIndex, using (keyN, valueN) where N is the index.
+ */
+ private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+ throws Exception
+ {
+ advanceCursorUpTo(cursor, fromIndex, endIndex);
+ assertThatCursorIsExhausted(cursor);
+ }
+
+ private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
+ {
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ /** Returns a thread that write N records to the provided log. */
+ private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
+ final AtomicReference<ChangelogException> exceptionRef)
+ {
+ return new Thread() {
+ public void run()
+ {
+ for (int i = 1; i <= 30; i++)
+ {
+ Record<String, String> record = Record.from(
+ String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
+ try
+ {
+ writeLog.append(record);
+ }
+ catch (ChangelogException e)
+ {
+ // keep the first exception only
+ exceptionRef.compareAndSet(null, e);
+ }
+ }
+ }
+ };
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index 06ab151..8f3e173 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -80,14 +80,15 @@
final DN domainDN = DN.decode(DN1_AS_STRING);
ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
- LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
- LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
- LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
+ Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+ Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
+ Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
StaticUtils.close(cnDB, replicaDB, replicaDB2);
ChangelogState state = environment.readChangelogState();
- assertThat(state.getDomainToServerIds()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(1, 2)));
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+ assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2);
assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
}
@@ -98,8 +99,8 @@
List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
- LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
- List<LogFile<CSN,UpdateMsg>> replicaDBs = new ArrayList<LogFile<CSN,UpdateMsg>>();
+ Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+ List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
for (int i = 0; i <= 2 ; i++)
{
for (int j = 1; j <= 10; j++)
@@ -129,8 +130,8 @@
File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
DN domainDN = DN.decode(DN1_AS_STRING);
ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
- LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
- LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
+ Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
+ Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
StaticUtils.close(replicaDB, replicaDB2);
// delete the domain directory created for the 2 replica DBs to break the
--
Gitblit v1.10.0