From 7b84e53457bce1f0733afa87797afc9928568c52 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 09 Oct 2006 14:15:55 +0000
Subject: [PATCH] - Change the synchronization code so that the changelog server can now be used directly through the ChangelogBroker class as a client API.
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java | 81 +++
opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java | 2
opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java | 15
opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java | 61 +-
opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java | 14
opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java | 2
opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java | 4
opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java | 26 +
opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java | 8
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 29 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java | 853 ++++++++++++++++++++++++++++++++------------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java | 4
opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java | 9
13 files changed, 807 insertions(+), 301 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java
index 177b974..1448e47 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java
@@ -27,6 +27,7 @@
package org.opends.server.changelog;
import java.io.IOException;
+import java.net.SocketException;
import java.util.zip.DataFormatException;
import org.opends.server.synchronization.SynchronizationMessage;
@@ -90,4 +91,18 @@
* @return The IP address of the remote server.
*/
public abstract String getRemoteAddress();
+
+
+ /**
+ * Set a timeout value.
+ * With this option set to a non-zero value, calls to the receive() method
+ * block for only this amount of time after which a
+ * java.net.SocketTimeoutException is raised.
+ * The Broker is valid and useable even after such an Exception is raised.
+ *
+ * @param timeout the specified timeout, in milliseconds.
+ * @throws SocketException if there is an error in the underlying protocol,
+ * such as a TCP error.
+ */
+ public abstract void setSoTimeout(int timeout) throws SocketException;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java
index 23d87c2..c2b0a5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java
@@ -111,4 +111,12 @@
{
return socket.getInetAddress().getHostAddress();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setSoTimeout(int timeout) throws SocketException
+ {
+ socket.setSoTimeout(timeout);
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java
index b19f27d..4bb87d6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java
@@ -30,6 +30,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.net.SocketException;
import java.util.zip.DataFormatException;
import org.opends.server.synchronization.SynchronizationMessage;
@@ -128,4 +129,12 @@
{
return socket.getInetAddress().getHostAddress();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setSoTimeout(int timeout) throws SocketException
+ {
+ socket.setSoTimeout(timeout);
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
index d177029..a69c8b9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
@@ -55,8 +55,8 @@
public class AddMsg extends UpdateMessage
{
private static final long serialVersionUID = -4905520652801395185L;
- private byte[] encodedAttributes ;
- private String parentUniqueId;
+ private final byte[] encodedAttributes;
+ private final String parentUniqueId;
/**
* Creates a new AddMessage.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index 3a4d466..b265bb3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -39,6 +39,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import org.opends.server.changelog.ProtocolSession;
import org.opends.server.changelog.SocketSession;
@@ -47,6 +49,7 @@
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -63,43 +66,64 @@
{
private boolean shutdown = false;
private List<String> servers;
- private Short identifier;
private boolean connected = false;
- private SynchronizationDomain domain;
private final Object lock = new Object();
private String changelogServer = "Not connected";
private TreeSet<FakeOperation> replayOperations;
private ProtocolSession session = null;
+ private final ServerState state;
+ private final DN baseDn;
+ private final short serverID;
+ private int maxSendDelay;
+ private int maxReceiveDelay;
+ private int maxSendQueue;
+ private int maxReceiveQueue;
/**
* Creates a new Changelog Broker for a particular SynchronizationDomain.
*
- * @param domain The SynchronizationDomain for which the borker is created.
+ * @param state The ServerState that should be used by this broker
+ * when negociating the session with the changelog servers.
+ * @param baseDn The base DN that should be used by this broker
+ * when negociating the session with the changelog servers.
+ * @param serverID The server ID that should be used by this broker
+ * when negociating the session with the changelog servers.
+ * @param maxReceiveQueue The maximum size of the receive queue to use on
+ * the changelog server.
+ * @param maxReceiveDelay The maximum replication delay to use on the
+ * changelog server.
+ * @param maxSendQueue The maximum size of the send queue to use on
+ * the changelog server.
+ * @param maxSendDelay The maximum send delay to use on the changelog server.
*/
- public ChangelogBroker(SynchronizationDomain domain)
+ public ChangelogBroker(ServerState state, DN baseDn, short serverID,
+ int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+ int maxSendDelay )
{
- this.domain = domain;
+ this.baseDn = baseDn;
+ this.serverID = serverID;
+ this.maxReceiveDelay = maxReceiveDelay;
+ this.maxSendDelay = maxSendDelay;
+ this.maxReceiveQueue = maxReceiveQueue;
+ this.maxSendQueue = maxSendQueue;
+ this.state = state;
replayOperations =
new TreeSet<FakeOperation>(new FakeOperationComparator());
}
-
/**
* Start the ChangelogBroker.
*
- * @param identifier identifier of the changelog
* @param servers list of servers used
* @throws Exception : in case of errors
*/
- public void start(Short identifier,
- List<String> servers)
+ public void start(List<String> servers)
throws Exception
{
/*
* Open Socket to the Changelog
* Send the Start message
*/
- this.identifier = identifier;
this.servers = servers;
if (servers.size() < 1)
{
@@ -147,7 +171,9 @@
/*
* Send our ServerStartMessage.
*/
- ServerStartMessage msg = domain.newServerStartMessage();
+ ServerStartMessage msg = new ServerStartMessage( serverID, baseDn,
+ maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+ state);
session.publish(msg);
@@ -167,10 +193,10 @@
* those changes and send them again to any changelog server.
*/
ChangeNumber changelogMaxChangeNumber =
- startMsg.getServerState().getMaxChangeNumber(identifier);
+ startMsg.getServerState().getMaxChangeNumber(serverID);
if (changelogMaxChangeNumber == null)
- changelogMaxChangeNumber = new ChangeNumber(0, 0, identifier);
- ChangeNumber ourMaxChangeNumber = domain.getMaxChangeNumber();
+ changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+ ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID);
if ((ourMaxChangeNumber == null) ||
(ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
{
@@ -206,7 +232,7 @@
LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
attrs.add(Historical.HISTORICALATTRIBUTENAME);
InternalSearchOperation op = conn.processSearch(
- new ASN1OctetString(domain.getBaseDN().toString()),
+ new ASN1OctetString(baseDn.toString()),
SearchScope.WHOLE_SUBTREE,
DereferencePolicy.NEVER_DEREF_ALIASES,
0, 0, false, filter,
@@ -381,8 +407,10 @@
/**
* Receive a message.
* @return the received message
+ * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+ * has expired
*/
- public SynchronizationMessage receive()
+ public SynchronizationMessage receive() throws SocketTimeoutException
{
while (shutdown == false)
{
@@ -392,6 +420,11 @@
return session.receive();
} catch (Exception e)
{
+ if (e instanceof SocketTimeoutException)
+ {
+ SocketTimeoutException e1 = (SocketTimeoutException) e;
+ throw e1;
+ }
if (shutdown == false)
{
synchronized (lock)
@@ -439,6 +472,22 @@
}
/**
+ * Set a timeout value.
+ * With this option set to a non-zero value, calls to the receive() method
+ * block for only this amount of time after which a
+ * java.net.SocketTimeoutException is raised.
+ * The Broker is valid and useable even after such an Exception is raised.
+ *
+ * @param timeout the specified timeout, in milliseconds.
+ * @throws SocketException if there is an error in the underlying protocol,
+ * such as a TCP error.
+ */
+ public void setSoTimeout(int timeout) throws SocketException
+ {
+ session.setSoTimeout(timeout);
+ }
+
+ /**
* Get the name of the changelog server to which this broker is currently
* connected.
*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
index 3fec9b3..1d288e5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
@@ -45,6 +45,7 @@
/**
* Creates a new delete message.
+ *
* @param op the Operation from which the message must be created.
*/
public DeleteMsg(DeleteOperation op)
@@ -54,6 +55,19 @@
}
/**
+ * Creates a new delete message.
+ *
+ * @param dn The dn with which the message must be created.
+ * @param changeNumber The change number with which the message must be
+ * created.
+ * @param uid The unique id with which the message must be created.
+ */
+ public DeleteMsg(String dn, ChangeNumber changeNumber, String uid)
+ {
+ super(new DeleteContext(changeNumber, uid), dn);
+ }
+
+ /**
* Creates a new Add message from a byte[].
*
* @param in The byte[] from which the operation must be read.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
index eb4a162..c794f4d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
@@ -69,6 +69,32 @@
}
/**
+ * construct a new Modify DN message.
+ *
+ * @param dn The dn to use for building the message.
+ * @param changeNumber The changeNumberto use for building the message.
+ * @param uid The unique id to use for building the message.
+ * @param newParentUid The new parent unique id to use for building
+ * the message.
+ * @param deleteOldRdn boolean indicating if old rdn must be deleted to use
+ * for building the message.
+ * @param newSuperior The new Superior entry to use for building the message.
+ * @param newRDN The new Rdn to use for building the message.
+ */
+ public ModifyDNMsg(String dn, ChangeNumber changeNumber, String uid,
+ String newParentUid, boolean deleteOldRdn,
+ String newSuperior, String newRDN)
+ {
+ super(new ModifyDnContext(changeNumber, uid, newParentUid), dn);
+
+ newSuperiorId = newParentUid;
+
+ this.deleteOldRdn = deleteOldRdn;
+ this.newSuperior = newSuperior;
+ this.newRDN = newRDN;
+ }
+
+ /**
* Creates a new ModifyDN message from a byte[].
*
* @param in The byte[] from which the operation must be read.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java
index 336afb0..59560b3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java
@@ -249,7 +249,7 @@
for (Short key : list.keySet())
{
ChangeNumber change = list.get(key);
- str += change.toString();
+ str += " " + change.toString();
}
return str;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index 7e71f2e..d593969 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -34,6 +34,7 @@
import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
import static org.opends.server.synchronization.Historical.*;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -290,10 +291,11 @@
*/
try
{
- broker = new ChangelogBroker(this);
+ broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
+ maxReceiveDelay, maxSendQueue, maxSendDelay);
synchronized (broker)
{
- broker.start(serverId, changelogServers);
+ broker.start(changelogServers);
if (!receiveStatus)
broker.suspendReceive();
}
@@ -379,7 +381,7 @@
{
broker.stop();
changelogServers = newChangelogServers;
- broker.start(serverId, changelogServers);
+ broker.start(changelogServers);
}
/*
@@ -676,14 +678,22 @@
UpdateMessage update = null;
while (update == null)
{
- SynchronizationMessage msg = broker.receive();
- if (msg == null)
+ SynchronizationMessage msg;
+ try
{
- // The server is in the shutdown process
- return null;
+ msg = broker.receive();
+ if (msg == null)
+ {
+ // The server is in the shutdown process
+ return null;
+ }
+
+ update = msg.processReceive(this);
+ } catch (SocketTimeoutException e)
+ {
+ // just retry
}
- update = msg.processReceive(this);
}
return update;
}
@@ -1001,28 +1011,6 @@
}
/**
- * Get the largest ChangeNumber that has been processed locally.
- *
- * @return The largest ChangeNumber that has been processed locally.
- */
- public ChangeNumber getMaxChangeNumber()
- {
- return state.getMaxChangeNumber(serverId);
- }
-
- /**
- * Create a new serverStartMessage suitable for this SynchronizationDomain.
- *
- * @return A new serverStartMessage suitable for this SynchronizationDomain.
- */
- public ServerStartMessage newServerStartMessage()
- {
- return new ServerStartMessage(serverId, baseDN, maxReceiveDelay,
- maxReceiveQueue, maxSendDelay, maxSendQueue,
- state);
- }
-
- /**
* Create and replay a synchronized Operation from an UpdateMessage.
*
* @param msg The UpdateMessage to be replayed.
@@ -1074,6 +1062,13 @@
{
done = true; // unknown type of operation ?!
}
+ if (done)
+ {
+ // the update became a dummy update and the result
+ // of the conflict resolution phase is to do nothing.
+ // however we still need to push this change to the serverState
+ updateError(changeNumber);
+ }
}
else
{
@@ -1377,8 +1372,8 @@
}
else
{
- RDN entryRdn = op.getEntryDN().getRDN();
- msg.setDn(parentDn + "," + entryRdn);
+ RDN entryRdn = DN.decode(msg.getDn()).getRDN();
+ msg.setDn(entryRdn + "," + parentDn);
return false;
}
}
@@ -1534,7 +1529,7 @@
*/
private String generateConflictDn(String entryUid, String dn)
{
- return dn + "entryuuid=" + entryUid;
+ return "entryuuid=" + entryUid + "+" + dn;
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
index ddf53f7..b5df1ee 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
@@ -102,7 +102,7 @@
/**
* Generates an Update Message which the provided information.
*
- * @param op The operation fo which the message must be created.
+ * @param op The operation for which the message must be created.
* @param isAssured flag indicating if the operation is an assured operation.
* @return The generated message.
*/
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 1c0cd69..90065bb 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.net.ServerSocket;
import java.net.InetSocketAddress;
+import java.net.SocketException;
import org.opends.server.backends.MemoryBackend;
import org.opends.server.config.ConfigException;
@@ -200,19 +201,13 @@
ServerSocket serverJmxSocket = null;
ServerSocket serverLdapsSocket = null;
- serverLdapSocket = new ServerSocket();
- serverLdapSocket.setReuseAddress(true);
- serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+ serverLdapSocket = bindFreePort();
serverLdapPort = serverLdapSocket.getLocalPort();
- serverJmxSocket = new ServerSocket();
- serverJmxSocket.setReuseAddress(true);
- serverJmxSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+ serverJmxSocket = bindFreePort();
serverJmxPort = serverJmxSocket.getLocalPort();
- serverLdapsSocket = new ServerSocket();
- serverLdapsSocket.setReuseAddress(true);
- serverLdapsSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+ serverLdapsSocket = bindFreePort();
serverLdapsPort = serverLdapsSocket.getLocalPort();
BufferedReader reader = new BufferedReader(new FileReader(
@@ -264,6 +259,22 @@
}
/**
+ * Find and binds to a free server socket port on the local host.
+ * @return the bounded Server socket.
+ *
+ * @throws IOException in case of underlying exception.
+ * @throws SocketExceptionin case of underlying exception.
+ */
+ public static ServerSocket bindFreePort() throws IOException, SocketException
+ {
+ ServerSocket serverLdapSocket;
+ serverLdapSocket = new ServerSocket();
+ serverLdapSocket.setReuseAddress(true);
+ serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+ return serverLdapSocket;
+ }
+
+ /**
* Shut down the server, if it has been started.
* @param reason The reason for the shutdown.
*/
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java
index 1d4b710..f163395 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java
@@ -297,9 +297,7 @@
// change the configuration of the connection handler to use
// a free port
- ServerSocket serverJmxSocket = new ServerSocket();
- serverJmxSocket.setReuseAddress(true);
- serverJmxSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+ ServerSocket serverJmxSocket = TestCaseUtils.bindFreePort();
int serverJmxPort = serverJmxSocket.getLocalPort();
ConfigEntry config = new ConfigEntry(TestCaseUtils.makeEntry(
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 2b45b55..d90cab8 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -27,16 +27,13 @@
package org.opends.server.synchronization;
+import static org.testng.Assert.*;
+
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
@@ -46,26 +43,26 @@
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
-
import org.opends.server.protocols.internal.InternalClientConnection;
-
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.OperationType;
import org.opends.server.types.RDN;
-import org.opends.server.types.ResultCode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
/**
- * Test the contructors, encoders and decoders of the synchronization
- * AckMsg, ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
+ * Test the contructors, encoders and decoders of the synchronization AckMsg,
+ * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
*/
-public class UpdateOperationTest
- extends SynchronizationTestCase
+public class UpdateOperationTest extends SynchronizationTestCase
{
/**
@@ -87,18 +84,21 @@
* The synchronization plugin entry
*/
private String synchroPluginStringDN;
+
private Entry synchroPluginEntry;
/**
* The Server synchro entry
*/
private String synchroServerStringDN;
+
private Entry synchroServerEntry;
/**
* The Change log entry
*/
private String changeLogStringDN;
+
private Entry changeLogEntry;
/**
@@ -107,26 +107,38 @@
private Entry personEntry;
/**
+ * An entry with a entryUUID
+ */
+ private Entry personWithUUIDEntry;
+ private Entry personWithSecondUniqueID;
+
+ /**
* schema check flag
*/
private boolean schemaCheck;
-
// WORKAROUND FOR BUG #639 - BEGIN -
/**
*
*/
MultimasterSynchronization mms;
- // WORKAROUND FOR BUG #639 - END -
+ private String baseUUID;
+ private String user1dn;
+
+ private String user1entrysecondUUID;
+
+ private String user1entryUUID;
+
+ // WORKAROUND FOR BUG #639 - END -
/**
* Set up the environment for performing the tests in this Class.
* synchronization
*
* @throws Exception
- * If the environment could not be set up.
+ * If the environment could not be set up.
*/
@BeforeClass
@Override
@@ -146,21 +158,23 @@
String[] topEntries = new String[2];
topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
+ "objectClass: domain\n";
- topEntries[1] = "dn: ou=People,dc=example,dc=com\n"
- + "objectClass: top\n" + "objectClass: organizationalUnit\n";
- Entry entry ;
+ topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+ Entry entry;
for (int i = 0; i < topEntries.length; i++)
{
entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
AddOperation addOp = new AddOperation(connection,
- InternalClientConnection.nextOperationID(),
- InternalClientConnection.nextMessageID(), null, entry.getDN(),
- entry.getObjectClasses(), entry.getUserAttributes(), entry
- .getOperationalAttributes());
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+ entry.getUserAttributes(), entry.getOperationalAttributes());
addOp.setInternalOperation(true);
addOp.run();
entryList.add(entry);
}
+
+ baseUUID = getEntryUUID(DN.decode("ou=People,dc=example,dc=com"));
// top level synchro provider
synchroStringDN = "cn=Synchronization Providers,cn=config";
@@ -182,8 +196,7 @@
String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+ "objectClass: top\n"
+ "objectClass: ds-cfg-synchronization-changelog-server-config\n"
- + "cn: Changelog Server\n"
- + "ds-cfg-changelog-port: 8989\n"
+ + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
+ "ds-cfg-changelog-server-id: 1\n";
changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
@@ -195,8 +208,7 @@
+ "cn: example\n"
+ "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
+ "ds-cfg-changelog-server: localhost:8989\n"
- + "ds-cfg-directory-server-id: 1\n"
- + "ds-cfg-receive-status: true\n";
+ + "ds-cfg-directory-server-id: 1\n" + "ds-cfg-receive-status: true\n";
synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
@@ -204,8 +216,8 @@
+ "objectClass: organizationalPerson\n"
+ "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ "homePhone: 951-245-7634\n"
- + "description: This is the description for Aaccf Amar.\n"
- + "st: NC\n" + "mobile: 027-085-0537\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
@@ -214,49 +226,613 @@
+ "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ "userPassword: password\n" + "initials: AA\n";
personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+ /*
+ * The 2 entries defined in the following code are used for the naming
+ * conflict resolution test (called namingConflicts)
+ * They must have the same DN but different entryUUID.
+ */
+ user1entryUUID = "33333333-3333-3333-3333-333333333333";
+ user1entrysecondUUID = "22222222-2222-2222-2222-222222222222";
+ user1dn = "uid=user1,ou=People,dc=example,dc=com";
+ String entryWithUUIDldif = "dn: "+ user1dn + "\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n"
+ + "entryUUID: " + user1entryUUID + "\n";
+ personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
+
+ String entryWithSecondUUID = "dn: "+ user1dn + "\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n"
+ + "entryUUID: "+ user1entrysecondUUID + "\n";
+ personWithSecondUniqueID =
+ TestCaseUtils.entryFromLdifString(entryWithSecondUUID);
+
+ configureSynchronization();
}
/**
* Clean up the environment. return null;
*
* @throws Exception
- * If the environment could not be set up.
+ * If the environment could not be set up.
*/
@AfterClass
public void classCleanUp() throws Exception
{
DirectoryServer.setCheckSchema(schemaCheck);
- DeleteOperation op;
- // WORKAROUND FOR BUG #639 - BEGIN -
+ // WORKAROUND FOR BUG #639 - BEGIN -
DirectoryServer.deregisterSynchronizationProvider(mms);
mms.finalizeSynchronizationProvider();
- // WORKAROUND FOR BUG #639 - END -
+ // WORKAROUND FOR BUG #639 - END -
+ cleanEntries();
+ }
+
+ /**
+ * suppress all the entries created by the tests in this class
+ */
+ private void cleanEntries()
+ {
+ DeleteOperation op;
// Delete entries
- Entry entries[] = entryList.toArray(new Entry[0]) ;
- for (int i = entries.length -1 ; i != 0 ; i--)
+ Entry entries[] = entryList.toArray(new Entry[0]);
+ for (int i = entries.length - 1; i != 0; i--)
{
try
{
op = new DeleteOperation(connection, InternalClientConnection
- .nextOperationID(), InternalClientConnection.nextMessageID(),
- null, entries[i].getDN());
+ .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+ entries[i].getDN());
op.run();
- }
- catch (Exception e)
+ } catch (Exception e)
{
}
}
}
/**
- * Tests that performed operation will generate synchronization messages
+ * Tests the naming conflict resolution code.
+ * In this test, the local server act both as an LDAP server and
+ * a changelog server that are inter-connected.
+ *
+ * The test creates an other session to the changelog server using
+ * directly the ChangelogBroker API.
+ * It then uses this session to siomulate conflicts and therefore
+ * test the naming conflict resolution code.
*/
- @Test()
+ @Test(enabled=true)
+ public void namingConflicts() throws Exception
+ {
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+ /*
+ * Open a session to the changelog server using the Changelog broker API.
+ * This must use a serverId different from the LDAP server ID
+ */
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 2);
+
+ /*
+ * Create a Change number generator to generate new changenumbers
+ * when we need to send operations messages to the changelog server.
+ */
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
+
+
+ /*
+ * Test that the conflict resolution code is able to find entries
+ * that have been renamed by an other master.
+ * To simulate this, create and entry with a given UUID and a given DN
+ * then send a modify operation using another DN but the same UUID.
+ * Finally check that the modify operation has been applied.
+ */
+ // create the entry with a given DN
+ AddMsg addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID,
+ baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ // Check that the entry has been created in the local DS.
+ Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+ assertNotNull(resultEntry,
+ "The send ADD synchronization message was not applied");
+ entryList.add(resultEntry);
+
+ // send a modify operation with the correct unique ID but another DN
+ List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+ ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
+ DN.decode("cn=something,ou=People,dc=example,dc=com"), mods,
+ user1entryUUID);
+ broker.publish(modMsg);
+
+ // check that the modify has been applied as if the entry had been renamed.
+ boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+ "telephonenumber", "01 02 45", 1000);
+ if (found == false)
+ fail("The modification has not been correctly replayed.");
+
+ /*
+ * Test that the conflict resolution code is able to detect
+ * that and entry have been renamed and that a new entry has
+ * been created with the same DN but another entry UUID
+ * To simulate this, create and entry with a given UUID and a given DN
+ * then send a modify operation using the same DN but another UUID.
+ * Finally check that the modify operation has not been applied to the
+ * entry with the given DN.
+ */
+
+ // create the entry with a given DN and unique ID
+ addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID, baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ // Check that the entry has been created in the local DS.
+ resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+ assertNotNull(resultEntry,
+ "The ADD synchronization message was not applied");
+ entryList.add(resultEntry);
+
+ // send a modify operation with a wrong unique ID but the same DN
+ mods = generatemods("telephonenumber", "02 01 03 05");
+ modMsg = new ModifyMsg(gen.NewChangeNumber(),
+ DN.decode("cn=something,ou=People,dc=example,dc=com"), mods,
+ "10000000-9abc-def0-1234-1234567890ab");
+ broker.publish(modMsg);
+
+ // check that the modify has not been applied
+ found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+ "telephonenumber", "02 01 03 05", 1000);
+ if (found == true)
+ fail("The modification has been replayed while it should not.");
+
+
+ /*
+ * Test that the conflict resolution code is able to find entries
+ * that have been renamed by an other master.
+ * To simulate this, send a delete operation using another DN but
+ * the same UUID has the entry that has been used in the tests above.
+ * Finally check that the delete operation has been applied.
+ */
+ // send a delete operation with a wong dn but the unique ID of the entry
+ // used above
+ DeleteMsg delMsg =
+ new DeleteMsg("cn=anotherdn,ou=People,dc=example,dc=com",
+ gen.NewChangeNumber(), user1entryUUID);
+ broker.publish(delMsg);
+
+ // check that the delete operation has been applied
+ resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+
+ assertNull(resultEntry,
+ "The DELETE synchronization message was not replayed");
+
+ /*
+ * Test that two adds with the same DN but a different unique ID result
+ * cause a conflict and result in the second entry to be renamed.
+ */
+
+ // create an entry with a given DN and unique ID
+ addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID, baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ // Check that the entry has been created in the local DS.
+ resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+ assertNotNull(resultEntry,
+ "The ADD synchronization message was not applied");
+ entryList.add(resultEntry);
+
+ // create an entry with the same DN and another unique ID
+ addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithSecondUniqueID.getDN().toString(),
+ user1entrysecondUUID, baseUUID,
+ personWithSecondUniqueID.getObjectClassAttribute(),
+ personWithSecondUniqueID.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ // Check that the entry has been renamed and created in the local DS.
+ resultEntry = getEntry(
+ DN.decode("entryuuid=" + user1entrysecondUUID +" + " + user1dn), 1000);
+ assertNotNull(resultEntry,
+ "The ADD synchronization message was not applied");
+
+ // delete the entries to clean the database.
+ delMsg =
+ new DeleteMsg(personWithUUIDEntry.getDN().toString(),
+ gen.NewChangeNumber(), user1entryUUID);
+ broker.publish(delMsg);
+ delMsg =
+ new DeleteMsg(personWithSecondUniqueID.getDN().toString(),
+ gen.NewChangeNumber(), user1entrysecondUUID);
+ broker.publish(delMsg);
+ resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+
+ // check that the delete operation has been applied
+ assertNull(resultEntry,
+ "The DELETE synchronization message was not replayed");
+ /*
+ * Check that and added entry is correctly added below it's
+ * parent entry when this parent entry has been renamed.
+ *
+ * Simulate this by trying to add an entry below a DN that does not
+ * exist but with a parent ID that exist.
+ */
+ addMsg = new AddMsg(gen.NewChangeNumber(),
+ "uid=new person,o=nothere,o=below,ou=People,dc=example,dc=com",
+ user1entryUUID,
+ baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ // Check that the entry has been renamed and created in the local DS.
+ resultEntry = getEntry(
+ DN.decode("uid=new person,ou=People,dc=example,dc=com"), 1000);
+ assertNotNull(resultEntry,
+ "The ADD synchronization message was not applied");
+
+ /*
+ * Check that when replaying delete the naming conflict code
+ * verify that the unique ID op the replayed operation is
+ * the same as the unique ID of the entry with the given DN
+ *
+ * To achieve this send a delete operation with a correct DN
+ * but a wrong unique ID.
+ */
+
+ // delete the entry to clean the database
+ delMsg =
+ new DeleteMsg("uid=new person,ou=People,dc=example,dc=com",
+ gen.NewChangeNumber(), "11111111-9abc-def0-1234-1234567890ab");
+ broker.publish(delMsg);
+ resultEntry = getEntry(
+ DN.decode("uid=new person,ou=People,dc=example,dc=com"), 1000);
+
+ // check that the delete operation has not been applied
+ assertNotNull(resultEntry,
+ "The DELETE synchronization message was replayed when it should not");
+
+ // delete the entry to clean the database
+ delMsg =
+ new DeleteMsg("uid=new person,ou=People,dc=example,dc=com",
+ gen.NewChangeNumber(), user1entryUUID);
+ broker.publish(delMsg);
+ resultEntry = getEntry(
+ DN.decode("uid=new person,ou=People,dc=example,dc=com"), 1000);
+
+ // check that the delete operation has been applied
+ assertNull(resultEntry,
+ "The DELETE synchronization message was not replayed");
+
+ broker.stop();
+ }
+
+ /**
+ * Tests done using directly the ChangelogBroker interface.
+ */
+ @Test(enabled=true)
public void updateOperations() throws Exception
{
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
+
+ /*
+ * Test that operations done on this server are sent to the
+ * changelog server and forwarded to our changelog broker session.
+ */
+
+ // Create an Entry (add operation)
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, personEntry.getDN(), personEntry
+ .getObjectClasses(), personEntry.getUserAttributes(), personEntry
+ .getOperationalAttributes());
+ addOp.run();
+ entryList.add(personEntry);
+ assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The Add Entry operation failed");
+
+ // Check if the client has received the msg
+ SynchronizationMessage msg = broker.receive();
+ assertTrue(msg instanceof AddMsg,
+ "The received synchronization message is not an ADD msg");
+ AddMsg addMsg = (AddMsg) msg;
+
+ Operation receivedOp = addMsg.createOperation(connection);
+ assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+ "The received synchronization message is not an ADD msg");
+
+ assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+ "The received ADD synchronization message is not for the excepted DN");
+
+ // Modify the entry
+ List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+
+ ModifyOperation modOp = new ModifyOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, personEntry.getDN(), mods);
+ modOp.setInternalOperation(true);
+ modOp.run();
+
+ // See if the client has received the msg
+ msg = broker.receive();
+ assertTrue(msg instanceof ModifyMsg,
+ "The received synchronization message is not a MODIFY msg");
+ ModifyMsg modMsg = (ModifyMsg) msg;
+
+ receivedOp = modMsg.createOperation(connection);
+ assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+ "The received MODIFY synchronization message is not for the excepted DN");
+
+ // Modify the entry DN
+ DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
+ ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, personEntry.getDN(), RDN
+ .decode("uid=new person"), true, DN
+ .decode("ou=People,dc=example,dc=com"));
+ modDNOp.run();
+ assertNotNull(DirectoryServer.getEntry(newDN),
+ "The MOD_DN operation didn't create the new person entry");
+ assertNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The MOD_DN operation didn't delete the old person entry");
+ entryList.add(DirectoryServer.getEntry(newDN));
+
+ // See if the client has received the msg
+ msg = broker.receive();
+ assertTrue(msg instanceof ModifyDNMsg,
+ "The received synchronization message is not a MODIFY DN msg");
+ ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
+ receivedOp = moddnMsg.createOperation(connection);
+
+ assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+ "The received MODIFY_DN message is not for the excepted DN");
+
+ // Delete the entry
+ Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
+ DeleteOperation delOp = new DeleteOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, DN
+ .decode("uid= new person,ou=People,dc=example,dc=com"));
+ delOp.run();
+ assertNull(DirectoryServer.getEntry(newDN),
+ "Unable to delete the new person Entry");
+ entryList.remove(newPersonEntry);
+
+ // See if the client has received the msg
+ msg = broker.receive();
+ assertTrue(msg instanceof DeleteMsg,
+ "The received synchronization message is not a MODIFY DN msg");
+ DeleteMsg delMsg = (DeleteMsg) msg;
+ receivedOp = delMsg.createOperation(connection);
+ assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
+ .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
+ "The received DELETE message is not for the excepted DN");
+
+ /*
+ * Now check that when we send message to the Changelog server
+ * and that they are received and correctly replayed by the server.
+ *
+ * Start by testing the Add message reception
+ */
+ addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID, baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ /*
+ * Check that the entry has been created in the local DS.
+ */
+ Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+ assertNotNull(resultEntry,
+ "The send ADD synchronization message was not applied");
+ entryList.add(resultEntry);
+
+ /*
+ * Test the reception of Modify Msg
+ */
+ modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
+ mods, user1entryUUID);
+ broker.publish(modMsg);
+
+ boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+ "telephonenumber", "01 02 45", 1000);
+
+ if (found == false)
+ fail("The modification has not been correctly replayed.");
+
+ /*
+ * Test the Reception of Modify Dn Msg
+ */
+ moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
+ gen.NewChangeNumber(),
+ user1entryUUID, null,
+ true, null, "uid= new person");
+ broker.publish(moddnMsg);
+
+ resultEntry = getEntry(
+ DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000);
+
+ assertNotNull(resultEntry,
+ "The modify DN synchronization message was not applied");
+
+ /*
+ * Test the Reception of Delete Msg
+ */
+ delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
+ gen.NewChangeNumber(), user1entryUUID);
+ broker.publish(delMsg);
+ resultEntry = getEntry(
+ DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000);
+
+ assertNull(resultEntry,
+ "The DELETE synchronization message was not replayed");
+
+ broker.stop();
+ }
+
+ /**
+ * @return
+ */
+ private List<Modification> generatemods(String attrName, String attrValue)
+ {
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
+ LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+ values.add(new AttributeValue(attrType, attrValue));
+ Attribute attr = new Attribute(attrType, attrName, values);
+ List<Modification> mods = new ArrayList<Modification>();
+ Modification mod = new Modification(ModificationType.REPLACE, attr);
+ mods.add(mod);
+ return mods;
+ }
+
+ /**
+ * Open a changelog session to the local Changelog server.
+ *
+ */
+ private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
+ throws Exception, SocketException
+ {
+ ServerState state = new ServerState(baseDn);
+ state.loadState();
+ ChangelogBroker broker = new ChangelogBroker(state, baseDn,
+ serverId, 0, 0, 0, 0);
+ ArrayList<String> servers = new ArrayList<String>(1);
+ servers.add("localhost:8989");
+ broker.start(servers);
+ broker.setSoTimeout(1000);
+ return broker;
+ }
+
+ /**
+ * Check that the entry with the given dn has the given valueString value
+ * for the given attrTypeStr attribute type.
+ */
+ private boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
+ String valueString, int timeout) throws Exception
+ {
+ // Wait no more than 1 second (synchro operation has to be sent,
+ // received and replay)
+ int i = timeout/100;
+ if (i<1)
+ i=1;
+ boolean found = false;
+ while ((i> 0) && (!found))
+ {
+ Thread.sleep(100);
+ Entry newEntry = DirectoryServer.getEntry(personWithUUIDEntry.getDN());
+ if (newEntry == null)
+ fail("The entry " + personWithUUIDEntry.getDN() +
+ " has incorrectly been deleted from the database.");
+ List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
+ Attribute tmpAttr = tmpAttrList.get(0);
+
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrTypeStr, true);
+ found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
+ i-- ;
+ }
+ return found;
+ }
+
+ /**
+ * Check that the entry with the given dn has the given valueString value
+ * for the given attrTypeStr attribute type.
+ */
+ private String getEntryUUID(DN dn) throws Exception
+ {
+ // Wait no more than 1 second (synchro operation has to be sent,
+ // received and replay)
+ int i = 10;
+ if (i<1)
+ i=1;
+ String found = null;
+ while ((i> 0) && (found == null))
+ {
+ Thread.sleep(100);
+ Entry newEntry = DirectoryServer.getEntry(dn);
+ if (newEntry != null)
+ {
+ List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
+ Attribute tmpAttr = tmpAttrList.get(0);
+
+ LinkedHashSet<AttributeValue> vals = tmpAttr.getValues();
+
+ for (AttributeValue val : vals)
+ {
+ found = val.getStringValue();
+ break;
+ }
+ }
+ }
+ return found;
+ }
+
+ /**
+ * Retrieves an entry from the local Directory Server.
+ *
+ * @throws InterruptedException
+ * @throws DirectoryException
+ */
+ private Entry getEntry(DN dn, int timeout)
+ throws InterruptedException, DirectoryException
+ {
+ Entry newEntry = null ;
+ int i = timeout/100;
+ if (i<1)
+ i=1;
+ while ((i> 0) && (newEntry == null))
+ {
+ Thread.sleep(100);
+ newEntry = DirectoryServer.getEntry(dn);
+ i--;
+ }
+ return newEntry;
+ }
+
+ /**
+ * Configure the Synchronization for this test.
+ */
+ private void configureSynchronization() throws Exception
+ {
//
// Add the Multimaster synchronization plugin
DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
@@ -294,200 +870,5 @@
assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
"Unable to add the syncrhonized server");
entryList.add(synchroServerEntry);
-
- //
- // Check Server state
- DN synchroServerDN = DN.decode("ou=People,dc=example,dc=com");
- ServerState ss = MultimasterSynchronization
- .getServerState(synchroServerDN);
- // TODO Check server state field
-
- //
- // Be Client of the Change log
- // We will check that the Chanlog server send us messages
- // suffix synchronized
- String synchroServerLdif2 = "dn: cn=example2, " + synchroPluginStringDN + "\n"
- + "objectClass: top\n"
- + "objectClass: ds-cfg-synchronization-provider-config\n"
- + "cn: example\n"
- + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
- + "ds-cfg-changelog-server: localhost:8989\n"
- + "ds-cfg-directory-server-id: 2\n"
- + "ds-cfg-receive-status: true\n";
- Entry synchroServerEntry2 = TestCaseUtils
- .entryFromLdifString(synchroServerLdif2);
- ConfigEntry newConfigEntry = new ConfigEntry(synchroServerEntry2,
- DirectoryServer.getConfigEntry(DN.decode(synchroPluginStringDN)));
- SynchronizationDomain syncDomain2 = new SynchronizationDomain(
- newConfigEntry);
-
- //
- // Create an Entry (add operation)
- AddOperation addOp = new AddOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), personEntry
- .getObjectClasses(), personEntry.getUserAttributes(), personEntry
- .getOperationalAttributes());
- addOp.run();
- entryList.add(personEntry);
- assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
- "The Add Entry operation fails");
-
- // Check if the client has receive the msg
- UpdateMessage msg = syncDomain2.receive() ;
- Operation receivedOp = msg.createOperation(connection);
- assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
- "The received synchronization message is not an ADD msg");
- assertTrue(DN.decode(msg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received ADD synchronization message is not for the excepted DN");
-
-
- // Modify the entry
- String attrName = "telephoneNumber";
- AttributeType attrType = DirectoryServer.getAttributeType(attrName, true);
-
- LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(attrType, "01 02 45"));
- Attribute attr = new Attribute(attrType, attrName, values);
- List<Modification> mods = new ArrayList<Modification>();
- Modification mod = new Modification(ModificationType.REPLACE, attr);
- mods.add(mod);
-
- ModifyOperation modOp = new ModifyOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), mods);
- modOp.setInternalOperation(true);
- modOp.run();
- // TODO Check the telephoneNumber attribute
-
- // See if the client has receive the msg
- msg = syncDomain2.receive() ;
- receivedOp = msg.createOperation(connection);
- assertTrue(OperationType.MODIFY.compareTo(receivedOp.getOperationType()) == 0,
- "The received synchronization message is not a MODIFY msg");
- assertTrue(DN.decode(msg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received MODIFY synchronization message is not for the excepted DN");
-
- //
- // Modify the entry DN
- DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
- ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), RDN
- .decode("uid=new person"), true, DN
- .decode("ou=People,dc=example,dc=com"));
- modDNOp.run();
- assertNotNull(DirectoryServer.getEntry(newDN),
- "The MOD_DN operation didn't create the new person entry");
- assertNull(DirectoryServer.getEntry(personEntry.getDN()),
- "The MOD_DN operation didn't delete the old person entry");
- entryList.add(DirectoryServer.getEntry(newDN));
-
- // See if the client has receive the msg
- msg = syncDomain2.receive() ;
- receivedOp = msg.createOperation(connection);
- assertTrue(OperationType.MODIFY_DN.compareTo(receivedOp.getOperationType()) == 0,
- "The received synchronization message is not a MODIFY_DN msg");
- assertTrue(DN.decode(msg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received MODIFY_DN synchronization message is not for the excepted DN");
-
-
- // Delete the entry
- Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
- DeleteOperation delOp = new DeleteOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, DN
- .decode("uid= new person,ou=People,dc=example,dc=com"));
- delOp.run();
- assertNull(DirectoryServer.getEntry(newDN),
- "Unable to delete the new person Entry");
- entryList.remove(newPersonEntry);
-
- // See if the client has receive the msg
- msg = syncDomain2.receive() ;
- receivedOp = msg.createOperation(connection);
- assertTrue(OperationType.DELETE.compareTo(receivedOp.getOperationType()) == 0,
- "The received synchronization message is not a DELETE msg");
- assertTrue(DN.decode(msg.getDn()).compareTo(DN
- .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
- "The received DELETE synchronization message is not for the excepted DN");
-
- // Purge the client
- syncDomain2.shutdown() ;
-
- //
- // Be new Client of the Change log
- // We will check that when we send message to the Chanlog server
- // the synchronization domain apply those changes.
-
- // Create a new synchronization domain
- String synchroServerLdif3 = "dn: cn=example3, " + synchroPluginStringDN + "\n"
- + "objectClass: top\n"
- + "objectClass: ds-cfg-synchronization-provider-config\n"
- + "cn: example\n"
- + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
- + "ds-cfg-changelog-server: localhost:8989\n"
- + "ds-cfg-directory-server-id: 3\n"
- + "ds-cfg-receive-status: true\n";
- Entry synchroServerEntry3 = TestCaseUtils
- .entryFromLdifString(synchroServerLdif3);
- newConfigEntry = new ConfigEntry(synchroServerEntry3, DirectoryServer
- .getConfigEntry(DN.decode(synchroPluginStringDN)));
- SynchronizationDomain syncDomain3 = new SynchronizationDomain(
- newConfigEntry);
-
- //
- // Message to Create the Entry
- addOp = new AddOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), personEntry
- .getObjectClasses(), personEntry.getUserAttributes(), personEntry
- .getOperationalAttributes());
- syncDomain3.doPreOperation(addOp);
- addOp.setResultCode(ResultCode.SUCCESS) ;
- syncDomain3.synchronize(addOp);
-
- // Wait no more than 1 second (synchro operation has to be sent,
- // received and replay)
- Entry newEntry = null ;
- int i = 10 ;
- while ((i> 0) && (newEntry == null))
- {
- Thread.sleep(100);
- newEntry = DirectoryServer.getEntry(personEntry.getDN());
- i-- ;
- }
- newEntry = DirectoryServer.getEntry(personEntry.getDN());
- assertNotNull(newEntry,
- "The send ADD synchronization message was not applied");
- entryList.add(newEntry);
-
- // Message to Modify the new created entry
-
- // Unable to test it: the doPreOperation operation use
- // modifyOperation.getModifiedEntry() which cannot be set the
- // the public level.
- // Otherwise, code will look like:
- /*
- modOp = new ModifyOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), mods);
- modOp.setModifiedEntry(...);
- mms.doPreOperation(modOp);
- modOp.setResultCode(ResultCode.SUCCESS) ;
- syncDomain3.synchronize(modOp);
- // TODO Check the telephoneNumber attribute
- */
-
- // Purge the message sender
- syncDomain3.shutdown() ;
-
-
- // Check synchronization monitoring
- SynchronizationDomain syncDomain = new SynchronizationDomain(
- DirectoryServer.getConfigEntry(synchroServerEntry.getDN()));
- SynchronizationMonitor mon = new SynchronizationMonitor(syncDomain);
- ArrayList<Attribute> monData = mon.getMonitorData();
- // TODO Check Monitoring Attributes
}
}
--
Gitblit v1.10.0