From 13819a2e81db0422a7c8c186f838c7b243173170 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 03 Sep 2014 06:30:37 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
---
opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java | 9
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 22
opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java | 34
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 50
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 2
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 10
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 24
opends/src/server/org/opends/server/backends/ChangelogBackend.java | 59
/dev/null | 2812 ----------------------------------------------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 32
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 161 --
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 55
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 121 -
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 154 --
opends/src/server/org/opends/server/replication/common/ServerState.java | 24
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 24
17 files changed, 172 insertions(+), 3,427 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index 6808a0a..11cc90f 100644
--- a/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -30,7 +30,7 @@
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
+import static org.opends.server.replication.plugin.MultimasterReplication.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.LDIFWriter.*;
import static org.opends.server.util.ServerConstants.*;
@@ -54,13 +54,11 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyCommonMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
-import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -356,28 +354,21 @@
{
return 1;
}
- // Search with cookie mode to count all update messages
- final SearchParams params = new SearchParams(getExcludedDomains());
- params.requestType = REQUEST_TYPE_FROM_COOKIE;
+
+ // Search with cookie mode to count all update messages cross replica
+ final SearchParams params = new SearchParams(getExcludedChangelogDomains());
params.cookie = new MultiDomainServerState();
- NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
try
{
+ final NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
search0(params, searchOp);
+ return searchOp.numSubordinates;
}
catch (ChangelogException e)
{
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e)));
}
- return searchOp.numSubordinates;
- }
-
- private Set<String> getExcludedDomains()
- {
- final Set<String> domains = MultimasterReplication.getECLDisabledDomains();
- domains.add(DN_EXTERNAL_CHANGELOG_ROOT);
- return domains;
}
/**
@@ -542,16 +533,11 @@
private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
{
- final SearchParams params = new SearchParams(getExcludedDomains());
+ final SearchParams params = new SearchParams(getExcludedChangelogDomains());
final ExternalChangelogRequestControl eclRequestControl =
searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
- if (eclRequestControl == null)
+ if (eclRequestControl != null)
{
- params.requestType = REQUEST_TYPE_FROM_CHANGE_NUMBER;
- }
- else
- {
- params.requestType = REQUEST_TYPE_FROM_COOKIE;
params.cookie = eclRequestControl.getCookie();
}
return params;
@@ -673,7 +659,6 @@
*/
static class SearchParams
{
- private ECLRequestType requestType;
private final Set<String> excludedBaseDNs;
private long lowestChangeNumber = -1;
private long highestChangeNumber = -1;
@@ -685,7 +670,7 @@
*/
SearchParams()
{
- this.excludedBaseDNs = Collections.emptySet();
+ this(Collections.<String> emptySet());
}
/**
@@ -700,6 +685,17 @@
}
/**
+ * Returns whether this search is cookie based.
+ *
+ * @return true if this search is cookie-based, false if this search is
+ * change number-based.
+ */
+ private boolean isCookieBasedSearch()
+ {
+ return cookie != null;
+ }
+
+ /**
* Indicates if provided change number is compatible with last change
* number.
*
@@ -909,16 +905,13 @@
private void search0(final SearchParams searchParams, final SearchOperation searchOperation)
throws DirectoryException, ChangelogException
{
- switch (searchParams.requestType)
+ if (searchParams.isCookieBasedSearch())
{
- case REQUEST_TYPE_FROM_CHANGE_NUMBER:
- searchFromChangeNumber(searchParams, searchOperation);
- break;
- case REQUEST_TYPE_FROM_COOKIE:
- searchFromCookie(searchParams, searchOperation);
- break;
- default:
- // not handled
+ searchFromCookie(searchParams, searchOperation);
+ }
+ else
+ {
+ searchFromChangeNumber(searchParams, searchOperation);
}
}
diff --git a/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java b/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
index c3e0ff7..c86939e 100644
--- a/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
@@ -34,13 +34,12 @@
import org.opends.server.api.VirtualAttributeProvider;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.*;
-import org.opends.server.util.ServerConstants;
import static org.opends.messages.ExtensionMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.plugin.MultimasterReplication.*;
/**
* This class implements a virtual attribute provider in the root-dse entry
@@ -94,11 +93,7 @@
{
if (replicationServer != null)
{
- // Set a list of excluded domains (also exclude 'cn=changelog' itself)
- Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
- excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
-
- String newestCookie = replicationServer.getNewestECLCookie(excludedDomains).toString();
+ String newestCookie = replicationServer.getNewestECLCookie(getExcludedChangelogDomains()).toString();
final ByteString cookie = ByteString.valueOf(newestCookie);
return Collections.singleton(AttributeValues.create(cookie, cookie));
}
diff --git a/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 705a9a2..8026ce7 100644
--- a/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -383,7 +383,7 @@
* when an error occurs
* @return the split state.
*/
- public static Map<DN, ServerState> splitGenStateToServerStates(
+ private static Map<DN, ServerState> splitGenStateToServerStates(
String multiDomainServerState) throws DirectoryException
{
Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index c084dec..f66a147 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -421,28 +421,4 @@
return saved;
}
- /**
- * Build a copy of the ServerState with only CSNs older than a provided
- * timestamp. This is used when building the initial Cookie in the External
- * Changelog, to cope with purged changes.
- *
- * @param timestamp
- * The timestamp to compare the ServerState against
- * @return a copy of the ServerState which only contains the CSNs older than
- * csn.
- */
- public ServerState duplicateOnlyOlderThan(long timestamp)
- {
- final CSN csn = new CSN(timestamp, 0, 0);
- final ServerState newState = new ServerState();
- for (CSN change : serverIdToCSN.values())
- {
- if (change.isOlderThan(csn))
- {
- newState.serverIdToCSN.put(change.getServerId(), change);
- }
- }
- return newState;
- }
-
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index b66ab36..2e8f86e 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -26,7 +26,12 @@
*/
package org.opends.server.replication.plugin;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -38,16 +43,43 @@
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
-import org.opends.server.api.*;
+import org.opends.server.api.Backend;
+import org.opends.server.api.BackupTaskListener;
+import org.opends.server.api.ExportTaskListener;
+import org.opends.server.api.ImportTaskListener;
+import org.opends.server.api.RestoreTaskListener;
+import org.opends.server.api.SynchronizationProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.DSRSShutdownSync;
-import org.opends.server.types.*;
-import org.opends.server.types.operation.*;
+import org.opends.server.types.BackupConfig;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.Control;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.Modification;
+import org.opends.server.types.Operation;
+import org.opends.server.types.RestoreConfig;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SynchronizationProviderResult;
+import org.opends.server.types.operation.PluginOperation;
+import org.opends.server.types.operation.PostOperationAddOperation;
+import org.opends.server.types.operation.PostOperationDeleteOperation;
+import org.opends.server.types.operation.PostOperationModifyDNOperation;
+import org.opends.server.types.operation.PostOperationModifyOperation;
+import org.opends.server.types.operation.PostOperationOperation;
+import org.opends.server.types.operation.PreOperationAddOperation;
+import org.opends.server.types.operation.PreOperationDeleteOperation;
+import org.opends.server.types.operation.PreOperationModifyDNOperation;
+import org.opends.server.types.operation.PreOperationModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
+import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -814,14 +846,14 @@
}
/**
- * Gets the Set of baseDN of the domains which are disabled for the external
- * changelog.
+ * Gets the Set of domain baseDN which are disabled for the external changelog.
*
- * @return The Set of baseDNs which are disabled for the external changelog.
+ * @return The Set of domain baseDNs which are disabled for the external changelog.
*/
- public static Set<String> getECLDisabledDomains()
+ public static Set<String> getExcludedChangelogDomains()
{
- final Set<String> disabledBaseDNs = new HashSet<String>(domains.size());
+ final Set<String> disabledBaseDNs = new HashSet<String>(domains.size() + 1);
+ disabledBaseDNs.add(DN_EXTERNAL_CHANGELOG_ROOT);
for (LDAPReplicationDomain domain : domains.values())
{
if (!domain.isECLEnabled())
diff --git a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
deleted file mode 100644
index cebf245..0000000
--- a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2014 ForgeRock AS
- */
-package org.opends.server.replication.protocol;
-
-import java.util.zip.DataFormatException;
-
-import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-
-/**
- * Container for the ECL information sent from the ReplicationServer
- * to the client part (either broker over the protocol, or ECLSession).
- */
-public class ECLUpdateMsg extends ReplicationMsg
-{
- /** The replication change returned. */
- private final LDAPUpdateMsg updateMsg;
-
- /** The baseDN of the domain to which applies the change. */
- private final DN baseDN;
-
- /** The value of the cookie updated with the current change. */
- private MultiDomainServerState cookie;
-
- /** The changeNumber as specified by draft-good-ldap-changelog. */
- private long changeNumber;
-
- /**
- * Creates a new message.
- * @param updateMsg The provided update message.
- * @param cookie The provided cookie value
- * @param baseDN The provided baseDN.
- * @param changeNumber The provided change number.
- */
- public ECLUpdateMsg(LDAPUpdateMsg updateMsg, MultiDomainServerState cookie,
- DN baseDN, long changeNumber)
- {
- this.cookie = cookie;
- this.baseDN = baseDN;
- this.updateMsg = updateMsg;
- this.changeNumber = changeNumber;
- }
-
- /**
- * Creates a new message from its encoded form.
- *
- * @param in The byte array containing the encoded form of the message.
- * @throws DataFormatException If the byte array does not contain
- * a valid encoded form of the message.
- * @throws NotSupportedOldVersionPDUException when it occurs.
- */
- ECLUpdateMsg(byte[] in) throws DataFormatException,
- NotSupportedOldVersionPDUException
- {
- try
- {
- final ByteArrayScanner scanner = new ByteArrayScanner(in);
- if (scanner.nextByte() != MSG_TYPE_ECL_UPDATE)
- {
- throw new DataFormatException("byte[] is not a valid " +
- getClass().getCanonicalName());
- }
-
- this.cookie = new MultiDomainServerState(scanner.nextString());
- this.baseDN = scanner.nextDN();
- this.changeNumber = scanner.nextIntUTF8();
-
- // Decode the msg
- this.updateMsg = (LDAPUpdateMsg) ReplicationMsg.generateMsg(
- scanner.remainingBytesZeroTerminated(),
- ProtocolVersion.getCurrentVersion());
- }
- catch(DirectoryException de)
- {
- throw new DataFormatException(de.toString());
- }
- }
-
- /**
- * Getter for the cookie value.
- * @return The cookie value.
- */
- public MultiDomainServerState getCookie()
- {
- return cookie;
- }
-
- /**
- * Setter for the cookie value.
- * @param cookie The provided cookie value.
- */
- public void setCookie(MultiDomainServerState cookie)
- {
- this.cookie = cookie;
- }
-
- /**
- * Getter for the baseDN.
- *
- * @return The baseDN.
- */
- public DN getBaseDN()
- {
- return baseDN;
- }
-
- /**
- * Getter for the message.
- * @return The included replication message.
- */
- public UpdateMsg getUpdateMsg()
- {
- return updateMsg;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return "ECLUpdateMsg:[" +
- " updateMsg: " + updateMsg +
- " cookie: " + cookie +
- " changeNumber: " + changeNumber +
- " serviceId: " + baseDN + "]";
- }
-
- /** {@inheritDoc} */
- @Override
- public byte[] getBytes(short protocolVersion)
- {
- final ByteArrayBuilder builder = new ByteArrayBuilder();
- builder.appendByte(MSG_TYPE_ECL_UPDATE);
- builder.appendString(String.valueOf(cookie));
- builder.appendDN(baseDN);
- // FIXME JNR Changing the line below to use long would require a protocol
- // version change. Leave it like this for now until the need arises.
- builder.appendIntUTF8((int) changeNumber);
- builder.appendZeroTerminatedByteArray(updateMsg.getBytes(protocolVersion));
- return builder.toByteArray();
- }
-
- /**
- * Setter for the changeNumber of this change.
- * @param changeNumber the provided changeNumber for this change.
- */
- public void setChangeNumber(long changeNumber)
- {
- this.changeNumber = changeNumber;
- }
-
- /**
- * Getter for the changeNumber of this change.
- * @return the changeNumber of this change.
- */
- public long getChangeNumber()
- {
- return this.changeNumber;
- }
-
-}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 3fef9a1..d9ab381 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -71,8 +71,11 @@
static final byte MSG_TYPE_GENERIC_UPDATE = 29;
// Added for protocol version 3
+ @Deprecated
static final byte MSG_TYPE_START_ECL = 30;
+ @Deprecated
static final byte MSG_TYPE_START_ECL_SESSION = 31;
+ @Deprecated
static final byte MSG_TYPE_ECL_UPDATE = 32;
static final byte MSG_TYPE_CT_HEARTBEAT = 33;
@@ -190,11 +193,12 @@
case MSG_TYPE_GENERIC_UPDATE:
return new UpdateMsg(buffer);
case MSG_TYPE_START_ECL:
- return new ServerStartECLMsg(buffer);
case MSG_TYPE_START_ECL_SESSION:
- return new StartECLSessionMsg(buffer);
case MSG_TYPE_ECL_UPDATE:
- return new ECLUpdateMsg(buffer);
+ // Legacy versions never sent such messages to other instances (other JVMs).
+ // They were only used in the combined DS-RS case.
+ // It is safe to totally ignore these values since code now uses the ChangelogBackend.
+ return null;
case MSG_TYPE_CT_HEARTBEAT:
return new ChangeTimeHeartbeatMsg(buffer, protocolVersion);
case MSG_TYPE_REPL_SERVER_START_DS:
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
deleted file mode 100644
index d59f01c..0000000
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2013-2014 ForgeRock AS.
- */
-package org.opends.server.replication.protocol;
-
-import java.util.zip.DataFormatException;
-
-import org.opends.server.replication.common.ServerState;
-
-/**
- * This message is used by LDAP server when they first connect.
- * to a replication server to let them know who they are and what is their state
- * (their RUV)
- */
-public class ServerStartECLMsg extends StartMsg
-{
- private final String serverURL;
- private final int maxReceiveQueue;
- private final int maxSendQueue;
- private final int maxReceiveDelay;
- private final int maxSendDelay;
- private final int windowSize;
- private final ServerState serverState;
-
- /**
- * The time in milliseconds between heartbeats from the replication
- * server. Zero means heartbeats are off.
- */
- private final long heartbeatInterval;
-
- /**
- * Whether to continue using SSL to encrypt messages after the start
- * messages have been exchanged.
- */
- private final boolean sslEncryption;
-
- /**
- * Creates a new ServerStartMsg. This message is to be sent by an LDAP
- * Server after being connected to a replication server for a given
- * replication domain.
- *
- * @param serverURL directory server URL
- * @param maxReceiveDelay The max receive delay for this server.
- * @param maxReceiveQueue The max receive Queue for this server.
- * @param maxSendDelay The max Send Delay from this server.
- * @param maxSendQueue The max send Queue from this server.
- * @param windowSize The window size used by this server.
- * @param heartbeatInterval The requested heartbeat interval.
- * @param serverState The state of this server.
- * @param generationId The generationId for this server.
- * @param sslEncryption Whether to continue using SSL to encrypt messages
- * after the start messages have been exchanged.
- * @param groupId The group id of the DS for this DN
- */
- public ServerStartECLMsg(String serverURL, int maxReceiveDelay,
- int maxReceiveQueue, int maxSendDelay,
- int maxSendQueue, int windowSize,
- long heartbeatInterval,
- ServerState serverState,
- long generationId,
- boolean sslEncryption,
- byte groupId)
- {
- super((short) -1 /* version set when sending */, generationId);
-
- this.serverURL = serverURL;
- this.maxReceiveDelay = maxReceiveDelay;
- this.maxReceiveQueue = maxReceiveQueue;
- this.maxSendDelay = maxSendDelay;
- this.maxSendQueue = maxSendQueue;
- this.windowSize = windowSize;
- this.heartbeatInterval = heartbeatInterval;
- this.sslEncryption = sslEncryption;
- this.serverState = serverState;
- this.groupId = groupId;
- }
-
- /**
- * Creates a new ServerStartMsg from its encoded form.
- *
- * @param in The byte array containing the encoded form of the
- * ServerStartMsg.
- * @throws DataFormatException If the byte array does not contain a valid
- * encoded form of the ServerStartMsg.
- */
- ServerStartECLMsg(byte[] in) throws DataFormatException
- {
- final ByteArrayScanner scanner = new ByteArrayScanner(in);
- decodeHeader(scanner, MSG_TYPE_START_ECL);
-
- serverURL = scanner.nextString();
- maxReceiveDelay = scanner.nextIntUTF8();
- maxReceiveQueue = scanner.nextIntUTF8();
- maxSendDelay = scanner.nextIntUTF8();
- maxSendQueue = scanner.nextIntUTF8();
- windowSize = scanner.nextIntUTF8();
- heartbeatInterval = scanner.nextIntUTF8();
- // FIXME awful encoding
- sslEncryption = Boolean.valueOf(scanner.nextString());
- serverState = scanner.nextServerStateMustComeLast();
- }
-
- /**
- * get the Server URL from the message.
- * @return the server URL
- */
- public String getServerURL()
- {
- return serverURL;
- }
-
- /**
- * Get the maxReceiveDelay.
- * @return Returns the maxReceiveDelay.
- */
- public int getMaxReceiveDelay()
- {
- return maxReceiveDelay;
- }
-
- /**
- * Get the maxReceiveQueue.
- * @return Returns the maxReceiveQueue.
- */
- public int getMaxReceiveQueue()
- {
- return maxReceiveQueue;
- }
-
- /**
- * Get the maxSendDelay.
- * @return Returns the maxSendDelay.
- */
- public int getMaxSendDelay()
- {
- return maxSendDelay;
- }
-
- /**
- * Get the maxSendQueue.
- * @return Returns the maxSendQueue.
- */
- public int getMaxSendQueue()
- {
- return maxSendQueue;
- }
-
- /**
- * Get the ServerState.
- * @return The ServerState.
- */
- public ServerState getServerState()
- {
- return serverState;
- }
-
- /** {@inheritDoc} */
- @Override
- public byte[] getBytes(short sessionProtocolVersion)
- {
- final ByteArrayBuilder builder = new ByteArrayBuilder();
- encodeHeader(MSG_TYPE_START_ECL, builder, sessionProtocolVersion);
- builder.appendString(serverURL);
- builder.appendIntUTF8(maxReceiveDelay);
- builder.appendIntUTF8(maxReceiveQueue);
- builder.appendIntUTF8(maxSendDelay);
- builder.appendIntUTF8(maxSendQueue);
- builder.appendIntUTF8(windowSize);
- builder.appendLongUTF8(heartbeatInterval);
- // FIXME awful encoding
- builder.appendString(Boolean.toString(sslEncryption));
- builder.appendServerStateMustComeLast(serverState);
- return builder.toByteArray();
- }
-
- /**
- * Get the window size for the ldap server that created the message.
- *
- * @return The window size for the ldap server that created the message.
- */
- public int getWindowSize()
- {
- return windowSize;
- }
-
- /**
- * Get the heartbeat interval requested by the ldap server that created the
- * message.
- *
- * @return The heartbeat interval requested by the ldap server that created
- * the message.
- */
- public long getHeartbeatInterval()
- {
- return heartbeatInterval;
- }
-
- /**
- * Get the SSL encryption value for the ldap server that created the
- * message.
- *
- * @return The SSL encryption value for the ldap server that created the
- * message.
- */
- public boolean getSSLEncryption()
- {
- return sslEncryption;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getCanonicalName() + " content: " +
- "\nprotocolVersion: " + protocolVersion +
- "\ngenerationId: " + generationId +
- "\ngroupId: " + groupId +
- "\nheartbeatInterval: " + heartbeatInterval +
- "\nmaxReceiveDelay: " + maxReceiveDelay +
- "\nmaxReceiveQueue: " + maxReceiveQueue +
- "\nmaxSendDelay: " + maxSendDelay +
- "\nmaxSendQueue: " + maxSendQueue +
- "\nserverState: " + serverState +
- "\nserverURL: " + serverURL +
- "\nsslEncryption: " + sslEncryption +
- "\nwindowSize: " + windowSize;
- }
- }
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
deleted file mode 100644
index 2f9f80d..0000000
--- a/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2013-2014 ForgeRock AS.
- */
-package org.opends.server.replication.protocol;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.zip.DataFormatException;
-
-import org.opends.server.replication.common.CSN;
-import org.opends.server.util.StaticUtils;
-
-/**
- * This class specifies the parameters of a search request on the ECL.
- * It is used as an interface between the requestor (plugin part)
- */
-public class StartECLSessionMsg extends ReplicationMsg
-{
-
- /**
- * Type of request made to the External Changelog.
- */
- public enum ECLRequestType
- {
-
- /**
- * This specifies that the ECL is requested from a provided cookie value
- * defined as a MultiDomainServerState.
- */
- REQUEST_TYPE_FROM_COOKIE,
-
- /**
- * This specifies that the ECL is requested from a provided interval
- * of change numbers (as defined by draft-good-ldap-changelog [CHANGELOG]
- * and NOT replication CSNs).
- * TODO: not yet implemented
- */
- REQUEST_TYPE_FROM_CHANGE_NUMBER,
-
- /**
- * This specifies that the ECL is requested only for the entry that have a
- * CSN matching the provided one.
- * TODO: not yet implemented
- */
- REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER
- }
-
- /**
- * Whether the current External Changelog search is persistent and requires to
- * receive only new changes or already existing changes as well.
- */
- public enum Persistent
- {
- /**
- * This specifies that the request on the ECL is a PERSISTENT search with
- * changesOnly = false.
- * <p>
- * It will return the content of the changelog DB as it is now, plus any
- * subsequent changes.
- */
- PERSISTENT,
-
- /**
- * This specifies that the request on the ECL is a NOT a PERSISTENT search.
- * <p>
- * It will only return the content of the changelog DB as it is now, and
- * stop. It will NOT be turned into a persistent search that can return
- * subsequent changes.
- */
- NON_PERSISTENT,
-
- /**
- * This specifies that the request on the ECL is a PERSISTENT search with
- * changesOnly = true.
- * <p>
- * It will only return subsequent changes that do not exist yet in the
- * changelog DB.
- */
- PERSISTENT_CHANGES_ONLY
- }
-
- /** The type of request as defined by REQUEST_TYPE_... */
- private ECLRequestType eclRequestType;
-
- /**
- * When eclRequestType = FROM_COOKIE, specifies the provided cookie value.
- */
- private String crossDomainServerState = "";
-
- /**
- * When eclRequestType = FROM_CHANGE_NUMBER, specifies the provided change
- * number first and last - [CHANGELOG].
- */
- private long firstChangeNumber = -1;
- private long lastChangeNumber = -1;
-
- /**
- * When eclRequestType = EQUALS_REPL_CHANGE_NUMBER, specifies the provided
- * replication CSN.
- */
- private CSN csn;
-
- /**
- * Specifies whether the search is persistent and changesOnly.
- */
- private Persistent isPersistent = Persistent.NON_PERSISTENT;
-
- /**
- * This is a string identifying the operation, provided by the client part of
- * the ECL, used to help interpretation of messages logged.
- * <p>
- * It helps debugging and tracing the client operation related when
- * processing, on the RS side, a request on the ECL.
- */
- private String operationId = "";
-
- /** Excluded domains. */
- private Set<String> excludedBaseDNs = new HashSet<String>();
-
- /**
- * Creates a new StartSessionMsg message from its encoded form.
- *
- * @param in The byte array containing the encoded form of the message.
- * @throws java.util.zip.DataFormatException If the byte array does not
- * contain a valid encoded form of the message.
- */
- StartECLSessionMsg(byte[] in) throws DataFormatException
- {
- /*
- * The message is stored in the form:
- * <message type><status><assured flag><assured mode><safe data level>
- * <list of referrals urls>
- * (each referral url terminates with 0)
- */
- final ByteArrayScanner scanner = new ByteArrayScanner(in);
- final byte msgType = scanner.nextByte();
- if (msgType != MSG_TYPE_START_ECL_SESSION)
- {
- throw new DataFormatException("Input is not a valid "
- + getClass().getCanonicalName());
- }
-
- eclRequestType = ECLRequestType.values()[scanner.nextIntUTF8()];
- firstChangeNumber = scanner.nextIntUTF8();
- lastChangeNumber = scanner.nextIntUTF8();
- csn = scanner.nextCSNUTF8();
- isPersistent = Persistent.values()[scanner.nextIntUTF8()];
- crossDomainServerState = scanner.nextString();
- operationId = scanner.nextString();
- final String excludedDNsString = scanner.nextString();
- if (excludedDNsString != null && excludedDNsString.length() > 0)
- {
- Collections.addAll(excludedBaseDNs, excludedDNsString.split(";"));
- }
- }
-
- /**
- * Creates a new StartSessionMsg message with the given required parameters.
- */
- public StartECLSessionMsg()
- {
- eclRequestType = ECLRequestType.REQUEST_TYPE_FROM_COOKIE;
- crossDomainServerState = "";
- firstChangeNumber = -1;
- lastChangeNumber = -1;
- csn = new CSN(0, 0, 0);
- isPersistent = Persistent.NON_PERSISTENT;
- operationId = "-1";
- excludedBaseDNs = new HashSet<String>();
- }
-
- /** {@inheritDoc} */
- @Override
- public byte[] getBytes(short protocolVersion)
- {
- final ByteArrayBuilder builder = new ByteArrayBuilder();
- builder.appendByte(MSG_TYPE_START_ECL_SESSION);
- builder.appendIntUTF8(eclRequestType.ordinal());
- // FIXME JNR Changing the lines below to use long would require a protocol
- // version change. Leave it like this for now until the need arises.
- builder.appendIntUTF8((int) firstChangeNumber);
- builder.appendIntUTF8((int) lastChangeNumber);
- builder.appendCSNUTF8(csn);
- builder.appendIntUTF8(isPersistent.ordinal());
- builder.appendString(crossDomainServerState);
- builder.appendString(operationId);
- builder.appendString(StaticUtils.collectionToString(excludedBaseDNs, ";"));
- return builder.toByteArray();
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + " [" +
- " requestType="+ eclRequestType +
- " persistentSearch=" + isPersistent +
- " csn=" + csn +
- " firstChangeNumber=" + firstChangeNumber +
- " lastChangeNumber=" + lastChangeNumber +
- " generalizedState=" + crossDomainServerState +
- " operationId=" + operationId +
- " excludedDNs=" + excludedBaseDNs + "]";
- }
-
- /**
- * Getter on the changer number start.
- * @return the changer number start.
- */
- public long getFirstChangeNumber()
- {
- return firstChangeNumber;
- }
-
- /**
- * Specifies the last changer number requested.
- *
- * @return the last change number requested.
- */
- public long getLastChangeNumber()
- {
- return lastChangeNumber;
- }
-
- /**
- * Setter on the first changer number (as defined by [CHANGELOG]).
- * @param firstChangeNumber the provided first change number.
- */
- public void setFirstChangeNumber(long firstChangeNumber)
- {
- this.firstChangeNumber = firstChangeNumber;
- }
-
- /**
- * Setter on the last changer number (as defined by [CHANGELOG]).
- * @param lastChangeNumber the provided last change number.
- */
- public void setLastChangeNumber(long lastChangeNumber)
- {
- this.lastChangeNumber = lastChangeNumber;
- }
-
- /**
- * Getter on the replication CSN.
- * @return the replication CSN.
- */
- public CSN getCSN()
- {
- return csn;
- }
-
- /**
- * Setter on the replication CSN.
- * @param csn the provided replication CSN.
- */
- public void setCSN(CSN csn)
- {
- this.csn = csn;
- }
- /**
- * Getter on the type of request.
- * @return the type of request.
- */
- public ECLRequestType getECLRequestType()
- {
- return eclRequestType;
- }
-
- /**
- * Setter on the type of request.
- * @param eclRequestType the provided type of request.
- */
- public void setECLRequestType(ECLRequestType eclRequestType)
- {
- this.eclRequestType = eclRequestType;
- }
-
- /**
- * Getter on the persistent property of the search request on the ECL.
- * @return the persistent property.
- */
- public Persistent getPersistent()
- {
- return this.isPersistent;
- }
-
- /**
- * Setter on the persistent property of the search request on the ECL.
- * @param isPersistent the provided persistent property.
- */
- public void setPersistent(Persistent isPersistent)
- {
- this.isPersistent = isPersistent;
- }
-
- /**
- * Getter of the cross domain server state.
- * @return the cross domain server state.
- */
- public String getCrossDomainServerState()
- {
- return this.crossDomainServerState;
- }
-
- /**
- * Setter of the cross domain server state.
- * @param crossDomainServerState the provided cross domain server state.
- */
- public void setCrossDomainServerState(String crossDomainServerState)
- {
- this.crossDomainServerState = crossDomainServerState;
- }
-
- /**
- * Setter of the operation id.
- * @param operationId The provided operation id.
- */
- public void setOperationId(String operationId)
- {
- this.operationId = operationId;
- }
-
- /**
- * Getter on the operation id.
- * @return the operation id.
- */
- public String getOperationId()
- {
- return this.operationId;
- }
-
- /**
- * Getter on the list of excluded baseDNs (like cn=admin, ...).
- *
- * @return the list of excluded baseDNs.
- */
- public Set<String> getExcludedBaseDNs()
- {
- return this.excludedBaseDNs;
- }
-
- /**
- * Setter on the list of excluded baseDNs.
- *
- * @param excludedBaseDNs
- * the provided list of excluded baseDNs.
- */
- public void setExcludedDNs(Set<String> excludedBaseDNs)
- {
- this.excludedBaseDNs = excludedBaseDNs;
- }
-
-}
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
deleted file mode 100644
index bbd7a8f..0000000
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ /dev/null
@@ -1,1468 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2010-2014 ForgeRock AS
- */
-package org.opends.server.replication.server;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.opends.messages.Category;
-import org.opends.messages.Message;
-import org.opends.messages.Severity;
-import org.opends.server.backends.ChangelogBackend;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.types.*;
-import org.opends.server.util.ServerConstants;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.protocol.ProtocolVersion.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg.Persistent.*;
-import static org.opends.server.util.StaticUtils.*;
-
-/**
- * This class defines a server handler, which handles all interaction with a
- * peer replication server.
- */
-public final class ECLServerHandler extends ServerHandler
-{
-
- /**
- * Marks the end of the search on the External Change Log.
- */
- private static int UNDEFINED_PHASE = 0;
- /**
- * The External Change Log initialization phase currently reads the changes
- * from all the ReplicaDBs from oldest to newest and then:
- * <ul>
- * <li>Matches each ReplicaDBs change with the corresponding record in
- * {@link ChangeNumberIndexDB}, then assign its changeNumber in memory before
- * returning the change to the client</li>
- * <li>Once it reaches the end of the {@link ChangeNumberIndexDB}, it inserts
- * each ReplicaDBs change in the {@link ChangeNumberIndexDB} and gets back and
- * assign the changeNumber in memory to the ReplicaDBs change.</li>
- * </ul>
- * Once this phase is over the current search moves to the
- * {@link #UNDEFINED_PHASE} or the {@link #PERSISTENT_PHASE} depending on the
- * search type.
- *
- * @see #getSearchPhase()
- */
- private static int INIT_PHASE = 1;
- /**
- * The persistent phase is only used for persistent searches on the External
- * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the
- * client newly added changes to the {@link ChangeNumberIndexDB}.
- */
- private static int PERSISTENT_PHASE = 2;
-
- private StartECLSessionMsg startECLSessionMsg;
-
- /** Cursor on the {@link ChangeNumberIndexDB}. */
- private DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor;
-
- private boolean draftCompat = false;
- /**
- * Specifies whether the change number db has been read until its end.
- */
- private boolean isEndOfCNIndexDBReached = false;
- /**
- * Specifies the current search phase : INIT or PERSISTENT.
- */
- private int searchPhase = INIT_PHASE;
- /**
- * Specifies the cookie contained in the request, specifying where
- * to start serving the ECL.
- */
- private String startCookie;
- /**
- * Specifies the value of the cookie before the change currently processed is
- * returned. It is updated with the CSN of the change currently processed
- * (thus becoming the "current" cookie just before the change is returned.
- */
- private MultiDomainServerState previousCookie = new MultiDomainServerState();
-
- /**
- * The global list of contexts by domain for the search currently processed.
- */
- private Set<DomainContext> domainCtxts = Collections.emptySet();
-
- /**
- * Provides a string representation of this object.
- * @return the string representation.
- */
- private String dumpState()
- {
- return getClass().getSimpleName() +
- " [draftCompat=" + draftCompat +
- ", persistent=" + startECLSessionMsg.getPersistent() +
- ", startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
- ", endOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
- ", searchPhase=" + searchPhase +
- ", startCookie=" + startCookie +
- ", previousCookie=" + previousCookie +
- "]";
- }
-
- /**
- * Class that manages the 'by domain' state variables for the search being
- * currently processed on the ECL.
- * For example :
- * if search on 'cn=changelog' is being processed when 2 replicated domains
- * dc=us and dc=europe are configured, then there will be 2 DomainContext
- * used, one for ds=us, and one for dc=europe.
- */
- private class DomainContext
- {
- private final ReplicationServerDomain rsDomain;
-
- /**
- * Active when there are still changes supposed eligible for the ECL.
- * Here is the lifecycle of this field:
- * <ol>
- * <li>active==true at the start of the INIT phase,</li>
- * <li>active==false when there are no more changes for a domain in the the INIT phase,</li>
- * <li>active==true if it is a persistent search on external changelog. It never moves again</li>
- * </ol>
- */
- private boolean active = true;
- private UpdateMsg nextMsg;
-
- /**
- * the message handler from which are reading the changes for this domain.
- */
- private final MessageHandler mh;
- private final ServerState startState;
- private final ServerState currentState = new ServerState();
- private final ServerState stopState;
- private final long domainLatestTrimDate;
-
- public DomainContext(ReplicationServerDomain domain,
- ServerState startState, ServerState stopState, MessageHandler mh)
- {
- this.rsDomain = domain;
- this.startState = startState;
- this.stopState = stopState;
- this.mh = mh;
- this.domainLatestTrimDate = domain.getLatestDomainTrimDate();
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- StringBuilder buffer = new StringBuilder();
- toString(buffer);
- return buffer.toString();
- }
-
- private StringBuilder toString(StringBuilder buffer)
- {
- buffer.append(getClass().getSimpleName());
- buffer.append(" [");
- buffer.append(active ? "active" : "inactive");
- buffer.append(", baseDN=\"").append(rsDomain.getBaseDN()).append("\"");
- if (nextMsg != null)
- {
- buffer.append(", csn=").append(nextMsg.getCSN().toStringUI());
- }
- buffer.append(", nextMsg=[").append(nextMsg);
- buffer.append("]")
- .append(", startState=").append(startState)
- .append(", currentState=").append(currentState)
- .append(", stopState=").append(stopState)
- .append("]");
- return buffer;
- }
-
- /**
- * Computes the next available message for this domain context.
- */
- private void computeNextAvailableMessage()
- {
- nextMsg = getNextMessage();
- if (debugEnabled())
- {
- TRACER.debugInfo("In ECLServerHandler, for baseDN="
- + mh.getBaseDNString() + " computeNextAvailableMessage("
- + getOperationId() + ") : newMsg=[" + nextMsg + "] " + dumpState());
- }
- }
-
- private UpdateMsg getNextMessage()
- {
- while (true)
- {
- final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
-
- if (newMsg instanceof ReplicaOfflineMsg)
- {
- // and ReplicaOfflineMsg cannot be returned to a search on cn=changelog
- // proceed as if it was never returned
- continue;
- }
- else if (newMsg == null)
- { // in non blocking mode, null means no more messages
- return null;
- }
- else if (newMsg.getCSN().getTime() >= domainLatestTrimDate)
- {
- // when the replication changelog is trimmed, the newest change
- // is left in the DB (whatever its age), and we don't want this change
- // to be returned in the external changelog.
- // So let's check if the change time is older than the trim date
- return newMsg;
- }
- }
- }
-
- /**
- * Unregister the handler from the DomainContext ReplicationDomain.
- * @return Whether the handler has been unregistered with success.
- */
- private boolean unRegisterHandler()
- {
- return rsDomain.unRegisterHandler(mh);
- }
-
- /**
- * Stops the DomainContext handler.
- */
- private void stopServer()
- {
- rsDomain.stopServer(mh);
- }
- }
-
- private String domaimCtxtsToString(String msg)
- {
- final StringBuilder buffer = new StringBuilder();
- buffer.append(msg).append("\n");
- for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.toString(buffer).append("\n");
- }
- return buffer.toString();
- }
-
- /**
- * Starts this handler based on a start message received from remote server.
- * @param inECLStartMsg The start msg provided by the remote server.
- * @return Whether the remote server requires encryption or not.
- * @throws DirectoryException When a problem occurs.
- */
- private boolean processStartFromRemote(ServerStartECLMsg inECLStartMsg)
- throws DirectoryException
- {
- try
- {
- session.setProtocolVersion(getCompatibleVersion(inECLStartMsg
- .getVersion()));
- serverURL = inECLStartMsg.getServerURL();
- setInitialServerState(inECLStartMsg.getServerState());
- setSendWindowSize(inECLStartMsg.getWindowSize());
- if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- // We support connection from a V1 RS
- // Only V2 protocol has the group id in repl server start message
- this.groupId = inECLStartMsg.getGroupId();
- }
- }
- catch(Exception e)
- {
- Message message = Message.raw(e.getLocalizedMessage());
- throw new DirectoryException(ResultCode.OTHER, message);
- }
- return inECLStartMsg.getSSLEncryption();
- }
-
- /**
- * Sends a start message to the remote ECL server.
- *
- * @return The StartMsg sent.
- * @throws IOException
- * When an exception occurs.
- */
- private StartMsg sendStartToRemote() throws IOException
- {
- final StartMsg startMsg;
-
- // Before V4 protocol, we sent a ReplServerStartMsg
- if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
- startMsg = createReplServerStartMsg();
- }
- else
- {
- // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
- startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
- new ServerState(), localGenerationId, sslEncryption,
- getLocalGroupId(), 0, replicationServer.getWeight(), 0);
- }
-
- send(startMsg);
- return startMsg;
- }
-
- /**
- * Creates a new handler object to a remote replication server.
- * @param session The session with the remote RS.
- * @param queueSize The queue size to manage updates to that RS.
- * @param replicationServer The hosting local RS object.
- * @param rcvWindowSize The receiving window size.
- */
- public ECLServerHandler(
- Session session,
- int queueSize,
- ReplicationServer replicationServer,
- int rcvWindowSize)
- {
- super(session, queueSize, replicationServer, rcvWindowSize);
- try
- {
- setBaseDNAndDomain(ChangelogBackend.CHANGELOG_BASE_DN, true);
- }
- catch(DirectoryException de)
- {
- // no chance to have a bad domain set here
- }
- }
-
- /**
- * Creates a new handler object to a remote replication server.
- * @param replicationServer The hosting local RS object.
- * @param startECLSessionMsg the start parameters.
- * @throws DirectoryException when an errors occurs.
- */
- public ECLServerHandler(ReplicationServer replicationServer,
- StartECLSessionMsg startECLSessionMsg) throws DirectoryException
- {
- // queueSize is hard coded to 1 else super class hangs for some reason
- this(null, 1, replicationServer, 0);
- initialize(startECLSessionMsg);
- }
-
- /**
- * Starts the handler from a remote start message received from
- * the remote server.
- * @param inECLStartMsg The provided ReplServerStart message received.
- */
- public void startFromRemoteServer(ServerStartECLMsg inECLStartMsg)
- {
- try
- {
- boolean sessionInitiatorSSLEncryption =
- processStartFromRemote(inECLStartMsg);
-
- lockDomainWithTimeout();
-
- localGenerationId = -1;
-
- StartMsg outStartMsg = sendStartToRemote();
- logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
-
- // until here session is encrypted then it depends on the negotiation
- // The session initiator decides whether to use SSL.
- if (!sessionInitiatorSSLEncryption)
- {
- session.stopEncryption();
- }
-
- // wait and process StartSessionMsg from remote RS
- StartECLSessionMsg inStartECLSessionMsg =
- waitAndProcessStartSessionECLFromRemoteServer();
- if (inStartECLSessionMsg == null)
- {
- // client wants to properly close the connection (client sent a StopMsg)
- logStopReceived();
- abortStart(null);
- return;
- }
-
- logStartECLSessionHandshake(inStartECLSessionMsg);
-
- initialize(inStartECLSessionMsg);
- }
- catch(DirectoryException de)
- {
- abortStart(de.getMessageObject());
- }
- catch(Exception e)
- {
- abortStart(Message.raw(e.getLocalizedMessage()));
- }
- finally
- {
- releaseDomainLock();
- }
- }
-
- /**
- * Wait receiving the StartSessionMsg from the remote DS and process it.
- *
- * @return the startSessionMsg received
- * @throws Exception
- */
- private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer()
- throws Exception
- {
- ReplicationMsg msg = session.receive();
-
- if (msg instanceof StopMsg)
- {
- // client wants to stop handshake (was just for handshake phase one for RS
- // choice). Return null to make the session be terminated.
- return null;
- }
- else if (!(msg instanceof StartECLSessionMsg))
- {
- Message message = Message.raw(
- "Protocol error: StartECLSessionMsg required." + msg + " received.");
- abortStart(message);
- return null;
- }
- else
- {
- // Process StartSessionMsg sent by remote DS
- return (StartECLSessionMsg) msg;
- }
- }
-
- /**
- * Initialize the handler from a provided cookie value.
- *
- * @param providedCookie
- * The provided cookie value.
- * @throws DirectoryException
- * When an error is raised.
- */
- private void initializeCLSearchFromCookie(String providedCookie)
- throws DirectoryException
- {
- this.draftCompat = false;
-
- initializeChangelogDomainCtxts(providedCookie, false);
- }
-
- /**
- * Initialize the handler from a provided start change number.
- *
- * @param startChangeNumber
- * The provided start change number.
- * @throws DirectoryException
- * When an error is raised.
- */
- private void initializeCLSearchFromChangeNumber(long startChangeNumber)
- throws DirectoryException
- {
- try
- {
- this.draftCompat = true;
-
- final String providedCookie = findCookie(startChangeNumber);
- initializeChangelogDomainCtxts(providedCookie, true);
- }
- catch(DirectoryException de)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- releaseCursor();
- throw de;
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- releaseCursor();
- throw new DirectoryException(
- ResultCode.OPERATIONS_ERROR,
- Message.raw(Category.SYNC,
- Severity.FATAL_ERROR,e.getLocalizedMessage()));
- }
- }
-
- /**
- * Finds in the {@link ChangeNumberIndexDB} the cookie corresponding to the
- * passed in startChangeNumber.
- *
- * @param startChangeNumber
- * the start change number coming from the request filter.
- * @return the cookie corresponding to the passed in startChangeNumber.
- * @throws ChangelogException
- * if a database problem occurred
- * @throws DirectoryException
- * if a database problem occurred
- */
- private String findCookie(final long startChangeNumber)
- throws ChangelogException, DirectoryException
- {
- final ChangeNumberIndexDB cnIndexDB =
- replicationServer.getChangeNumberIndexDB();
-
- if (startChangeNumber <= 1)
- {
- // Request filter DOES NOT contain any start change number
- // So we'll generate from the oldest change number in the CNIndexDB
- final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
- if (oldestRecord == null)
- { // DB is empty or closed
- isEndOfCNIndexDBReached = true;
- return null;
- }
-
- cnIndexDBCursor =
- getCursorFrom(cnIndexDB, oldestRecord.getChangeNumber());
- return oldestRecord.getPreviousCookie();
- }
-
- // Request filter DOES contain a startChangeNumber
-
- // Read the CNIndexDB to see whether it contains startChangeNumber
- DBCursor<ChangeNumberIndexRecord> cursor =
- cnIndexDB.getCursorFrom(startChangeNumber);
- if (cursor.next())
- {
- // found the provided startChangeNumber, let's return it
- cnIndexDBCursor = cursor;
- return cursor.getRecord().getPreviousCookie();
- }
- close(cursor);
-
- // startChangeNumber provided in the request IS NOT in the CNIndexDB
-
- /*
- * Get the changeNumberLimits (from the eligibleCSN obtained at the start of
- * this method) in order to have the oldest and newest change numbers.
- */
- final long[] limits = replicationServer.getECLChangeNumberLimits();
- final long oldestChangeNumber = limits[0];
- final long newestChangeNumber = limits[1];
-
- // If the startChangeNumber provided is lower than the oldestChangeNumber in
- // the DB, let's use the lower limit.
- if (startChangeNumber < oldestChangeNumber)
- {
- cursor = cnIndexDB.getCursorFrom(oldestChangeNumber);
- if (!cursor.next())
- {
- // This should not happen
- close(cursor);
- isEndOfCNIndexDBReached = true;
- return null;
- }
-
- cnIndexDBCursor = cursor;
- return cursor.getRecord().getPreviousCookie();
- }
- else if (startChangeNumber <= newestChangeNumber)
- {
- // startChangeNumber is between oldest and potential newest and has never
- // been returned yet
- final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord();
- if (newestRecord == null)
- {
- isEndOfCNIndexDBReached = true;
- return null;
- }
-
- cnIndexDBCursor =
- getCursorFrom(cnIndexDB, newestRecord.getChangeNumber());
- return newestRecord.getPreviousCookie();
-
- // TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ...
- // this may be very long. Work on perf improvement here.
- }
-
- // startChangeNumber is greater than the potential newest change number
- throw new DirectoryException(ResultCode.SUCCESS, Message.raw(""));
- }
-
- private DBCursor<ChangeNumberIndexRecord> getCursorFrom(
- ChangeNumberIndexDB cnIndexDB, long startChangeNumber)
- throws ChangelogException
- {
- DBCursor<ChangeNumberIndexRecord> cursor =
- cnIndexDB.getCursorFrom(startChangeNumber);
- cursor.next();
- if (cursor.getRecord() == null)
- {
- close(cursor);
- throw new ChangelogException(Message.raw("Change Number "
- + startChangeNumber + " is not available in the Changelog"));
- }
- return cursor;
- }
-
- /**
- * Initialize the context for each domain.
- * @param providedCookie the provided generalized state
- * @param allowUnknownDomains Provides all changes for domains not included
- * in the provided cookie.
- * @throws DirectoryException When an error occurs.
- */
- private void initializeChangelogDomainCtxts(String providedCookie,
- boolean allowUnknownDomains) throws DirectoryException
- {
- try
- {
- domainCtxts = buildDomainContexts(providedCookie, allowUnknownDomains);
- startCookie = providedCookie;
-
- // Initializes each and every domain with the next(first) eligible message
- // from the domain.
- for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.computeNextAvailableMessage();
-
- if (domainCtxt.nextMsg == null)
- {
- domainCtxt.active = false;
- }
- }
- }
- catch(DirectoryException de)
- {
- throw de;
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- // FIXME:ECL do not publish internal exception plumb to the client
- throw new DirectoryException(
- ResultCode.OPERATIONS_ERROR,
- Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " +
- e),
- e);
- }
- if (debugEnabled())
- {
- TRACER.debugInfo("initializeChangelogDomainCtxts() ends with "
- + dumpState());
- }
- }
-
- private Set<DomainContext> buildDomainContexts(String providedCookie,
- boolean allowUnknownDomains) throws DirectoryException
- {
- final Set<DomainContext> results = new HashSet<DomainContext>();
- final ReplicationServer rs = this.replicationServer;
-
- /*
- This map is initialized from the providedCookie.
- Below, it will be traversed and each domain configured with ECL will be
- checked and removed from the map.
- At the end, normally the map should be empty.
- Depending on allowUnknownDomains provided flag, a non empty map will
- be considered as an error when allowUnknownDomains is false.
- */
- final Map<DN, ServerState> startStatesFromProvidedCookie =
- MultiDomainServerState.splitGenStateToServerStates(providedCookie);
-
- final StringBuilder missingDomains = new StringBuilder();
- for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
- {
- // skip the 'unreal' changelog domain
- if (domain == this.replicationServerDomain)
- {
- continue;
- }
-
- // skip the excluded domains
- Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
- if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
- {
- // this is an excluded domain
- if (allowUnknownDomains)
- {
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
- }
- continue;
- }
-
- // skip unused domains
- final ServerState latestState = domain.getLatestServerState();
- if (latestState.isEmpty())
- {
- continue;
- }
-
- // Creates the new domain context
- final DomainContext newDomainCtxt;
- final ServerState domainStartState =
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
- if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
- {
- newDomainCtxt = newDomainContext(domain, latestState, null);
- }
- else
- {
- // let's take the start state for this domain from the provided cookie
- ServerState startState = domainStartState;
- if (providedCookie == null || providedCookie.length() == 0
- || allowUnknownDomains)
- {
- // when there is no cookie provided in the request,
- // let's start traversing this domain from the beginning of
- // what we have in the replication changelog
- if (startState == null)
- {
- startState =
- domain.getOldestState().duplicateOnlyOlderThan(
- domain.getLatestDomainTrimDate());
- }
- }
- else
- {
- // when there is a cookie provided in the request,
- if (startState == null)
- {
- missingDomains.append(domain.getBaseDN()).append(":;");
- continue;
- }
- else if (!startState.isEmpty()
- && hasCookieBeenTrimmedFromDB(domain, startState))
- {
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
- domain.getBaseDN().toNormalizedString()));
- }
- }
- newDomainCtxt = newDomainContext(domain, startState, latestState);
- }
-
- previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(),
- newDomainCtxt.startState.duplicate());
-
- results.add(newDomainCtxt);
- }
-
- if (missingDomains.length() > 0)
- {
- // If there are domain missing in the provided cookie,
- // the request is rejected and a full resync is required.
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
- missingDomains, "<" + providedCookie + missingDomains + ">"));
- }
-
- /*
- When it is valid to have the provided cookie containing unknown domains
- (allowUnknownDomains is true), 2 cases must be considered :
- - if the cookie contains a domain that is replicated but where
- ECL is disabled, then this case is considered above
- - if the cookie contains a domain that is even not replicated
- then this case need to be considered here in another loop.
- */
- if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
- {
- final Set<DN> providedDomains = startStatesFromProvidedCookie.keySet();
- for (Iterator<DN> iter = providedDomains.iterator(); iter.hasNext();)
- {
- DN providedDomain = iter.next();
- if (rs.getReplicationServerDomain(providedDomain) == null)
- {
- // the domain provided in the cookie is not replicated
- iter.remove();
- }
- }
- }
-
- // Now do the final checking
- if (!startStatesFromProvidedCookie.isEmpty())
- {
- /*
- After reading all the known domains from the provided cookie, there
- is one (or several) domain that are not currently configured.
- This domain has probably been removed or replication disabled on it.
- The request is rejected and full resync is required.
- */
- final StringBuilder sb = new StringBuilder();
- for (DomainContext domainCtxt : results) {
- sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
- .append(domainCtxt.startState).append(";");
- }
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
- startStatesFromProvidedCookie.toString(), sb.toString()));
- }
-
- return results;
- }
-
- private DomainContext newDomainContext(ReplicationServerDomain domain,
- ServerState startState, ServerState stopState) throws DirectoryException
- {
- // Create an unconnected MessageHandler for the domain
- MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
- mh.setInitialServerState(startState);
- mh.setBaseDNAndDomain(domain.getBaseDN(), false);
- // register the unconnected into the domain
- domain.registerHandler(mh);
-
- return new DomainContext(domain, startState, stopState, mh);
- }
-
- private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
- ServerState cookie)
- {
- /*
- when the provided startState is older than the replication changelogdb
- oldestState, it means that the replication changelog db has been trimmed and
- the cookie is not valid anymore.
- */
- for (CSN dbOldestChange : rsDomain.getOldestState())
- {
- CSN providedChange = cookie.getCSN(dbOldestChange.getServerId());
- if (providedChange != null && providedChange.isOlderThan(dbOldestChange))
- {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Shutdown this handler.
- */
- @Override
- public void shutdown()
- {
- if (debugEnabled())
- {
- TRACER.debugInfo(this + " shutdown()");
- }
- releaseCursor();
-
- if (domainCtxts != null)
- {
- for (DomainContext domainCtxt : domainCtxts)
- {
- if (!domainCtxt.unRegisterHandler())
- {
- logError(Message.raw(Category.SYNC, Severity.NOTICE, this
- + " shutdown() - error when unregistering handler "
- + domainCtxt.mh));
- }
- domainCtxt.stopServer();
- }
- domainCtxts = null;
- }
-
- super.shutdown();
- }
-
- private void releaseCursor()
- {
- if (this.cnIndexDBCursor != null)
- {
- this.cnIndexDBCursor.close();
- this.cnIndexDBCursor = null;
- }
- }
-
- /**
- * Request to shutdown the associated writer.
- */
- @Override
- protected void shutdownWriter()
- {
- shutdownWriter = true;
- if (writer!=null)
- {
- ECLServerWriter eclWriter = (ECLServerWriter)this.writer;
- eclWriter.shutdownWriter();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getMonitorInstanceName()
- {
- return "Connected External Changelog Server " + serverURL + " " + serverId
- + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
- }
-
- /** {@inheritDoc} */
- @Override
- public List<Attribute> getMonitorData()
- {
- // Get the generic ones
- List<Attribute> attributes = super.getMonitorData();
-
- // Add the specific RS ones
- attributes.add(Attributes.create("External-Changelog-Server", serverURL));
-
- // TODO:ECL No monitoring exist for ECL.
- return attributes;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- final String eclServer = "External changelog Server ";
- if (this.serverId != 0)
- {
- return eclServer + serverId + " " + serverURL + " "
- + getBaseDNString() + " " + getOperationId();
- }
- return eclServer + getClass().getCanonicalName() + " " + getOperationId();
- }
-
- /**
- * Gets the status of the connected DS.
- * @return The status of the connected DS.
- */
- @Override
- public ServerStatus getStatus()
- {
- // There is no other status possible for the ECL Server Handler to
- // be normally connected.
- return ServerStatus.NORMAL_STATUS;
- }
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isDataServer()
- {
- return true;
- }
-
- /**
- * Initialize the handler.
- * @param startECLSessionMsg The provided starting state.
- * @throws DirectoryException when a problem occurs.
- */
- private void initialize(StartECLSessionMsg startECLSessionMsg)
- throws DirectoryException
- {
- this.startECLSessionMsg = startECLSessionMsg;
-
- searchPhase = INIT_PHASE;
- final String cookie = startECLSessionMsg.getCrossDomainServerState();
- try
- {
- previousCookie = new MultiDomainServerState(cookie);
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
- ERR_INVALID_COOKIE_SYNTAX.get(cookie));
- }
-
- initializeChangelogSearch(startECLSessionMsg);
-
- if (session != null)
- {
- try
- {
- // Disable timeout for next communications
- // FIXME: why? and where is it reset?
- session.setSoTimeout(0);
- }
- catch(Exception e) { /* do nothing */ }
-
- // sendWindow MUST be created before starting the writer
- sendWindow = new Semaphore(sendWindowSize);
-
- reader = new ServerReader(session, this);
- reader.start();
-
- if (writer == null)
- {
- writer = new ECLServerWriter(session,this,replicationServerDomain);
- writer.start();
- }
-
- ((ECLServerWriter)writer).resumeWriter();
-
- // TODO:ECL Potential race condition if writer not yet resumed here
- }
-
- if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
- {
- closeInitPhase();
- }
-
- replicationServerDomain.registerHandler(this);
-
- if (debugEnabled())
- {
- TRACER.debugInfo(getClass().getSimpleName() + " " + getOperationId()
- + " initialized: " + " " + dumpState() + domaimCtxtsToString(""));
- }
- }
-
- private void initializeChangelogSearch(StartECLSessionMsg msg)
- throws DirectoryException
- {
- if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE)
- {
- initializeCLSearchFromCookie(msg.getCrossDomainServerState());
- }
- else if (msg.getECLRequestType() == REQUEST_TYPE_FROM_CHANGE_NUMBER)
- {
- initializeCLSearchFromChangeNumber(msg.getFirstChangeNumber());
- }
- }
-
- /**
- * Select the next update that must be sent to the server managed by this
- * ServerHandler.
- *
- * @return the next update that must be sent to the server managed by this
- * ServerHandler.
- * @exception DirectoryException when an error occurs.
- */
- public ECLUpdateMsg takeECLUpdate() throws DirectoryException
- {
- ECLUpdateMsg msg = getNextECLUpdate();
-
- // TODO:ECL We should refactor so that a SH always have a session
- if (session == null)
- {
- return msg;
- }
-
- boolean interrupted = true;
- boolean acquired = false;
- do
- {
- try
- {
- acquired = sendWindow.tryAcquire(500, TimeUnit.MILLISECONDS);
- interrupted = false;
- } catch (InterruptedException e)
- {
- // loop until not interrupted
- }
- } while ((interrupted || !acquired) && !shutdownWriter);
- if (msg != null)
- {
- incrementOutCount();
- }
- return msg;
- }
-
- /**
- * Get the next message - non blocking - null when none.
- * This method is currently not used but we don't want to keep the mother
- * class method since it make no sense for ECLServerHandler.
- * @param synchronous - not used
- * @return the next message
- */
- @Override
- protected UpdateMsg getNextMessage(boolean synchronous)
- {
- try
- {
- ECLUpdateMsg eclMsg = getNextECLUpdate();
- if (eclMsg != null)
- {
- return eclMsg.getUpdateMsg();
- }
- }
- catch(DirectoryException de)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- }
- return null;
- }
-
- /**
- * Returns the next update message for the External Changelog (ECL).
- * @return the ECL update message, null when there aren't anymore.
- * @throws DirectoryException when an error occurs.
- */
- public ECLUpdateMsg getNextECLUpdate() throws DirectoryException
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("In cn=changelog " + this +
- " getNextECLUpdate starts: " + dumpState());
- }
-
- ECLUpdateMsg oldestChange = null;
- try
- {
- // getMessage:
- // get the oldest msg:
- // after:
- // if stopState of domain is covered then domain is out candidate
- // until no domain candidate mean generalized stopState
- // has been reached
- // else
- // get one msg from that domain
- // no (null) msg returned: should not happen since we go to a state
- // that is computed/expected
-
- // Persistent:
- // ----------
- // step 1&2: same as non persistent
- //
- // step 3: reinit all domain are candidate
- // take the oldest
- // if one domain has no msg, still is candidate
-
- boolean continueLooping = true;
- while (continueLooping && searchPhase == INIT_PHASE)
- {
- // Step 1 & 2
- final DomainContext oldestContext = findDomainCtxtWithOldestChange();
- if (oldestContext == null)
- { // there is no oldest change to process
- closeInitPhase();
-
- // signals end of phase 1 to the caller
- return null;
- }
-
- final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
-
- // Default is not to loop, with one exception
- continueLooping = false;
- if (draftCompat)
- {
- continueLooping = !assignChangeNumber(change);
- // if we encounter a change that has been trimmed from the replicaDBs,
- // we will skip it and loop to the next oldest change from the
- // replicaDBs
- }
-
- // here we have the right oldest change
- // and in the draft case, we have its change number
-
- // Set and test the domain of the oldestChange see if we reached
- // the end of the phase for this domain
- oldestContext.currentState.update(change.getUpdateMsg().getCSN());
-
- if (oldestContext.currentState.cover(oldestContext.stopState)
- || isBeyondLastRequestedChangeNumber(change))
- {
- oldestContext.active = false;
- }
- if (oldestContext.active)
- {
- oldestContext.computeNextAvailableMessage();
- }
- oldestChange = change;
- }
-
- if (searchPhase == PERSISTENT_PHASE)
- {
- if (debugEnabled())
- TRACER.debugInfo(domaimCtxtsToString(
- "In getNextECLUpdate (persistent): "
- + "looking for the generalized oldest change"));
-
- for (DomainContext domainCtxt : domainCtxts)
- {
- domainCtxt.computeNextAvailableMessage();
- }
-
- final DomainContext oldestContext = findDomainCtxtWithOldestChange();
- if (oldestContext != null)
- {
- oldestChange = newECLUpdateMsg(oldestContext);
- oldestContext.currentState.update(
- oldestChange.getUpdateMsg().getCSN());
- }
- }
- }
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- throw new DirectoryException(
- ResultCode.OPERATIONS_ERROR,
- Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: "),
- e);
- }
-
- if (oldestChange != null)
- {
- final CSN csn = oldestChange.getUpdateMsg().getCSN();
- if (debugEnabled())
- {
- TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
- }
-
- previousCookie.update(oldestChange.getBaseDN(), csn);
- oldestChange.setCookie(previousCookie);
-
- if (debugEnabled())
- {
- TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
- + oldestChange);
- }
- }
- return oldestChange;
- }
-
- private boolean isBeyondLastRequestedChangeNumber(final ECLUpdateMsg change)
- {
- final long lastChangeNumber = startECLSessionMsg.getLastChangeNumber();
- return draftCompat
- && 0 < lastChangeNumber
- && lastChangeNumber < change.getChangeNumber();
- }
-
- private ECLUpdateMsg newECLUpdateMsg(DomainContext ctx)
- {
- // cookie will be set later AND changeNumber may be set later
- final ECLUpdateMsg change = new ECLUpdateMsg(
- (LDAPUpdateMsg) ctx.nextMsg, null, ctx.rsDomain.getBaseDN(), 0);
- ctx.nextMsg = null; // clean after use
- return change;
- }
-
- /**
- * Either retrieves a change number from the DB, or assign a new change number
- * and store in the DB.
- *
- * @param replicaDBChange
- * the replica DB change to find in the {@link ChangeNumberIndexDB}
- * where to assign the change number
- * @return <code>true</code> if a change number has been assigned to the
- * provided replicaDBChange, <code>false</code> otherwise
- * @throws DirectoryException
- * if any problem occur
- * @throws ChangelogException
- * if a database problem occurs.
- */
- private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange)
- throws ChangelogException, DirectoryException
- {
- // We also need to check if the CNIndexDB is consistent with the replicaDBs.
- // If not, 2 potential reasons:
- // a/ replicaDBs have been purged (trim) let's traverse the CNIndexDB
- // b/ changelog is late ... let's traverse the changelogDb
- // The following loop allows to loop until being on the same cn in the 2 dbs
-
- CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN();
- DN baseDNFromReplicaDB = replicaDBChange.getBaseDN();
-
- while (!isEndOfCNIndexDBReached)
- {
- final ChangeNumberIndexRecord currentRecord = cnIndexDBCursor.getRecord();
- final CSN csnFromCNIndexDB = currentRecord.getCSN();
- final DN baseDNFromCNIndexDB = currentRecord.getBaseDN();
-
- if (debugEnabled())
- {
- TRACER.debugInfo("assignChangeNumber() comparing the replicaDB's and"
- + " CNIndexDB's baseDNs :" + baseDNFromReplicaDB + "?="
- + baseDNFromCNIndexDB + " timestamps:" + asDate(csnFromReplicaDB)
- + " ?older" + asDate(csnFromCNIndexDB));
- }
-
- if (areSameChange(csnFromReplicaDB, baseDNFromReplicaDB,
- csnFromCNIndexDB, baseDNFromCNIndexDB))
- {
- // We matched the ReplicaDB change with a record in the CNIndexDB
- // => set the changeNumber in memory and return the change to the client
- if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
- + currentRecord.getChangeNumber() + " to change="
- + replicaDBChange);
-
- previousCookie.update(
- new MultiDomainServerState(currentRecord.getPreviousCookie()));
- replicaDBChange.setCookie(previousCookie);
- replicaDBChange.setChangeNumber(currentRecord.getChangeNumber());
- return true;
- }
-
- if (!csnFromCNIndexDB.isOlderThan(csnFromReplicaDB))
- {
- // the change from the replicaDB is older
- // it should have been stored lately
- // let's continue to traverse the replicaDBs
- if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() will skip " + csnFromReplicaDB
- + " and read next change from the regular changelog.");
- return false; // TO BE CHECKED
- }
-
- // The change from the CNIndexDB is older.
- // It means that the CNIndexDB change has been purged from the replicaDB
- // and CNIndexDB has not been trimmed yet.
- try
- {
- // keep traversing the CNIndexDB searching for the replicaDB change
- if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
- + " and read next change from the CNIndexDB.");
-
- isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
-
- if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() has skipped to changeNumber="
- + currentRecord.getChangeNumber() + " csn="
- + currentRecord.getCSN() + " End of CNIndexDB ?"
- + isEndOfCNIndexDBReached);
- }
- catch (ChangelogException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- // FIXME There is an opportunity for an infinite loop here if the DB
- // continuously throws ChangelogExceptions
- }
- }
- return false;
- }
-
- private Date asDate(CSN csn)
- {
- return new Date(csn.getTime());
- }
-
- private boolean areSameChange(CSN csn1, DN baseDN1, CSN csn2, DN baseDN2)
- {
- boolean sameDN = baseDN1.compareTo(baseDN2) == 0;
- boolean sameCSN = csn1.compareTo(csn2) == 0;
- return sameDN && sameCSN;
- }
-
- /**
- * Terminates the first (non persistent) phase of the search on the ECL.
- */
- private void closeInitPhase()
- {
- // starvation of changelog messages
- // all domain have been unactived means are covered
- if (debugEnabled())
- {
- TRACER.debugInfo("In cn=changelog" + "," + this + " closeInitPhase(): "
- + dumpState());
- }
-
- // set all domains to be active again for the persistent phase
- for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true;
-
- if (startECLSessionMsg.getPersistent() != NON_PERSISTENT)
- {
- // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
- searchPhase = PERSISTENT_PHASE;
-
- if (writer ==null)
- {
- writer = new ECLServerWriter(session,this,replicationServerDomain);
- writer.start(); // start suspended
- }
- }
- else
- {
- // INIT_PHASE is done AND search is not persistent => re-init
- searchPhase = UNDEFINED_PHASE;
- }
-
- // End of INIT_PHASE => always release the cursor
- releaseCursor();
- }
-
- /**
- * Find the domainCtxt of the domain with the oldest change.
- *
- * @return the domainCtxt of the domain with the oldest change, null when
- * none.
- */
- private DomainContext findDomainCtxtWithOldestChange()
- {
- DomainContext oldestCtxt = null;
- for (DomainContext domainCtxt : domainCtxts)
- {
- if (domainCtxt.active
- // .nextMsg is null when the previous (non blocking) nextMessage did
- // not have any eligible msg to return
- && domainCtxt.nextMsg != null
- && (oldestCtxt == null
- || domainCtxt.nextMsg.compareTo(oldestCtxt.nextMsg) < 0))
- {
- oldestCtxt = domainCtxt;
- }
- }
-
- if (debugEnabled())
- TRACER.debugInfo("In cn=changelog," + this
- + " findDomainCtxtWithOldestChange() returns "
- + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1"));
-
- return oldestCtxt;
- }
-
- /**
- * Returns the client operation id.
- * @return The client operation id.
- */
- public String getOperationId()
- {
- if (startECLSessionMsg != null)
- {
- return startECLSessionMsg.getOperationId();
- }
- return "";
- }
-
- /**
- * Returns whether the current search is a persistent search .
- *
- * @return true if the current search is a persistent search, false otherwise
- */
- boolean isNonPersistent()
- {
- return startECLSessionMsg.getPersistent() == NON_PERSISTENT;
- }
-
- /**
- * Returns whether the initialization phase has completed.
- *
- * @return true the initialization phase has completed, false otherwise
- */
- boolean isInitPhaseDone()
- {
- return this.searchPhase != INIT_PHASE;
- }
-
-}
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
deleted file mode 100644
index 5b50f6b..0000000
--- a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions copyright 2011-2014 ForgeRock AS
- */
-package org.opends.server.replication.server;
-
-import java.io.IOException;
-import java.net.SocketException;
-
-import org.opends.server.backends.ChangelogBackend;
-import org.opends.server.core.PersistentSearch;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.ECLUpdateMsg;
-import org.opends.server.replication.protocol.Session;
-import org.opends.server.replication.service.DSRSShutdownSync;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
-/**
- * This class defines a server writer, which is used to send changes to a
- * directory server.
- */
-class ECLServerWriter extends ServerWriter
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
- private final Session session;
- private final ECLServerHandler handler;
- private final ReplicationServerDomain replicationServerDomain;
- private boolean suspended;
- private volatile boolean shutdown;
- private final PersistentSearch mypsearch;
-
- /**
- * Create a ServerWriter.
- *
- * @param session the Session that will be used to send updates.
- * @param handler ECL handler for which the ServerWriter is created.
- * @param replicationServerDomain the ReplicationServerDomain of this
- * ServerWriter.
- */
- ECLServerWriter(Session session, ECLServerHandler handler,
- ReplicationServerDomain replicationServerDomain)
- {
- super(session, handler, replicationServerDomain, new DSRSShutdownSync());
-
- setName("Replication ECL Writer Thread for operation " +
- handler.getOperationId());
-
- this.session = session;
- this.handler = handler;
- this.replicationServerDomain = replicationServerDomain;
- this.suspended = false;
- this.shutdown = false;
- this.mypsearch = findPersistentSearch(handler);
- }
-
- /**
- * Look for the persistent search object related to this operation, the one
- * that will be notified with new entries to be returned.
- */
- private PersistentSearch findPersistentSearch(ECLServerHandler handler)
- {
- final ChangelogBackend backend = ChangelogBackend.getInstance();
- for (PersistentSearch psearch : backend.getPersistentSearches())
- {
- if (psearch.getSearchOperation().toString().equals(
- handler.getOperationId()))
- {
- return psearch;
- }
- }
- return null;
- }
-
- /**
- * The writer will start suspended by the Handler for the CL
- * waiting for the startCLSessionMsg. Then it may be
- * suspended between 2 jobs, each job being a separate search.
- */
- private synchronized void suspendWriter()
- {
- suspended = true;
- }
-
- /**
- * Resume the writer.
- */
- synchronized void resumeWriter()
- {
- suspended = false;
- notify();
- }
-
- /**
- * Run method for the ServerWriter.
- * Loops waiting for changes from the ReplicationServerDomain and
- * forward them to the other servers
- */
- @Override
- public void run()
- {
- try
- {
- while (true)
- {
- // wait to be resumed or shutdown
- if (suspended && !shutdown)
- {
- synchronized(this)
- {
- wait();
- }
- }
-
- if (shutdown)
- {
- return;
- }
-
- // Not suspended
- doIt();
-
- if (shutdown)
- {
- return;
- }
-
- suspendWriter();
- }
- }
- catch (SocketException e)
- {
- // Just ignore the exception and let the thread die as well.
- // session is always null if a socket exception has occurred.
- if (session != null)
- {
- logError(handler.getBadlyDisconnectedErrorMessage());
- }
- }
- catch (Exception e)
- {
- // An unexpected error happened.
- // Log an error and close the connection.
- logError(ERR_WRITER_UNEXPECTED_EXCEPTION.get(
- handler + " " + stackTraceToSingleLineString(e)));
- }
- finally
- {
- if (session != null)
- {
- session.close();
- }
- if (replicationServerDomain != null)
- {
- replicationServerDomain.stopServer(handler, false);
- }
- }
- }
-
- /**
- * Loop getting changes from the domain and publishing them either to
- * the provided session or to the ECL session interface.
- * @throws IOException when raised (connection closure)
- * @throws InterruptedException when raised
- */
- private void doIt() throws IOException, InterruptedException
- {
- while (!shutdown && !suspended)
- {
- final ECLUpdateMsg updateMsg = takeECLUpdate(handler);
- if (updateMsg == null)
- {
- if (session != null && handler.isInitPhaseDone())
- {
- // session is null in pusherOnly mode
- // Done is used to end phase 1
- session.publish(new DoneMsg(
- handler.getReplicationServerId(), handler.getServerId()));
- }
-
- if (handler.isNonPersistent())
- { // publishing is normally stopped here...
- break;
- }
-
- // ...except if we are in persistent search
- Thread.sleep(200);
- }
- else
- {
- // Publish the update to the remote server using a protocol version it supports
- publish(updateMsg);
- }
- }
- }
-
- private ECLUpdateMsg takeECLUpdate(ECLServerHandler handler)
- {
- try
- {
- return handler.takeECLUpdate();
- }
- catch(DirectoryException de)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- return null;
- }
- }
-
- /**
- * Shutdown the writer.
- */
- synchronized void shutdownWriter()
- {
- shutdown = true;
- notify();
- }
-
- /**
- * Publish a change either on the protocol session or to a persistent search.
- */
- private void publish(ECLUpdateMsg msg) throws IOException
- {
- if (debugEnabled())
- {
- TRACER.debugInfo(getName() + " publishes msg=[" + msg + "]");
- }
-
- if (session != null)
- {
- session.publish(msg);
- }
- else if (mypsearch != null)
- {
- try
- {
- // Using processAdd() because all ECLUpdateMsgs are adds to the external changelog
- // (even though the underlying changes can be adds, deletes, modifies or modDNs)
- Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg);
- mypsearch.processAdd(eclEntry);
- }
- catch (Exception e)
- {
- logError(ERR_WRITER_UNEXPECTED_EXCEPTION.get(
- handler + " " + stackTraceToSingleLineString(e)));
- mypsearch.cancel();
- }
- }
- }
-}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 47be53a..88ae89c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -268,12 +268,6 @@
session, queueSize, this, rcvWindow);
rsHandler.startFromRemoteRS((ReplServerStartMsg) msg);
}
- else if (msg instanceof ServerStartECLMsg)
- {
- ECLServerHandler eclHandler = new ECLServerHandler(
- session, queueSize, this, rcvWindow);
- eclHandler.startFromRemoteServer((ServerStartECLMsg) msg);
- }
else
{
// We did not recognize the message, close session as what
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6dd0fa4..67195ef 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
package org.opends.server.replication.server;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,12 +49,33 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.types.*;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
+import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -114,9 +141,6 @@
private final Map<Integer, ReplicationServerHandler> connectedRSs =
new ConcurrentHashMap<Integer, ReplicationServerHandler>();
- private final Queue<MessageHandler> otherHandlers =
- new ConcurrentLinkedQueue<MessageHandler>();
-
private final ReplicationDomainDB domainDB;
/** The ReplicationServer that created the current instance. */
private final ReplicationServer localReplicationServer;
@@ -368,11 +392,6 @@
addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
}
}
-
- // Push the message to the other subscribing handlers
- for (MessageHandler mHandler : otherHandlers) {
- mHandler.add(updateMsg);
- }
}
private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1097,10 +1116,6 @@
{
unregisterServerHandler(sHandler, shutdown, true);
}
- else if (otherHandlers.contains(sHandler))
- {
- unregisterOtherHandler(sHandler);
- }
}
catch(Exception e)
{
@@ -1117,12 +1132,6 @@
}
}
- private void unregisterOtherHandler(MessageHandler mHandler)
- {
- unRegisterHandler(mHandler);
- mHandler.shutdown();
- }
-
private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
boolean isDirectoryServer)
{
@@ -1149,60 +1158,6 @@
}
/**
- * Stop the handler.
- * @param mHandler The handler to stop.
- */
- public void stopServer(MessageHandler mHandler)
- {
- // TODO JNR merge with stopServer(ServerHandler, boolean)
- if (debugEnabled())
- {
- debug("stopServer() on the message handler " + mHandler);
- }
- /*
- * We must prevent deadlock on replication server domain lock, when for
- * instance this code is called from dying ServerReader but also dying
- * ServerWriter at the same time, or from a thread that wants to shut down
- * the handler. So use a thread safe flag to know if the job must be done
- * or not (is already being processed or not).
- */
- if (!mHandler.engageShutdown())
- // Only do this once (prevent other thread to enter here again)
- {
- try
- {
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- }
- catch (InterruptedException ex)
- {
- // We can't deal with this here, so re-interrupt thread so that it is
- // caught during subsequent IO.
- Thread.currentThread().interrupt();
- return;
- }
-
- try
- {
- if (otherHandlers.contains(mHandler))
- {
- unregisterOtherHandler(mHandler);
- }
- }
- catch(Exception e)
- {
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
- stackTraceToSingleLineString(e)));
- }
- finally
- {
- release();
- }
- }
- }
-
- /**
* Unregister this handler from the list of handlers registered to this
* domain.
* @param sHandler the provided handler to unregister.
@@ -2427,39 +2382,6 @@
return attributes;
}
- /**
- * Register in the domain an handler that subscribes to changes.
- * @param mHandler the provided subscribing handler.
- */
- public void registerHandler(MessageHandler mHandler)
- {
- this.otherHandlers.add(mHandler);
- }
-
- /**
- * Unregister from the domain an handler.
- * @param mHandler the provided unsubscribing handler.
- * @return Whether this handler has been unregistered with success.
- */
- public boolean unRegisterHandler(MessageHandler mHandler)
- {
- return this.otherHandlers.remove(mHandler);
- }
-
- /**
- * Returns the oldest known state for the domain, made of the oldest CSN
- * stored for each serverId.
- * <p>
- * Note: Because the replication changelogDB trimming always keep one change
- * whatever its date, the CSN contained in the returned state can be very old.
- *
- * @return the start state of the domain.
- */
- public ServerState getOldestState()
- {
- return domainDB.getDomainOldestCSNs(baseDN);
- }
-
private void sendTopologyMsg(String type, ServerHandler handler,
TopologyMsg msg)
{
@@ -2524,18 +2446,6 @@
}
}
-
-
- /**
- * Get the latest (more recent) trim date of the changelog dbs associated
- * to this domain.
- * @return The latest trim date.
- */
- public long getLatestDomainTrimDate()
- {
- return domainDB.getDomainLatestTrimDate(baseDN);
- }
-
/**
* Return the monitor instance name of the ReplicationServer that created the
* current instance.
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 63718bf..58009ad 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -114,12 +114,12 @@
/**
* The associated ServerWriter that sends messages to the remote server.
*/
- protected ServerWriter writer;
+ private ServerWriter writer;
/**
* The associated ServerReader that receives messages from the remote server.
*/
- protected ServerReader reader;
+ private ServerReader reader;
// window
private int rcvWindow;
@@ -132,11 +132,11 @@
/**
* Semaphore that the writer uses to control the flow to the remote server.
*/
- protected Semaphore sendWindow;
+ private Semaphore sendWindow;
/**
* The initial size of the sending window.
*/
- protected int sendWindowSize;
+ private int sendWindowSize;
/**
* remote generation id.
*/
@@ -161,7 +161,7 @@
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
- protected long heartbeatInterval = 0;
+ protected long heartbeatInterval;
/**
* The thread that will send heartbeats.
@@ -171,7 +171,7 @@
/**
* Set when ServerWriter is stopping.
*/
- protected volatile boolean shutdownWriter = false;
+ private volatile boolean shutdownWriter;
/**
* Weight of this remote server.
@@ -539,22 +539,16 @@
}
// Window stats
- attributes.add(Attributes.create("max-send-window", String
- .valueOf(sendWindowSize)));
- attributes.add(Attributes.create("current-send-window", String
- .valueOf(sendWindow.availablePermits())));
- attributes.add(Attributes.create("max-rcv-window", String
- .valueOf(maxRcvWindow)));
- attributes.add(Attributes.create("current-rcv-window", String
- .valueOf(rcvWindow)));
+ attributes.add(Attributes.create("max-send-window", String.valueOf(sendWindowSize)));
+ attributes.add(Attributes.create("current-send-window", String.valueOf(sendWindow.availablePermits())));
+ attributes.add(Attributes.create("max-rcv-window", String.valueOf(maxRcvWindow)));
+ attributes.add(Attributes.create("current-rcv-window", String.valueOf(rcvWindow)));
// Encryption
- attributes.add(Attributes.create("ssl-encryption", String
- .valueOf(session.isEncrypted())));
+ attributes.add(Attributes.create("ssl-encryption", String.valueOf(session.isEncrypted())));
// Data generation
- attributes.add(Attributes.create("generation-id", String
- .valueOf(generationId)));
+ attributes.add(Attributes.create("generation-id", String.valueOf(generationId)));
return attributes;
}
@@ -919,20 +913,12 @@
}
/**
- * Requests to shutdown the writer.
- */
- protected void shutdownWriter()
- {
- shutdownWriter = true;
- }
-
- /**
* Shutdown This ServerHandler.
*/
@Override
public void shutdown()
{
- shutdownWriter();
+ shutdownWriter = true;
setConsumerActive(false);
super.shutdown();
@@ -1137,21 +1123,6 @@
}
/**
- * Log the messages involved in the Topology/StartSession handshake.
- * @param inStartECLSessionMsg The message received first.
- */
- protected void logStartECLSessionHandshake(
- StartECLSessionMsg inStartECLSessionMsg)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
- + ", " + getClass().getSimpleName() + " " + this + " :"
- + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
- }
- }
-
- /**
* Process a Ack message received.
* @param ack the message received.
*/
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 8970417..32f9fba 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -43,18 +43,6 @@
{
/**
- * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in
- * the specified replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return a new ServerState object holding the {serverId => oldest CSN}
- * mapping. If a replica DB is empty or closed, the oldest CSN will be
- * null for that replica. The caller owns the generated ServerState.
- */
- ServerState getDomainOldestCSNs(DN baseDN);
-
- /**
* Returns the newest {@link CSN}s from the replicaDBs for each serverId in
* the specified replication domain.
*
@@ -67,18 +55,6 @@
ServerState getDomainNewestCSNs(DN baseDN);
/**
- * Retrieves the latest trim date for the specified replication domain.
- * <p>
- * FIXME will be removed when ECLServerHandler will not be responsible anymore
- * for lazily building the ChangeNumberIndexDB.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return the domain latest trim date
- */
- long getDomainLatestTrimDate(DN baseDN);
-
- /**
* Removes all the data relating to the specified replication domain and
* shutdown all its replica databases. In particular, it will:
* <ol>
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index aea82ec..6458bac 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -120,13 +120,12 @@
*/
private long purgeDelayInMillis;
private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
- private volatile long latestPurgeDate;
/** The local replication server. */
private final ReplicationServer replicationServer;
private final AtomicBoolean shutdown = new AtomicBoolean();
- static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
+ private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
/**
@@ -486,18 +485,6 @@
/** {@inheritDoc} */
@Override
- public ServerState getDomainOldestCSNs(DN baseDN)
- {
- final ServerState result = new ServerState();
- for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- result.update(replicaDB.getOldestCSN());
- }
- return result;
- }
-
- /** {@inheritDoc} */
- @Override
public ServerState getDomainNewestCSNs(DN baseDN)
{
final ServerState result = new ServerState();
@@ -622,13 +609,6 @@
/** {@inheritDoc} */
@Override
- public long getDomainLatestTrimDate(final DN baseDN)
- {
- return latestPurgeDate;
- }
-
- /** {@inheritDoc} */
- @Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
synchronized (cnIndexDBLock)
@@ -927,8 +907,6 @@
}
}
- latestPurgeDate = purgeTimestamp;
-
jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
}
catch (InterruptedException e)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 15d29b4..5f5ae2a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -115,7 +115,6 @@
*/
private long purgeDelayInMillis;
private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
- private volatile long latestPurgeDate;
/** The local replication server. */
private final ReplicationServer replicationServer;
@@ -513,18 +512,6 @@
/** {@inheritDoc} */
@Override
- public ServerState getDomainOldestCSNs(DN baseDN)
- {
- final ServerState result = new ServerState();
- for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- result.update(replicaDB.getOldestCSN());
- }
- return result;
- }
-
- /** {@inheritDoc} */
- @Override
public ServerState getDomainNewestCSNs(DN baseDN)
{
final ServerState result = new ServerState();
@@ -671,13 +658,6 @@
/** {@inheritDoc} */
@Override
- public long getDomainLatestTrimDate(final DN baseDN)
- {
- return latestPurgeDate;
- }
-
- /** {@inheritDoc} */
- @Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
synchronized (cnIndexDBLock)
@@ -976,8 +956,6 @@
}
}
- latestPurgeDate = purgeTimestamp;
-
jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
}
catch (InterruptedException e)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d1b0187..3f85b22 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -53,7 +53,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.HostPort;
-import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -289,7 +288,7 @@
{
shutdown = false;
this.rcvWindow = getMaxRcvWindow();
- connect();
+ connectAsDataServer();
}
}
@@ -702,19 +701,6 @@
}
}
- private void connect()
- {
- if (getBaseDN().toNormalizedString().equalsIgnoreCase(
- ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
- {
- connectAsECL();
- }
- else
- {
- connectAsDataServer();
- }
- }
-
/**
* Contacts all replication servers to get information from them and being
* able to choose the more suitable.
@@ -728,7 +714,7 @@
for (String serverUrl : getReplicationServerUrls())
{
// Connect to server + get and store info about it
- final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
+ final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
final ReplicationServerInfo rsInfo = rs.rsInfo;
if (rsInfo != null)
{
@@ -740,35 +726,6 @@
}
/**
- * Special aspects of connecting as ECL (External Change Log) compared to
- * connecting as data server are :
- * <ul>
- * <li>1 single RS configured</li>
- * <li>so no choice of the preferred RS</li>
- * <li>?? Heartbeat</li>
- * <li>Start handshake is :
- *
- * <pre>
- * Broker ---> StartECLMsg ---> RS
- * <---- ReplServerStartMsg ---
- * ---> StartSessionECLMsg --> RS
- * </pre>
- *
- * </li>
- * </ul>
- */
- private void connectAsECL()
- {
- // FIXME:ECL List of RS to connect is for now limited to one RS only
- final String bestServerURL = getReplicationServerUrls().iterator().next();
- final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
- if (rs.isConnected())
- {
- performECLPhaseTwoHandshake(bestServerURL, rs);
- }
- }
-
- /**
* Connect to a ReplicationServer.
*
* Handshake sequences between a DS and a RS is divided into 2 logical
@@ -844,7 +801,7 @@
+ evals.getBestRS());
final ConnectedRS electedRS = performPhaseOneHandshake(
- evals.getBestRS().getServerURL(), true, false);
+ evals.getBestRS().getServerURL(), true);
final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
if (electedRsInfo != null)
{
@@ -1116,12 +1073,9 @@
* Do we keep session opened or not after handshake. Use true if want
* to perform handshake phase 2 with the same session and keep the
* session to create as the current one.
- * @param isECL
- * Indicates whether or not the an ECL handshake is to be performed.
* @return The answer from the server . Null if could not get an answer.
*/
- private ConnectedRS performPhaseOneHandshake(String serverURL,
- boolean keepSession, boolean isECL)
+ private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
{
Session newSession = null;
Socket socket = null;
@@ -1135,8 +1089,7 @@
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
- socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
- timeoutMS);
+ socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
boolean isSslEncryption = replSessionSecurity.isSslEncryption();
@@ -1144,19 +1097,9 @@
final HostPort hp = new HostPort(
socket.getLocalAddress().getHostName(), socket.getLocalPort());
final String url = hp.toString();
- final StartMsg serverStartMsg;
- if (!isECL)
- {
- serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
- getMaxRcvWindow(), config.getHeartbeatInterval(), state,
- getGenerationID(), isSslEncryption, getGroupId());
- }
- else
- {
- serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
- getMaxRcvWindow(), config.getHeartbeatInterval(), state,
- getGenerationID(), isSslEncryption, getGroupId());
- }
+ final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
+ getMaxRcvWindow(), config.getHeartbeatInterval(), state,
+ getGenerationID(), isSslEncryption, getGroupId());
newSession.publish(serverStartMsg);
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1247,45 +1190,6 @@
return setConnectedRS(ConnectedRS.noConnectedRS());
}
-
-
- /**
- * Performs the second phase handshake for External Change Log (send
- * StartSessionMsg and receive TopologyMsg messages exchange) and return the
- * reply message from the replication server.
- *
- * @param server Server we are connecting with.
- */
- private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
- {
- try
- {
- // Send our Start Session
- final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
- startECLSessionMsg.setOperationId("-1");
- rs.session.publish(startECLSessionMsg);
-
- // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
- if (debugEnabled())
- {
- debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
- }
-
- // Alright set the timeout to the desired value
- rs.session.setSoTimeout(timeout);
- setConnectedRS(rs);
- }
- catch (Exception e)
- {
- logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
- getServerId(), server, getBaseDN().toNormalizedString(),
- stackTraceToSingleLineString(e)));
-
- rs.session.close();
- setConnectedRS(ConnectedRS.noConnectedRS());
- }
- }
-
/**
* Performs the second phase handshake (send StartSessionMsg and receive
* TopologyMsg messages exchange) and return the reply message from the
@@ -2288,7 +2192,7 @@
try
{
- connect();
+ connectAsDataServer();
rs = connectedRS.get();
}
catch (Exception e)
@@ -3103,7 +3007,8 @@
Map<Integer, ReplicationServerInfo> previousRsInfos)
{
this.rsServerId = rsServerId;
- this.replicaInfos = dsInfosToKeep;
+ this.replicaInfos = dsInfosToKeep == null
+ ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
previousRsInfos, configuredReplicationServerUrls);
}
@@ -3310,7 +3215,9 @@
@Override
public String toString()
{
- return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
+ return getClass().getSimpleName()
+ + " rsServerId=" + rsServerId
+ + ", replicaInfos=" + replicaInfos.values()
+ ", rsInfos=" + rsInfos.values();
}
}
diff --git a/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java b/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
deleted file mode 100644
index d0997db..0000000
--- a/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
+++ /dev/null
@@ -1,1267 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2010-2014 ForgeRock AS
- */
-package org.opends.server.workflowelement.externalchangelog;
-
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-import org.opends.messages.Category;
-import org.opends.messages.Message;
-import org.opends.messages.Severity;
-import org.opends.server.api.ClientConnection;
-import org.opends.server.api.plugin.PluginResult;
-import org.opends.server.backends.ChangelogBackend;
-import org.opends.server.config.ConfigConstants;
-import org.opends.server.controls.*;
-import org.opends.server.core.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.plugin.MultimasterReplication;
-import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.ECLServerHandler;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.types.*;
-import org.opends.server.types.operation.PostOperationSearchOperation;
-import org.opends.server.types.operation.PreOperationSearchOperation;
-import org.opends.server.types.operation.SearchEntrySearchOperation;
-import org.opends.server.types.operation.SearchReferenceSearchOperation;
-import org.opends.server.util.ServerConstants;
-import org.opends.server.util.TimeThread;
-
-import static org.opends.messages.CoreMessages.*;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.backends.ChangelogBackend.*;
-import static org.opends.server.config.ConfigConstants.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg.Persistent.*;
-import static org.opends.server.util.LDIFWriter.*;
-import static org.opends.server.util.ServerConstants.*;
-import static org.opends.server.util.StaticUtils.*;
-
-/**
- * This class defines an operation used to search for entries in a local backend
- * of the Directory Server.
- */
-public class ECLSearchOperation
- extends SearchOperationWrapper
- implements PreOperationSearchOperation, PostOperationSearchOperation,
- SearchEntrySearchOperation, SearchReferenceSearchOperation
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
- /** The set of supported controls for this WE. */
- private static final Set<String> CHANGELOG_SUPPORTED_CONTROLS =
- new HashSet<String>(Arrays.asList(
- ServerConstants.OID_SERVER_SIDE_SORT_REQUEST_CONTROL,
- ServerConstants.OID_VLV_REQUEST_CONTROL));
-
- /** The set of objectclasses that will be used in ECL root entry. */
- private static final Map<ObjectClass, String>
- CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
- static
- {
- ObjectClass topOC = DirectoryServer.getObjectClass(OC_TOP, true);
- CHANGELOG_ROOT_OBJECT_CLASSES.put(topOC, OC_TOP);
-
- ObjectClass containerOC = DirectoryServer.getObjectClass("container", true);
- CHANGELOG_ROOT_OBJECT_CLASSES.put(containerOC, "container");
- }
-
- /** The set of objectclasses that will be used in ECL entries. */
- private static final Map<ObjectClass, String>
- CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
- static
- {
- ObjectClass topOC = DirectoryServer.getObjectClass(OC_TOP, true);
- CHANGELOG_ENTRY_OBJECT_CLASSES.put(topOC, OC_TOP);
-
- ObjectClass eclEntryOC = DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY,
- true);
- CHANGELOG_ENTRY_OBJECT_CLASSES.put(eclEntryOC, OC_CHANGELOG_ENTRY);
- }
-
-
- /** The attribute type for the "creatorsName" attribute. */
- private static final AttributeType CREATORS_NAME_TYPE =
- DirectoryConfig.getAttributeType(OP_ATTR_CREATORS_NAME_LC, true);
-
- /** The attribute type for the "modifiersName" attribute. */
- private static final AttributeType MODIFIERS_NAME_TYPE =
- DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
-
- /**
- * The replication server in which the search on ECL is to be performed.
- */
- private ReplicationServer replicationServer;
-
- /**
- * The client connection for the search operation.
- */
- private ClientConnection clientConnection;
-
- /**
- * The base DN for the search.
- */
- private DN baseDN;
-
- /**
- * The persistent search request, if applicable.
- */
- private PersistentSearch persistentSearch;
-
- /**
- * The filter for the search.
- */
- private SearchFilter filter;
-
- private ECLServerHandler eclServerHandler;
-
- /**
- * A flag to know if the ECLControl has been requested.
- */
- private boolean returnECLControl = false;
-
- /**
- * Creates a new operation that may be used to search for entries in a local
- * backend of the Directory Server.
- *
- * @param search The operation to process.
- */
- ECLSearchOperation(SearchOperation search)
- {
- super(search);
-
- ECLWorkflowElement.attachLocalOperation(search, this);
- }
-
-
-
- /**
- * Process this search operation against a local backend.
- *
- * @param wfe
- * The local backend work-flow element.
- * @throws CanceledOperationException
- * if this operation should be canceled
- */
- void processECLSearch(ECLWorkflowElement wfe)
- throws CanceledOperationException
- {
- boolean executePostOpPlugins = false;
-
- // Get the plugin config manager that will be used for invoking plugins.
- PluginConfigManager pluginConfigManager =
- DirectoryServer.getPluginConfigManager();
-
- // Check for a request to cancel this operation.
- checkIfCanceled(false);
-
- searchProcessing:
- {
- replicationServer = wfe.getReplicationServer();
- clientConnection = getClientConnection();
-
- if (!clientConnection.hasPrivilege(Privilege.CHANGELOG_READ, this))
- {
- setResultCode(ResultCode.INSUFFICIENT_ACCESS_RIGHTS);
- appendErrorMessage(NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
- return;
- }
-
- final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
-
- // Set default behavior as "from change number".
- // "from cookie" is set only when cookie is provided.
- startECLSessionMsg.setECLRequestType(REQUEST_TYPE_FROM_CHANGE_NUMBER);
-
- // Set a string operationId that will help correlate any error message
- // logged for this operation with the 'real' client operation.
- startECLSessionMsg.setOperationId(toString());
-
- // Set a list of excluded domains (also exclude 'cn=changelog' itself)
- Set<String> excludedDomains =
- MultimasterReplication.getECLDisabledDomains();
- excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
- startECLSessionMsg.setExcludedDNs(excludedDomains);
-
- // Process the search base and filter to convert them from their raw forms
- // as provided by the client to the forms required for the rest of the
- // search processing.
- baseDN = getBaseDN();
- filter = getFilter();
- if (baseDN == null || filter == null)
- {
- break searchProcessing;
- }
-
- // Test existence of the RS - normally should always be here
- if (replicationServer == null)
- {
- setResultCode(ResultCode.OPERATIONS_ERROR);
- appendErrorMessage(ERR_SEARCH_BASE_DOESNT_EXIST.get(
- String.valueOf(baseDN)));
- break searchProcessing;
- }
-
- // Analyse controls - including the cookie control
- try
- {
- handleRequestControls(startECLSessionMsg);
- }
- catch (DirectoryException de)
- {
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- setResponseData(de);
- break searchProcessing;
- }
-
- // Process search parameters to optimize session query.
- try
- {
- evaluateSearchParameters(startECLSessionMsg, baseDN, filter);
- }
- catch (DirectoryException de)
- {
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- setResponseData(de);
- break searchProcessing;
- }
-
- // Check for a request to cancel this operation.
- checkIfCanceled(false);
-
- // Invoke the pre-operation search plugins.
- executePostOpPlugins = true;
- PluginResult.PreOperation preOpResult =
- pluginConfigManager.invokePreOperationSearchPlugins(this);
- if (!preOpResult.continueProcessing())
- {
- setResultCode(preOpResult.getResultCode());
- appendErrorMessage(preOpResult.getErrorMessage());
- setMatchedDN(preOpResult.getMatchedDN());
- setReferralURLs(preOpResult.getReferralURLs());
- break searchProcessing;
- }
-
- // Check for a request to cancel this operation.
- checkIfCanceled(false);
-
- // Be optimistic by default.
- setResultCode(ResultCode.SUCCESS);
-
- // If there's a persistent search, then register it with the server.
- if (persistentSearch != null)
- {
- ChangelogBackend.getInstance().registerPersistentSearch(persistentSearch);
- // TODO JNR Add callback on cancel,
- // see ECLWorkflowElement.registerPersistentSearch().
- // This will be removed very soon anyway.
- persistentSearch.enable();
- }
-
- // Process the search.
- try
- {
- processSearch(startECLSessionMsg);
- }
- catch (DirectoryException de)
- {
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
-
- setResponseData(de);
-
- if (persistentSearch != null)
- {
- persistentSearch.cancel();
- setSendResponse(true);
- }
- break searchProcessing;
- }
- catch (CanceledOperationException coe)
- {
- if (persistentSearch != null)
- {
- persistentSearch.cancel();
- setSendResponse(true);
- }
- shutdownECLServerHandler();
- throw coe;
- }
- catch (Exception e)
- {
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
-
- setResultCode(DirectoryServer.getServerErrorResultCode());
- appendErrorMessage(ERR_SEARCH_BACKEND_EXCEPTION.get(
- getExceptionMessage(e)));
- if (persistentSearch != null)
- {
- persistentSearch.cancel();
- setSendResponse(true);
- }
- break searchProcessing;
- }
- }
-
- // Check for a request to cancel this operation.
- checkIfCanceled(false);
-
- // Invoke the post-operation search plugins.
- if (executePostOpPlugins)
- {
- PluginResult.PostOperation postOpResult =
- pluginConfigManager.invokePostOperationSearchPlugins(this);
- if (!postOpResult.continueProcessing())
- {
- setResultCode(postOpResult.getResultCode());
- appendErrorMessage(postOpResult.getErrorMessage());
- setMatchedDN(postOpResult.getMatchedDN());
- setReferralURLs(postOpResult.getReferralURLs());
- }
- }
- }
-
-
- /**
- * Handles any controls contained in the request - including the cookie ctrl.
- *
- * @throws DirectoryException If there is a problem with any of the request
- * controls.
- */
- private void handleRequestControls(StartECLSessionMsg startECLSessionMsg)
- throws DirectoryException
- {
- List<Control> requestControls = getRequestControls();
- if (requestControls != null && !requestControls.isEmpty())
- {
- for (Iterator<Control> iter = requestControls.iterator(); iter.hasNext();)
- {
- final Control c = iter.next();
- final String oid = c.getOID();
-
- if (!AccessControlConfigManager.getInstance().getAccessControlHandler()
- .isAllowed(baseDN, this, c))
- {
- // As per RFC 4511 4.1.11.
- if (c.isCritical())
- {
- throw new DirectoryException(
- ResultCode.UNAVAILABLE_CRITICAL_EXTENSION,
- ERR_CONTROL_INSUFFICIENT_ACCESS_RIGHTS.get(oid));
- }
- // We don't want to process this non-critical control, so remove it.
- iter.remove();
- continue;
- }
-
- if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(oid))
- {
- ExternalChangelogRequestControl eclControl =
- getRequestControl(ExternalChangelogRequestControl.DECODER);
- MultiDomainServerState cookie = eclControl.getCookie();
- returnECLControl = true;
- if (cookie != null)
- {
- startECLSessionMsg.setECLRequestType(REQUEST_TYPE_FROM_COOKIE);
- startECLSessionMsg.setCrossDomainServerState(cookie.toString());
- }
- }
- else if (OID_LDAP_ASSERTION.equals(oid))
- {
- LDAPAssertionRequestControl assertControl =
- getRequestControl(LDAPAssertionRequestControl.DECODER);
-
- try
- {
- // FIXME -- We need to determine whether the current user has
- // permission to make this determination.
- SearchFilter assertionFilter = assertControl.getSearchFilter();
- Entry entry;
- try
- {
- // FIXME: this is broken (recursive)?
- entry = DirectoryServer.getEntry(baseDN);
- }
- catch (DirectoryException de)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- }
-
- throw new DirectoryException(de.getResultCode(),
- ERR_SEARCH_CANNOT_GET_ENTRY_FOR_ASSERTION.get(
- de.getMessageObject()));
- }
-
- if (entry == null)
- {
- throw new DirectoryException(ResultCode.NO_SUCH_OBJECT,
- ERR_SEARCH_NO_SUCH_ENTRY_FOR_ASSERTION.get());
- }
-
- if (! assertionFilter.matchesEntry(entry))
- {
- throw new DirectoryException(ResultCode.ASSERTION_FAILED,
- ERR_SEARCH_ASSERTION_FAILED.get());
- }
- }
- catch (DirectoryException de)
- {
- if (de.getResultCode() == ResultCode.ASSERTION_FAILED)
- {
- throw de;
- }
-
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- }
-
- throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
- ERR_SEARCH_CANNOT_PROCESS_ASSERTION_FILTER.get(
- de.getMessageObject()), de);
- }
- }
- else if (OID_PROXIED_AUTH_V1.equals(oid))
- {
- // Log usage of legacy proxy authz V1 control.
- addAdditionalLogItem(AdditionalLogItem.keyOnly(getClass(),
- "obsoleteProxiedAuthzV1Control"));
-
- // The requester must have the PROXIED_AUTH privilege in order to be
- // able to use this control.
- if (! clientConnection.hasPrivilege(Privilege.PROXIED_AUTH, this))
- {
- throw new DirectoryException(ResultCode.AUTHORIZATION_DENIED,
- ERR_PROXYAUTH_INSUFFICIENT_PRIVILEGES.get());
- }
-
- ProxiedAuthV1Control proxyControl =
- getRequestControl(ProxiedAuthV1Control.DECODER);
-
- Entry authorizationEntry = proxyControl.getAuthorizationEntry();
- setAuthorizationEntry(authorizationEntry);
- if (authorizationEntry == null)
- {
- setProxiedAuthorizationDN(DN.nullDN());
- }
- else
- {
- setProxiedAuthorizationDN(authorizationEntry.getDN());
- }
- }
- else if (OID_PROXIED_AUTH_V2.equals(oid))
- {
- // The requester must have the PROXIED_AUTH privilege in order to be
- // able to use this control.
- if (! clientConnection.hasPrivilege(Privilege.PROXIED_AUTH, this))
- {
- throw new DirectoryException(ResultCode.AUTHORIZATION_DENIED,
- ERR_PROXYAUTH_INSUFFICIENT_PRIVILEGES.get());
- }
-
- ProxiedAuthV2Control proxyControl =
- getRequestControl(ProxiedAuthV2Control.DECODER);
-
- Entry authorizationEntry = proxyControl.getAuthorizationEntry();
- setAuthorizationEntry(authorizationEntry);
- if (authorizationEntry == null)
- {
- setProxiedAuthorizationDN(DN.nullDN());
- }
- else
- {
- setProxiedAuthorizationDN(authorizationEntry.getDN());
- }
- }
- else if (OID_PERSISTENT_SEARCH.equals(oid))
- {
- PersistentSearchControl psearchControl =
- getRequestControl(PersistentSearchControl.DECODER);
-
- persistentSearch = new PersistentSearch(this,
- psearchControl.getChangeTypes(),
- psearchControl.getChangesOnly(),
- psearchControl.getReturnECs());
-
- // If we're only interested in changes, then we don't actually want
- // to process the search now.
- if (psearchControl.getChangesOnly())
- startECLSessionMsg.setPersistent(PERSISTENT_CHANGES_ONLY);
- else
- startECLSessionMsg.setPersistent(PERSISTENT);
- }
- else if (OID_LDAP_SUBENTRIES.equals(oid))
- {
- SubentriesControl subentriesControl =
- getRequestControl(SubentriesControl.DECODER);
- setReturnSubentriesOnly(subentriesControl.getVisibility());
- }
- else if (OID_LDUP_SUBENTRIES.equals(oid))
- {
- // Support for legacy draft-ietf-ldup-subentry.
- addAdditionalLogItem(AdditionalLogItem.keyOnly(getClass(),
- "obsoleteSubentryControl"));
-
- setReturnSubentriesOnly(true);
- }
- else if (OID_MATCHED_VALUES.equals(oid))
- {
- MatchedValuesControl matchedValuesControl =
- getRequestControl(MatchedValuesControl.DECODER);
- setMatchedValuesControl(matchedValuesControl);
- }
- else if (OID_ACCOUNT_USABLE_CONTROL.equals(oid))
- {
- setIncludeUsableControl(true);
- }
- else if (OID_REAL_ATTRS_ONLY.equals(oid))
- {
- setRealAttributesOnly(true);
- }
- else if (OID_VIRTUAL_ATTRS_ONLY.equals(oid))
- {
- setVirtualAttributesOnly(true);
- }
- else if (OID_GET_EFFECTIVE_RIGHTS.equals(oid) &&
- DirectoryServer.isSupportedControl(OID_GET_EFFECTIVE_RIGHTS))
- {
- // Do nothing here and let AciHandler deal with it.
- }
-
- // TODO: Add support for additional controls, including VLV
- else if (c.isCritical()
- && (replicationServer == null || !supportsControl(oid)))
- {
- throw new DirectoryException(
- ResultCode.UNAVAILABLE_CRITICAL_EXTENSION,
- ERR_SEARCH_UNSUPPORTED_CRITICAL_CONTROL.get(oid));
- }
- }
- }
- }
-
- private void processSearch(StartECLSessionMsg startECLSessionMsg)
- throws DirectoryException, CanceledOperationException
- {
- if (debugEnabled())
- {
- TRACER.debugInfo(" processSearch toString=[" + toString() + "] opid=["
- + startECLSessionMsg.getOperationId() + "]");
- }
-
- // Start a specific ECL server handler
- eclServerHandler =
- new ECLServerHandler(replicationServer, startECLSessionMsg);
- boolean abortECLSession = false;
- try
- {
- // Get first update (this is needed to determine hasSubordinates.
- ECLUpdateMsg update = eclServerHandler.getNextECLUpdate();
-
- // Return root entry if requested.
- if (CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, getScope()))
- {
- final Entry entry = createRootEntry(update != null);
- if (filter.matchesEntry(entry) && !returnEntry(entry, null))
- {
- // Abandon, Size limit reached.
- abortECLSession = true;
- return;
- }
- }
-
- if (baseDN.equals(CHANGELOG_BASE_DN)
- && getScope().equals(SearchScope.BASE_OBJECT))
- {
- // Only the change log root entry was requested. There is no need to
- // process other entries.
- return;
- }
-
- int lookthroughCount = 0;
- int lookthroughLimit = getClientConnection().getLookthroughLimit();
-
- // Process change log entries.
- while (update != null)
- {
- if(lookthroughLimit > 0 && lookthroughCount > lookthroughLimit)
- {
- //Lookthrough limit exceeded
- setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
- appendErrorMessage(
- NOTE_ECL_LOOKTHROUGH_LIMIT_EXCEEDED.get(lookthroughLimit));
- return;
- }
- // Check for a request to cancel this operation.
- checkIfCanceled(false);
-
- if (!buildAndReturnEntry(update))
- {
- // Abandon, Size limit reached.
- abortECLSession = true;
- return;
- }
-
- lookthroughCount++;
-
- update = eclServerHandler.getNextECLUpdate();
- }
- }
- catch (CanceledOperationException e)
- {
- abortECLSession = true;
- throw e;
- }
- catch (DirectoryException e)
- {
- abortECLSession = true;
- throw e;
- }
- finally
- {
- if (persistentSearch == null || abortECLSession)
- {
- shutdownECLServerHandler();
- }
- }
- }
-
- private boolean supportsControl(String oid)
- {
- return CHANGELOG_SUPPORTED_CONTROLS.contains(oid);
- }
-
- /**
- * Build an ECL entry from a provided ECL msg and return it.
- * @param eclMsg The provided ECL msg.
- * @return <CODE>true</CODE> if the caller should continue processing the
- * search request and sending additional entries and references, or
- * <CODE>false</CODE> if not for some reason (e.g., the size limit
- * has been reached or the search has been abandoned).
- * @throws DirectoryException When an errors occurs.
- */
- private boolean buildAndReturnEntry(ECLUpdateMsg eclMsg)
- throws DirectoryException
- {
- final Entry entry = createEntryFromMsg(eclMsg);
- if (matchScopeAndFilter(entry))
- {
- List<Control> controls = null;
- if (returnECLControl)
- {
- controls = new ArrayList<Control>(1);
-
- EntryChangelogNotificationControl clrc =
- new EntryChangelogNotificationControl(
- true, eclMsg.getCookie().toString());
- controls.add(clrc);
- }
- return returnEntry(entry, controls);
- }
-
- // Check the timelimit here as well, in case there are no matches
- if ((getTimeLimit() > 0) && (TimeThread.getTime() >=
- getTimeLimitExpiration()))
- {
- setResultCode(ResultCode.TIME_LIMIT_EXCEEDED);
- appendErrorMessage(ERR_SEARCH_TIME_LIMIT_EXCEEDED.get(getTimeLimit()));
- return false;
- }
-
- return true;
- }
-
-
-
- /**
- * Test if the provided entry matches the filter, base and scope.
- *
- * @param entry
- * The provided entry
- * @return whether the entry matches.
- * @throws DirectoryException
- * When a problem occurs.
- */
- private boolean matchScopeAndFilter(Entry entry) throws DirectoryException
- {
- return entry.matchesBaseAndScope(getBaseDN(), getScope())
- && getFilter().matchesEntry(entry);
- }
-
- /**
- * Create an ECL entry from a provided ECL msg.
- *
- * @param eclMsg
- * the provided ECL msg.
- * @return the created ECL entry.
- * @throws DirectoryException
- * When an error occurs.
- */
- public static Entry createEntryFromMsg(ECLUpdateMsg eclMsg)
- throws DirectoryException
- {
- Entry entry = null;
-
- // Get the meat from the ecl msg
- UpdateMsg msg = eclMsg.getUpdateMsg();
-
- if (msg instanceof AddMsg)
- {
- AddMsg addMsg = (AddMsg) msg;
-
- // Map addMsg to an LDIF string for the 'changes' attribute, and pull
- // out change initiators name if available which is contained in the
- // creatorsName attribute.
- String changeInitiatorsName = null;
- String ldifChanges = null;
-
- try
- {
- StringBuilder builder = new StringBuilder(256);
- for (Attribute a : addMsg.getAttributes())
- {
- if (a.getAttributeType().equals(CREATORS_NAME_TYPE)
- && !a.isEmpty())
- {
- // This attribute is not multi-valued.
- changeInitiatorsName = a.iterator().next().toString();
- }
-
- String attrName = a.getNameWithOptions();
- for (AttributeValue v : a)
- {
- builder.append(attrName);
- appendLDIFSeparatorAndValue(builder, v.getValue());
- builder.append('\n');
- }
- }
- ldifChanges = builder.toString();
- }
- catch (Exception e)
- {
- // Unable to decode the message - log an error.
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
-
- logError(Message.raw(
- Category.SYNC,
- Severity.MILD_ERROR,
- "An exception was encountered while try to encode a "
- + "replication add message for entry \""
- + addMsg.getDN()
- + "\" into an External Change Log entry: "
- + e.getMessage()));
- }
-
- entry = createChangelogEntry(eclMsg,
- addMsg,
- ldifChanges, // entry as created (in LDIF format)
- "add", changeInitiatorsName);
- }
- else if (msg instanceof ModifyCommonMsg)
- {
- ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
-
- // Map the modifyMsg to an LDIF string for the 'changes' attribute, and
- // pull out change initiators name if available which is contained in the
- // modifiersName attribute.
- String changeInitiatorsName = null;
- String ldifChanges = null;
-
- try
- {
- StringBuilder builder = new StringBuilder(128);
- for (Modification m : modifyMsg.getMods())
- {
- Attribute a = m.getAttribute();
-
- if (m.getModificationType() == ModificationType.REPLACE
- && a.getAttributeType().equals(MODIFIERS_NAME_TYPE)
- && !a.isEmpty())
- {
- // This attribute is not multi-valued.
- changeInitiatorsName = a.iterator().next().toString();
- }
-
- String attrName = a.getNameWithOptions();
- builder.append(m.getModificationType().getLDIFName());
- builder.append(": ");
- builder.append(attrName);
- builder.append('\n');
-
- for (AttributeValue v : a)
- {
- builder.append(attrName);
- appendLDIFSeparatorAndValue(builder, v.getValue());
- builder.append('\n');
- }
- builder.append("-\n");
- }
- ldifChanges = builder.toString();
- }
- catch (Exception e)
- {
- // Unable to decode the message - log an error.
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
-
- logError(Message.raw(
- Category.SYNC,
- Severity.MILD_ERROR,
- "An exception was encountered while try to encode a "
- + "replication modify message for entry \""
- + modifyMsg.getDN()
- + "\" into an External Change Log entry: "
- + e.getMessage()));
- }
-
- final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
- entry = createChangelogEntry(eclMsg,
- modifyMsg,
- ldifChanges,
- (isModifyDNMsg ? "modrdn" : "modify"),
- changeInitiatorsName);
-
- if (isModifyDNMsg)
- {
- ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
-
- addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
- if (modDNMsg.getNewSuperior() != null)
- {
- addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
- }
- addAttribute(entry, "deleteoldrdn",
- String.valueOf(modDNMsg.deleteOldRdn()));
- }
- }
- else if (msg instanceof DeleteMsg)
- {
- DeleteMsg delMsg = (DeleteMsg) msg;
-
- entry = createChangelogEntry(eclMsg,
- delMsg,
- null, // no changes
- "delete",
- delMsg.getInitiatorsName());
- }
-
- return entry;
- }
-
- private static void addAttribute(Entry e, String attrType, String attrValue)
- {
- e.addAttribute(Attributes.create(attrType, attrValue), null);
- }
-
- /**
- * Creates the root entry of the external changelog.
- * @param hasSubordinates whether the root entry has subordinates or not.
- * @return The root entry created.
- */
- private Entry createRootEntry(boolean hasSubordinates)
- {
- // Attributes
- Map<AttributeType, List<Attribute>> userAttrs =
- new LinkedHashMap<AttributeType,List<Attribute>>();
- Map<AttributeType, List<Attribute>> operationalAttrs =
- new LinkedHashMap<AttributeType,List<Attribute>>();
-
- addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME,
- "changelog", userAttrs, operationalAttrs);
- addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC,
- ATTR_SUBSCHEMA_SUBENTRY, ConfigConstants.DN_DEFAULT_SCHEMA_ROOT,
- userAttrs, operationalAttrs);
-
- // TODO:numSubordinates
-
- addAttributeByUppercaseName("hassubordinates", "hasSubordinates",
- Boolean.toString(hasSubordinates), userAttrs, operationalAttrs);
- addAttributeByUppercaseName("entrydn", "entryDN",
- DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
-
- return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES,
- userAttrs, operationalAttrs);
- }
-
- private void addAttributeByUppercaseName(String attrNameLowercase,
- String attrNameUppercase, String attrValue,
- Map<AttributeType, List<Attribute>> userAttrs,
- Map<AttributeType, List<Attribute>> operationalAttrs)
- {
- AttributeType aType = DirectoryServer.getAttributeType(attrNameLowercase);
- if (aType == null)
- {
- aType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
- }
- final Attribute a = Attributes.create(attrNameUppercase, attrValue);
- final List<Attribute> attrList = Collections.singletonList(a);
- if (aType.isOperational())
- {
- operationalAttrs.put(aType, attrList);
- }
- else
- {
- userAttrs.put(aType, attrList);
- }
- }
-
- private static void addAttributeByType(String attrNameLowercase,
- String attrNameUppercase, String attrValue,
- Map<AttributeType, List<Attribute>> userAttrs,
- Map<AttributeType, List<Attribute>> operationalAttrs)
- {
- AttributeType aType = DirectoryServer.getAttributeType(attrNameLowercase);
- if (aType == null)
- {
- aType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
- }
- Attribute a = Attributes.create(aType, attrValue);
- List<Attribute> attrList = Collections.singletonList(a);
- if (aType.isOperational())
- {
- operationalAttrs.put(aType, attrList);
- }
- else
- {
- userAttrs.put(aType, attrList);
- }
- }
-
- /**
- * Create an ECL entry from a set of provided information. This is the part of
- * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
- *
- * @param eclMsg
- * The provided ECLUpdateMsg for which to build the changelog entry.
- * @param msg
- * The provided LDAPUpdateMsg for which to build the changelog entry.
- * @param ldifChanges
- * The provided LDIF changes for ADD and MODIFY
- * @param changeType
- * The provided change type (add, ...)
- * @param changeInitiatorsName
- * The provided initiators name
- * @return The created ECL entry.
- * @throws DirectoryException
- * When any error occurs.
- */
- private static Entry createChangelogEntry(
- ECLUpdateMsg eclMsg,
- LDAPUpdateMsg msg,
- String ldifChanges,
- String changeType,
- String changeInitiatorsName)
- throws DirectoryException
- {
- final DN baseDN = eclMsg.getBaseDN();
- final long changeNumber = eclMsg.getChangeNumber();
- final CSN csn = msg.getCSN();
-
- String dnString;
- if (changeNumber == 0)
- {
- // cookie mode
- dnString = "replicationCSN=" + csn + "," + baseDN.toNormalizedString()
- + "," + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
- }
- else
- {
- // Draft compat mode
- dnString = "changeNumber=" + changeNumber + ","
- + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
- }
-
- // Attributes
- Map<AttributeType, List<Attribute>> uAttrs =
- new LinkedHashMap<AttributeType,List<Attribute>>();
- Map<AttributeType, List<Attribute>> operationalAttrs =
- new LinkedHashMap<AttributeType,List<Attribute>>();
-
- // Operational standard attributes
- addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
- ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, uAttrs, operationalAttrs);
- addAttributeByType("numsubordinates", "numSubordinates", "0", uAttrs,
- operationalAttrs);
- addAttributeByType("hassubordinates", "hasSubordinates", "false", uAttrs,
- operationalAttrs);
- addAttributeByType("entrydn", "entryDN", dnString, uAttrs,
- operationalAttrs);
-
- // REQUIRED attributes
-
- // ECL Changelog change number
- addAttributeByType("changenumber", "changeNumber",
- String.valueOf(changeNumber), uAttrs, operationalAttrs);
-
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
- dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
- final String format = dateFormat.format(new Date(csn.getTime()));
- addAttributeByType("changetime", "changeTime", format, uAttrs,
- operationalAttrs);
- addAttributeByType("changetype", "changeType", changeType, uAttrs,
- operationalAttrs);
- addAttributeByType("targetdn", "targetDN", msg.getDN().toNormalizedString(),
- uAttrs, operationalAttrs);
-
- // NON REQUESTED attributes
-
- addAttributeByType("replicationcsn", "replicationCSN",
- csn.toString(), uAttrs, operationalAttrs);
- addAttributeByType("replicaidentifier", "replicaIdentifier",
- Integer.toString(csn.getServerId()), uAttrs, operationalAttrs);
-
- if (ldifChanges != null)
- {
- addAttributeByType("changes", "changes", ldifChanges, uAttrs,
- operationalAttrs);
- }
-
- if (changeInitiatorsName != null)
- {
- addAttributeByType("changeinitiatorsname", "changeInitiatorsName",
- changeInitiatorsName, uAttrs, operationalAttrs);
- }
-
- final String targetUUID = msg.getEntryUUID();
- if (targetUUID != null)
- {
- addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID,
- uAttrs, operationalAttrs);
- }
-
- final String cookie = eclMsg.getCookie().toString();
- addAttributeByType("changelogcookie", "changeLogCookie", cookie, uAttrs,
- operationalAttrs);
-
- final List<RawAttribute> includedAttributes = msg.getEclIncludes();
- if (includedAttributes != null && !includedAttributes.isEmpty())
- {
- StringBuilder builder = new StringBuilder(256);
- for (RawAttribute includedAttribute : includedAttributes)
- {
- String name = includedAttribute.getAttributeType();
- for (ByteString value : includedAttribute.getValues())
- {
- builder.append(name);
- appendLDIFSeparatorAndValue(builder, value);
- builder.append('\n');
- }
- }
- String includedAttributesLDIF = builder.toString();
-
- addAttributeByType("includedattributes", "includedAttributes",
- includedAttributesLDIF, uAttrs, operationalAttrs);
- }
-
- // at the end build the CL entry to be returned
- return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES,
- uAttrs, operationalAttrs);
- }
-
- /** {@inheritDoc} */
- @Override
- public CancelResult cancel(CancelRequest cancelRequest)
- {
- if (debugEnabled())
- TRACER.debugInfo(this + " cancel() " + eclServerHandler);
- shutdownECLServerHandler();
- return super.cancel(cancelRequest);
- }
-
- /** {@inheritDoc} */
- @Override
- public void abort(CancelRequest cancelRequest)
- {
- if (debugEnabled())
- TRACER.debugInfo(this + " abort() " + eclServerHandler);
- shutdownECLServerHandler();
- }
-
- private void shutdownECLServerHandler()
- {
- if (eclServerHandler != null)
- {
- eclServerHandler.shutdown();
- }
- }
-
- /**
- * Traverse the provided search filter, looking for some conditions
- * on attributes that can be optimized in the ECL.
- * When found, populate the provided StartECLSessionMsg.
- * @param startCLmsg the startCLMsg to be populated.
- * @param baseDN the provided search baseDN.
- * @param sf the provided search filter.
- * @throws DirectoryException when an exception occurs.
- */
- public static void evaluateSearchParameters(StartECLSessionMsg startCLmsg,
- DN baseDN, SearchFilter sf) throws DirectoryException
- {
- // Select whether to use the DN or the filter.
- switch (baseDN.getNumComponents())
- {
- case 1:
- // cn=changelog - use user provided search filter.
- break;
- case 2:
- // changeNumber=xxx,cn=changelog - draft ECL - use faked up equality
- // filter.
-
- // The DN could also be a new ECL <service-id>,cn=changelog so be sure it
- // is draft ECL.
- RDN rdn = baseDN.getRDN();
-
- AttributeType at = DirectoryServer.getAttributeType("changenumber");
- if (at == null)
- {
- at = DirectoryServer.getDefaultAttributeType("changeNumber");
- }
-
- AttributeValue av = rdn.getAttributeValue(at);
- if (av != null)
- {
- sf = SearchFilter.createEqualityFilter(at, av);
- }
- break;
- default:
- // replicationCSN=xxx,<service-id>,cn=changelog - new ECL - use faked up
- // equality filter.
- rdn = baseDN.getRDN();
-
- at = DirectoryServer.getAttributeType("replicationcsn");
- if (at == null)
- {
- at = DirectoryServer.getDefaultAttributeType("replicationCSN");
- }
-
- av = rdn.getAttributeValue(at);
- if (av != null)
- {
- sf = SearchFilter.createEqualityFilter(at, av);
- }
- break;
- }
-
- StartECLSessionMsg msg = evaluateSearchParameters2(sf);
- startCLmsg.setFirstChangeNumber(msg.getFirstChangeNumber());
- startCLmsg.setLastChangeNumber(msg.getLastChangeNumber());
- startCLmsg.setCSN(msg.getCSN());
- }
-
- private static StartECLSessionMsg evaluateSearchParameters2(SearchFilter sf)
- throws DirectoryException
- {
- StartECLSessionMsg startCLmsg = new StartECLSessionMsg();
- startCLmsg.setFirstChangeNumber(-1);
- startCLmsg.setLastChangeNumber(-1);
- startCLmsg.setCSN(new CSN(0, 0, 0));
-
- // If there's no filter, just return
- if (sf == null)
- {
- return startCLmsg;
- }
-
- // Here are the 3 elementary cases we know how to optimize
- if (matches(sf, FilterType.GREATER_OR_EQUAL, "changeNumber"))
- {
- int sn = Integer.decode(
- sf.getAssertionValue().getNormalizedValue().toString());
- startCLmsg.setFirstChangeNumber(sn);
- }
- else if (matches(sf, FilterType.LESS_OR_EQUAL, "changeNumber"))
- {
- int sn = Integer.decode(
- sf.getAssertionValue().getNormalizedValue().toString());
- startCLmsg.setLastChangeNumber(sn);
- }
- else if (matches(sf, FilterType.EQUALITY, "replicationcsn"))
- {
- // == exact CSN
- startCLmsg.setCSN(new CSN(sf.getAssertionValue().toString()));
- }
- else if (matches(sf, FilterType.EQUALITY, "changenumber"))
- {
- int sn = Integer.decode(
- sf.getAssertionValue().getNormalizedValue().toString());
- startCLmsg.setFirstChangeNumber(sn);
- startCLmsg.setLastChangeNumber(sn);
- }
- else if (sf.getFilterType() == FilterType.AND)
- {
- // Here is the only binary operation we know how to optimize
- Collection<SearchFilter> comps = sf.getFilterComponents();
- SearchFilter sfs[] = comps.toArray(new SearchFilter[0]);
- long l1 = -1;
- long f1 = -1;
- long l2 = -1;
- long f2 = -1;
- StartECLSessionMsg m1;
- StartECLSessionMsg m2;
- if (sfs.length > 0)
- {
- m1 = evaluateSearchParameters2(sfs[0]);
- l1 = m1.getLastChangeNumber();
- f1 = m1.getFirstChangeNumber();
- }
- if (sfs.length > 1)
- {
- m2 = evaluateSearchParameters2(sfs[1]);
- l2 = m2.getLastChangeNumber();
- f2 = m2.getFirstChangeNumber();
- }
- if (l1 == -1)
- startCLmsg.setLastChangeNumber(l2);
- else if (l2 == -1)
- startCLmsg.setLastChangeNumber(l1);
- else
- startCLmsg.setLastChangeNumber(Math.min(l1, l2));
-
- startCLmsg.setFirstChangeNumber(Math.max(f1,f2));
- }
- return startCLmsg;
- }
-
- private static boolean matches(SearchFilter sf, FilterType filterType,
- String primaryName)
- {
- return sf.getFilterType() == filterType
- && sf.getAttributeType() != null
- && sf.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
- }
-}
diff --git a/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLWorkflowElement.java b/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLWorkflowElement.java
deleted file mode 100644
index 70206ad..0000000
--- a/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLWorkflowElement.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2012-2014 ForgeRock AS
- */
-package org.opends.server.workflowelement.externalchangelog;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.opends.server.admin.std.server.WorkflowElementCfg;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.SearchOperation;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.types.CanceledOperationException;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Operation;
-import org.opends.server.workflowelement.LeafWorkflowElement;
-
-/**
- * This class defines a workflow element for the external changelog (ECL);
- * e-g an entity that handles the processing of an operation against the ECL.
- */
-public class ECLWorkflowElement extends LeafWorkflowElement<WorkflowElementCfg>
-{
-
- /**
- * A string indicating the type of the workflow element.
- */
- public static final String ECL_WORKFLOW_ELEMENT = "EXTERNAL CHANGE LOG";
-
- /**
- * The replication server object to which we will submits request
- * on the ECL. Retrieved from the local DirectoryServer.
- */
- private final ReplicationServer replicationServer;
-
- /**
- * Creates a new instance of the External Change Log workflow element.
- * @param rs the provided replication server
- * @throws DirectoryException If the ECL workflow is already registered.
- */
- public ECLWorkflowElement(ReplicationServer rs)
- throws DirectoryException
- {
- this.replicationServer =rs;
- super.initialize(ECL_WORKFLOW_ELEMENT, ECL_WORKFLOW_ELEMENT);
- super.setPrivate(true);
- DirectoryServer.registerWorkflowElement(this);
- }
-
- /** {@inheritDoc} */
- @Override
- public void finalizeWorkflowElement()
- {
- // null all fields so that any use of the finalized object will raise a NPE
- super.initialize(ECL_WORKFLOW_ELEMENT, null);
- }
-
- /** {@inheritDoc} */
- @Override
- public void execute(Operation operation) throws CanceledOperationException {
- switch (operation.getOperationType())
- {
- case SEARCH:
- ECLSearchOperation searchOperation =
- new ECLSearchOperation((SearchOperation) operation);
- searchOperation.processECLSearch(this);
- break;
- case ABANDON:
- // There is no processing for an abandon operation.
- break;
-
- case BIND:
- case ADD:
- case DELETE:
- case MODIFY:
- case MODIFY_DN:
- case COMPARE:
- default:
- throw new AssertionError("Attempted to execute an invalid operation " +
- "type: " + operation.getOperationType() +
- " (" + operation + ")");
- }
- }
-
-
-
- /**
- * Attaches the current local operation to the global operation so that
- * operation runner can execute local operation post response later on.
- *
- * @param <O> subtype of Operation
- * @param <L> subtype of LocalBackendOperation
- * @param globalOperation the global operation to which local operation
- * should be attached to
- * @param currentLocalOperation the local operation to attach to the global
- * operation
- */
- @SuppressWarnings("unchecked")
- public static <O extends Operation,L> void
- attachLocalOperation (O globalOperation, L currentLocalOperation)
- {
- List<?> existingAttachment =
- (List<?>) globalOperation.getAttachment(Operation.LOCALBACKENDOPERATIONS);
-
- List<L> newAttachment = new ArrayList<L>();
-
- if (existingAttachment != null)
- {
- // This line raises an unchecked conversion warning.
- // There is nothing we can do to prevent this warning
- // so let's get rid of it since we know the cast is safe.
- newAttachment.addAll ((List<L>) existingAttachment);
- }
- newAttachment.add (currentLocalOperation);
- globalOperation.setAttachment(Operation.LOCALBACKENDOPERATIONS,
- newAttachment);
- }
-
- /**
- * Returns the associated replication server.
- * @return the rs.
- */
- public ReplicationServer getReplicationServer()
- {
- return this.replicationServer;
- }
-}
-
diff --git a/opends/src/server/org/opends/server/workflowelement/externalchangelog/package-info.java b/opends/src/server/org/opends/server/workflowelement/externalchangelog/package-info.java
deleted file mode 100644
index cf609af..0000000
--- a/opends/src/server/org/opends/server/workflowelement/externalchangelog/package-info.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- */
-
-
-
-/**
- * This package contains source for the local backend workflow element, which
- * are used to process operations against data stored in local backend databases
- * and other repositories that are considered "local".
- */
-@org.opends.server.types.PublicAPI(
- stability=org.opends.server.types.StabilityLevel.PRIVATE)
-package org.opends.server.workflowelement.externalchangelog;
-
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
index e1d6127..01eff69 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
@@ -51,7 +51,6 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.DomainFakeCfg;
import org.opends.server.replication.plugin.ExternalChangelogDomainFakeCfg;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
@@ -66,6 +65,9 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
@@ -428,13 +430,11 @@
generateDeleteMsg(TEST_ROOT_DN_STRING, csn9, test, 9));
// ensure oldest state is correct for each suffix and for each server id
- final ServerState oldestState = getDomainOldestState(DN_OTEST);
- assertEquals(oldestState.getCSN(SERVER_ID_1), csn1);
- assertEquals(oldestState.getCSN(serverId22), csn7);
+ isOldestCSNForReplica(DN_OTEST, csn1);
+ isOldestCSNForReplica(DN_OTEST, csn7);
- final ServerState oldestState2 = getDomainOldestState(DN_OTEST2);
- assertEquals(oldestState2.getCSN(SERVER_ID_2), csn2);
- assertEquals(oldestState2.getCSN(serverId11), csn6);
+ isOldestCSNForReplica(DN_OTEST2, csn2);
+ isOldestCSNForReplica(DN_OTEST2, csn6);
// test last cookie on root DSE
MultiDomainServerState expectedLastCookie =
@@ -460,7 +460,20 @@
finally
{
removeBackend(backendForSecondSuffix);
- //replicationServer.getChangelogDB().getReplicationDomainDB().removeDomain(ROOT_DN_OTEST2);
+ }
+ }
+
+ private void isOldestCSNForReplica(DN baseDN, CSN csn) throws Exception
+ {
+ final ReplicationDomainDB domainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
+ final DBCursor<UpdateMsg> cursor =
+ domainDB.getCursorFrom(baseDN, csn.getServerId(), null, PositionStrategy.ON_MATCHING_KEY);
+ try {
+ assertTrue(cursor.next(),
+ "Expected to be to find at least one change in replicaDB(" + baseDN + " " + csn.getServerId() + ")");
+ assertEquals(cursor.getRecord().getCSN(), csn);
+ }finally{
+ close(cursor);
}
}
@@ -819,11 +832,6 @@
return results;
}
- private ServerState getDomainOldestState(DN baseDN)
- {
- return replicationServer.getReplicationServerDomain(baseDN).getOldestState();
- }
-
private void assertSearchParameters(SearchParams searchParams, long firstChangeNumber,
long lastChangeNumber, CSN csn) throws Exception
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 007e482..5fc0de5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -36,8 +36,6 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.*;
-import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
-import org.opends.server.replication.protocol.StartECLSessionMsg.Persistent;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendAddOperation;
@@ -630,52 +628,6 @@
msg1.getBytes(getCurrentVersion()), getCurrentVersion());
}
- @Test(enabled=true)
- public void eclUpdateMsg()
- throws Exception
- {
- // create a msg to put in the eclupdatemsg
- InternalClientConnection connection =
- InternalClientConnection.getRootConnection();
- DeleteOperation deleteOp =
- new DeleteOperationBasis(connection, 1, 1,null, DN.decode("cn=t1"));
- LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(deleteOp);
- CSN csn = new CSN(TimeThread.getTime(), 123, 45);
- op.setAttachment(SYNCHROCONTEXT, new DeleteContext(csn, "uniqueid"));
- DeleteMsg delmsg = new DeleteMsg(op);
- long changeNumber = 21;
-
- DN baseDN = DN.decode("dc=example,dc=com");
-
- // create a cookie
- MultiDomainServerState cookie =
- new MultiDomainServerState(
- "o=test:000001210b6f21e904b100000001 000001210b6f21e904b200000001;" +
- "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;");
-
- // Constructor test
- ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, baseDN, changeNumber);
- assertTrue(msg1.getCookie().equalsTo(cookie));
- assertEquals(msg1.getBaseDN(), baseDN);
- assertEquals(msg1.getChangeNumber(), changeNumber);
- DeleteMsg delmsg2 = (DeleteMsg)msg1.getUpdateMsg();
- assertEquals(delmsg.compareTo(delmsg2), 0);
-
- // Constructor test (with byte[])
- ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes(getCurrentVersion()));
- assertTrue(msg2.getCookie().equalsTo(msg2.getCookie()));
- assertTrue(msg2.getCookie().equalsTo(cookie));
- assertEquals(msg2.getBaseDN(), msg1.getBaseDN());
- assertEquals(msg2.getBaseDN(), baseDN);
- assertEquals(msg2.getChangeNumber(), msg1.getChangeNumber());
- assertEquals(msg2.getChangeNumber(), changeNumber);
-
- DeleteMsg delmsg1 = (DeleteMsg)msg1.getUpdateMsg();
- delmsg2 = (DeleteMsg)msg2.getUpdateMsg();
- assertEquals(delmsg2.compareTo(delmsg), 0);
- assertEquals(delmsg2.compareTo(delmsg1), 0);
- }
-
@DataProvider(name="createServerStartData")
public Object[][] createServerStartData() throws Exception
{
@@ -766,55 +718,6 @@
newMsg.getDegradedStatusThreshold());
}
- @DataProvider(name="createReplServerStartDSData")
- public Object[][] createReplServerStartDSData() throws Exception
- {
- DN baseDN = TEST_ROOT_DN;
-
- final ServerState state1 = new ServerState();
- state1.update(new CSN(0, 0, 0));
- final ServerState state2 = new ServerState();
- state2.update(new CSN(75, 5, 263));
- final ServerState state3 = new ServerState();
- state3.update(new CSN(123, 5, 98));
-
- return new Object[][]
- {
- {1, baseDN, 0, "localhost:8989", state1, 0L, (byte)0, 0, 0, 0},
- {16, baseDN, 100, "anotherHost:1025", state2, 1245L, (byte)25, 3456, 3, 31512},
- {36, baseDN, 100, "anotherHostAgain:8017", state3, 6841L, (byte)32, 2496, 630, 9524},
- };
- }
-
- /**
- * Test that ReplServerStartDSMsg encoding and decoding works
- * by checking that : msg == new ReplServerStartMsg(msg.getBytes()).
- */
- @Test(dataProvider="createReplServerStartDSData")
- public void replServerStartDSMsgTest(int serverId, DN baseDN, int window,
- String url, ServerState state, long genId, byte groupId, int degTh,
- int weight, int connectedDSNumber) throws Exception
- {
- ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
- url, baseDN, window, state, genId,
- true, groupId, degTh, weight, connectedDSNumber);
- ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes(getCurrentVersion()));
- assertEquals(msg.getServerId(), newMsg.getServerId());
- assertEquals(msg.getServerURL(), newMsg.getServerURL());
- assertEquals(msg.getBaseDN(), newMsg.getBaseDN());
- assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
- assertEquals(msg.getServerState().getCSN(1),
- newMsg.getServerState().getCSN(1));
- assertEquals(newMsg.getVersion(), getCurrentVersion());
- assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
- assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
- assertEquals(msg.getGroupId(), newMsg.getGroupId());
- assertEquals(msg.getDegradedStatusThreshold(),
- newMsg.getDegradedStatusThreshold());
- assertEquals(msg.getWeight(), newMsg.getWeight());
- assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber());
- }
-
/**
* Test that StopMsg encoding and decoding works
* by checking that : msg == new StopMsg(msg.getBytes()).
@@ -1262,70 +1165,6 @@
assertEquals(test.getBytes(), newMsg.getPayload());
}
- /**
- * Test that ServerStartMsg encoding and decoding works
- * by checking that : msg == new ServerStartMsg(msg.getBytes()).
- */
- @Test(enabled=true,dataProvider="createServerStartData")
- public void startECLMsgTest(int serverId, DN baseDN, int window,
- ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception
- {
- ServerStartECLMsg msg = new ServerStartECLMsg(
- "localhost:1234", window, window, window, window, window, window, state,
- genId, sslEncryption, groupId);
- ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes(getCurrentVersion()));
- assertEquals(msg.getServerURL(), newMsg.getServerURL());
- assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
- assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
- assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay());
- assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue());
- assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
- assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
- assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
- assertEquals(msg.getServerState().getCSN(1),
- newMsg.getServerState().getCSN(1));
- assertEquals(newMsg.getVersion(), getCurrentVersion());
- assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
- assertEquals(msg.getGroupId(), newMsg.getGroupId());
- }
-
- /**
- * Test StartSessionMsg encoding and decoding.
- */
- @Test()
- public void startECLSessionMsgTest() throws Exception
- {
- // data
- CSN csn = new CSN(TimeThread.getTime(), 123, 45);
- ServerState state = new ServerState();
- assertTrue(state.update(new CSN(75, 5,263)));
-
- // create original
- StartECLSessionMsg msg = new StartECLSessionMsg();
- msg.setCSN(csn);
- msg.setCrossDomainServerState("fakegenstate");
- msg.setPersistent(Persistent.PERSISTENT);
- msg.setFirstChangeNumber(13);
- msg.setLastChangeNumber(14);
- msg.setECLRequestType(ECLRequestType.REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER);
- msg.setOperationId("fakeopid");
- String dn1 = "cn=admin data";
- String dn2 = "cn=config";
- msg.setExcludedDNs(newSet(dn1, dn2));
-
- // create copy
- StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes(getCurrentVersion()));
- // test equality between the two copies
- assertEquals(msg.getCSN(), newMsg.getCSN());
- assertEquals(msg.getPersistent(), newMsg.getPersistent());
- assertEquals(msg.getFirstChangeNumber(), newMsg.getFirstChangeNumber());
- assertEquals(msg.getECLRequestType(), newMsg.getECLRequestType());
- assertEquals(msg.getLastChangeNumber(), newMsg.getLastChangeNumber());
- assertTrue(msg.getCrossDomainServerState().equalsIgnoreCase(newMsg.getCrossDomainServerState()));
- assertTrue(msg.getOperationId().equalsIgnoreCase(newMsg.getOperationId()));
- Assertions.assertThat(newMsg.getExcludedBaseDNs()).containsOnly(dn1, dn2);
- }
-
private int perfRep = 100000;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
deleted file mode 100644
index 275450d..0000000
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ /dev/null
@@ -1,2812 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2014 ForgeRock AS
- */
-package org.opends.server.replication.server;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.StringReader;
-import java.lang.reflect.Method;
-import java.net.Socket;
-import java.util.*;
-
-import org.assertj.core.api.Assertions;
-import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.ConnectionHandler;
-import org.opends.server.backends.MemoryBackend;
-import org.opends.server.controls.ExternalChangelogRequestControl;
-import org.opends.server.controls.PersistentSearchChangeType;
-import org.opends.server.controls.PersistentSearchControl;
-import org.opends.server.core.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.plugins.InvocationCounterPlugin;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.internal.InternalSearchOperation;
-import org.opends.server.protocols.ldap.*;
-import org.opends.server.replication.ReplicationTestCase;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.CSNGenerator;
-import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.plugin.DomainFakeCfg;
-import org.opends.server.replication.plugin.ExternalChangelogDomainFakeCfg;
-import org.opends.server.replication.plugin.LDAPReplicationDomain;
-import org.opends.server.replication.plugin.MultimasterReplication;
-import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.je.JEChangeNumberIndexDB;
-import org.opends.server.replication.service.ReplicationBroker;
-import org.opends.server.tools.LDAPSearch;
-import org.opends.server.tools.LDAPWriter;
-import org.opends.server.types.*;
-import org.opends.server.util.LDIFWriter;
-import org.opends.server.util.TimeThread;
-import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
-import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.assertj.core.api.Assertions.*;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.controls.PersistentSearchChangeType.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.protocol.OperationContext.*;
-import static org.opends.server.types.ResultCode.*;
-import static org.opends.server.util.CollectionUtils.*;
-import static org.opends.server.util.StaticUtils.*;
-import static org.testng.Assert.*;
-
-/**
- * Tests for the replicationServer code.
- */
-@SuppressWarnings("javadoc")
-public class ExternalChangeLogTest extends ReplicationTestCase
-{
-
- private static class Results
- {
-
- public final List<SearchResultEntryProtocolOp> searchResultEntries =
- new ArrayList<SearchResultEntryProtocolOp>();
- public long searchReferences;
- public long searchesDone;
-
- }
-
- private static final int SERVER_ID_1 = 1201;
- private static final int SERVER_ID_2 = 1202;
-
- /** The tracer object for the debug logger */
- private static final DebugTracer TRACER = getTracer();
-
- /** The replicationServer that will be used in this test. */
- private ReplicationServer replicationServer;
-
- /** The port of the replicationServer. */
- private int replicationServerPort;
-
- /** base DN for "o=test" */
- private static DN TEST_ROOT_DN;
- /** base DN for "o=test2" */
- private static DN TEST_ROOT_DN2;
-
- private static final String TEST_BACKEND_ID2 = "test2";
- private static final String TEST_ROOT_DN_STRING2 = "o=" + TEST_BACKEND_ID2;
-
- /** The LDAPStatistics object associated with the LDAP connection handler. */
- private LDAPStatistics ldapStatistics;
-
- private final int brokerSessionTimeout = 5000;
- private final int maxWindow = 100;
-
- /**
- * When used in a search operation, it includes all attributes (user and
- * operational)
- */
- private static final Set<String> ALL_ATTRIBUTES = newHashSet("*", "+");
- private static final List<Control> NO_CONTROL = null;
-
- /**
- * Set up the environment for performing the tests in this Class.
- * Replication
- *
- * @throws Exception
- * If the environment could not be set up.
- */
- @BeforeClass
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING);
- TEST_ROOT_DN2 = DN.decode(TEST_ROOT_DN_STRING2);
-
- // This test suite depends on having the schema available.
- configure();
- }
-
- /**
- * Utility : configure a replicationServer.
- */
- private void configure() throws Exception
- {
- replicationServerPort = TestCaseUtils.findFreePort();
-
- ReplServerFakeConfiguration conf1 =
- new ReplServerFakeConfiguration(
- replicationServerPort, "ExternalChangeLogTestDb",
- replicationDbImplementation, 0, 71, 0, maxWindow, null);
- conf1.setComputeChangeNumber(true);
-
- replicationServer = new ReplicationServer(conf1);
- debugInfo("configure", "ReplicationServer created"+replicationServer);
- }
-
- @Test(enabled = true, dependsOnMethods = { "TestECLIsNotASupportedSuffix" })
- public void PrimaryTest() throws Exception
- {
- replicationServer.getChangelogDB().setPurgeDelay(0);
-
- // Test all types of ops.
- ECLAllOps(); // Do not clean the db for the next test
-
- // First and last should be ok whenever a request has been done or not
- // in compat mode.
- ECLCompatTestLimits(1,4,true);
- }
-
- @Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
- public void TestWithAndWithoutControl() throws Exception
- {
- final String tn = "TestWithAndWithoutControl";
- replicationServer.getChangelogDB().setPurgeDelay(0);
- // Write changes and read ECL from start
- ECLCompatWriteReadAllOps(1, tn);
-
- ECLCompatNoControl(1);
-
- // Write additional changes and read ECL from a provided change number
- ECLCompatWriteReadAllOps(5, tn);
- }
-
- @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
- public void PrimaryFullTest() throws Exception
- {
- // ***********************************************
- // First set of test are in the cookie mode
- // ***********************************************
-
- // Test that private backend is excluded from ECL
- ECLOnPrivateBackend();
- }
-
- @Test(enabled=true, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestRemoteAPIWithEmptyECL() throws Exception
- {
- // Test remote API (ECL through replication protocol) with empty ECL
- ECLRemoteEmpty();
- }
-
- @Test(enabled=true, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestWithEmptyECL() throws Exception
- {
- // Test with empty changelog
- ECLEmpty();
- }
-
- @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestPrimaryPlusOperationAttributesNotVisible() throws Exception
- {
- replicationServer.getChangelogDB().setPurgeDelay(0);
- // Test all types of ops.
- ECLAllOps(); // Do not clean the db for the next test
-
- // Test after this one will test access in RootDSE. This one checks in data.
- TestECLOperationalAttributesNotVisibleOutsideRootDSE();
-
- // First and last should be ok whenever a request has been done or not
- // in compat mode.
- ECLCompatTestLimits(1, 4, true);
- }
-
- @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestRemoteAPIWithNonEmptyECL() throws Exception
- {
- // Test remote API (ECL through replication protocol) with NON empty ECL
- ECLRemoteNonEmpty();
- }
-
- /** Persistent search with changesOnly request */
- @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestPersistentSearchWithChangesOnlyRequest() throws Exception
- {
- ECLPsearch(true, false);
- }
-
- /** Persistent search with init values request */
- @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestPersistentSearchWithInitValuesRequest() throws Exception
- {
- ECLPsearch(false, false);
- }
-
- // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned
- // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned
- // TODO:ECL Test invalid DN in cookie returns UNWILLING + message
- // TODO:ECL Test the attributes list and values returned in ECL entries
- // TODO:ECL Test search -s base, -s one
-
- @Test(enabled=true, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestFilters() throws Exception
- {
- // Test the different forms of filter that are parsed in order to
- // optimize the request.
- ECLFilterTest();
- }
-
- @Test(enabled=true, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestDraftCompatModeWithEmptyECL() throws Exception
- {
- // ***********************************************
- // Second set of test are in the draft compat mode
- // ***********************************************
- // Empty replication changelog
- ECLCompatEmpty();
- }
-
- @Test(enabled=true, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void FullTestRequestFromInvalidChangeNumber() throws Exception
- {
- // Request from an invalid change number
- ECLCompatBadSeqnum();
- }
-
- @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void ECLReplicationServerFullTest15() throws Exception
- {
- final String tn = "ECLReplicationServerFullTest15";
- replicationServer.getChangelogDB().setPurgeDelay(0);
- // Write 4 changes and read ECL from start
- ECLCompatWriteReadAllOps(1, tn);
-
- // Write 4 additional changes and read ECL from a provided change number
- CSN csn = ECLCompatWriteReadAllOps(5, tn);
-
- // Test request from a provided change number - read 6
- ECLCompatReadFrom(6, csn);
-
- // Test request from a provided change number interval - read 5-7
- ECLCompatReadFromTo(5,7);
-
- // Test first and last change number
- ECLCompatTestLimits(1,8, true);
-
- // Test first and last change number, add a new change, do not
- // search again the ECL, but search for first and last
- ECLCompatTestLimitsAndAdd(1, 8, 4);
-
- // Test CNIndexDB is purged when replication change log is purged
- final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
- cnIndexDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
- assertTrue(cnIndexDB.isEmpty());
- ECLPurgeCNIndexDBAfterChangelogClear();
-
- // Test first and last are updated
- ECLCompatTestLimits(0,0, true);
-
- // Persistent search in changesOnly mode
- ECLPsearch(true, true);
- }
-
- @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
- public void ECLReplicationServerFullTest16() throws Exception
- {
- // Persistent search in init + changes mode
- CSN csn = ECLPsearch(false, true);
-
- // Test Filter on replication csn
- // TODO: test with optimization when code done.
- ECLFilterOnReplicationCSN(csn);
- }
-
- /**
- * Verifies that is not possible to read the changelog without the changelog-read privilege
- */
- @Test(enabled = false, dependsOnMethods = { "PrimaryTest" })
- public void ECLChangelogReadPrivilegeTest() throws Exception
- {
- AuthenticationInfo nonPrivilegedUser = new AuthenticationInfo();
-
- InternalClientConnection conn = new InternalClientConnection(nonPrivilegedUser);
- InternalSearchOperation ico = conn.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
-
- assertEquals(ico.getResultCode(), ResultCode.INSUFFICIENT_ACCESS_RIGHTS);
- assertEquals(ico.getErrorMessage().toMessage(), NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
- }
-
- /** No RSDomain created yet => RS only case => ECL is not a supported. */
- @Test(enabled = true)
- public void TestECLIsNotASupportedSuffix() throws Exception
- {
- try
- {
- invoke(replicationServer, "shutdownExternalChangelog");
- ECLCompatTestLimits(0, 0, false);
- }
- finally
- {
- invoke(replicationServer, "enableExternalChangeLog");
- }
- }
-
- private void invoke(Object obj, String methodName) throws Exception
- {
- final Method m = obj.getClass().getDeclaredMethod(methodName);
- m.setAccessible(true);
- m.invoke(obj);
- }
-
- /**
- * Objectives
- * - Test that everything is ok with no changes
- * Procedure
- * - Does a SEARCH from 3 different remote ECL session,
- * - Verify DoneMsg is received on each session.
- */
- private void ECLRemoteEmpty() throws Exception
- {
- String tn = "ECLRemoteEmpty";
- debugInfo(tn, "Starting test\n\n");
-
- ReplicationBroker[] brokers = new ReplicationBroker[3];
-
- try
- {
- // Create 3 ECL broker
- final DN changelogDN = DN.decode("cn=changelog");
- brokers[0] = openReplicationSession(
- changelogDN, 1111, 100, replicationServerPort, brokerSessionTimeout);
- assertTrue(brokers[0].isConnected());
- brokers[1] = openReplicationSession(
- changelogDN, 2222, 100, replicationServerPort, brokerSessionTimeout);
- assertTrue(brokers[1].isConnected());
- brokers[2] = openReplicationSession(
- changelogDN, 3333, 100, replicationServerPort, brokerSessionTimeout);
- assertTrue(brokers[2].isConnected());
-
- assertOnlyDoneMsgReceived(tn, brokers[0]);
- assertOnlyDoneMsgReceived(tn, brokers[1]);
- assertOnlyDoneMsgReceived(tn, brokers[2]);
- debugInfo(tn, "Ending test successfully\n\n");
- }
- finally
- {
- stop(brokers);
- }
- }
-
- private void assertOnlyDoneMsgReceived(String tn, ReplicationBroker server)
- throws Exception
- {
- ReplicationMsg msg;
- int msgc = 0;
- do
- {
- msg = server.receive();
- msgc++;
- }
- while (!(msg instanceof DoneMsg));
- final String className = msg.getClass().getCanonicalName();
- assertEquals(msgc, 1, "Ending " + tn + " with incorrect message number :" + className);
- }
-
- /**
- * Objectives:
- * <ul>
- * <li>Test that everything is ok with changes on 2 suffixes</li>
- * </ul>
- * Procedure:
- * <ul>
- * <li>From 1 remote ECL session,</li>
- * <li>Test simple update to be received from 2 suffixes</li>
- * </ul>
- */
- private void ECLRemoteNonEmpty() throws Exception
- {
- String tn = "ECLRemoteNonEmpty";
- debugInfo(tn, "Starting test\n\n");
-
- ReplicationBroker server01 = null;
- ReplicationBroker server02 = null;
- ReplicationBroker serverECL = null;
-
- try
- {
- // create 2 regular brokers on the 2 suffixes
- server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
-
- server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
- 100, replicationServerPort, brokerSessionTimeout, EMPTY_DN_GENID);
-
- // create and publish 1 change on each suffix
- long time = TimeThread.getTime();
- int ts = 1;
- CSN csn1 = new CSN(time, ts++, SERVER_ID_1);
- DeleteMsg delMsg1 = newDeleteMsg("o=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, "ECLBasicMsg1uid");
- server01.publish(delMsg1);
- debugInfo(tn, "publishes:" + delMsg1);
-
- CSN csn2 = new CSN(time, ts++, SERVER_ID_2);
- DeleteMsg delMsg2 = newDeleteMsg("o=" + tn + "2," + TEST_ROOT_DN_STRING2, csn2, "ECLBasicMsg2uid");
- server02.publish(delMsg2);
- debugInfo(tn, "publishes:" + delMsg2);
-
- // wait for the server to take these changes into account
- Thread.sleep(500);
-
- // open ECL broker
- serverECL = openReplicationSession(
- DN.decode("cn=changelog"), 10, 100, replicationServerPort, brokerSessionTimeout);
- assertTrue(serverECL.isConnected());
-
- // receive change 1 from suffix 1
- ReplicationMsg msg;
- msg = serverECL.receive();
- ECLUpdateMsg eclu = (ECLUpdateMsg)msg;
- UpdateMsg u = eclu.getUpdateMsg();
- debugInfo(tn, "RESULT:" + u.getCSN() + " " + eclu.getCookie());
- assertTrue(u.getCSN().equals(csn1), "RESULT:" + u.getCSN());
- assertTrue(eclu.getCookie().equalsTo(new MultiDomainServerState(
- "o=test:"+delMsg1.getCSN()+";o=test2:;")));
-
- // receive change 2 from suffix 2
- msg = serverECL.receive();
- eclu = (ECLUpdateMsg)msg;
- u = eclu.getUpdateMsg();
- debugInfo(tn, "RESULT:" + u.getCSN());
- assertTrue(u.getCSN().equals(csn2), "RESULT:" + u.getCSN());
- assertTrue(eclu.getCookie().equalsTo(new MultiDomainServerState(
- "o=test2:"+delMsg2.getCSN()+";"+
- "o=test:"+delMsg1.getCSN()+";")));
-
- // receive Done
- msg = serverECL.receive();
- debugInfo(tn, "RESULT:" + msg);
- assertTrue(msg instanceof DoneMsg, "RESULT:" + msg);
-
- debugInfo(tn, "Ending test successfully");
- }
- finally
- {
- stop(serverECL, server01, server02);
- }
- }
-
- /**
- * From embedded ECL (no remote session)
- * With empty RS, simple search should return only root entry.
- */
- private void ECLEmpty() throws Exception
- {
- String tn = "ECLEmpty";
- debugInfo(tn, "Starting test\n\n");
-
- // root entry returned
- searchOnChangelog("(objectclass=*)", Collections.<String> emptySet(), createCookieControl(""),
- 1, ResultCode.SUCCESS, tn);
-
- debugInfo(tn, "Ending test successfully");
- }
-
- /**
- * Build a list of controls including the cookie provided.
- * @param cookie The provided cookie.
- * @return The built list of controls.
- */
- private List<Control> createCookieControl(String cookie) throws DirectoryException
- {
- final MultiDomainServerState state = new MultiDomainServerState(cookie);
- final Control cookieControl = new ExternalChangelogRequestControl(true, state);
- return newList(cookieControl);
- }
-
- /**
- * Utility - creates an LDIFWriter to dump result entries.
- */
- private static LDIFWriter getLDIFWriter() throws Exception
- {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- LDIFExportConfig exportConfig = new LDIFExportConfig(stream);
- return new LDIFWriter(exportConfig);
- }
-
- /** Add an entry in the database */
- private void addEntry(Entry entry) throws Exception
- {
- waitOpResult(connection.processAdd(entry), ResultCode.SUCCESS);
- assertNotNull(getEntry(entry.getDN(), 1000, true));
- }
-
- private void ECLOnPrivateBackend() throws Exception
- {
- String tn = "ECLOnPrivateBackend";
- debugInfo(tn, "Starting test");
-
- ReplicationBroker server01 = null;
- LDAPReplicationDomain domain = null;
- LDAPReplicationDomain domain2 = null;
- Backend<?> backend2 = null;
-
- // Use different values than other tests to avoid test interactions in concurrent test runs
- final String backendId2 = tn + 2;
- final DN baseDN2 = DN.decode("o=" + backendId2);
- try
- {
- server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
- DomainFakeCfg domainConf = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
- domain = startNewDomain(domainConf, null, null);
-
- // create and publish 1 change on each suffix
- long time = TimeThread.getTime();
-
- CSN csn1 = new CSN(time, 1, SERVER_ID_1);
- DeleteMsg delMsg1 = newDeleteMsg("o=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, "ECLBasicMsg1uid");
- server01.publish(delMsg1);
- debugInfo(tn, "publishes:" + delMsg1);
-
- // Configure replication on this backend
- // Add the root entry in the backend
- backend2 = initializeTestBackend(false, backendId2);
- backend2.setPrivateBackend(true);
- SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
-
- DomainFakeCfg domainConf2 = new DomainFakeCfg(baseDN2, 1602, replServers);
- domain2 = startNewDomain(domainConf2, null, null);
-
- Thread.sleep(1000);
- addEntry(createEntry(baseDN2));
-
- // Search on ECL from start on all suffixes
- // Expect root entry returned
- String cookie = "";
- searchOnCookieChangelog("(targetDN=*)", cookie, 2, tn, SUCCESS);
-
- ExternalChangelogDomainCfg eclCfg = new ExternalChangelogDomainFakeCfg(false, null, null);
- domainConf2.setExternalChangelogDomain(eclCfg);
- domain2.applyConfigurationChange(domainConf2);
-
- // Expect only entry from o=test returned
- searchOnCookieChangelog("(targetDN=*)", cookie, 1, tn, SUCCESS);
-
- // Test lastExternalChangelogCookie attribute of the ECL
- // (does only refer to non private backend)
- MultiDomainServerState expectedLastCookie =
- new MultiDomainServerState("o=test:" + csn1 + ";");
-
- String lastCookie = readLastCookie();
- assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(lastCookie)),
- " Expected last cookie attribute value:" + expectedLastCookie +
- " Read from server: " + lastCookie + " are equal :");
- }
- finally
- {
- remove(domain, domain2);
- removeTestBackend(backend2);
- stop(server01);
- }
- debugInfo(tn, "Ending test successfully");
- }
-
- /**
- * From embedded ECL Search ECL with 4 messages on 2 suffixes from 2 brokers.
- * Test with a mix of domains, a mix of DSes.
- */
- @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
- public void TestECLWithTwoDomains() throws Exception
- {
- replicationServer.getChangelogDB().setPurgeDelay(0);
-
-
- String tn = "TestECLWithTwoDomains";
- debugInfo(tn, "Starting test");
-
- ReplicationBroker s1test = null;
- ReplicationBroker s1test2 = null;
- ReplicationBroker s2test = null;
- ReplicationBroker s2test2 = null;
-
- Backend<?> backend2 = null;
- LDAPReplicationDomain domain1 = null;
- LDAPReplicationDomain domain2 = null;
- try
- {
- backend2 = initializeTestBackend(true, TEST_BACKEND_ID2);
-
- s1test = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
- s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
- 100, replicationServerPort, brokerSessionTimeout, EMPTY_DN_GENID);
- DomainFakeCfg domainConf1 = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
- domain1 = startNewDomain(domainConf1, null, null);
- DomainFakeCfg domainConf2 = newFakeCfg(TEST_ROOT_DN2, SERVER_ID_2, replicationServerPort);
- domain2 = startNewDomain(domainConf2, null, null);
- Thread.sleep(500);
-
- // Produce updates
- long time = TimeThread.getTime();
- int ts = 1;
- CSN csn1 = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn1, tn, 1);
-
- CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
- publishDeleteMsgInOTest2(s2test2, csn2, tn, 2);
-
- CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
- publishDeleteMsgInOTest2(s2test2, csn3, tn, 3);
-
- CSN csn4 = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn4, tn, 4);
-
- // Changes are :
- // s1 s2
- // o=test msg1/msg4
- // o=test2 msg2/msg2
-
- // search on 'cn=changelog'
- LDIFWriter ldifWriter = getLDIFWriter();
- String cookie = "";
- InternalSearchOperation searchOp =
- searchOnCookieChangelog("(targetDN=*" + tn + "*)", cookie, 4, tn, SUCCESS);
- cookie = readCookie(searchOp.getSearchEntries(), 2);
-
- // Now start from last cookie and expect to get ONLY the 4th change
- searchOp = searchOnCookieChangelog("(targetDN=*" + tn + "*)", cookie, 1, tn, SUCCESS);
- cookie = assertContainsAndReadCookie(tn, searchOp.getSearchEntries(), ldifWriter, csn4);
-
- // Now publishes a new change and search from the previous cookie
- CSN csn5 = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn5, tn, 5);
-
- // Changes are :
- // s1 s2
- // o=test msg1,msg5 msg4
- // o=test2 msg3 msg2
-
- searchOp = searchOnCookieChangelog("(targetDN=*" + tn + "*)", cookie, 1,tn, SUCCESS);
- cookie = assertContainsAndReadCookie(tn, searchOp.getSearchEntries(), ldifWriter, csn5);
-
- cookie = "";
- searchOp = searchOnCookieChangelog("(targetDN=*" + tn + "*,o=test)", cookie, 3, tn, SUCCESS);
- // we expect msg1 + msg4 + msg5
- cookie = assertContainsAndReadCookie(tn, searchOp.getSearchEntries(), ldifWriter, csn1, csn4, csn5);
-
- // Test startState ("first cookie") of the ECL
- // --
- s1test2 = openReplicationSession(TEST_ROOT_DN2, 1203,
- 100, replicationServerPort, brokerSessionTimeout, EMPTY_DN_GENID);
- s2test = openReplicationSession(TEST_ROOT_DN, 1204,
- 100, replicationServerPort, brokerSessionTimeout);
- Thread.sleep(500);
-
- time = TimeThread.getTime();
- CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
- publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
-
- CSN csn7 = new CSN(time, ts++, s2test.getServerId());
- publishDeleteMsgInOTest(s2test, csn7, tn, 7);
-
- CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
- publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
-
- CSN csn9 = new CSN(time, ts++, s2test.getServerId());
- publishDeleteMsgInOTest(s2test, csn9, tn, 9);
- Thread.sleep(500);
-
- final ServerState oldestState = getDomainOldestState(TEST_ROOT_DN);
- assertEquals(oldestState.getCSN(s1test.getServerId()), csn1);
- assertEquals(oldestState.getCSN(s2test.getServerId()), csn7);
-
- final ServerState oldestState2 = getDomainOldestState(TEST_ROOT_DN2);
- assertEquals(oldestState2.getCSN(s2test2.getServerId()), csn2);
- assertEquals(oldestState2.getCSN(s1test2.getServerId()), csn6);
-
- // Test lastExternalChangelogCookie attribute of the ECL
- MultiDomainServerState expectedLastCookie =
- new MultiDomainServerState("o=test:" + csn5 + " " + csn9
- + ";o=test2:" + csn3 + " " + csn8 + ";");
-
- String lastCookie = readLastCookie();
-
- assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(lastCookie)),
- " Expected last cookie attribute value:" + expectedLastCookie +
- " Read from server: " + lastCookie + " are equal :");
-
- // Test invalid cookie
- cookie += ";o=test6:";
- debugInfo(tn, "Search with bad domain in cookie=" + cookie);
- searchOp = searchOnCookieChangelog("(targetDN=*" + tn + "*,o=test)", cookie, 0, tn,
- PROTOCOL_ERROR);
- final String cookieStr = new MultiDomainServerState(cookie).toString();
- Assertions.assertThat(searchOp.getErrorMessage().toString()).startsWith(
- ERR_INVALID_COOKIE_SYNTAX.get(cookieStr).toString());
-
- // Test unknown domain in provided cookie
- // This case seems to be very hard to obtain in the real life
- // (how to remove a domain from a RS topology ?)
- // let's do a very quick test here.
- String newCookie = lastCookie + "o=test6:";
- debugInfo(tn, "Search with bad domain in cookie=" + cookie);
- searchOp = searchOnCookieChangelog("(targetDN=*" + tn + "*,o=test)", newCookie, 0,
- tn, UNWILLING_TO_PERFORM);
-
- // Test missing domain in provided cookie
- newCookie = lastCookie.substring(lastCookie.indexOf(';')+1);
- debugInfo(tn, "Search with bad domain in cookie=" + cookie);
- searchOp = searchOnCookieChangelog("(targetDN=*" + tn + "*,o=test)", newCookie, 0,
- tn, UNWILLING_TO_PERFORM);
- String expectedError = ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE
- .get("o=test:;","<"+ newCookie + "o=test:;>").toString();
- assertThat(searchOp.getErrorMessage().toString()).isEqualToIgnoringCase(expectedError);
- }
- finally
- {
- remove(domain1, domain2);
- removeTestBackend(backend2);
- stop(s1test2, s2test, s1test, s2test2);
- }
- debugInfo(tn, "Ending test successfully");
- }
-
- private String readCookie(List<SearchResultEntry> entries, int i)
- {
- SearchResultEntry entry = entries.get(i);
- return entry.getAttribute("changelogcookie").get(0).iterator().next().toString();
- }
-
- private ServerState getDomainOldestState(DN baseDN)
- {
- return replicationServer.getReplicationServerDomain(baseDN).getOldestState();
- }
-
- private String assertContainsAndReadCookie(String tn, List<SearchResultEntry> entries,
- LDIFWriter ldifWriter, CSN... csns) throws Exception
- {
- assertThat(getCSNs(entries)).containsExactly(csns);
- debugAndWriteEntries(ldifWriter, entries, tn);
- return readCookie(entries, csns.length - 1);
- }
-
- private List<CSN> getCSNs(List<SearchResultEntry> entries)
- {
- List<CSN> results = new ArrayList<CSN>(entries.size());
- for (SearchResultEntry entry : entries)
- {
- results.add(new CSN(getAttributeValue(entry, "replicationCSN")));
- }
- return results;
- }
-
- private void publishDeleteMsgInOTest(ReplicationBroker broker, CSN csn,
- String tn, int i) throws DirectoryException
- {
- publishDeleteMsg(broker, csn, tn, i, TEST_ROOT_DN_STRING);
- }
-
- private void publishDeleteMsgInOTest2(ReplicationBroker broker, CSN csn,
- String tn, int i) throws DirectoryException
- {
- publishDeleteMsg(broker, csn, tn, i, TEST_ROOT_DN_STRING2);
- }
-
- private void publishDeleteMsg(ReplicationBroker broker, CSN csn, String tn,
- int i, String baseDn) throws DirectoryException
- {
- String dn = "uid=" + tn + i + "," + baseDn;
- DeleteMsg delMsg = newDeleteMsg(dn, csn, tn + "uuid" + i);
- broker.publish(delMsg);
- debugInfo(tn, " publishes " + delMsg.getCSN());
- }
-
- private DeleteMsg newDeleteMsg(String dn, CSN csn, String entryUUID) throws DirectoryException
- {
- return new DeleteMsg(DN.decode(dn), csn, entryUUID);
- }
-
- private InternalSearchOperation searchOnCookieChangelog(String filterString,
- String cookie, int expectedNbEntries, String testName, ResultCode expectedResultCode)
- throws Exception
- {
- debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
- return searchOnChangelog(filterString, ALL_ATTRIBUTES, createCookieControl(cookie),
- expectedNbEntries, expectedResultCode, testName);
- }
-
- private InternalSearchOperation searchOnChangelog(String filterString,
- int expectedNbEntries, String testName, ResultCode expectedResultCode)
- throws Exception
- {
- debugInfo(testName, " Search: " + filterString);
- return searchOnChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL,
- expectedNbEntries, expectedResultCode, testName);
- }
-
- private InternalSearchOperation searchOnChangelog(String filterString,
- Set<String> attributes, List<Control> controls, int expectedNbEntries,
- ResultCode expectedResultCode, String testName) throws Exception
- {
- InternalSearchOperation op = null;
- int cnt = 0;
- do
- {
- Thread.sleep(10);
- op = connection.processSearch(
- "cn=changelog",
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, // Size limit
- 0, // Time limit
- false, // Types only
- filterString,
- attributes,
- controls,
- null);
- cnt++;
- }
- while (cnt < 300 // wait at most 3s
- && op.getSearchEntries().size() != expectedNbEntries);
- final List<SearchResultEntry> entries = op.getSearchEntries();
- assertThat(entries).hasSize(expectedNbEntries);
- debugAndWriteEntries(getLDIFWriter(), entries, testName);
- waitOpResult(op, expectedResultCode);
- return op;
- }
-
- /** Test ECL content after replication changelogDB trimming */
- @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
- public void testECLAfterChangelogTrim() throws Exception
- {
- String testName = "testECLAfterChangelogTrim";
- debugInfo(testName, "Starting test");
-
- ReplicationBroker server01 = null;
- try
- {
- // ---
- // 1. Populate the changelog and read the cookie
-
- // Creates broker on o=test
- server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
-
- final CSN[] csns = generateCSNs(3, SERVER_ID_1);
- publishDeleteMsgInOTest(server01, csns[0], testName, 1);
- final String firstCookie = assertLastCookieDifferentThanLastValue("");
- String lastCookie = firstCookie;
- publishDeleteMsgInOTest(server01, csns[1], testName, 2);
- lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
- publishDeleteMsgInOTest(server01, csns[2], testName, 3);
- lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
-
- // ---
- // 2. Now set up a very short purge delay on the replication changelogs
- // so that this test can play with a trimmed changelog.
- replicationServer.getChangelogDB().setPurgeDelay(1);
-
- // ---
- // 3. Assert that a request with an empty cookie returns nothing
- // since replication changelog has been trimmed
- String cookie= "";
- InternalSearchOperation searchOp =
- searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
-
- // ---
- // 4. Assert that a request with the current last cookie returns nothing
- // since replication changelog has been trimmed
- cookie = readLastCookie();
- debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
-
- // ---
- // 5. Assert that a request with an "old" cookie - one that refers to
- // changes that have been removed by the replication changelog trimming
- // returns the appropriate error.
- debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
- debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
- searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
- assertTrue(searchOp.getErrorMessage().toString().startsWith(
- ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
- searchOp.getErrorMessage().toString());
- }
- finally
- {
- stop(server01);
- // And reset changelog purge delay for the other tests.
- replicationServer.getChangelogDB().setPurgeDelay(15 * 1000);
- }
- debugInfo(testName, "Ending test successfully");
- }
-
- /** Test ECL content after a domain has been removed. */
- @Test(enabled = false, dependsOnMethods = { "PrimaryTest" })
- public void testECLAfterDomainIsRemoved() throws Exception
- {
- String testName = "testECLAfterDomainIsRemoved";
- debugInfo(testName, "Starting test");
-
- ReplicationBroker server01 = null;
- try
- {
- // ---
- // 1. Populate the changelog and read the cookie
-
- // Creates server broker on o=test
- server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, 100, replicationServerPort, brokerSessionTimeout);
-
- final CSN[] csns = generateCSNs(3, SERVER_ID_1);
- publishDeleteMsgInOTest(server01, csns[0], testName, 1);
- final String firstCookie = assertLastCookieDifferentThanLastValue("");
- String lastCookie = firstCookie;
- publishDeleteMsgInOTest(server01, csns[1], testName, 2);
- lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
- publishDeleteMsgInOTest(server01, csns[2], testName, 3);
- lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
-
- // ---
- // 2. Now remove the domain by sending a reset message
- server01.publish(new ResetGenerationIdMsg(23657));
-
- // ---
- // 3. Assert that a request with an empty cookie returns nothing
- // since replication changelog has been cleared
- String cookie= "";
- searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
-
- // ---
- // 4. Assert that a request with the current last cookie returns nothing
- // since replication changelog has been cleared
- cookie = readLastCookie();
- debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
- searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
-
- // ---
- // 5. Assert that a request with an "old" cookie - one that refers to
- // changes that have been removed by the replication changelog clearing
- // returns the appropriate error.
- debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
- debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
- final InternalSearchOperation searchOp =
- searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
- assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
- }
- finally
- {
- stop(server01);
- }
- debugInfo(testName, "Ending test successfully");
- }
-
- private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
- {
- int cnt = 0;
- while (cnt < 100)
- {
- final String newCookie = readLastCookie();
- if (!newCookie.equals(lastCookie))
- {
- return newCookie;
- }
- cnt++;
- Thread.sleep(10);
- }
- Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
- return null;// dead code
- }
-
- private void debugAndWriteEntries(LDIFWriter ldifWriter,
- List<SearchResultEntry> entries, String tn) throws Exception
- {
- if (entries != null)
- {
- for (SearchResultEntry entry : entries)
- {
- // Can use entry.toSingleLineString()
- debugInfo(tn, " RESULT entry returned:" + entry.toLDIFString());
- if (ldifWriter != null)
- {
- ldifWriter.writeEntry(entry);
- }
- }
- }
- }
-
- private String readLastCookie() throws Exception
- {
- String cookie = "";
- LDIFWriter ldifWriter = getLDIFWriter();
-
- final Set<String> attrs = newHashSet("lastExternalChangelogCookie");
- List<SearchResultEntry> entries = searchOnRootDSE(attrs).getSearchEntries();
- if (entries != null)
- {
- for (SearchResultEntry resultEntry : entries)
- {
- ldifWriter.writeEntry(resultEntry);
- cookie = getAttributeValue(resultEntry, "lastexternalchangelogcookie");
- }
- }
- return cookie;
- }
-
- /** simple update to be received*/
- private void ECLAllOps() throws Exception
- {
- String tn = "ECLAllOps";
- debugInfo(tn, "Starting test\n\n");
- ReplicationBroker server01 = null;
- ReplicationBroker server02 = null;
- LDAPReplicationDomain domain = null;
- try
- {
- // Creates brokers on o=test and o=test2
- server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
- server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
- 100, replicationServerPort, brokerSessionTimeout);
-
- DomainFakeCfg domainConf = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
- domain = startNewDomain(domainConf, null, null);
-
- String user1entryUUID = "11111111-1111-1111-1111-111111111111";
- String baseUUID = "22222222-2222-2222-2222-222222222222";
-
- final int expectedNbEntries = 4;
- CSN[] csns = generateCSNs(expectedNbEntries, SERVER_ID_1);
-
- // Publish DEL
- int csnCounter = 0;
- publishDeleteMsgInOTest(server01, csns[csnCounter], tn, csnCounter + 1);
-
- // Publish ADD
- csnCounter++;
- Entry entry = TestCaseUtils.entryFromLdifString(
- "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
- + "objectClass: top\n"
- + "objectClass: domain\n"
- + "entryUUID: " + user1entryUUID + "\n");
- AddMsg addMsg = new AddMsg(
- csns[csnCounter],
- DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
- user1entryUUID,
- baseUUID,
- entry.getObjectClassAttribute(),
- entry.getAttributes(),
- Collections.<Attribute> emptyList());
- server01.publish(addMsg);
- debugInfo(tn, " publishes " + addMsg.getCSN());
-
- // Publish MOD
- csnCounter++;
- DN baseDN = DN.decode("uid=" + tn + "3," + TEST_ROOT_DN_STRING);
- List<Modification> mods = createMods("description", "new value");
- ModifyMsg modMsg = new ModifyMsg(csns[csnCounter], baseDN, mods, tn + "uuid3");
- server01.publish(modMsg);
- debugInfo(tn, " publishes " + modMsg.getCSN());
-
- // Publish modDN
- csnCounter++;
- final DN newSuperior = TEST_ROOT_DN2;
- ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
- DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
- RDN.decode("uid="+tn+"new4"), // new rdn
- true, // deleteoldrdn
- newSuperior);
- op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csns[csnCounter],
- tn + "uuid4", "newparentId"));
- LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
- ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
- server01.publish(modDNMsg);
- debugInfo(tn, " publishes " + modDNMsg.getCSN());
-
- String cookie= "";
- InternalSearchOperation searchOp =
- searchOnCookieChangelog("(targetdn=*" + tn + "*,o=test)", cookie, expectedNbEntries, tn, SUCCESS);
-
- // test 4 entries returned
- final String[] cookies = new String[expectedNbEntries];
- for (int j = 0; j < cookies.length; j++)
- {
- cookies[j] = "o=test:" + csns[j] + ";";
- }
-
- int i=0;
- for (SearchResultEntry resultEntry : searchOp.getSearchEntries())
- {
- i++;
- checkDn(csns[i - 1], resultEntry);
- checkValue(resultEntry, "targetdn", "uid=" + tn + i + "," + TEST_ROOT_DN_STRING);
- checkValue(resultEntry, "replicationcsn", csns[i - 1].toString());
- checkValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
- checkValue(resultEntry, "changelogcookie", cookies[i - 1]);
- assertNull(getAttributeValue(resultEntry, "changenumber"));
-
- if (i==1)
- {
- checkValue(resultEntry, "changetype", "delete");
- checkValue(resultEntry,"targetentryuuid",tn+"uuid1");
- } else if (i==2)
- {
- checkValue(resultEntry, "changetype", "add");
- checkLDIF(resultEntry, "changes",
- "objectClass: domain",
- "objectClass: top",
- "entryUUID: 11111111-1111-1111-1111-111111111111");
- checkValue(resultEntry,"targetentryuuid",user1entryUUID);
- } else if (i==3)
- {
- // check the MOD entry has the right content
- checkValue(resultEntry, "changetype", "modify");
- checkLDIF(resultEntry, "changes",
- "replace: description",
- "description: new value",
- "-");
- checkValue(resultEntry,"targetentryuuid",tn+"uuid3");
- } else if (i==4)
- {
- checkValue(resultEntry,"changetype","modrdn");
- checkValue(resultEntry,"targetentryuuid",tn+"uuid4");
- checkValue(resultEntry,"newrdn","uid=ECLAllOpsnew4");
- if (newSuperior != null)
- {
- checkValue(resultEntry, "newsuperior", TEST_ROOT_DN_STRING2);
- }
- checkValue(resultEntry,"deleteoldrdn","true");
- }
- }
-
- // Test the response control with ldapsearch tool
- String result = ldapsearch("cn=changelog");
- debugInfo(tn, "Entries:" + result);
- assertThat(getControls(result)).containsExactly(cookies);
- }
- finally
- {
- remove(domain);
- stop(server01, server02);
- }
- debugInfo(tn, "Ending test with success");
- }
-
- private CSN[] generateCSNs(int nb, int serverId)
- {
- long startTime = TimeThread.getTime();
-
- CSN[] csns = new CSN[nb];
- for (int i = 0; i < nb; i++)
- {
- // seqNum must be greater than 0, so start at 1
- csns[i] = new CSN(startTime + i, i + 1, serverId);
- }
- return csns;
- }
-
- private void checkDn(CSN csn, SearchResultEntry resultEntry)
- {
- String actualDN = resultEntry.getDN().toNormalizedString();
- String expectedDN =
- "replicationcsn=" + csn + "," + TEST_ROOT_DN_STRING + ",cn=changelog";
- assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
- }
-
- private List<String> getControls(String resultString) throws Exception
- {
- final BufferedReader br = new BufferedReader(new StringReader(resultString));
- final List<String> ctrlList = new ArrayList<String>();
- while (true)
- {
- final String s = br.readLine();
- if (s == null)
- {
- break;
- }
- if (!s.startsWith("#"))
- {
- continue;
- }
- final String[] a = s.split(": ");
- if (a.length != 2)
- {
- break;
- }
- ctrlList.add(a[1]);
- }
- return ctrlList;
- }
-
- private static final ByteArrayOutputStream oStream = new ByteArrayOutputStream();
- private static final ByteArrayOutputStream eStream = new ByteArrayOutputStream();
-
- private String ldapsearch(String baseDN)
- {
- // test search as directory manager returns content
- String[] args3 =
- {
- "-h", "127.0.0.1",
- "-p", String.valueOf(TestCaseUtils.getServerAdminPort()),
- "-Z", "-X",
- "-D", "cn=Directory Manager",
- "-w", "password",
- "-b", baseDN,
- "-s", "sub",
- "--control", "1.3.6.1.4.1.26027.1.5.4:false:;",
- "(objectclass=*)"
- };
-
- oStream.reset();
- eStream.reset();
- int retVal = LDAPSearch.mainSearch(args3, false, oStream, eStream);
- assertEquals(0, retVal, "Returned error: " + eStream);
- return oStream.toString();
- }
-
- private static void checkValue(Entry entry, String attrName, String expectedValue)
- {
- assertFalse(expectedValue.contains("\n"),
- "Use checkLDIF() method for asserting on value: \"" + expectedValue + "\"");
- final String actualValue = getAttributeValue(entry, attrName);
- assertThat(actualValue)
- .as("In entry " + entry + " incorrect value for attr '" + attrName + "'")
- .isEqualToIgnoringCase(expectedValue);
- }
-
- /**
- * Asserts the attribute value as LDIF to ignore lines ordering.
- */
- private static void checkLDIF(Entry entry, String attrName, String... expectedLDIFLines)
- {
- final String actualVal = getAttributeValue(entry, attrName);
- final Set<Set<String>> actual = toLDIFEntries(actualVal.split("\n"));
- final Set<Set<String>> expected = toLDIFEntries(expectedLDIFLines);
- assertThat(actual)
- .as("In entry " + entry + " incorrect value for attr '" + attrName + "'")
- .isEqualTo(expected);
- }
-
- /**
- * Returns a data structure allowing to compare arbitrary LDIF lines. The
- * algorithm splits LDIF entries on lines containing only a dash ("-"). It
- * then returns LDIF entries and lines in an LDIF entry in ordering
- * insensitive data structures.
- * <p>
- * Note: a last line with only a dash ("-") is significant. i.e.:
- *
- * <pre>
- * <code>
- * boolean b = toLDIFEntries("-").equals(toLDIFEntries()));
- * System.out.println(b); // prints "false"
- * </code>
- * </pre>
- */
- private static Set<Set<String>> toLDIFEntries(String... ldifLines)
- {
- final Set<Set<String>> results = new HashSet<Set<String>>();
- Set<String> ldifEntryLines = new HashSet<String>();
- for (String ldifLine : ldifLines)
- {
- if (!"-".equals(ldifLine))
- {
- // same entry keep adding
- ldifEntryLines.add(ldifLine);
- }
- else
- {
- // this is a new entry
- results.add(ldifEntryLines);
- ldifEntryLines = new HashSet<String>();
- }
- }
- results.add(ldifEntryLines);
- return results;
- }
-
- private static String getAttributeValue(Entry entry, String attrName)
- {
- List<Attribute> attrs = entry.getAttribute(attrName.toLowerCase());
- if (attrs == null)
- {
- return null;
- }
- Attribute a = attrs.iterator().next();
- AttributeValue av = a.iterator().next();
- return av.toString();
- }
-
- private static void checkValues(Entry entry, String attrName, Set<String> expectedValues)
- {
- final Set<String> values = new HashSet<String>();
- for (Attribute a : entry.getAttribute(attrName))
- {
- for (AttributeValue av : a)
- {
- values.add(av.toString());
- }
- }
- assertThat(values)
- .as("In entry " + entry + " incorrect values for attr '" + attrName + "'")
- .isEqualTo(expectedValues);
- }
-
- /**
- * Test persistent search
- */
- private CSN ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
- {
- String tn = "ECLPsearch_" + changesOnly + "_" + compatMode;
- debugInfo(tn, "Starting test \n\n");
- Socket s;
-
- // create stats
- for (ConnectionHandler<?> ch : DirectoryServer.getConnectionHandlers())
- {
- if (ch instanceof LDAPConnectionHandler)
- {
- LDAPConnectionHandler lch = (LDAPConnectionHandler) ch;
- if (!lch.useSSL())
- {
- ldapStatistics = lch.getStatTracker();
- }
- }
- }
- assertNotNull(ldapStatistics);
-
- try
- {
- // Create broker on suffix
- ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
-
- CSN[] csns = generateCSNs(2, SERVER_ID_1);
-
- // Produce update on this suffix
- DeleteMsg delMsg =
- newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0],
- "11111111-1112-1113-1114-111111111114");
- debugInfo(tn, " publishing " + delMsg.getCSN());
- server01.publish(delMsg);
- Thread.sleep(500); // let's be sure the message is in the RS
-
- // Creates cookie control
- String cookie = "";
- List<Control> controls = createCookieControl(cookie);
- if (compatMode)
- {
- cookie = null;
- controls = new ArrayList<Control>(0);
- }
-
- // Creates psearch control
- Set<PersistentSearchChangeType> changeTypes =
- EnumSet.of(ADD, DELETE, MODIFY, MODIFY_DN);
- controls.add(new PersistentSearchControl(changeTypes, changesOnly, true));
-
- SearchRequestProtocolOp searchRequest =
- createSearchRequest("(targetDN=*" + tn + "*,o=test)", null);
-
- // Connects and bind
- debugInfo(tn, "Search with cookie=" + cookie + "\"");
- s = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
- org.opends.server.tools.LDAPReader r = new org.opends.server.tools.LDAPReader(s);
- LDAPWriter w = new LDAPWriter(s);
- s.setSoTimeout(5000);
- bindAsManager(w, r);
-
- // Since we are going to be watching the post-response count, we need to
- // wait for the server to become idle before kicking off the next request
- // to ensure that any remaining post-response processing from the previous
- // operation has completed.
- assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000));
-
- InvocationCounterPlugin.resetAllCounters();
-
- final Results results = new Results();
- results.searchReferences = ldapStatistics.getSearchResultReferences();
- results.searchesDone = ldapStatistics.getSearchResultsDone();
-
- debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
- w.writeMessage(new LDAPMessage(2, searchRequest, controls));
- Thread.sleep(500);
-
- if (!changesOnly)
- {
- // Wait for change 1
- debugInfo(tn, "Waiting for init search expected to return change 1");
- readMessages(tn, r, results, 1, "Init search Result=");
- for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
- {
- // FIXME:ECL Double check 1 is really the valid value here.
- final String cn = compatMode ? "1" : "0";
- checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
- }
- debugInfo(tn, "INIT search done with success. searchEntries="
- + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
- }
-
- // Produces change 2
- final CSN csn = csns[1];
- String expectedDn = "uid=" + tn + "2," + TEST_ROOT_DN_STRING;
- delMsg = newDeleteMsg(expectedDn, csn,
- "11111111-1112-1113-1114-111111111115");
- debugInfo(tn, " publishing " + delMsg.getCSN());
- server01.publish(delMsg);
- Thread.sleep(1000);
-
- debugInfo(tn, delMsg.getCSN() +
- " published , psearch will now wait for new entries");
-
- // wait for the 1 new entry
- readMessages(tn, r, results, 1, "psearch search Result=");
- SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
- Thread.sleep(1000);
-
- // Check we received change 2
- checkAttributeValue(searchResultEntry, "targetDN", expectedDn);
- debugInfo(tn, "Second search done successfully : " + searchResultEntry);
-
- stop(server01);
- waitForClose(s);
-
- // TODO: Testing ACI is disabled because it is currently failing when
- // ran in the precommit target while it works well when running alone.
- // anonymous search returns entries from cn=changelog whereas it
- // should not. Probably a previous test in the nightlytests suite is
- // removing/modifying some ACIs...
- // When problem found, we have to re-enable this test.
- if (false)
- {
- // ACI step
- debugInfo(tn, "Starting ACI step");
- s = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
- r = new org.opends.server.tools.LDAPReader(s);
- w = new LDAPWriter(s);
- s.setSoTimeout(5000);
- bindAsWhoEver(w, r, "toto", "tutu", LDAPResultCode.OPERATIONS_ERROR);
-
- searchRequest =
- createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
-
- debugInfo(tn, "ACI test : sending search");
- w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
-
- LDAPMessage message;
- int searchesDone = 0;
- int searchEntries = 0;
- int searchReferences = 0;
- while ((searchesDone==0) && (message = r.readMessage()) != null)
- {
- debugInfo(tn, "ACI test : message returned " +
- message.getProtocolOpType() + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- //assertTrue(false, "Unexpected entry returned in ACI test of " + tn + searchResultEntry);
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- searchReferences++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
- // search should end with success
- assertEquals(searchesDone, 1);
- // but returning no entry
- assertEquals(searchEntries,0, "Bad search entry# in ACI test of " + tn);
- }
-
- close(s);
- while (!s.isClosed())
- Thread.sleep(100);
-
- return csn;
- }
- finally
- {
- debugInfo(tn, "Ends test successfully");
- }
- }
-
- private void checkAttributeValue(SearchResultEntryProtocolOp entry,
- String attrType, String expectedDN)
- {
- for (LDAPAttribute a : entry.getAttributes())
- {
- if (attrType.equalsIgnoreCase(a.getAttributeType()))
- {
- for (ByteString av : a.getValues())
- {
- assertThat(av.toString())
- .as("Wrong entry returned by psearch")
- .isEqualToIgnoringCase(expectedDN);
- return;
- }
- }
- }
- fail();
- }
-
- private SearchRequestProtocolOp createSearchRequest(String filterString,
- final Set<String> attributes) throws LDAPException
- {
- return new SearchRequestProtocolOp(
- ByteString.valueOf("cn=changelog"),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- Integer.MAX_VALUE,
- Integer.MAX_VALUE,
- false,
- LDAPFilter.decode(filterString),
- attributes);
- }
-
- /**
- * Test parallel simultaneous persistent search with different filters.
- */
- @Test(enabled = false, groups = "slow", dependsOnMethods = { "PrimaryTest" })
- public void FullTestSimultaneousPersistentSearches() throws Exception
- {
- String tn = "FullTestSimultaneousPersistentSearches";
- debugInfo(tn, "Starting test \n\n");
- Socket s1 = null, s2 = null, s3 = null;
- ReplicationBroker server01 = null;
- ReplicationBroker server02 = null;
- boolean compatMode = false;
- boolean changesOnly = false;
-
- // create stats
- for (ConnectionHandler<?> ch : DirectoryServer.getConnectionHandlers())
- {
- if (ch instanceof LDAPConnectionHandler)
- {
- LDAPConnectionHandler lch = (LDAPConnectionHandler) ch;
- if (!lch.useSSL())
- {
- ldapStatistics = lch.getStatTracker();
- }
- }
- }
- assertNotNull(ldapStatistics);
-
- try
- {
- // Create broker on o=test
- DomainFakeCfg config1 = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
- config1.setChangetimeHeartbeatInterval(100); // ms
- server01 = openReplicationSession(config1, replicationServerPort,
- brokerSessionTimeout, getGenerationId(TEST_ROOT_DN));
-
- // Create broker on o=test2
- DomainFakeCfg config2 = newFakeCfg(TEST_ROOT_DN2, SERVER_ID_2, replicationServerPort);
- config2.setChangetimeHeartbeatInterval(100); //ms
- server02 = openReplicationSession(config2, replicationServerPort,
- brokerSessionTimeout, EMPTY_DN_GENID);
-
- int ts = 1;
- // Produce update 1
- CSN csn1 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_1);
- DeleteMsg delMsg1 =
- newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1,
- "11111111-1111-1111-1111-111111111111");
- debugInfo(tn, " publishing " + delMsg1);
- server01.publish(delMsg1);
- Thread.sleep(500); // let's be sure the message is in the RS
-
- // Produce update 2
- CSN csn2 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
- DeleteMsg delMsg2 =
- newDeleteMsg("uid=" + tn + "2," + TEST_ROOT_DN_STRING2, csn2,
- "22222222-2222-2222-2222-222222222222");
- debugInfo(tn, " publishing " + delMsg2);
- server02.publish(delMsg2);
- Thread.sleep(500); // let's be sure the message is in the RS
-
- // Produce update 3
- CSN csn3 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
- DeleteMsg delMsg3 =
- newDeleteMsg("uid=" + tn + "3," + TEST_ROOT_DN_STRING2, csn3,
- "33333333-3333-3333-3333-333333333333");
- debugInfo(tn, " publishing " + delMsg3);
- server02.publish(delMsg3);
- Thread.sleep(500); // let's be sure the message is in the RS
-
- // Creates cookie control
- String cookie = "";
- List<Control> controls = createCookieControl(cookie);
- if (compatMode)
- {
- cookie = null;
- controls = new ArrayList<Control>(0);
- }
-
- // Creates psearch control
- Set<PersistentSearchChangeType> changeTypes =
- EnumSet.of(ADD, DELETE, MODIFY, MODIFY_DN);
- controls.add(new PersistentSearchControl(changeTypes, changesOnly, true));
-
- final Set<String> attributes = ALL_ATTRIBUTES;
- SearchRequestProtocolOp searchRequest1 = createSearchRequest("(targetDN=*"+tn+"*,o=test)", attributes);
- SearchRequestProtocolOp searchRequest2 = createSearchRequest("(targetDN=*"+tn+"*,o=test2)", attributes);
- SearchRequestProtocolOp searchRequest3 = createSearchRequest("objectclass=*", attributes);
-
- // Connects and bind
- s1 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
- org.opends.server.tools.LDAPReader r1 = new org.opends.server.tools.LDAPReader(s1);
- LDAPWriter w1 = new LDAPWriter(s1);
- s1.setSoTimeout(15000);
- bindAsManager(w1, r1);
-
- // Connects and bind
- s2 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
- org.opends.server.tools.LDAPReader r2 = new org.opends.server.tools.LDAPReader(s2);
- LDAPWriter w2 = new LDAPWriter(s2);
- s2.setSoTimeout(30000);
- bindAsManager(w2, r2);
-
- // Connects and bind
- s3 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
- org.opends.server.tools.LDAPReader r3 = new org.opends.server.tools.LDAPReader(s3);
- LDAPWriter w3 = new LDAPWriter(s3);
- s3.setSoTimeout(15000);
- bindAsManager(w3, r3);
-
- // Since we are going to be watching the post-response count, we need to
- // wait for the server to become idle before kicking off the next request
- // to ensure that any remaining post-response processing from the previous
- // operation has completed.
- assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000));
-
- InvocationCounterPlugin.resetAllCounters();
-
- final Results results = new Results();
- results.searchesDone = ldapStatistics.getSearchResultsDone();
-
- w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
- Thread.sleep(500);
- w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
- Thread.sleep(500);
- w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
- Thread.sleep(500);
-
- if (!changesOnly)
- {
- debugInfo(tn, "Search1 Persistent filter=" + searchRequest1.getFilter()
- + " expected to return change " + csn1);
- {
- readMessages(tn, r1, results, 1, "Search1 Result=");
- final int searchEntries = results.searchResultEntries.size();
- if (searchEntries == 1)
- {
- final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
- final String cn = compatMode ? "10" : "0";
- checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
- checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
- }
- debugInfo(tn, "Search1 done with success. searchEntries="
- + searchEntries + " #searchesDone=" + results.searchesDone);
- }
-
- {
- debugInfo(tn, "Search 2 Persistent filter=" + searchRequest2.getFilter()
- + " expected to return change " + csn2 + " & " + csn3);
- readMessages(tn, r2, results, 2, "Search 2 Result=");
- for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
- {
- final String cn = compatMode ? "10" : "0";
- checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
- }
- debugInfo(tn, "Search2 done with success. searchEntries="
- + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
- }
-
-
- debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter()
- + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
- readMessages(tn, r3, results, 4, "Search3 Result=");
- debugInfo(tn, "Search3 done with success. searchEntries="
- + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
- }
-
- // Produces additional change
- CSN csn11 = new CSN(TimeThread.getTime(), 11, SERVER_ID_1);
- String expectedDn11 = "uid=" + tn + "11," + TEST_ROOT_DN_STRING;
- DeleteMsg delMsg11 = newDeleteMsg(expectedDn11, csn11,
- "44444444-4444-4444-4444-444444444444");
- debugInfo(tn, " publishing " + delMsg11);
- server01.publish(delMsg11);
- Thread.sleep(500);
- debugInfo(tn, delMsg11.getCSN() + " published additionally ");
-
- // Produces additional change
- CSN csn12 = new CSN(TimeThread.getTime(), 12, SERVER_ID_2);
- String expectedDn12 = "uid=" + tn + "12," + TEST_ROOT_DN_STRING2;
- DeleteMsg delMsg12 = newDeleteMsg(expectedDn12, csn12,
- "55555555-5555-5555-5555-555555555555");
- debugInfo(tn, " publishing " + delMsg12 );
- server02.publish(delMsg12);
- Thread.sleep(500);
- debugInfo(tn, delMsg12.getCSN() + " published additionally ");
-
- // Produces additional change
- CSN csn13 = new CSN(TimeThread.getTime(), 13, SERVER_ID_2);
- String expectedDn13 = "uid=" + tn + "13," + TEST_ROOT_DN_STRING2;
- DeleteMsg delMsg13 = newDeleteMsg(expectedDn13, csn13,
- "66666666-6666-6666-6666-666666666666");
- debugInfo(tn, " publishing " + delMsg13);
- server02.publish(delMsg13);
- Thread.sleep(500);
- debugInfo(tn, delMsg13.getCSN() + " published additionally ");
-
- // wait 11
- readMessages(tn, r1, results, 1, "Search 11 Result=");
- Thread.sleep(1000);
- debugInfo(tn, "Search 1 successfully receives additional changes");
-
- // wait 12 & 13
- readMessages(tn, r2, results, 2, "psearch search 12 Result=");
- Thread.sleep(1000);
- debugInfo(tn, "Search 2 successfully receives additional changes");
-
- // wait 11 & 12 & 13
- readMessages(tn, r3, results, 3, "psearch search 13 Result=");
- SearchResultEntryProtocolOp searchResultEntry =
- results.searchResultEntries.get(results.searchResultEntries.size() - 1);
- Thread.sleep(1000);
-
- // Check we received change 13
- checkAttributeValue(searchResultEntry, "targetDN", expectedDn13);
- debugInfo(tn, "Search 3 successfully receives additional changes");
- }
- finally
- {
- stop(server01, server02);
- waitForClose(s1, s2, s3);
- }
- debugInfo(tn, "Ends test successfully");
- }
-
- private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
- final Results results, final int i, final String string) throws Exception
- {
- results.searchResultEntries.clear();
-
- LDAPMessage message;
- while (results.searchResultEntries.size() < i
- && (message = r.readMessage()) != null)
- {
- debugInfo(tn, string + message.getProtocolOpType() + " " + message);
-
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- results.searchReferences++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- results.searchesDone++;
- break;
- }
- }
- }
-
- private void assertSuccessful(LDAPMessage message)
- {
- SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
- assertEquals(doneOp.getResultCode(), ResultCode.SUCCESS.getIntValue(),
- doneOp.getErrorMessage().toString());
- }
-
- private void waitForClose(Socket... sockets) throws InterruptedException
- {
- for (Socket s : sockets)
- {
- if (s != null)
- {
- close(s);
- while (!s.isClosed())
- {
- Thread.sleep(100);
- }
- }
- }
- }
-
- /** utility - bind as required */
- private void bindAsManager(LDAPWriter w, org.opends.server.tools.LDAPReader r)
- throws Exception
- {
- bindAsWhoEver(w, r,
- "cn=Directory Manager", "password", LDAPResultCode.SUCCESS);
- }
-
- /** utility - bind as required */
- private void bindAsWhoEver(LDAPWriter w, org.opends.server.tools.LDAPReader r,
- String bindDN, String password, int expected) throws Exception
- {
-// Since we are going to be watching the post-response count, we need to
-// wait for the server to become idle before kicking off the next request to
-// ensure that any remaining post-response processing from the previous
-// operation has completed.
- assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000));
-
-
- InvocationCounterPlugin.resetAllCounters();
- BindRequestProtocolOp bindRequest =
- new BindRequestProtocolOp(
- ByteString.valueOf(bindDN),
- 3, ByteString.valueOf(password));
- w.writeMessage(new LDAPMessage(1, bindRequest));
-
- final LDAPMessage message = r.readMessage();
- BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- assertEquals(bindResponse.getResultCode(), expected);
- }
-
- /**
- * Clean up the environment.
- *
- * @throws Exception If the environment could not be set up.
- */
- @Override
- @AfterClass
- public void classCleanUp() throws Exception
- {
- callParanoiaCheck = false;
- super.classCleanUp();
-
- shutdown();
-
- paranoiaCheck();
- }
-
- @AfterMethod
- public void clearReplicationDb() throws Exception
- {
- clearChangelogDB(replicationServer);
- }
-
- /**
- * After the tests stop the replicationServer.
- */
- private void shutdown() throws Exception
- {
- remove(replicationServer);
- replicationServer = null;
- }
-
- /**
- * Utility - log debug message - highlight it is from the test and not
- * from the server code. Makes easier to observe the test steps.
- */
- private void debugInfo(String testName, String message)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("** TEST " + testName + " ** " + message);
- }
- }
-
- /**
- * Utility - create a second backend in order to test ECL with 2 suffixes.
- */
- private static Backend<?> initializeTestBackend(boolean createBaseEntry,
- String backendId) throws Exception
- {
- DN baseDN = DN.decode("o=" + backendId);
-
- // Retrieve backend. Warning: it is important to perform this each time,
- // because a test may have disabled then enabled the backend (i.e a test
- // performing an import task). As it is a memory backend, when the backend
- // is re-enabled, a new backend object is in fact created and old reference
- // to memory backend must be invalidated. So to prevent this problem, we
- // retrieve the memory backend reference each time before cleaning it.
- MemoryBackend memoryBackend =
- (MemoryBackend)DirectoryServer.getBackend(backendId);
-
- if (memoryBackend == null)
- {
- memoryBackend = new MemoryBackend();
- memoryBackend.setBackendID(backendId);
- memoryBackend.setBaseDNs(new DN[] {baseDN});
- memoryBackend.initializeBackend();
- DirectoryServer.registerBackend(memoryBackend);
- }
-
- memoryBackend.clearMemoryBackend();
-
- if (createBaseEntry)
- {
- memoryBackend.addEntry(createEntry(baseDN), null);
- }
- return memoryBackend;
- }
-
- private static void removeTestBackend(Backend<?>... backends)
- {
- for (Backend<?> backend : backends)
- {
- if (backend != null)
- {
- MemoryBackend memoryBackend = (MemoryBackend) backend;
- memoryBackend.clearMemoryBackend();
- memoryBackend.finalizeBackend();
- DirectoryServer.deregisterBackend(memoryBackend);
- }
- }
- }
-
- /**
- * FIXME this test actually tests nothing: there are no asserts.
- */
- @Test(enabled = true, dependsOnMethods = { "PrimaryTest" })
- public void testChangeTimeHeartbeat() throws Exception
- {
- String tn = "testChangeTimeHeartbeat";
- debugInfo(tn, "Starting test");
- ReplicationBroker s1test = null;
- ReplicationBroker s2test = null;
- ReplicationBroker s1test2 = null;
- ReplicationBroker s2test2 = null;
- Backend<?> backend2 = null;
-
- try
- {
- backend2 = initializeTestBackend(true, TEST_BACKEND_ID2);
-
- // --
- s1test = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
- s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
- 100, replicationServerPort, brokerSessionTimeout, EMPTY_DN_GENID);
- Thread.sleep(500);
-
- // Produce updates
- long time = TimeThread.getTime();
- int ts = 1;
- CSN csn1 = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn1, tn, 1);
-
- CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
- publishDeleteMsgInOTest(s2test2, csn2, tn, 2);
-
- CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
- publishDeleteMsgInOTest(s2test2, csn3, tn, 3);
-
- CSN csn4 = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn4, tn, 4);
- Thread.sleep(500);
-
- // --
- s1test2 = openReplicationSession(TEST_ROOT_DN2, 1203,
- 100, replicationServerPort, brokerSessionTimeout, EMPTY_DN_GENID);
- s2test = openReplicationSession(TEST_ROOT_DN, 1204,
- 100, replicationServerPort, brokerSessionTimeout);
- Thread.sleep(500);
-
- // Test startState ("first cookie") of the ECL
- time = TimeThread.getTime();
- CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
- publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
-
- CSN csn7 = new CSN(time, ts++, s2test.getServerId());
- publishDeleteMsgInOTest(s2test, csn7, tn, 7);
-
- CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
- publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
-
- CSN csn9 = new CSN(time, ts++, s2test.getServerId());
- publishDeleteMsgInOTest(s2test, csn9, tn, 9);
- Thread.sleep(500);
-
- ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
- debugInfo(tn, rsd1.getBaseDN() + " LatestServerState=" + rsd1.getLatestServerState());
- // FIXME:ECL Enable this test by adding an assert on the right value
-
- ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
- debugInfo(tn, rsd2.getBaseDN() + " LatestServerState=" + rsd2.getLatestServerState());
- // FIXME:ECL Enable this test by adding an assert on the right value
- }
- finally
- {
- stop(s1test2, s2test2, s1test, s2test);
- removeTestBackend(backend2);
- }
- debugInfo(tn, "Ending test successfully");
- }
-
- /**
- * From embedded ECL (no remote session)
- * With empty RS, simple search should return only root entry.
- */
- private void ECLCompatEmpty() throws Exception
- {
- String tn = "ECLCompatEmpty";
- debugInfo(tn, "Starting test\n\n");
-
- final InternalSearchOperation op = connection.processSearch(
- "cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
- assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString());
- assertEquals(op.getEntriesSent(), 1, "The root entry should have been returned");
- debugInfo(tn, "Ending test successfully");
- }
-
- private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
- {
- String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
- debugInfo(tn, "Starting test\n\n");
- LDAPReplicationDomain domain = null;
- try
- {
- // Creates broker on o=test
- ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
-
- DomainFakeCfg domainConf = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
- domain = startNewDomain(domainConf, null, null);
-
- String user1entryUUID = "11111111-1112-1113-1114-111111111115";
- String baseUUID = "22222222-2222-2222-2222-222222222222";
-
- CSN[] csns = generateCSNs(4, SERVER_ID_1);
-
- // Publish DEL
- DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
- server01.publish(delMsg);
- debugInfo(tn, " publishes " + delMsg.getCSN());
-
- // Publish ADD
- Entry entry = TestCaseUtils.entryFromLdifString(
- "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
- + "objectClass: top\n"
- + "objectClass: domain\n"
- + "entryUUID: " + user1entryUUID + "\n");
- AddMsg addMsg = new AddMsg(
- csns[1],
- entry.getDN(),
- user1entryUUID,
- baseUUID,
- entry.getObjectClassAttribute(),
- entry.getAttributes(),
- Collections.<Attribute> emptyList());
- server01.publish(addMsg);
- debugInfo(tn, " publishes " + addMsg.getCSN());
-
- // Publish MOD
- DN baseDN = DN.decode("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
- List<Modification> mods = createMods("description", "new value");
- ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
- server01.publish(modMsg);
- debugInfo(tn, " publishes " + modMsg.getCSN());
-
- // Publish modDN
- ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
- DN.decode("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
- RDN.decode("uid="+tn+"new4"), // new rdn
- true, // deleteoldrdn
- TEST_ROOT_DN2); // new superior
- op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csns[3], user1entryUUID, "newparentId"));
- LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
- ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
- server01.publish(modDNMsg);
- debugInfo(tn, " publishes " + modDNMsg.getCSN());
-
- InternalSearchOperation searchOp =
- searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
-
- // test 4 entries returned
- final LDIFWriter ldifWriter = getLDIFWriter();
- assertFourEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
- ldifWriter, user1entryUUID, csns);
-
- stop(server01);
-
- // Test with filter on change number
- String filter =
- "(&(targetdn=*" + tn + "*,o=test)"
- + "(&(changenumber>=" + firstChangeNumber + ")"
- + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
- searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
-
- assertFourEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
- ldifWriter, user1entryUUID, csns);
- assertThat(searchOp.getSearchEntries()).hasSize(csns.length);
- return csns[1];
- }
- finally
- {
- remove(domain);
- debugInfo(tn, "Ending test with success");
- }
- }
-
- private void assertFourEntries(List<SearchResultEntry> entries,
- long firstChangeNumber, String tn, LDIFWriter ldifWriter,
- String user1entryUUID, CSN... csns) throws Exception
- {
- debugAndWriteEntries(ldifWriter, entries, tn);
- assertThat(entries).hasSize(4);
-
- int i = -1;
- // check the DEL entry has the right content
- final SearchResultEntry delEntry = entries.get(++i);
- checkValue(delEntry, "changetype", "delete");
- commonAssert(delEntry, user1entryUUID, firstChangeNumber, i, tn, csns[i]);
- checkValue(delEntry, "targetuniqueid", user1entryUUID);
-
- // check the ADD entry has the right content
- final SearchResultEntry addEntry = entries.get(++i);
- checkValue(addEntry, "changetype", "add");
- commonAssert(addEntry, user1entryUUID, firstChangeNumber, i, tn, csns[i]);
- checkLDIF(addEntry, "changes",
- "objectClass: domain",
- "objectClass: top",
- "entryUUID: " + user1entryUUID);
-
- // check the MOD entry has the right content
- final SearchResultEntry modEntry = entries.get(++i);
- checkValue(modEntry, "changetype", "modify");
- commonAssert(modEntry, user1entryUUID, firstChangeNumber, i, tn, csns[i]);
- checkLDIF(modEntry, "changes",
- "replace: description",
- "description: new value",
- "-");
-
- // check the MODDN entry has the right content
- final SearchResultEntry moddnEntry = entries.get(++i);
- checkValue(moddnEntry, "changetype", "modrdn");
- commonAssert(moddnEntry, user1entryUUID, firstChangeNumber, i, tn, csns[i]);
- checkValue(moddnEntry, "newrdn", "uid=" + tn + "new4");
- checkValue(moddnEntry, "newsuperior", TEST_ROOT_DN_STRING2);
- checkValue(moddnEntry, "deleteoldrdn", "true");
- }
-
- private void commonAssert(SearchResultEntry resultEntry, String entryUUID,
- long firstChangeNumber, int i, String tn, CSN csn)
- {
- final long changeNumber = firstChangeNumber + i;
- final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
-
- assertDNEquals(resultEntry, changeNumber);
- checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
- checkValue(resultEntry, "targetentryuuid", entryUUID);
- checkValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
- checkValue(resultEntry, "replicationcsn", csn.toString());
- checkValue(resultEntry, "changelogcookie", "o=test:" + csn + ";");
- checkValue(resultEntry, "targetdn", targetDN);
- }
-
- private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
- {
- final String actualDN = resultEntry.getDN().toNormalizedString();
- final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
- assertThat(actualDN)
- .as("Unexpected DN for entry " + resultEntry)
- .isEqualToIgnoringCase(expectedDN);
- }
-
- private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
- {
- String tn = "ECLCompatReadFrom/" + firstChangeNumber;
- debugInfo(tn, "Starting test\n\n");
-
- // Creates broker on o=test
- ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
-
- String user1entryUUID = "11111111-1112-1113-1114-111111111115";
-
- String filter = "(changenumber=" + firstChangeNumber + ")";
- InternalSearchOperation searchOp = searchOnChangelog(filter, 1, tn, SUCCESS);
-
- // check the entry has the right content
- SearchResultEntry resultEntry = searchOp.getSearchEntries().get(0);
- assertTrue("changenumber=6,cn=changelog".equalsIgnoreCase(resultEntry.getDN().toNormalizedString()));
- checkValue(resultEntry, "replicationcsn", csn.toString());
- checkValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
- checkValue(resultEntry, "changetype", "add");
- checkValue(resultEntry, "changelogcookie", "o=test:" + csn + ";");
- checkValue(resultEntry, "targetentryuuid", user1entryUUID);
- checkValue(resultEntry, "changenumber", "6");
-
- stop(server01);
-
- debugInfo(tn, "Ending test with success");
- }
-
- /**
- * Process similar search as but only check that there's no control returned
- * as part of the entry.
- */
- private void ECLCompatNoControl(long firstChangeNumber) throws Exception
- {
- String tn = "ECLCompatNoControl/" + firstChangeNumber;
- debugInfo(tn, "Starting test\n\n");
-
- // Creates broker on o=test
- ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, 100,
- replicationServerPort, brokerSessionTimeout);
-
- String filter = "(changenumber=" + firstChangeNumber + ")";
- InternalSearchOperation searchOp = searchOnChangelog(filter, 1, tn, SUCCESS);
-
- // Just verify that no entry contains the ChangeLogCookie control
- List<Control> controls = searchOp.getSearchEntries().get(0).getControls();
- assertTrue(controls.isEmpty());
-
- stop(server01);
-
- debugInfo(tn, "Ending test with success");
- }
-
- /**
- * Read the ECL in compat mode from firstChangeNumber and to lastChangeNumber.
- *
- * @param firstChangeNumber
- * the lower limit
- * @param lastChangeNumber
- * the higher limit
- */
- private void ECLCompatReadFromTo(int firstChangeNumber, int lastChangeNumber) throws Exception
- {
- String tn = "ECLCompatReadFromTo/" + firstChangeNumber + "/" + lastChangeNumber;
- debugInfo(tn, "Starting test\n\n");
-
- String filter =
- "(&(changenumber>=" + firstChangeNumber + ")" + "(changenumber<=" + lastChangeNumber + "))";
- final int expectedNbEntries = lastChangeNumber - firstChangeNumber + 1;
- searchOnChangelog(filter, expectedNbEntries, tn, SUCCESS);
-
- debugInfo(tn, "Ending test with success");
- }
-
- /**
- * Read the ECL in compat mode providing an unknown change number.
- */
- private void ECLCompatBadSeqnum() throws Exception
- {
- String tn = "ECLCompatBadSeqnum";
- debugInfo(tn, "Starting test\n\n");
-
- searchOnChangelog("(changenumber=1000)", 0, tn, SUCCESS);
-
- debugInfo(tn, "Ending test with success");
- }
-
- /**
- * Read the ECL in compat mode providing an unknown change number.
- */
- private void ECLFilterOnReplicationCSN(CSN csn) throws Exception
- {
- String tn = "ECLFilterOnReplicationCsn";
- debugInfo(tn, "Starting test\n\n");
-
- String filter = "(replicationcsn=" + csn + ")";
- InternalSearchOperation searchOp = searchOnChangelog(filter, 1, tn, SUCCESS);
-
- // check the DEL entry has the right content
- SearchResultEntry resultEntry = searchOp.getSearchEntries().get(0);
- checkValue(resultEntry, "replicationcsn", csn.toString());
- // TODO:ECL check values of the other attributes
-
- debugInfo(tn, "Ending test with success");
- }
-
- /**
- * Test that different values of filter are correctly decoded to find if the
- * search op on the ECL can be optimized regarding the change numbers.
- */
- private void ECLFilterTest() throws Exception
- {
- String tn = "ECLFilterTest";
- debugInfo(tn, "Starting test\n\n");
-
- {
- DN baseDN = DN.decode("cn=changelog");
-
- evaluateSearchParameters(baseDN, -1, -1, "(objectclass=*)");
- evaluateSearchParameters(baseDN, 2, -1, "(changenumber>=2)");
- evaluateSearchParameters(baseDN, 2, 5, "(&(changenumber>=2)(changenumber<=5))");
-
- try
- {
- final StartECLSessionMsg startCLmsg = new StartECLSessionMsg();
- ECLSearchOperation.evaluateSearchParameters(startCLmsg,
- baseDN, SearchFilter.createFilterFromString("(&(changenumber>=2)(changenumber<+5))"));
- assertEquals(startCLmsg.getFirstChangeNumber(), 1);
- }
- catch (DirectoryException expected)
- {
- }
-
- evaluateSearchParameters(baseDN, 2, 5,
- "(&(dc=x)(&(changenumber>=2)(changenumber<=5)))");
- evaluateSearchParameters(baseDN, 3, 4,
- "(&(&(changenumber>=3)(changenumber<=4))(&(|(dc=y)(dc=x))(&(changenumber>=2)(changenumber<=5))))");
- evaluateSearchParameters(baseDN, -1, -1,
- "(|(objectclass=*)(&(changenumber>=2)(changenumber<=5)))");
- evaluateSearchParameters(baseDN, 8, 8, "(changenumber=8)");
-
- //
- CSN csn = new CSNGenerator(1, 0).newCSN();
- final StartECLSessionMsg startCLmsg =
- evaluateSearchParameters(baseDN, -1, -1, "(replicationcsn=" + csn + ")");
- assertEquals(startCLmsg.getCSN(), csn);
-
- // Use change number as base object.
- baseDN = DN.decode("changeNumber=8,cn=changelog");
-
- //
- evaluateSearchParameters(baseDN, 8, 8, "(objectclass=*)");
-
- // The base DN should take preference.
- evaluateSearchParameters(baseDN, 8, 8, "(changenumber>=2)");
- }
- debugInfo(tn, "Ending test with success");
- }
-
- private StartECLSessionMsg evaluateSearchParameters(DN baseDN,
- long firstChangeNumber, long lastChangeNumber, String filterString) throws Exception
- {
- final StartECLSessionMsg startCLmsg = new StartECLSessionMsg();
- ECLSearchOperation.evaluateSearchParameters(startCLmsg, baseDN,
- SearchFilter.createFilterFromString(filterString));
- assertEquals(startCLmsg.getFirstChangeNumber(), firstChangeNumber);
- assertEquals(startCLmsg.getLastChangeNumber(), lastChangeNumber);
- return startCLmsg;
- }
-
- /**
- * Put a short purge delay to the CNIndexDB, clear the changelogDB, expect the
- * CNIndexDB to be purged accordingly.
- */
- private void ECLPurgeCNIndexDBAfterChangelogClear() throws Exception
- {
- String tn = "ECLPurgeCNIndexDBAfterChangelogClear";
- debugInfo(tn, "Starting test\n\n");
-
- final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
- assertEquals(cnIndexDB.count(), 8);
- replicationServer.getChangelogDB().setPurgeDelay(1000);
-
- clearChangelogDB(replicationServer);
-
- // Expect changes purged from the changelog db to be sometimes
- // also purged from the CNIndexDB.
- while (!cnIndexDB.isEmpty())
- {
- debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
- Thread.sleep(10);
- }
-
- debugInfo(tn, "Ending test with success");
- }
-
- /**
- * Test that ECL Operational, virtual attributes are not visible outside rootDSE.
- */
- @Test(enabled = true, dependsOnMethods = { "PrimaryTest" })
- public void TestECLOperationalAttributesNotVisibleOutsideRootDSE() throws Exception
- {
- String tn = "TestECLOperationalAttributesNotVisibleOutsideRootDSE";
- // The goal is to verify that the Changelog attributes are not
- // available in other entries. We u
- debugInfo(tn, "Starting test \n\n");
-
- Set<String> attributes = newHashSet("firstchangenumber", "lastchangenumber",
- "changelog", "lastExternalChangelogCookie");
-
- debugInfo(tn, " Search: " + TEST_ROOT_DN_STRING);
- InternalSearchOperation searchOp = connection.processSearch(
- TEST_ROOT_DN_STRING,
- SearchScope.BASE_OBJECT,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, // Size limit
- 0, // Time limit
- false, // Types only
- "(objectclass=*)",
- attributes);
- waitOpResult(searchOp, ResultCode.SUCCESS);
-
- final List<SearchResultEntry> entries = searchOp.getSearchEntries();
- assertThat(entries).hasSize(1);
- debugAndWriteEntries(null, entries, tn);
- for (SearchResultEntry resultEntry : entries)
- {
- assertNull(getAttributeValue(resultEntry, "firstchangenumber"));
- assertNull(getAttributeValue(resultEntry, "lastchangenumber"));
- assertNull(getAttributeValue(resultEntry, "changelog"));
- assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
- }
-
- debugInfo(tn, "Ending test with success");
- }
-
- private void ECLCompatTestLimits(int expectedFirst, int expectedLast,
- boolean eclEnabled) throws Exception
- {
- String tn = "ECLCompatTestLimits";
- debugInfo(tn, "Starting test\n\n");
- debugInfo(tn, " Search: rootDSE");
-
- final List<SearchResultEntry> entries =
- assertECLLimits(eclEnabled, expectedFirst, expectedLast);
-
- debugAndWriteEntries(getLDIFWriter(), entries, tn);
- debugInfo(tn, "Ending test with success");
- }
-
- private List<SearchResultEntry> assertECLLimits(
- boolean eclEnabled, int expectedFirst, int expectedLast) throws Exception
- {
- AssertionError e = null;
-
- int count = 0;
- while (count < 30)
- {
- count++;
-
- try
- {
- final Set<String> attributes = new LinkedHashSet<String>();
- if (expectedFirst > 0)
- attributes.add("firstchangenumber");
- attributes.add("lastchangenumber");
- attributes.add("changelog");
- attributes.add("lastExternalChangelogCookie");
-
- final InternalSearchOperation searchOp = searchOnRootDSE(attributes);
- final List<SearchResultEntry> entries = searchOp.getSearchEntries();
- assertThat(entries).hasSize(1);
-
- final SearchResultEntry resultEntry = entries.get(0);
- if (eclEnabled)
- {
- if (expectedFirst > 0)
- checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst));
- checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast));
- checkValue(resultEntry, "changelog", String.valueOf("cn=changelog"));
- assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
- }
- else
- {
- if (expectedFirst > 0)
- assertNull(getAttributeValue(resultEntry, "firstchangenumber"));
- assertNull(getAttributeValue(resultEntry, "lastchangenumber"));
- assertNull(getAttributeValue(resultEntry, "changelog"));
- assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
- }
- return entries;
- }
- catch (AssertionError ae)
- {
- // try again to see if changes have been persisted
- e = ae;
- }
-
- Thread.sleep(100);
- }
- assertNotNull(e);
- throw e;
- }
-
- private InternalSearchOperation searchOnRootDSE(Set<String> attributes)
- throws Exception
- {
- final InternalSearchOperation searchOp = connection.processSearch(
- "",
- SearchScope.BASE_OBJECT,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, // Size limit
- 0, // Time limit
- false, // Types only
- "(objectclass=*)",
- attributes);
- waitOpResult(searchOp, ResultCode.SUCCESS);
- return searchOp;
- }
-
- private void ECLCompatTestLimitsAndAdd(int expectedFirst, int expectedLast,
- int ts) throws Exception
- {
- String tn = "ECLCompatTestLimitsAndAdd";
- debugInfo(tn, "Starting test\n\n");
-
- ECLCompatTestLimits(expectedFirst, expectedLast, true);
-
- // Creates broker on o=test
- ReplicationBroker server01 = null;
- LDAPReplicationDomain domain = null;
- try
- {
- server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
- 100, replicationServerPort, brokerSessionTimeout);
-
- DomainFakeCfg domainConf = newFakeCfg(TEST_ROOT_DN, SERVER_ID_1, replicationServerPort);
- domain = startNewDomain(domainConf, null, null);
-
- String user1entryUUID = "11111111-1112-1113-1114-111111111115";
-
- // Publish DEL
- CSN csn1 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_1);
- DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING,
- csn1, user1entryUUID);
- server01.publish(delMsg);
- debugInfo(tn, " publishes " + delMsg.getCSN());
- Thread.sleep(500);
- }
- finally
- {
- remove(domain);
- stop(server01);
- }
-
- ECLCompatTestLimits(expectedFirst, expectedLast + 1, true);
-
- debugInfo(tn, "Ending test with success");
- }
-
- private JEChangeNumberIndexDB getCNIndexDB()
- {
- return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
- }
-
- /**
- * Test ECl entry attributes, and their configuration.
- */
- @Test(enabled = true, dependsOnMethods = { "TestWithAndWithoutControl" })
- public void TestECLWithIncludeAttributes() throws Exception
- {
- String tn = "TestECLWithIncludeAttributes";
- debugInfo(tn, "Starting test\n\n");
-
- final String backendId3 = "test3";
- final DN baseDN3 = DN.decode("o=" + backendId3);
- Backend<?> backend2 = null;
- Backend<?> backend3 = null;
- LDAPReplicationDomain domain2 = null;
- LDAPReplicationDomain domain3 = null;
- LDAPReplicationDomain domain21 = null;
- try
- {
- // Configure replication on this backend
- // Add the root entry in the backend
- backend2 = initializeTestBackend(false, TEST_BACKEND_ID2);
-
- SortedSet<String> replServers = newSortedSet("localhost:" + replicationServerPort);
-
- // on o=test2,sid=1702 include attrs set to : 'sn'
- SortedSet<String> eclInclude = newSortedSet("sn", "roomnumber");
-
- DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1702, replServers);
- domain2 = startNewDomain(domainConf, eclInclude, eclInclude);
-
- backend3 = initializeTestBackend(false, backendId3);
-
- // on o=test3,sid=1703 include attrs set to : 'objectclass'
- eclInclude = newSortedSet("objectclass");
-
- SortedSet<String> eclIncludeForDeletes = newSortedSet("*");
-
- domainConf = new DomainFakeCfg(baseDN3, 1703, replServers);
- domain3 = startNewDomain(domainConf, eclInclude, eclIncludeForDeletes);
-
- // on o=test2,sid=1704 include attrs set to : 'cn'
- eclInclude = newSortedSet("cn");
-
- domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
- domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
-
- Thread.sleep(1000);
-
- addEntry(createEntry(TEST_ROOT_DN2));
- addEntry(createEntry(baseDN3));
-
- Entry uentry1 = TestCaseUtils.entryFromLdifString(
- "dn: cn=Fiona Jensen," + TEST_ROOT_DN_STRING2 + "\n"
- + "objectclass: top\n"
- + "objectclass: person\n"
- + "objectclass: organizationalPerson\n"
- + "objectclass: inetOrgPerson\n"
- + "cn: Fiona Jensen\n"
- + "sn: Jensen\n"
- + "uid: fiona\n"
- + "telephonenumber: 12121212");
- addEntry(uentry1); // add fiona in o=test2
-
- Entry uentry2 = TestCaseUtils.entryFromLdifString(
- "dn: cn=Robert Hue," + baseDN3 + "\n"
- + "objectclass: top\n"
- + "objectclass: person\n"
- + "objectclass: organizationalPerson\n"
- + "objectclass: inetOrgPerson\n"
- + "cn: Robert Hue\n"
- + "sn: Robby\n"
- + "uid: robert\n"
- + "telephonenumber: 131313");
- addEntry(uentry2); // add robert in o=test3
-
- // mod 'sn' of fiona (o=test2) with 'sn' configured as ecl-incl-att
- final ModifyOperation modOp1 = connection.processModify(
- uentry1.getDN(), createMods("sn", "newsn"));
- waitOpResult(modOp1, ResultCode.SUCCESS);
-
- // mod 'telephonenumber' of robert (o=test3)
- final ModifyOperation modOp2 = connection.processModify(
- uentry2.getDN(), createMods("telephonenumber", "555555"));
- waitOpResult(modOp2, ResultCode.SUCCESS);
-
- // moddn robert (o=test3) to robert2 (o=test3)
- ModifyDNOperation modDNOp = connection.processModifyDN(
- DN.decode("cn=Robert Hue," + baseDN3),
- RDN.decode("cn=Robert Hue2"), true,
- baseDN3);
- waitOpResult(modDNOp, ResultCode.SUCCESS);
-
- // del robert (o=test3)
- final DeleteOperation delOp = connection.processDelete(DN.decode("cn=Robert Hue2," + baseDN3));
- waitOpResult(delOp, ResultCode.SUCCESS);
-
- // Search on ECL from start on all suffixes
- String cookie = "";
- InternalSearchOperation searchOp =
- searchOnCookieChangelog("(targetDN=*)", cookie, 8, tn, SUCCESS);
-
- for (SearchResultEntry resultEntry : searchOp.getSearchEntries())
- {
- String targetdn = getAttributeValue(resultEntry, "targetdn");
-
- if (targetdn.endsWith("cn=robert hue,o=test3")
- || targetdn.endsWith("cn=robert hue2,o=test3"))
- {
- Entry targetEntry = parseIncludedAttributes(resultEntry, targetdn);
-
- Set<String> eoc = newHashSet("person", "inetOrgPerson", "organizationalPerson", "top");
- checkValues(targetEntry, "objectclass", eoc);
-
- String changeType = getAttributeValue(resultEntry, "changetype");
- if ("delete".equals(changeType))
- {
- // We are using "*" for deletes so should get back 4 attributes.
- assertThat(targetEntry.getAttributes()).hasSize(4);
- checkValue(targetEntry, "uid", "robert");
- checkValue(targetEntry, "cn", "Robert Hue2");
- checkValue(targetEntry, "telephonenumber", "555555");
- checkValue(targetEntry, "sn", "Robby");
- }
- else
- {
- assertThat(targetEntry.getAttributes()).isEmpty();
- }
- }
- else if (targetdn.endsWith("cn=fiona jensen,o=test2"))
- {
- Entry targetEntry = parseIncludedAttributes(resultEntry, targetdn);
-
- assertThat(targetEntry.getAttributes()).hasSize(2);
- checkValue(targetEntry,"sn","jensen");
- checkValue(targetEntry,"cn","Fiona Jensen");
- }
- checkValue(resultEntry,"changeinitiatorsname", "cn=Internal Client,cn=Root DNs,cn=config");
- }
- }
- finally
- {
- final DN fionaDN = DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
- waitOpResult(connection.processDelete(fionaDN), ResultCode.SUCCESS);
- waitOpResult(connection.processDelete(TEST_ROOT_DN2), ResultCode.SUCCESS);
- waitOpResult(connection.processDelete(baseDN3), ResultCode.SUCCESS);
-
- remove(domain21, domain2, domain3);
- removeTestBackend(backend2, backend3);
- }
- debugInfo(tn, "Ending test with success");
- }
-
- private void remove(LDAPReplicationDomain... domains)
- {
- for (LDAPReplicationDomain domain : domains)
- {
- if (domain != null)
- {
- domain.shutdown();
- MultimasterReplication.deleteDomain(domain.getBaseDN());
- }
- }
- }
-
- private LDAPReplicationDomain startNewDomain(DomainFakeCfg domainConf,
- SortedSet<String> eclInclude, SortedSet<String> eclIncludeForDeletes)
- throws Exception
- {
- domainConf.setExternalChangelogDomain(
- new ExternalChangelogDomainFakeCfg(true, eclInclude, eclIncludeForDeletes));
- // Set a Changetime heartbeat interval low enough (less than default value
- // that is 1000 ms) for the test to be sure to consider all changes as eligible.
- domainConf.setChangetimeHeartbeatInterval(10);
- LDAPReplicationDomain newDomain = MultimasterReplication.createNewDomain(domainConf);
- newDomain.start();
- return newDomain;
- }
-
- private List<Modification> createMods(String attributeName, String valueString)
- {
- Attribute attr = Attributes.create(attributeName, valueString);
- return newList(new Modification(ModificationType.REPLACE, attr));
- }
-
- private Entry parseIncludedAttributes(SearchResultEntry resultEntry,
- String targetdn) throws Exception
- {
- // Parse includedAttributes as an entry.
- String includedAttributes = getAttributeValue(resultEntry, "includedattributes");
- String[] ldifAttributeLines = includedAttributes.split("\\n");
- String[] ldif = new String[ldifAttributeLines.length + 1];
- System.arraycopy(ldifAttributeLines, 0, ldif, 1, ldifAttributeLines.length);
- ldif[0] = "dn: " + targetdn;
- return TestCaseUtils.makeEntry(ldif);
- }
-
- private void waitOpResult(Operation operation, ResultCode expectedResult) throws Exception
- {
- int i = 0;
- while (operation.getResultCode() == ResultCode.UNDEFINED
- || operation.getResultCode() != expectedResult)
- {
- Thread.sleep(50);
- i++;
- if (i > 10)
- {
- assertEquals(operation.getResultCode(), expectedResult,
- operation.getErrorMessage().toString());
- }
- }
- }
-}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index a9d707c..8813012 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -153,9 +153,7 @@
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
-
- assertEquals(replicaDB.getOldestCSN(), csns[0]);
- assertEquals(replicaDB.getNewestCSN(), csns[2]);
+ assertLimits(replicaDB, csns[0], csns[2]);
DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
replicaDB.add(update4);
@@ -338,15 +336,11 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
-
- assertEquals(csns[0], replicaDB.getOldestCSN());
- assertEquals(csns[2], replicaDB.getNewestCSN());
+ assertLimits(replicaDB, csns[0], csns[2]);
// Clear DB and check it is cleared.
replicaDB.clear();
-
- assertEquals(null, replicaDB.getOldestCSN());
- assertEquals(null, replicaDB.getNewestCSN());
+ assertLimits(replicaDB, null, null);
}
finally
{
@@ -445,9 +439,7 @@
mySeqnum+=2;
}
waitChangesArePersisted(replicaDB, max, counterWindow);
-
- assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
- assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
+ assertLimits(replicaDB, csns[1], csns[max]);
// Now we want to test that after closing and reopening the db, the
// counting algo is well reinitialized and when new messages are added
@@ -456,9 +448,7 @@
replicaDB.shutdown();
replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
-
- assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
- assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
+ assertLimits(replicaDB, csns[1], csns[max]);
// Populate the db with 'max' msg
for (int i=max+1; i<=2 * max; i++)
@@ -468,9 +458,7 @@
mySeqnum+=2;
}
waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
-
- assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
- assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
+ assertLimits(replicaDB, csns[1], csns[2 * max]);
replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
@@ -499,6 +487,14 @@
}
}
+ private void assertLimits(FileReplicaDB replicaDB, CSN oldestCSN, CSN newestCSN)
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(replicaDB.getOldestCSN()).as("Wrong oldest CSN").isEqualTo(oldestCSN);
+ softly.assertThat(replicaDB.getNewestCSN()).as("Wrong newest CSN").isEqualTo(newestCSN);
+ softly.assertAll();
+ }
+
private void shutdown(FileReplicaDB replicaDB)
{
if (replicaDB != null)
--
Gitblit v1.10.0