From d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 18 Nov 2009 16:55:52 +0000
Subject: [PATCH] Fix for Issue 4300 : stop replication server cause OutOfMemoryError
---
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 372 ++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 268 insertions(+), 104 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ca1cf7d..c85caf4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
*/
public class LDAPReplicationDomain extends ReplicationDomain
implements ConfigurationChangeListener<ReplicationDomainCfg>,
- AlertGenerator, InternalSearchListener
+ AlertGenerator
{
/**
+ * This class is used in the session establishment phase
+ * when no Replication Server with all the local changes has been found
+ * and we therefore need to recover them.
+ * A search is then performed on the database using this
+ * internalSearchListener.
+ */
+ private class ScanSearchListener implements InternalSearchListener
+ {
+ private ChangeNumber startingChangeNumber = null;
+ private ChangeNumber endChangeNumber = null;
+
+ public ScanSearchListener(
+ ChangeNumber startingChangeNumber,
+ ChangeNumber endChangeNumber)
+ {
+ this.startingChangeNumber = startingChangeNumber;
+ this.endChangeNumber = endChangeNumber;
+ }
+
+ @Override
+ public void handleInternalSearchEntry(
+ InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
+ throws DirectoryException
+ {
+ // Build the list of Operations that happened on this entry
+ // after startingChangeNumber and before endChangeNumber and
+ // add them to the replayOperations list
+ Iterable<FakeOperation> updates =
+ Historical.generateFakeOperations(searchEntry);
+
+ for (FakeOperation op : updates)
+ {
+ ChangeNumber cn = op.getChangeNumber();
+ if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
+ {
+ synchronized (replayOperations)
+ {
+ replayOperations.put(cn, op);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void handleInternalSearchReference(
+ InternalSearchOperation searchOperation,
+ SearchResultReference searchReference) throws DirectoryException
+ {
+ // Nothing to do.
+ }
+ }
+
+ /**
* The fully-qualified name of this class.
*/
private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
}
/**
+ * The thread that is responsible to update the RS to which this domain is
+ * connected in case it is late and there is no RS which is up to date.
+ */
+ private class RSUpdater extends DirectoryThread
+ {
+ private ChangeNumber startChangeNumber;
+ protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
+ {
+ super("Replication Server Updater for server id " +
+ serverId + " and domain " + baseDn.toString());
+ this.startChangeNumber = replServerMaxChangeNumber;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ // Replication server is missing some of our changes: let's
+ // send them to him.
+ Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+ logError(message);
+
+ /*
+ * Get all the changes that have not been seen by this
+ * replication server and publish them.
+ */
+ try
+ {
+ if (buildAndPublishMissingChanges(startChangeNumber, broker))
+ {
+ message = DEBUG_CHANGES_SENT.get();
+ logError(message);
+ synchronized(replayOperations)
+ {
+ replayOperations.clear();
+ }
+ }
+ else
+ {
+ /*
+ * An error happened trying to search for the updates
+ * This server will start accepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to re-synchronize the servers.
+ */
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ }
+ } catch (Exception e)
+ {
+ /*
+ * An error happened trying to search for the updates
+ * This server will start accepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to re-synchronize the servers.
+ */
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ }
+ finally
+ {
+ broker.setRecoveryRequired(false);
+ }
+ }
+ }
+
+
+ /**
* Creates a new ReplicationDomain using configuration from configEntry.
*
* @param configuration The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
saveGenerationId(generationId);
}
- startPublishService(replicationServers, window, heartbeatInterval,
- configuration.getChangetimeHeartbeatInterval());
-
/*
* ChangeNumberGenerator is used to create new unique ChangeNumbers
* for each operation done on this replication domain.
@@ -505,6 +630,9 @@
pendingChanges =
new PendingChanges(generator, this);
+ startPublishService(replicationServers, window, heartbeatInterval,
+ configuration.getChangetimeHeartbeatInterval());
+
remotePendingChanges = new RemotePendingChanges(getServerState());
// listen for changes on the configuration
@@ -4356,74 +4484,9 @@
if ((ourMaxChangeNumber != null) &&
(!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
-
- // Replication server is missing some of our changes: let's
- // send them to him.
-
- Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
- logError(message);
-
- /*
- * Get all the changes that have not been seen by this
- * replication server and populate the replayOperations
- * list.
- */
- InternalSearchOperation op = searchForChangedEntries(
- baseDn, replServerMaxChangeNumber, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- /*
- * An error happened trying to search for the updates
- * This server will start accepting again new updates but
- * some inconsistencies will stay between servers.
- * Log an error for the repair tool
- * that will need to re-synchronize the servers.
- */
- message = ERR_CANNOT_RECOVER_CHANGES.get(
- baseDn.toNormalizedString());
- logError(message);
- } else
- {
- for (FakeOperation replayOp :
- replayOperations.tailMap(replServerMaxChangeNumber).values())
- {
- ChangeNumber cn = replayOp.getChangeNumber();
- /*
- * Because the entry returned by the search operation
- * can contain old historical information, it is
- * possible that some of the FakeOperation are
- * actually older than the last ChangeNumber known by
- * the Replication Server.
- * In such case don't send the operation.
- */
- if (!cn.newer(replServerMaxChangeNumber))
- {
- continue;
- }
-
- /*
- * Check if the DeleteOperation has been abandoned before
- * being processed. This is necessary because the replayOperation
- *
- */
- if (replayOp instanceof FakeDelOperation)
- {
- FakeDelOperation delOp = (FakeDelOperation) replayOp;
- if (findEntryDN(delOp.getUUID()) != null)
- {
- continue;
- }
- }
- message =
- DEBUG_SENDING_CHANGE.get(
- replayOp.getChangeNumber().toString());
- logError(message);
- session.publish(replayOp.generateMessage());
- }
- message = DEBUG_CHANGES_SENT.get();
- logError(message);
- }
- replayOperations.clear();
+ pendingChanges.setRecovering(true);
+ broker.setRecoveryRequired(true);
+ new RSUpdater(replServerMaxChangeNumber).start();
}
}
} catch (Exception e)
@@ -4437,19 +4500,124 @@
}
/**
+ * Build the list of changes that have been processed by this server
+ * after the ChangeNumber given as a parameter and publish them
+ * using the given session.
+ *
+ * @param startingChangeNumber The ChangeNumber whe we need to start the
+ * search
+ * @param session The session to use to publish the changes
+ *
+ * @return A boolean indicating he success of the
+ * operation.
+ * @throws Exception if an Exception happens during the search.
+ */
+ public boolean buildAndPublishMissingChanges(
+ ChangeNumber startingChangeNumber,
+ ReplicationBroker session)
+ throws Exception
+ {
+ // Trim the changes in replayOperations that are older than
+ // the startingChangeNumber.
+ synchronized (replayOperations)
+ {
+ Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
+ while (it.hasNext())
+ {
+ if (it.next().olderOrEqual(startingChangeNumber))
+ {
+ it.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ ChangeNumber lastRetrievedChange = null;
+ long missingChangesDelta;
+ InternalSearchOperation op;
+ ChangeNumber currentStartChangeNumber = startingChangeNumber;
+ do
+ {
+ lastRetrievedChange = null;
+ // We can't do the search in one go because we need to
+ // store the results so that we are sure we send the operations
+ // in order and because the list might be large
+ // So we search by interval of 10 seconds
+ // and store the results in the replayOperations list
+ // so that they are sorted before sending them.
+ missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
+ ChangeNumber endChangeNumber =
+ new ChangeNumber(
+ missingChangesDelta, 0xffffffff, serverId);
+
+ ScanSearchListener listener =
+ new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
+ op = searchForChangedEntries(
+ baseDn, currentStartChangeNumber, endChangeNumber, listener);
+
+ // Publish and remove all the changes from the replayOperations list
+ // that are older than the endChangeNumber.
+ LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+ synchronized (replayOperations)
+ {
+ Iterator<FakeOperation> itOp = replayOperations.values().iterator();
+ while (itOp.hasNext())
+ {
+ FakeOperation fakeOp = itOp.next();
+ if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
+ && state.cover(fakeOp.getChangeNumber()))
+ {
+ lastRetrievedChange = fakeOp.getChangeNumber();
+ opsToSend.add(fakeOp);
+ itOp.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ for (FakeOperation opToSend : opsToSend)
+ {
+ session.publishRecovery(opToSend.generateMessage());
+ }
+ opsToSend.clear();
+ if (lastRetrievedChange != null)
+ {
+ currentStartChangeNumber = lastRetrievedChange;
+ }
+ else
+ {
+ currentStartChangeNumber = endChangeNumber;
+ }
+
+ } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
+ (op.getResultCode().equals(ResultCode.SUCCESS)));
+
+ return op.getResultCode().equals(ResultCode.SUCCESS);
+ }
+
+
+ /**
* Search for the changes that happened since fromChangeNumber
* based on the historical attribute. The only changes that will
* be send will be the one generated on the serverId provided in
* fromChangeNumber.
* @param baseDn the base DN
- * @param fromChangeNumber The change number from which we want the changes
- * @param resultListener that will process the entries returned.
+ * @param fromChangeNumber The ChangeNumber from which we want the changes
+ * @param lastChangeNumber The max ChangeNumber that the search should return
+ * @param resultListener The listener that will process the entries returned
* @return the internal search operation
* @throws Exception when raised.
*/
public static InternalSearchOperation searchForChangedEntries(
DN baseDn,
ChangeNumber fromChangeNumber,
+ ChangeNumber lastChangeNumber,
InternalSearchListener resultListener)
throws Exception
{
@@ -4457,8 +4625,16 @@
InternalClientConnection.getRootConnection();
Integer serverId = fromChangeNumber.getServerId();
- String maxValueForId = "ffffffffffffffff" +
- String.format("%04x", serverId) + "ffffffff";
+ String maxValueForId;
+ if (lastChangeNumber == null)
+ {
+ maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
+ + "ffffffff";
+ }
+ else
+ {
+ maxValueForId = lastChangeNumber.toString();
+ }
LDAPFilter filter = LDAPFilter.decode(
"(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
}
/**
- * {@inheritDoc}
+ * Search for the changes that happened since fromChangeNumber
+ * based on the historical attribute. The only changes that will
+ * be send will be the one generated on the serverId provided in
+ * fromChangeNumber.
+ * @param baseDn the base DN
+ * @param fromChangeNumber The change number from which we want the changes
+ * @param resultListener that will process the entries returned.
+ * @return the internal search operation
+ * @throws Exception when raised.
*/
- public void handleInternalSearchEntry(
- InternalSearchOperation searchOperation,
- SearchResultEntry searchEntry)
+ public static InternalSearchOperation searchForChangedEntries(
+ DN baseDn,
+ ChangeNumber fromChangeNumber,
+ InternalSearchListener resultListener)
+ throws Exception
{
- /*
- * This call back is called at session establishment phase
- * for each entry that has been changed by this server and the changes
- * have not been sent to any Replication Server.
- * The role of this method is to build equivalent operation from
- * the historical information and add them in the replayOperations
- * table.
- */
- Iterable<FakeOperation> updates =
- Historical.generateFakeOperations(searchEntry);
- for (FakeOperation op : updates)
- {
- replayOperations.put(op.getChangeNumber(), op);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void handleInternalSearchReference(
- InternalSearchOperation searchOperation,
- SearchResultReference searchReference)
- {
- // TODO to be implemented
+ return searchForChangedEntries(
+ baseDn, fromChangeNumber, null, resultListener);
}
--
Gitblit v1.10.0