From 9d1bd29ee527b598f0e91a9d02920eaacb3f767d Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 07 May 2014 09:27:48 +0000
Subject: [PATCH] OPENDJ-1388 – Implement simple changelog db based on single log file
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java | 142 +
opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java | 72
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 4
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java | 77
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java | 10
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 383 +++
opends/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java | 72
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 405 ++++
opends/src/messages/messages/replication.properties | 52
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 9
opends/resource/schema/02-config.ldif | 9
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java | 114 +
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 48
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java | 344 +++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 575 +++++
opends/src/admin/messages/ReplicationServerCfgDefn.properties | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/DirectoryServerTestCase.java | 7
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 690 ++++++
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java | 209 ++
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 2
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml | 32
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java | 6
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 911 +++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java | 385 +++
opends/src/server/org/opends/server/replication/server/changelog/file/Record.java | 130 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java | 5
opends/src/server/org/opends/server/backends/jeb/BackupManager.java | 10
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 10
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/GroupIdHandshakeTest.java | 15
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 809 ++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/plugins/ReferentialIntegrityPluginTestCase.java | 13
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 18
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 19
opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java | 107 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java | 6
opends/src/server/org/opends/server/replication/server/changelog/file/package-info.java | 33
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java | 34
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 21
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 28
opends/build.xml | 22
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java | 35
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 21
52 files changed, 5,821 insertions(+), 119 deletions(-)
diff --git a/opends/build.xml b/opends/build.xml
index 39f48d2..9c282cf 100644
--- a/opends/build.xml
+++ b/opends/build.xml
@@ -22,7 +22,7 @@
!
!
! Copyright 2006-2010 Sun Microsystems, Inc.
- ! Portions Copyright 2011-2013 ForgeRock AS
+ ! Portions Copyright 2011-2014 ForgeRock AS
! Portions Copyright 2012 Delta Victor Consultants
! -->
@@ -2158,6 +2158,13 @@
<echo message=" Default debug target:"/>
<echo message=" org.opends.server:level=warning,category=caught|data|database-access|message|protocol,stack,cause" />
<echo message=""/>
+ <echo message=" -Dorg.opends.test.replicationDbImpl=LOG"/>
+ <echo message=" indicates which implementation to use for replication DB."/>
+ <echo message=" Value must be one of: JE, LOG." />
+ <echo message=" JE: use berkeley DB JE as implementation." />
+ <echo message=" LOG: use log file as implementation." />
+ <echo message=" Default value is LOG." />
+ <echo message=""/>
<echo message=" -Dtest.diff.srcpath=src/server/org/opends/server/core"/>
<echo message=" for example includes only the classes in"/>
<echo message=" src/server/org/opends/server/core in the coveragediff report."/>
@@ -2235,7 +2242,15 @@
</not>
</condition>
- <!-- This sets org.opends.test.suppressOutput if and only if it's not
+ <!-- This sets org.opends.test.replicationDbImpl if and only if it's not
+ already set. -->
+ <condition property="org.opends.test.replicationDbImpl" value="LOG">
+ <not>
+ <isset property="org.opends.test.replicationDbImpl" />
+ </not>
+ </condition>
+
+ <!-- This sets org.opends.test.suppressOutput if and only if it's not
already set. -->
<condition property="org.opends.test.suppressOutput" value="true">
<not>
@@ -2243,7 +2258,7 @@
</not>
</condition>
- <!-- This sets org.opends.test.pauseOnFailure if and only if it's not
+ <!-- This sets org.opends.test.pauseOnFailure if and only if it's not
already set. -->
<condition property="org.opends.test.pauseOnFailure" value="false">
<not>
@@ -2352,6 +2367,7 @@
<jvmarg value="-Dorg.opends.server.BuildDir=${build.dir}" />
<jvmarg value="-Dorg.opends.server.RunningUnitTests=true" />
<jvmarg value="-Dorg.opends.server.snmp.opendmk=${opendmk.lib.dir}"/>
+ <jvmarg value="-Dorg.opends.test.replicationDbImpl=${org.opends.test.replicationDbImpl}" />
<jvmarg value="-Dorg.opends.test.suppressOutput=${org.opends.test.suppressOutput}" />
<jvmarg value="-Dorg.opends.test.pauseOnFailure=${org.opends.test.pauseOnFailure}" />
<jvmarg value="-Dorg.opends.server.debug.target=${org.opends.server.debug.target}" />
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index d25b449..5087ff3 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -21,7 +21,7 @@
#
#
# Copyright 2006-2010 Sun Microsystems, Inc.
-# Portions Copyright 2010-2013 ForgeRock AS.
+# Portions Copyright 2010-2014 ForgeRock AS.
# Portions Copyright 2011 profiq, s.r.o.
# Portions Copyright 2012 Manuel Gaupp
#
@@ -3759,6 +3759,12 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
SINGLE-VALUE
X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.142
+ NAME 'ds-cfg-replication-db-implementation'
+ EQUALITY caseExactMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDJ Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler'
SUP top
@@ -4507,6 +4513,7 @@
ds-cfg-window-size $
ds-cfg-queue-size $
ds-cfg-replication-db-directory $
+ ds-cfg-replication-db-implementation $
ds-cfg-replication-purge-delay $
ds-cfg-group-id $
ds-cfg-assured-timeout $
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
index c82a59d..f9f5904 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -23,7 +23,7 @@
!
!
! Copyright 2007-2010 Sun Microsystems, Inc.
- ! Portions copyright 2011-2013 ForgeRock AS
+ ! Portions copyright 2011-2014 ForgeRock AS
! -->
<adm:managed-object name="replication-server"
plural-name="replication-servers"
@@ -159,6 +159,32 @@
</ldap:attribute>
</adm:profile>
</adm:property>
+ <adm:property name="replication-db-implementation" mandatory="true" read-only="true">
+ <adm:synopsis>
+ The <adm:user-friendly-name /> database implementation
+ that stores all persistent information.
+ </adm:synopsis>
+ <adm:default-behavior>
+ <adm:defined>
+ <adm:value>je</adm:value>
+ </adm:defined>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:enumeration>
+ <adm:value name="je">
+ <adm:synopsis>Implementation based on Berkeley DB JE database.</adm:synopsis>
+ </adm:value>
+ <adm:value name="log">
+ <adm:synopsis>Implementation based on log file.</adm:synopsis>
+ </adm:value>
+ </adm:enumeration>
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-replication-db-implementation</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
<adm:property name="replication-purge-delay">
<adm:synopsis>
The time (in seconds) after which the
@@ -311,7 +337,7 @@
and directory servers). Larger values increase the length of time it
takes for a directory server to detect and switch to a more suitable
replication server, whereas smaller values increase the amount of
- background network traffic.
+ background network traffic.
</adm:description>
<adm:default-behavior>
<adm:defined>
@@ -334,7 +360,7 @@
<adm:description>
This boolean tells the replication server to compute change numbers for
each replicated change by maintaining a change number index database.
- Changenumbers are computed according to
+ Changenumbers are computed according to
http://tools.ietf.org/html/draft-good-ldap-changelog-04.
Note this functionality has an impact on CPU, disk accesses and storage.
If changenumbers are not required, it is advisable to set this value to
diff --git a/opends/src/admin/messages/ReplicationServerCfgDefn.properties b/opends/src/admin/messages/ReplicationServerCfgDefn.properties
index 7dc1423..b9111db 100644
--- a/opends/src/admin/messages/ReplicationServerCfgDefn.properties
+++ b/opends/src/admin/messages/ReplicationServerCfgDefn.properties
@@ -13,6 +13,9 @@
property.monitoring-period.description=Defines the duration that the replication server will wait before sending new monitoring messages to its peers (replication servers and directory servers). Larger values increase the length of time it takes for a directory server to detect and switch to a more suitable replication server, whereas smaller values increase the amount of background network traffic.
property.queue-size.synopsis=Specifies the number of changes that are kept in memory for each directory server in the Replication Domain.
property.replication-db-directory.synopsis=The path where the Replication Server stores all persistent information.
+property.replication-db-implementation.synopsis=The Replication Server database implementation that stores all persistent information.
+property.replication-db-implementation.syntax.enumeration.value.je.synopsis=Implementation based on Berkeley DB JE database.
+property.replication-db-implementation.syntax.enumeration.value.log.synopsis=Implementation based on log file.
property.replication-port.synopsis=The port on which this Replication Server waits for connections from other Replication Servers or Directory Servers.
property.replication-purge-delay.synopsis=The time (in seconds) after which the Replication Server erases all persistent information.
property.replication-server.synopsis=Specifies the addresses of other Replication Servers to which this Replication Server tries to connect at startup time.
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index fa4fec7..a1ae60f 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -531,3 +531,55 @@
change %s to replicaDB %s %s because: %s
SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
change %s to replicaDB %s %s because flushing thread is shutting down
+SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \
+ state from root path '%s' : directory might not exist
+SEVERE_ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when retrieving \
+ changelog state from root path '%s' : no generation id file found in domain \
+ directory '%s'
+SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
+ changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \
+ list of server ids
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_REPLICA_DB_244=Could not get or create replica DB \
+ for baseDN '%s', serverId '%d', generationId '%d'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_CN_INDEX_DB_245= Could not get or create change \
+ number index DB in root path '%s', using path '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE_246=Could not retrieve \
+ generation id file '%s' for DN '%s' to delete it
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY_247=Could not create \
+ directory '%s' for server id %d
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE_248=Could not create \
+ generation id file '%s'
+SEVERE_ERR_CHANGELOG_DOMAIN_FILENAME_WRONG_FORMAT_249=Could not read domain \
+ filename because it uses a wrong format, expecting '[dn].domain' where [dn] is \
+ a DN but got '%s'
+SEVERE_ERR_CHANGELOG_SERVER_ID_FILENAME_WRONG_FORMAT_250=Could not read server id \
+ filename because it uses a wrong format, expecting '[id].server' where [id] is \
+ numeric but got '%s'
+SEVERE_ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT_251=Could not read generation id \
+ because it uses a wrong format, expecting a number but got '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE_252=Could not open log file '%s' for write
+SEVERE_ERR_CHANGELOG_UNABLE_TO_OPEN_READER_ON_LOG_FILE_253=Could not open a reader \
+ on log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD_254=Could not decode a record from data \
+ read in log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE_256=Could not create log file '%s'
+SEVERE_WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE_257=The changelog '%s' has been opened in \
+ read-only mode, it is not enabled for write
+SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_RECORD_258=Could not add record '%s' in log \
+ file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_SYNC_259=Could not synchronize written records \
+ to file system for log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
+ on log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
+ log file
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
+ from domain state file '%s', from line '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
+ file '%s'
+SEVERE_ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE_264=There is a mismatch between domain state \
+ file and actual domain directories found in file system. Expected domain ids : '%s'. \
+ Actual domain ids found in file system: '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE_265=Could not create a new domain \
+ id %s for domain DN %s and save it in domain state file '%s"
diff --git a/opends/src/server/org/opends/server/backends/jeb/BackupManager.java b/opends/src/server/org/opends/server/backends/jeb/BackupManager.java
index 8595120..f07e00c 100644
--- a/opends/src/server/org/opends/server/backends/jeb/BackupManager.java
+++ b/opends/src/server/org/opends/server/backends/jeb/BackupManager.java
@@ -22,14 +22,14 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.backends.jeb;
import org.opends.messages.Message;
-
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.util.DynamicConstants;
+import org.opends.server.util.StaticUtils;
import org.opends.server.types.CryptoManagerException;
import javax.crypto.Mac;
@@ -53,9 +53,12 @@
import java.util.zip.ZipOutputStream;
import org.opends.server.types.*;
+
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.debug.DebugTracer;
+
import static org.opends.messages.JebMessages.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
@@ -716,8 +719,7 @@
// Delete the current backend directory and rename the restore directory.
if (!verifyOnly)
{
- cleanup(backendDir);
- backendDir.delete();
+ StaticUtils.recursiveDelete(backendDir);
if (!restoreDir.renameTo(backendDir))
{
Message msg = ERR_JEB_CANNOT_RENAME_RESTORE_DIRECTORY.get(
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 176922c..f1af61f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -36,6 +36,7 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
@@ -44,11 +45,13 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
import org.opends.server.core.networkgroups.NetworkGroup;
+import org.opends.server.loggers.debug.DebugLogger;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
@@ -127,7 +130,23 @@
throws ConfigException
{
this.config = configuration;
- this.changelogDB = new JEChangelogDB(this, configuration);
+ ReplicationDBImplementation dbImpl = configuration.getReplicationDBImplementation();
+ if (dbImpl == ReplicationDBImplementation.JE)
+ {
+ if (DebugLogger.debugEnabled())
+ {
+ TRACER.debugMessage(DebugLogLevel.INFO, "Using JE as DB implementation for changelog DB");
+ }
+ this.changelogDB = new JEChangelogDB(this, configuration);
+ }
+ else
+ {
+ if (DebugLogger.debugEnabled())
+ {
+ TRACER.debugMessage(DebugLogLevel.INFO, "Using LOG FILE as DB implementation for changelog DB");
+ }
+ this.changelogDB = new FileChangelogDB(this, configuration);
+ }
replSessionSecurity = new ReplSessionSecurity();
initialize();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java b/opends/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
new file mode 100644
index 0000000..e5d313c
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
@@ -0,0 +1,72 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.opends.messages.Message;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * Exception thrown when a record can't be decoded properly.
+ */
+public class DecodingException extends ChangelogException
+{
+ private static final long serialVersionUID = 5629692522662643737L;
+
+ /**
+ * Creates a new decoding exception with the provided information.
+ *
+ * @param message
+ * The message that explains the problem that occurred.
+ */
+ public DecodingException(Message message)
+ {
+ super(message);
+ }
+
+ /**
+ * Creates a new decoding exception with the provided information.
+ *
+ * @param cause
+ * The underlying cause that triggered this exception.
+ */
+ public DecodingException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ /**
+ * Creates a new decoding exception with the provided information.
+ *
+ * @param message
+ * The message that explains the problem that occurred.
+ * @param cause
+ * The underlying cause that triggered this exception.
+ */
+ public DecodingException(Message message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
new file mode 100644
index 0000000..53ee20e
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -0,0 +1,383 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.types.*;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+/**
+ * Logfile-based implementation of a ChangeNumberIndexDB.
+ * <p>
+ * This class publishes some monitoring information below <code>
+ * cn=monitor</code>.
+ */
+class FileChangeNumberIndexDB implements ChangeNumberIndexDB
+{
+
+ private static final DebugTracer TRACER = getTracer();
+
+ private static final int NO_KEY = 0;
+
+ /** The parser of records stored in this ChangeNumberIndexDB. */
+ static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
+
+ /** The log in which records are persisted. */
+ private final LogFile<Long, ChangeNumberIndexRecord> logFile;
+
+ /**
+ * The newest changenumber stored in the DB. It is used to avoid purging the
+ * record with the newest changenumber. The newest record in the changenumber
+ * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
+ * then retrieved on server startup.
+ */
+ private volatile long newestChangeNumber = NO_KEY;
+
+ /**
+ * The last generated value for the change number. It is kept separate from
+ * the {@link #newestChangeNumber} because there is an opportunity for a race
+ * condition between:
+ * <ol>
+ * <li>this atomic long being incremented for a new record ('recordB')</li>
+ * <li>the current newest record ('recordA') being purged from the DB</li>
+ * <li>'recordB' failing to be inserted in the DB</li>
+ * </ol>
+ */
+ private final AtomicLong lastGeneratedChangeNumber;
+
+ private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ /**
+ * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
+ *
+ * @param replicationEnv the Database Env to use to create the ReplicationServer DB.
+ * server for this domain.
+ * @throws ChangelogException If a database problem happened
+ */
+ FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
+ {
+ logFile = replicationEnv.getOrCreateCNIndexDB();
+ final ChangeNumberIndexRecord newestRecord = readLastRecord();
+ newestChangeNumber = getChangeNumber(newestRecord);
+ // initialization of the lastGeneratedChangeNumber from the DB content
+ // if DB is empty => last record does not exist => default to 0
+ lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
+
+ // Monitoring registration
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ DirectoryServer.registerMonitorProvider(dbMonitor);
+ }
+
+ private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord();
+ return record == null ? null : record.getValue();
+ }
+
+ private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord();
+ return record == null ? null : record.getValue();
+ }
+
+ private long getChangeNumber(final ChangeNumberIndexRecord record) throws ChangelogException
+ {
+ if (record != null)
+ {
+ return record.getChangeNumber();
+ }
+ return NO_KEY;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long addRecord(final ChangeNumberIndexRecord record) throws ChangelogException
+ {
+ final long changeNumber = nextChangeNumber();
+ final ChangeNumberIndexRecord newRecord =
+ new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN());
+ logFile.addRecord(newRecord.getChangeNumber(), newRecord);
+ newestChangeNumber = changeNumber;
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In FileChangeNumberIndexDB.addRecord, added: " + newRecord);
+ }
+ return changeNumber;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException
+ {
+ return readFirstRecord();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException
+ {
+ return readLastRecord();
+ }
+
+ private long nextChangeNumber()
+ {
+ return lastGeneratedChangeNumber.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getLastGeneratedChangeNumber()
+ {
+ return lastGeneratedChangeNumber.get();
+ }
+
+ /**
+ * Get the number of changes.
+ *
+ * @return Returns the number of changes.
+ * @throws ChangelogException
+ * If a problem occurs.
+ */
+ long count() throws ChangelogException
+ {
+ return logFile.getNumberOfRecords();
+ }
+
+ /**
+ * Returns whether this database is empty.
+ *
+ * @return <code>true</code> if this database is empty, <code>false</code>
+ * otherwise
+ * @throws ChangelogException
+ * if a problem occurs.
+ */
+ boolean isEmpty() throws ChangelogException
+ {
+ return getNewestRecord() == null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
+ {
+ return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber));
+ }
+
+ /**
+ * Shutdown this DB.
+ */
+ void shutdown()
+ {
+ if (shutdown.compareAndSet(false, true))
+ {
+ logFile.close();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ }
+ }
+
+ /**
+ * Synchronously purges the change number index DB up to and excluding the
+ * provided timestamp.
+ *
+ * @param purgeCSN
+ * the timestamp up to which purging must happen
+ * @return the oldest non purged CSN.
+ * @throws ChangelogException
+ * if a database problem occurs.
+ */
+ CSN purgeUpTo(final CSN purgeCSN) throws ChangelogException
+ {
+ if (isEmpty() || purgeCSN == null)
+ {
+ return null;
+ }
+
+ // TODO : no purge implemented yet as implementation is based on a single-file log.
+ // The purge must be implemented once we handle a log with multiple files.
+ // The purge will only delete whole files.
+ return null;
+ }
+
+ /**
+ * Implements the Monitoring capabilities of the FileChangeNumberIndexDB.
+ */
+ private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public List<Attribute> getMonitorData()
+ {
+ final List<Attribute> attributes = new ArrayList<Attribute>();
+ attributes.add(createChangeNumberAttribute(true));
+ attributes.add(createChangeNumberAttribute(false));
+ long numberOfChanges = 0;
+ try
+ {
+ numberOfChanges = count();
+ }
+ catch (ChangelogException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ }
+ }
+ attributes.add(Attributes.create("count", Long.toString(numberOfChanges)));
+ return attributes;
+ }
+
+ private Attribute createChangeNumberAttribute(final boolean isFirst)
+ {
+ final String attributeName = isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
+ final String changeNumber = String.valueOf(getChangeNumber(isFirst));
+ return Attributes.create(attributeName, changeNumber);
+ }
+
+ private long getChangeNumber(final boolean isFirst)
+ {
+ try
+ {
+ final ChangeNumberIndexRecord record = isFirst ? readFirstRecord() : readLastRecord();
+ if (record != null)
+ {
+ return record.getChangeNumber();
+ }
+ }
+ catch (ChangelogException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ }
+ }
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getMonitorInstanceName()
+ {
+ return "ChangeNumber Index Database";
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException, InitializationException
+ {
+ // Nothing to do for now
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + ", newestChangeNumber=" + newestChangeNumber;
+ }
+
+ /**
+ * Clear the changes from this DB (from both memory cache and DB storage).
+ *
+ * @throws ChangelogException
+ * if a database problem occurs.
+ */
+ public void clear() throws ChangelogException
+ {
+ logFile.clear();
+ newestChangeNumber = NO_KEY;
+ }
+
+ /** Parser of records persisted in the FileChangeNumberIndex log. */
+ private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord>
+ {
+ private static final byte STRING_SEPARATOR = 0;
+
+ @Override
+ public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record)
+ {
+ return new ByteStringBuilder()
+ .append(changeNumber)
+ .append(record.getPreviousCookie())
+ .append(STRING_SEPARATOR)
+ .append(record.getBaseDN().toString())
+ .append(STRING_SEPARATOR)
+ .append(record.getCSN().toByteString()).toByteString();
+ }
+
+ @Override
+ public Record<Long, ChangeNumberIndexRecord> decodeRecord(final ByteString data) throws DecodingException
+ {
+ try
+ {
+ ByteSequenceReader reader = data.asReader();
+ Long changeNumber = reader.getLong();
+
+ String previousCookie = reader.getString(getNextStringLength(reader));
+ reader.skip(1);
+
+ String baseDN = reader.getString(getNextStringLength(reader));
+ reader.skip(1);
+ DN dn = DN.decode(baseDN);
+
+ ByteString csnBytes = reader.getByteString(reader.remaining());
+ CSN csn = CSN.valueOf(csnBytes);
+
+ return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, previousCookie, dn, csn));
+ }
+ catch (Exception e)
+ {
+ throw new DecodingException(e);
+ }
+ }
+
+ /** Returns the length of next string by looking for the zero byte used as separator. */
+ private int getNextStringLength(ByteSequenceReader reader)
+ {
+ int length = 0;
+ while (reader.peek(length) != STRING_SEPARATOR)
+ {
+ length++;
+ }
+ return length;
+ }
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
new file mode 100644
index 0000000..efe233b
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
@@ -0,0 +1,77 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * A cursor on ChangeNumberIndexDB.
+ * <p>
+ * The cursor initially points to a record, that is {@code cursor.getRecord()}
+ * is equals to the first record available from the cursor before any call to
+ * {@code cursor.next()} method.
+ */
+class FileChangeNumberIndexDBCursor implements DBCursor<ChangeNumberIndexRecord>
+{
+
+ /** The underlying cursor. */
+ private final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor;
+
+ /**
+ * Creates the cursor from provided cursor.
+ *
+ * @param cursor
+ * The underlying cursor to read log.
+ */
+ FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor) {
+ this.cursor = cursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexRecord getRecord()
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = cursor.getRecord();
+ return record != null ? record.getValue() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ return cursor.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
new file mode 100644
index 0000000..e2b4f08
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -0,0 +1,809 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.messages.MessageBuilder;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.config.ConfigException;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
+import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
+
+import com.forgerock.opendj.util.Pair;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
+/**
+ * Log file implementation of the ChangelogDB interface.
+ */
+public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
+{
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
+ * This map contains the List of updates received from each LDAP server.
+ * <p>
+ * When removing a domainMap, code:
+ * <ol>
+ * <li>first get the domainMap</li>
+ * <li>synchronized on the domainMap</li>
+ * <li>remove the domainMap</li>
+ * <li>then check it's not null</li>
+ * <li>then close all inside</li>
+ * </ol>
+ * When creating a FileReplicaDB, synchronize on the domainMap to avoid
+ * concurrent shutdown.
+ */
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>>
+ domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+ private ReplicationEnvironment replicationEnv;
+ private final ReplicationServerCfg config;
+ private final File dbDirectory;
+
+ /**
+ * The handler of the changelog database, the database stores the relation
+ * between a change number and the associated cookie.
+ * <p>
+ * @GuardedBy("cnIndexDBLock")
+ */
+ private FileChangeNumberIndexDB cnIndexDB;
+ private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>();
+
+ /** Used for protecting {@link ChangeNumberIndexDB} related state. */
+ private final Object cnIndexDBLock = new Object();
+
+ /**
+ * The purge delay (in milliseconds). Records in the changelog DB that are
+ * older than this delay might be removed.
+ */
+ private long purgeDelayInMillis;
+ private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
+ private volatile long latestPurgeDate;
+
+ /** The local replication server. */
+ private final ReplicationServer replicationServer;
+ private final AtomicBoolean shutdown = new AtomicBoolean();
+
+ static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
+ new FileReplicaDBCursor(new LogFile.EmptyLogCursor<CSN, UpdateMsg>(), null);
+
+ /**
+ * Creates a new changelog DB.
+ *
+ * @param replicationServer
+ * the local replication server.
+ * @param config
+ * the replication server configuration
+ * @throws ConfigException
+ * if a problem occurs opening the supplied directory
+ */
+ public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+ throws ConfigException
+ {
+ this.config = config;
+ this.replicationServer = replicationServer;
+ this.dbDirectory = makeDir(config.getReplicationDBDirectory());
+ }
+
+ private File makeDir(final String dbDirName) throws ConfigException
+ {
+ // Check that this path exists or create it.
+ final File dbDirectory = getFileForPath(dbDirName);
+ try
+ {
+ if (!dbDirectory.exists())
+ {
+ dbDirectory.mkdir();
+ }
+ return dbDirectory;
+ }
+ catch (Exception e)
+ {
+ final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
+ .append(String.valueOf(dbDirectory));
+ throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
+ }
+ }
+
+ private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
+ {
+ final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
+ {
+ return domainMap;
+ }
+ return Collections.emptyMap();
+ }
+
+ private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
+ {
+ return getDomainMap(baseDN).get(serverId);
+ }
+
+ /**
+ * Returns a {@link FileReplicaDB}, possibly creating it.
+ *
+ * @param baseDN
+ * the baseDN for which to create a ReplicaDB
+ * @param serverId
+ * the serverId for which to create a ReplicaDB
+ * @param server
+ * the ReplicationServer
+ * @return a Pair with the FileReplicaDB and a boolean indicating whether it had
+ * to be created
+ * @throws ChangelogException
+ * if a problem occurred with the database
+ */
+ Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+ final ReplicationServer server) throws ChangelogException
+ {
+ while (!shutdown.get())
+ {
+ final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+ final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+ if (result != null)
+ {
+ return result;
+ }
+ }
+ throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+ }
+
+ private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
+ {
+ // happy path: the domainMap already exists
+ final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
+ if (currentValue != null)
+ {
+ return currentValue;
+ }
+
+ // unlucky, the domainMap does not exist: take the hit and create the
+ // newValue, even though the same could be done concurrently by another
+ // thread
+ final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>();
+ final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+ if (previousValue != null)
+ {
+ // there was already a value associated to the key, let's use it
+ return previousValue;
+ }
+ return newValue;
+ }
+
+ private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
+ final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
+ {
+ // happy path: the FileReplicaDB already exists
+ FileReplicaDB currentValue = domainMap.get(serverId);
+ if (currentValue != null)
+ {
+ return Pair.of(currentValue, false);
+ }
+
+ // unlucky, the FileReplicaDB does not exist: take the hit and synchronize
+ // on the domainMap to create a new ReplicaDB
+ synchronized (domainMap)
+ {
+ currentValue = domainMap.get(serverId);
+ if (currentValue != null)
+ {
+ return Pair.of(currentValue, false);
+ }
+
+ if (domainToReplicaDBs.get(baseDN) != domainMap)
+ {
+ // The domainMap could have been concurrently removed because
+ // 1) a shutdown was initiated or 2) an initialize was called.
+ // Return will allow the code to:
+ // 1) shutdown properly or 2) lazily recreate the FileReplicaDB
+ return null;
+ }
+
+ final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv);
+ domainMap.put(serverId, newDB);
+ return Pair.of(newDB, true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initializeDB()
+ {
+ try
+ {
+ final File dbDir = getFileForPath(config.getReplicationDBDirectory());
+ replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
+ final ChangelogState changelogState = replicationEnv.readChangelogState();
+ initializeToChangelogState(changelogState);
+ if (config.isComputeChangeNumber())
+ {
+ startIndexer(changelogState);
+ }
+ setPurgeDelay(replicationServer.getPurgeDelay());
+ }
+ catch (ChangelogException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
+ }
+ }
+
+ private void initializeToChangelogState(final ChangelogState changelogState)
+ throws ChangelogException
+ {
+ for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
+ {
+ replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
+ }
+ for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+ {
+ for (int serverId : entry.getValue())
+ {
+ getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
+ }
+ }
+ }
+
+ private void shutdownChangeNumberIndexDB() throws ChangelogException
+ {
+ synchronized (cnIndexDBLock)
+ {
+ if (cnIndexDB != null)
+ {
+ cnIndexDB.shutdown();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdownDB() throws ChangelogException
+ {
+ if (!this.shutdown.compareAndSet(false, true))
+ { // shutdown has already been initiated
+ return;
+ }
+
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ indexer.interrupt();
+ }
+ final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ purger.interrupt();
+ }
+
+ try
+ {
+ shutdownChangeNumberIndexDB();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+
+ for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
+ this.domainToReplicaDBs.values().iterator(); it.hasNext();)
+ {
+ final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
+ synchronized (domainMap)
+ {
+ it.remove();
+ for (FileReplicaDB replicaDB : domainMap.values())
+ {
+ replicaDB.shutdown();
+ }
+ }
+ }
+
+ if (replicationEnv != null)
+ {
+ // wait for shutdown of the threads holding cursors
+ try
+ {
+ if (indexer != null)
+ {
+ indexer.join();
+ }
+ if (purger != null)
+ {
+ purger.join();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing: we are already shutting down
+ }
+ replicationEnv.shutdown();
+ }
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
+ }
+
+ /**
+ * Clears all records from the changelog (does not remove the log itself).
+ *
+ * @throws ChangelogException
+ * If an error occurs when clearing the log.
+ */
+ public void clearDB() throws ChangelogException
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugError("clear the FileChangelogDB");
+ }
+ if (!dbDirectory.exists())
+ {
+ return;
+ }
+
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.clear();
+ }
+
+ for (DN baseDN : this.domainToReplicaDBs.keySet())
+ {
+ removeDomain(baseDN);
+ }
+
+ synchronized (cnIndexDBLock)
+ {
+ if (cnIndexDB != null)
+ {
+ try
+ {
+ cnIndexDB.clear();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+
+ try
+ {
+ shutdownChangeNumberIndexDB();
+ }
+ catch (ChangelogException e)
+ {
+ if (firstException == null)
+ {
+ firstException = e;
+ }
+ else if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+
+ cnIndexDB = null;
+ }
+ }
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void removeDB() throws ChangelogException
+ {
+ shutdownDB();
+ StaticUtils.recursiveDelete(dbDirectory);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ServerState getDomainOldestCSNs(DN baseDN)
+ {
+ final ServerState result = new ServerState();
+ for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
+ {
+ result.update(replicaDB.getOldestCSN());
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ServerState getDomainNewestCSNs(DN baseDN)
+ {
+ final ServerState result = new ServerState();
+ for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
+ {
+ result.update(replicaDB.getNewestCSN());
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void removeDomain(DN baseDN) throws ChangelogException
+ {
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
+ // 1- clear the replica DBs
+ Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
+ {
+ synchronized (domainMap)
+ {
+ domainMap = domainToReplicaDBs.remove(baseDN);
+ for (FileReplicaDB replicaDB : domainMap.values())
+ {
+ try
+ {
+ replicaDB.clear();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+ replicaDB.shutdown();
+ }
+ }
+ }
+
+
+ // 2- clear the changelogstate DB
+ try
+ {
+ replicationEnv.clearGenerationId(baseDN);
+ }
+ catch (ChangelogException e)
+ {
+ if (firstException == null)
+ {
+ firstException = e;
+ }
+ else if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setPurgeDelay(final long purgeDelayInMillis)
+ {
+ this.purgeDelayInMillis = purgeDelayInMillis;
+ final ChangelogDBPurger purger;
+ if (purgeDelayInMillis > 0)
+ {
+ purger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, purger))
+ {
+ purger.start();
+ } // otherwise a purger was already running
+ }
+ else
+ {
+ purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setComputeChangeNumber(final boolean computeChangeNumber)
+ throws ChangelogException
+ {
+ if (computeChangeNumber)
+ {
+ startIndexer(replicationEnv.readChangelogState());
+ }
+ else
+ {
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ }
+ }
+ }
+
+ private void startIndexer(final ChangelogState changelogState)
+ {
+ final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+ if (cnIndexer.compareAndSet(null, indexer))
+ {
+ indexer.start();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getDomainLatestTrimDate(final DN baseDN)
+ {
+ return latestPurgeDate;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexDB getChangeNumberIndexDB()
+ {
+ synchronized (cnIndexDBLock)
+ {
+ if (cnIndexDB == null)
+ {
+ try
+ {
+ cnIndexDB = new FileChangeNumberIndexDB(replicationEnv);
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
+ }
+ }
+ return cnIndexDB;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReplicationDomainDB getReplicationDomainDB()
+ {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
+ throws ChangelogException
+ {
+ final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+ final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
+ for (int serverId : serverIds)
+ {
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
+ cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
+ }
+ return new CompositeDBCursor<Void>(cursors, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
+ throws ChangelogException
+ {
+ final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ if (replicaDB != null)
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
+ cursor.next();
+ return cursor;
+ }
+ return EMPTY_CURSOR_REPLICA_DB;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+ {
+ final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+ updateMsg.getCSN().getServerId(), replicationServer);
+ final FileReplicaDB replicaDB = pair.getFirst();
+ final boolean wasCreated = pair.getSecond();
+
+ replicaDB.add(updateMsg);
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.publishUpdateMsg(baseDN, updateMsg);
+ }
+ return wasCreated;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN)
+ {
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.publishHeartbeat(baseDN, heartbeatCSN);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void replicaOffline(final DN baseDN, final CSN offlineCSN)
+ {
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.replicaOffline(baseDN, offlineCSN);
+ }
+ // TODO save this state in the changelogStateDB?
+ }
+
+ /**
+ * The thread purging the changelogDB on a regular interval. Records are
+ * purged from the changelogDB if they are older than a delay specified in
+ * seconds. The purge process works in two steps:
+ * <ol>
+ * <li>first purge the changeNumberIndexDB and retrieve information to drive
+ * replicaDBs purging</li>
+ * <li>proceed to purge each replicaDBs based on the information collected
+ * when purging the changeNumberIndexDB</li>
+ * </ol>
+ */
+ private final class ChangelogDBPurger extends DirectoryThread
+ {
+ private static final int DEFAULT_SLEEP = 500;
+
+ protected ChangelogDBPurger()
+ {
+ super("changelog DB purger");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run()
+ {
+ // initialize CNIndexDB
+ getChangeNumberIndexDB();
+ while (!isShutdownInitiated())
+ {
+ try
+ {
+ final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+ final CSN oldestNotPurgedCSN;
+
+ // next code assumes that the compute-change-number config
+ // never changes during the life time of an RS
+ if (!config.isComputeChangeNumber())
+ {
+ oldestNotPurgedCSN = purgeCSN;
+ }
+ else
+ {
+ final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been initiated
+ return;
+ }
+
+ oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+ if (oldestNotPurgedCSN == null)
+ { // shutdown may have been initiated...
+ if (!isShutdownInitiated())
+ {
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
+
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ sleep(DEFAULT_SLEEP);
+ }
+ continue;
+ }
+ }
+
+ for (final Map<Integer, FileReplicaDB> domainMap: domainToReplicaDBs.values())
+ {
+ for (final FileReplicaDB replicaDB : domainMap.values())
+ {
+ replicaDB.purgeUpTo(oldestNotPurgedCSN);
+ }
+ }
+
+ latestPurgeDate = purgeTimestamp;
+
+ sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown initiated?
+ }
+ catch (Exception e)
+ {
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ }
+ }
+ }
+
+ private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+ {
+ final long nextPurgeTime = notPurgedCSN.getTime();
+ final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+ if (currentPurgeTime <= nextPurgeTime)
+ {
+ // sleep till the next CSN to purge,
+ return nextPurgeTime - currentPurgeTime;
+ }
+ // wait a bit before purging more
+ return DEFAULT_SLEEP;
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
new file mode 100644
index 0000000..1526830
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -0,0 +1,405 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.ReplicationServerDomain;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.InitializationException;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+/**
+ * Represents a replication server database for one server in the topology.
+ * <p>
+ * It is responsible for efficiently saving the updates that is received from
+ * each master server into stable storage.
+ * <p>
+ * It is also able to generate a {@link DBCursor} that can be used to
+ * read all changes from a given {@link CSN}.
+ * <p>
+ * It publishes some monitoring information below cn=monitor.
+ */
+class FileReplicaDB
+{
+
+ /** The parser of records stored in Replica DB. */
+ static final RecordParser<CSN, UpdateMsg> RECORD_PARSER = new ReplicaDBParser();
+
+ /**
+ * Class that allows atomically setting oldest and newest CSNs without
+ * synchronization.
+ *
+ * @Immutable
+ */
+ private static final class CSNLimits
+ {
+ private final CSN oldestCSN;
+ private final CSN newestCSN;
+
+ public CSNLimits(CSN oldestCSN, CSN newestCSN)
+ {
+ this.oldestCSN = oldestCSN;
+ this.newestCSN = newestCSN;
+ }
+ }
+
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ /** The log in which records are persisted. */
+ private final LogFile<CSN, UpdateMsg> logFile;
+
+ /**
+ * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
+ *
+ * @NonNull
+ */
+ private volatile CSNLimits csnLimits;
+
+ private final int serverId;
+
+ private final DN baseDN;
+
+ private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+
+ private final ReplicationServer replicationServer;
+
+ private final ReplicationEnvironment replicationEnv;
+
+ /**
+ * Creates a new ReplicaDB associated to a given LDAP server.
+ *
+ * @param serverId
+ * Id of this server.
+ * @param baseDN
+ * the replication domain baseDN.
+ * @param replicationServer
+ * The ReplicationServer that creates this ReplicaDB.
+ * @param replicationEnv
+ * the Database Env to use to create the ReplicationServer DB. server
+ * for this domain.
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ FileReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
+ final ReplicationEnvironment replicationEnv) throws ChangelogException
+ {
+ this.serverId = serverId;
+ this.baseDN = baseDN;
+ this.replicationServer = replicationServer;
+ this.replicationEnv = replicationEnv;
+ this.logFile = createLogFile(replicationEnv);
+ this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
+
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ DirectoryServer.registerMonitorProvider(dbMonitor);
+ }
+
+ private CSN readOldestCSN() throws ChangelogException
+ {
+ final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
+ return record == null ? null : record.getKey();
+ }
+
+ private CSN readNewestCSN() throws ChangelogException
+ {
+ final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
+ return record == null ? null : record.getKey();
+ }
+
+ private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
+ {
+ final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
+ return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
+ }
+
+ /**
+ * Adds a new message.
+ *
+ * @param updateMsg
+ * The update message to add.
+ * @throws ChangelogException
+ * If an error occurs when trying to add the message.
+ */
+ void add(final UpdateMsg updateMsg) throws ChangelogException
+ {
+ if (shutdown.get())
+ {
+ throw new ChangelogException(
+ ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
+ .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
+ }
+ logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
+ final CSNLimits limits = csnLimits;
+ final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+ final boolean updateOld = limits.oldestCSN == null;
+ if (updateOld || updateNew)
+ {
+ csnLimits = new CSNLimits(
+ updateOld ? updateMsg.getCSN() : limits.oldestCSN,
+ updateNew ? updateMsg.getCSN() : limits.newestCSN);
+ }
+ }
+
+ /**
+ * Get the oldest CSN that has not been purged yet.
+ *
+ * @return the oldest CSN that has not been purged yet.
+ */
+ CSN getOldestCSN()
+ {
+ return csnLimits.oldestCSN;
+ }
+
+ /**
+ * Get the newest CSN that has not been purged yet.
+ *
+ * @return the newest CSN that has not been purged yet.
+ */
+ CSN getNewestCSN()
+ {
+ return csnLimits.newestCSN;
+ }
+
+ /**
+ * Get the number of changes.
+ *
+ * @return Returns the number of changes.
+ */
+ long getChangesCount()
+ {
+ final CSNLimits limits = csnLimits;
+ if (limits.newestCSN != null && limits.oldestCSN != null)
+ {
+ return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
+ }
+ return 0;
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the update messages from this DB,
+ * starting at the position defined by the smallest CSN that is strictly
+ * higher than the provided CSN.
+ * <p>
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals {@code null} before any call to
+ * {@code cursor.next()} method.
+ *
+ * @param startAfterCSN
+ * The position where the cursor must start. If null, start from the
+ * oldest CSN
+ * @return a new {@link DBCursor} to retreive update messages.
+ * @throws ChangelogException
+ * if a database problem happened
+ */
+ DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
+ {
+ LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
+ return new FileReplicaDBCursor(cursor, startAfterCSN);
+ }
+
+ /**
+ * Shutdown this ReplicaDB.
+ */
+ void shutdown()
+ {
+ if (shutdown.compareAndSet(false, true))
+ {
+ logFile.close();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
+ }
+ }
+
+ /**
+ * Synchronously purge changes older than purgeCSN from this replicaDB.
+ *
+ * @param purgeCSN
+ * The CSN up to which changes can be purged. No purging happens when
+ * it is {@code null}.
+ * @throws ChangelogException
+ * In case of database problem.
+ */
+ void purgeUpTo(final CSN purgeCSN) throws ChangelogException
+ {
+ if (purgeCSN == null)
+ {
+ return;
+ }
+
+ // TODO : no purge implemented yet, as we have a single-file log.
+ // The purge must be implemented once we handle a log with multiple files.
+ // The purge will only delete whole files.
+ }
+
+ /**
+ * Implements monitoring capabilities of the ReplicaDB.
+ */
+ private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public List<Attribute> getMonitorData()
+ {
+ final List<Attribute> attributes = new ArrayList<Attribute>();
+ create(attributes, "replicationServer-database",String.valueOf(serverId));
+ create(attributes, "domain-name", baseDN.toNormalizedString());
+ final CSNLimits limits = csnLimits;
+ if (limits.oldestCSN != null)
+ {
+ create(attributes, "first-change", encode(limits.oldestCSN));
+ }
+ if (limits.newestCSN != null)
+ {
+ create(attributes, "last-change", encode(limits.newestCSN));
+ }
+ return attributes;
+ }
+
+ private void create(final List<Attribute> attributes, final String name, final String value)
+ {
+ attributes.add(Attributes.create(name, value));
+ }
+
+ private String encode(final CSN csn)
+ {
+ return csn + " " + new Date(csn.getTime());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getMonitorInstanceName()
+ {
+ ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
+ return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException,InitializationException
+ {
+ // Nothing to do for now
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ final CSNLimits limits = csnLimits;
+ return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " + limits.oldestCSN + " "
+ + limits.newestCSN;
+ }
+
+ /**
+ * Clear the changes from this DB, from both memory cache and persistent
+ * storage.
+ *
+ * @throws ChangelogException
+ * If an error occurs while removing the changes from the DB.
+ */
+ void clear() throws ChangelogException
+ {
+ // Remove all persisted data and reset generationId to default value
+ logFile.clear();
+ replicationEnv.resetGenerationId(baseDN);
+
+ csnLimits = new CSNLimits(null, null);
+ }
+
+ /**
+ * Return the number of records of this replicaDB.
+ * <p>
+ * For test purpose.
+ *
+ * @return The number of records of this replicaDB.
+ * @throws ChangelogException
+ * If an error occurs
+ */
+ long getNumberRecords() throws ChangelogException
+ {
+ return logFile.getNumberOfRecords();
+ }
+
+ /** Parser of records persisted in the ReplicaDB log. */
+ private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
+ {
+ private static final DebugTracer TRACER = getTracer();
+
+ @Override
+ public ByteString encodeRecord(CSN key, UpdateMsg message)
+ {
+ try
+ {
+ return ByteString.wrap(message.getBytes());
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // should never happen
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ return ByteString.empty();
+ }
+ }
+
+ @Override
+ public Record<CSN, UpdateMsg> decodeRecord(final ByteString data) throws DecodingException
+ {
+ try
+ {
+ final UpdateMsg msg =
+ (UpdateMsg) UpdateMsg.generateMsg(data.toByteArray(), ProtocolVersion.REPLICATION_PROTOCOL_V7);
+ return Record.from(msg.getCSN(), msg);
+ }
+ catch (Exception e)
+ {
+ throw new DecodingException(e);
+ }
+ }
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
new file mode 100644
index 0000000..3f3a2fa
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -0,0 +1,114 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+
+/**
+ * A cursor on ReplicaDB.
+ * <p>
+ * This cursor behaves specially in two ways :
+ * <ul>
+ * <li>The cursor initially points to a {@code null} value: the
+ * {@code getRecord()} method return {@code null} if called before any call to
+ * {@code next()} method.</li>
+ * <li>The cursor automatically re-initializes itself if it is exhausted: when
+ * exhausted, the cursor re-position itself to the last non null CSN previously
+ * read.
+ * <li>
+ * </ul>
+ */
+class FileReplicaDBCursor implements DBCursor<UpdateMsg>
+{
+
+ /** The underlying cursor. */
+ private final LogCursor<CSN, UpdateMsg> cursor;
+
+ /** The next record to return. */
+ private Record<CSN, UpdateMsg> nextRecord;
+
+ /** The CSN to re-start with in case the cursor is exhausted. */
+ private CSN lastNonNullCurrentCSN;
+
+ /**
+ * Creates the cursor from provided log cursor and start CSN.
+ *
+ * @param cursor
+ * The underlying log cursor to read log.
+ * @param startAfterCSN
+ * The CSN to use as a start point (excluded from cursor, the lowest
+ * CSN higher than this CSN is used as the real start point).
+ */
+ FileReplicaDBCursor(LogCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
+ this.cursor = cursor;
+ this.lastNonNullCurrentCSN = startAfterCSN;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return nextRecord == null ? null : nextRecord.getValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ nextRecord = cursor.getRecord();
+ if (nextRecord != null)
+ {
+ lastNonNullCurrentCSN = nextRecord.getKey();
+ }
+ else
+ {
+ // Exhausted cursor must be able to reinitialize itself
+ cursor.rewind();
+ cursor.positionTo(lastNonNullCurrentCSN, true);
+
+ nextRecord = cursor.getRecord();
+ if (nextRecord != null)
+ {
+ lastNonNullCurrentCSN = nextRecord.getKey();
+ }
+ }
+ // the underlying cursor is one record in advance
+ cursor.next();
+ return nextRecord != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
new file mode 100644
index 0000000..005c8c7
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -0,0 +1,911 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.opends.messages.Message;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A file-based log that allow to append key-value records and
+ * read them using a {@code DBCursor}.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ */
+final class LogFile<K extends Comparable<K>, V> implements Closeable
+{
+
+ private static final DebugTracer TRACER = getTracer();
+
+ // Non private for use in tests
+ static final String LOG_FILE_NAME = "current.log";
+
+ /** The path of directory that contains the log file. */
+ private final File rootPath;
+
+ /** The log file containing the records. */
+ private final File logfile;
+
+ /** The parser of records, to convert bytes to record and record to bytes. */
+ private final RecordParser<K, V> parser;
+
+ /** The pool to obtain a reader on the log. */
+ private LogReaderPool readerPool;
+
+ /** The writer on the log, which may be {@code null} if log is not write-enabled */
+ private LogWriter writer;
+
+ /** Indicates if log is enabled for write. */
+ private final boolean isWriteEnabled;
+
+ /** Indicates if the log is closed. */
+ private volatile boolean isClosed;
+
+ /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */
+ private final Lock exclusiveLock;
+
+ /**
+ * The shared lock used for read, write and flush operations on this log file.
+ * Write and flush operations can be shared because they're synchronized in the underlying writer.
+ * Reads can be done safely when writing because partially written records are handled.
+ */
+ private final Lock sharedLock;
+
+ /**
+ * Creates a new log file.
+ *
+ * @param rootPath
+ * Path of root directory that contains the log file.
+ * @param parser
+ * Parser of records.
+ * @param isWriteEnabled
+ * {@code true} if this changelog is write-enabled, {@code false}
+ * otherwise.
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled)
+ throws ChangelogException
+ {
+ this.rootPath = rootPath;
+ this.parser = parser;
+ this.isWriteEnabled = isWriteEnabled;
+ this.logfile = new File(rootPath, LOG_FILE_NAME);
+
+ final ReadWriteLock lock = new ReentrantReadWriteLock(false);
+ this.exclusiveLock = lock.writeLock();
+ this.sharedLock = lock.readLock();
+
+ initialize();
+ }
+
+ /**
+ * Creates a read-only log file with the provided root path and record parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param rootPath
+ * Path of root directory that contains the log file.
+ * @param parser
+ * Parser of records.
+ * @return a read-only log file
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath,
+ final RecordParser<K, V> parser) throws ChangelogException
+ {
+ return new LogFile<K, V>(rootPath, parser, false);
+ }
+
+ /**
+ * Creates a write-enabled log file that appends records to the end of file,
+ * with the provided root path and record parser.
+ *
+ * @param <K>
+ * Type of the key of a record, which must be comparable.
+ * @param <V>
+ * Type of the value of a record.
+ * @param rootPath
+ * Path of root directory that contains the log file.
+ * @param parser
+ * Parser of records.
+ * @return a write-enabled log file
+ * @throws ChangelogException
+ * If a problem occurs during initialization.
+ */
+ public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath,
+ final RecordParser<K, V> parser) throws ChangelogException
+ {
+ return new LogFile<K, V>(rootPath, parser, true);
+ }
+
+ /**
+ * Initialize this log.
+ * <p>
+ * Create directories and file if necessary, and create a writer
+ * and pool of readers.
+ *
+ * @throws ChangelogException
+ * If an errors occurs during initialization.
+ */
+ private void initialize() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ createRootDirIfNotExists();
+ createLogFileIfNotExists();
+ isClosed = false;
+ if (isWriteEnabled)
+ {
+ writer = LogWriter.acquireWriter(logfile);
+ }
+ readerPool = new LogReaderPool(logfile);
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the name of this log.
+ *
+ * @return the name, which corresponds to the directory containing the log
+ */
+ public String getName()
+ {
+ return logfile.getParent().toString();
+ }
+
+ /**
+ * Empties the log, discarding all records it contains.
+ * <p>
+ * This method should not be called with open cursors or
+ * when multiple instances of the log are opened.
+ *
+ * @throws ChangelogException
+ * If a problem occurs.
+ */
+ public void clear() throws ChangelogException
+ {
+ checkLogIsEnabledForWrite();
+
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ close();
+ final boolean isDeleted = logfile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath()));
+ }
+ initialize();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ private void checkLogIsEnabledForWrite() throws ChangelogException
+ {
+ if (!isWriteEnabled)
+ {
+ throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath()));
+ }
+ }
+
+ private void createRootDirIfNotExists() throws ChangelogException
+ {
+ if (!rootPath.exists())
+ {
+ final boolean created = rootPath.mkdirs();
+ if (!created)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath()));
+ }
+ }
+ }
+
+ private void createLogFileIfNotExists() throws ChangelogException
+ {
+ try
+ {
+ if (!logfile.exists())
+ {
+ logfile.createNewFile();
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e);
+ }
+ }
+
+ /**
+ * Add a record at the end of this log from the provided key and value.
+ * <p>
+ * In order to ensure that record is written out of buffers and persisted
+ * to file system, it is necessary to explicitely call the
+ * {@code syncToFileSystem()} method.
+ *
+ * @param key
+ * The key of the record.
+ * @param value
+ * The value of the record.
+ * @throws ChangelogException
+ * If the record can't be added to the log.
+ */
+ public void addRecord(final K key, final V value) throws ChangelogException
+ {
+ addRecord(Record.from(key, value));
+ }
+
+ /**
+ * Add the provided record at the end of this log.
+ * <p>
+ * In order to ensure that record is written out of buffers and persisted
+ * to file system, it is necessary to explicitely call the
+ * {@code syncToFileSystem()} method.
+ *
+ * @param record
+ * The record to add.
+ * @throws ChangelogException
+ * If the record can't be added to the log.
+ */
+ public void addRecord(final Record<K, V> record) throws ChangelogException
+ {
+ checkLogIsEnabledForWrite();
+
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ writer.write(encodeRecord(record));
+ writer.flush();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e);
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ private ByteString encodeRecord(final Record<K, V> record)
+ {
+ final ByteString data = parser.encodeRecord(record.getKey(), record.getValue());
+ return new ByteStringBuilder()
+ .append(data.length())
+ .append(data)
+ .toByteString();
+ }
+
+ /**
+ * Dump this log as text file, intended for debugging purpose only.
+ *
+ * @param dumpFile
+ * File that will contains log records using a human-readable format
+ * @throws ChangelogException
+ * If an error occurs during dump
+ */
+ public void dumpAsTextFile(File dumpFile) throws ChangelogException
+ {
+ DBCursor<Record<K, V>> cursor = getCursor();
+ BufferedWriter textWriter = null;
+ try
+ {
+ textWriter = new BufferedWriter(new FileWriter(dumpFile));
+ while (cursor.getRecord() != null)
+ {
+ Record<K, V> record = cursor.getRecord();
+ textWriter.write("key=" + record.getKey());
+ textWriter.write(" -- ");
+ textWriter.write("value=" + record.getValue());
+ textWriter.write('\n');
+ cursor.next();
+ }
+ }
+ catch (IOException e)
+ {
+ // No I18N needed, used for debugging purpose only
+ throw new ChangelogException(
+ Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e);
+ }
+ finally
+ {
+ StaticUtils.close(textWriter);
+ }
+ }
+
+ /**
+ * Synchronize all records added with the file system, ensuring that records
+ * are effectively persisted.
+ * <p>
+ * After a successful call to this method, it is guaranteed that all records
+ * added to the log are persisted to the file system.
+ *
+ * @throws ChangelogException
+ * If the synchronization fails.
+ */
+ public void syncToFileSystem() throws ChangelogException
+ {
+ exclusiveLock.lock();
+ try
+ {
+ writer.sync();
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e);
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the first position.
+ * <p>
+ * The returned cursor initially points to record corresponding to the first
+ * key, that is {@code cursor.getRecord()} is equals to the record
+ * corresponding to the first key before any call to {@code cursor.next()}
+ * method.
+ *
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public LogCursor<K, V> getCursor() throws ChangelogException
+ {
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return new EmptyLogCursor<K, V>();
+ }
+ return new LogFileCursor<K, V>(this);
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the position defined by the provided key.
+ * <p>
+ * The returned cursor initially points to record corresponding to the key,
+ * that is {@code cursor.getRecord()} is equals to the record corresponding to
+ * the key before any call to {@code cursor.next()} method.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, cursor will point at the first record of the log.
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public LogCursor<K, V> getCursor(final K key) throws ChangelogException
+ {
+ return getCursor(key, false);
+ }
+
+ /**
+ * Returns a cursor that allows to retrieve the records from this log,
+ * starting at the position defined by the smallest key that is higher than
+ * the provided key.
+ * <p>
+ * The returned cursor initially points to record corresponding to the key
+ * found, that is {@code cursor.getRecord()} is equals to the record
+ * corresponding to the key found before any call to {@code cursor.next()}
+ * method.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, cursor will point at the first record of the log.
+ * @return a cursor on the log records, which is never {@code null}
+ * @throws ChangelogException
+ * If the cursor can't be created.
+ */
+ public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+ {
+ return getCursor(key, true);
+ }
+
+ /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
+ private LogCursor<K, V> getCursor(final K key, boolean findNearest)
+ throws ChangelogException
+ {
+ if (key == null)
+ {
+ return getCursor();
+ }
+ LogFileCursor<K, V> cursor = null;
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return new EmptyLogCursor<K, V>();
+ }
+ cursor = new LogFileCursor<K, V>(this);
+ cursor.positionTo(key, findNearest);
+ // if target is not found, cursor is positioned at end of stream
+ return cursor;
+ }
+ catch (ChangelogException e) {
+ StaticUtils.close(cursor);
+ throw e;
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the oldest (first) record from this log.
+ *
+ * @return the oldest record, which may be {@code null} if there is no record
+ * in the log.
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ public Record<K, V> getOldestRecord() throws ChangelogException
+ {
+ DBCursor<Record<K, V>> cursor = null;
+ try
+ {
+ cursor = getCursor();
+ return cursor.getRecord();
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ /**
+ * Returns the newest (last) record from this log.
+ *
+ * @return the newest record, which may be {@code null}
+ * @throws ChangelogException
+ * If an error occurs while retrieving the record.
+ */
+ public Record<K, V> getNewestRecord() throws ChangelogException
+ {
+ // TODO : need a more efficient way to retrieve it
+ DBCursor<Record<K, V>> cursor = null;
+ try
+ {
+ cursor = getCursor();
+ Record<K, V> record = cursor.getRecord();
+ while (cursor.next())
+ {
+ record = cursor.getRecord();
+ }
+ return record;
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ /**
+ * Returns the number of records in the log.
+ *
+ * @return the number of records
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ long getNumberOfRecords() throws ChangelogException
+ {
+ // TODO : need a more efficient way to retrieve it
+ DBCursor<Record<K, V>> cursor = null;
+ try
+ {
+ cursor = getCursor();
+ Record<K, V> record = cursor.getRecord();
+ if (record == null)
+ {
+ return 0L;
+ }
+ long counter = 1L;
+ while (cursor.next())
+ {
+ record = cursor.getRecord();
+ counter++;
+ }
+ return counter;
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close()
+ {
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+
+ if (isWriteEnabled)
+ {
+ try
+ {
+ syncToFileSystem();
+ }
+ catch (ChangelogException e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ writer.close();
+ }
+ readerPool.shutdown();
+ isClosed = true;
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
+ }
+
+ /** Read a record from the provided reader. */
+ private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
+ {
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return null;
+ }
+ final ByteString recordData = readEncodedRecord(reader);
+ return recordData != null ? parser.decodeRecord(recordData) : null;
+ }
+ catch(DecodingException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
+ {
+ try
+ {
+ final byte[] lengthData = new byte[4];
+ reader.readFully(lengthData);
+ int recordLength = ByteString.wrap(lengthData).toInt();
+
+ final byte[] recordData = new byte[recordLength];
+ reader.readFully(recordData);
+ return ByteString.wrap(recordData);
+ }
+ catch(EOFException e)
+ {
+ // end of stream, no record or uncomplete record
+ return null;
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
+ }
+ }
+
+ /** Seek to provided position on the provided reader. */
+ private void seek(RandomAccessFile reader, long position) throws ChangelogException
+ {
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ reader.seek(position);
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a random access file to read this log.
+ * <p>
+ * Assumes that calling methods ensure that log is not closed.
+ */
+ private RandomAccessFile getReader() throws ChangelogException
+ {
+ return readerPool.get();
+ }
+
+ /** Release the provided reader. */
+ private void releaseReader(RandomAccessFile reader) {
+ sharedLock.lock();
+ try
+ {
+ if (isClosed)
+ {
+ return;
+ }
+ readerPool.release(reader);
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+ }
+
+ /**
+ * A cursor on the log.
+ */
+ static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+ {
+ /**
+ * Position the cursor to the record corresponding to the provided key or to
+ * the nearest key (the lowest key higher than the provided key).
+ * <p>
+ * The record is only searched forward. To search backward, it is first
+ * necessary to call the {@code rewind()} method to start from beginning of
+ * log file.
+ *
+ * @param key
+ * Key to use as a start position for the cursor. If key is
+ * {@code null}, use the key of the first record instead.
+ * @param findNearest
+ * If {@code true}, start position is the lowest key that is higher
+ * than the provided key, otherwise start position is the provided
+ * key.
+ * @return {@code true} if cursor is successfully positionned to the key or
+ * the the nearest key, {@code false} otherwise.
+ * @throws ChangelogException
+ * If an error occurs when positioning cursor.
+ */
+ boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+
+ /**
+ * Rewind the cursor, positioning it to the beginning of the log file,
+ * pointing to no record initially.
+ *
+ * @throws ChangelogException
+ * If an error occurs when rewinding to zero.
+ */
+ void rewind() throws ChangelogException;
+ }
+
+ /**
+ * Implements a cursor on the log.
+ * <p>
+ * The cursor initially points to a record, that is {@code cursor.getRecord()}
+ * is equals to the first record available from the cursor before any call to
+ * {@code cursor.next()} method.
+ */
+ private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
+ {
+ /** The underlying log on which entries are read. */
+ private final LogFile<K, V> logFile;
+
+ /** To read the records. */
+ private final RandomAccessFile reader;
+
+ /** The current available record, may be {@code null}. */
+ private Record<K,V> currentRecord;
+
+ /**
+ * Creates a cursor on the provided log.
+ *
+ * @param logFile
+ * The log on which the cursor read records.
+ * @throws ChangelogException
+ * If an error occurs when creating the cursor.
+ */
+ LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
+ {
+ this.logFile = logFile;
+ this.reader = logFile.getReader();
+ try
+ {
+ // position to the first record.
+ next();
+ }
+ catch (ChangelogException e)
+ {
+ close();
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public String toString()
+ {
+ return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Record<K,V> getRecord()
+ {
+ return currentRecord;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ currentRecord = logFile.readRecord(reader);
+ return currentRecord != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
+ do
+ {
+ if (currentRecord != null)
+ {
+ final boolean matches = findNearest ?
+ currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
+ if (matches)
+ {
+ if (findNearest && currentRecord.getKey().equals(key))
+ {
+ // skip key in order to position on lowest higher key
+ next();
+ }
+ return true;
+ }
+ }
+ next();
+ }
+ while (currentRecord != null);
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void rewind() throws ChangelogException
+ {
+ logFile.seek(reader, 0);
+ currentRecord = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ logFile.releaseReader(reader);
+ }
+ }
+
+ /** An empty cursor, that always return null records and false to {@code next()} method. */
+ static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
+ {
+ /** {@inheritDoc} */
+ @Override
+ public Record<K,V> getRecord()
+ {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next()
+ {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
+ {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void rewind() throws ChangelogException
+ {
+ // nothing to do
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ // nothing to do
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "EmptyLogCursor";
+ }
+
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
new file mode 100644
index 0000000..b5aed97
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
@@ -0,0 +1,107 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A Pool of readers to a log file.
+ */
+// TODO : implement a real pool - reusing readers instead of opening-closing them each time
+class LogReaderPool
+{
+ /** The file to read. */
+ private final File file;
+
+ /**
+ * Creates a pool of readers for provided file.
+ *
+ * @param file
+ * The file to read.
+ */
+ LogReaderPool(File file)
+ {
+ this.file = file;
+ }
+
+ /**
+ * Returns a random access reader on the provided file.
+ * <p>
+ * The acquired reader must be released with the {@code release()}
+ * method.
+ *
+ * @return a random access reader
+ * @throws ChangelogException
+ * If the file can't be found or read.
+ */
+ RandomAccessFile get() throws ChangelogException
+ {
+ return getRandomAccess(file);
+ }
+
+ /**
+ * Release the provided reader.
+ * <p>
+ * Once released, this reader must not be used any more.
+ *
+ * @param reader
+ * The random access reader to a file previously acquired with this
+ * pool.
+ */
+ void release(RandomAccessFile reader)
+ {
+ StaticUtils.close(reader);
+ }
+
+ /** Returns a random access file to read this log. */
+ private RandomAccessFile getRandomAccess(File file) throws ChangelogException
+ {
+ try
+ {
+ return new RandomAccessFile(file, "r");
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_READER_ON_LOG_FILE.get(file.getPath()), e);
+ }
+ }
+
+ /**
+ * Shutdown this pool, releasing all files handles opened
+ * on the file.
+ */
+ void shutdown()
+ {
+ // Nothing to do yet as no file handle is kept opened.
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
new file mode 100644
index 0000000..037c048
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -0,0 +1,209 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.SyncFailedException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A writer on a log file.
+ * <p>
+ * The writer is cached in order to have a single writer per file in the JVM.
+ */
+class LogWriter extends OutputStream
+{
+ /** The cache of log writers. There is a single writer per file in the JVM. */
+ private static final Map<File, LogWriter> logWritersCache = new HashMap<File, LogWriter>();
+
+ /** The exclusive lock used to acquire or close a log writer. */
+ private static final Object lock = new Object();
+
+ /** The file to write in. */
+ private final File file;
+
+ /** The stream to write data in the file. */
+ private final BufferedOutputStream stream;
+
+ /** The file descriptor on the file. */
+ private final FileDescriptor fileDescriptor;
+
+ /** The number of references on this writer. */
+ private int referenceCount;
+
+ /**
+ * Creates a writer on the provided file.
+ *
+ * @param file
+ * The file to write.
+ * @param stream
+ * The stream to write in the file.
+ * @param fileDescriptor
+ * The descriptor on the file.
+ */
+ private LogWriter(final File file, BufferedOutputStream stream, FileDescriptor fileDescriptor)
+ throws ChangelogException
+ {
+ this.file = file;
+ this.stream = stream;
+ this.fileDescriptor = fileDescriptor;
+ this.referenceCount = 1;
+ }
+
+ /**
+ * Returns a log writer on the provided file, creating it if necessary.
+ *
+ * @param file
+ * The log file to write in.
+ * @return the log writer
+ * @throws ChangelogException
+ * If a problem occurs.
+ */
+ public static LogWriter acquireWriter(File file) throws ChangelogException
+ {
+ synchronized (lock)
+ {
+ LogWriter logWriter = logWritersCache.get(file);
+ if (logWriter == null)
+ {
+ try
+ {
+ final FileOutputStream stream = new FileOutputStream(file, true);
+ logWriter = new LogWriter(file, new BufferedOutputStream(stream), stream.getFD());
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
+ }
+ logWritersCache.put(file, logWriter);
+ }
+ else
+ {
+ logWriter.incrementRefCounter();
+ }
+ return logWriter;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(int b) throws IOException
+ {
+ stream.write(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ stream.write(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ stream.write(b, off, len);
+ }
+
+ /**
+ * Writes the provided byte string to the underlying output stream of this writer.
+ *
+ * @param bs
+ * The byte string to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ public void write(ByteString bs) throws IOException
+ {
+ bs.copyTo(stream);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void flush() throws IOException
+ {
+ stream.flush();
+ }
+
+ /**
+ * Synchronize all modifications to the file to the underlying device.
+ *
+ * @throws SyncFailedException
+ * If synchronization fails.
+ */
+ void sync() throws SyncFailedException {
+ fileDescriptor.sync();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ synchronized (lock)
+ {
+ LogWriter writer = logWritersCache.get(file);
+ if (writer == null)
+ {
+ // writer is already closed
+ return;
+ }
+ // counter == 0 should never happen
+ if (referenceCount == 0 || referenceCount == 1)
+ {
+ StaticUtils.close(stream);
+ logWritersCache.remove(file);
+ referenceCount = 0;
+ }
+ else
+ {
+ referenceCount--;
+ }
+ }
+ }
+
+ private void incrementRefCounter()
+ {
+ referenceCount++;
+ }
+
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Record.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Record.java
new file mode 100644
index 0000000..24b25d2
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Record.java
@@ -0,0 +1,130 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+/**
+ * Represents a record, which is a pair of key-value.
+ *
+ * @param <K>
+ * The type of a key.
+ * @param <V>
+ * The type of a value.
+ */
+class Record<K, V>
+{
+ private final K key;
+ private final V value;
+
+ /**
+ * Creates a record from provided key and value.
+ *
+ * @param key
+ * The key.
+ * @param value
+ * The value.
+ */
+ private Record(final K key, final V value)
+ {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Create a record from provided key and value.
+ *
+ * @param <K>
+ * The type of the key.
+ * @param <V>
+ * The type of the value.
+ * @param key
+ * The key.
+ * @param value
+ * The value.
+ * @return a record
+ */
+ static <K, V> Record<K, V> from(final K key, final V value) {
+ return new Record<K, V>(key, value);
+ }
+
+ /**
+ * Returns the key of this record.
+ *
+ * @return the key
+ */
+ K getKey()
+ {
+ return key;
+ }
+
+ /**
+ * Returns the value of this record.
+ *
+ * @return the value
+ */
+ V getValue()
+ {
+ return value;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((key == null) ? 0 : key.hashCode());
+ result = prime * result + ((value == null) ? 0 : value.hashCode());
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object that)
+ {
+ if (this == that)
+ {
+ return true;
+ }
+ if (!(that instanceof Record))
+ {
+ return false;
+ }
+ Record<?, ?> other = (Record<?, ?>) that;
+ final boolean keyEquals = key == null ? other.key == null : key.equals(other.key);
+ if (!keyEquals)
+ {
+ return false;
+ }
+ return value == null ? other.value == null : value.equals(other.value);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return "Record [" + key + ":" + value + "]";
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java b/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
new file mode 100644
index 0000000..94e65de
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
@@ -0,0 +1,72 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.opends.server.types.ByteString;
+
+/**
+ * Parser of a log record.
+ * <p>
+ * The parser allows to convert from a object to its binary representation
+ * and to convert back from the binary representation to the object.
+ *
+ * @param <K>
+ * Type of the key of the record.
+ * @param <V>
+ * Type of the value of the record.
+ */
+interface RecordParser<K, V>
+{
+
+ /**
+ * Decode a record from the provided byte array.
+ * <p>
+ * The record is expected to have been encoded using the {@code writeRecord()}
+ * method.
+ *
+ * @param data
+ * The raw data to read the record from.
+ * @return the decoded record, or {@code null} if there is no more record to
+ * read, or only an incomplete record
+ * @throws DecodingException
+ * If an error occurs while decoding the record.
+ */
+ Record<K, V> decodeRecord(ByteString data) throws DecodingException;
+
+ /**
+ * Encode the provided key and value to a byte array.
+ * <p>
+ * The returned array is intended to be stored as provided in the log file.
+ *
+ * @param key
+ * The key of the record.
+ * @param value
+ * The value of the record.
+ * @return the bytes array representing the (key,value) record
+ */
+ ByteString encodeRecord(K key, V value);
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
new file mode 100644
index 0000000..78d4686
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -0,0 +1,690 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Represents the replication environment, which allows to manage the lifecycle
+ * of the replication changelog.
+ * <p>
+ * A changelog has a root directory, under which the following directories and files are
+ * created :
+ * <ul>
+ * <li>A "changenumberindex" directory containing the log files for
+ * ChangeNumberIndexDB</li>
+ * <li>A "domains.state" file containing a mapping of each domain DN to an id. The
+ * id is used to name the corresponding domain directory.</li>
+ * <li>One directory per domain, named after "[id].domain" where [id] is the id
+ * assigned to the domain, as specified in the "domains.state" file.</li>
+ * </ul>
+ * <p>
+ * Each domain directory contains the following directories and files :
+ * <ul>
+ * <li>A "generation_[id].id" file, where [id] is the generation id</li>
+ * <li>One directory per server id, named after "[id].server" where [id] is the
+ * id of the server. Each directory contains the log files for the given server
+ * id.</li>
+ * </ul>
+ * All log files end with the ".log" suffix.
+ * <p>
+ * Layout example with two domains "o=test1" and "o=test2", each having server
+ * ids 22 and 33 :
+ *
+ * <pre>
+ * +---changelog
+ * | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"]
+ * | \---changenumberindex
+ * | \--- current.log
+ * | \---1.domain
+ * | \---generation1.id
+ * | \---22.server
+ * | \---current.log
+ * | \---33.server
+ * | \---current.log
+ * | \---2.domain
+ * | \---generation1.id
+ * | \---22.server
+ * | \---current.log
+ * | \---33.server
+ * | \---current.log
+ * </pre>
+ */
+class ReplicationEnvironment
+{
+ private static final DebugTracer TRACER = getTracer();
+
+ private static final int NO_GENERATION_ID = -1;
+
+ private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
+
+ private static final String DOMAINS_STATE_FILENAME = "domains.state";
+
+ private static final String DOMAIN_STATE_SEPARATOR = ":";
+
+ private static final String DOMAIN_SUFFIX = ".domain";
+
+ private static final String SERVER_ID_SUFFIX = ".server";
+
+ private static final String GENERATION_ID_FILE_PREFIX = "generation";
+
+ private static final String GENERATION_ID_FILE_SUFFIX = ".id";
+
+ private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isDirectory() && file.getName().endsWith(DOMAIN_SUFFIX);
+ }
+ };
+
+ private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isDirectory() && file.getName().endsWith(SERVER_ID_SUFFIX);
+ }
+ };
+
+ private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isFile()
+ && file.getName().startsWith(GENERATION_ID_FILE_PREFIX)
+ && file.getName().endsWith(GENERATION_ID_FILE_SUFFIX);
+ }
+ };
+
+ /** Root path where the replication log is stored. */
+ private final String replicationRootPath;
+
+ /** The list of logs that are in use. */
+ private final List<LogFile<?, ?>> logs = new CopyOnWriteArrayList<LogFile<?, ?>>();
+
+ /** Maps each domain DN to a domain id that is used to name directory in file system. */
+ private final Map<DN, String> domains = new HashMap<DN, String>();
+
+ /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
+ // TODO : review the usefulness of this lock
+ private final Object domainLock = new Object();
+
+ /** The underlying replication server. */
+ private final ReplicationServer replicationServer;
+
+ private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+
+ /**
+ * Creates the replication environment.
+ *
+ * @param rootPath
+ * Root path where replication log is stored.
+ * @param replicationServer
+ * The underlying replication server.
+ * @throws ChangelogException
+ * If an error occurs during initialization.
+ */
+ ReplicationEnvironment(final String rootPath,
+ final ReplicationServer replicationServer) throws ChangelogException
+ {
+ this.replicationRootPath = rootPath;
+ this.replicationServer = replicationServer;
+ }
+
+ /**
+ * Returns the state of the replication changelog, which includes the list of
+ * known servers and the generation id.
+ *
+ * @return the {@link ChangelogState}
+ * @throws ChangelogException
+ * if a problem occurs while retrieving the state.
+ */
+ ChangelogState readChangelogState() throws ChangelogException
+ {
+ final ChangelogState state = new ChangelogState();
+ final File changelogPath = new File(replicationRootPath);
+ synchronized (domainLock)
+ {
+ readDomainsStateFile();
+ checkDomainDirectories(changelogPath);
+ for (final Entry<DN, String> domainEntry : domains.entrySet())
+ {
+ readStateForDomain(domainEntry, state);
+ }
+ }
+ return state;
+ }
+
+ /**
+ * Finds or creates the log used to store changes from the replication server
+ * with the given serverId and the given baseDN.
+ *
+ * @param domainDN
+ * The DN that identifies the domain.
+ * @param serverId
+ * The server id that identifies the server.
+ * @param generationId
+ * The generationId associated to this domain.
+ * @return the log.
+ * @throws ChangelogException
+ * if an error occurs.
+ */
+ LogFile<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
+ throws ChangelogException
+ {
+ if (debugEnabled())
+ {
+ debug("ReplicationEnvironment.getOrCreateReplicaDB(" + domainDN + ", " + serverId + ", " + generationId + ")");
+ }
+
+ try
+ {
+ ensureRootDirectoryExists();
+
+ String domainId = null;
+ synchronized (domainLock)
+ {
+ domainId = domains.get(domainDN);
+ if (domainId == null)
+ {
+ domainId = createDomainId(domainDN);
+ }
+
+ final File serverIdPath = getServerIdPath(domainId, serverId);
+ ensureServerIdDirectoryExists(serverIdPath);
+
+ final File generationIdPath = getGenerationIdPath(domainId, generationId);
+ ensureGenerationIdFileExists(generationIdPath);
+
+ return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_REPLICA_DB.get(domainDN.toString(), serverId, generationId), e);
+ }
+ }
+
+ /**
+ * Find or create the log to manage integer change number associated to
+ * multidomain server state.
+ * <p>
+ * TODO: ECL how to manage compatibility of this db
+ * with new domains added or removed ?
+ *
+ * @return the log.
+ * @throws ChangelogException
+ * when a problem occurs.
+ */
+ LogFile<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
+ {
+ final File path = getCNIndexDBPath();
+ try
+ {
+ return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_CN_INDEX_DB.get(replicationRootPath, path.getPath()), e);
+ }
+ }
+
+ /**
+ * Shutdown the environment.
+ * <p>
+ * The log DBs are not closed by this method. It assumes they are already
+ * closed.
+ */
+ void shutdown()
+ {
+ if (isShuttingDown.compareAndSet(false, true))
+ {
+ logs.clear();
+ }
+ }
+
+ /**
+ * Clears the content of replication database.
+ *
+ * @param log
+ * The log to clear.
+ */
+ void clearDB(final LogFile<?, ?> log)
+ {
+ try
+ {
+ log.clear();
+ }
+ catch (ChangelogException e)
+ {
+ logError(ERR_ERROR_CLEARING_DB.get(log.getName(), stackTraceToSingleLineString(e)));
+ }
+ }
+
+ /**
+ * Clears the generated id associated to the provided domain DN from the state
+ * Db.
+ * <p>
+ * If generation id can't be found, it is not considered as an error, the
+ * method will just return.
+ *
+ * @param domainDN
+ * The domain DN for which the generationID must be cleared.
+ * @throws ChangelogException
+ * If a problem occurs during clearing.
+ */
+ void clearGenerationId(final DN domainDN) throws ChangelogException
+ {
+ synchronized(domainLock)
+ {
+ final String domainId = domains.get(domainDN);
+ final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
+ if (idFile != null)
+ {
+ final boolean isDeleted = idFile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Reset the generationId to the default value used when there is no
+ * generation id.
+ *
+ * @param baseDN
+ * The baseDN for which the generationID must be reset.
+ * @throws ChangelogException
+ * If a problem occurs during reset.
+ */
+ void resetGenerationId(final DN baseDN) throws ChangelogException
+ {
+ synchronized (domainLock)
+ {
+ clearGenerationId(baseDN);
+ final String domainId = domains.get(baseDN);
+ final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
+ ensureGenerationIdFileExists(generationIdPath);
+ }
+ }
+
+ /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
+ private void readDomainsStateFile() throws ChangelogException
+ {
+ final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
+ if (domainsStateFile.exists())
+ {
+ BufferedReader reader = null;
+ String line = null;
+ try
+ {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
+ while ((line = reader.readLine()) != null)
+ {
+ final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
+ final String domainId = line.substring(0, separatorPos);
+ final DN domainDN = DN.decode(line.substring(separatorPos+1));
+ domains.put(domainDN, domainId);
+ }
+ }
+ catch(DirectoryException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get(
+ domainsStateFile.getPath(), line), e);
+ }
+ catch(Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get(
+ domainsStateFile.getPath()), e);
+ }
+ finally {
+ StaticUtils.close(reader);
+ }
+ }
+ }
+
+ /**
+ * Checks that domain directories in file system are consistent with
+ * information from domains mapping.
+ */
+ private void checkDomainDirectories(final File changelogPath) throws ChangelogException
+ {
+ final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
+ if (dnDirectories == null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
+ }
+
+ Set<String> domainIdsFromFileSystem = new HashSet<String>();
+ for (final File dnDir : dnDirectories)
+ {
+ final String fileName = dnDir.getName();
+ final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+ domainIdsFromFileSystem.add(domainId);
+ }
+
+ Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+ if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+ domainIdsFromFileSystem.toString()));
+ }
+ }
+
+ /**
+ * Update the changelog state with the state corresponding to the provided
+ * domain DN.
+ */
+ private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
+ throws ChangelogException
+ {
+ final File domainDirectory = getDomainPath(domainEntry.getValue());
+ final String generationId = retrieveGenerationId(domainDirectory);
+ if (generationId == null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND.get(
+ replicationRootPath, domainDirectory.getPath()));
+ }
+ final DN domainDN = domainEntry.getKey();
+ result.setDomainGenerationId(domainDN, toGenerationId(generationId));
+
+ final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
+ if (serverIds == null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY.get(
+ replicationRootPath, domainDirectory.getPath()));
+ }
+ for (final File serverId : serverIds)
+ {
+ result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
+ }
+ }
+
+ private String createDomainId(final DN domainDN) throws ChangelogException
+ {
+ final String nextDomainId = findNextDomainId();
+ domains.put(domainDN, nextDomainId);
+ final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
+ Writer writer = null;
+ try
+ {
+ writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
+ for (final Entry<DN, String> entry : domains.entrySet())
+ {
+ writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
+ }
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
+ domainDN.toString(), domainsStateFile.getPath()), e);
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ return nextDomainId;
+ }
+
+ /** Find the next domain id to use. This is the lowest integer that is higher than all existing ids. */
+ private String findNextDomainId()
+ {
+ int nextId = 1;
+ for (final String domainId : domains.values())
+ {
+ final Integer id = Integer.valueOf(domainId);
+ if (nextId <= id)
+ {
+ nextId = id + 1;
+ }
+ }
+ return String.valueOf(nextId);
+ }
+
+ /** Open a log from the provided path and record parser. */
+ private <K extends Comparable<K>, V> LogFile<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
+ throws ChangelogException
+ {
+ checkShutDownBeforeOpening(serverIdPath);
+
+ final LogFile<K, V> log = LogFile.newAppendableLogFile(serverIdPath, parser);
+
+ checkShutDownAfterOpening(serverIdPath, log);
+
+ logs.add(log);
+ return log;
+ }
+
+ private void checkShutDownAfterOpening(final File serverIdPath, final LogFile<?, ?> log) throws ChangelogException
+ {
+ if (isShuttingDown.get())
+ {
+ closeDB(log);
+ throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
+ replicationServer.getServerId()));
+ }
+ }
+
+ private void checkShutDownBeforeOpening(final File serverIdPath) throws ChangelogException
+ {
+ if (isShuttingDown.get())
+ {
+ throw new ChangelogException(
+ WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
+ serverIdPath.getPath(), replicationServer.getServerId()));
+ }
+ }
+
+ /**
+ * Retrieve the generation id from the provided directory.
+ *
+ * @return the generation id or {@code null} if the corresponding file can't
+ * be found
+ */
+ private String retrieveGenerationId(final File directory)
+ {
+ final File generationId = retrieveGenerationIdFile(directory);
+ if (generationId != null)
+ {
+ String filename = generationId.getName();
+ return filename.substring(GENERATION_ID_FILE_PREFIX.length(),
+ filename.length() - GENERATION_ID_FILE_SUFFIX.length());
+ }
+ return null;
+ }
+
+ /**
+ * Retrieve the file named after the generation id from the provided
+ * directory.
+ *
+ * @return the generation id file or {@code null} if the corresponding file
+ * can't be found
+ */
+ private File retrieveGenerationIdFile(final File directory)
+ {
+ File[] generationIds = directory.listFiles(GENERATION_ID_FILE_FILTER);
+ return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
+ }
+
+ private File getDomainPath(final String domainId)
+ {
+ return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
+ }
+
+ private File getServerIdPath(final String domainId, final int serverId)
+ {
+ return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
+ }
+
+ private File getGenerationIdPath(final String domainId, final long generationId)
+ {
+ return new File(getDomainPath(domainId), GENERATION_ID_FILE_PREFIX + generationId + GENERATION_ID_FILE_SUFFIX);
+ }
+
+ private File getCNIndexDBPath()
+ {
+ return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
+ }
+
+ private void closeDB(final LogFile<?, ?> log)
+ {
+ logs.remove(log);
+ log.close();
+ }
+
+ private void ensureRootDirectoryExists() throws ChangelogException
+ {
+ final File rootDir = new File(replicationRootPath);
+ if (!rootDir.exists())
+ {
+ final boolean created = rootDir.mkdirs();
+ if (!created)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(replicationRootPath));
+ }
+ }
+ }
+
+ private void ensureServerIdDirectoryExists(final File serverIdPath) throws ChangelogException
+ {
+ if (!serverIdPath.exists())
+ {
+ boolean created = false;
+ try
+ {
+ created = serverIdPath.mkdirs();
+ }
+ catch (Exception e)
+ {
+ // nothing to do
+ }
+
+ if (!created)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY.get(serverIdPath.getPath(), 0));
+ }
+ }
+ }
+
+ private void ensureGenerationIdFileExists(final File generationIdPath)
+ throws ChangelogException
+ {
+ if (!generationIdPath.exists())
+ {
+ try
+ {
+ boolean isCreated = generationIdPath.createNewFile();
+ if (!isCreated)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
+ }
+ }
+ }
+
+ private void debug(String message)
+ {
+ // Replication server may be null when testing
+ String monitorInstanceName = replicationServer != null ? replicationServer.getMonitorInstanceName() :
+ "no monitor [test]";
+ TRACER.debugInfo("In " + monitorInstanceName + ", " + message);
+ }
+
+ private int toServerId(final String serverIdName) throws ChangelogException
+ {
+ try
+ {
+ String serverId = serverIdName.substring(0, serverIdName.length() - SERVER_ID_SUFFIX.length());
+ return Integer.parseInt(serverId);
+ }
+ catch (NumberFormatException e)
+ {
+ // should never happen
+ throw new ChangelogException(ERR_CHANGELOG_SERVER_ID_FILENAME_WRONG_FORMAT.get(serverIdName), e);
+ }
+ }
+
+ private long toGenerationId(final String data) throws ChangelogException
+ {
+ try
+ {
+ return Long.parseLong(data);
+ }
+ catch (NumberFormatException e)
+ {
+ // should never happen
+ throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/package-info.java b/opends/src/server/org/opends/server/replication/server/changelog/file/package-info.java
new file mode 100644
index 0000000..c6b4899
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/package-info.java
@@ -0,0 +1,33 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+
+/**
+ * This package contains a file-based log implementation for the changelog
+ * database API.
+ */
+@org.opends.server.types.PublicAPI(
+ stability = org.opends.server.types.StabilityLevel.PRIVATE)
+package org.opends.server.replication.server.changelog.file;
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 5ee48e5..c93ca98 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -160,7 +160,7 @@
* @param changelogState
* the changelog state used for initialization
*/
- ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+ public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
{
super("Change number indexer");
this.changelogDB = changelogDB;
@@ -521,7 +521,6 @@
}
}
-
// OK, the oldest change is older than the medium consistency point
// let's publish it to the CNIndexDB.
final String previousCookie = mediumConsistencyRUV.toString();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 52ef4c6..ccd27c0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
* @param <Data>
* The type of data associated with each cursor
*/
-final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
private static final byte UNINITIALIZED = 0;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/DirectoryServerTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/DirectoryServerTestCase.java
index e3bfc3e..96e1005 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/DirectoryServerTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/DirectoryServerTestCase.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server;
@@ -32,6 +32,7 @@
import org.testng.annotations.Test;
import org.testng.annotations.AfterClass;
import org.opends.messages.Message;
+import org.opends.server.replication.ReplicationTestCase;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,6 +55,10 @@
@BeforeSuite
public final void suppressOutput() {
+ System.out.println("Replication DB implementation used in tests: '" +
+ ReplicationTestCase.replicationDbImplementation + "'.");
+ System.out.flush();
+
TestCaseUtils.suppressOutput();
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 5514ab7..bc3aee3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -22,13 +22,17 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
* Portions Copyright 2013 Manuel Gaupp
*/
package org.opends.server;
import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.net.*;
+import java.text.SimpleDateFormat;
import java.util.*;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
@@ -1926,4 +1930,46 @@
return new ArrayList<T>(Arrays.asList(elems));
}
+ /** Saves a thread dump in a file with the provided id used in file prefix. */
+ public static void generateThreadDump(String id)
+ {
+ String date = new SimpleDateFormat("yyyyMMdd_hhmmss").format(new Date().getTime());
+ BufferedWriter writer = null;
+ try
+ {
+ writer = new BufferedWriter(new FileWriter("/tmp/thread_dump_" + id + "_" + date));
+ writer.write(generateThreadDump());
+ }
+ catch (Exception e)
+ {
+ // do nothing
+ }
+ finally
+ {
+ close(writer);
+ }
+ }
+
+ /** Generates a thread dump programmatically. */
+ public static String generateThreadDump() {
+ final StringBuilder dump = new StringBuilder();
+ final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+ for (ThreadInfo threadInfo : threadInfos) {
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append("\" ");
+ final Thread.State state = threadInfo.getThreadState();
+ dump.append("\n java.lang.Thread.State: ");
+ dump.append(state);
+ final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+ for (final StackTraceElement stackTraceElement : stackTraceElements) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n\n");
+ }
+ return dump.toString();
+}
+
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/plugins/ReferentialIntegrityPluginTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/plugins/ReferentialIntegrityPluginTestCase.java
index cad144e..8dcc4b5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/plugins/ReferentialIntegrityPluginTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/plugins/ReferentialIntegrityPluginTestCase.java
@@ -23,6 +23,7 @@
*
* Copyright 2008-2010 Sun Microsystems, Inc.
* Portions copyright 2011 profiq s.r.o.
+ * Portions copyright 2014 ForgeRock AS
*/
package org.opends.server.plugins;
@@ -850,6 +851,8 @@
@AfterClass
public void tearDown() throws Exception {
deleteAttrsEntry(configDN, dsConfigBaseDN);
+ deleteAttrsEntry(configDN, dsConfigEnforceIntegrity);
+ deleteAttrsEntry(configDN, dsConfigAttrFiltMapping);
//Hopefully put an attribute type there that won't impact the rest of the
//unit tests.
replaceAttrEntry(configDN, dsConfigAttrType,"seeAlso");
@@ -1398,7 +1401,7 @@
Entry entry = null;
AddOperation addOperation = null;
-
+
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
@@ -1452,7 +1455,7 @@
Entry entry = null;
AddOperation addOperation = null;
-
+
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
@@ -1615,7 +1618,6 @@
"member:(objectclass=person)");
replaceAttrEntry(configDN, "ds-cfg-enabled", "true");
-
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
@@ -1632,8 +1634,7 @@
);
AddOperation addOperation = conn.processAdd(entry);
- assertEquals(addOperation.getResultCode(),
- ResultCode.CONSTRAINT_VIOLATION);
+ assertEquals(addOperation.getResultCode(), ResultCode.CONSTRAINT_VIOLATION);
}
/**
@@ -2028,7 +2029,7 @@
"subordinatedelete",
"preoperationadd",
"preoperationmodify");
- addAttrEntry(configDN, dsConfigBaseDN,
+ addAttrEntry(configDN, dsConfigBaseDN,
"dc=example,dc=com",
"o=test");
replaceAttrEntry(configDN, dsConfigEnforceIntegrity, "true");
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
index 6314d5e..0a03dcd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -22,10 +22,13 @@
*
*
* Copyright 2007-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedList;
@@ -46,13 +49,14 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
-import org.opends.server.types.*;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Test that the dependencies are computed correctly when replaying
* sequences of operations that requires to follow a given order
@@ -71,7 +75,7 @@
{
TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING);
}
-
+
/**
* Check that a sequence of dependents adds and mods is correctly ordered:
* Using a deep dit :
@@ -131,8 +135,8 @@
ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModDelDependencyTestDb",
- 0, replServerId, 0,
- AddSequenceLength*5+100, null);
+ replicationDbImplementation, 0, replServerId,
+ 0, AddSequenceLength*5+100, null);
replServer = new ReplicationServer(conf);
ReplicationBroker broker = openReplicationSession(
@@ -267,8 +271,8 @@
ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(replServerPort, "dependencyTestModdnDelDependencyTestDb",
- 0, replServerId, 0,
- 200, null);
+ replicationDbImplementation, 0, replServerId,
+ 0, 200, null);
replServer = new ReplicationServer(conf);
// configure and start replication of TEST_ROOT_DN_STRING on the server
@@ -398,9 +402,8 @@
int replServerPort = TestCaseUtils.findFreePort();
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddDelAddDependencyTestDb", 0,
- replServerId,
- 0, 5*AddSequenceLength+100, null);
+ new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddDelAddDependencyTestDb", replicationDbImplementation,
+ 0, replServerId, 0, 5*AddSequenceLength+100, null);
replServer = new ReplicationServer(conf);
ReplicationBroker broker = openReplicationSession(
@@ -517,9 +520,8 @@
int replServerPort = TestCaseUtils.findFreePort();
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModdnDependencyTestDb", 0,
- replServerId,
- 0, 5*AddSequenceLength+100, null);
+ new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModdnDependencyTestDb", replicationDbImplementation,
+ 0, replServerId, 0, 5*AddSequenceLength+100, null);
replServer = new ReplicationServer(conf);
ReplicationBroker broker = openReplicationSession(
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index 329231c..f6c43fe 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -329,7 +329,7 @@
int rsPort = getRSPort(replServerId);
String rsDir = "generationIdTest" + replServerId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort, rsDir, 0, replServerId, 0, 100, servers);
+ new ReplServerFakeConfiguration(rsPort, rsDir, replicationDbImplementation, 0, replServerId, 0, 100, servers);
ReplicationServer replicationServer = new ReplicationServer(conf);
Thread.sleep(1000);
return replicationServer;
@@ -557,7 +557,8 @@
*/
private void checkChangelogSize(int expectedCount, int timeout) throws Exception
{
- throw new RuntimeException("Dead code. Should we remove this method and the test calling it?");
+ // TODO : commented this throw because test is executed through a slow test
+ //throw new RuntimeException("Dead code. Should we remove this method and the test calling it?");
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index dec8127..362bde8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication;
@@ -505,11 +505,8 @@
new ReplServerFakeConfiguration(
port,
"initOnlineTest" + port + testCase + "Db",
- 0,
- replServerId,
- 0,
- 100,
- servers);
+ replicationDbImplementation,
+ 0, replServerId, 0, 100, servers);
ReplicationServer replicationServer = new ReplicationServer(conf);
Thread.sleep(1000);
return replicationServer;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index 9e24553..8dda4ff 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication;
@@ -256,8 +256,8 @@
// configure the replication Server.
replicationServer = new ReplicationServer(new ReplServerFakeConfiguration(
- replServerPort, "protocolWindowTestDb", 0,
- 1, REPLICATION_QUEUE_SIZE, WINDOW_SIZE, null));
+ replServerPort, "protocolWindowTestDb", replicationDbImplementation,
+ 0, 1, REPLICATION_QUEUE_SIZE, WINDOW_SIZE, null));
String personLdif = "dn: uid=user.windowTest," + TEST_ROOT_DN_STRING + "\n"
+ "objectClass: top\n" + "objectClass: person\n"
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 2be604a..64dedee 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -35,6 +35,7 @@
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigException;
@@ -50,6 +51,7 @@
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.schema.IntegerSyntax;
@@ -105,6 +107,11 @@
protected Entry synchroServerEntry;
protected Entry replServerEntry;
+ private static final String REPLICATION_DB_IMPL_PROPERTY = "org.opends.test.replicationDbImpl";
+
+ public static ReplicationDBImplementation replicationDbImplementation = ReplicationDBImplementation.valueOf(
+ System.getProperty(REPLICATION_DB_IMPL_PROPERTY, ReplicationDBImplementation.LOG.name()));
+
/**
* Replication monitor stats
*/
@@ -181,7 +188,9 @@
LDAPReplicationDomain replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDN);
genId = replDomain.getGenerationID();
}
- catch(Exception e) {}
+ catch(Exception e) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
return genId;
}
@@ -317,8 +326,7 @@
@AfterClass
public void classCleanUp() throws Exception
{
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
- " ##### Calling ReplicationTestCase.classCleanUp ##### "));
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, " ##### Calling ReplicationTestCase.classCleanUp ##### "));
removeReplicationServerDB();
@@ -365,7 +373,14 @@
protected void clearChangelogDB(ReplicationServer rs) throws Exception
{
- ((JEChangelogDB) rs.getChangelogDB()).clearDB();
+ if (replicationDbImplementation == ReplicationDBImplementation.JE)
+ {
+ ((JEChangelogDB) rs.getChangelogDB()).clearDB();
+ }
+ else
+ {
+ ((FileChangelogDB) rs.getChangelogDB()).clearDB();
+ }
}
/**
@@ -926,4 +941,9 @@
+ " Also received the following messages during wait time: " + msgs);
return null;
}
+
+ protected static void setReplicationDBImplementation(ReplicationDBImplementation impl)
+ {
+ replicationDbImplementation = impl;
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
index 96de39d..c9fbacc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2012-2013 ForgeRock AS.
+ * Portions Copyright 2012-2014 ForgeRock AS.
*/
package org.opends.server.replication;
@@ -87,6 +87,7 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: SchemaReplicationTest\n"
+ + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 105\n";
// suffix synchronized
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index 2508268..4a1e27f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication;
@@ -130,6 +130,7 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: UpdateOperationTest\n"
+ + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 107\n";
// suffix synchronized
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 10f25fc..3e1c5c7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -534,8 +534,8 @@
String dir = testName + RS_ID + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(replServerPort, dir, 0, RS_ID, 0, 100,
- replServers);
+ new ReplServerFakeConfiguration(replServerPort, dir, replicationDbImplementation, 0, RS_ID, 0,
+ 100, replServers);
replicationServer = new ReplicationServer(conf);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/GroupIdHandshakeTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/GroupIdHandshakeTest.java
index 9133db8..82f8fbe 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/GroupIdHandshakeTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/GroupIdHandshakeTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -34,6 +34,7 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
@@ -288,8 +289,8 @@
String dir = "groupIdHandshakeTest" + serverId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
- replServers, groupId, 1000, 5000);
+ new ReplServerFakeConfiguration(port, dir, replicationDbImplementation, 0, serverId, 0,
+ 100, replServers, groupId, 1000, 5000);
return new ReplicationServer(conf);
}
@@ -497,8 +498,8 @@
otherReplServers.add("localhost:" + rs2Port);
String dir = "groupIdHandshakeTest" + RS3_ID + testCase + "Db";
ReplServerFakeConfiguration rsConfWithNewGid =
- new ReplServerFakeConfiguration(rs3Port, dir, 0, RS3_ID, 0, 100,
- otherReplServers, 1, 1000, 5000);
+ new ReplServerFakeConfiguration(rs3Port, dir, replicationDbImplementation, 0, RS3_ID, 0,
+ 100, otherReplServers, 1, 1000, 5000);
rs3.applyConfigurationChange(rsConfWithNewGid);
/**
@@ -508,8 +509,8 @@
otherReplServers.add("localhost:" + rs2Port);
otherReplServers.add("localhost:" + rs3Port);
dir = "groupIdHandshakeTest" + RS1_ID + testCase + "Db";
- rsConfWithNewGid = new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID,
- 0, 100, otherReplServers, 3, 1000, 5000);
+ rsConfWithNewGid = new ReplServerFakeConfiguration(rs1Port, dir, ReplicationDBImplementation.JE, 0,
+ RS1_ID, 0, 100, otherReplServers, 3, 1000, 5000);
rs1.applyConfigurationChange(rsConfWithNewGid);
checkConnection(30, DS1_ID, RS3_ID,
"Change GID of RS3 to 1 and RS1 to 3, DS1 should reconnect to RS3 with GID=1");
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 94646a4..b9f72b9 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -315,8 +315,8 @@
replServers.add("localhost:" + rsPort);
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering", 0, 1,
- 0, 100, replServers, 1, 1000, 5000);
+ new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering", replicationDbImplementation, 0,
+ 1, 0, 100, replServers, 1, 1000, 5000);
ReplicationServer replicationServer = new ReplicationServer(conf);
clearChangelogDB(replicationServer);
return replicationServer;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
index e2298a5..1a39399 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -363,8 +363,8 @@
String dir = "replicationServerFailoverTest" + serverId + suffix + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
- replServers);
+ new ReplServerFakeConfiguration(port, dir, replicationDbImplementation, 0, serverId, 0,
+ 100, replServers);
return new ReplicationServer(conf);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
index dbc3204..bdc5df1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -159,8 +159,8 @@
String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0, rsIndex+501, 0, 100,
- replServers, 1, 1000, 5000, weight);
+ new ReplServerFakeConfiguration(rsPort[rsIndex], dir, replicationDbImplementation, 0, rsIndex+501, 0,
+ 100, replServers, 1, 1000, 5000, weight);
return new ReplicationServer(conf);
}
@@ -190,8 +190,8 @@
}
String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
- return new ReplServerFakeConfiguration(rsPort[rsIndex], dir, 0,
- rsIndex + 501, 0, 100, replServers, 1, 1000, 5000, weight);
+ return new ReplServerFakeConfiguration(rsPort[rsIndex], dir, replicationDbImplementation,
+ 0, rsIndex + 501, 0, 100, replServers, 1, 1000, 5000, weight);
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
index 2207375..16497fb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -187,8 +187,8 @@
String dir = "stateMachineTest" + RS1_ID + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID, 0, 100,
- replServers, 1, 1000, degradedStatusThreshold);
+ new ReplServerFakeConfiguration(rs1Port, dir, replicationDbImplementation, 0, RS1_ID, 0,
+ 100, replServers, 1, 1000, degradedStatusThreshold);
return new ReplicationServer(conf);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 4708844..c6130fc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2014 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -369,8 +369,8 @@
String dir = "topologyViewTest" + rsId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort, dir, 0, rsId, 0, 100,
- replServers, groupId, 1000, 5000);
+ new ReplServerFakeConfiguration(rsPort, dir, replicationDbImplementation, 0, rsId, 0,
+ 100, replServers, groupId, 1000, 5000);
return new ReplicationServer(conf);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 04f56cd..c353736 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2014 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -391,8 +391,8 @@
String dir = testName + serverId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
- otherRsUrls, groupId, assuredTimeout, 5000);
+ new ReplServerFakeConfiguration(port, dir, replicationDbImplementation, 0, serverId, 0,
+ 100, otherRsUrls, groupId, assuredTimeout, 5000);
// No monitoring publisher to not interfere with some SocketTimeoutException
// expected at some points in these tests
conf.setMonitoringPeriod(0L);
@@ -3076,8 +3076,8 @@
// Create real RS
String dir = testName + RS1_ID + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPorts[0], dir, 0, RS1_ID, 0, 100,
- new TreeSet<String>(), DEFAULT_GID, SMALL_TIMEOUT, 1);
+ new ReplServerFakeConfiguration(rsPorts[0], dir, replicationDbImplementation, 0, RS1_ID, 0,
+ 100, new TreeSet<String>(), DEFAULT_GID, SMALL_TIMEOUT, 1);
rs1 = new ReplicationServer(conf);
/*
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 259a8e5..3125fce 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2014 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -151,7 +151,7 @@
ReplServerFakeConfiguration conf1 =
new ReplServerFakeConfiguration(
replicationServerPort, "ExternalChangeLogTestDb",
- 0, 71, 0, maxWindow, null);
+ replicationDbImplementation, 0, 71, 0, maxWindow, null);
conf1.setComputeChangeNumber(true);
replicationServer = new ReplicationServer(conf1);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
index 74297f7..77c5dff 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -172,8 +172,8 @@
int chPort = getChangelogPort(changelogId);
String chDir = "monitorTest" + changelogId + suffix + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
- servers);
+ new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0,
+ 100, servers);
ReplicationServer replicationServer = new ReplicationServer(conf);
Thread.sleep(1000);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
index a745f2b..73237fd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2007-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -32,6 +32,7 @@
import org.opends.server.admin.Configuration;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ServerManagedObject;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.types.DN;
@@ -68,15 +69,19 @@
/** The monitoring publisher period. */
private long monitoringPeriod = 3000;
private boolean computeChangenumber;
+
+ /** The DB implementation to use for replication changelog. */
+ private final ReplicationDBImplementation dbImpl;
/**
* Constructor without group id, assured info and weight
*/
public ReplServerFakeConfiguration(
- int port, String dirName, int purgeDelay, int serverId,
- int queueSize, int windowSize, SortedSet<String> servers)
+ int port, String dirName, ReplicationDBImplementation dbImpl, int purgeDelay,
+ int serverId, int queueSize, int windowSize, SortedSet<String> servers)
{
this.port = port;
+ this.dbImpl = dbImpl;
this.dirName = dirName != null ? dirName : "changelogDb";
if (purgeDelay == 0)
@@ -115,11 +120,11 @@
* Constructor with group id and assured info
*/
public ReplServerFakeConfiguration(
- int port, String dirName, int purgeDelay, int serverId,
- int queueSize, int windowSize, SortedSet<String> servers,
- int groupId, long assuredTimeout, int degradedStatusThreshold)
+ int port, String dirName, ReplicationDBImplementation dbImpl, int purgeDelay,
+ int serverId, int queueSize, int windowSize,
+ SortedSet<String> servers, int groupId, long assuredTimeout, int degradedStatusThreshold)
{
- this(port, dirName, purgeDelay, serverId, queueSize, windowSize, servers);
+ this(port, dirName, dbImpl, purgeDelay, serverId, queueSize, windowSize, servers);
this.groupId = groupId;
this.assuredTimeout = assuredTimeout;
this.degradedStatusThreshold = degradedStatusThreshold;
@@ -129,12 +134,12 @@
* Constructor with group id, assured info and weight
*/
public ReplServerFakeConfiguration(
- int port, String dirName, int purgeDelay, int serverId,
- int queueSize, int windowSize, SortedSet<String> servers,
- int groupId, long assuredTimeout, int degradedStatusThreshold, int weight)
+ int port, String dirName, ReplicationDBImplementation dbImpl, int purgeDelay,
+ int serverId, int queueSize, int windowSize,
+ SortedSet<String> servers, int groupId, long assuredTimeout, int degradedStatusThreshold, int weight)
{
- this(port, dirName, purgeDelay, serverId, queueSize, windowSize, servers,
- groupId, assuredTimeout, degradedStatusThreshold);
+ this(port, dirName, dbImpl, purgeDelay, serverId, queueSize, windowSize,
+ servers, groupId, assuredTimeout, degradedStatusThreshold);
this.weight = weight;
}
@@ -296,4 +301,10 @@
{
this.computeChangenumber = computeChangenumber;
}
+
+ @Override
+ public ReplicationDBImplementation getReplicationDBImplementation()
+ {
+ return dbImpl;
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
index 267d8d9..7b45754 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -57,7 +57,7 @@
// instantiate a Replication server using the first port number.
ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(
- ports[0], null, 0, 1, 0, 0, null);
+ ports[0], null, replicationDbImplementation, 0, 1, 0, 0, null);
replicationServer = new ReplicationServer(conf);
// Most of the configuration change are trivial to apply.
@@ -67,7 +67,7 @@
// connect to this new portnumber.
ReplServerFakeConfiguration newconf =
new ReplServerFakeConfiguration(
- ports[1], null, 0, 1, 0, 0, null);
+ ports[1], null, replicationDbImplementation, 0, 1, 0, 0, null);
replicationServer.applyConfigurationChange(newconf);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index d4ff462..f4c2d72 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions copyright 2011-2014 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -116,6 +116,7 @@
"--provider-name", "Multimaster Synchronization",
"--set", "replication-db-directory:" + "replicationServerTestConfigureDb",
"--set", "replication-port:" + replicationServerPort,
+ "--set", "replication-db-implementation:" + replicationDbImplementation,
"--set", "replication-server-id:71");
for (SynchronizationProvider<?> provider : DirectoryServer
@@ -644,8 +645,8 @@
servers.add(
"localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestChangelogChainingDb"+i, 0,
- changelogIds[i], 0, 100, servers);
+ new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestChangelogChainingDb"+i,
+ replicationDbImplementation, 0, changelogIds[i], 0, 100, servers);
changelogs[i] = new ReplicationServer(conf);
}
@@ -738,8 +739,8 @@
SortedSet<String> servers = new TreeSet<String>();
servers.add("localhost:" + changelogPorts[1]);
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, 0,
- changelogIds[0], 0, 100, servers);
+ new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, replicationDbImplementation,
+ 0, changelogIds[0], 0, 100, servers);
changelogs[0] = new ReplicationServer(conf);
}
@@ -790,7 +791,7 @@
SortedSet<String> servers = new TreeSet<String>();
servers.add("localhost:"+changelogPorts[0]);
ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
- changelogPorts[1], null, 0, changelogIds[1], 0, 100, null);
+ changelogPorts[1], null, replicationDbImplementation, 0, changelogIds[1], 0, 100, null);
changelogs[1] = new ReplicationServer(conf);
// Connect broker 2 to changelog2
@@ -1111,8 +1112,8 @@
if (i==0)
servers.add("localhost:" + changelogPorts[1]);
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestReplicationServerConnectedDb"+i, 0,
- changelogIds[i], 0, 100, servers);
+ new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestReplicationServerConnectedDb"+i, replicationDbImplementation,
+ 0, changelogIds[i], 0, 100, servers);
changelogs[i] = new ReplicationServer(conf);
}
@@ -1155,8 +1156,8 @@
SortedSet<String> servers = new TreeSet<String>();
// Configure replicationServer[0] to be disconnected from ReplicationServer[1]
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0,
- changelogIds[0], 0, 100, servers);
+ new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", replicationDbImplementation,
+ 0, changelogIds[0], 0, 100, servers);
changelogs[0].applyConfigurationChange(conf) ;
// The link between RS[0] & RS[1] should be destroyed by the new configuration.
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
new file mode 100644
index 0000000..2873fd4
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -0,0 +1,344 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.testng.Assert.*;
+
+@SuppressWarnings("javadoc")
+public class FileChangeNumberIndexDBTest extends ReplicationTestCase
+{
+ private final MultiDomainServerState previousCookie = new MultiDomainServerState();
+ private final List<String> cookies = new ArrayList<String>();
+
+ @BeforeMethod
+ public void clearCookie()
+ {
+ previousCookie.clear();
+ cookies.clear();
+ }
+
+ @DataProvider(name = "messages")
+ Object[][] createMessages() throws Exception
+ {
+ CSN[] csns = generateCSNs(1, 0, 3);
+ DN dn1 = DN.decode("o=baseDN1");
+ previousCookie.update(dn1, csns[0]);
+ return new Object[][] {
+ { new ChangeNumberIndexRecord(0L, previousCookie.toString(), DN.decode("o=baseDN1"), csns[1]) },
+ { new ChangeNumberIndexRecord(999L, previousCookie.toString(), DN.decode("o=baseDN1"), csns[2]) },
+ };
+ }
+
+ @Test(dataProvider="messages")
+ public void testRecordParser(ChangeNumberIndexRecord msg) throws Exception
+ {
+ RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
+
+ ByteString data = parser.encodeRecord(msg.getChangeNumber(), msg);
+ Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
+
+ assertThat(record).isNotNull();
+ assertThat(record.getKey()).isEqualTo(msg.getChangeNumber());
+ assertThat(record.getValue().getBaseDN()).isEqualTo(msg.getBaseDN());
+ assertThat(record.getValue().getCSN()).isEqualTo(msg.getCSN());
+ assertThat(record.getValue().getPreviousCookie()).isEqualTo(msg.getPreviousCookie());
+ }
+
+ @Test()
+ public void testAddAndReadRecords() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ replicationServer = newReplicationServer();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0);
+ final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+
+ long[] changeNumbers = addThreeRecords(cnIndexDB);
+ long cn1 = changeNumbers[0];
+ long cn2 = changeNumbers[1];
+ long cn3 = changeNumbers[2];
+
+ // Checks
+ assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+
+ assertEquals(cnIndexDB.count(), 3, "Db count");
+ assertFalse(cnIndexDB.isEmpty());
+
+ assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
+ assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
+ assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
+
+ DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
+ assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
+
+ cursor = cnIndexDB.getCursorFrom(cn2);
+ assertCursorReadsInOrder(cursor, cn2, cn3);
+
+ cursor = cnIndexDB.getCursorFrom(cn3);
+ assertCursorReadsInOrder(cursor, cn3);
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+ @Test()
+ public void testClear() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ replicationServer = newReplicationServer();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0);
+ final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+ addThreeRecords(cnIndexDB);
+
+ cnIndexDB.clear();
+
+ assertNull(cnIndexDB.getOldestRecord());
+ assertNull(cnIndexDB.getNewestRecord());
+ assertEquals(cnIndexDB.count(), 0);
+ assertTrue(cnIndexDB.isEmpty());
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+
+ /**
+ * This test makes basic operations of a ChangeNumberIndexDB:
+ * <ol>
+ * <li>create the db</li>
+ * <li>add records</li>
+ * <li>read them with a cursor</li>
+ * <li>set a very short trim period</li>
+ * <li>wait for the db to be trimmed / here since the changes are not stored
+ * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
+ * </ol>
+ */
+ // TODO :: enable when purge is implemented with multi-files log
+ @Test(enabled=false)
+ public void testPurge() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ replicationServer = newReplicationServer();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0); // disable purging
+
+ // Prepare data to be stored in the db
+ DN baseDN1 = DN.decode("o=baseDN1");
+ DN baseDN2 = DN.decode("o=baseDN2");
+ DN baseDN3 = DN.decode("o=baseDN3");
+
+ CSN[] csns = generateCSNs(1, 0, 3);
+
+ // Add records
+ final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+ long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+ addRecord(cnIndexDB, baseDN2, csns[1]);
+ long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+
+ // The ChangeNumber should not get purged
+ final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
+ assertEquals(oldestCN, cn1);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+
+ DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
+ try
+ {
+ assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
+ assertFalse(cursor.next());
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+
+ // Now test that purging removes all changes but the last one
+ changelogDB.setPurgeDelay(1);
+ int count = 0;
+ while (cnIndexDB.count() > 1 && count < 100)
+ {
+ Thread.sleep(10);
+ count++;
+ }
+ assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+ private CSN[] generateCSNs(int serverId, long timestamp, int number)
+ {
+ CSNGenerator gen = new CSNGenerator(serverId, timestamp);
+ CSN[] csns = new CSN[number];
+ for (int i = 0; i < csns.length; i++)
+ {
+ csns[i] = gen.newCSN();
+ }
+ return csns;
+ }
+
+ private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception
+ {
+ // Prepare data to be stored in the db
+ DN baseDN1 = DN.decode("o=baseDN1");
+ DN baseDN2 = DN.decode("o=baseDN2");
+ DN baseDN3 = DN.decode("o=baseDN3");
+
+ CSN[] csns = generateCSNs(1, 0, 3);
+
+ // Add records
+ long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+ long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
+ long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+ return new long[] { cn1, cn2, cn3 };
+ }
+
+ private long addRecord(FileChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
+ {
+ final String cookie = previousCookie.toString();
+ cookies.add(cookie);
+ final long changeNumber = cnIndexDB.addRecord(
+ new ChangeNumberIndexRecord(cookie, baseDN, csn));
+ previousCookie.update(baseDN, csn);
+ return changeNumber;
+ }
+
+ private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
+ {
+ assertEquals(record.getCSN(), csn);
+ assertEquals(record.getBaseDN(), baseDN);
+ assertEquals(record.getPreviousCookie(), cookie);
+ }
+
+ private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
+ {
+ final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
+ final FileChangeNumberIndexDB cnIndexDB =
+ (FileChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
+ assertTrue(cnIndexDB.isEmpty());
+ return cnIndexDB;
+ }
+
+ /**
+ * The newest record is no longer cleared to ensure persistence to the last
+ * generated change number across server restarts.
+ */
+ private void assertOnlyNewestRecordIsLeft(FileChangeNumberIndexDB cnIndexDB,
+ int newestChangeNumber) throws ChangelogException
+ {
+ assertEquals(cnIndexDB.count(), 1);
+ assertFalse(cnIndexDB.isEmpty());
+ final ChangeNumberIndexRecord oldest = cnIndexDB.getOldestRecord();
+ final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
+ assertEquals(oldest.getChangeNumber(), newestChangeNumber);
+ assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
+ assertEquals(oldest.getPreviousCookie(), newest.getPreviousCookie());
+ assertEquals(oldest.getBaseDN(), newest.getBaseDN());
+ assertEquals(oldest.getCSN(), newest.getCSN());
+ }
+
+ private ReplicationServer newReplicationServer() throws Exception
+ {
+ TestCaseUtils.startServer();
+ final int port = TestCaseUtils.findFreePort();
+ final ReplServerFakeConfiguration cfg =
+ new ReplServerFakeConfiguration(port, null, ReplicationDBImplementation.LOG, 0, 2, 0, 100, null);
+ cfg.setComputeChangeNumber(true);
+ return new ReplicationServer(cfg);
+ }
+
+ private String getPreviousCookie(FileChangeNumberIndexDB cnIndexDB,
+ long changeNumber) throws Exception
+ {
+ DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber);
+ try
+ {
+ return cursor.getRecord().getPreviousCookie();
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
+ long... cns) throws ChangelogException
+ {
+ try
+ {
+ for (int i = 0; i < cns.length; i++)
+ {
+ assertEquals(cursor.getRecord().getChangeNumber(), cns[i]);
+ final boolean isNotLast = i + 1 < cns.length;
+ assertEquals(cursor.next(), isNotLast);
+ }
+ }
+ finally
+ {
+ cursor.close();
+ }
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
new file mode 100644
index 0000000..9f7eca7
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -0,0 +1,575 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.config.ConfigException;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+/**
+ * Test the FileReplicaDB class
+ */
+@SuppressWarnings("javadoc")
+public class FileReplicaDBTest extends ReplicationTestCase
+{
+ private static final DebugTracer TRACER = getTracer();
+ private DN TEST_ROOT_DN;
+
+ /**
+ * Utility - log debug message - highlight it is from the test and not
+ * from the server code. Makes easier to observe the test steps.
+ */
+ private void debugInfo(String tn, String s)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST " + tn + " ** " + s);
+ }
+ }
+
+ @BeforeClass
+ public void setup() throws Exception
+ {
+ TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING);
+ }
+
+ @DataProvider(name = "messages")
+ Object[][] createMessages()
+ {
+ CSN[] csns = generateCSNs(1, 0, 2);
+ return new Object[][] {
+ { new DeleteMsg(TEST_ROOT_DN, csns[0], "uid") },
+ { new DeleteMsg(TEST_ROOT_DN, csns[1], "uid") },
+ };
+ }
+
+ @Test(dataProvider="messages")
+ public void testRecordParser(UpdateMsg msg) throws Exception
+ {
+ RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
+
+ ByteString data = parser.encodeRecord(msg.getCSN(), msg);
+ Record<CSN, UpdateMsg> record = parser.decodeRecord(data);
+
+ assertThat(record).isNotNull();
+ assertThat(record.getKey()).isEqualTo(msg.getCSN());
+ assertThat(record.getValue()).isEqualTo(msg);
+ }
+
+ @Test
+ public void testDomainDNWithForwardSlashes() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ FileReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100, 5000);
+
+ replicaDB = newReplicaDB(replicationServer);
+ CSN[] csns = generateCSNs(1, 0, 1);
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+
+ waitChangesArePersisted(replicaDB, 1);
+ assertFoundInOrder(replicaDB, csns[0]);
+ }
+ finally
+ {
+ if (replicaDB != null) {
+ replicaDB.shutdown();
+ }
+ remove(replicationServer);
+ }
+ }
+
+ @Test
+ public void testAddAndReadRecords() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ FileReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100, 5000);
+
+ replicaDB = newReplicaDB(replicationServer);
+ CSN[] csns = generateCSNs(1, 0, 5);
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+
+ waitChangesArePersisted(replicaDB, 3);
+
+ assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
+ assertNotFound(replicaDB, csns[4]);
+
+ assertEquals(replicaDB.getOldestCSN(), csns[0]);
+ assertEquals(replicaDB.getNewestCSN(), csns[2]);
+
+ DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
+ replicaDB.add(update4);
+ waitChangesArePersisted(replicaDB, 4);
+
+ assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
+ assertFoundInOrder(replicaDB, csns[2], csns[3]);
+ assertFoundInOrder(replicaDB, csns[3]);
+ assertNotFound(replicaDB, csns[4]);
+ }
+ finally
+ {
+ if (replicaDB != null) {
+ replicaDB.shutdown();
+ }
+ remove(replicationServer);
+ }
+ }
+
+ @Test
+ public void testGenerateCursorFrom() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ DBCursor<UpdateMsg> cursor = null;
+ FileReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100000, 10);
+ replicaDB = newReplicaDB(replicationServer);
+
+ CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
+ for (int i = 0; i < 5; i++)
+ {
+ if (i != 3)
+ {
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+ }
+ }
+ waitChangesArePersisted(replicaDB, 4);
+
+ cursor = replicaDB.generateCursorFrom(csns[0]);
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getCSN(), csns[1]);
+ StaticUtils.close(cursor);
+
+ cursor = replicaDB.generateCursorFrom(csns[3]);
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getCSN(), csns[4]);
+ StaticUtils.close(cursor);
+
+ cursor = replicaDB.generateCursorFrom(csns[4]);
+ assertFalse(cursor.next());
+ assertNull(cursor.getRecord());
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ if (replicaDB != null) {
+ replicaDB.shutdown();
+ }
+ remove(replicationServer);
+ }
+ }
+
+ @Test
+ public void testGenerateCursorFromWithCursorReinitialization() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ DBCursor<UpdateMsg> cursor = null;
+ FileReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100000, 10);
+ replicaDB = newReplicaDB(replicationServer);
+
+ CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
+
+ cursor = replicaDB.generateCursorFrom(csns[0]);
+ assertFalse(cursor.next());
+
+ for (int i = 0; i < 5; i++)
+ {
+ if (i != 3)
+ {
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+ }
+ }
+ waitChangesArePersisted(replicaDB, 4);
+
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getCSN(), csns[1]);
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getCSN(), csns[2]);
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getCSN(), csns[4]);
+ assertFalse(cursor.next());
+ StaticUtils.close(cursor);
+
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ if (replicaDB != null)
+ {
+ replicaDB.shutdown();
+ }
+ remove(replicationServer);
+ }
+ }
+
+ // TODO : enable when purge is enabled with multi-files log implementation
+ @Test(enabled=false)
+ public void testPurge() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100, 5000);
+
+ final FileReplicaDB replicaDB = newReplicaDB(replicationServer);
+
+ CSN[] csns = generateCSNs(1, 0, 5);
+
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"));
+
+ waitChangesArePersisted(replicaDB, 4);
+
+ replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
+
+ int count = 0;
+ boolean purgeSucceeded = false;
+ final CSN expectedNewestCSN = csns[3];
+ do
+ {
+ Thread.sleep(10);
+
+ final CSN oldestCSN = replicaDB.getOldestCSN();
+ final CSN newestCSN = replicaDB.getNewestCSN();
+ purgeSucceeded =
+ oldestCSN.equals(expectedNewestCSN)
+ && newestCSN.equals(expectedNewestCSN);
+ count++;
+ }
+ while (!purgeSucceeded && count < 100);
+ assertTrue(purgeSucceeded);
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+ /**
+ * Test the feature of clearing a FileReplicaDB used by a replication server.
+ * The clear feature is used when a replication server receives a request to
+ * reset the generationId of a given domain.
+ */
+ @Test
+ public void testClear() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ FileReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100, 5000);
+
+ replicaDB = newReplicaDB(replicationServer);
+
+ CSN[] csns = generateCSNs(1, 0, 3);
+
+ // Add the changes and check they are here
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+
+ assertEquals(csns[0], replicaDB.getOldestCSN());
+ assertEquals(csns[2], replicaDB.getNewestCSN());
+
+ // Clear DB and check it is cleared.
+ replicaDB.clear();
+
+ assertEquals(null, replicaDB.getOldestCSN());
+ assertEquals(null, replicaDB.getNewestCSN());
+ }
+ finally
+ {
+ if (replicaDB != null)
+ {
+ replicaDB.shutdown();
+ }
+ remove(replicationServer);
+ }
+ }
+
+ /**
+ * Test the logic that manages counter records in the FileReplicaDB in order to
+ * optimize the oldest and newest records in the replication changelog db.
+ */
+ @Test(enabled=true, groups = { "opendj-256" })
+ public void testGetOldestNewestCSNs() throws Exception
+ {
+ // It's worth testing with 2 different setting for counterRecord
+ // - a counter record is put every 10 Update msg in the db - just a unit
+ // setting.
+ // - a counter record is put every 1000 Update msg in the db - something
+ // closer to real setting.
+ // In both cases, we want to test the counting algorithm,
+ // - when start and stop are before the first counter record,
+ // - when start and stop are before and after the first counter record,
+ // - when start and stop are after the first counter record,
+ // - when start and stop are before and after more than one counter record,
+ // After a purge.
+ // After shutting down/closing and reopening the db.
+
+ // TODO : do we need the management of counter records ?
+ // Use unreachable limits for now because it is not implemented
+ testGetOldestNewestCSNs(40, 100);
+ testGetOldestNewestCSNs(4000, 10000);
+ }
+
+ private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
+ {
+ String tn = "testDBCount("+max+","+counterWindow+")";
+ debugInfo(tn, "Starting test");
+
+ File testRoot = null;
+ ReplicationServer replicationServer = null;
+ ReplicationEnvironment dbEnv = null;
+ FileReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100000, 10);
+
+ testRoot = createCleanDir();
+ dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
+ replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
+ //replicaDB.setCounterRecordWindowSize(counterWindow);
+
+ // Populate the db with 'max' msg
+ int mySeqnum = 1;
+ CSN csns[] = new CSN[2 * (max + 1)];
+ long now = System.currentTimeMillis();
+ for (int i=1; i<=max; i++)
+ {
+ csns[i] = new CSN(now + i, mySeqnum, 1);
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+ mySeqnum+=2;
+ }
+ waitChangesArePersisted(replicaDB, max, counterWindow);
+
+ assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
+ assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
+
+ // Now we want to test that after closing and reopening the db, the
+ // counting algo is well reinitialized and when new messages are added
+ // the new counter are correctly generated.
+ debugInfo(tn, "SHUTDOWN replicaDB and recreate");
+ replicaDB.shutdown();
+
+ replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
+ //replicaDB.setCounterRecordWindowSize(counterWindow);
+
+ assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
+ assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
+
+ // Populate the db with 'max' msg
+ for (int i=max+1; i<=2 * max; i++)
+ {
+ csns[i] = new CSN(now + i, mySeqnum, 1);
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+ mySeqnum+=2;
+ }
+ waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
+
+ assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
+ assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
+
+ replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
+
+ String testcase = "AFTER PURGE (oldest, newest)=";
+ debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
+ assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
+
+ // Clear ...
+ debugInfo(tn,"clear:");
+ replicaDB.clear();
+
+ // Check the db is cleared.
+ assertEquals(null, replicaDB.getOldestCSN());
+ assertEquals(null, replicaDB.getNewestCSN());
+ debugInfo(tn,"Success");
+ }
+ finally
+ {
+ if (replicaDB != null)
+ {
+ replicaDB.shutdown();
+ }
+ if (dbEnv != null)
+ {
+ dbEnv.shutdown();
+ }
+ remove(replicationServer);
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+ }
+
+ private CSN[] generateCSNs(int serverId, long timestamp, int number)
+ {
+ CSNGenerator gen = new CSNGenerator(serverId, timestamp);
+ CSN[] csns = new CSN[number];
+ for (int i = 0; i < csns.length; i++)
+ {
+ csns[i] = gen.newCSN();
+ }
+ return csns;
+ }
+
+ private void waitChangesArePersisted(FileReplicaDB replicaDB,
+ int nbRecordsInserted) throws Exception
+ {
+ waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
+ }
+
+ private void waitChangesArePersisted(FileReplicaDB replicaDB,
+ int nbRecordsInserted, int counterWindow) throws Exception
+ {
+ // one counter record is inserted every time "counterWindow"
+ // records have been inserted
+ int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
+
+ int count = 0;
+ while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
+ {
+ Thread.sleep(10);
+ count++;
+ }
+ assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
+ }
+
+ private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
+ throws IOException, ConfigException
+ {
+ final int changelogPort = findFreePort();
+ final ReplicationServerCfg conf =
+ new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize,
+ windowSize, null);
+ return new ReplicationServer(conf);
+ }
+
+ private FileReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
+ {
+ final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
+ return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
+ }
+
+ private File createCleanDir() throws IOException
+ {
+ String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+ String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
+ + File.separator + "build");
+ path = path + File.separator + "unit-tests" + File.separator + "FileReplicaDB";
+ final File testRoot = new File(path);
+ TestCaseUtils.deleteDirectory(testRoot);
+ testRoot.mkdirs();
+ return testRoot;
+ }
+
+ private void assertFoundInOrder(FileReplicaDB replicaDB, CSN... csns) throws Exception
+ {
+ if (csns.length == 0)
+ {
+ return;
+ }
+
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
+ try
+ {
+ // Cursor points to a null record initially
+ assertNull(cursor.getRecord());
+
+ for (int i = 1; i < csns.length; i++)
+ {
+ final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+ assertTrue(cursor.next(), msg);
+ assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
+ }
+ assertFalse(cursor.next());
+ assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
+ + ", Expected null");
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ private void assertNotFound(FileReplicaDB replicaDB, CSN csn) throws Exception
+ {
+ DBCursor<UpdateMsg> cursor = null;
+ try
+ {
+ cursor = replicaDB.generateCursorFrom(csn);
+ assertFalse(cursor.next());
+ assertNull(cursor.getRecord());
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
new file mode 100644
index 0000000..0a0c37c
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -0,0 +1,385 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+
+import org.opends.messages.Message;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogFileTest extends DirectoryServerTestCase
+{
+ private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
+
+ private static final StringRecordParser RECORD_PARSER = new StringRecordParser();
+
+ private static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
+ @Override
+ public Record<String, String> decodeRecord(ByteString data) throws DecodingException
+ {
+ throw new DecodingException(Message.raw("Error when parsing record"));
+ }
+ };
+
+ @BeforeMethod
+ /** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */
+ public void initialize() throws Exception
+ {
+ File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, LogFile.LOG_FILE_NAME);
+ if (theLogFile.exists())
+ {
+ theLogFile.delete();
+ }
+ LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
+
+ for (int i = 1; i <= 10; i++)
+ {
+ logFile.addRecord("key"+i, "value"+i);
+ }
+ logFile.close();
+ }
+
+ @AfterMethod
+ public void cleanTestChangelogDirectory()
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ if (rootPath.exists())
+ {
+ StaticUtils.recursiveDelete(rootPath);
+ }
+ }
+
+ private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
+ {
+ LogFile<String, String> logFile = LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG), parser);
+ return logFile;
+ }
+
+ @Test
+ public void testCursor() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getCursor();
+
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test
+ public void testCursorWhenGivenAnExistingKey() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getCursor("key5");
+
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key5", "value5"));
+ assertThatCursorCanBeFullyRead(cursor, 6, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test
+ public void testCursorWhenGivenAnUnexistingKey() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getCursor("key");
+
+ assertThat(cursor).isNotNull();
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isFalse();
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test
+ public void testCursorWhenGivenANullKey() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getCursor(null);
+
+ // should start from start
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenAnExistingKey() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getNearestCursor("key1");
+
+ // lowest higher key is key2
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key2", "value2"));
+ assertThatCursorCanBeFullyRead(cursor, 3, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getNearestCursor("key0");
+
+ // lowest higher key is key1
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test
+ public void testNearestCursorWhenGivenANullKey() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ DBCursor<Record<String, String>> cursor = null;
+ try {
+ cursor = changelog.getNearestCursor(null);
+
+ // should start from start
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ }
+ finally {
+ StaticUtils.close(cursor, changelog);
+ }
+ }
+
+ @Test(expectedExceptions=ChangelogException.class)
+ public void testCursorWhenParserFailsToRead() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER_FAILING_TO_READ);
+ try {
+ changelog.getCursor("key");
+ }
+ finally {
+ StaticUtils.close(changelog);
+ }
+ }
+
+ @Test
+ public void testGetOldestRecord() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ try
+ {
+ Record<String, String> record = changelog.getOldestRecord();
+
+ assertThat(record).isEqualTo(Record.from("key1", "value1"));
+ }
+ finally {
+ StaticUtils.close(changelog);
+ }
+ }
+
+ @Test
+ public void testGetNewestRecord() throws Exception
+ {
+ LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+ try
+ {
+ Record<String, String> record = changelog.getNewestRecord();
+
+ assertThat(record).isEqualTo(Record.from("key10", "value10"));
+ }
+ finally {
+ StaticUtils.close(changelog);
+ }
+ }
+
+ @Test
+ /**
+ * Test that changes are visible immediately to a reader after a write.
+ */
+ public void testWriteAndReadOnSameLogFile() throws Exception
+ {
+ LogFile<String, String> writeLog = null;
+ LogFile<String, String> readLog = null;
+ try
+ {
+ writeLog = getLogFile(RECORD_PARSER);
+ readLog = getLogFile(RECORD_PARSER);
+
+ for (int i = 1; i <= 100; i++)
+ {
+ Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
+ writeLog.addRecord(record);
+ assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+ assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
+ assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+ assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key1", "value1"));
+ }
+ }
+ finally
+ {
+ StaticUtils.close(writeLog, readLog);
+ }
+ }
+
+ @Test()
+ public void testTwoConcurrentWrite() throws Exception
+ {
+ final LogFile<String, String> writeLog1 = getLogFile(RECORD_PARSER);
+ final LogFile<String, String> writeLog2 = getLogFile(RECORD_PARSER);
+ try
+ {
+ writeLog1.addRecord(Record.from("startkey", "startvalue"));
+ Thread write1 = getWriteLogThread(writeLog1, "a");
+ Thread write2 = getWriteLogThread(writeLog2, "b");
+ write1.run();
+ write2.run();
+
+ write1.join();
+ write2.join();
+ writeLog1.syncToFileSystem();
+ DBCursor<Record<String, String>> cursor = writeLog1.getCursor("startkey");
+ for (int i = 1; i <= 200; i++)
+ {
+ assertThat(cursor.next()).isTrue();
+ }
+ assertThat(cursor.getRecord()).isIn(Record.from("k-b100", "v-b100"), Record.from("k-a100", "v-a100"));
+ }
+ finally
+ {
+ StaticUtils.close(writeLog1, writeLog2);
+ }
+ }
+
+ /**
+ * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
+ * endIndex, using (keyN, valueN) where N is the index.
+ */
+ private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+ throws Exception
+ {
+ for (int i = fromIndex; i <= endIndex; i++)
+ {
+ assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
+ }
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ /** Returns a thread that write 100 records to the provided log. */
+ private Thread getWriteLogThread(final LogFile<String, String> writeLog, final String recordPrefix)
+ {
+ return new Thread() {
+ public void run()
+ {
+ for (int i = 1; i <= 100; i++)
+ {
+ Record<String, String> record = Record.from("k-" + recordPrefix + i, "v-" + recordPrefix + i);
+ try
+ {
+ writeLog.addRecord(record);
+ }
+ catch (ChangelogException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Record parser implementation for records with keys as String and values as
+ * String, to be used in tests.
+ */
+ private static class StringRecordParser implements RecordParser<String, String>
+ {
+ private static final byte STRING_SEPARATOR = 0;
+
+ public Record<String, String> decodeRecord(final ByteString data) throws DecodingException
+ {
+ ByteSequenceReader reader = data.asReader();
+ String key = reader.getString(getNextStringLength(reader));
+ reader.skip(1);
+ String value = reader.getString(getNextStringLength(reader));
+ return key.isEmpty() || value.isEmpty() ? null : Record.from(key, value);
+ }
+
+ /** Returns the length of next string by looking for the zero byte used as separator. */
+ private int getNextStringLength(ByteSequenceReader reader)
+ {
+ int length = 0;
+ while (reader.peek(length) != STRING_SEPARATOR)
+ {
+ length++;
+ }
+ return length;
+ }
+
+ public ByteString encodeRecord(String key, String value)
+ {
+ return new ByteStringBuilder()
+ .append(key).append(STRING_SEPARATOR)
+ .append(value).append(STRING_SEPARATOR).toByteString();
+ }
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
new file mode 100644
index 0000000..06ab151
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -0,0 +1,142 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+
+import java.io.Closeable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.data.MapEntry;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class ReplicationEnvironmentTest extends DirectoryServerTestCase
+{
+ private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
+ private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
+ private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
+
+ private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
+
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ // This test suite depends on having the schema available for DN decoding.
+ TestCaseUtils.startServer();
+ }
+
+ @AfterMethod
+ public void cleanTestChangelogDirectory()
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ if (rootPath.exists())
+ {
+ StaticUtils.recursiveDelete(rootPath);
+ }
+ }
+
+ @Test
+ public void testCreateThenReadChangelogStateWithSingleDN() throws Exception
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+ LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
+ LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
+ StaticUtils.close(cnDB, replicaDB, replicaDB2);
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(1, 2)));
+ assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+ }
+
+ @Test
+ public void testCreateThenReadChangelogStateWithMultipleDN() throws Exception
+ {
+ File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+ List<LogFile<CSN,UpdateMsg>> replicaDBs = new ArrayList<LogFile<CSN,UpdateMsg>>();
+ for (int i = 0; i <= 2 ; i++)
+ {
+ for (int j = 1; j <= 10; j++)
+ {
+ replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+ }
+ }
+ StaticUtils.close(cnDB);
+ StaticUtils.close(replicaDBs.toArray(new Closeable[] {}));
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
+ for (int i = 0; i <= 2 ; i++)
+ {
+ assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ }
+ assertThat(state.getDomainToGenerationId()).containsOnly(
+ MapEntry.entry(domainDNs.get(0), 1L),
+ MapEntry.entry(domainDNs.get(1), 2L),
+ MapEntry.entry(domainDNs.get(2), 3L));
+ }
+
+ @Test(expectedExceptions=ChangelogException.class)
+ public void testMissingDomainDirectory() throws Exception
+ {
+ File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ DN domainDN = DN.decode(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
+ LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
+ StaticUtils.close(replicaDB, replicaDB2);
+
+ // delete the domain directory created for the 2 replica DBs to break the
+ // consistency with domain state file
+ StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+
+ environment.readChangelogState();
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 788087b..4c85472 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -30,6 +30,7 @@
import java.util.List;
import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -41,6 +42,8 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -57,6 +60,20 @@
new MultiDomainServerState();
private final List<String> cookies = new ArrayList<String>();
+ private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
+
+ @BeforeClass
+ public void setDBImpl()
+ {
+ setReplicationDBImplementation(ReplicationDBImplementation.JE);
+ }
+
+ @AfterClass
+ public void resetDBImplToPrevious()
+ {
+ setReplicationDBImplementation(previousDBImpl);
+ }
+
@BeforeMethod
public void clearCookie()
{
@@ -254,7 +271,7 @@
TestCaseUtils.startServer();
final int port = TestCaseUtils.findFreePort();
final ReplServerFakeConfiguration cfg =
- new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null);
+ new ReplServerFakeConfiguration(port, null, ReplicationDBImplementation.JE, 0, 2, 0, 100, null);
cfg.setComputeChangeNumber(true);
return new ReplicationServer(cfg);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index e431d97..9442e44 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
@@ -43,6 +44,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -60,6 +62,20 @@
private static final DebugTracer TRACER = getTracer();
private DN TEST_ROOT_DN;
+ private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
+
+ @BeforeClass
+ public void setDBImpl()
+ {
+ setReplicationDBImplementation(ReplicationDBImplementation.JE);
+ }
+
+ @AfterClass
+ public void resetDBImplToPrevious()
+ {
+ setReplicationDBImplementation(previousDBImpl);
+ }
+
/**
* Utility - log debug message - highlight it is from the test and not
* from the server code. Makes easier to observe the test steps.
@@ -154,7 +170,7 @@
{
final int changelogPort = findFreePort();
final ReplicationServerCfg conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0, 2, queueSize, windowSize, null);
+ new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null);
return new ReplicationServer(conf);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 1833cf0..6dd89d4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -280,8 +280,8 @@
SortedSet<String> replServers) throws Exception
{
ReplServerFakeConfiguration cfg =
- new ReplServerFakeConfiguration(replicationPort, dirName, 0, serverId,
- 0, windowSize, replServers);
+ new ReplServerFakeConfiguration(replicationPort, dirName, replicationDbImplementation, 0,
+ serverId, 0, windowSize, replServers);
return new ReplicationServer(cfg);
}
--
Gitblit v1.10.0