From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java | 6
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java | 25
opends/src/server/org/opends/server/tasks/InitializeTask.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java | 1401 +++++++++++++++++
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java | 17
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java | 42
opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java | 78
opends/src/server/org/opends/server/replication/server/DbHandler.java | 52
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 29
opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java | 13
opends/src/server/org/opends/server/core/SchemaConfigManager.java | 8
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 11
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java | 23
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 313 +++
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 673 ++++++-
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java | 2
opends/src/messages/messages/replication.properties | 51
opends/src/server/org/opends/server/replication/protocol/StartMessage.java | 34
opends/src/server/org/opends/server/backends/SchemaBackend.java | 15
opends/src/server/org/opends/server/replication/common/ServerState.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 137 +
opends/resource/schema/02-config.ldif | 12
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 113 +
opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 10
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java | 51
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 346 ++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java | 14
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java | 3
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java | 5
opends/resource/config/config.ldif | 1
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java | 73
opends/src/server/org/opends/server/replication/server/ServerReader.java | 151 +
opends/src/server/org/opends/server/types/Schema.java | 33
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java | 4
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 139 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 144 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java | 10
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java | 10
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 42
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 29
opends/src/server/org/opends/server/config/ConfigConstants.java | 24
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 481 ++++-
opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java | 128 +
44 files changed, 4,245 insertions(+), 518 deletions(-)
diff --git a/opends/resource/config/config.ldif b/opends/resource/config/config.ldif
index 0bd7b5c..e37b617 100644
--- a/opends/resource/config/config.ldif
+++ b/opends/resource/config/config.ldif
@@ -57,6 +57,7 @@
ds-cfg-allowed-task: org.opends.server.tasks.ImportTask
ds-cfg-allowed-task: org.opends.server.tasks.InitializeTargetTask
ds-cfg-allowed-task: org.opends.server.tasks.InitializeTask
+ds-cfg-allowed-task: org.opends.server.tasks.SetGenerationIdTask
ds-cfg-allowed-task: org.opends.server.tasks.LeaveLockdownModeTask
ds-cfg-allowed-task: org.opends.server.tasks.RebuildTask
ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 1f0cb54..a7834a5 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -1515,6 +1515,10 @@
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.450 NAME 'ds-cfg-message-body'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.479
+ NAME 'ds-sync-generation-id'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 USAGE directoryOperation SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.451
NAME 'ds-task-import-clear-backend'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
@@ -1559,6 +1563,10 @@
NAME 'ds-cfg-notification-sender-address'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.480
+ NAME 'ds-task-reset-generation-id-domain-base-dn'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.466
NAME 'ds-cfg-plugin-order-subordinate-modify-dn'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
@@ -2251,6 +2259,10 @@
MUST ds-task-disconnect-connection-id
MAY ( ds-task-disconnect-message $ ds-task-disconnect-notify-client )
X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.124
+ NAME 'ds-task-reset-generation-id' SUP ds-task
+ MUST ( ds-task-reset-generation-id-domain-base-dn )
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.121
NAME 'ds-cfg-regular-expression-identity-mapper' SUP ds-cfg-identity-mapper
STRUCTURAL MUST ( ds-cfg-match-attribute $ ds-cfg-match-pattern )
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index da6857b..b319e0d 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -62,10 +62,10 @@
MILD_ERR_UNKNOWN_TYPE_7=Unknown operation type : %s
MILD_ERR_OPERATION_NOT_FOUND_IN_PENDING_9=Internal Error : Operation %s \
change number %s was not found in pending list
-MILD_ERR_COULD_NOT_INITIALIZE_DB_10=Changelog failed to start because the \
+MILD_ERR_COULD_NOT_INITIALIZE_DB_10=The replication server failed to start because the \
database %s could not be opened
-MILD_ERR_COULD_NOT_READ_DB_11=Changelog failed to start because the database \
- %s could not be read
+MILD_ERR_COULD_NOT_READ_DB_11=The replication server failed to start because the database \
+ %s could not be read : %s
MILD_ERR_EXCEPTION_REPLAYING_OPERATION_12=An Exception was caught while \
replaying operation %s : %s
MILD_ERR_NEED_CHANGELOG_PORT_13=The replication server port must be defined
@@ -77,15 +77,15 @@
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
listening on %s
NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
- changes that this server has already processed
+ changes that this server has already processed on suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
server should be configured
-SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught exception during initial \
- communication with replication server: %s
+NOTICE_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
+ communication on domain %s with replication server %s : %s
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
database for base DN %s
NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
- replication server that has seen all the local changes. Going to replay \
+ replication server that has seen all the local changes on suffix %s. Going to replay \
changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
server on suffix %s, retrying...
@@ -175,7 +175,7 @@
NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
has been dropped by the Replication Server
SEVERE_ERR_CHANGELOG_ERROR_SENDING_INFO_64=An unexpected error occurred \
- while sending a Server Info message to %s. This connection is going to be \
+ while sending a Server Info message to %s. This connection is going to be \
closed and reopened
SEVERE_ERR_CHANGELOG_ERROR_SENDING_ERROR_65=An unexpected error occurred \
while sending an Error Message to %s. This connection is going to be closed \
@@ -186,11 +186,42 @@
ChangeNumber %s error %s %s
MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \
information for attribute %s which is not defined in the schema. This \
- information will be ignored
+ information will be ignored
NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s
SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \
be closed : %s
SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \
replication server port could not be stopped : %s
DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \
- listening on the replication port
\ No newline at end of file
+ listening on the replication port
+SEVERE_ERR_SEARCHING_GENERATION_ID_73=An unexpected error %s occured when \
+searching for generation id for domain : %s
+SEVERE_ERR_SEARCHING_DOMAIN_BACKEND_74=An unexpected error occured when \
+searching for the backend of the domain : %s
+SEVERE_ERR_LOADING_GENERATION_ID_75=An unexpected error occured when \
+searching in %s for the generation ID : %s
+SEVERE_ERR_UPDATING_GENERATION_ID_76=An unexpected error %s occured \
+when updating generation ID for the domain : %s
+NOTICE_BAD_GENERATION_ID_77=On suffix %s. server %s presented generation ID=%s \
+when expected generation ID=%s. Consequently, replication is degraded for that server
+NOTICE_RESET_GENERATION_ID_78=The generation ID has been reset for domain %s.\
+Replication is now degraded for this domain
+MILD_ERR_ERROR_MSG_RECEIVED_79=The following error has been received : <%s>
+MILD_ERR_IGNORING_UPDATE_FROM_80=Update <%s> received from server <%s> is \
+ignored due to a bad generation ID of this server
+MILD_ERR_IGNORING_UPDATE_TO_81=Update <%s> will not be sent to server %s that has \
+not the right generation ID
+SEVERE_ERR_INIT_IMPORT_NOT_SUPPORTED_82= Initialization cannot be done because \
+import is not supported by the backend %s
+SEVERE_ERR_INIT_EXPORT_NOT_SUPPORTED_83= Initialization cannot be done because \
+export is not supported by the backend %s
+SEVERE_ERR_INIT_CANNOT_LOCK_BACKEND_84= Initialization cannot be done because \
+the following error occured while locking the backend %s : %s
+NOTICE_EXCEPTION_RESTARTING_SESSION_85=Caught Exception during reinitialization of \
+ communication on domain %s : %s
+SEVERE_ERR_EXCEPTION_LISTENING_86=Replication server caught exception while \
+ listening for client connections %s
+SEVERE_ERR_ERROR_CLEARING_DB_87=While clearing the database %s , the following \
+ error happened: %s
+ NOTICE_ERR_ROUTING_TO_SERVER_88=Protocol error : a replication server is not expected \
+ to be the destination of a message of type %s
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/backends/SchemaBackend.java b/opends/src/server/org/opends/server/backends/SchemaBackend.java
index fab9a47..f4b8e02 100644
--- a/opends/src/server/org/opends/server/backends/SchemaBackend.java
+++ b/opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -171,6 +171,10 @@
// The attribute type that will be used to save the synchronization state.
private AttributeType synchronizationStateType;
+ // The attribute type that will be used to save the synchronization
+ // generationId.
+ private AttributeType synchronizationGenerationIdType;
+
// The value containing DN of the user we'll say created the configuration.
private AttributeValue creatorsName;
@@ -264,6 +268,9 @@
nameFormsType = DirectoryServer.getAttributeType(ATTR_NAME_FORMS_LC, true);
synchronizationStateType =
DirectoryServer.getAttributeType(ATTR_SYNCHRONIZATION_STATE_LC, true);
+ synchronizationGenerationIdType =
+ DirectoryServer.getAttributeType(ATTR_SYNCHRONIZATION_GENERATIONID_LC,
+ true);
// Initialize the lastmod attributes.
@@ -912,6 +919,14 @@
attrList.add(attr);
operationalAttrs.put(synchronizationStateType, attrList);
+ // Add the synchronization GenerationId attribute.
+ valueSet = DirectoryServer.getSchema().getSynchronizationGenerationId();
+ attr = new Attribute(synchronizationGenerationIdType,
+ ATTR_SYNCHRONIZATION_GENERATIONID_LC, valueSet);
+ attrList = new ArrayList<Attribute>(1);
+ attrList.add(attr);
+ operationalAttrs.put(synchronizationGenerationIdType, attrList);
+
// Add all the user-defined attributes.
for (Attribute a : userDefinedAttributes)
{
diff --git a/opends/src/server/org/opends/server/config/ConfigConstants.java b/opends/src/server/org/opends/server/config/ConfigConstants.java
index 592f63f..ea43bf3 100644
--- a/opends/src/server/org/opends/server/config/ConfigConstants.java
+++ b/opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -1335,12 +1335,19 @@
public static final String ATTR_MATCHING_RULE_USE_LC = "matchingruleuse";
/**
- * The name of the attribute that holds the sycnhronization state,
+ * The name of the attribute that holds the synchronization state,
* formatted in lowercase.
*/
public static final String ATTR_SYNCHRONIZATION_STATE_LC = "ds-sync-state";
/**
+ * The name of the attribute that holds the relication generationId,
+ * formatted in lowercase.
+ */
+ public static final String ATTR_SYNCHRONIZATION_GENERATIONID_LC =
+ "ds-sync-generation-id";
+
+ /**
* The default maximum request size that should be used if none is specified
* in the configuration.
*/
@@ -4248,6 +4255,21 @@
NAME_PREFIX_TASK + "rebuild-max-threads";
/**
+ * The name of the objectclass that will be used for a Directory Server
+ * reset generationId task definition.
+ */
+ public static final String OC_RESET_GENERATION_ID_TASK =
+ NAME_PREFIX_TASK + "reset-generation-id";
+
+
+ /**
+ * The name of the attribute containing the baseDn related to the replication
+ * domain to which applies the task.
+ */
+ public static final String ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN =
+ OC_RESET_GENERATION_ID_TASK + "-domain-base-dn";
+
+ /**
* The name of the attribute in an import task definition that specifies
* whether the backend should be cleared before the import.
*/
diff --git a/opends/src/server/org/opends/server/core/SchemaConfigManager.java b/opends/src/server/org/opends/server/core/SchemaConfigManager.java
index fcdd15d..938d54e 100644
--- a/opends/src/server/org/opends/server/core/SchemaConfigManager.java
+++ b/opends/src/server/org/opends/server/core/SchemaConfigManager.java
@@ -720,6 +720,14 @@
new MatchingRuleUseSyntax());
}
+ AttributeType synchronizationGenerationIdType =
+ schema.getAttributeType(ATTR_SYNCHRONIZATION_GENERATIONID_LC);
+ if (synchronizationGenerationIdType == null)
+ {
+ synchronizationGenerationIdType = DirectoryServer.getDefaultAttributeType
+ (ATTR_SYNCHRONIZATION_GENERATIONID_LC, new MatchingRuleUseSyntax());
+ }
+
List<Attribute> synchronizationState =
entry.getAttribute(synchronizationStateType);
if (synchronizationState != null && !(synchronizationState.isEmpty()))
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 048891a..530e914 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -41,7 +41,7 @@
/**
* ServerState class.
- * This object is used to store the last update seem on this server
+ * This object is used to store the last update seen on this server
* from each server.
* It is exchanged with the replication servers at connection establishment
* time.
@@ -204,7 +204,7 @@
*/
public ArrayList<ASN1OctetString> toASN1ArrayList()
{
- ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
+ ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(0);
synchronized (this)
{
diff --git a/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java b/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
index c040325..e86e485 100644
--- a/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
@@ -28,6 +28,8 @@
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
import org.opends.server.loggers.debug.DebugTracer;
import java.io.IOException;
@@ -99,8 +101,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Heartbeat monitor is starting, expected interval is %d",
- heartbeatInterval);
+ TRACER.debugInfo("Heartbeat monitor is starting, expected interval is " +
+ heartbeatInterval +
+ stackTraceToSingleLineString(new Exception()));
}
try
{
@@ -110,9 +113,6 @@
long lastReceiveTime = session.getLastReceiveTime();
if (now > lastReceiveTime + 2 * heartbeatInterval)
{
- TRACER.debugInfo("Heartbeat monitor is closing the broker session " +
- "because it could not detect a heartbeat.");
-
// Heartbeat is well overdue so the server is assumed to be dead.
if (debugEnabled())
{
@@ -140,7 +140,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Heartbeat monitor is exiting.");
+ TRACER.debugInfo("Heartbeat monitor is exiting." +
+ stackTraceToSingleLineString(new Exception()));
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
index 34f508e..fe033d7 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -313,9 +313,6 @@
{
ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
- if (values.size() == 0)
- return ResultCode.SUCCESS;
-
LDAPAttribute attr =
new LDAPAttribute(REPLICATION_STATE, values);
LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
@@ -343,4 +340,26 @@
}
return op.getResultCode();
}
+
+ /**
+ * Empty the ServerState.
+ * After this call the Server State will be in the same state
+ * as if it was just created.
+ */
+ public void clearInMemory()
+ {
+ super.clear();
+ this.savedStatus = false;
+ }
+
+ /**
+ * Empty the ServerState.
+ * After this call the Server State will be in the same state
+ * as if it was just created.
+ */
+ public void clear()
+ {
+ clearInMemory();
+ save();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index 45d63cc..dbc4c63 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.io.OutputStream;
-
/**
* This class creates an output stream that can be used to export entries
* to a synchonization domain.
@@ -37,7 +36,15 @@
public class ReplLDIFOutputStream
extends OutputStream
{
+ // The synchronization domain on which the export is done
ReplicationDomain domain;
+
+ // The number of entries to be exported
+ long numEntries;
+
+ // The current number of entries exported
+ long numExportedEntries;
+
String entryBuffer = "";
/**
@@ -45,10 +52,12 @@
* domain.
*
* @param domain The replication domain
+ * @param numEntries The max number of entry to process.
*/
- public ReplLDIFOutputStream(ReplicationDomain domain)
+ public ReplLDIFOutputStream(ReplicationDomain domain, long numEntries)
{
this.domain = domain;
+ this.numEntries = numEntries;
}
/**
@@ -75,11 +84,19 @@
endOfEntryIndex = ebytes.indexOf("\n\n");
if ( endOfEntryIndex >= 0 )
{
+
endOfEntryIndex += 2;
entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
// Send the entry
- domain.sendEntryLines(entryBuffer);
+ if ((numEntries>0) && (numExportedEntries > numEntries))
+ {
+ // This outputstream has reached the total number
+ // of entries to export.
+ return;
+ }
+ domain.exportLDIFEntry(entryBuffer);
+ numExportedEntries++;
startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
entryBuffer = "";
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index ed6270d..6733366 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,8 +25,7 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -93,6 +92,7 @@
private int maxRcvWindow;
private int timeout = 0;
private short protocolVersion;
+ private long generationId = -1;
private ReplSessionSecurity replSessionSecurity;
/**
@@ -143,12 +143,15 @@
* @param window The size of the send and receive window to use.
* @param heartbeatInterval The interval between heartbeats requested of the
* replicationServer, or zero if no heartbeats are requested.
+ *
+ * @param generationId The generationId for the server associated to the
+ * provided serverID and for the domain associated to the provided baseDN.
* @param replSessionSecurity The session security configuration.
*/
public ReplicationBroker(ServerState state, DN baseDn, short serverID,
int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
int maxSendDelay, int window, long heartbeatInterval,
- ReplSessionSecurity replSessionSecurity)
+ long generationId, ReplSessionSecurity replSessionSecurity)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -164,6 +167,7 @@
this.halfRcvWindow = window/2;
this.heartbeatInterval = heartbeatInterval;
this.protocolVersion = ProtocolVersion.currentVersion();
+ this.generationId = generationId;
this.replSessionSecurity = replSessionSecurity;
}
@@ -198,7 +202,7 @@
*/
private void connect()
{
- ReplServerStartMessage startMsg;
+ ReplServerStartMessage replServerStartMsg;
// Stop any existing heartbeat monitor from a previous session.
if (heartbeatMonitor != null)
@@ -207,8 +211,24 @@
heartbeatMonitor = null;
}
+ // checkState is true for the first loop on all replication servers
+ // looking for one already up-to-date.
+ // If we found some responding replication servers but none up-to-date
+ // then we set check-state to false and do a second loop where the first
+ // found will be the one elected and then we will update this replication
+ // server.
boolean checkState = true;
boolean receivedResponse = true;
+
+ // TODO: We are doing here 2 loops opening , closing , reopening session to
+ // the same servers .. risk to have 'same server id' erros.
+ // Would be better to do only one loop, keeping the best candidate while
+ // traversing the list of replication servers to connect to.
+ if (servers.size()==1)
+ {
+ checkState = false;
+ }
+
synchronized (connectPhaseLock)
{
while ((!connected) && (!shutdown) && (receivedResponse))
@@ -240,7 +260,7 @@
ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
halfRcvWindow*2, heartbeatInterval, state,
- protocolVersion, isSslEncryption);
+ protocolVersion, generationId, isSslEncryption);
session.publish(msg);
@@ -248,7 +268,7 @@
* Read the ReplServerStartMessage that should come back.
*/
session.setSoTimeout(1000);
- startMsg = (ReplServerStartMessage) session.receive();
+ replServerStartMsg = (ReplServerStartMessage) session.receive();
receivedResponse = true;
/*
@@ -257,7 +277,7 @@
* if it is an old replication server).
*/
protocolVersion = ProtocolVersion.minWithCurrent(
- startMsg.getVersion());
+ replServerStartMsg.getVersion());
session.setSoTimeout(timeout);
if (!isSslEncryption)
@@ -276,7 +296,7 @@
* those changes and send them again to any replicationServer.
*/
ChangeNumber replServerMaxChangeNumber =
- startMsg.getServerState().getMaxChangeNumber(serverID);
+ replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
if (replServerMaxChangeNumber == null)
replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
ChangeNumber ourMaxChangeNumber =
@@ -285,7 +305,7 @@
(ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
+ maxSendWindow = replServerStartMsg.getWindowSize();
connected = true;
startHeartBeat();
break;
@@ -298,7 +318,8 @@
* of our changes, we are going to try another server
* but before log a notice message
*/
- Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server);
+ Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
+ baseDn.toNormalizedString());
logError(message);
}
else
@@ -341,7 +362,7 @@
else
{
replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
+ maxSendWindow = replServerStartMsg.getWindowSize();
connected = true;
for (FakeOperation replayOp : replayOperations)
{
@@ -371,8 +392,9 @@
}
catch (Exception e)
{
- Message message =
- ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage());
+ Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
+ baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
logError(message);
}
finally
@@ -392,7 +414,9 @@
}
}
}
- }
+ } // for servers
+
+ // We have traversed all the replication servers
if ((!connected) && (checkState == true) && receivedResponse)
{
@@ -401,12 +425,16 @@
* changes that this server has already processed, start again
* the loop looking for any replicationServer.
*/
- Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get();
+ Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
+ baseDn.toNormalizedString());
logError(message);
checkState = false;
}
}
+ // We have traversed all the replication servers as many times as needed
+ // to find one if one is up and running.
+
if (connected)
{
// This server has connected correctly.
@@ -462,9 +490,9 @@
/**
* restart the ReplicationBroker.
*/
- private void reStart()
+ public void reStart()
{
- reStart(null);
+ reStart(this.session);
}
/**
@@ -472,7 +500,7 @@
*
* @param failingSession the socket which failed
*/
- private void reStart(ProtocolSession failingSession)
+ public void reStart(ProtocolSession failingSession)
{
try
{
@@ -498,7 +526,8 @@
} catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()));
+ mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
+ baseDn.toNormalizedString(), e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
@@ -533,6 +562,13 @@
// choice than to return without sending the ReplicationMessage
// and relying on the resend procedure of the connect phase to
// fix the problem when we finally connect.
+
+ if (debugEnabled())
+ {
+ debugInfo("ReplicationBroker.publish() Publishing a " +
+ " message is not possible due to existing connection error.");
+ }
+
return;
}
@@ -601,12 +637,22 @@
} catch (InterruptedException e1)
{
// ignore
+ if (debugEnabled())
+ {
+ debugInfo("ReplicationBroker.publish() " +
+ "IO exception raised : " + e.getLocalizedMessage());
+ }
}
}
}
catch (InterruptedException e)
{
// just loop.
+ if (debugEnabled())
+ {
+ debugInfo("ReplicationBroker.publish() " +
+ "Interrupted exception raised." + e.getLocalizedMessage());
+ }
}
}
}
@@ -628,7 +674,7 @@
{
if (!connected)
{
- reStart();
+ reStart(null);
}
ProtocolSession failingSession = session;
@@ -663,6 +709,9 @@
Message message =
NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
logError(message);
+
+ debugInfo("ReplicationBroker.receive() " + baseDn +
+ " Exception raised." + e + e.getLocalizedMessage());
this.reStart(failingSession);
}
}
@@ -683,7 +732,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker Stop Closing session");
+ debugInfo("ReplicationBroker is stopping. and will" +
+ "close the connection");
}
if (session != null)
@@ -713,6 +763,20 @@
}
/**
+ * Set the value of the generationId for that broker. Normally the
+ * generationId is set through the constructor but there are cases
+ * where the value of the generationId must be changed while the broker
+ * already exist for example after an on-line import.
+ *
+ * @param generationId The value of the generationId.
+ *
+ */
+ public void setGenerationId(long generationId)
+ {
+ this.generationId = generationId;
+ }
+
+ /**
* Get the name of the replicationServer to which this broker is currently
* connected.
*
@@ -857,6 +921,13 @@
return !connectionError;
}
+ private boolean debugEnabled() { return true; }
+ private static final void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ TRACER.debugInfo(s);
+ }
+
/**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 788f72d..0724851 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -27,8 +27,6 @@
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
-
-import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -38,12 +36,14 @@
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import org.opends.server.protocols.asn1.ASN1OctetString;
import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -52,12 +52,14 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
+import java.util.zip.Adler32;
+import java.io.OutputStream;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
-import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.api.AlertGenerator;
import org.opends.server.api.Backend;
import org.opends.server.api.DirectoryThread;
@@ -77,6 +79,9 @@
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPAttribute;
+import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
@@ -89,6 +94,7 @@
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
@@ -100,6 +106,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.RDN;
+import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
@@ -172,6 +179,9 @@
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
+ private long generationId = -1;
+ private long rejectedGenerationId = -1;
+ private boolean requestedResetSinceLastStart = false;
/**
* This object is used to store the list of update currently being
@@ -224,7 +234,7 @@
private int window = 100;
/**
- * The isoalation policy that this domain is going to use.
+ * The isolation policy that this domain is going to use.
* This field describes the behavior of the domain when an update is
* attempted and the domain could not connect to any Replication Server.
* Possible values are accept-updates or deny-updates, but other values
@@ -254,17 +264,20 @@
// The total entry count expected to be processed
long entryCount = 0;
- // The count for the entry left to be processed
+ // The count for the entry not yet processed
long entryLeftCount = 0;
+ boolean checksumOutput = false;
+
// The exception raised when any
DirectoryException exception = null;
+ long checksumOutputValue = (long)0;
/**
- * Initializes the counters of the task with the provider value.
+ * Initializes the import/export counters with the provider value.
* @param count The value with which to initialize the counters.
*/
- public void initTaskCounters(long count)
+ public void initImportExportCounters(long count)
{
entryCount = count;
entryLeftCount = count;
@@ -288,7 +301,7 @@
* Update the counters of the task for each entry processed during
* an import or export.
*/
- public void updateTaskCounters()
+ public void updateCounters()
{
entryLeftCount--;
@@ -342,12 +355,12 @@
configDn = configuration.dn();
/*
- * Modify conflicts are solved for all suffixes but the schema suffix
- * because we don't want to store extra information in the schema
- * ldif files.
- * This has no negative impact because the changes on schema should
- * not produce conflicts.
- */
+ * Modify conflicts are solved for all suffixes but the schema suffix
+ * because we don't want to store extra information in the schema
+ * ldif files.
+ * This has no negative impact because the changes on schema should
+ * not produce conflicts.
+ */
if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
{
solveConflictFlag = false;
@@ -370,27 +383,33 @@
monitor = new ReplicationMonitor(this);
DirectoryServer.registerMonitorProvider(monitor);
+ backend = retrievesBackend(baseDN);
+ if (backend == null)
+ {
+ throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
+ baseDN.toNormalizedString()));
+ }
+
+ try
+ {
+ generationId = loadGenerationId();
+ }
+ catch (DirectoryException e)
+ {
+ logError(ERR_LOADING_GENERATION_ID.get(
+ baseDN.toNormalizedString(), e.getLocalizedMessage()));
+ }
+
/*
* create the broker object used to publish and receive changes
*/
broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
maxReceiveDelay, maxSendQueue, maxSendDelay, window,
- heartbeatInterval, new ReplSessionSecurity(configuration));
+ heartbeatInterval, generationId,
+ new ReplSessionSecurity(configuration));
broker.start(replicationServers);
- // Retrieves the related backend and its config entry
- try
- {
- retrievesBackendInfos(baseDN);
- } catch (DirectoryException e)
- {
- // The backend associated to this suffix is not able to
- // perform export and import.
- // The replication can continue but this replicationDomain
- // won't be able to use total update.
- }
-
/*
* ChangeNumberGenerator is used to create new unique ChangeNumbers
* for each operation done on the replication domain.
@@ -558,7 +577,7 @@
* If not set the ResultCode and the response message,
* interrupt the operation, and return false
*
- * @param op The Operation that needs to be checked.
+ * @param Operation The Operation that needs to be checked.
*
* @return true when it OK to process the Operation, false otherwise.
* When false is returned the resultCode and the reponse message
@@ -798,7 +817,11 @@
// The server is in the shutdown process
return null;
}
- log(Message.raw("Broker received message :" + msg));
+
+ if (debugEnabled())
+ if (!(msg instanceof HeartbeatMessage))
+ TRACER.debugInfo("Message received <" + msg + ">");
+
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -808,7 +831,7 @@
{
// Another server requests us to provide entries
// for a total update
- initMsg = (InitializeRequestMessage) msg;
+ initMsg = (InitializeRequestMessage)msg;
}
else if (msg instanceof InitializeTargetMessage)
{
@@ -822,20 +845,19 @@
// bunch of entries from the remote server and we
// want the import thread to catch them and
// not the ListenerThread.
- importBackend(importMsg);
+ initialize(importMsg);
}
catch(DirectoryException de)
{
+ // Returns an error message to notify the sender
+ ErrorMessage errorMsg =
+ new ErrorMessage(importMsg.getsenderID(),
+ de.getMessageObject());
MessageBuilder mb = new MessageBuilder();
mb.append(de.getMessageObject());
mb.append("Backend ID: ");
mb.append(backend.getBackendID());
- log(mb.toMessage());
-
- // Return an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
- de.getMessageObject());
+ TRACER.debugInfo(Message.toString(mb.toMessage()));
broker.publish(errorMsg);
}
}
@@ -850,6 +872,39 @@
// replicationServer did not find any import source.
abandonImportExport((ErrorMessage)msg);
}
+ else
+ {
+ /* We can receive an error message from the replication server
+ * in the following cases :
+ * - we connected with an incorrect generation id
+ */
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+
+ if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId())
+ {
+ TRACER.debugInfo("requestedResetSinceLastStart=" +
+ requestedResetSinceLastStart +
+ "rejectedGenerationId=" + rejectedGenerationId);
+
+ if (requestedResetSinceLastStart && (rejectedGenerationId>0))
+ {
+ // When the last generation presented was refused and we are
+ // the 'reseter' server then restart automatically to become
+ // the 'master'
+ state.clear();
+ rejectedGenerationId = -1;
+ requestedResetSinceLastStart = false;
+ broker.stop();
+ broker.start(replicationServers);
+ }
+ }
+ if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId())
+ {
+ rejectedGenerationId = generationId;
+ }
+ }
}
else if (msg instanceof UpdateMessage)
{
@@ -875,7 +930,7 @@
{
try
{
- initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
null);
}
catch(DirectoryException de)
@@ -2100,7 +2155,7 @@
public void disable()
{
state.save();
- state.clear();
+ state.clearInMemory();
disabled = true;
// stop the listener threads
for (ListenerThread thread : synchroThreads)
@@ -2126,20 +2181,252 @@
* The domain will connect back to a replication Server and
* will recreate threads to listen for messages from the Sycnhronization
* server.
+ * The generationId will be retrieved or computed if necessary.
* The ServerState will also be read again from the local database.
*/
public void enable()
{
- state.clear();
+ state.clearInMemory();
state.loadState();
disabled = false;
+
+ try
+ {
+ generationId = loadGenerationId();
+ }
+ catch (Exception e)
+ {
+ /* TODO should mark that replicationServer service is
+ * not available, log an error and retry upon timeout
+ * should we stop the modifications ?
+ */
+ logError(ERR_LOADING_GENERATION_ID.get(
+ baseDN.toNormalizedString(), e.getLocalizedMessage()));
+ return;
+ }
+
+ // After an on-line import, the value of the generationId is new
+ // and it is necessary for the broker to send this new value as part
+ // of the serverStart message.
+ broker.setGenerationId(generationId);
+
broker.start(replicationServers);
createListeners();
}
/**
+ * Compute the data generationId associated with the current data present
+ * in the backend for this domain.
+ * @return The computed generationId.
+ * @throws DirectoryException When an error occurs.
+ */
+ public long computeGenerationId() throws DirectoryException
+ {
+ long bec = backend.getEntryCount();
+ if (bec<0)
+ backend = this.retrievesBackend(baseDN);
+ bec = backend.getEntryCount();
+ this.acquireIEContext();
+ ieContext.checksumOutput = true;
+ ieContext.entryCount = (bec<1000?bec:1000);
+ ieContext.entryLeftCount = ieContext.entryCount;
+ exportBackend();
+ long genId = ieContext.checksumOutputValue;
+
+ if (debugEnabled())
+ TRACER.debugInfo("Computed generationId: #entries=" + bec +
+ " generationId=" + ieContext.checksumOutputValue);
+ ieContext.checksumOutput = false;
+ this.releaseIEContext();
+ return genId;
+ }
+
+ /**
+ * Returns the generationId set for this domain.
+ *
+ * @return The generationId.
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * The attribute name used to store the state in the backend.
+ */
+ protected static final String REPLICATION_GENERATION_ID =
+ "ds-sync-generation-id";
+
+ /**
+ * Stores the value of the generationId.
+ * @param generationId The value of the generationId.
+ * @return a ResultCode indicating if the method was successfull.
+ */
+ public ResultCode saveGenerationId(long generationId)
+ {
+ // The generationId is stored in the root entry of the domain.
+ ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+
+ ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
+ ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
+ values.add(value);
+
+ LDAPAttribute attr =
+ new LDAPAttribute(REPLICATION_GENERATION_ID, values);
+ LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
+ ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
+ mods.add(mod);
+
+ ModifyOperationBasis op =
+ new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ new ArrayList<Control>(0), asn1BaseDn,
+ mods);
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+ op.setDontSynchronize(true);
+
+ op.run();
+
+ ResultCode result = op.getResultCode();
+ if (result != ResultCode.SUCCESS)
+ {
+ Message message = ERR_UPDATING_GENERATION_ID.get(
+ op.getResultCode().getResultCodeName() + " " +
+ op.getErrorMessage(),
+ baseDN.toString());
+ logError(message);
+ }
+ return result;
+ }
+
+
+ /**
+ * Load the GenerationId from the root entry of the domain
+ * from the REPLICATION_GENERATION_ID attribute in database
+ * to memory, or compute it if not found.
+ *
+ * @return generationId The retrieved value of generationId
+ * @throws DirectoryException When an error occurs.
+ */
+ public long loadGenerationId()
+ throws DirectoryException
+ {
+ long generationId=-1;
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Attempt to read generation ID from DB " + baseDN.toString());
+
+ ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+ boolean found = false;
+ LDAPFilter filter;
+ try
+ {
+ filter = LDAPFilter.decode("objectclass=*");
+ }
+ catch (LDAPException e)
+ {
+ // can not happen
+ return -1;
+ }
+
+ /*
+ * Search the database entry that is used to periodically
+ * save the ServerState
+ */
+ InternalSearchOperation search = null;
+ LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
+ attributes.add(REPLICATION_GENERATION_ID);
+ search = conn.processSearch(asn1BaseDn,
+ SearchScope.BASE_OBJECT,
+ DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
+ filter,attributes);
+ if (((search.getResultCode() != ResultCode.SUCCESS)) &&
+ ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
+ {
+ Message message = ERR_SEARCHING_GENERATION_ID.get(
+ search.getResultCode().getResultCodeName() + " " +
+ search.getErrorMessage(),
+ baseDN.toString());
+ logError(message);
+ }
+
+ SearchResultEntry resultEntry = null;
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ resultEntry = result.getFirst();
+ if (resultEntry != null)
+ {
+ AttributeType synchronizationGenIDType =
+ DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
+ List<Attribute> attrs =
+ resultEntry.getAttribute(synchronizationGenIDType);
+ if (attrs != null)
+ {
+ Attribute attr = attrs.get(0);
+ LinkedHashSet<AttributeValue> values = attr.getValues();
+ if (values.size()!=1)
+ {
+ Message message = ERR_LOADING_GENERATION_ID.get(
+ baseDN.toString(), "#Values != 1");
+ logError(message);
+ }
+ else
+ {
+ found=true;
+ try
+ {
+ generationId = Long.decode(values.iterator().next().
+ getStringValue());
+ }
+ catch(Exception e)
+ {
+ Message message = ERR_LOADING_GENERATION_ID.get(
+ baseDN.toString(), e.getLocalizedMessage());
+ logError(message);
+ }
+ }
+ }
+ }
+ }
+
+ if (!found)
+ {
+ generationId = computeGenerationId();
+ saveGenerationId(generationId);
+
+ if (debugEnabled())
+ TRACER.debugInfo("Generation ID created for domain base DN=" +
+ baseDN.toString() +
+ " generationId=" + generationId);
+ }
+ else
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Generation ID successfully read from domain base DN=" + baseDN +
+ " generationId=" + generationId);
+ }
+ return generationId;
+ }
+
+ /**
+ * Reset the generationId of this domain in the whole topology.
+ * A message is sent to the Replication Servers for them to reset
+ * their change dbs.
+ */
+ public void resetGenerationId()
+ {
+ requestedResetSinceLastStart = true;
+ ResetGenerationId genIdMessage = new ResetGenerationId();
+ broker.publish(genIdMessage);
+ }
+
+ /**
* Do whatever is needed when a backup is started.
* We need to make sure that the serverState is correclty save.
*/
@@ -2175,18 +2462,20 @@
{
msg = broker.receive();
+ if (debugEnabled())
+ TRACER.debugInfo("Import: EntryBytes received " + msg);
if (msg == null)
{
// The server is in the shutdown process
return null;
}
- log(Message.raw("receiveEntryBytes: received " + msg));
+
if (msg instanceof EntryMessage)
{
// FIXME
EntryMessage entryMsg = (EntryMessage)msg;
byte[] entryBytes = entryMsg.getEntryBytes().clone();
- ieContext.updateTaskCounters();
+ ieContext.updateCounters();
return entryBytes;
}
else if (msg instanceof DoneMessage)
@@ -2202,8 +2491,9 @@
// The error is stored and the import is ended
// by returning null
ErrorMessage errorMsg = (ErrorMessage)msg;
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails());
+ ieContext.exception = new DirectoryException(
+ ResultCode.OTHER,
+ errorMsg.getDetails());
return null;
}
else
@@ -2213,9 +2503,10 @@
}
catch(Exception e)
{
+ // TODO: i18n
ieContext.exception = new DirectoryException(ResultCode.OTHER,
- Message.raw("received an unexpected message type"), e);
- return null;
+ Message.raw("received an unexpected message type" +
+ e.getLocalizedMessage()));
}
}
}
@@ -2299,26 +2590,17 @@
}
/**
- * Log debug message.
- * @param message The message to log.
+ * Export the entries from the backend.
+ * The ieContext must have been set before calling.
+ *
+ * @throws DirectoryException when an error occured
*/
- private void log(Message message)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("DebugInfo" + message);
- }
- }
-
- /**
- * Export the entries.
- * @throws DirectoryException when an error occurred
- */
- protected void exportBackend() throws DirectoryException
+ protected void exportBackend()
+ throws DirectoryException
{
// FIXME Temporary workaround - will probably be fixed when implementing
// dynamic config
- retrievesBackendInfos(this.baseDN);
+ backend = retrievesBackend(this.baseDN);
// Acquire a shared lock for the backend.
try
@@ -2344,13 +2626,53 @@
ResultCode.OTHER, message, null);
}
- ReplLDIFOutputStream os = new ReplLDIFOutputStream(this);
+ OutputStream os;
+ ReplLDIFOutputStream ros;
+ if (ieContext.checksumOutput)
+ {
+ ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
+ os = new CheckedOutputStream(ros, new Adler32());
+ try
+ {
+ os.write((Long.toString(ieContext.entryCount)).getBytes());
+ }
+ catch(Exception e)
+ {
+ // Should never happen
+ }
+ }
+ else
+ {
+ ros = new ReplLDIFOutputStream(this, (short)-1);
+ os = ros;
+ }
LDIFExportConfig exportConfig = new LDIFExportConfig(os);
+
+ // baseDN branch is the only one included in the export
List<DN> includeBranches = new ArrayList<DN>(1);
includeBranches.add(this.baseDN);
exportConfig.setIncludeBranches(includeBranches);
+ // For the checksum computing mode, only consider the 'stable' attributes
+ if (ieContext.checksumOutput)
+ {
+ String includeAttributeStrings[] =
+ {"objectclass", "sn", "cn", "entryuuid"};
+ HashSet<AttributeType> includeAttributes;
+ includeAttributes = new HashSet<AttributeType>();
+ for (String attrName : includeAttributeStrings)
+ {
+ AttributeType attrType = DirectoryServer.getAttributeType(attrName);
+ if (attrType == null)
+ {
+ attrType = DirectoryServer.getDefaultAttributeType(attrName);
+ }
+ includeAttributes.add(attrType);
+ }
+ exportConfig.setIncludeAttributes(includeAttributes);
+ }
+
// Launch the export.
try
{
@@ -2374,8 +2696,19 @@
}
finally
{
- // Clean up after the export by closing the export config.
- exportConfig.close();
+
+ if ((ieContext != null) && (ieContext.checksumOutput))
+ {
+ ieContext.checksumOutputValue =
+ ((CheckedOutputStream)os).getChecksum().getValue();
+ }
+ else
+ {
+ // Clean up after the export by closing the export config.
+ // Will also flush the export and export the remaining entries.
+ // This is a real export where writer has been initialized.
+ exportConfig.close();
+ }
// Release the shared lock on the backend.
try
@@ -2403,54 +2736,26 @@
}
/**
- * Retrieves the backend object related to the domain and the backend's
- * config entry. They will be used for import and export.
- * TODO This should be in a shared package rather than here.
+ * Retrieves the backend related to the domain.
*
+ * @return The backend of that domain.
* @param baseDN The baseDN to retrieve the backend
- * @throws DirectoryException when an error occired
*/
- protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
+ protected Backend retrievesBackend(DN baseDN)
{
// Retrieves the backend related to this domain
- Backend domainBackend = DirectoryServer.getBackend(baseDN);
- if (domainBackend == null)
- {
- Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, "");
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
- }
-
- // Retrieves its configuration
- BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend);
- if (backendCfg == null)
- {
- Message message =
- ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get();
- logError(message);
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
- }
-
- this.backend = domainBackend;
- if (! domainBackend.supportsLDIFImport())
- {
- Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get(
- String.valueOf(baseDN));
- logError(message);
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
- }
+ return DirectoryServer.getBackend(baseDN);
}
/**
- * Sends lDIFEntry entry lines to the export target currently set.
+ * Exports an entry in LDIF format.
*
- * @param lDIFEntry The lines for the LDIF entry.
+ * @param lDIFEntry The entry to be exported..
+ *
* @throws IOException when an error occurred.
*/
- public void sendEntryLines(String lDIFEntry) throws IOException
+ public void exportLDIFEntry(String lDIFEntry) throws IOException
{
// If an error was raised - like receiving an ErrorMessage
// we just let down the export.
@@ -2461,12 +2766,14 @@
throw ioe;
}
- // new entry then send the current one
- EntryMessage entryMessage = new EntryMessage(
+ if (ieContext.checksumOutput == false)
+ {
+ // Actually send the entry
+ EntryMessage entryMessage = new EntryMessage(
serverId, ieContext.exportTarget, lDIFEntry.getBytes());
- broker.publish(entryMessage);
-
- ieContext.updateTaskCounters();
+ broker.publish(entryMessage);
+ }
+ ieContext.updateCounters();
}
/**
@@ -2477,9 +2784,11 @@
* and should be updated of its progress.
* @throws DirectoryException when an error occurs
*/
- public void initialize(short source, Task initTask)
+ public void initializeFromRemote(short source, Task initTask)
throws DirectoryException
{
+ // TRACER.debugInfo("Entering initializeFromRemote");
+
acquireIEContext();
ieContext.initializeTask = initTask;
@@ -2495,13 +2804,14 @@
/**
* Verifies that the given string represents a valid source
* from which this server can be initialized.
- * @param sourceString The string representaing the source
+ * @param sourceString The string representing the source
* @return The source as a short value
* @throws DirectoryException if the string is not valid
*/
public short decodeSource(String sourceString)
throws DirectoryException
{
+ TRACER.debugInfo("Entering decodeSource");
short source = 0;
Throwable cause = null;
try
@@ -2512,8 +2822,6 @@
// TODO Verifies serverID is in the domain
// We shold check here that this is a server implied
// in the current domain.
-
- log(Message.raw("Source decoded for import:" + source));
return source;
}
}
@@ -2525,11 +2833,15 @@
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_INVALID_IMPORT_SOURCE.get();
if (cause != null)
+ {
throw new DirectoryException(
resultCode, message, cause);
+ }
else
+ {
throw new DirectoryException(
resultCode, message);
+ }
}
/**
@@ -2600,57 +2912,65 @@
* @param target The target that should be initialized
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
+ *
* @exception DirectoryException When an error occurs.
*/
- public void initializeTarget(short target, Task initTask)
+ public void initializeRemote(short target, Task initTask)
throws DirectoryException
{
- initializeTarget(target, serverId, initTask);
+ initializeRemote(target, serverId, initTask);
}
/**
* Process the initialization of some other server or servers in the topology
- * specified by the target argument when this initialization has been
- * initiated by another server than this one.
+ * specified by the target argument when this initialization specifying the
+ * server that requests the initialization.
+ *
* @param target The target that should be initialized.
* @param requestorID The server that initiated the export.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
+ *
* @exception DirectoryException When an error occurs.
*/
- public void initializeTarget(short target, short requestorID, Task initTask)
+ public void initializeRemote(short target, short requestorID, Task initTask)
throws DirectoryException
{
- // FIXME Temporary workaround - will probably be fixed when implementing
- // dynamic config
- retrievesBackendInfos(this.baseDN);
-
- acquireIEContext();
-
- ieContext.exportTarget = target;
- if (initTask != null)
- {
- ieContext.initializeTask = initTask;
- ieContext.initTaskCounters(backend.getEntryCount());
- }
-
- // Send start message to the peer
- InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
- baseDN, serverId, ieContext.exportTarget, requestorID,
- backend.getEntryCount());
-
- log(Message.raw("SD : publishes " + initializeMessage +
- " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount));
-
- broker.publish(initializeMessage);
-
try
{
+ // FIXME Temporary workaround - will probably be fixed when implementing
+ // dynamic config
+ backend = retrievesBackend(this.baseDN);
+
+ if (!backend.supportsLDIFExport())
+ {
+ Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
+ backend.getBackendID().toString());
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
+
+ acquireIEContext();
+
+ ieContext.exportTarget = target;
+ if (initTask != null)
+ {
+ ieContext.initializeTask = initTask;
+ ieContext.initImportExportCounters(backend.getEntryCount());
+ }
+
+ // Send start message to the peer
+ InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
+ baseDN, serverId, ieContext.exportTarget, requestorID,
+ backend.getEntryCount());
+
+ broker.publish(initializeMessage);
+
exportBackend();
// Notify the peer of the success
DoneMessage doneMsg = new DoneMessage(serverId,
- initializeMessage.getDestination());
+ initializeMessage.getDestination());
broker.publish(doneMsg);
releaseIEContext();
@@ -2658,7 +2978,9 @@
catch(DirectoryException de)
{
// Notify the peer of the failure
- ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject());
+ ErrorMessage errorMsg =
+ new ErrorMessage(target,
+ de.getMessageObject());
broker.publish(errorMsg);
releaseIEContext();
@@ -2686,8 +3008,9 @@
StringBuilder failureReason = new StringBuilder();
if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
{
- Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get(
- backend.getBackendID(), String.valueOf(failureReason));
+ Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
+ backend.getBackendID(),
+ String.valueOf(failureReason));
logError(message);
throw new DirectoryException(ResultCode.OTHER, message);
}
@@ -2698,14 +3021,22 @@
* @param initializeMessage The message that initiated the import.
* @exception DirectoryException Thrown when an error occurs.
*/
- protected void importBackend(InitializeTargetMessage initializeMessage)
+ protected void initialize(InitializeTargetMessage initializeMessage)
throws DirectoryException
{
LDIFImportConfig importConfig = null;
+ DirectoryException de = null;
+
+ if (!backend.supportsLDIFImport())
+ {
+ Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
+ backend.getBackendID().toString());
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
+
try
{
- log(Message.raw("startImport"));
-
if (initializeMessage.getRequestorID() == serverId)
{
// The import responds to a request we did so the IEContext
@@ -2718,7 +3049,7 @@
ieContext.importSource = initializeMessage.getsenderID();
ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.initTaskCounters(initializeMessage.getEntryCount());
+ ieContext.initImportExportCounters(initializeMessage.getEntryCount());
preBackendImport(this.backend);
@@ -2737,20 +3068,14 @@
// Process import
this.backend.importLDIF(importConfig);
+ TRACER.debugInfo("The import has ended successfully.");
stateSavingDisabled = false;
- // Re-exchange state with SS
- broker.stop();
- broker.start(replicationServers);
-
}
catch(Exception e)
{
- DirectoryException de =
- new DirectoryException(
- ResultCode.OTHER, Message.raw(e.getLocalizedMessage()));
- ieContext.exception = de;
- throw (de);
+ de = new DirectoryException(ResultCode.OTHER,
+ Message.raw(e.getLocalizedMessage()));
}
finally
{
@@ -2766,12 +3091,33 @@
((InitializeTask)ieContext.initializeTask).
setState(ieContext.updateTaskCompletionState(),ieContext.exception);
}
-
releaseIEContext();
- log(Message.raw("End importBackend"));
+ // Retrieves the generation ID associated with the data imported
+ try
+ {
+ generationId = loadGenerationId();
+ }
+ catch (DirectoryException e)
+ {
+ logError(ERR_LOADING_GENERATION_ID.get(
+ baseDN.toNormalizedString(),
+ e.getLocalizedMessage()));
+ }
+ rejectedGenerationId = -1;
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "After import, the replication plugin restarts connections" +
+ " to all RSs to provide new generation ID=" + generationId);
+ broker.setGenerationId(generationId);
+
+ // Re-exchange generationID and state with RS
+ broker.reStart();
}
- // Success
+ // Sends up the root error.
+ if (de != null)
+ throw de;
}
/**
@@ -2794,7 +3140,6 @@
throw new DirectoryException(ResultCode.OTHER, message);
}
- // FIXME setBackendEnabled should be part taskUtils ?
TaskUtils.enableBackend(backend.getBackendID());
}
@@ -2988,6 +3333,16 @@
}
/**
+ * Check if the domain is connected to a ReplicationServer.
+ *
+ * @return true if the server is connected, false if not.
+ */
+ public boolean isConnected()
+ {
+ return broker.isConnected();
+ }
+
+ /**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
*/
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
index f9e1fa1..2993c1e 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -151,7 +151,10 @@
attributes.add(attr);
attributes.add(new Attribute("ssl-encryption",
- String.valueOf(domain.isSessionEncrypted())));
+ String.valueOf(domain.isSessionEncrypted())));
+
+ attributes.add(new Attribute("generation-id",
+ String.valueOf(domain.getGenerationId())));
return attributes;
diff --git a/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java b/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
index 5c3d41d..3b7d2f0 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
@@ -27,10 +27,15 @@
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
+import org.opends.server.loggers.debug.DebugTracer;
+
/**
* This message is part of the replication protocol.
* This message is sent by a server or a replication server when an error
@@ -41,6 +46,9 @@
{
private static final long serialVersionUID = 2726389860247088266L;
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+
// Specifies the messageID built form the error that was detected
private int msgID;
@@ -48,10 +56,11 @@
private Message details = null;
/**
- * Create a InitializeMessage.
+ * Creates an ErrorMessage providing the destination server.
+ *
* @param sender The server ID of the server that send this message.
* @param destination The destination server or servers of this message.
- * @param details The details of the error.
+ * @param details The message containing the details of the error.
*/
public ErrorMessage(short sender, short destination,
Message details)
@@ -59,10 +68,13 @@
super(sender, destination);
this.msgID = details.getDescriptor().getId();
this.details = details;
+
+ if (debugEnabled())
+ TRACER.debugInfo(" Creating error message" + this.toString());
}
/**
- * Create a InitializeMessage.
+ * Creates an ErrorMessage.
*
* @param destination replication server id
* @param details details of the error
@@ -72,13 +84,17 @@
super((short)-2, destination);
this.msgID = details.getDescriptor().getId();
this.details = details;
+
+ if (debugEnabled())
+ TRACER.debugInfo(this.toString());
}
/**
- * Creates a new InitializeMessage by decoding the provided byte array.
- * @param in A byte array containing the encoded information for the Message
+ * Creates a new ErrorMessage by decoding the provided byte array.
+ *
+ * @param in A byte array containing the encoded information for the Message
* @throws DataFormatException If the in does not contain a properly
- * encoded InitializeMessage.
+ * encoded message.
*/
public ErrorMessage(byte[] in) throws DataFormatException
{
@@ -185,4 +201,18 @@
return null;
}
}
+
+ /**
+ * Returns a string representation of the message.
+ *
+ * @return the string representation of this message.
+ */
+ public String toString()
+ {
+ return "ErrorMessage=["+
+ " sender=" + this.senderID +
+ " destination=" + this.destination +
+ " msgID=" + this.msgID +
+ " details=" + this.details + "]";
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java b/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java
index ce04bea..da3089a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMessage.java
@@ -155,4 +155,14 @@
return null;
}
}
+
+ /**
+ * Get a string representation of this object.
+ * @return A string representation of this object.
+ */
+ public String toString()
+ {
+ return "InitializeRequestMessage: baseDn="+baseDn+" senderId="+senderID +
+ " destination=" + destination;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
index b048b8a..22215c6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -51,6 +51,7 @@
public class ReplServerInfoMessage extends ReplicationMessage
{
private List<String> connectedServers = null;
+ private long generationId;
/**
* Creates a new changelogInfo message from its encoded form.
@@ -68,15 +69,23 @@
throw new DataFormatException(
"Input is not a valid changelogInfo Message.");
- connectedServers = new ArrayList<String>();
int pos = 1;
+
+ /* read the generationId */
+ int length = getNextLength(in, pos);
+ generationId = Long.valueOf(new String(in, pos, length,
+ "UTF-8"));
+ pos += length +1;
+
+ /* read the connected servers */
+ connectedServers = new ArrayList<String>();
while (pos < in.length)
{
/*
* Read the next server ID
* first calculate the length then construct the string
*/
- int length = getNextLength(in, pos);
+ length = getNextLength(in, pos);
connectedServers.add(new String(in, pos, length, "UTF-8"));
pos += length +1;
}
@@ -92,10 +101,14 @@
* connected servers.
*
* @param connectedServers The list of currently connected servers ID.
+ * @param generationId The generationId currently associated with this
+ * domain.
*/
- public ReplServerInfoMessage(List<String> connectedServers)
+ public ReplServerInfoMessage(List<String> connectedServers,
+ long generationId)
{
this.connectedServers = connectedServers;
+ this.generationId = generationId;
}
/**
@@ -110,6 +123,12 @@
/* Put the message type */
oStream.write(MSG_TYPE_REPL_SERVER_INFO);
+
+ // Put the generationId
+ oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
+ oStream.write(0);
+
+ // Put the servers
if (connectedServers.size() >= 1)
{
for (String server : connectedServers)
@@ -119,6 +138,7 @@
oStream.write(0);
}
}
+
return oStream.toByteArray();
}
catch (IOException e)
@@ -139,4 +159,29 @@
{
return connectedServers;
}
+
+ /**
+ * Get the generationId from this message.
+ * @return The generationId.
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ String csrvs = "";
+ for (String s : connectedServers)
+ {
+ csrvs += s + "/";
+ }
+ return ("ReplServerInfoMessage: genId=" + getGenerationId() +
+ " Connected peers:" + csrvs);
+ }
+
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
index 96140c3..29cb040 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
@@ -65,6 +65,7 @@
* @param windowSize The window size.
* @param serverState our ServerState for this baseDn.
* @param protocolVersion The replication protocol version of the creator.
+ * @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
* after the start messages have been exchanged.
*/
@@ -72,9 +73,10 @@
int windowSize,
ServerState serverState,
short protocolVersion,
+ long generationId,
boolean sslEncryption)
{
- super(protocolVersion);
+ super(protocolVersion, generationId);
this.serverId = serverId;
this.serverURL = serverURL;
if (baseDn != null)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index 4bed982..ba72bf7 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -54,6 +54,8 @@
static final byte MSG_TYPE_ERROR = 14;
static final byte MSG_TYPE_WINDOW_PROBE = 15;
static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
+ static final byte MSG_TYPE_RESET_GENERATION_ID = 17;
+
// Adding a new type of message here probably requires to
// change accordingly generateMsg method below
@@ -76,6 +78,7 @@
* MSG_TYPE_ERROR
* MSG_TYPE_WINDOW_PROBE
* MSG_TYPE_REPL_SERVER_INFO
+ * MSG_TYPE_RESET_GENERATION_ID
*
* @return the byte[] representation of this message.
* @throws UnsupportedEncodingException When the encoding of the message
@@ -140,6 +143,9 @@
case MSG_TYPE_ERROR:
msg = new ErrorMessage(buffer);
break;
+ case MSG_TYPE_RESET_GENERATION_ID:
+ msg = new ResetGenerationId(buffer);
+ break;
case MSG_TYPE_WINDOW_PROBE:
msg = new WindowProbe(buffer);
break;
diff --git a/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java b/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
new file mode 100644
index 0000000..0945377
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
@@ -0,0 +1,78 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This message is used by an LDAP server to communicate to the topology
+ * that the generation must be reset for the domain.
+ */
+public class ResetGenerationId extends ReplicationMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 7657049716115572226L;
+
+
+ /**
+ * Creates a new message.
+ */
+ public ResetGenerationId()
+ {
+ }
+
+ /**
+ * Creates a new GenerationIdMessage from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the
+ * WindowMessage.
+ * @throws DataFormatException If the byte array does not contain a valid
+ * encoded form of the WindowMessage.
+ */
+ public ResetGenerationId(byte[] in) throws DataFormatException
+ {
+ if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
+ throw new
+ DataFormatException("input is not a valid GenerationId Message");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ int length = 1;
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_RESET_GENERATION_ID;
+ return resultByteArray;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
index 693223f..878973e 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -69,7 +69,9 @@
private boolean sslEncryption;
/**
- * Create a new ServerStartMessage.
+ * Creates a new ServerStartMessage. This message is to be sent by an LDAP
+ * Server after being connected to a replication server for a given
+ * replication domain.
*
* @param serverId The serverId of the server for which the ServerStartMessage
* is created.
@@ -82,6 +84,7 @@
* @param heartbeatInterval The requested heartbeat interval.
* @param serverState The state of this server.
* @param protocolVersion The replication protocol version of the creator.
+ * @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
* after the start messages have been exchanged.
*/
@@ -91,9 +94,10 @@
long heartbeatInterval,
ServerState serverState,
short protocolVersion,
+ long generationId,
boolean sslEncryption)
{
- super(protocolVersion);
+ super(protocolVersion, generationId);
this.serverId = serverId;
this.baseDn = baseDn.toString();
@@ -128,10 +132,6 @@
{
super(MSG_TYPE_SERVER_START, in);
- /* The ServerStartMessage is encoded in the form :
- * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
- * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
- */
try
{
/* first bytes are the header */
@@ -303,11 +303,6 @@
@Override
public byte[] getBytes()
{
- /*
- * ServerStartMessage contains.
- * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
- * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState>
- */
try {
byte[] byteDn = baseDn.getBytes("UTF-8");
byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 91b8130..03fe632 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -37,6 +37,7 @@
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
/**
* This class Implement a protocol session using a basic socket and relying on
@@ -91,7 +92,8 @@
{
if (debugEnabled())
{
- TRACER.debugVerbose("Closing SocketSession.");
+ TRACER.debugInfo("Closing SocketSession."
+ + stackTraceToSingleLineString(new Exception()));
}
socket.close();
}
@@ -104,6 +106,12 @@
{
byte[] buffer = msg.getBytes();
String str = String.format("%08x", buffer.length);
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("SocketSession publish <" + str + ">");
+ }
+
byte[] sendLengthBuf = str.getBytes();
output.write(sendLengthBuf);
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartMessage.java b/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
index 150d3d9..48c0e39 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
@@ -39,6 +39,7 @@
public abstract class StartMessage extends ReplicationMessage
{
private short protocolVersion;
+ private long generationId;
/**
* The length of the header of this message.
@@ -49,11 +50,14 @@
* Create a new StartMessage.
*
* @param protocolVersion The Replication Protocol version of the server
- * for which the StartMessage is created.
+ * for which the StartMessage is created.
+ * @param generationId The generationId for this server.
+ *
*/
- public StartMessage(short protocolVersion)
+ public StartMessage(short protocolVersion, long generationId)
{
this.protocolVersion = protocolVersion;
+ this.generationId = generationId;
}
/**
@@ -86,11 +90,14 @@
throws UnsupportedEncodingException
{
byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8");
+ byte[] byteGenerationID =
+ String.valueOf(generationId).getBytes("UTF-8");
/* The message header is stored in the form :
* <message type><protocol version>
*/
int length = 1 + versionByte.length + 1 +
+ byteGenerationID.length + 1 +
additionalLength;
byte[] encodedMsg = new byte[length];
@@ -100,7 +107,10 @@
int pos = 1;
/* put the protocol version */
- headerLength = addByteArray(versionByte, encodedMsg, pos);
+ pos = addByteArray(versionByte, encodedMsg, pos);
+
+ /* put the generationId */
+ headerLength = addByteArray(byteGenerationID, encodedMsg, pos);
return encodedMsg;
}
@@ -129,6 +139,14 @@
protocolVersion = Short.valueOf(
new String(encodedMsg, pos, length, "UTF-8"));
pos += length + 1;
+
+ /* read the generationId */
+ length = getNextLength(encodedMsg, pos);
+ generationId = Long.valueOf(new String(encodedMsg, pos, length,
+ "UTF-8"));
+ pos += length +1;
+
+
return pos;
} catch (UnsupportedEncodingException e)
{
@@ -147,4 +165,14 @@
{
return protocolVersion;
}
+
+ /**
+ * Get the generationId from this message.
+ * @return The generationId.
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
}
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 8807098..e0ad4bd 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -65,11 +65,19 @@
*/
public class DbHandler implements Runnable
{
- // This queue hold all the updates not yet saved to stable storage
- // it is only used as a temporary placeholder so that the write
+ // The msgQueue holds all the updates not yet saved to stable storage.
+ // This list is only used as a temporary placeholder so that the write
// in the stable storage can be grouped for efficiency reason.
- // it is never read back by replicationServer threads that are responsible
+ // Adding an update synchronously add the update to this list.
+ // A dedicated thread loops on flush() and trim().
+ // flush() : get a number of changes from the in memory list by block
+ // and write them to the db.
+ // trim() : deletes from the DB a number of changes that are older than a
+ // certain date.
+ //
+ // Changes are not read back by replicationServer threads that are responsible
// for pushing the changes to other replication server or to LDAP server
+ //
private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>();
private ReplicationDB db;
private ChangeNumber firstChange = null;
@@ -81,6 +89,8 @@
private boolean done = false;
private DirectoryThread thread = null;
private Object flushLock = new Object();
+ private ReplicationDbEnv dbenv;
+
// The High and low water mark for the max size of the msgQueue.
// the threads calling add() method will be blocked if the size of
@@ -90,23 +100,29 @@
final static int MSG_QUEUE_LOWMARK = 4000;
/**
- * the trim age in milliseconds.
+ *
+ * The trim age in milliseconds. Changes record in the change DB that
+ * are older than this age are removed.
+ *
*/
private long trimage;
/**
- * Creates a New dbHandler associated to a given LDAP server.
+ * Creates a new dbHandler associated to a given LDAP server.
*
* @param id Identifier of the DB.
* @param baseDn the baseDn for which this DB was created.
* @param replicationServer The ReplicationServer that creates this dbHandler.
* @param dbenv the Database Env to use to create the ReplicationServer DB.
+ * @param generationId The generationId of the data contained in the LDAP
+ * server for this domain.
* @throws DatabaseException If a database problem happened
*/
public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
- ReplicationDbEnv dbenv)
+ ReplicationDbEnv dbenv, long generationId)
throws DatabaseException
{
+ this.dbenv = dbenv;
this.serverId = id;
this.baseDn = baseDn;
this.trimage = replicationServer.getTrimage();
@@ -261,7 +277,7 @@
*
* @param number the number of changes to be removed.
*/
- private void clear(int number)
+ private void clearQueue(int number)
{
synchronized (msgQueue)
{
@@ -346,7 +362,7 @@
}
/**
- * Flush old change information from this replicationServer database.
+ * Trim old changes from this replicationServer database.
* @throws DatabaseException In case of database problem.
*/
private void trim() throws DatabaseException, Exception
@@ -417,7 +433,7 @@
db.addEntries(changes);
// remove the changes from the list of changes to be saved.
- clear(changes.size());
+ clearQueue(changes.size());
}
} while (size >=500);
}
@@ -517,4 +533,22 @@
trimage = delay;
}
+ /**
+ * Clear the changes from this DB (from both memory cache and DB storage).
+ * @throws DatabaseException When an exception occurs while removing the
+ * changes from the DB.
+ * @throws Exception When an exception occurs while accessing a resource
+ * from the DB.
+ *
+ */
+ public void clear() throws DatabaseException, Exception
+ {
+ synchronized(flushLock)
+ {
+ msgQueue.clear();
+ }
+ db.clear();
+ firstChange = db.readFirstChange();
+ lastChange = db.readLastChange();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index d618cf8..990eb1f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -28,6 +28,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -47,6 +49,7 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import com.sleepycat.je.DatabaseException;
@@ -106,6 +109,15 @@
new ConcurrentHashMap<Short, DbHandler>();
private ReplicationServer replicationServer;
+ /* GenerationId management */
+ private long generationId = -1;
+ private boolean generationIdSavedStatus = false;
+
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
/**
* Creates a new ReplicationCache associated to the DN baseDn.
*
@@ -117,7 +129,13 @@
{
this.baseDn = baseDn;
this.replicationServer = replicationServer;
- }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Created Cache for " + baseDn + " " +
+ stackTraceToSingleLineString(new Exception()));
+}
/**
* Add an update that has been received to the list of
@@ -138,6 +156,7 @@
* other replication server before pushing it to the LDAP servers
*/
+ short id = update.getChangeNumber().getServerId();
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
@@ -158,19 +177,21 @@
}
}
- // look for the dbHandler that is responsible for the master server which
+ // look for the dbHandler that is responsible for the LDAP server which
// generated the change.
DbHandler dbHandler = null;
synchronized (sourceDbHandlers)
{
- short id = update.getChangeNumber().getServerId();
dbHandler = sourceDbHandlers.get(id);
if (dbHandler == null)
{
try
{
- dbHandler = replicationServer.newDbHandler(id, baseDn);
- } catch (DatabaseException e)
+ dbHandler = replicationServer.newDbHandler(id,
+ baseDn, generationId);
+ generationIdSavedStatus = true;
+ }
+ catch (DatabaseException e)
{
/*
* Because of database problem we can't save any more changes
@@ -250,6 +271,15 @@
}
connectedServers.put(handler.getServerId(), handler);
+ // It can be that the server that connects here is the
+ // first server connected for a domain.
+ // In that case, we will establish the appriopriate connections
+ // to the other repl servers for this domain and receive
+ // their ReplServerInfo messages.
+ // FIXME: Is it necessary to end this above processing BEFORE listening
+ // to incoming messages for that domain ? But the replica
+ // would raise Read Timeout for replica that connects.
+
// Update the remote replication servers with our list
// of connected LDAP servers
sendReplServerInfo();
@@ -265,17 +295,90 @@
*/
public void stopServer(ServerHandler handler)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " stopServer " + handler.getMonitorInstanceName());
+
handler.stopHandler();
if (handler.isReplicationServer())
+ {
replicationServers.remove(handler.getServerId());
+ }
else
{
connectedServers.remove(handler.getServerId());
+ }
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendReplServerInfo();
+ mayResetGenerationId();
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
+ }
+
+ /**
+ * Resets the generationId for this domain if there is no LDAP
+ * server currently connected and if the generationId has never
+ * been saved.
+ */
+ protected void mayResetGenerationId()
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " mayResetGenerationId generationIdSavedStatus=" +
+ generationIdSavedStatus);
+
+ // If there is no more any LDAP server connected to this domain in the
+ // topology and the generationId has never been saved, then we can reset
+ // it and the next LDAP server to connect will become the new reference.
+ boolean lDAPServersConnectedInTheTopology = false;
+ if (connectedServers.isEmpty())
+ {
+ for (ServerHandler rsh : replicationServers.values())
+ {
+ if (generationId != rsh.getGenerationId())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
+ " thas different genId");
+ }
+ else
+ {
+ if (!rsh.getRemoteLDAPServers().isEmpty())
+ {
+ lDAPServersConnectedInTheTopology = true;
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
+ " has servers connected to it - will not reset generationId");
+ }
+ }
+ }
+ }
+ else
+ {
+ lDAPServersConnectedInTheTopology = true;
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " has servers connected to it - will not reset generationId");
+ }
+
+ if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus))
+ {
+ setGenerationId(-1, false);
}
}
@@ -321,7 +424,7 @@
// Update this server with the list of LDAP servers
// already connected
handler.sendInfo(
- new ReplServerInfoMessage(getConnectedLDAPservers()));
+ new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
return true;
}
@@ -437,17 +540,20 @@
}
/**
- * creates a new ReplicationDB with specified identifier.
- * @param id the identifier of the new ReplicationDB.
- * @param db the new db.
+ * Sets the provided DbHandler associated to the provided serverId.
+ *
+ * @param serverId the serverId for the server to which is
+ * associated the Dbhandler.
+ * @param dbHandler the dbHandler associated to the serverId.
*
* @throws DatabaseException If a database error happened.
*/
- public void newDb(short id, DbHandler db) throws DatabaseException
+ public void setDbHandler(short serverId, DbHandler dbHandler)
+ throws DatabaseException
{
synchronized (sourceDbHandlers)
{
- sourceDbHandlers.put(id , db);
+ sourceDbHandlers.put(serverId , dbHandler);
}
}
@@ -557,7 +663,8 @@
}
/**
- * Process an InitializeRequestMessage.
+ * Processes a message coming from one server in the topology
+ * and potentially forwards it to one or all other servers.
*
* @param msg The message received and to be processed.
* @param senderHandler The server handler of the server that emitted
@@ -565,6 +672,23 @@
*/
public void process(RoutableMessage msg, ServerHandler senderHandler)
{
+ // A replication server is not expected to be the destination
+ // of a routable message except for an error message.
+ if (msg.getDestination() == this.replicationServer.getServerId())
+ {
+ if (msg instanceof ErrorMessage)
+ {
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+ }
+ else
+ {
+ logError(NOTE_ERR_ROUTING_TO_SERVER.get(
+ msg.getClass().getCanonicalName()));
+ }
+ return;
+ }
List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
@@ -572,9 +696,13 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
- mb.append("serverID:" + msg.getDestination());
+ mb.append(" unreachable server ID=" + msg.getDestination());
+ mb.append(" unroutable message =" + msg);
ErrorMessage errMsg = new ErrorMessage(
- msg.getsenderID(), mb.toMessage());
+ this.replicationServer.getServerId(),
+ msg.getsenderID(),
+ mb.toMessage());
+
try
{
senderHandler.send(errMsg);
@@ -583,8 +711,8 @@
{
// TODO Handle error properly (sender timeout in addition)
/*
- * An error happened trying the send back an error to this server.
- * Log an error and close the connection to the sender server.
+ * An error happened trying to send an error msg to this server.
+ * Log an error and close the connection to this server.
*/
MessageBuilder mb2 = new MessageBuilder();
mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -788,7 +916,7 @@
private void sendReplServerInfo()
{
ReplServerInfoMessage info =
- new ReplServerInfoMessage(getConnectedLDAPservers());
+ new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
for (ServerHandler handler : replicationServers.values())
{
try
@@ -811,6 +939,26 @@
}
/**
+ * Get the generationId associated to this domain.
+ *
+ * @return The generationId
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * Get the generationId saved status.
+ *
+ * @return The generationId saved status.
+ */
+ public boolean getGenerationIdSavedStatus()
+ {
+ return generationIdSavedStatus;
+ }
+
+ /**
* Sets the replication server informations for the provided
* handler from the provided ReplServerInfoMessage.
*
@@ -822,4 +970,162 @@
{
handler.setReplServerInfo(infoMsg);
}
+
+ /**
+ * Sets the provided value as the new in memory generationId.
+ *
+ * @param generationId The new value of generationId.
+ * @param savedStatus The saved status of the generationId.
+ */
+ synchronized public void setGenerationId(long generationId,
+ boolean savedStatus)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " RCache.set GenerationId=" + generationId);
+
+ if (generationId == this.generationId)
+ return;
+
+ if (this.generationId>0)
+ {
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.resetGenerationId();
+ }
+ }
+
+ this.generationId = generationId;
+ this.generationIdSavedStatus = savedStatus;
+
+ }
+
+ /**
+ * Resets the generationID.
+ *
+ * @param senderHandler The handler associated to the server
+ * that requested to reset the generationId.
+ */
+ public void resetGenerationId(ServerHandler senderHandler)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " RCache.resetGenerationId");
+
+ // Notifies the others LDAP servers that from now on
+ // they have the bad generationId
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.resetGenerationId();
+ }
+
+ // Propagates the reset message to the others replication servers
+ // dealing with the same domain.
+ if (senderHandler.isLDAPserver())
+ {
+ for (ServerHandler handler : replicationServers.values())
+ {
+ try
+ {
+ handler.sendGenerationId(new ResetGenerationId());
+ }
+ catch (IOException e)
+ {
+ logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
+ get(handler.getMonitorInstanceName()));
+ }
+ }
+ }
+
+ // Reset the localchange and state db for the current domain
+ synchronized (sourceDbHandlers)
+ {
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ try
+ {
+ dbHandler.clear();
+ }
+ catch (Exception e)
+ {
+ // TODO: i18n
+ logError(Message.raw(
+ "Exception caught while clearing dbHandler:" +
+ e.getLocalizedMessage()));
+ }
+ }
+ sourceDbHandlers.clear();
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " The source db handler has been cleared");
+ }
+ try
+ {
+ replicationServer.clearGenerationId(baseDn);
+ }
+ catch (Exception e)
+ {
+ // TODO: i18n
+ logError(Message.raw(
+ "Exception caught while clearing generationId:" +
+ e.getLocalizedMessage()));
+ }
+
+ // Reset the in memory domain generationId
+ generationId = -1;
+ }
+
+ /**
+ * Returns whether the provided server is in degraded
+ * state due to the fact that the peer server has an invalid
+ * generationId for this domain.
+ *
+ * @param serverId The serverId for which we want to know the
+ * the state.
+ * @return Whether it is degraded or not.
+ */
+
+ public boolean isDegradedDueToGenerationId(short serverId)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " isDegraded serverId=" + serverId +
+ " given local generation Id=" + this.generationId);
+
+ ServerHandler handler = replicationServers.get(serverId);
+ if (handler == null)
+ {
+ handler = connectedServers.get(serverId);
+ if (handler == null)
+ {
+ return false;
+ }
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " Compute degradation of serverId=" + serverId +
+ " LS server generation Id=" + handler.getGenerationId());
+ return (handler.getGenerationId() != this.generationId);
+ }
+
+ /**
+ * Return the associated replication server.
+ * @return The replication server.
+ */
+ public ReplicationServer getReplicationServer()
+ {
+ return replicationServer;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index f1b67af..4f4ea3c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -62,11 +62,11 @@
/**
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
- * @param serverId Identifier of the LDAP server.
- * @param baseDn baseDn of the LDAP server.
- * @param replicationServer the ReplicationServer that needs to be shutdown
- * @param dbenv the Db encironemnet to use to create the db
- * @throws DatabaseException if a database problem happened
+ * @param serverId The identifier of the LDAP server.
+ * @param baseDn The baseDn of the replication domain.
+ * @param replicationServer The ReplicationServer that needs to be shutdown.
+ * @param dbenv The Db environment to use to create the db.
+ * @throws DatabaseException If a database problem happened.
*/
public ReplicationDB(Short serverId, DN baseDn,
ReplicationServer replicationServer,
@@ -77,8 +77,10 @@
this.baseDn = baseDn;
this.dbenv = dbenv;
this.replicationServer = replicationServer;
- db = dbenv.getOrAddDb(serverId, baseDn);
+ // Get or create the associated Replicationcache and Db.
+ db = dbenv.getOrAddDb(serverId, baseDn,
+ replicationServer.getReplicationCache(baseDn, true).getGenerationId());
}
/**
@@ -472,4 +474,19 @@
cursor.delete();
}
}
+
+ /**
+ * Clears this change DB from the changes it contains.
+ *
+ * @throws Exception Throws an exception it occurs.
+ * @throws DatabaseException Throws a DatabaseException when it occurs.
+ */
+ public void clear() throws Exception, DatabaseException
+ {
+ // Clears the changes
+ dbenv.clearDb(this.toString());
+
+ // Clears the reference to this serverID
+ dbenv.clearServerId(baseDn, serverId);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index 0a4f630..fc7742c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -25,10 +25,12 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -58,6 +60,12 @@
private Environment dbEnvironment = null;
private Database stateDb = null;
private ReplicationServer replicationServer = null;
+ private static final String GENERATION_ID_TAG = "GENID";
+ private static final String FIELD_SEPARATOR = " ";
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
/**
* Initialize this class.
@@ -117,16 +125,120 @@
Cursor cursor = stateDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
+
try
{
+ /*
+ * Get the domain base DN/ generationIDs records
+ */
OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
while (status == OperationStatus.SUCCESS)
{
try
{
String stringData = new String(data.getData(), "UTF-8");
- String[] str = stringData.split(" ", 2);
- short serverId = new Short(str[0]);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Read tag baseDn generationId=" + stringData);
+
+ String[] str = stringData.split(FIELD_SEPARATOR, 3);
+ if (str[0].equals(GENERATION_ID_TAG))
+ {
+ long generationId=-1;
+
+ DN baseDn;
+
+ try
+ {
+ // <generationId>
+ generationId = new Long(str[1]);
+ }
+ catch (NumberFormatException e)
+ {
+ // should never happen
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw(
+ "replicationServer state database has a wrong format: " +
+ e.getLocalizedMessage()
+ + "<" + str[1] + ">"));
+ }
+
+ // <baseDn>
+ baseDn = null;
+ try
+ {
+ baseDn = DN.decode(str[2]);
+ } catch (DirectoryException e)
+ {
+ Message message =
+ ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
+ logError(message);
+
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Has read baseDn=" + baseDn
+ + " generationId=" + generationId);
+
+ replicationServer.getReplicationCache(baseDn, true).
+ setGenerationId(generationId, true);
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // should never happens
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw("need UTF-8 support"));
+ }
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+
+ /*
+ * Get the server Id / domain base DN records
+ */
+ status = cursor.getFirst(key, data, LockMode.DEFAULT);
+ while (status == OperationStatus.SUCCESS)
+ {
+ String stringData = null;
+ try
+ {
+ stringData = new String(data.getData(), "UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // should never happens
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw(
+ "need UTF-8 support"));
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Read serverId BaseDN=" + stringData);
+
+ String[] str = stringData.split(FIELD_SEPARATOR, 2);
+ if (!str[0].equals(GENERATION_ID_TAG))
+ {
+ short serverId = -1;
+ try
+ {
+ // <serverId>
+ serverId = new Short(str[0]);
+ } catch (NumberFormatException e)
+ {
+ // should never happen
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw(
+ "replicationServer state database has a wrong format: " +
+ e.getLocalizedMessage()
+ + "<" + str[0] + ">"));
+ }
+ // <baseDn>
DN baseDn = null;
try
{
@@ -134,113 +246,302 @@
} catch (DirectoryException e)
{
Message message =
- ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
+ ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
logError(message);
}
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Has read: baseDn=" + baseDn
+ + " serverId=" + serverId);
+
DbHandler dbHandler =
- new DbHandler(serverId, baseDn, replicationServer, this);
- replicationServer.getReplicationCache(baseDn).newDb(serverId,
- dbHandler);
- } catch (NumberFormatException e)
- {
- // should never happen
- // TODO: i18n
- throw new ReplicationDBException(Message.raw(
- "replicationServer state database has a wrong format"));
- } catch (UnsupportedEncodingException e)
- {
- // should never happens
- // TODO: i18n
- throw new ReplicationDBException(Message.raw(
- "need UTF-8 support"));
+ new DbHandler(serverId, baseDn, replicationServer, this, 1);
+
+ replicationServer.getReplicationCache(baseDn, true).
+ setDbHandler(serverId, dbHandler);
}
+
status = cursor.getNext(key, data, LockMode.DEFAULT);
}
cursor.close();
- } catch (DatabaseException dbe) {
+ }
+ catch (DatabaseException dbe)
+ {
cursor.close();
throw dbe;
}
}
- /**
- * Find or create the database used to store changes from the server
- * with the given serverId and the given baseDn.
- * @param serverId The server id that identifies the server.
- * @param baseDn The baseDn that identifies the server.
- * @return the Database.
- * @throws DatabaseException in case of underlying Exception.
- */
- public Database getOrAddDb(Short serverId, DN baseDn)
- throws DatabaseException
- {
- try
+ /**
+ * Finds or creates the database used to store changes from the server
+ * with the given serverId and the given baseDn.
+ *
+ * @param serverId The server id that identifies the server.
+ * @param baseDn The baseDn that identifies the domain.
+ * @param generationId The generationId associated to this domain.
+ * @return the Database.
+ * @throws DatabaseException in case of underlying Exception.
+ */
+ public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
+ throws DatabaseException
{
- String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
- byte[] byteId;
-
- byteId = stringId.getBytes("UTF-8");
-
- // Open the database. Create it if it does not already exist.
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(true);
- Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
-
- DatabaseEntry key = new DatabaseEntry();
- key.setData(byteId);
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
- if (status == OperationStatus.NOTFOUND)
+ if (debugEnabled())
+ TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
+ serverId + " " + baseDn + " " + generationId);
+ try
{
- Transaction txn = dbEnvironment.beginTransaction(null, null);
- try {
- data.setData(byteId);
- stateDb.put(txn, key, data);
- txn.commitWriteNoSync();
- } catch (DatabaseException dbe)
+ String stringId = serverId.toString() + FIELD_SEPARATOR
+ + baseDn.toNormalizedString();
+
+ // Opens the database for the changes received from this server
+ // on this domain. Create it if it does not already exist.
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(true);
+ Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
+
+ // Creates the record serverId/domain base Dn in the stateDb
+ // if it does not already exist.
+ byte[] byteId;
+ byteId = stringId.getBytes("UTF-8");
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
{
- // Abort the txn and propagate the Exception to the caller
- txn.abort();
- throw dbe;
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(byteId);
+ if (debugEnabled())
+ TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
+ " serverId/Domain=<"+stringId+">");
+ stateDb.put(txn, key, data);
+ txn.commitWriteNoSync();
+ } catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+
+ // Creates the record domain base Dn/ generationId in the stateDb
+ // if it does not already exist.
+ stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+ baseDn.toNormalizedString();
+ String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+ generationId.toString() + FIELD_SEPARATOR +
+ baseDn.toNormalizedString();
+ byteId = stringId.getBytes("UTF-8");
+ byte[] dataByteId;
+ dataByteId = dataStringId.getBytes("UTF-8");
+ key = new DatabaseEntry();
+ key.setData(byteId);
+ data = new DatabaseEntry();
+ status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(dataByteId);
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Created in the state Db record Tag/Domain/GenId key=" +
+ stringId + " value=" + dataStringId);
+ stateDb.put(txn, key, data);
+ txn.commitWriteNoSync();
+ } catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ return db;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ return null;
+ }
+ }
+
+ /**
+ * Creates a new transaction.
+ *
+ * @return the transaction.
+ * @throws DatabaseException in case of underlying database Exception.
+ */
+ public Transaction beginTransaction() throws DatabaseException
+ {
+ return dbEnvironment.beginTransaction(null, null);
+ }
+
+ /**
+ * Shutdown the Db environment.
+ */
+ public void shutdown()
+ {
+ try
+ {
+ stateDb.close();
+ dbEnvironment.close();
+ } catch (DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ }
+ }
+
+ /**
+ * Clears the provided generationId associated to the provided baseDn
+ * from the state Db.
+ *
+ * @param baseDn The baseDn for which the generationID must be cleared.
+ *
+ */
+ public void clearGenerationId(DN baseDn)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " clearGenerationId " + baseDn);
+ try
+ {
+ // Deletes the record domain base Dn/ generationId in the stateDb
+ String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+ baseDn.toNormalizedString();
+ byte[] byteId = stringId.getBytes("UTF-8");
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if ((status == OperationStatus.SUCCESS) ||
+ (status == OperationStatus.KEYEXIST))
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try
+ {
+ stateDb.delete(txn, key);
+ txn.commitWriteNoSync();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " clearGenerationId (" +
+ baseDn +") succeeded.");
+ }
+ catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ else
+ {
+ // TODO : should have a better error logging
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " clearGenerationId ("+ baseDn + " failed" + status.toString());
}
}
- return db;
- } catch (UnsupportedEncodingException e)
- {
- // can't happen
- return null;
+ catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ }
+ catch (DatabaseException dbe)
+ {
+ // can't happen
+ }
}
- }
- /**
- * Creates a new transaction.
- *
- * @return the transaction.
- * @throws DatabaseException in case of underlying database Exception.
- */
- public Transaction beginTransaction() throws DatabaseException
- {
- return dbEnvironment.beginTransaction(null, null);
- }
+ /**
+ * Clears the provided serverId associated to the provided baseDn
+ * from the state Db.
+ *
+ * @param baseDn The baseDn for which the generationID must be cleared.
+ * @param serverId The serverId to remove from the Db.
+ *
+ */
+ public void clearServerId(DN baseDn, Short serverId)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
+ try
+ {
+ String stringId = serverId.toString() + FIELD_SEPARATOR
+ + baseDn.toNormalizedString();
- /**
- * Shutdown the Db environment.
- */
- public void shutdown()
- {
- try
- {
- stateDb.close();
- dbEnvironment.close();
- } catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
+ // Deletes the record serverId/domain base Dn in the stateDb
+ byte[] byteId;
+ byteId = stringId.getBytes("UTF-8");
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.NOTFOUND)
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(byteId);
+ stateDb.delete(txn, key);
+ txn.commitWriteNoSync();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ " In " + this.replicationServer.getMonitorInstanceName() +
+ " clearServerId() succeeded " + baseDn + " " +
+ serverId);
+ }
+ catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ }
+ catch (DatabaseException dbe)
+ {
+ // can't happen
+ }
}
- }
+ /**
+ * Clears the database.
+ *
+ * @param databaseName The name of the database to clear.
+ */
+ public final void clearDb(String databaseName)
+ {
+ try
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "clearDb" + databaseName);
+
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ dbEnvironment.truncateDatabase(txn, databaseName, false);
+ }
+ catch (DatabaseException dbe)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
+ dbe.getLocalizedMessage()));
+ logError(mb.toMessage());
+ }
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2ae06c4..414dd88 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,13 +25,13 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
-import org.opends.messages.Message;
-
+import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -61,6 +61,8 @@
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ResultCode;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
import com.sleepycat.je.DatabaseException;
@@ -101,12 +103,17 @@
private int queueSize;
private String dbDirname = null;
private long trimAge; // the time (in sec) after which the changes must
+ // be deleted from the persistent storage.
private int replicationPort;
- // de deleted from the persistent storage.
private boolean stopListen = false;
private ReplSessionSecurity replSessionSecurity;
/**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
* Creates a new Replication server using the provided configuration entry.
*
* @param configuration The configuration of this replication server.
@@ -191,7 +198,7 @@
// The socket has probably been closed as part of the
// shutdown or changing the port number process.
// just log debug information and loop.
- Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
+ Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
logError(message);
}
}
@@ -272,6 +279,10 @@
String hostname = serverURL.substring(0, separator);
boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
+ if (debugEnabled())
+ TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
+ " connects to " + serverURL);
+
try
{
InetSocketAddress ServerAddr = new InetSocketAddress(
@@ -329,22 +340,41 @@
listenSocket.bind(new InetSocketAddress(changelogPort));
/*
- * create working threads
+ * creates working threads
+ * We must first connect, then start to listen.
*/
- listenThread =
- new ReplicationServerListenThread("Replication Server Listener", this);
- listenThread.start();
+ if (debugEnabled())
+ TRACER.debugInfo("RS " +getMonitorInstanceName()+
+ " creates connect threads");
connectThread =
new ReplicationServerConnectThread("Replication Server Connect", this);
connectThread.start();
+ // FIXME : Is it better to have the time to receive the ReplServerInfo
+ // from all the other replication servers since this info is necessary
+ // to route an early received total update request.
+
+ if (debugEnabled())
+ TRACER.debugInfo("RS " +getMonitorInstanceName()+
+ " creates listen threads");
+
+ listenThread =
+ new ReplicationServerListenThread("Replication Server Listener", this);
+ listenThread.start();
+
+ if (debugEnabled())
+ TRACER.debugInfo("RS " +getMonitorInstanceName()+
+ " successfully initialized");
+
} catch (DatabaseException e)
{
- Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname);
+ Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
+ getFileForPath(dbDirname).getAbsolutePath());
logError(message);
} catch (ReplicationDBException e)
{
- Message message = ERR_COULD_NOT_READ_DB.get(dbDirname);
+ Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
+ e.getLocalizedMessage());
logError(message);
} catch (UnknownHostException e)
{
@@ -362,18 +392,22 @@
* Get the ReplicationCache associated to the base DN given in parameter.
*
* @param baseDn The base Dn for which the ReplicationCache must be returned.
+ * @param create Specifies whether to create the ReplicationCache if it does
+ * not already exist.
* @return The ReplicationCache associated to the base DN given in parameter.
*/
- public ReplicationCache getReplicationCache(DN baseDn)
+ public ReplicationCache getReplicationCache(DN baseDn, boolean create)
{
ReplicationCache replicationCache;
synchronized (baseDNs)
{
replicationCache = baseDNs.get(baseDn);
- if (replicationCache == null)
+ if ((replicationCache == null) && (create))
+ {
replicationCache = new ReplicationCache(baseDn, this);
- baseDNs.put(baseDn, replicationCache);
+ baseDNs.put(baseDn, replicationCache);
+ }
}
return replicationCache;
@@ -384,6 +418,9 @@
*/
public void shutdown()
{
+ if (shutdown)
+ return;
+
shutdown = true;
// shutdown the connect thread
@@ -404,6 +441,12 @@
// replication Server service is closing anyway.
}
+ // shutdown the listen thread
+ if (listenThread != null)
+ {
+ listenThread.interrupt();
+ }
+
// shutdown all the ChangelogCaches
for (ReplicationCache replicationCache : baseDNs.values())
{
@@ -424,13 +467,36 @@
*
* @param id The serverId for which the dbHandler must be created.
* @param baseDn The DN for which the dbHandler muste be created.
+ * @param generationId The generationId for this server and this domain.
* @return The new DB handler for this ReplicationServer and the serverId and
* DN given in parameter.
* @throws DatabaseException in case of underlying database problem.
*/
- DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
+ public DbHandler newDbHandler(short id, DN baseDn, long generationId)
+ throws DatabaseException
{
- return new DbHandler(id, baseDn, this, dbEnv);
+ return new DbHandler(id, baseDn, this, dbEnv, generationId);
+ }
+
+ /**
+ * Clears the generationId for the domain related to the provided baseDn.
+ * @param baseDn The baseDn for which to delete the generationId.
+ * @throws DatabaseException When it occurs.
+ */
+ public void clearGenerationId(DN baseDn)
+ throws DatabaseException
+ {
+ try
+ {
+ dbEnv.clearGenerationId(baseDn);
+ }
+ catch(Exception e)
+ {
+ TRACER.debugInfo(
+ "In RS <" + getMonitorInstanceName() +
+ " Exception in clearGenerationId" +
+ stackTraceToSingleLineString(e) + e.getLocalizedMessage());
+ }
}
/**
@@ -618,7 +684,50 @@
Attribute bases = new Attribute(baseType, "base-dn", baseValues);
attributes.add(bases);
+ // Publish to monitor the generation ID by domain
+ AttributeType generationIdType=
+ DirectoryServer.getAttributeType("base-dn-generation-id", true);
+ LinkedHashSet<AttributeValue> generationIdValues =
+ new LinkedHashSet<AttributeValue>();
+ for (DN base : baseDNs.keySet())
+ {
+ long generationId=-1;
+ ReplicationCache cache = getReplicationCache(base, false);
+ if (cache != null)
+ generationId = cache.getGenerationId();
+ generationIdValues.add(new AttributeValue(generationIdType,
+ base.toString() + " " + generationId));
+ }
+ Attribute generationIds = new Attribute(generationIdType, "generation-id",
+ generationIdValues);
+ attributes.add(generationIds);
+
return attributes;
}
+ /**
+ * Get the value of generationId for the replication domain
+ * associated with the provided baseDN.
+ *
+ * @param baseDN The baseDN of the domain.
+ * @return The value of the generationID.
+ */
+ public long getGenerationId(DN baseDN)
+ {
+ ReplicationCache rc = this.getReplicationCache(baseDN, false);
+ if (rc!=null)
+ return rc.getGenerationId();
+ return -1;
+ }
+
+ /**
+ * Get the serverId for this replication server.
+ *
+ * @return The value of the serverId.
+ *
+ */
+ public short getServerId()
+ {
+ return serverId;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 858a498..cc97ccf 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -51,6 +51,8 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
+
+import org.opends.messages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -67,6 +69,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -143,6 +146,7 @@
private short replicationServerId;
private short protocolVersion;
+ private long generationId=-1;
/**
@@ -189,7 +193,7 @@
* Then create the reader and writer thread.
*
* @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
- * null if this is an incoming connection.
+ * null if this is an incoming connection (listen).
* @param replicationServerId The identifier of the replicationServer that
* creates this server handler.
* @param replicationServerURL The URL of the replicationServer that creates
@@ -206,22 +210,34 @@
int windowSize, boolean sslEncryption,
ReplicationServer replicationServer)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
+ " starts a new LS or RS " +
+ ((baseDn == null)?"incoming connection":"outgoing connection"));
+
this.replicationServerId = replicationServerId;
rcvWindowSizeHalf = windowSize/2;
maxRcvWindow = windowSize;
rcvWindow = windowSize;
+ long localGenerationId=-1;
try
{
if (baseDn != null)
{
// This is an outgoing connection. Publish our start message.
this.baseDn = baseDn;
- replicationCache = replicationServer.getReplicationCache(baseDn);
+
+ // Get or create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(baseDn, true);
+ localGenerationId = replicationCache.getGenerationId();
+
ServerState localServerState = replicationCache.getDbServerState();
ReplServerStartMessage msg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
baseDn, windowSize, localServerState,
- protocolVersion, sslEncryption);
+ protocolVersion, localGenerationId,
+ sslEncryption);
+
session.publish(msg);
}
@@ -229,9 +245,10 @@
ReplicationMessage msg = session.receive();
if (msg instanceof ServerStartMessage)
{
- // The remote server is an LDAP Server
+ // The remote server is an LDAP Server.
ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+ generationId = receivedMsg.getGenerationId();
protocolVersion = ProtocolVersion.minWithCurrent(
receivedMsg.getVersion());
serverId = receivedMsg.getServerId();
@@ -281,15 +298,69 @@
serverIsLDAPserver = true;
- // This an incoming connection. Publish our start message
- replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ // Get or Create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(this.baseDn,
+ true);
+ localGenerationId = replicationCache.getGenerationId();
+
ServerState localServerState = replicationCache.getDbServerState();
+ // This an incoming connection. Publish our start message
ReplServerStartMessage myStartMsg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
this.baseDn, windowSize, localServerState,
- protocolVersion, sslEncryption);
+ protocolVersion, localGenerationId,
+ sslEncryption);
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
+
+ /* Until here session is encrypted then it depends on the negociation */
+ if (!sslEncryption)
+ {
+ session.stopEncryption();
+ }
+
+ if (debugEnabled())
+ {
+ Set<String> ss = this.serverState.toStringSet();
+ Set<String> lss = replicationCache.getDbServerState().toStringSet();
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ ", SH received START from LS serverId=" + serverId +
+ " baseDN=" + this.baseDn +
+ " generationId=" + generationId +
+ " localGenerationId=" + localGenerationId +
+ " state=" + ss +
+ " and sent ReplServerStart with state=" + lss);
+ }
+
+ /*
+ * If we have already a generationID set for the domain
+ * then
+ * if the connecting replica has not the same
+ * then it is degraded locally and notified by an error message
+ * else
+ * we set the generationID from the one received
+ * (unsaved yet on disk . will be set with the 1rst change received)
+ */
+ if (localGenerationId>0)
+ {
+ if (generationId != localGenerationId)
+ {
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ receivedMsg.getBaseDn().toNormalizedString(),
+ Short.toString(receivedMsg.getServerId()),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ }
+ else
+ {
+ replicationCache.setGenerationId(generationId, false);
+ }
}
else if (msg instanceof ReplServerStartMessage)
{
@@ -297,6 +368,7 @@
ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
protocolVersion = ProtocolVersion.minWithCurrent(
receivedMsg.getVersion());
+ generationId = receivedMsg.getGenerationId();
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
int separator = serverURL.lastIndexOf(':');
@@ -306,7 +378,10 @@
this.baseDn = receivedMsg.getBaseDn();
if (baseDn == null)
{
- replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ // Get or create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(this.baseDn,
+ true);
+ localGenerationId = replicationCache.getGenerationId();
ServerState serverState = replicationCache.getDbServerState();
// The session initiator decides whether to use SSL.
@@ -317,7 +392,9 @@
new ReplServerStartMessage(replicationServerId,
replicationServerURL,
this.baseDn, windowSize, serverState,
- protocolVersion, sslEncryption);
+ protocolVersion,
+ localGenerationId,
+ sslEncryption);
session.publish(outMsg);
}
else
@@ -326,6 +403,107 @@
}
this.serverState = receivedMsg.getServerState();
sendWindowSize = receivedMsg.getWindowSize();
+
+ /* Until here session is encrypted then it depends on the negociation */
+ if (!sslEncryption)
+ {
+ session.stopEncryption();
+ }
+
+ if (debugEnabled())
+ {
+ Set<String> ss = this.serverState.toStringSet();
+ Set<String> lss = replicationCache.getDbServerState().toStringSet();
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ ", SH received START from RS serverId=" + serverId +
+ " baseDN=" + this.baseDn +
+ " generationId=" + generationId +
+ " localGenerationId=" + localGenerationId +
+ " state=" + ss +
+ " and sent ReplServerStart with state=" + lss);
+ }
+
+ // if the remote RS and the local RS have the same genID
+ // then it's ok and nothing else to do
+ if (generationId == localGenerationId)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() + " RS with serverID=" + serverId +
+ " is connected with the right generation ID");
+ }
+ }
+ else
+ {
+ if (localGenerationId>0)
+ {
+ // if the local RS is initialized
+ if (generationId>0)
+ {
+ // if the remote RS is initialized
+ if (generationId != localGenerationId)
+ {
+ // if the 2 RS have different generationID
+ if (replicationCache.getGenerationIdSavedStatus())
+ {
+ // it the present RS has received changes regarding its
+ // gen ID and so won't change without a reset
+ // then we are just degrading the peer.
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ this.baseDn.toNormalizedString(),
+ Short.toString(receivedMsg.getServerId()),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ else
+ {
+ // The present RS has never received changes regarding its
+ // gen ID.
+ //
+ // Example case:
+ // - we are in RS1
+ // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+ // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
+ // - we are in RS1 and we receive a START msg from RS2
+ // - Each RS keeps its genID / is degraded and when LS2 will
+ // be populated from LS1 everything will becomes ok.
+ //
+ // Issue:
+ // FIXME : Would it be a good idea in some cases to just
+ // set the gen ID received from the peer RS
+ // specially if the peer has a non nul state and
+ // we have a nul state ?
+ // replicationCache.setGenerationId(generationId, false);
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ this.baseDn.toNormalizedString(),
+ Short.toString(receivedMsg.getServerId()),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ }
+ }
+ else
+ {
+ // The remote has no genId. We don't change anything for the
+ // current RS.
+ }
+ }
+ else
+ {
+ // The local RS is not initialized - take the one received
+ replicationCache.setGenerationId(generationId, false);
+ }
+ }
}
else
{
@@ -333,12 +511,9 @@
return; // we did not recognize the message, ignore it
}
- if (!sslEncryption)
- {
- session.stopEncryption();
- }
-
- replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ // Get or create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(this.baseDn,
+ true);
boolean started;
if (serverIsLDAPserver)
@@ -352,10 +527,11 @@
if (started)
{
- writer = new ServerWriter(session, serverId, this, replicationCache);
+ // sendWindow MUST be created before starting the writer
+ sendWindow = new Semaphore(sendWindowSize);
- reader = new ServerReader(session, serverId, this,
- replicationCache);
+ writer = new ServerWriter(session, serverId, this, replicationCache);
+ reader = new ServerReader(session, serverId, this, replicationCache);
reader.start();
writer.start();
@@ -377,6 +553,12 @@
// the connection is not valid, close it.
try
{
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() + " RS failed to start locally " +
+ " the connection from serverID="+serverId);
+ }
session.close();
} catch (IOException e1)
{
@@ -388,7 +570,8 @@
{
// some problem happened, reject the connection
MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString()));
+ mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
+ this.getMonitorInstanceName()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
try
@@ -399,7 +582,6 @@
// ignore
}
}
- sendWindow = new Semaphore(sendWindowSize);
}
/**
@@ -720,6 +902,21 @@
*/
public void add(UpdateMessage update, ServerHandler sourceHandler)
{
+ /*
+ * Ignore updates from a server that is degraded due to
+ * its inconsistent generationId
+ */
+ long referenceGenerationId = replicationCache.getGenerationId();
+ if ((referenceGenerationId>0) &&
+ (referenceGenerationId != generationId))
+ {
+ logError(ERR_IGNORING_UPDATE_TO.get(
+ update.getDn(),
+ this.getMonitorInstanceName()));
+
+ return;
+ }
+
synchronized (msgQueue)
{
/*
@@ -1164,7 +1361,7 @@
if (serverIsLDAPserver)
return "Remote LDAP Server " + str;
else
- return "Remote Replication Server " + str;
+ return "Remote Repl Server " + str;
}
/**
@@ -1261,7 +1458,10 @@
attributes.add(attr);
attributes.add(new Attribute("ssl-encryption",
- String.valueOf(session.isEncrypted())));
+ String.valueOf(session.isEncrypted())));
+
+ attributes.add(new Attribute("generation-id",
+ String.valueOf(generationId)));
return attributes;
}
@@ -1385,9 +1585,10 @@
public void process(RoutableMessage msg)
{
if (debugEnabled())
- TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
- msg + " from " + serverId);
-
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " processes received msg=" + msg);
replicationCache.process(msg, this);
}
@@ -1401,6 +1602,12 @@
public void sendInfo(ReplServerInfoMessage info)
throws IOException
{
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " sends message=" + info);
+
session.publish(info);
}
@@ -1412,7 +1619,13 @@
*/
public void setReplServerInfo(ReplServerInfoMessage infoMsg)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " sets replServerInfo " + "<" + infoMsg + ">");
remoteLDAPservers = infoMsg.getConnectedServers();
+ generationId = infoMsg.getGenerationId();
}
/**
@@ -1458,8 +1671,10 @@
public void send(RoutableMessage msg) throws IOException
{
if (debugEnabled())
- TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
- msg + " to " + serverId);
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " sends message=" + msg);
session.publish(msg);
}
@@ -1492,4 +1707,48 @@
checkWindow();
}
}
+
+ /**
+ * Returns the value of generationId for that handler.
+ * @return The value of the generationId.
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * Resets the generationId for this domain.
+ */
+ public void resetGenerationId()
+ {
+ // Notify the peer that it is now invalid regarding the generationId
+ // We are now waiting a startServer message from this server with
+ // a valid generationId.
+ try
+ {
+ Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
+ ErrorMessage errorMsg =
+ new ErrorMessage(serverId, replicationServerId, message);
+ session.publish(errorMsg);
+ }
+ catch (Exception e)
+ {
+ // FIXME Log exception when sending reset error message
+ }
+ }
+
+ /**
+ * Sends a message containing a generationId to a peer server.
+ * The peer is expected to be a replication server.
+ *
+ * @param msg The GenerationIdMessage message to be sent.
+ * @throws IOException When it occurs while sending the message,
+ *
+ */
+ public void sendGenerationId(ResetGenerationId msg)
+ throws IOException
+ {
+ session.publish(msg);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index e61446f..cb66bd2 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -39,6 +40,7 @@
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -76,7 +78,7 @@
* Constructor for the LDAP server reader part of the replicationServer.
*
* @param session The ProtocolSession from which to read the data.
- * @param serverId The server ID of the server from which we read changes.
+ * @param serverId The server ID of the server from which we read messages.
* @param handler The server handler for this server reader.
* @param replicationCache The ReplicationCache for this server reader.
*/
@@ -97,14 +99,11 @@
{
if (debugEnabled())
{
- if (handler.isReplicationServer())
- {
- TRACER.debugInfo("Replication server reader starting " + serverId);
- }
- else
- {
- TRACER.debugInfo("LDAP server reader starting " + serverId);
- }
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ (handler.isReplicationServer()?" RS ":" LS")+
+ " reader starting for serverId=" + serverId);
}
/*
* wait on input stream
@@ -116,6 +115,25 @@
{
ReplicationMessage msg = session.receive();
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ {
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ "> from RS server with serverId=" + serverId +
+ " receives " + msg);
+ }
+ else
+ {
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ "> from LDAP server with serverId=" + serverId +
+ " receives " + msg);
+ }
+ }
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -124,9 +142,22 @@
}
else if (msg instanceof UpdateMessage)
{
- UpdateMessage update = (UpdateMessage) msg;
- handler.decAndCheckWindow();
- replicationCache.put(update, handler);
+ // Ignore update received from a replica with
+ // a bad generation ID
+ long referenceGenerationId = replicationCache.getGenerationId();
+ if ((referenceGenerationId>0) &&
+ (referenceGenerationId != handler.getGenerationId()))
+ {
+ logError(ERR_IGNORING_UPDATE_FROM.get(
+ msg.toString(),
+ handler.getMonitorInstanceName()));
+ }
+ else
+ {
+ UpdateMessage update = (UpdateMessage) msg;
+ handler.decAndCheckWindow();
+ replicationCache.put(update, handler);
+ }
}
else if (msg instanceof WindowMessage)
{
@@ -159,6 +190,11 @@
ErrorMessage errorMsg = (ErrorMessage) msg;
handler.process(errorMsg);
}
+ else if (msg instanceof ResetGenerationId)
+ {
+ ResetGenerationId genIdMsg = (ResetGenerationId) msg;
+ replicationCache.resetGenerationId(this.handler);
+ }
else if (msg instanceof WindowProbe)
{
WindowProbe windowProbeMsg = (WindowProbe) msg;
@@ -168,6 +204,52 @@
{
ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
handler.setReplServerInfo(infoMsg);
+
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ TRACER.debugInfo(
+ "In RS " + replicationCache.getReplicationServer().
+ getServerId() +
+ " Receiving replServerInfo from " + handler.getServerId() +
+ " baseDn=" + replicationCache.getBaseDn() +
+ " genId=" + infoMsg.getGenerationId());
+ }
+
+ if (replicationCache.getGenerationId()<0)
+ {
+ // Here is the case where a ReplicationServer receives from
+ // another ReplicationServer the generationId for a domain
+ // for which the generation ID has never been set.
+ replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
+ }
+ else
+ {
+ if (infoMsg.getGenerationId()<0)
+ {
+ // Here is the case where another ReplicationServer
+ // signals that it has no generationId set for the domain.
+ // If we have generationId set locally and no server currently
+ // connected for that domain in the topology then we may also
+ // reset the generationId localy.
+ replicationCache.mayResetGenerationId();
+ }
+
+ if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
+ {
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ replicationCache.getBaseDn().toNormalizedString(),
+ Short.toString(handler.getServerId()),
+ Long.toString(infoMsg.getGenerationId()),
+ Long.toString(replicationCache.getGenerationId()));
+
+ ErrorMessage errorMsg = new ErrorMessage(
+ replicationCache.getReplicationServer().getServerId(),
+ handler.getServerId(),
+ message);
+ session.publish(errorMsg);
+ }
+ }
}
else if (msg == null)
{
@@ -187,21 +269,40 @@
* Log a message and exit from this loop
* So that this handler is stopped.
*/
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " reader IO EXCEPTION serverID=" + serverId
+ + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
+ e.getCause());
Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
logError(message);
} catch (ClassNotFoundException e)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " reader CNF EXCEPTION serverID=" + serverId
+ + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
- * close the conenction.
+ * close the connection.
*/
Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
logError(message);
} catch (Exception e)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " server reader EXCEPTION serverID=" + serverId
+ + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
- * close the conenction.
+ * close the connection.
*/
Message message = NOTE_READER_EXCEPTION.get(handler.toString());
logError(message);
@@ -213,6 +314,12 @@
* happen.
* Attempt to close the socket and stop the server handler.
*/
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " reader CLOSE serverID=" + serverId
+ + stackTraceToSingleLineString(new Exception()));
try
{
session.close();
@@ -223,15 +330,11 @@
replicationCache.stopServer(handler);
}
if (debugEnabled())
- {
- if (handler.isReplicationServer())
- {
- TRACER.debugInfo("Replication server reader stopping " + serverId);
- }
- else
- {
- TRACER.debugInfo("LDAP server reader stopping " + serverId);
- }
- }
+ TRACER.debugInfo(
+ "In RS <" + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ (handler.isReplicationServer()?"RS":"LDAP") +
+ " server reader stopped for serverID=" + serverId
+ + stackTraceToSingleLineString(new Exception()));
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 9a865a0..b051836 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
@@ -97,12 +98,35 @@
TRACER.debugInfo("LDAP server writer starting " + serverId);
}
}
- try {
+ try
+ {
while (true)
{
UpdateMessage update = replicationCache.take(this.handler);
if (update == null)
return; /* this connection is closing */
+
+ // Ignore update to be sent to a replica with a bad generation ID
+ long referenceGenerationId = replicationCache.getGenerationId();
+ if (referenceGenerationId != handler.getGenerationId())
+ {
+ logError(ERR_IGNORING_UPDATE_TO.get(
+ update.getDn(),
+ this.handler.getMonitorInstanceName()));
+ continue;
+ }
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo(
+ "In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ ", writer to " + this.handler.getMonitorInstanceName() +
+ " publishes" + update.toString() +
+ " refgenId=" + referenceGenerationId +
+ " server=" + handler.getServerId() +
+ " generationId=" + handler.getGenerationId());
+ }
session.publish(update);
}
}
@@ -130,7 +154,8 @@
* An unexpected error happened.
* Log an error and close the connection.
*/
- Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString());
+ Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
+ " " + stackTraceToSingleLineString(e));
logError(message);
}
finally {
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
index 787dd48..e98356a 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -129,7 +129,7 @@
}
try
{
- domain.initializeTarget(target, this);
+ domain.initializeRemote(target, this);
}
catch(DirectoryException de)
{
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTask.java b/opends/src/server/org/opends/server/tasks/InitializeTask.java
index 05afb59..11a4fd8 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -139,7 +139,7 @@
try
{
// launch the import
- domain.initialize(source, this);
+ domain.initializeFromRemote(source, this);
synchronized(initState)
{
diff --git a/opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java b/opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java
new file mode 100644
index 0000000..ba4b078
--- /dev/null
+++ b/opends/src/server/org/opends/server/tasks/SetGenerationIdTask.java
@@ -0,0 +1,128 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.tasks;
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.core.DirectoryServer.getAttributeType;
+
+import java.util.List;
+
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.TaskMessages;
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ResultCode;
+
+/**
+ * This class provides an implementation of a Directory Server task that can
+ * be used to import data over the replication protocol from another
+ * server hosting the same replication domain.
+ */
+public class SetGenerationIdTask extends Task
+{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ boolean isCompressed = false;
+ boolean isEncrypted = false;
+ boolean skipSchemaValidation = false;
+ String domainString = null;
+ ReplicationDomain domain = null;
+ TaskState initState;
+
+ private static final void debugInfo(String s)
+ {
+ if (debugEnabled())
+ {
+ // System.out.println(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ TRACER.debugInfo(s);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void initializeTask() throws DirectoryException
+ {
+ if (TaskState.isDone(getTaskState()))
+ {
+ return;
+ }
+
+ // FIXME -- Do we need any special authorization here?
+ Entry taskEntry = getTaskEntry();
+
+ AttributeType typeDomainBase;
+
+ // Retrieves the replication domain
+ typeDomainBase =
+ getAttributeType(ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN, true);
+
+ List<Attribute> attrList;
+ attrList = taskEntry.getAttribute(typeDomainBase);
+ domainString = TaskUtils.getSingleValueString(attrList);
+ DN domainDN = DN.nullDN();
+ try
+ {
+ domainDN = DN.decode(domainString);
+ }
+ catch(Exception e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
+ mb.append(e.getMessage());
+ throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX,
+ mb.toMessage());
+ }
+
+ domain = ReplicationDomain.retrievesReplicationDomain(domainDN);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected TaskState runTask()
+ {
+ debugInfo("setGenerationIdTask is starting on domain%s" +
+ domain.getBaseDN());
+
+ domain.resetGenerationId();
+
+ debugInfo("setGenerationIdTask is ending SUCCESSFULLY");
+ return TaskState.COMPLETED_SUCCESSFULLY;
+ }
+}
diff --git a/opends/src/server/org/opends/server/types/Schema.java b/opends/src/server/org/opends/server/types/Schema.java
index d9b1b3b..be481b3 100644
--- a/opends/src/server/org/opends/server/types/Schema.java
+++ b/opends/src/server/org/opends/server/types/Schema.java
@@ -207,9 +207,13 @@
// file.
private long youngestModificationTime;
- // The Synchronization State.
+ // The synchronization State.
private LinkedHashSet<AttributeValue> synchronizationState = null;
+ // The synchronization generationId.
+ private LinkedHashSet<AttributeValue> synchronizationGenerationId
+ = null;
+
/**
@@ -2766,6 +2770,11 @@
dupSchema.synchronizationState =
new LinkedHashSet<AttributeValue>(synchronizationState);
}
+ if (synchronizationGenerationId != null)
+ {
+ dupSchema.synchronizationGenerationId = new
+ LinkedHashSet<AttributeValue>(synchronizationGenerationId);
+ }
return dupSchema;
}
@@ -2786,6 +2795,28 @@
*
* @param values Synchronization state for this schema.
*/
+ public void setSynchronizationGenerationId(
+ LinkedHashSet<AttributeValue> values)
+ {
+ synchronizationGenerationId = values;
+ }
+
+ /**
+ * Retrieves the Synchronization generationId for this schema.
+ *
+ * @return The Synchronization generationId for this schema.
+ */
+ public LinkedHashSet<AttributeValue>
+ getSynchronizationGenerationId()
+ {
+ return synchronizationGenerationId;
+ }
+
+ /**
+ * Sets the Synchronization state for this schema.
+ *
+ * @param values Synchronization state for this schema.
+ */
public void setSynchronizationState(
LinkedHashSet<AttributeValue> values)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
new file mode 100644
index 0000000..a6b866f
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -0,0 +1,1401 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication;
+
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.net.SocketTimeoutException;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.AddOperationBasis;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DoneMessage;
+import org.opends.server.replication.protocol.EntryMessage;
+import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.SocketSession;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.schema.DirectoryStringSyntax;
+import org.opends.server.tasks.LdifFileWriter;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchScope;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests contained here:
+ *
+ * - testSingleRS : test generation ID setting with different servers and one
+ * Replication server.
+ *
+ * - testMultiRS : tests generation ID propagatoion with more than one
+ * Replication server.
+ *
+ */
+
+public class GenerationIdTest extends ReplicationTestCase
+{
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+
+ private static final String baseDnStr = "dc=example,dc=com";
+ private static final String baseSnStr = "genidcom";
+
+ private static final int WINDOW_SIZE = 10;
+ private static final int CHANGELOG_QUEUE_SIZE = 100;
+ private static final short server1ID = 1;
+ private static final short server2ID = 2;
+ private static final short server3ID = 3;
+ private static final short changelog1ID = 11;
+ private static final short changelog2ID = 12;
+ private static final short changelog3ID = 13;
+
+ private DN baseDn;
+ private ReplicationBroker broker2 = null;
+ private ReplicationBroker broker3 = null;
+ private ReplicationServer replServer1 = null;
+ private ReplicationServer replServer2 = null;
+ private ReplicationServer replServer3 = null;
+ private boolean emptyOldChanges = true;
+ ReplicationDomain replDomain = null;
+ private Entry taskInitRemoteS2;
+ SocketSession ssSession = null;
+ boolean ssShutdownRequested = false;
+ protected String[] updatedEntries;
+
+ private static int[] replServerPort = new int[20];
+
+ /**
+ * A temporary LDIF file containing some test entries.
+ */
+ private File ldifFile;
+
+ /**
+ * A temporary file to contain rejected entries.
+ */
+ private File rejectFile;
+
+ /**
+ * A makeldif template used to create some test entries.
+ */
+ private static String diff = "";
+ private static String[] template = new String[] {
+ "define suffix=" + baseDnStr,
+ "define maildomain=example.com",
+ "define numusers=11",
+ "",
+ "branch: [suffix]",
+ "",
+ "branch: ou=People,[suffix]",
+ "subordinateTemplate: person:[numusers]",
+ "",
+ "template: person",
+ "rdnAttr: uid",
+ "objectClass: top",
+ "objectClass: person",
+ "objectClass: organizationalPerson",
+ "objectClass: inetOrgPerson",
+ "givenName: <first>",
+ "sn: <last>",
+ "cn: {givenName} {sn}",
+ "initials: {givenName:1}<random:chars:" +
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ:1>{sn:1}",
+ "employeeNumber: <sequential:0>",
+ "uid: user.{employeeNumber}",
+ "mail: {uid}@[maildomain]",
+ "userPassword: password",
+ "telephoneNumber: <random:telephone>",
+ "homePhone: <random:telephone>",
+ "pager: <random:telephone>",
+ "mobile: <random:telephone>",
+ "street: <random:numeric:5> <file:streets> Street",
+ "l: <file:cities>",
+ "st: <file:states>",
+ "postalCode: <random:numeric:5>",
+ "postalAddress: {cn}${street}${l}, {st} {postalCode}",
+ "description: This is the description for {cn} " + diff,
+ ""};
+
+
+ private void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+ protected void debugInfo(String message, Exception e)
+ {
+ debugInfo(message + stackTraceToSingleLineString(e));
+ }
+
+ /**
+ * Set up the environment for performing the tests in this Class.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
+
+ // This test suite depends on having the schema available.
+ TestCaseUtils.startServer();
+
+ baseDn = DN.decode(baseDnStr);
+
+ updatedEntries = newLDIFEntries();
+
+ // Create an internal connection in order to provide operations
+ // to DS to populate the db -
+ connection = InternalClientConnection.getRootConnection();
+
+ // Synchro provider
+ String synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+ // Synchro multi-master
+ synchroPluginStringDN = "cn=Multimaster Synchronization, "
+ + synchroStringDN;
+
+ // Synchro suffix
+ synchroServerEntry = null;
+
+ // Add config entries to the current DS server based on :
+ // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
+ // Add synchroServerEntry
+ // Add replServerEntry
+ configureReplication();
+
+ taskInitRemoteS2 = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
+ "ds-task-initialize-domain-dn: " + baseDn,
+ "ds-task-initialize-replica-server-id: " + server2ID);
+
+ // Change log
+ String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+ String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
+ + "ds-cfg-changelog-server-id: 1\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
+ + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
+ replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+ replServerEntry = null;
+
+ }
+
+ // Tests that entries have been written in the db
+ private int testEntriesInDb()
+ {
+ debugInfo("TestEntriesInDb");
+ short found = 0;
+
+ for (String entry : updatedEntries)
+ {
+
+ int dns = entry.indexOf("dn: ");
+ int dne = entry.indexOf("dc=com");
+ String dn = entry.substring(dns+4,dne+6);
+
+ debugInfo("Search Entry: " + dn);
+
+ DN entryDN = null;
+ try
+ {
+ entryDN = DN.decode(dn);
+ }
+ catch(Exception e)
+ {
+ debugInfo("TestEntriesInDb/" + e);
+ }
+
+ try
+ {
+ Entry resultEntry = getEntry(entryDN, 1000, true);
+ if (resultEntry==null)
+ {
+ debugInfo("Entry not found <" + dn + ">");
+ }
+ else
+ {
+ debugInfo("Entry found <" + dn + ">");
+ found++;
+ }
+ }
+ catch(Exception e)
+ {
+ debugInfo("TestEntriesInDb/", e);
+ }
+ }
+ return found;
+ }
+
+ /**
+ * Add a task to the configuration of the current running DS.
+ * @param taskEntry The task to add.
+ * @param expectedResult The expected result code for the ADD.
+ * @param errorMessageID The expected error messageID when the expected
+ * result code is not SUCCESS
+ */
+ private void addTask(Entry taskEntry, ResultCode expectedResult,
+ Message errorMessage)
+ {
+ try
+ {
+ debugInfo("AddTask/" + taskEntry);
+
+ // Change config of DS to launch the total update task
+ InternalClientConnection connection =
+ InternalClientConnection.getRootConnection();
+
+ // Add the task.
+
+ AddOperation addOperation =
+ connection.processAdd(taskEntry.getDN(),
+ taskEntry.getObjectClasses(),
+ taskEntry.getUserAttributes(),
+ taskEntry.getOperationalAttributes());
+
+ assertEquals(addOperation.getResultCode(), expectedResult,
+ "Result of ADD operation of the task is: "
+ + addOperation.getResultCode()
+ + " Expected:"
+ + expectedResult + " Details:" + addOperation.getErrorMessage()
+ + addOperation.getAdditionalLogMessage());
+
+ if (expectedResult != ResultCode.SUCCESS)
+ {
+ assertTrue(addOperation.getErrorMessage().toString().
+ startsWith(errorMessage.toString()),
+ "Error MsgID of the task <"
+ + addOperation.getErrorMessage()
+ + "> equals <"
+ + errorMessage + ">");
+ debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
+ + addOperation.getErrorMessage() + ">");
+
+ }
+ else
+ {
+ waitTaskState(taskEntry, TaskState.RUNNING, null);
+ }
+
+ // Entry will be removed at the end of the test
+ entryList.addLast(taskEntry.getDN());
+
+ debugInfo("AddedTask/" + taskEntry.getDN());
+ }
+ catch(Exception e)
+ {
+ fail("Exception when adding task:"+ e.getMessage());
+ }
+ }
+
+ private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
+ Message expectedMessage)
+ {
+ TaskState taskState = null;
+ try
+ {
+
+ SearchFilter filter =
+ SearchFilter.createFilterFromString("(objectclass=*)");
+ Entry resultEntry = null;
+ do
+ {
+ InternalSearchOperation searchOperation =
+ connection.processSearch(taskEntry.getDN(),
+ SearchScope.BASE_OBJECT,
+ filter);
+ try
+ {
+ resultEntry = searchOperation.getSearchEntries().getFirst();
+ } catch (Exception e)
+ {
+ fail("Task entry was not returned from the search.");
+ continue;
+ }
+
+ try
+ {
+ // Check that the task state is as expected.
+ AttributeType taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
+ String stateString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+ taskState = TaskState.fromString(stateString);
+ }
+ catch(Exception e)
+ {
+ fail("Exception"+ e.getMessage()+e.getStackTrace());
+ }
+ Thread.sleep(500);
+ }
+ while ((taskState != expectedTaskState) &&
+ (taskState != TaskState.STOPPED_BY_ERROR));
+
+ // Check that the task contains some log messages.
+ AttributeType logMessagesType = DirectoryServer.getAttributeType(
+ ATTR_TASK_LOG_MESSAGES.toLowerCase());
+ ArrayList<String> logMessages = new ArrayList<String>();
+ resultEntry.getAttributeValues(logMessagesType,
+ DirectoryStringSyntax.DECODER,
+ logMessages);
+
+ if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
+ && (taskState != TaskState.RUNNING))
+ {
+ if (logMessages.size() == 0)
+ {
+ fail("No log messages were written to the task entry on a failed task");
+ }
+ else
+ {
+ if (expectedMessage != null)
+ {
+ debugInfo(logMessages.get(0));
+ debugInfo(expectedMessage.toString());
+ assertTrue(logMessages.get(0).indexOf(
+ expectedMessage.toString())>0);
+ }
+ }
+ }
+
+ assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
+ " Expected task state:" + expectedTaskState);
+ }
+ catch(Exception e)
+ {
+ fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ * Add to the current DB the entries necessary to the test
+ */
+ private void addTestEntriesToDB(String[] ldifEntries)
+ {
+ try
+ {
+ // Change config of DS to launch the total update task
+ InternalClientConnection connection =
+ InternalClientConnection.getRootConnection();
+
+ for (String ldifEntry : ldifEntries)
+ {
+ Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
+ AddOperationBasis addOp = new AddOperationBasis(
+ connection,
+ InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ null,
+ entry.getDN(),
+ entry.getObjectClasses(),
+ entry.getUserAttributes(),
+ entry.getOperationalAttributes());
+ addOp.setInternalOperation(true);
+ addOp.run();
+ if (addOp.getResultCode() != ResultCode.SUCCESS)
+ {
+ debugInfo("addEntry: Failed" + addOp.getResultCode());
+ }
+ // They will be removed at the end of the test
+ entryList.addLast(entry.getDN());
+ }
+ }
+ catch(Exception e)
+ {
+ fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /*
+ * Creates entries necessary to the test.
+ */
+ private String[] newLDIFEntries()
+ {
+ String[] entries =
+ {
+ "dn: " + baseDn + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
+ + "\n",
+ "dn: ou=People," + baseDn + "\n"
+ + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
+ + "\n",
+ "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n"
+ + "objectclass: top\n"
+ + "objectclass: person\n"
+ + "objectclass: organizationalPerson\n"
+ + "objectclass: inetOrgPerson\n"
+ + "cn: Fiona Jensen\n"
+ + "sn: Jensen\n"
+ + "uid: fiona\n"
+ + "telephonenumber: +1 408 555 1212\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
+ + "\n",
+ "dn: cn=Robert Langman,ou=people," + baseDn + "\n"
+ + "objectclass: top\n"
+ + "objectclass: person\n"
+ + "objectclass: organizationalPerson\n"
+ + "objectclass: inetOrgPerson\n"
+ + "cn: Robert Langman\n"
+ + "sn: Langman\n"
+ + "uid: robert\n"
+ + "telephonenumber: +1 408 555 1213\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
+ + "\n"
+ };
+
+ return entries;
+ }
+
+ private int receiveImport(ReplicationBroker broker, short serverID,
+ String[] updatedEntries)
+ {
+ // Expect the broker to receive the entries
+ ReplicationMessage msg;
+ short entriesReceived = 0;
+ while (true)
+ {
+ try
+ {
+ debugInfo("Broker " + serverID + " Wait for entry or done msg");
+ msg = broker.receive();
+
+ if (msg == null)
+ break;
+
+ if (msg instanceof InitializeTargetMessage)
+ {
+ debugInfo("Broker " + serverID + " receives InitializeTargetMessage ");
+ entriesReceived = 0;
+ }
+ else if (msg instanceof EntryMessage)
+ {
+ EntryMessage em = (EntryMessage)msg;
+ debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
+ entriesReceived++;
+ }
+ else if (msg instanceof DoneMessage)
+ {
+ debugInfo("Broker " + serverID + " receives done ");
+ break;
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ ErrorMessage em = (ErrorMessage)msg;
+ debugInfo("Broker " + serverID + " receives ERROR "
+ + em.toString());
+ break;
+ }
+ else
+ {
+ debugInfo("Broker " + serverID + " receives and trashes " + msg);
+ }
+ }
+ catch(Exception e)
+ {
+ debugInfo("receiveUpdatedEntries" + stackTraceToSingleLineString(e));
+ }
+ }
+
+ if (updatedEntries != null)
+ {
+ assertTrue(entriesReceived == updatedEntries.length,
+ " Received entries("+entriesReceived +
+ ") == Expected entries("+updatedEntries.length+")");
+ }
+
+ return entriesReceived;
+ }
+
+ /**
+ * Creates a new replicationServer.
+ * @param changelogId The serverID of the replicationServer to create.
+ * @param all Specifies whether to coonect the created replication
+ * server to the other replication servers in the test.
+ * @return The new created replication server.
+ */
+ private ReplicationServer createReplicationServer(short changelogId,
+ boolean all, String suffix)
+ {
+ SortedSet<String> servers = null;
+ servers = new TreeSet<String>();
+ try
+ {
+ if (changelogId==changelog1ID)
+ {
+ if (replServer1!=null)
+ return replServer1;
+ }
+ else if (changelogId==changelog2ID)
+ {
+ if (replServer2!=null)
+ return replServer2;
+ }
+ else if (changelogId==changelog3ID)
+ {
+ if (replServer3!=null)
+ return replServer3;
+ }
+ if (all)
+ {
+ servers.add("localhost:" + getChangelogPort(changelog1ID));
+ servers.add("localhost:" + getChangelogPort(changelog2ID));
+ servers.add("localhost:" + getChangelogPort(changelog3ID));
+ }
+ int chPort = getChangelogPort(changelogId);
+ String chDir = "genid"+changelogId+suffix+"Db";
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
+ servers);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ Thread.sleep(1000);
+
+ return replicationServer;
+
+ }
+ catch (Exception e)
+ {
+ fail("createChangelog" + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Create a synchronized suffix in the current server providing the
+ * replication Server ID.
+ * @param changelogID
+ */
+ private void connectToReplServer(short changelogID)
+ {
+ // Connect DS to the replicationServer
+ try
+ {
+ // suffix synchronized
+ String synchroServerStringDN = synchroPluginStringDN;
+ String synchroServerLdif =
+ "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider-config\n"
+ + "cn: " + baseSnStr + "\n"
+ + "ds-cfg-synchronization-dn: " + baseDnStr + "\n"
+ + "ds-cfg-changelog-server: localhost:"
+ + getChangelogPort(changelogID)+"\n"
+ + "ds-cfg-directory-server-id: " + server1ID + "\n"
+ + "ds-cfg-receive-status: true\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE;
+
+ if (synchroServerEntry == null)
+ {
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the synchronized server");
+ configEntryList.add(synchroServerEntry.getDN());
+
+ replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
+
+ }
+ if (replDomain != null)
+ {
+ debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
+ }
+ }
+ catch(Exception e)
+ {
+ debugInfo("connectToReplServer", e);
+ fail("connectToReplServer", e);
+ }
+ }
+
+ /*
+ * Disconnect DS from the replicationServer
+ */
+ private void disconnectFromReplServer(short changelogID)
+ {
+ try
+ {
+ // suffix synchronized
+ String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
+ synchroPluginStringDN;
+ if (synchroServerEntry != null)
+ {
+ DN synchroServerDN = DN.decode(synchroServerStringDN);
+ DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
+ assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
+ "Unable to delete the synchronized domain");
+ synchroServerEntry = null;
+
+ configEntryList.remove(configEntryList.indexOf(synchroServerDN));
+ }
+ }
+ catch(Exception e)
+ {
+ fail("disconnectFromReplServer", e);
+ }
+ }
+
+ private int getChangelogPort(short changelogID)
+ {
+ if (replServerPort[changelogID] == 0)
+ {
+ try
+ {
+ // Find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ replServerPort[changelogID] = socket.getLocalPort();
+ socket.close();
+ }
+ catch(Exception e)
+ {
+ fail("Cannot retrieve a free port for replication server."
+ + e.getMessage());
+ }
+ }
+ return replServerPort[changelogID];
+ }
+
+ protected static final String REPLICATION_GENERATION_ID =
+ "ds-sync-generation-id";
+
+ private long readGenId()
+ {
+ long genId=-1;
+ try
+ {
+ Entry resultEntry = getEntry(baseDn, 1000, true);
+ if (resultEntry==null)
+ {
+ debugInfo("Entry not found <" + baseDn + ">");
+ }
+ else
+ {
+ debugInfo("Entry found <" + baseDn + ">");
+
+ AttributeType synchronizationGenIDType =
+ DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
+ List<Attribute> attrs =
+ resultEntry.getAttribute(synchronizationGenIDType);
+ if (attrs != null)
+ {
+ Attribute attr = attrs.get(0);
+ LinkedHashSet<AttributeValue> values = attr.getValues();
+ if (values.size() == 1)
+ {
+ genId = Long.decode(values.iterator().next().getStringValue());
+ }
+ }
+
+ }
+ }
+ catch(Exception e)
+ {
+ fail("Exception raised in readGenId", e);
+ }
+ return genId;
+ }
+
+ private Entry getTaskImport()
+ {
+ Entry task = null;
+
+ try
+ {
+ // Create a temporary test LDIF file.
+ ldifFile = File.createTempFile("import-test", ".ldif");
+ String resourcePath = DirectoryServer.getServerRoot() + File.separator +
+ "config" + File.separator + "MakeLDIF";
+ LdifFileWriter.makeLdif(ldifFile.getPath(), resourcePath, template);
+ // Create a temporary rejects file.
+ rejectFile = File.createTempFile("import-test-rejects", ".ldif");
+
+ task = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-import",
+ "ds-task-class-name: org.opends.server.tasks.ImportTask",
+ "ds-task-import-backend-id: userRoot",
+ "ds-task-import-ldif-file: " + ldifFile.getPath(),
+ "ds-task-import-reject-file: " + rejectFile.getPath(),
+ "ds-task-import-overwrite-rejects: TRUE",
+ "ds-task-import-exclude-attribute: description"
+ );
+ }
+ catch(Exception e)
+ {
+ }
+ return task;
+ }
+
+ private String createEntry(UUID uid)
+ {
+ String user2dn = "uid=user"+uid+",ou=People," + baseDnStr;
+ return new String(
+ "dn: "+ user2dn + "\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n"
+ + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n");
+ }
+
+ static protected ReplicationMessage createAddMsg()
+ {
+ Entry personWithUUIDEntry = null;
+ String user1entryUUID;
+ String baseUUID = null;
+ String user1dn;
+
+ /*
+ * Create a Change number generator to generate new changenumbers
+ * when we need to send operation messages to the replicationServer.
+ */
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
+
+ user1entryUUID = "33333333-3333-3333-3333-333333333333";
+ user1dn = "uid=user1,ou=People," + baseDnStr;
+ String entryWithUUIDldif = "dn: "+ user1dn + "\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n"
+ + "entryUUID: " + user1entryUUID + "\n";
+
+ try
+ {
+ personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
+ }
+ catch(Exception e)
+ {
+ fail(e.getMessage());
+ }
+
+ // Create and publish an update message to add an entry.
+ AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID,
+ baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+
+ return addMsg;
+ }
+
+ /**
+ * SingleRS tests basic features of generationID
+ * with one single Replication Server.
+ *
+ * @throws Exception
+ */
+ @Test(enabled=true)
+ public void testSingleRS() throws Exception
+ {
+ String testCase = "testSingleRS";
+ debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled());
+
+ debugInfo(testCase + " Clearing DS1 backend");
+ ReplicationDomain.clearJEBackend(false,
+ "userRoot",
+ baseDn.toNormalizedString());
+
+ try
+ {
+ long rgenId;
+ long genId;
+
+ replServer1 = createReplicationServer(changelog1ID, false, testCase);
+ assertEquals(replServer1.getGenerationId(baseDn), -1);
+
+ /*
+ * Test : empty replicated backend
+ * Check : nothing is broken - no generationId generated
+ */
+
+ // Connect DS to RS with no data
+ // Read generationId - should be not retrievable since no entry
+ debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")");
+ connectToReplServer(changelog1ID);
+ Thread.sleep(1000);
+
+ debugInfo(testCase + " Expect genId attribute to be not retrievable");
+ genId = readGenId();
+ assertEquals(genId,-1);
+
+ debugInfo(testCase + " Expect genId to be set in memory on the replication " +
+ " server side even if not wrote on disk/db since no change occured.");
+ rgenId = replServer1.getGenerationId(baseDn);
+ assertEquals(rgenId, 3211313L);
+
+ debugInfo(testCase + " Disconnecting DS1 from replServer1(" + changelog1ID + ")");
+ disconnectFromReplServer(changelog1ID);
+
+ /*
+ * Test : non empty replicated backend
+ * Check : generationId correctly generated
+ */
+
+ // Now disconnect - create entries and reconnect
+ // Test that generation has been added to the data.
+ debugInfo(testCase + " add test entries to DS");
+ this.addTestEntriesToDB(updatedEntries);
+ connectToReplServer(changelog1ID);
+
+ // Test that the generationId is written in the DB in the
+ // root entry on the replica side
+ genId = readGenId();
+ assertTrue(genId != -1);
+ assertTrue(genId != 3211313L);
+
+ // Test that the generationId is set on the replication server side
+ rgenId = replServer1.getGenerationId(baseDn);
+ assertEquals(genId, rgenId);
+
+ /*
+ * Test : Connection from 2nd broker with a different generationId
+ * Check: We should receive an error message
+ */
+
+ try
+ {
+ broker2 = openReplicationSession(baseDn,
+ server2ID, 100, getChangelogPort(changelog1ID),
+ 1000, !emptyOldChanges, genId+1);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+ try
+ {
+ ReplicationMessage msg = broker2.receive();
+ if (!(msg instanceof ErrorMessage))
+ {
+ fail("Broker connection is expected to receive an ErrorMessage."
+ + msg);
+ }
+ ErrorMessage emsg = (ErrorMessage)msg;
+ debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+ }
+ catch(SocketTimeoutException se)
+ {
+ fail("Broker is expected to receive an ErrorMessage.");
+ }
+
+ /*
+ * Test : Connect with same generationId
+ * Check : Must be accepted.
+ */
+ try
+ {
+ broker3 = openReplicationSession(baseDn,
+ server3ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ /*
+ * Test : generationID persistence in Replication server
+ * Shutdown/Restart Replication Server and redo connections
+ * with valid and invalid generationId
+ * Check : same expected connections results
+ */
+
+ // The changes from broker2 should be ignored
+ broker2.publish(createAddMsg());
+
+ try
+ {
+ broker3.receive();
+ fail("No update message is supposed to be received here.");
+ }
+ catch(SocketTimeoutException e)
+ {
+ // This is the expected result
+ }
+
+ // Now create a change that must be replicated
+ String ent1[] = { createEntry(UUID.randomUUID()) };
+ this.addTestEntriesToDB(ent1);
+
+ try
+ {
+ ReplicationMessage msg = broker3.receive();
+ debugInfo("Broker 3 received expected update msg" + msg);
+ }
+ catch(SocketTimeoutException e)
+ {
+ fail("Update message is supposed to be received.");
+ }
+
+ long genIdBeforeShut = replServer1.getGenerationId(baseDn);
+
+ debugInfo("Shutdown replServer1");
+ broker2.stop();
+ broker2 = null;
+ broker3.stop();
+ broker3 = null;
+ replServer1.shutdown();
+ replServer1 = null;
+
+ debugInfo("Create again replServer1");
+ replServer1 = createReplicationServer(changelog1ID, false, testCase);
+ debugInfo("Delay to allow DS to reconnect to replServer1");
+ Thread.sleep(200);
+
+ long genIdAfterRestart = replServer1.getGenerationId(baseDn);
+ debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
+ assertTrue(replServer1!=null, "Replication server creation failed.");
+ assertTrue(genIdBeforeShut == genIdAfterRestart,
+ "generationId is expected to have the same value after replServer1 restart");
+
+ try
+ {
+ debugInfo("Create again broker2");
+ broker2 = openReplicationSession(baseDn,
+ server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId);
+ assertTrue(broker2.isConnected(), "Broker2 failed to connect to replication server");
+
+ debugInfo("Create again broker3");
+ broker3 = openReplicationSession(baseDn,
+ server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId);
+ assertTrue(broker3.isConnected(), "Broker3 failed to connect to replication server");
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ /*
+ *
+ * FIXME Should clearJEBackend() regenerate generationId and do a start
+ * against ReplicationServer ?
+ */
+
+ /*
+ * Test: Reset the replication server in order to allow new data set.
+ */
+
+ Entry taskReset = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-reset-generation-id",
+ "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask",
+ "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr);
+
+ debugInfo("Reset generationId");
+ addTask(taskReset, ResultCode.SUCCESS, null);
+ waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
+ Thread.sleep(200);
+
+ // TODO: Test that replication server db has been cleared
+
+ assertEquals(replServer1.getGenerationId(baseDn),
+ -1, "Expected genId to be reset in replServer1");
+
+ ReplicationMessage rcvmsg = broker2.receive();
+ if (!(rcvmsg instanceof ErrorMessage))
+ {
+ fail("Broker2 is expected to receive an ErrorMessage " +
+ " to signal degradation due to reset" + rcvmsg);
+ }
+ ErrorMessage emsg = (ErrorMessage)rcvmsg;
+ debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+
+ rcvmsg = broker3.receive();
+ if (!(rcvmsg instanceof ErrorMessage))
+ {
+ fail("Broker3 is expected to receive an ErrorMessage " +
+ " to signal degradation due to reset" + rcvmsg);
+ }
+ emsg = (ErrorMessage)rcvmsg;
+ debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+
+ rgenId = replServer1.getGenerationId(baseDn);
+ assertTrue(rgenId==-1,"Expecting that genId has been reset in replServer1: rgenId="+rgenId);
+
+ assertTrue(replServer1.getReplicationCache(baseDn, false).
+ isDegradedDueToGenerationId(server1ID),
+ "Expecting that DS is degraded since domain genId has been reset");
+
+ assertTrue(replServer1.getReplicationCache(baseDn, false).
+ isDegradedDueToGenerationId(server2ID),
+ "Expecting that broker2 is degraded since domain genId has been reset");
+ assertTrue(replServer1.getReplicationCache(baseDn, false).
+ isDegradedDueToGenerationId(server3ID),
+ "Expecting that broker3 is degraded since domain genId has been reset");
+
+
+ // Now create a change that normally would be replicated
+ // but will not be replicated here since DS and brokers are degraded
+ String[] ent2 = { createEntry(UUID.randomUUID()) };
+ this.addTestEntriesToDB(ent2);
+
+ try
+ {
+ ReplicationMessage msg = broker2.receive();
+ fail("No update message is supposed to be received by degraded broker2" + msg);
+ } catch(SocketTimeoutException e) { /* expected */ }
+
+ try
+ {
+ ReplicationMessage msg = broker3.receive();
+ fail("No update message is supposed to be received by degraded broker3"+ msg);
+ } catch(SocketTimeoutException e) { /* expected */ }
+
+ debugInfo("broker2 is publishing a change, " +
+ "replServer1 expected to ignore this change.");
+ broker2.publish(createAddMsg());
+ try
+ {
+ ReplicationMessage msg = broker3.receive();
+ fail("No update message is supposed to be received by degraded broker3"+ msg);
+ } catch(SocketTimeoutException e) { /* expected */ }
+
+
+ debugInfo("Launch an on-line import on DS.");
+ genId=-1;
+ Entry importTask = getTaskImport();
+ addTask(importTask, ResultCode.SUCCESS, null);
+ waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
+ Thread.sleep(500);
+
+ debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
+ genId = readGenId();
+ assertTrue(genId != -1, "DS is expected to have a new genID computed " +
+ " after on-line import but genId=" + genId);
+
+ rgenId = replServer1.getGenerationId(baseDn);
+ assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
+
+ // In S1 launch the total update to initialize S2
+ addTask(taskInitRemoteS2, ResultCode.SUCCESS, null);
+
+ // S2 should be re-initialized and have a new valid genId
+ int receivedEntriesNb = this.receiveImport(broker2, server2ID, null);
+ debugInfo("broker2 has been initialized from DS with #entries=" + receivedEntriesNb);
+
+ debugInfo("Adding reset task to DS.");
+ taskReset = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=resetgenid"+ UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-reset-generation-id",
+ "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask",
+ "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr);
+
+ addTask(taskReset, ResultCode.SUCCESS, null);
+ waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
+ Thread.sleep(200);
+
+ debugInfo("Verifying that replServer1 has been reset.");
+ assertEquals(replServer1.getGenerationId(baseDn), -1);
+
+ debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
+ disconnectFromReplServer(changelog1ID);
+
+ postTest();
+
+ debugInfo(testCase + " Clearing DS backend");
+ ReplicationDomain.clearJEBackend(false,
+ replDomain.getBackend().getBackendID(),
+ baseDn.toNormalizedString());
+
+ // At this moment, root entry of the domain has been removed so
+ // genId is no more in the database ... but it has still the old
+ // value in memory.
+ int found = testEntriesInDb();
+ replDomain.loadGenerationId();
+
+ debugInfo("Successfully ending " + testCase);
+ }
+ catch(Exception e)
+ {
+ fail(testCase + " Exception:"+ e.getMessage() + " " +
+ stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ /**
+ * SingleRS tests basic features of generationID
+ * with more than one Replication Server.
+ * The following test focus on:
+ * - genId checking accross multiple starting RS (replication servers)
+ * - genId setting propagation from one RS to the others
+ * - genId reset propagation from one RS to the others
+ */
+ @Test(enabled=true)
+ public void testMultiRS() throws Exception
+ {
+ String testCase = "testMultiRS";
+ long genId;
+
+ debugInfo("Starting " + testCase);
+
+ ReplicationDomain.clearJEBackend(false,
+ "userRoot",
+ baseDn.toNormalizedString());
+
+ debugInfo ("Creating 3 RS");
+ replServer1 = createReplicationServer(changelog1ID, true, testCase);
+ replServer2 = createReplicationServer(changelog2ID, true, testCase);
+ replServer3 = createReplicationServer(changelog3ID, true, testCase);
+ Thread.sleep(500);
+
+ debugInfo("Connecting DS to replServer1");
+ connectToReplServer(changelog1ID);
+ Thread.sleep(1500);
+
+ debugInfo("Expect genId are set in all replServers.");
+ assertEquals(replServer1.getGenerationId(baseDn), 3211313L, " in replServer1");
+ assertEquals(replServer2.getGenerationId(baseDn), 3211313L, " in replServer2");
+ assertEquals(replServer3.getGenerationId(baseDn), 3211313L, " in replServer3");
+
+ debugInfo("Disconnect DS from replServer1.");
+ disconnectFromReplServer(changelog1ID);
+ Thread.sleep(1000);
+
+ debugInfo("Expect genId to be unset(-1) in all servers since no server is " +
+ " connected and no change ever occured");
+ assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1");
+ assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2");
+ assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3");
+
+ debugInfo("Add entries to DS");
+ this.addTestEntriesToDB(updatedEntries);
+
+ debugInfo("Connecting DS to replServer2");
+ connectToReplServer(changelog2ID);
+ Thread.sleep(1000);
+
+ debugInfo("Expect genIds to be set in all servers based on the added entries.");
+ genId = readGenId();
+ assertTrue(genId != -1);
+ assertEquals(replServer1.getGenerationId(baseDn), genId);
+ assertEquals(replServer2.getGenerationId(baseDn), genId);
+ assertEquals(replServer3.getGenerationId(baseDn), genId);
+
+ debugInfo("Connecting broker2 to replServer3 with a good genId");
+ try
+ {
+ broker2 = openReplicationSession(baseDn,
+ server2ID, 100, getChangelogPort(changelog3ID),
+ 1000, !emptyOldChanges, genId);
+ Thread.sleep(1000);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ debugInfo("Expecting that broker2 is not degraded since it has a correct genId");
+ assertTrue(!replServer1.getReplicationCache(baseDn, false).
+ isDegradedDueToGenerationId(server2ID));
+
+ debugInfo("Disconnecting DS from replServer1");
+ disconnectFromReplServer(changelog1ID);
+
+ debugInfo("Expect all genIds to keep their value since broker2 is still connected.");
+ assertEquals(replServer1.getGenerationId(baseDn), genId);
+ assertEquals(replServer2.getGenerationId(baseDn), genId);
+ assertEquals(replServer3.getGenerationId(baseDn), genId);
+
+ debugInfo("Connecting broker2 to replServer1 with a bad genId");
+ try
+ {
+ long badgenId=1;
+ broker3 = openReplicationSession(baseDn,
+ server3ID, 100, getChangelogPort(changelog1ID),
+ 1000, !emptyOldChanges, badgenId);
+ Thread.sleep(1000);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ debugInfo("Expecting that broker3 is degraded since it has a bad genId");
+ assertTrue(replServer1.getReplicationCache(baseDn, false).
+ isDegradedDueToGenerationId(server3ID));
+
+ int found = testEntriesInDb();
+ assertEquals(found, updatedEntries.length,
+ " Entries present in DB :" + found +
+ " Expected entries :" + updatedEntries.length);
+
+ debugInfo("Connecting DS to replServer1.");
+ connectToReplServer(changelog1ID);
+ Thread.sleep(1000);
+
+
+ debugInfo("Adding reset task to DS.");
+ Entry taskReset = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=resetgenid"+ UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-reset-generation-id",
+ "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask",
+ "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr);
+ addTask(taskReset, ResultCode.SUCCESS, null);
+ waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
+ Thread.sleep(500);
+
+ debugInfo("Verifying that all replservers genIds have been reset.");
+ genId = readGenId();
+ assertEquals(replServer1.getGenerationId(baseDn), -1);
+ assertEquals(replServer2.getGenerationId(baseDn), -1);
+ assertEquals(replServer3.getGenerationId(baseDn), -1);
+
+ debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
+ disconnectFromReplServer(changelog1ID);
+
+ debugInfo("Cleaning entries");
+ postTest();
+
+ debugInfo("Successfully ending " + testCase);
+ }
+
+ /**
+ * Disconnect broker and remove entries from the local DB
+ * @throws Exception
+ */
+ protected void postTest()
+ {
+ debugInfo("Post test cleaning.");
+
+ // Clean brokers
+ if (broker2 != null)
+ broker2.stop();
+ broker2 = null;
+ if (broker3 != null)
+ broker3.stop();
+ broker3 = null;
+
+ if (replServer1 != null)
+ replServer1.shutdown();
+ if (replServer2 != null)
+ replServer2.shutdown();
+ if (replServer2 != null)
+ replServer2.shutdown();
+ replServer1 = null;
+ replServer2 = null;
+ replServer3 = null;
+
+ super.cleanRealEntries();
+
+ try
+ {
+ ReplicationDomain.clearJEBackend(false,
+ replDomain.getBackend().getBackendID(),
+ baseDn.toNormalizedString());
+
+ // At this moment, root entry of the domain has been removed so
+ // genId is no more in the database ... but it has still the old
+ // value in memory.
+ testEntriesInDb();
+ replDomain.loadGenerationId();
+ }
+ catch (Exception e) {}
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index a119ab3..12ddbfc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -48,6 +48,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
+import java.net.SocketTimeoutException;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
@@ -124,14 +125,14 @@
boolean ssShutdownRequested = false;
protected String[] updatedEntries;
boolean externalDS = false;
- private static final short server1ID = 11;
- private static final short server2ID = 21;
- private static final short server3ID = 31;
- private static final short changelog1ID = 1;
- private static final short changelog2ID = 2;
- private static final short changelog3ID = 3;
+ private static final short server1ID = 1;
+ private static final short server2ID = 2;
+ private static final short server3ID = 3;
+ private static final short changelog1ID = 8;
+ private static final short changelog2ID = 9;
+ private static final short changelog3ID = 10;
- private static int[] replServerPort = new int[4];
+ private static int[] replServerPort = new int[20];
private DN baseDn;
ReplicationBroker server2 = null;
@@ -140,7 +141,7 @@
ReplicationServer changelog2 = null;
ReplicationServer changelog3 = null;
boolean emptyOldChanges = true;
- ReplicationDomain sd = null;
+ ReplicationDomain replDomain = null;
private void log(String s)
{
@@ -169,7 +170,6 @@
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
-
baseDn = DN.decode("dc=example,dc=com");
updatedEntries = newLDIFEntries();
@@ -756,9 +756,14 @@
servers.add("localhost:" + getChangelogPort(changelog2ID));
servers.add("localhost:" + getChangelogPort(changelog3ID));
- int chPort = getChangelogPort(changelogId);
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
+ new ReplServerFakeConfiguration(
+ getChangelogPort(changelogId),
+ "rsdbdirname" + getChangelogPort(changelogId),
+ 0,
+ changelogId,
+ 0,
+ 100,
servers);
ReplicationServer replicationServer = new ReplicationServer(conf);
Thread.sleep(1000);
@@ -804,17 +809,18 @@
assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
"Unable to add the synchronized server");
- sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
+ replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
// Clear the backend
ReplicationDomain.clearJEBackend(false,
- sd.getBackend().getBackendID(),
+ replDomain.getBackend().getBackendID(),
baseDn.toNormalizedString());
}
- if (sd != null)
+ if (replDomain != null)
{
- log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+ assertTrue(!replDomain.ieRunning(),
+ "ReplicationDomain: Import/Export is not expected to be running");
}
}
catch(Exception e)
@@ -890,7 +896,7 @@
// Test import result in S1
testEntriesInDb();
- cleanEntries();
+ afterTest();
log("Successfully ending " + testCase);
}
@@ -929,7 +935,7 @@
receiveUpdatedEntries(server2, server2ID, updatedEntries);
- cleanEntries();
+ afterTest();
log("Successfully ending "+testCase);
}
@@ -968,7 +974,7 @@
// Tests that entries have been received by S2
receiveUpdatedEntries(server2, server2ID, updatedEntries);
- cleanEntries();
+ afterTest();
log("Successfully ending " + testCase);
@@ -1012,7 +1018,7 @@
receiveUpdatedEntries(server2, server2ID, updatedEntries);
receiveUpdatedEntries(server3, server3ID, updatedEntries);
- cleanEntries();
+ afterTest();
log("Successfully ending " + testCase);
@@ -1049,7 +1055,7 @@
// Test that entries have been imported in S1
testEntriesInDb();
- cleanEntries();
+ afterTest();
log("Successfully ending " + testCase);
}
@@ -1103,7 +1109,7 @@
// Scope containing a serverID absent from the domain
// createTask(taskInitTargetS2);
- cleanEntries();
+ afterTest();
log("Successfully ending " + testCase);
}
@@ -1172,7 +1178,7 @@
// Scope containing a serverID absent from the domain
// createTask(taskInitTargetS2);
- cleanEntries();
+ afterTest();
log("Successfully ending " + testCase);
}
@@ -1231,25 +1237,25 @@
// Check that the list of connected LDAP servers is correct
// in each replication servers
- List<String> l1 = changelog1.getReplicationCache(baseDn).
+ List<String> l1 = changelog1.getReplicationCache(baseDn, false).
getConnectedLDAPservers();
assertEquals(l1.size(), 1);
assertEquals(l1.get(0), String.valueOf(server1ID));
List<String> l2;
- l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+ l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
assertEquals(l2.size(), 2);
assertEquals(l2.get(0), String.valueOf(server3ID));
assertEquals(l2.get(1), String.valueOf(server2ID));
List<String> l3;
- l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
+ l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers();
assertEquals(l3.size(), 0);
// Test updates
broker3.stop();
Thread.sleep(1000);
- l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+ l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
assertEquals(l2.size(), 1);
assertEquals(l2.get(0), String.valueOf(server2ID));
@@ -1257,7 +1263,7 @@
server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
broker2.stop();
Thread.sleep(1000);
- l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+ l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
assertEquals(l2.size(), 1);
assertEquals(l2.get(0), String.valueOf(server3ID));
@@ -1266,7 +1272,7 @@
broker2.stop();
broker3.stop();
- cleanEntries();
+ afterTest();
changelog3.shutdown();
changelog3 = null;
@@ -1313,7 +1319,7 @@
// Tests that entries have been received by S2
receiveUpdatedEntries(server2, server2ID, updatedEntries);
- cleanEntries();
+ afterTest();
changelog2.shutdown();
changelog2 = null;
@@ -1335,42 +1341,64 @@
changelog2 = createChangelogServer(changelog2ID);
Thread.sleep(1000);
- changelog3 = createChangelogServer(changelog3ID);
- Thread.sleep(1000);
-
// Connect DS to the replicationServer 1
connectServer1ToChangelog(changelog1ID);
// Put entries in DB
+ log(testCase + " Will add entries");
addTestEntriesToDB();
// Connect a broker acting as server 2 to Repl Server 2
if (server2 == null)
{
+ log(testCase + " Will connect server 2 to " + changelog2ID);
server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
server2ID, 100, getChangelogPort(changelog2ID),
- 1000, emptyOldChanges);
+ 1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
}
// Connect a broker acting as server 3 to Repl Server 3
+ log(testCase + " Will create replServer " + changelog3ID);
+ changelog3 = createChangelogServer(changelog3ID);
+ Thread.sleep(500);
if (server3 == null)
{
+ log(testCase + " Will connect server 3 to " + changelog3ID);
server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
server3ID, 100, getChangelogPort(changelog3ID),
- 1000, emptyOldChanges);
+ 1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
}
- Thread.sleep(3000);
+ Thread.sleep(500);
- // S2 sends init request
+ // S3 sends init request
+ log(testCase + " server 3 Will send reqinit to " + server1ID);
InitializeRequestMessage initMsg =
- new InitializeRequestMessage(baseDn, server2ID, server1ID);
- server2.publish(initMsg);
+ new InitializeRequestMessage(baseDn, server3ID, server1ID);
+ server3.publish(initMsg);
- // S2 should receive target, entries & done
- receiveUpdatedEntries(server2, server2ID, updatedEntries);
+ // S3 should receive target, entries & done
+ log(testCase + " Will verify server 3 has received expected entries");
+ receiveUpdatedEntries(server3, server3ID, updatedEntries);
- cleanEntries();
+ while(true)
+ {
+ try
+ {
+ ReplicationMessage msg = server3.receive();
+ fail("Receive unexpected message " + msg);
+ }
+ catch(SocketTimeoutException e)
+ {
+ // Test is a success
+ break;
+ }
+ }
+
+ afterTest();
+
+ changelog3.shutdown();
+ changelog3 = null;
changelog2.shutdown();
changelog2 = null;
@@ -1419,9 +1447,10 @@
addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
- if (sd != null)
+ if (replDomain != null)
{
- log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+ assertTrue(!replDomain.ieRunning(),
+ "ReplicationDomain: Import/Export is not expected to be running");
}
log("Successfully ending "+testCase);
@@ -1458,9 +1487,10 @@
waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
- if (sd != null)
+ if (replDomain != null)
{
- log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+ assertTrue(!replDomain.ieRunning(),
+ "ReplicationDomain: Import/Export is not expected to be running");
}
log("Successfully ending "+testCase);
@@ -1554,7 +1584,7 @@
waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
null);
- cleanEntries();
+ afterTest();
log("Successfully ending "+testCase);
@@ -1563,23 +1593,30 @@
/**
* Disconnect broker and remove entries from the local DB
*/
- protected void cleanEntries()
+ protected void afterTest()
{
- if (sd != null)
+ if (replDomain != null)
{
- log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+ assertTrue(!replDomain.ieRunning(),
+ "ReplicationDomain: Import/Export is not expected to be running");
}
// Clean brokers
if (server2 != null)
{
server2.stop();
-
TestCaseUtils.sleep(100); // give some time to the broker to disconnect
// from the replicationServer.
server2 = null;
}
+ if (server3 != null)
+ {
+ server3.stop();
+ TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+ // from the replicationServer.
+ server3 = null;
+ }
super.cleanRealEntries();
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index 6557146..2de4b70 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -49,6 +49,7 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMessage;
@@ -349,9 +350,10 @@
ProtocolVersion.setCurrentVersion((short)2);
ReplicationBroker broker = new ReplicationBroker(
- new ServerState(),
- baseDn,
+ new ServerState(),
+ baseDn,
(short) 13, 0, 0, 0, 0, 1000, 0,
+ ReplicationTestCase.getGenerationId(baseDn),
getReplSessionSecurity());
@@ -369,5 +371,11 @@
// Check broker negociated version
pversion = broker.getProtocolVersion();
assertEquals(pversion, 0);
- }
+
+ broker.stop();
+
+ logError(Message.raw(
+ Category.SYNC, Severity.INFORMATION,
+ "Ending Replication ProtocolWindowTest : protocolVersion"));
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 6bf5821..93193b2 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,13 +28,13 @@
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
-import org.opends.server.config.ConfigException;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -42,13 +42,14 @@
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
-import org.opends.messages.MessageBuilder;
-import org.opends.messages.Message;
-import org.opends.messages.Category;
-import org.opends.messages.Severity;
import org.opends.server.backends.task.TaskState;
+import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
@@ -58,7 +59,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.types.Attribute;
@@ -136,13 +140,50 @@
}
/**
- * Open a replicationServer session to the local ReplicationServer.
+ * Retrieves the domain associated to the baseDn, and the value of the generationId
+ * of this domain. If the domain does not exist, returns the default hard-coded\
+ * value of the generationId corresponding to 'no entry'.
*
+ * @param baseDn The baseDn for which we want the generationId
+ * @return The value of the generationId.
+ */
+ static protected long getGenerationId(DN baseDn)
+ {
+ // This is the value of the generationId computed by the server when the
+ // suffix is empty.
+ long genId = 3276850;
+ try
+ {
+ ReplicationDomain replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
+ genId = replDomain.getGenerationId();
+ }
+ catch(Exception e) {}
+ return genId;
+ }
+
+ /**
+ * Open a replicationServer session to the local ReplicationServer.
+ * The generation is read from the replicationDomain object. If it
+ * does not exist, take the 'empty backend' generationID.
*/
protected ReplicationBroker openReplicationSession(
final DN baseDn, short serverId, int window_size,
int port, int timeout, boolean emptyOldChanges)
- throws Exception
+ throws Exception, SocketException
+ {
+ return openReplicationSession(baseDn, serverId, window_size,
+ port, timeout, emptyOldChanges, getGenerationId(baseDn));
+ }
+
+ /**
+ * Open a replicationServer session to the local ReplicationServer
+ * providing the generationId.
+ */
+ protected ReplicationBroker openReplicationSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, boolean emptyOldChanges,
+ long generationId)
+ throws Exception, SocketException
{
ServerState state;
if (emptyOldChanges)
@@ -151,8 +192,8 @@
state = new ServerState();
ReplicationBroker broker = new ReplicationBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
- getReplSessionSecurity());
+ state, baseDn, serverId, 0, 0, 0, 0,
+ window_size, 0, generationId, getReplSessionSecurity());
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
broker.start(servers);
@@ -170,7 +211,15 @@
{
while (true)
{
- broker.receive();
+ ReplicationMessage rMsg = broker.receive();
+ if (rMsg instanceof ErrorMessage)
+ {
+ ErrorMessage eMsg = (ErrorMessage)rMsg;
+ logError(new MessageBuilder(
+ "ReplicationTestCase/openReplicationSession ").append(
+ " received ErrorMessage when emptying old changes ").append(
+ eMsg.getDetails()).toMessage());
+ }
}
}
catch (Exception e)
@@ -184,16 +233,30 @@
}
/**
+ * Open a replicationServer session to the local ReplicationServer
+ * with a default value generationId.
+ *
+ */
+ protected ReplicationBroker openReplicationSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, ServerState state)
+ throws Exception, SocketException
+ {
+ return openReplicationSession(baseDn, serverId, window_size,
+ port, timeout, state, getGenerationId(baseDn));
+ }
+
+ /**
* Open a new session to the ReplicationServer
* starting with a given ServerState.
*/
protected ReplicationBroker openReplicationSession(
final DN baseDn, short serverId, int window_size,
- int port, int timeout, ServerState state)
- throws Exception
+ int port, int timeout, ServerState state, long generationId)
+ throws Exception, SocketException
{
ReplicationBroker broker = new ReplicationBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
+ state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, generationId,
getReplSessionSecurity());
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
@@ -213,7 +276,18 @@
final DN baseDn, short serverId, int window_size,
int port, int timeout, int maxSendQueue, int maxRcvQueue,
boolean emptyOldChanges)
- throws Exception
+ throws Exception, SocketException
+ {
+ return openReplicationSession(baseDn, serverId, window_size,
+ port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
+ getGenerationId(baseDn));
+ }
+
+ protected ReplicationBroker openReplicationSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, int maxSendQueue, int maxRcvQueue,
+ boolean emptyOldChanges, long generationId)
+ throws Exception, SocketException
{
ServerState state;
if (emptyOldChanges)
@@ -223,7 +297,7 @@
ReplicationBroker broker = new ReplicationBroker(
state, baseDn, serverId, maxRcvQueue, 0,
- maxSendQueue, 0, window_size, 0,
+ maxSendQueue, 0, window_size, 0, generationId,
getReplSessionSecurity());
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
@@ -240,7 +314,15 @@
{
while (true)
{
- broker.receive();
+ ReplicationMessage rMsg = broker.receive();
+ if (rMsg instanceof ErrorMessage)
+ {
+ ErrorMessage eMsg = (ErrorMessage)rMsg;
+ logError(new MessageBuilder(
+ "ReplicationTestCase/openReplicationSession ").append(
+ " received ErrorMessage when emptying old changes ").append(
+ eMsg.getDetails()).toMessage());
+ }
}
}
catch (Exception e)
@@ -264,13 +346,22 @@
while (true)
{
DN dn = configEntryList.removeLast();
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
+
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
"cleaning config entry " + dn));
op = new DeleteOperationBasis(connection, InternalClientConnection
.nextOperationID(), InternalClientConnection.nextMessageID(), null,
dn);
op.run();
+ if ((op.getResultCode() != ResultCode.SUCCESS) &&
+ (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ "ReplicationTestCase/Cleaning config entries" +
+ "DEL " + dn +
+ " failed " + op.getResultCode().getResultCodeName()));
+ }
}
}
catch (NoSuchElementException e) {
@@ -293,14 +384,23 @@
while (true)
{
DN dn = entryList.removeLast();
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
- "cleaning entry " + dn));
- op = new DeleteOperationBasis(connection, InternalClientConnection
- .nextOperationID(), InternalClientConnection.nextMessageID(), null,
- dn);
+ op = new DeleteOperationBasis(connection,
+ InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ null,
+ dn);
op.run();
+
+ if ((op.getResultCode() != ResultCode.SUCCESS) &&
+ (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ "ReplicationTestCase/Cleaning entries" +
+ "DEL " + dn +
+ " failed " + op.getResultCode().getResultCodeName()));
+ }
}
}
catch (NoSuchElementException e) {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
index 246430d..67c5904 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -158,7 +158,7 @@
ResultCode code = modOp.getResultCode();
assertTrue(code.equals(ResultCode.SUCCESS),
- "The original operation failed");
+ "The original operation failed: " + code.getResultCodeName());
// See if the client has received the msg
ReplicationMessage msg = broker.receive();
@@ -217,7 +217,7 @@
public void replaySchemaChange() throws Exception
{
logError(Message.raw(Category.SYNC, Severity.NOTICE,
- "Starting replication test : pushSchemaChange "));
+ "Starting replication test : replaySchemaChange "));
final DN baseDn = DN.decode("cn=schema");
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
index 18a1b21..7a3284b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
@@ -110,7 +110,8 @@
// chek that the operation was successful.
// check that the update failed.
- assertEquals(ResultCode.SUCCESS, op.getResultCode());
+ assertEquals(op.getResultCode(), ResultCode.SUCCESS,
+ op.getAdditionalLogMessage().toString());
}
finally
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
index f461e1f..7b7a232 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -107,8 +107,8 @@
ChangeNumber cn1 = gen1.newChangeNumber();
ChangeNumber cn2 = gen2.newChangeNumber();
- state.update(cn1);
- state.update(cn2);
+ assertEquals(state.update(cn1), true);
+ assertEquals(state.update(cn2), true);
state.save();
@@ -120,6 +120,12 @@
"cn1 has not been saved or loaded correctly for " + dn);
assertEquals(cn2Saved, cn2,
"cn2 has not been saved or loaded correctly for " + dn);
+
+ state.clear();
+ stateSaved = new PersistentServerState(baseDn);
+ cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
+ assertEquals(cn1Saved, null,
+ "cn1 has not been saved after clear for " + dn);
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 1866047..5d09572 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -493,7 +493,8 @@
{
state.update(new ChangeNumber((long)1, 1,(short)1));
ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
- window, window, window, window, window, window, state, (short)1, true);
+ window, window, window, window, window, window, state, (short)1,
+ (long)1, true);
ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -507,6 +508,7 @@
assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
newMsg.getServerState().getMaxChangeNumber((short)1));
assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
}
@DataProvider(name="changelogStart")
@@ -527,7 +529,7 @@
{
state.update(new ChangeNumber((long)1, 1,(short)1));
ReplServerStartMessage msg = new ReplServerStartMessage(serverId,
- url, baseDN, window, state, (short)1, true);
+ url, baseDN, window, state, (short)1, (long)1, true);
ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getServerURL(), newMsg.getServerURL());
@@ -536,6 +538,7 @@
assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
newMsg.getServerState().getMaxChangeNumber((short)1));
assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
}
@@ -572,9 +575,11 @@
List<String> connectedServers = new ArrayList<String>(0);
connectedServers.add("s1");
connectedServers.add("s2");
- ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers);
+ ReplServerInfoMessage msg =
+ new ReplServerInfoMessage(connectedServers, 13);
ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes());
assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers());
+ assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 3488877..46f45bb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,7 @@
ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
DbHandler handler =
- new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
+ new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -118,4 +118,75 @@
TestCaseUtils.deleteDirectory(testRoot);
}
+ /*
+ * Test the feature of clearing a dbHandler used by a replication server.
+ * The clear feature is used when a replication server receives a request
+ * to reset the generationId of a given domain.
+ */
+ @Test()
+ void testDbHandlerClear() throws Exception
+ {
+ TestCaseUtils.startServer();
+
+ // find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ int changelogPort = socket.getLocalPort();
+ socket.close();
+
+ // configure a ReplicationServer.
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(changelogPort, null, 0,
+ 2, 0, 100, null);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+
+ // create or clean a directory for the dbHandler
+ String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+ String path = buildRoot + File.separator + "build" + File.separator +
+ "unit-tests" + File.separator + "dbHandler";
+ File testRoot = new File(path);
+ if (testRoot.exists())
+ {
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+ testRoot.mkdirs();
+
+ ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
+
+ DbHandler handler =
+ new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
+
+ // Creates changes added to the dbHandler
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
+ ChangeNumber changeNumber1 = gen.newChangeNumber();
+ ChangeNumber changeNumber2 = gen.newChangeNumber();
+ ChangeNumber changeNumber3 = gen.newChangeNumber();
+
+ DeleteMsg update1 = new DeleteMsg("o=test", changeNumber1, "uid");
+ DeleteMsg update2 = new DeleteMsg("o=test", changeNumber2, "uid");
+ DeleteMsg update3 = new DeleteMsg("o=test", changeNumber3, "uid");
+
+ // Add the changes
+ handler.add(update1);
+ handler.add(update2);
+ handler.add(update3);
+
+ // Check they are here
+ assertEquals(changeNumber1, handler.getFirstChange());
+ assertEquals(changeNumber3, handler.getLastChange());
+
+ // Clear ...
+ handler.clear();
+
+ // Check the db is cleared.
+ assertEquals(null, handler.getFirstChange());
+ assertEquals(null, handler.getLastChange());
+
+ handler.shutdown();
+ dbEnv.shutdown();
+ replicationServer.shutdown();
+
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+
+
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 5869f80..cc54b98 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -30,6 +30,8 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.*;
import java.net.InetAddress;
@@ -41,8 +43,12 @@
import java.util.SortedSet;
import java.util.TreeSet;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.ModifyDNOperationBasis;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -68,6 +74,8 @@
public class ReplicationServerTest extends ReplicationTestCase
{
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
/**
* The replicationServer that will be used in this test.
*/
@@ -105,6 +113,15 @@
replicationServer = new ReplicationServer(conf);
}
+ private void debugInfo(String s)
+ {
+ // logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+
/**
* Basic test of the replicationServer code :
* Connect 2 clients to the replicationServer and exchange messages
@@ -116,6 +133,7 @@
@Test()
public void changelogBasic() throws Exception
{
+ debugInfo("Starting changelogBasic");
ReplicationBroker server1 = null;
ReplicationBroker server2 = null;
@@ -188,7 +206,7 @@
fail("ReplicationServer basic : incorrect message type received.");
/*
- * Send and receive a Delete Msg from server 1 to server 2
+ * Send and receive a Delete Msg from server 2 to server 1
*/
msg =
new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
@@ -226,6 +244,7 @@
if (server2 != null)
server2.stop();
}
+ debugInfo("Ending changelogBasic");
}
/**
@@ -235,6 +254,7 @@
@Test(enabled=true, dependsOnMethods = { "changelogBasic" })
public void newClient() throws Exception
{
+ debugInfo("Starting newClient");
ReplicationBroker broker = null;
try {
@@ -244,7 +264,7 @@
ReplicationMessage msg2 = broker.receive();
if (!(msg2 instanceof DeleteMsg))
- fail("ReplicationServer basic transmission failed");
+ fail("ReplicationServer basic transmission failed:" + msg2);
else
{
DeleteMsg del = (DeleteMsg) msg2;
@@ -258,6 +278,7 @@
if (broker != null)
broker.stop();
}
+ debugInfo("Ending newClient");
}
@@ -277,11 +298,13 @@
try {
broker =
openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
- 100, replicationServerPort, 1000, state);
+ 100, replicationServerPort, 5000, state);
ReplicationMessage msg2 = broker.receive();
if (!(msg2 instanceof DeleteMsg))
- fail("ReplicationServer basic transmission failed");
+ {
+ fail("ReplicationServer basic transmission failed:" + msg2);
+ }
else
{
DeleteMsg del = (DeleteMsg) msg2;
@@ -304,6 +327,7 @@
@Test(enabled=true, dependsOnMethods = { "changelogBasic" })
public void newClientWithFirstChanges() throws Exception
{
+ debugInfo("Starting newClientWithFirstChanges");
/*
* Create a ServerState updated with the first changes from both servers
* done in test changelogBasic.
@@ -313,6 +337,7 @@
state.update(firstChangeNumberServer2);
newClientWithChanges(state, secondChangeNumberServer1);
+ debugInfo("Ending newClientWithFirstChanges");
}
/**
@@ -792,7 +817,7 @@
ServerStartMessage msg =
new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
- ProtocolVersion.currentVersion(), sslEncryption);
+ ProtocolVersion.currentVersion(), 0, sslEncryption);
session.publish(msg);
// Read the Replication Server state from the ReplServerStartMessage that
@@ -819,10 +844,13 @@
// send a ServerStartMessage containing the ServerState that was just
// received.
+ DN baseDn = DN.decode("dc=example,dc=com");
msg = new ServerStartMessage(
- (short) 1724, DN.decode("dc=example,dc=com"),
+ (short) 1724, baseDn,
0, 0, 0, 0, WINDOW, (long) 5000, replServerState,
- ProtocolVersion.currentVersion(), sslEncryption);
+ ProtocolVersion.currentVersion(),
+ ReplicationTestCase.getGenerationId(baseDn),
+ sslEncryption);
session.publish(msg);
// Read the ReplServerStartMessage that come back.
--
Gitblit v1.10.0