From ea629fa971db08f2267b50522360563a8fec7f86 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 104 ++++++++++++++++++++++++++++-----------------------
1 files changed, 57 insertions(+), 47 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index f597000..775dbc5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -67,7 +67,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -535,7 +534,8 @@
throws ConfigException
{
super(configuration.getBaseDN().toNormalizedString(),
- configuration.getServerId());
+ configuration.getServerId(),
+ configuration.getInitializationWindowSize());
/**
* The time in milliseconds between heartbeats from the replication
@@ -1786,7 +1786,7 @@
*/
@Override
protected void initializeRemote(int target, int requestorID,
- Task initTask) throws DirectoryException
+ Task initTask, int initWindow) throws DirectoryException
{
if ((target == RoutableMsg.ALL_SERVERS) && fractionalConfig.isFractional())
{
@@ -1795,7 +1795,7 @@
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
} else
{
- super.initializeRemote(target, requestorID, initTask);
+ super.initializeRemote(target, requestorID, initTask, this.initWindow);
}
}
@@ -2449,23 +2449,25 @@
*/
public void shutdown()
{
- // stop the flush thread
- shutdown = true;
-
- // stop the thread in charge of flushing the ServerState.
- if (flushThread != null)
+ if (!shutdown)
{
- synchronized (flushThread)
+ shutdown = true;
+
+ // stop the thread in charge of flushing the ServerState.
+ if (flushThread != null)
{
- flushThread.notify();
+ synchronized (flushThread)
+ {
+ flushThread.notify();
+ }
}
+
+ DirectoryServer.deregisterAlertGenerator(this);
+
+ // stop the ReplicationDomain
+ stopDomain();
}
- DirectoryServer.deregisterAlertGenerator(this);
-
- // stop the ReplicationDomain
- stopDomain();
-
// wait for completion of the persistentServerState thread.
try
{
@@ -3945,16 +3947,15 @@
}
/**
- * This method should trigger an import of the replicated data.
+ * This method triggers an import of the replicated data.
*
- * @param input The InputStream from which
+ * @param input The InputStream from which the data are read.
* @throws DirectoryException When needed.
*/
@Override
- public void importBackend(InputStream input) throws DirectoryException
+ protected void importBackend(InputStream input) throws DirectoryException
{
LDIFImportConfig importConfig = null;
- DirectoryException de = null;
Backend backend = retrievesBackend(baseDn);
@@ -3964,8 +3965,9 @@
{
Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
backend.getBackendID().toString());
- logError(message);
- de = new DirectoryException(ResultCode.OTHER, message);
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ message));
}
else
{
@@ -3997,30 +3999,33 @@
}
catch(Exception e)
{
- de = new DirectoryException(ResultCode.OTHER,
- Message.raw(e.getLocalizedMessage()));
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(
+ ResultCode.OTHER,
+ ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
}
finally
{
- // Cleanup
- if (importConfig != null)
- {
- importConfig.close();
-
- // Re-enable backend
- closeBackendImport(backend);
-
- backend = retrievesBackend(baseDn);
- }
-
try
{
+ // Cleanup
+ if (importConfig != null)
+ {
+ importConfig.close();
+ closeBackendImport(backend); // Re-enable backend
+ backend = retrievesBackend(baseDn);
+ }
+
loadDataState();
- if (debugEnabled())
- TRACER.debugInfo(
- "After import, the replication plugin restarts connections" +
- " to all RSs to provide new generation ID=" + generationId);
+ if (ieContext.getException() != null)
+ {
+ // When an error occurred during an import, most of times
+ // the generationId coming in the root entry of the imported data,
+ // is not valid anymore (partial data in the backend).
+ generationId = computeGenerationId();
+ saveGenerationId(generationId);
+ }
}
catch (DirectoryException fe)
{
@@ -4029,15 +4034,15 @@
// so we don't bother about the new Exception.
// However if there was no Exception before we want
// to return this Exception to the task creator.
- if (de == null)
- de = fe;
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(
+ ResultCode.OTHER,
+ ERR_INIT_IMPORT_FAILURE.get(fe.getLocalizedMessage())));
}
}
- // Sends up the root error.
- if (de != null)
- {
- throw de;
- }
+
+ if (ieContext.getException() != null)
+ throw ieContext.getException();
}
/**
@@ -4810,14 +4815,19 @@
}
ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_INVALID_IMPORT_SOURCE.get();
if (cause != null)
{
+ Message message = ERR_INVALID_IMPORT_SOURCE.get(
+ baseDn.toNormalizedString(), Integer.toString(serverId),
+ Integer.toString(source),"Details:" + cause.getLocalizedMessage());
throw new DirectoryException(
resultCode, message, cause);
}
else
{
+ Message message = ERR_INVALID_IMPORT_SOURCE.get(
+ baseDn.toNormalizedString(), Integer.toString(serverId),
+ Integer.toString(source),"");
throw new DirectoryException(
resultCode, message);
}
--
Gitblit v1.10.0