From d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 18 Nov 2009 16:55:52 +0000
Subject: [PATCH] Fix for Issue 4300 : stop replication server cause OutOfMemoryError
---
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 52 +++
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java | 12
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 335 ++++++++++++++-------
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 372 +++++++++++++++++------
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 72 +++
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 18 +
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 12
7 files changed, 634 insertions(+), 239 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ca1cf7d..c85caf4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
*/
public class LDAPReplicationDomain extends ReplicationDomain
implements ConfigurationChangeListener<ReplicationDomainCfg>,
- AlertGenerator, InternalSearchListener
+ AlertGenerator
{
/**
+ * This class is used in the session establishment phase
+ * when no Replication Server with all the local changes has been found
+ * and we therefore need to recover them.
+ * A search is then performed on the database using this
+ * internalSearchListener.
+ */
+ private class ScanSearchListener implements InternalSearchListener
+ {
+ private ChangeNumber startingChangeNumber = null;
+ private ChangeNumber endChangeNumber = null;
+
+ public ScanSearchListener(
+ ChangeNumber startingChangeNumber,
+ ChangeNumber endChangeNumber)
+ {
+ this.startingChangeNumber = startingChangeNumber;
+ this.endChangeNumber = endChangeNumber;
+ }
+
+ @Override
+ public void handleInternalSearchEntry(
+ InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
+ throws DirectoryException
+ {
+ // Build the list of Operations that happened on this entry
+ // after startingChangeNumber and before endChangeNumber and
+ // add them to the replayOperations list
+ Iterable<FakeOperation> updates =
+ Historical.generateFakeOperations(searchEntry);
+
+ for (FakeOperation op : updates)
+ {
+ ChangeNumber cn = op.getChangeNumber();
+ if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
+ {
+ synchronized (replayOperations)
+ {
+ replayOperations.put(cn, op);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void handleInternalSearchReference(
+ InternalSearchOperation searchOperation,
+ SearchResultReference searchReference) throws DirectoryException
+ {
+ // Nothing to do.
+ }
+ }
+
+ /**
* The fully-qualified name of this class.
*/
private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
}
/**
+ * The thread that is responsible to update the RS to which this domain is
+ * connected in case it is late and there is no RS which is up to date.
+ */
+ private class RSUpdater extends DirectoryThread
+ {
+ private ChangeNumber startChangeNumber;
+ protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
+ {
+ super("Replication Server Updater for server id " +
+ serverId + " and domain " + baseDn.toString());
+ this.startChangeNumber = replServerMaxChangeNumber;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ // Replication server is missing some of our changes: let's
+ // send them to him.
+ Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+ logError(message);
+
+ /*
+ * Get all the changes that have not been seen by this
+ * replication server and publish them.
+ */
+ try
+ {
+ if (buildAndPublishMissingChanges(startChangeNumber, broker))
+ {
+ message = DEBUG_CHANGES_SENT.get();
+ logError(message);
+ synchronized(replayOperations)
+ {
+ replayOperations.clear();
+ }
+ }
+ else
+ {
+ /*
+ * An error happened trying to search for the updates
+ * This server will start accepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to re-synchronize the servers.
+ */
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ }
+ } catch (Exception e)
+ {
+ /*
+ * An error happened trying to search for the updates
+ * This server will start accepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to re-synchronize the servers.
+ */
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ }
+ finally
+ {
+ broker.setRecoveryRequired(false);
+ }
+ }
+ }
+
+
+ /**
* Creates a new ReplicationDomain using configuration from configEntry.
*
* @param configuration The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
saveGenerationId(generationId);
}
- startPublishService(replicationServers, window, heartbeatInterval,
- configuration.getChangetimeHeartbeatInterval());
-
/*
* ChangeNumberGenerator is used to create new unique ChangeNumbers
* for each operation done on this replication domain.
@@ -505,6 +630,9 @@
pendingChanges =
new PendingChanges(generator, this);
+ startPublishService(replicationServers, window, heartbeatInterval,
+ configuration.getChangetimeHeartbeatInterval());
+
remotePendingChanges = new RemotePendingChanges(getServerState());
// listen for changes on the configuration
@@ -4356,74 +4484,9 @@
if ((ourMaxChangeNumber != null) &&
(!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
-
- // Replication server is missing some of our changes: let's
- // send them to him.
-
- Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
- logError(message);
-
- /*
- * Get all the changes that have not been seen by this
- * replication server and populate the replayOperations
- * list.
- */
- InternalSearchOperation op = searchForChangedEntries(
- baseDn, replServerMaxChangeNumber, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- /*
- * An error happened trying to search for the updates
- * This server will start accepting again new updates but
- * some inconsistencies will stay between servers.
- * Log an error for the repair tool
- * that will need to re-synchronize the servers.
- */
- message = ERR_CANNOT_RECOVER_CHANGES.get(
- baseDn.toNormalizedString());
- logError(message);
- } else
- {
- for (FakeOperation replayOp :
- replayOperations.tailMap(replServerMaxChangeNumber).values())
- {
- ChangeNumber cn = replayOp.getChangeNumber();
- /*
- * Because the entry returned by the search operation
- * can contain old historical information, it is
- * possible that some of the FakeOperation are
- * actually older than the last ChangeNumber known by
- * the Replication Server.
- * In such case don't send the operation.
- */
- if (!cn.newer(replServerMaxChangeNumber))
- {
- continue;
- }
-
- /*
- * Check if the DeleteOperation has been abandoned before
- * being processed. This is necessary because the replayOperation
- *
- */
- if (replayOp instanceof FakeDelOperation)
- {
- FakeDelOperation delOp = (FakeDelOperation) replayOp;
- if (findEntryDN(delOp.getUUID()) != null)
- {
- continue;
- }
- }
- message =
- DEBUG_SENDING_CHANGE.get(
- replayOp.getChangeNumber().toString());
- logError(message);
- session.publish(replayOp.generateMessage());
- }
- message = DEBUG_CHANGES_SENT.get();
- logError(message);
- }
- replayOperations.clear();
+ pendingChanges.setRecovering(true);
+ broker.setRecoveryRequired(true);
+ new RSUpdater(replServerMaxChangeNumber).start();
}
}
} catch (Exception e)
@@ -4437,19 +4500,124 @@
}
/**
+ * Build the list of changes that have been processed by this server
+ * after the ChangeNumber given as a parameter and publish them
+ * using the given session.
+ *
+ * @param startingChangeNumber The ChangeNumber whe we need to start the
+ * search
+ * @param session The session to use to publish the changes
+ *
+ * @return A boolean indicating he success of the
+ * operation.
+ * @throws Exception if an Exception happens during the search.
+ */
+ public boolean buildAndPublishMissingChanges(
+ ChangeNumber startingChangeNumber,
+ ReplicationBroker session)
+ throws Exception
+ {
+ // Trim the changes in replayOperations that are older than
+ // the startingChangeNumber.
+ synchronized (replayOperations)
+ {
+ Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
+ while (it.hasNext())
+ {
+ if (it.next().olderOrEqual(startingChangeNumber))
+ {
+ it.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ ChangeNumber lastRetrievedChange = null;
+ long missingChangesDelta;
+ InternalSearchOperation op;
+ ChangeNumber currentStartChangeNumber = startingChangeNumber;
+ do
+ {
+ lastRetrievedChange = null;
+ // We can't do the search in one go because we need to
+ // store the results so that we are sure we send the operations
+ // in order and because the list might be large
+ // So we search by interval of 10 seconds
+ // and store the results in the replayOperations list
+ // so that they are sorted before sending them.
+ missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
+ ChangeNumber endChangeNumber =
+ new ChangeNumber(
+ missingChangesDelta, 0xffffffff, serverId);
+
+ ScanSearchListener listener =
+ new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
+ op = searchForChangedEntries(
+ baseDn, currentStartChangeNumber, endChangeNumber, listener);
+
+ // Publish and remove all the changes from the replayOperations list
+ // that are older than the endChangeNumber.
+ LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+ synchronized (replayOperations)
+ {
+ Iterator<FakeOperation> itOp = replayOperations.values().iterator();
+ while (itOp.hasNext())
+ {
+ FakeOperation fakeOp = itOp.next();
+ if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
+ && state.cover(fakeOp.getChangeNumber()))
+ {
+ lastRetrievedChange = fakeOp.getChangeNumber();
+ opsToSend.add(fakeOp);
+ itOp.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ for (FakeOperation opToSend : opsToSend)
+ {
+ session.publishRecovery(opToSend.generateMessage());
+ }
+ opsToSend.clear();
+ if (lastRetrievedChange != null)
+ {
+ currentStartChangeNumber = lastRetrievedChange;
+ }
+ else
+ {
+ currentStartChangeNumber = endChangeNumber;
+ }
+
+ } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
+ (op.getResultCode().equals(ResultCode.SUCCESS)));
+
+ return op.getResultCode().equals(ResultCode.SUCCESS);
+ }
+
+
+ /**
* Search for the changes that happened since fromChangeNumber
* based on the historical attribute. The only changes that will
* be send will be the one generated on the serverId provided in
* fromChangeNumber.
* @param baseDn the base DN
- * @param fromChangeNumber The change number from which we want the changes
- * @param resultListener that will process the entries returned.
+ * @param fromChangeNumber The ChangeNumber from which we want the changes
+ * @param lastChangeNumber The max ChangeNumber that the search should return
+ * @param resultListener The listener that will process the entries returned
* @return the internal search operation
* @throws Exception when raised.
*/
public static InternalSearchOperation searchForChangedEntries(
DN baseDn,
ChangeNumber fromChangeNumber,
+ ChangeNumber lastChangeNumber,
InternalSearchListener resultListener)
throws Exception
{
@@ -4457,8 +4625,16 @@
InternalClientConnection.getRootConnection();
Integer serverId = fromChangeNumber.getServerId();
- String maxValueForId = "ffffffffffffffff" +
- String.format("%04x", serverId) + "ffffffff";
+ String maxValueForId;
+ if (lastChangeNumber == null)
+ {
+ maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
+ + "ffffffff";
+ }
+ else
+ {
+ maxValueForId = lastChangeNumber.toString();
+ }
LDAPFilter filter = LDAPFilter.decode(
"(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
}
/**
- * {@inheritDoc}
+ * Search for the changes that happened since fromChangeNumber
+ * based on the historical attribute. The only changes that will
+ * be send will be the one generated on the serverId provided in
+ * fromChangeNumber.
+ * @param baseDn the base DN
+ * @param fromChangeNumber The change number from which we want the changes
+ * @param resultListener that will process the entries returned.
+ * @return the internal search operation
+ * @throws Exception when raised.
*/
- public void handleInternalSearchEntry(
- InternalSearchOperation searchOperation,
- SearchResultEntry searchEntry)
+ public static InternalSearchOperation searchForChangedEntries(
+ DN baseDn,
+ ChangeNumber fromChangeNumber,
+ InternalSearchListener resultListener)
+ throws Exception
{
- /*
- * This call back is called at session establishment phase
- * for each entry that has been changed by this server and the changes
- * have not been sent to any Replication Server.
- * The role of this method is to build equivalent operation from
- * the historical information and add them in the replayOperations
- * table.
- */
- Iterable<FakeOperation> updates =
- Historical.generateFakeOperations(searchEntry);
- for (FakeOperation op : updates)
- {
- replayOperations.put(op.getChangeNumber(), op);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void handleInternalSearchReference(
- InternalSearchOperation searchOperation,
- SearchResultReference searchReference)
- {
- // TODO to be implemented
+ return searchForChangedEntries(
+ baseDn, fromChangeNumber, null, resultListener);
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index ad1207d..72414a3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -212,7 +212,14 @@
{
numSentUpdates++;
LDAPUpdateMsg updateMsg = firstChange.getMsg();
- domain.publish(updateMsg);
+ if (!recoveringOldChanges)
+ {
+ domain.publish(updateMsg);
+ }
+ else
+ {
+ domain.getServerState().update(updateMsg.getChangeNumber());
+ }
}
pendingChanges.remove(firstChangeNumber);
@@ -248,4 +255,47 @@
_commit(changeNumber, msg);
return _pushCommittedChanges();
}
+
+ private boolean recoveringOldChanges = false;
+ /**
+ * Set the PendingChangesList structure in a mode where it is
+ * waiting for the RS to receive all the previous changes to
+ * be sent before starting to process the changes normally.
+ * In this mode, The Domain does not publish the changes from
+ * the pendingChanges because there are older changes that
+ * need to be published before.
+ *
+ * @param b The recovering status that must be set.
+ */
+ public void setRecovering(boolean b)
+ {
+ recoveringOldChanges = b;
+ }
+
+ /**
+ * Allows to update the recovery situation by comparing the ChangeNumber of
+ * the last change that was sent to the ReplicationServer with the
+ * ChangeNumber of the last operation that was taken out of the
+ * PendingChanges list.
+ * If he two match then the recovery is completed and normal procedure can
+ * restart. Otherwise the RSUpdate thread must continue to look for
+ * older changes and no changes can be committed from the pendingChanges list.
+ *
+ * @param recovered The ChangeNumber of the last change that was published
+ * to the ReplicationServer.
+ *
+ * @return A boolean indicating if the recovery is completed (false)
+ * or must continue (true).
+ */
+
+ public synchronized boolean RecoveryUntil(ChangeNumber recovered)
+ {
+ ChangeNumber lastLocalChange = domain.getLastLocalChange();
+
+ if ((recovered != null) && (recovered.newerOrEquals(lastLocalChange)))
+ {
+ recoveringOldChanges = false;
+ }
+ return recoveringOldChanges;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
index 1f67c46..a4010f9 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -123,6 +123,18 @@
}
/**
+ * Checks that the ChangeNumber given as a parameter is in this ServerState.
+ *
+ * @param covered The ChangeNumber that should be checked.
+ * @return A boolean indicating if this ServerState contains the ChangeNumber
+ * given in parameter.
+ */
+ public boolean cover(ChangeNumber covered)
+ {
+ return state.cover(covered);
+ }
+
+ /**
* Update the Server State with a ChangeNumber.
* All operations with smaller CSN and the same serverID must be committed
* before calling this method.
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c1eb5e5..24fc481 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
private long generationID;
private int updateDoneCount = 0;
+ private boolean connectRequiresRecovery = false;
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
rsServerId = serverInfo.getServerId();
rsServerUrl = bestServer;
+ receiveTopo(topologyMsg);
+
+ // Log a message to let the administrator know that the failure
+ // was resolved.
+ // Wakeup all the thread that were waiting on the window
+ // on the previous connection.
+ connectionError = false;
+ if (sendWindow != null)
+ {
+ sendWindow.release(Integer.MAX_VALUE);
+ }
+ sendWindow = new Semaphore(maxSendWindow);
+ rcvWindow = maxRcvWindow;
+ connected = true;
+
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
@@ -703,8 +719,7 @@
serverInfo.getGenerationId(),
session);
}
- receiveTopo(topologyMsg);
- connected = true;
+
if (getRsGroupId() != groupId)
{
// Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
if (connected)
{
- // Log a message to let the administrator know that the failure was
- // resolved.
- // Wakeup all the thread that were waiting on the window
- // on the previous connection.
- connectionError = false;
- if (sendWindow != null)
- {
- sendWindow.release(Integer.MAX_VALUE);
- }
- sendWindow = new Semaphore(maxSendWindow);
- rcvWindow = maxRcvWindow;
connectPhaseLock.notify();
if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
*/
public void publish(ReplicationMsg msg)
{
+ _publish(msg, false);
+ }
+
+ /**
+ * Publish a recovery message to the other servers.
+ * @param msg the message to publish
+ */
+ public void publishRecovery(ReplicationMsg msg)
+ {
+ _publish(msg, true);
+ }
+
+ /**
+ * Publish a message to the other servers.
+ * @param msg the message to publish
+ * @param recoveryMsg the message is a recovery Message
+ */
+ void _publish(ReplicationMsg msg, boolean recoveryMsg)
+ {
boolean done = false;
while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
currentWindowSemaphore = sendWindow;
}
+ // If the Replication domain has decided that there is a need to
+ // recover some changes then it is not allowed to send this
+ // change but it will be the responsibility of the recovery thread to
+ // do it.
+ if (!recoveryMsg & connectRequiresRecovery)
+ {
+ return;
+ }
+
if (msg instanceof UpdateMsg)
{
// Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
ctHeartbeatPublisherThread = null;
}
}
+
+ /**
+ * Set the connectRequiresRecovery to the provided value.
+ * This flag is used to indicate if a recovery of Update is necessary
+ * after a reconnection to a RS.
+ * It is the responsibility of the ReplicationDomain to set it during the
+ * sessionInitiated phase.
+ *
+ * @param b the new value of the connectRequiresRecovery.
+ */
+ public void setRecoveryRequired(boolean b)
+ {
+ connectRequiresRecovery = b;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 893bfa7..96bc116 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2909,4 +2909,16 @@
{
return eClIncludes;
}
+
+ /**
+ * Returns the ChangeNUmber of the last Change that was fully processed
+ * by this ReplicationDomain.
+ *
+ * @return The ChangeNUmber of the last Change that was fully processed
+ * by this ReplicationDomain.
+ */
+ public ChangeNumber getLastLocalChange()
+ {
+ return state.getMaxChangeNumber(serverID);
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index a46136a..7ef92f1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1231,17 +1231,31 @@
* Deletes the provided entry from the Directory Server using an
* internal operation.
*
- * @param entry The entry to be added.
+ * @param entry The entry to be deleted.
*
* @throws Exception If an unexpected problem occurs.
*/
public static void deleteEntry(Entry entry)
throws Exception
{
+ deleteEntry(entry.getDN());
+ }
+
+ /**
+ * Deletes the provided entry from the Directory Server using an
+ * internal operation.
+ *
+ * @param dn The dn of entry to be deleted
+ *
+ * @throws Exception If an unexpected problem occurs.
+ */
+ public static void deleteEntry(DN dn)
+ throws Exception
+ {
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
- DeleteOperation deleteOperation = conn.processDelete(entry.getDN());
+ DeleteOperation deleteOperation = conn.processDelete(dn);
assertEquals(deleteOperation.getResultCode(), ResultCode.SUCCESS);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index e92b6f9..8f19763 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -26,127 +26,71 @@
*/
package org.opends.server.replication.plugin;
+import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.IOException;
import java.net.ServerSocket;
+import java.util.LinkedList;
import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
-import org.opends.server.core.AddOperationBasis;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
+import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchResultEntry;
-import org.testng.annotations.BeforeClass;
+import org.opends.server.util.TimeThread;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
/**
* Test the usage of the historical data of the replication.
*/
public class HistoricalCsnOrderingTest
-extends ReplicationTestCase
+ extends ReplicationTestCase
{
- /**
- * A "person" entry
- */
- protected Entry personEntry;
- private int replServerPort;
+ final int serverId = 123;
- /**
- * Set up the environment for performing the tests in this Class.
- *
- * @throws Exception
- * If the environment could not be set up.
- */
- @BeforeClass
- @Override
- public void setUp() throws Exception
+ public class TestBroker extends ReplicationBroker
{
- super.setUp();
+ LinkedList<ReplicationMsg> list = null;
- // Create necessary backend top level entry
- String topEntry = "dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
- + "objectClass: top\n"
- + "objectClass: organizationalUnit\n"
- + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
- addEntry(TestCaseUtils.entryFromLdifString(topEntry));
+ public TestBroker(LinkedList<ReplicationMsg> list)
+ {
+ super(null, null, null, 0, 0, (long) 0, (long) 0, null, (byte) 0, (long) 0);
+ this.list = list;
+ }
- // find a free port for the replicationServer
- ServerSocket socket = TestCaseUtils.bindFreePort();
- replServerPort = socket.getLocalPort();
- socket.close();
+ public void publishRecovery(ReplicationMsg msg)
+ {
+ list.add(msg);
+ }
- // replication server
- String replServerLdif =
- "dn: cn=Replication Server, " + SYNCHRO_PLUGIN_DN + "\n"
- + "objectClass: top\n"
- + "objectClass: ds-cfg-replication-server\n"
- + "cn: Replication Server\n"
- + "ds-cfg-replication-port: " + replServerPort + "\n"
- + "ds-cfg-replication-db-directory: HistoricalCsnOrderingTestDb\n"
- + "ds-cfg-replication-server-id: 101\n";
- replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
- // suffix synchronized
- String testName = "historicalCsnOrderingTest";
- String synchroServerLdif =
- "dn: cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n"
- + "objectClass: top\n"
- + "objectClass: ds-cfg-replication-domain\n"
- + "cn: " + testName + "\n"
- + "ds-cfg-base-dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
- + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
- + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
- synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
-
- String personLdif = "dn: uid=user.1,ou=People," + TEST_ROOT_DN_STRING + "\n"
- + "objectClass: top\n" + "objectClass: person\n"
- + "objectClass: organizationalPerson\n"
- + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
- + "homePhone: 951-245-7634\n"
- + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
- + "mobile: 027-085-0537\n"
- + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
- + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
- + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
- + "street: 17984 Thirteenth Street\n"
- + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
- + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
- + "userPassword: password\n" + "initials: AA\n";
- personEntry = TestCaseUtils.entryFromLdifString(personLdif);
-
- configureReplication();
- }
-
- /**
- * Add an entry in the database
- *
- */
- private void addEntry(Entry entry) throws Exception
- {
- AddOperationBasis addOp = new AddOperationBasis(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
- entry.getUserAttributes(), entry.getOperationalAttributes());
- addOp.setInternalOperation(true);
- addOp.run();
- assertNotNull(getEntry(entry.getDN(), 1000, true));
}
/**
@@ -182,10 +126,19 @@
* informations.
*/
@Test()
- public void changesCmpTest()
+ public void buildAndPublishMissingChangesOneEntryTest()
throws Exception
{
- final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
+ final int serverId = 123;
+ final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+ TestCaseUtils.initializeTestBackend(true);
+ ReplicationServer rs = createReplicationServer();
+ // Create Replication Server and Domain
+ LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
+
+ try
+ {
+ long startTime = TimeThread.getTime();
final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
final AttributeType histType =
DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
@@ -246,39 +199,183 @@
"Second historical value:" + av.getValue().toString()));
}
+ LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
+ TestBroker session = new TestBroker(opList);
+
+ boolean result =
+ rd1.buildAndPublishMissingChanges(
+ new ChangeNumber(startTime, 0, serverId),
+ session);
+ assertTrue(result, "buildAndPublishMissingChanges has failed");
+ assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
+ assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
+
+
// Build a change number from the first modification
String hv[] = histValue.split(":");
- logError(Message.raw(Category.SYNC, Severity.INFORMATION,
- hv[1]));
- ChangeNumber fromChangeNumber =
- new ChangeNumber(hv[1]);
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION, hv[1]));
+ ChangeNumber fromChangeNumber = new ChangeNumber(hv[1]);
- // Retrieves the entries that have changed since the first modification
- InternalSearchOperation op =
- LDAPReplicationDomain.searchForChangedEntries(
- baseDn, fromChangeNumber, null);
+ opList = new LinkedList<ReplicationMsg>();
+ session = new TestBroker(opList);
- // The expected result is one entry .. the one previously modified
- assertEquals(op.getResultCode(), ResultCode.SUCCESS);
- assertEquals(op.getSearchEntries().size(), 1);
-
- // From the historical of this entry, rebuild operations
- // Since there have been 2 modifications and 1 add, there should be 3
- // operations rebuild from this state.
- int updatesCnt = 0;
- for (SearchResultEntry searchEntry : op.getSearchEntries())
- {
- logError(Message.raw(Category.SYNC, Severity.INFORMATION,
- searchEntry.toString()));
- Iterable<FakeOperation> updates =
- Historical.generateFakeOperations(searchEntry);
- for (FakeOperation fop : updates)
- {
- logError(Message.raw(Category.SYNC, Severity.INFORMATION,
- fop.generateMessage().toString()));
- updatesCnt++;
- }
+ result =
+ rd1.buildAndPublishMissingChanges(
+ fromChangeNumber,
+ session);
+ assertTrue(result, "buildAndPublishMissingChanges has failed");
+ assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
+ assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
}
- assertTrue(updatesCnt == 3);
+ finally
+ {
+ MultimasterReplication.deleteDomain(baseDn);
+ rs.remove();
+ }
+ }
+
+ /**
+ * Test that we can retrieve the entries that were missed by
+ * a replication server and can re-build operations from the historical
+ * informations.
+ */
+ @Test()
+ public void buildAndPublishMissingChangesSeveralEntriesTest()
+ throws Exception
+ {
+ final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+ TestCaseUtils.initializeTestBackend(true);
+ ReplicationServer rs = createReplicationServer();
+ // Create Replication Server and Domain
+ LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
+ long startTime = TimeThread.getTime();
+
+ try
+ {
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ "Starting replication test : changesCmpTest"));
+
+ // Add 3 entries.
+ String dnTest1 = "cn=test1," + baseDn.toString();
+ String dnTest2 = "cn=test2," + baseDn.toString();
+ String dnTest3 = "cn=test3," + baseDn.toString();
+ TestCaseUtils.addEntry(
+ "dn: " + dnTest3,
+ "displayname: Test1",
+ "objectClass: top",
+ "objectClass: person",
+ "objectClass: organizationalPerson",
+ "objectClass: inetOrgPerson",
+ "cn: test1",
+ "sn: test"
+ );
+ TestCaseUtils.addEntry(
+ "dn: " + dnTest1,
+ "displayname: Test1",
+ "objectClass: top",
+ "objectClass: person",
+ "objectClass: organizationalPerson",
+ "objectClass: inetOrgPerson",
+ "cn: test1",
+ "sn: test"
+ );
+ TestCaseUtils.deleteEntry(DN.decode(dnTest3));
+ TestCaseUtils.addEntry(
+ "dn: " + dnTest2,
+ "displayname: Test1",
+ "objectClass: top",
+ "objectClass: person",
+ "objectClass: organizationalPerson",
+ "objectClass: inetOrgPerson",
+ "cn: test1",
+ "sn: test"
+ );
+
+ // Perform modifications on the 2 entries
+ int resultCode = TestCaseUtils.applyModifications(false,
+ "dn: cn=test2," + baseDn.toString(),
+ "changetype: modify",
+ "add: description",
+ "description: foo");
+ resultCode = TestCaseUtils.applyModifications(false,
+ "dn: cn=test1," + baseDn.toString(),
+ "changetype: modify",
+ "add: description",
+ "description: foo");
+ assertEquals(resultCode, 0);
+
+ LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
+ TestBroker session = new TestBroker(opList);
+
+ // Call the buildAndPublishMissingChanges and check that this method
+ // correctly generates the 4 operations in the correct order.
+ boolean result =
+ rd1.buildAndPublishMissingChanges(
+ new ChangeNumber(startTime, 0, serverId),
+ session);
+ assertTrue(result, "buildAndPublishMissingChanges has failed");
+ assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
+ ReplicationMsg msg = opList.removeFirst();
+ assertTrue(msg.getClass().equals(AddMsg.class));
+ assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
+ msg = opList.removeFirst();
+ assertTrue(msg.getClass().equals(DeleteMsg.class));
+ assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest3);
+ msg = opList.removeFirst();
+ assertTrue(msg.getClass().equals(AddMsg.class));
+ assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
+ msg = opList.removeFirst();
+ assertTrue(msg.getClass().equals(ModifyMsg.class));
+ assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
+ msg = opList.removeFirst();
+ assertTrue(msg.getClass().equals(ModifyMsg.class));
+ assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
+ }
+ finally
+ {
+ MultimasterReplication.deleteDomain(baseDn);
+ rs.remove();
+ }
+ }
+
+ SortedSet<String> replServers = new TreeSet<String>();
+ private ReplicationServer createReplicationServer() throws ConfigException
+ {
+ int rsPort;
+ try
+ {
+ ServerSocket socket1 = TestCaseUtils.bindFreePort();
+ rsPort = socket1.getLocalPort();
+ socket1.close();
+ replServers.add("localhost:" + rsPort);
+
+
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering",
+ 0, 1, 0, 100, replServers, 1, 1000, 5000);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ replicationServer.clearDb();
+ return replicationServer;
+ }
+ catch (IOException e)
+ {
+ fail("Unable to determinate some free ports " +
+ stackTraceToSingleLineString(e));
+ return null;
+ }
+ }
+
+ private LDAPReplicationDomain createReplicationDomain(int dsId)
+ throws DirectoryException, ConfigException
+ {
+ DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+ DomainFakeCfg domainConf =
+ new DomainFakeCfg(baseDn, dsId, replServers, AssuredType.NOT_ASSURED,
+ 2, 1, 0, null);
+ LDAPReplicationDomain replicationDomain =
+ MultimasterReplication.createNewDomain(domainConf);
+ replicationDomain.start();
+
+ return replicationDomain;
}
}
--
Gitblit v1.10.0