From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improves the replica initialization protocol, especially flow control and handling connection outage.
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 1236 +++++++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 965 insertions(+), 271 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7729aa3..2ee1ec0 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -72,8 +72,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.opends.messages.Category;
import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.loggers.debug.DebugTracer;
@@ -91,11 +92,13 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
+import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -284,6 +287,15 @@
private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
new HashMap<Integer,Integer>();
+ /**
+ * Window size used during initialization .. between
+ * - the initializer/exporter DS that listens/waits acknowledges and that
+ * slows down data msg publishing based on the slowest server
+ * - and each initialized/importer DS that publishes acknowledges each
+ * WINDOW/2 data msg received.
+ */
+ protected int initWindow = 100;
+
/* Status related monitoring fields */
// Indicates the date when the status changed. This may be used to indicate
@@ -328,6 +340,28 @@
* to the Replication Domain.
* This identifier should be different for each server that
* is participating to a given Replication Domain.
+ * @param initWindow Window used during initialization.
+ */
+ public ReplicationDomain(String serviceID, int serverID,int initWindow)
+ {
+ this.serviceID = serviceID;
+ this.serverID = serverID;
+ this.initWindow = initWindow;
+ this.state = new ServerState();
+ this.generator = new ChangeNumberGenerator(serverID, state);
+
+ domains.put(serviceID, this);
+ }
+
+ /**
+ * Creates a ReplicationDomain with the provided parameters.
+ *
+ * @param serviceID The identifier of the Replication Domain to which
+ * this object is participating.
+ * @param serverID The identifier of the server that is participating
+ * to the Replication Domain.
+ * This identifier should be different for each server that
+ * is participating to a given Replication Domain.
*/
public ReplicationDomain(String serviceID, int serverID)
{
@@ -557,6 +591,22 @@
}
/**
+ * Check if a remote replica (DS) is connected to the topology based on
+ * the TopologyMsg we received when the remote replica connected or
+ * disconnected.
+ *
+ * @param serverId The provided serverId of the remote replica
+ * @return whether the remote replica is connected or not.
+ */
+ public boolean isRemoteDSConnected(int serverId)
+ {
+ for (DSInfo remoteDS : getReplicasList())
+ if (remoteDS.getDsId() == serverId)
+ return true;
+ return false;
+ }
+
+ /**
* Gets the States of all the Replicas currently in the
* Topology.
* When this method is called, a Monitoring message will be sent
@@ -708,7 +758,8 @@
/**
* Receives an update message from the replicationServer.
- * also responsible for updating the list of pending changes
+ * The other types of messages are processed in an opaque way for the caller.
+ * Also responsible for updating the list of pending changes
* @return the received message - null if none
*/
UpdateMsg receive()
@@ -717,11 +768,11 @@
while (update == null)
{
- InitializeRequestMsg initMsg = null;
+ InitializeRequestMsg initReqMsg = null;
ReplicationMsg msg;
try
{
- msg = broker.receive(true);
+ msg = broker.receive(true, true, false);
if (msg == null)
{
// The server is in the shutdown process
@@ -741,54 +792,58 @@
{
// Another server requests us to provide entries
// for a total update
- initMsg = (InitializeRequestMsg)msg;
+ initReqMsg = (InitializeRequestMsg)msg;
}
else if (msg instanceof InitializeTargetMsg)
{
// Another server is exporting its entries to us
- InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
+ InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
- try
- {
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
- initialize(importMsg);
- }
- catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- ErrorMsg errorMsg =
- new ErrorMsg(importMsg.getsenderID(),
- de.getMessageObject());
- MessageBuilder mb = new MessageBuilder();
- mb.append(de.getMessageObject());
- TRACER.debugInfo(Message.toString(mb.toMessage()));
- broker.publish(errorMsg);
- logError(de.getMessageObject());
- }
+ // This must be done while we are still holding the
+ // broker lock because we are now going to receive a
+ // bunch of entries from the remote server and we
+ // want the import thread to catch them and
+ // not the ListenerThread.
+ initialize(initTargetMsg, initTargetMsg.getSenderID());
}
else if (msg instanceof ErrorMsg)
{
+ ErrorMsg errorMsg = (ErrorMsg)msg;
if (ieContext != null)
{
// This is an error termination for the 2 following cases :
// - either during an export
// - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
- abandonImportExport((ErrorMsg)msg);
+ // For example, when we publish a request and the
+ // replicationServer did not find the import source.
+ //
+ // A remote error during the import will be received in the
+ // receiveEntryBytes() method.
+ //
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] processErrorMsg:" + this.serverID +
+ " serviceID: " + this.serviceID +
+ " Error Msg received: " + errorMsg);
+
+ if (errorMsg.getCreationTime() > ieContext.startTime)
+ {
+ // consider only ErrorMsg that relate to the current import/export
+ processErrorMsg(errorMsg);
+ }
+ else
+ {
+ // Simply log - happen when the ErrorMsg relates to a previous
+ // attempt of initialization while we have started a new one
+ // on this side.
+ logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
+ }
}
else
{
- /*
- * Log error message
- */
- ErrorMsg errorMsg = (ErrorMsg)msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
+ // Simply log - happen if import/export has been terminated
+ // on our side before receiving this ErrorMsg.
+ logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
}
}
else if (msg instanceof ChangeStatusMsg)
@@ -801,6 +856,15 @@
update = (UpdateMsg) msg;
generator.adjust(update.getChangeNumber());
}
+ else if (msg instanceof InitializeRcvAckMsg)
+ {
+ if (ieContext != null)
+ {
+ InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
+ ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
+ }
+ // Trash this msg When no input/export is running/should never happen
+ }
}
catch (SocketTimeoutException e)
{
@@ -815,10 +879,11 @@
// entries to the remote can be handled by the other
// replay thread when they call this method and therefore the
// broker.receive() method.
- if (initMsg != null)
+ if (initReqMsg != null)
{
// Do this work in a thread to allow replay thread continue working
- ExportThread exportThread = new ExportThread(initMsg.getsenderID());
+ ExportThread exportThread = new ExportThread(
+ initReqMsg.getSenderID(), initReqMsg.getInitWindow());
exportThread.start();
}
}
@@ -989,23 +1054,29 @@
*/
/**
- * This thread is launched when we want to export data to another server that
- * has requested to be initialized with the data of our backend.
+ * This thread is launched when we want to export data to another server.
+ *
+ * When a task is created locally (so this local server is the initiator)
+ * of the export (Exemple: dsreplication initialize-all),
+ * this thread is NOT used but the task thread is running the export instead).
*/
private class ExportThread extends DirectoryThread
{
- // Id of server that will receive updates
- private int target;
+ // Id of server that will be initialized
+ private int serverToInitialize;
+ private int initWindow;
/**
* Constructor for the ExportThread.
*
- * @param i Id of server that will receive updates
+ * @param serverToInitialize serverId of server that will receive entries
*/
- public ExportThread(int i)
+ public ExportThread(int serverToInitialize, int initWindow)
{
- super("Export thread " + serverID);
- this.target = i;
+ super("Export thread from serverId=" + serverID
+ + " to serverId=" + serverToInitialize);
+ this.serverToInitialize = serverToInitialize;
+ this.initWindow = initWindow;
}
/**
@@ -1015,22 +1086,20 @@
public void run()
{
if (debugEnabled())
- {
- TRACER.debugInfo("Export thread starting.");
- }
-
+ TRACER.debugInfo("[IE] starting " + this.getName());
try
{
- initializeRemote(target, target, null);
+ initializeRemote(serverToInitialize, serverToInitialize, null,
+ initWindow);
} catch (DirectoryException de)
{
- // An error message has been sent to the peer
- // Nothing more to do locally
+ // An error message has been sent to the peer
+ // This server is not the initiator of the export so there is
+ // nothing more to do locally.
}
+
if (debugEnabled())
- {
- TRACER.debugInfo("Export thread stopping.");
- }
+ TRACER.debugInfo("[IE] ending " + this.getName());
}
}
@@ -1052,13 +1121,49 @@
// The count for the entry not yet processed
long entryLeftCount = 0;
- // The exception raised when any
+ // Exception raised during the initialization.
DirectoryException exception = null;
- // A boolean indicating if the context is related to an
- // import or an export.
+ // Whether the context is related to an import or an export.
boolean importInProgress;
+ // Current counter of messages exchanged during the initialization
+ int msgCnt = 0;
+
+ // Number of connections lost when we start the initialization.
+ // Will help counting connections lost during initialization,
+ int initNumLostConnections = 0;
+
+ // Request message sent when this server has the initializeFromRemote task.
+ InitializeRequestMsg initReqMsgSent = null;
+
+ // Start time of the initialization process. ErrorMsg timestamped
+ // before thi startTime will be ignored.
+ long startTime;
+
+ // List fo replicas (DS) connected to the topology when
+ // initialization started.
+ Set<Integer> startList = new HashSet<Integer>(0);
+
+ // List fo replicas (DS) with a failure (disconnected from the topology)
+ // since the initialization started.
+ Set<Integer> failureList = new HashSet<Integer>(0);
+
+ // Flow control during initialization
+ // - for each remote server, counter of messages received
+ private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
+ // - serverId of the slowest server (the one with the smallest non null
+ // counter)
+ private int slowestServerId = -1;
+
+ short exporterProtocolVersion = -1;
+
+ // Window used during this initialization
+ int initWindow;
+
+ // Number of attempt already done for this initialization
+ short attemptCnt;
+
/**
* Creates a new IEContext.
*
@@ -1069,19 +1174,21 @@
public IEContext(boolean importInProgress)
{
this.importInProgress = importInProgress;
+ this.startTime = System.currentTimeMillis();
+ this.attemptCnt = 0;
+
}
/**
* Initializes the import/export counters with the provider value.
* @param total Total number of entries to be processed.
- * @param left Remaining number of entries to be processed.
* @throws DirectoryException if an error occurred.
*/
- public void setCounters(long total, long left)
+ private void initializeCounters(long total)
throws DirectoryException
{
entryCount = total;
- entryLeftCount = left;
+ entryLeftCount = total;
if (initializeTask != null)
{
@@ -1193,7 +1300,42 @@
{
this.exception = exception;
}
- }
+
+ /**
+ * Set the id of the EntryMsg acknowledged from a receiver (importer)server.
+ * (updated via the listener thread)
+ * @param serverId serverId of the acknowledger/receiver/importer server.
+ * @param numAck id of the message received.
+ */
+ public void setAckVal(int serverId, int numAck)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck);
+
+ this.ackVals.put(serverId, numAck);
+
+ // Recompute the server with the minAck returned,means the slowest server.
+ slowestServerId = serverId;
+ for (Integer sid : ieContext.ackVals.keySet())
+ if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
+ slowestServerId = sid;
+ }
+
+ /**
+ * Returns the serverId of the server that acknowledged the smallest
+ * EntryMsg id.
+ * @return serverId of the server with latest acknowledge.
+ * 0 when no ack has been received yet.
+ */
+ public int getSlowestServer()
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId
+ + " " + this.ackVals.get(slowestServerId));
+
+ return this.slowestServerId;
+ }
+}
/**
* Verifies that the given string represents a valid source
* from which this server can be initialized.
@@ -1260,34 +1402,10 @@
public void initializeRemote(int target, Task initTask)
throws DirectoryException
{
- initializeRemote(target, serverID, initTask);
- if (target == RoutableMsg.ALL_SERVERS)
- {
- // Check for the status of all remote servers to check if they
- // are all finished with the import.
- boolean done = true;
- do
- {
- done = true;
- for (DSInfo dsi : getReplicasList())
- {
- if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
- {
- done = false;
- try
- {
- Thread.sleep(100);
- } catch (InterruptedException e)
- {
- // just loop again.
- }
- break;
- }
- }
- }
- while (!done);
- }
+ initializeRemote(target, this.serverID, initTask, this.initWindow);
+
+
}
/**
@@ -1295,76 +1413,332 @@
* specified by the target argument when this initialization specifying the
* server that requests the initialization.
*
- * @param target The target that should be initialized.
- * @param target2 The server that initiated the export.
- * @param initTask The task that triggers this initialization and that should
- * be updated with its progress.
+ * @param serverToInitialize The target server that should be initialized.
+ * @param serverRunningTheTask The server that initiated the export. It can
+ * be the serverID of this server, or the serverID of a remote server.
+ * @param initTask The task in this server that triggers this initialization
+ * and that should be updated with its progress. Null when the export is done
+ * following a request coming from a remote server (task is remote).
+ * @param initWindow The value of the initialization window for flow control
+ * between the importer and the exporter.
*
- * @exception DirectoryException When an error occurs.
+ * @exception DirectoryException When an error occurs. No exception raised
+ * means success.
*/
- protected void initializeRemote(int target, int target2,
- Task initTask) throws DirectoryException
+ protected void initializeRemote(int serverToInitialize,
+ int serverRunningTheTask, Task initTask, int initWindow)
+ throws DirectoryException
{
- Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
- Integer.toString(serverID),
- serviceID,
- Integer.toString(target2));
- logError(msg);
+ DirectoryException exportRootException = null;
+ boolean contextAcquired = false;
- boolean contextAcquired=false;
-
+ // Acquire and initialize the export context
acquireIEContext(false);
contextAcquired = true;
- ieContext.exportTarget = target;
- if (initTask != null)
+ Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
+ Integer.toString(serverID), Long.toString(countEntries()), serviceID,
+ Integer.toString(serverToInitialize));
+ logError(msg);
+
+ // We manage the list of servers to initialize in order :
+ // - to test at the end that all expected servers have reconnected
+ // after their import and with the right genId
+ // - to update the task with the server(s) where this test failed
+
+ if (serverToInitialize == RoutableMsg.ALL_SERVERS)
+ for (DSInfo dsi : getReplicasList())
+ ieContext.startList.add(dsi.getDsId());
+ else
+ ieContext.startList.add(serverToInitialize);
+
+ // We manage the list of servers with which a flow control can be enabled
+ for (DSInfo dsi : getReplicasList())
{
- ieContext.initializeTask = initTask;
+ if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ ieContext.setAckVal(dsi.getDsId(), 0);
}
- // The number of entries to be exported is the number of entries under
- // the base DN entry and the base entry itself.
- long entryCount = this.countEntries();
-
-
- ieContext.setCounters(entryCount, entryCount);
-
- // Send start message to the peer
- InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
- serviceID, serverID, target, target2, entryCount);
-
- broker.publish(initializeMessage);
-
- try
+ // loop for the case where the exporter is the initiator
+ int attempt = 0;
+ boolean done = false;
+ while ((!done) && (++attempt<2)) // attempt loop
{
- exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
+ try
+ {
+ ieContext.exportTarget = serverToInitialize;
+ if (initTask != null)
+ ieContext.initializeTask = initTask;
+ ieContext.initializeCounters(this.countEntries());
+ ieContext.msgCnt = 0;
+ ieContext.initNumLostConnections = broker.getNumLostConnections();
+ ieContext.initWindow = initWindow;
- // Notify the peer of the success
- DoneMsg doneMsg = new DoneMsg(serverID,
- initializeMessage.getDestination());
- broker.publish(doneMsg);
+ // Send start message to the peer
+ InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
+ serviceID, serverID, serverToInitialize, serverRunningTheTask,
+ ieContext.entryCount, initWindow);
+ broker.publish(initTargetMsg);
+
+ // Wait for all servers to be ok
+ waitForRemoteStartOfInit();
+
+ // Servers that left in the list are those for which we could not test
+ // that they have been successfully initialized.
+ if (!ieContext.failureList.isEmpty())
+ {
+ throw new DirectoryException(
+ ResultCode.OTHER,
+ ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
+ ieContext.failureList.toString()));
+ }
+
+ exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
+
+ // Notify the peer of the success
+ DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
+ broker.publish(doneMsg);
+
+ }
+ catch(DirectoryException exportException)
+ {
+ // Give priority to the first exception raised - stored in the context
+ if (ieContext.exception != null)
+ exportRootException = ieContext.exception;
+ else
+ exportRootException = exportException;
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] In " + this.monitor.getMonitorInstanceName()
+ + " export ends with "
+ + " connected=" + broker.isConnected()
+ + " exportRootException=" + exportRootException);
+
+ if (exportRootException != null)
+ {
+ try
+ {
+ // Handling the errors during export
+
+ // Note: we could have lost the connection and another thread
+ // the listener one) has already managed to reconnect.
+ // So we MUST rely on the test broker.isConnected()
+ // ONLY to do 'wait to be reconnected by another thread'
+ // (if not yet reconnected already).
+
+ if (!broker.isConnected())
+ {
+ // We are still disconnected, so we wait for the listener thread
+ // to reconnect - wait 10s
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] Exporter wait for reconnection by the listener thread");
+ int att=0;
+ while ((!broker.shuttingDown()) &&
+ (!broker.isConnected())&& (++att<100))
+ try { Thread.sleep(100); } catch(Exception e){}
+ }
+
+ if ((initTask != null) && broker.isConnected() &&
+ (serverToInitialize != RoutableMsg.ALL_SERVERS))
+ {
+ // NewAttempt case : In the case where
+ // - it's not an InitializeAll
+ // - AND the previous export attempt failed
+ // - AND we are (now) connected
+ // - and we own the task and this task is not an InitializeAll
+ // Let's :
+ // - sleep to let time to the other peer to reconnect if needed
+ // - and launch another attempt
+ try { Thread.sleep(1000); } catch(Exception e){}
+ logError(NOTE_RESENDING_INIT_TARGET.get((exportRootException!=null?
+ exportRootException.getLocalizedMessage():"")));
+
+ continue;
+ }
+
+ ErrorMsg errorMsg =
+ new ErrorMsg(serverToInitialize,
+ exportRootException.getMessageObject());
+ broker.publish(errorMsg);
+ }
+ catch(Exception e)
+ {
+ // Ignore the failure raised while proceeding the root failure
+ }
+ }
+
+ // We are always done for this export ...
+ // ... except in the NewAttempt case (see above)
+ done = true;
+
+ } // attempt loop
+
+ // Wait for all servers to be ok, and build the failure list
+ waitForRemoteEndOfInit();
+
+ // Servers that left in the list are those for which we could not test
+ // that they have been successfully initialized.
+ if (!ieContext.failureList.isEmpty())
+ {
+ if (exportRootException == null)
+ exportRootException = new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
+ Long.toString(getGenerationID()),
+ ieContext.failureList.toString()));
+ }
+
+ if (contextAcquired)
releaseIEContext();
- }
- catch(DirectoryException de)
- {
- // Notify the peer of the failure
- ErrorMsg errorMsg =
- new ErrorMsg(target,
- de.getMessageObject());
- broker.publish(errorMsg);
-
- if (contextAcquired)
- releaseIEContext();
-
- throw(de);
- }
msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
Integer.toString(serverID),
- serviceID,
- Integer.toString(target2));
+ serviceID,
+ Integer.toString(serverToInitialize),
+ (exportRootException!=null?exportRootException.getLocalizedMessage():""));
logError(msg);
+
+ if (exportRootException != null)
+ {
+ throw(exportRootException);
+ }
+
+ }
+
+ /*
+ * For all remote servers in tht start list,
+ * - wait it has finished the import and present the expected generationID
+ * - build the failureList
+ */
+ private void waitForRemoteStartOfInit()
+ {
+ int waitResultAttempt = 0;
+ Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
+
+ for (Integer sid : ieContext.startList)
+ replicasWeAreWaitingFor.add(sid);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
+
+ boolean done = true;
+ do
+ {
+ done = true;
+ for (DSInfo dsi : getReplicasList())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] wait for start dsid " + dsi.getDsId()
+ + " " + dsi.getStatus()
+ + " " + dsi.getGenerationId()
+ + " " + this.getGenerationID());
+ if (ieContext.startList.contains(dsi.getDsId()))
+ {
+ if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
+ {
+ // this one is still not doing the Full Update ... retry later
+ done = false;
+ try
+ { Thread.sleep(100); } catch (InterruptedException e) {}
+ waitResultAttempt++;
+ break;
+ }
+ else
+ {
+ // this one is ok
+ replicasWeAreWaitingFor.remove(dsi.getDsId());
+ }
+ }
+ }
+ }
+ while ((!done) && (waitResultAttempt<1200) // 2mn
+ && (!broker.shuttingDown()));
+
+ // Add to the failure list the servers that were here at start time but
+ // that never ended with the right generationId.
+ for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
+ ieContext.failureList.add(sid);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] wait for start ends with " + ieContext.failureList);
+ }
+
+ /*
+ * For all remote servers in tht start list,
+ * - wait it has finished the import and present the expected generationID
+ * - build the failureList
+ */
+ private void waitForRemoteEndOfInit()
+ {
+ int waitResultAttempt = 0;
+ Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
+
+ for (Integer sid : ieContext.startList)
+ replicasWeAreWaitingFor.add(sid);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
+
+ // In case some new servers appear during the init, we want them to be
+ // considered in the processing of sorting the successfully initialized
+ // and the others
+ for (DSInfo dsi : getReplicasList())
+ replicasWeAreWaitingFor.add(dsi.getDsId());
+
+ boolean done = true;
+ do
+ {
+ done = true;
+ for (DSInfo dsi : getReplicasList())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] wait for end dsid " + dsi.getDsId()
+ + " " + dsi.getStatus()
+ + " " + dsi.getGenerationId()
+ + " " + this.getGenerationID());
+ if (!ieContext.failureList.contains(dsi.getDsId()))
+ {
+ if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
+ {
+ // this one is still doing the Full Update ... retry later
+ done = false;
+ try
+ { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s
+ waitResultAttempt++;
+ break;
+ }
+ else
+ {
+ // this one is done with the Full Update
+ if (dsi.getGenerationId() == this.getGenerationID())
+ {
+ // and with the expected generationId
+ replicasWeAreWaitingFor.remove(dsi.getDsId());
+ }
+ }
+ }
+ }
+ }
+ while ((!done) && (!broker.shuttingDown())); // infinite wait
+
+ // Add to the failure list the servers that were here at start time but
+ // that never ended with the right generationId.
+ for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0]))
+ ieContext.failureList.add(sid);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] wait for end ends with " + ieContext.failureList);
+
+
}
/**
@@ -1398,33 +1772,42 @@
}
/**
- * Processes an error message received while an import/export is
- * on going.
+ * Processes an error message received while an export is
+ * on going, or an import will start.
+ *
* @param errorMsg The error message received.
*/
- void abandonImportExport(ErrorMsg errorMsg)
+ private void processErrorMsg(ErrorMsg errorMsg)
{
- // FIXME TBD Treat the case where the error happens while entries
- // are being exported
-
- if (debugEnabled())
- TRACER.debugVerbose(
- " abandonImportExport:" + this.serverID +
- " serviceID: " + this.serviceID +
- " Error Msg received: " + errorMsg);
-
if (ieContext != null)
{
- ieContext.setException(new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails()));
-
- if (ieContext.initializeTask instanceof InitializeTask)
+ if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
{
- // Update the task that initiated the import
- ((InitializeTask)ieContext.initializeTask).
- updateTaskCompletionState(ieContext.getException());
+ // The ErrorMsg is received while we have started an initialization
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ errorMsg.getDetails()));
- releaseIEContext();
+ /*
+ * This can happen :
+ * - on the first InitReqMsg sent when source in not known for example
+ * - on the next attempt when source crashed and did not reconnect
+ * even after the nextInitAttemptDelay
+ * During the import, the ErrorMsg will be received by receiveEntryBytes
+ */
+ if (ieContext.initializeTask instanceof InitializeTask)
+ {
+ // Update the task that initiated the import
+ ((InitializeTask)ieContext.initializeTask).
+ updateTaskCompletionState(ieContext.getException());
+
+ releaseIEContext();
+ }
+ }
+ else
+ {
+ // When we are the exporter in the case of initializeAll
+ // exporting must not be stopped on the first error.
}
}
}
@@ -1442,24 +1825,72 @@
{
try
{
- msg = broker.receive();
+ // In the context of the total update, we don't want any automatic
+ // re-connection done transparently by the broker because of a better
+ // RS or because of a connection failure.
+ // We want to be notified of topology change in order to track a
+ // potential disconnection of the exporter.
+ msg = broker.receive(false, false, true);
if (debugEnabled())
- TRACER.debugVerbose(
- " sid:" + serverID +
- " base DN:" + serviceID +
- " Import EntryBytes received " + msg);
+ TRACER.debugInfo(
+ "[IE] In " + this.monitor.getMonitorInstanceName() +
+ ", receiveEntryBytes " + msg);
+
if (msg == null)
{
- // The server is in the shutdown process
- return null;
+ if (broker.shuttingDown())
+ {
+ // The server is in the shutdown process
+ return null;
+ }
+ else
+ {
+ // Handle connection issues
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(
+ ResultCode.OTHER,
+ ERR_INIT_RS_DISCONNECTION_DURING_IMPORT.get(
+ broker.getReplicationServer())));
+ return null;
+ }
}
+ // Check good sequentiality of msg received
if (msg instanceof EntryMsg)
{
EntryMsg entryMsg = (EntryMsg)msg;
byte[] entryBytes = entryMsg.getEntryBytes();
ieContext.updateCounters(countEntryLimits(entryBytes));
+
+ if (ieContext.exporterProtocolVersion >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // check the msgCnt of the msg received to check sequenciality
+ if (++ieContext.msgCnt != entryMsg.getMsgId())
+ {
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
+ String.valueOf(ieContext.msgCnt),
+ String.valueOf(entryMsg.getMsgId()))));
+ return null;
+ }
+
+ // send the ack of flow control mgmt
+ if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
+ {
+ InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
+ this.serverID,
+ entryMsg.getSenderID(),
+ ieContext.msgCnt);
+ broker.publish(amsg, false);
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "[IE] In " + this.monitor.getMonitorInstanceName() +
+ ", publish InitializeRcvAckMsg" + amsg);
+ }
+ }
return entryBytes;
}
else if (msg instanceof DoneMsg)
@@ -1474,22 +1905,43 @@
// This is an error termination during the import
// The error is stored and the import is ended
// by returning null
- ErrorMsg errorMsg = (ErrorMsg)msg;
- ieContext.setException(new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails()));
- return null;
+ if (ieContext.getException() == null)
+ {
+ ErrorMsg errMsg = (ErrorMsg)msg;
+ if (errMsg.getCreationTime() > ieContext.startTime)
+ {
+ ieContext.setException(
+ new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
+ return null;
+ }
+ }
}
else
{
- // Other messages received during an import are trashed
+ // Other messages received during an import are trashed except
+ // the topologyMsg.
+ if ((msg instanceof TopologyMsg) &&
+ (!this.isRemoteDSConnected(ieContext.importSource)))
+ {
+ Message errMsg =
+ Message.raw(Category.SYNC, Severity.NOTICE,
+ ERR_INIT_EXPORTER_DISCONNECTION.get(
+ this.serviceID,
+ Integer.toString(this.serverID),
+ Integer.toString(ieContext.importSource)));
+ if (ieContext.getException()==null)
+ ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ errMsg));
+ return null;
+ }
}
}
catch(Exception e)
{
// TODO: i18n
- ieContext.setException(new DirectoryException(ResultCode.OTHER,
- Message.raw("received an unexpected message type" +
- e.getLocalizedMessage())));
+ if (ieContext.getException() == null)
+ ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
}
}
}
@@ -1540,27 +1992,108 @@
*
* @throws IOException when an error occurred.
*/
- void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) throws IOException
+ public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
+ throws IOException
{
- // If an error was raised - like receiving an ErrorMsg
- // we just let down the export.
- if (ieContext.getException() != null)
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
+
+ // build the message
+ EntryMsg entryMessage = new EntryMsg(
+ serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
+ ++ieContext.msgCnt);
+
+ // Waiting the slowest loop
+ while (!broker.shuttingDown())
{
- IOException ioe = new IOException(ieContext.getException().getMessage());
- ieContext = null;
- throw ioe;
+ // If an error was raised - like receiving an ErrorMsg from a remote
+ // server that have been stored by the listener thread in the ieContext,
+ // we just abandon the export by throwing an exception.
+ if (ieContext.getException() != null)
+ throw(new IOException(ieContext.getException().getMessage()));
+
+ int slowestServerId = ieContext.getSlowestServer();
+ if (!isRemoteDSConnected(slowestServerId))
+ {
+ ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
+ Integer.toString(ieContext.getSlowestServer()))));
+ // .. and abandon the export by throwing an exception.
+ IOException ioe =
+ new IOException("IOException with nested DirectoryException");
+ ioe.initCause(ieContext.getException());
+ throw ioe;
+ }
+
+ int ourLastExportedCnt = ieContext.msgCnt;
+ int slowestCnt = ieContext.ackVals.get(slowestServerId);
+
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
+ " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
+
+ if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
+
+ // our export is too far beyond the slowest importer - let's wait
+ try { Thread.sleep(100); } catch(Exception e) {}
+
+ // process any connection error
+ if ((broker.hasConnectionError())||
+ (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
+ {
+ // publish failed - store the error in the ieContext ...
+ DirectoryException de = new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
+ Integer.toString(broker.getRsServerId())));
+ if (ieContext.getException() == null)
+ ieContext.setException(de);
+ // .. and abandon the export by throwing an exception.
+ throw new IOException(de.getMessage());
+ }
+ }
+ else
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] slowest got to us => stop waiting");
+ break;
+ }
+ } // Waiting the slowest loop
+
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
+
+ // publish the message
+ boolean sent = broker.publish(entryMessage, false);
+
+ // process any publish error
+ if (((!sent)||
+ (broker.hasConnectionError()))||
+ (broker.getNumLostConnections() != ieContext.initNumLostConnections))
+ {
+ // publish failed - store the error in the ieContext ...
+ DirectoryException de = new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
+ Integer.toString(broker.getRsServerId())));
+ if (ieContext.getException() == null)
+ ieContext.setException(de);
+ // .. and abandon the export by throwing an exception.
+ throw new IOException(de.getMessage());
}
- EntryMsg entryMessage = new EntryMsg(
- serverID,ieContext.getExportTarget(), lDIFEntry, pos, length);
- broker.publish(entryMessage);
-
+ // publish succeeded
try
{
ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
}
catch (DirectoryException de)
{
+ // store the error in the ieContext ...
+ if (ieContext.getException() == null)
+ ieContext.setException(de);
+ // .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
}
@@ -1614,127 +2147,285 @@
}
/**
- * Initializes this domain from another source server.
+ * Initializes asynchronously this domain from a remote source server.
+ * Before returning from this call, for the provided task :
+ * - the progressing counters are updated during the initialization using
+ * setTotal() and setLeft().
+ * - the end of the initialization using updateTaskCompletionState().
* <p>
- * When this method is called, a request for initialization will
- * be sent to the source server asking for initialization.
+ * When this method is called, a request for initialization is sent to the
+ * remote source server requesting initialization.
* <p>
- * The {@link #exportBackend(OutputStream)} will therefore be called
- * on the source server, and the {@link #importBackend(InputStream)}
- * will be called on his server.
- * <p>
- * The InputStream and OutpuStream given as a parameter to those
- * methods will be connected through the replication protocol.
*
* @param source The server-id of the source from which to initialize.
* The source can be discovered using the
* {@link #getReplicasList()} method.
+ *
* @param initTask The task that launched the initialization
* and should be updated of its progress.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
+ * The task state is updated.
*/
public void initializeFromRemote(int source, Task initTask)
throws DirectoryException
{
+ Message errMsg = null;
+
if (debugEnabled())
- TRACER.debugInfo("Entering initializeFromRemote");
+ TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
if (!broker.isConnected())
{
- if (initTask instanceof InitializeTask)
- {
- InitializeTask task = (InitializeTask) initTask;
- task.updateTaskCompletionState(
- new DirectoryException(
- ResultCode.OTHER, ERR_INITIALIZATION_FAILED_NOCONN.get(
- getServiceID())));
- }
- return;
+ errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
}
- acquireIEContext(true);
- ieContext.initializeTask = initTask;
+ // We must not test here whether the remote source is connected to
+ // the topology by testing if it stands in the replicas list since.
+ // In the case of a re-attempt of initialization, the listener thread is
+ // running this method directly coming from initailize() method and did
+ // not processed any topology message in between the failure and the
+ // new attempt.
+ try
+ {
+ // We must immediatly acquire a context to store the task inside
+ // The context will be used when we (the listener thread) will receive
+ // the InitializeTargetMsg, process the import, and at the end
+ // update the task.
- InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
- serviceID, serverID, source);
+ acquireIEContext(true); //test and set if no import already in progress
+ ieContext.initializeTask = initTask;
+ ieContext.attemptCnt = 0;
+ ieContext.initReqMsgSent = new InitializeRequestMsg(
+ serviceID, serverID, source, this.initWindow);
- // Publish Init request msg
- broker.publish(initializeMsg);
+ // Publish Init request msg
+ broker.publish(ieContext.initReqMsgSent);
- // .. we expect to receive entries or err after that
+ // The normal success processing is now to receive InitTargetMsg then
+ // entries from the remote server.
+ // The error cases are :
+ // - either local error immediatly caught below
+ // - a remote error we will receive as an ErrorMsg
+ }
+ catch(DirectoryException de)
+ {
+ errMsg = de.getMessageObject();
+ }
+ catch(Exception e)
+ {
+ // Should not happen
+ errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
+ e.getLocalizedMessage());
+ logError(errMsg);
+ }
+
+ // When error, update the task and raise the error to the caller
+ if (errMsg != null)
+ {
+ // No need to call here updateTaskCompletionState - will be done
+ // by the caller
+ releaseIEContext();
+ DirectoryException de = new DirectoryException(
+ ResultCode.OTHER,
+ errMsg);
+ throw (de);
+ }
}
/**
- * Initializes the domain's backend with received entries.
- * @param initializeMessage The message that initiated the import.
- * @exception DirectoryException Thrown when an error occurs.
+ * Processes an InitializeTargetMsg received from a remote server
+ * meaning processes an initialization from the entries expected to be
+ * received now.
+ *
+ * @param initTargetMsgReceived The message received from the remote server.
+ *
+ * @param requestorServerId The serverId of the server that requested the
+ * initialization meaning the server where the
+ * task has initially been created (this server,
+ * or the remote server).
*/
- void initialize(InitializeTargetMsg initializeMessage)
- throws DirectoryException
+ void initialize(InitializeTargetMsg initTargetMsgReceived,
+ int requestorServerId)
{
- DirectoryException de = null;
+ InitializeTask initFromtask = null;
- Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
- Integer.toString(serverID),
- serviceID,
- Long.toString(initializeMessage.getRequestorID()));
- logError(msg);
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
- // Go into full update status
- setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
-
- if (initializeMessage.getRequestorID() == serverID)
- {
- // The import responds to a request we did so the IEContext
- // is already acquired
- }
- else
- {
- acquireIEContext(true);
- }
-
- ieContext.importSource = initializeMessage.getsenderID();
- ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.setCounters(
- initializeMessage.getEntryCount(),
- initializeMessage.getEntryCount());
+ int source = initTargetMsgReceived.getSenderID();
try
{
+ // Log starting
+ Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
+ Integer.toString(serverID),
+ serviceID,
+ Long.toString(initTargetMsgReceived.getInitiatorID()));
+ logError(msg);
+
+ // Go into full update status
+ setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
+
+ // Acquire an import context if no already done (and initialize).
+ if (initTargetMsgReceived.getInitiatorID() == this.serverID)
+ {
+ // The initTargetMsgReceived received is the answer to a request that
+ // we (this server) sent previously. In this case, so the IEContext
+ // has been already acquired when the request was published in order
+ // to store the task (to be updated with the status at the end).
+ }
+ else
+ {
+ // The initTargetMsgReceived is for an import initiated by the remote
+ // server.
+ // Test and set if no import already in progress
+ acquireIEContext(true);
+ }
+
+ // Initialize stuff
+ ieContext.importSource = source;
+ ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
+ ieContext.initWindow = initTargetMsgReceived.getInitWindow();
+ // Protocol version is -1 when not known.
+ ieContext.exporterProtocolVersion = getProtocolVersion(source);
+ initFromtask = (InitializeTask)ieContext.initializeTask;
+
+ // Lauch the import
importBackend(new ReplInputStream(this));
- broker.reStart();
+
}
catch (DirectoryException e)
{
- de = e;
+ // Store the exception raised. It will be considered if no other exception
+ // has been previously stored in the context
+ if (ieContext.getException() == null)
+ ieContext.setException(e);
}
finally
{
- if ((ieContext != null) && (ieContext.getException() != null))
- de = ieContext.getException();
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Domain=" + this
+ + " ends import with exception=" + ieContext.getException()
+ + " connected=" + broker.isConnected());
- // Update the task that initiated the import
- if ((ieContext != null ) && (ieContext.initializeTask != null))
+ // It is necessary to restart (reconnect to RS) for different reasons
+ // - when everything went well, reconnect in order to exchange
+ // new state, new generation ID
+ // - when we have connection failure, reconnect to retry a new import
+ // right here, right now
+ // we never want retryOnFailure if we fails reconnecting in the restart.
+ broker.reStart(false);
+
+ if (ieContext.getException() != null)
{
- ((InitializeTask)ieContext.initializeTask).
- updateTaskCompletionState(de);
+ if (broker.isConnected() && (initFromtask != null)
+ && (++ieContext.attemptCnt<2))
+ {
+ // Worth a new attempt
+ // since initFromtask is in this server, connection is ok
+ try
+ {
+
+ // Wait for the exporter to stabilize - eventually reconnect as
+ // well if it was connected to the same RS than the one we lost ...
+ Thread.sleep(1000);
+
+ // Restart the whole import protocol exchange by sending again
+ // the request
+ logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
+ ieContext.getException().getLocalizedMessage()));
+
+ broker.publish(ieContext.initReqMsgSent);
+
+ ieContext.initializeCounters(0);
+ ieContext.exception = null;
+ ieContext.msgCnt = 0;
+
+ // Processing of the received initTargetMsgReceived is done
+ // let's wait for the next one
+ return;
+ }
+ catch(Exception e)
+ {
+ // An error occurs when sending a new request for a new import.
+ // This error is not stored, prefering to keep the initial one.
+ logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
+ e.getLocalizedMessage(),
+ ieContext.getException().getLocalizedMessage()));
+ }
+ }
}
- releaseIEContext();
- }
- // Sends up the root error.
- if (de != null)
+ // ===================
+ // No new attempt case
+
+ if (debugEnabled())
+ TRACER.debugInfo("[IE] Domain=" + this
+ + " ends initialization with exception=" + ieContext.getException()
+ + " connected=" + broker.isConnected()
+ + " task=" + initFromtask
+ + " attempt=" + ieContext.attemptCnt);
+
+ try
+ {
+ if (broker.isConnected() && (ieContext.getException() != null))
+ {
+ // Let's notify the exporter
+ ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
+ ieContext.getException().getMessageObject());
+ broker.publish(errorMsg);
+ }
+ else // !broker.isConnected()
+ {
+ // Don't try to reconnect here.
+ // The current running thread is the listener thread and will loop on
+ // receive() that is expected to manage reconnects attempt.
+ }
+
+ // Update the task that initiated the import must be the last thing.
+ // Particularly, broker.restart() after import success must be done
+ // before some other operations/tasks to be launched,
+ // like resetting the generation ID.
+ if (initFromtask != null)
+ {
+ initFromtask.updateTaskCompletionState(ieContext.getException());
+ }
+ }
+ finally
+ {
+
+ Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
+ Integer.toString(serverID),
+ serviceID,
+ Long.toString(initTargetMsgReceived.getInitiatorID()),
+ (ieContext.getException()!=null?
+ ieContext.getException().getLocalizedMessage():""));
+ logError(msg);
+ releaseIEContext();
+ } // finally
+ } // finally
+ }
+
+ /**
+ * Return the protocol version of the DS related to the provided serverid.
+ * Returns -1 when the protocol version is not known.
+ * @param dsServerId The provided serverid.
+ * @return The procotol version.
+ */
+ short getProtocolVersion(int dsServerId)
+ {
+ short protocolVersion = -1;
+ for (DSInfo dsi : getReplicasList())
{
- throw de;
+ if (dsi.getDsId() == dsServerId)
+ {
+ protocolVersion = dsi.getProtocolVersion();
+ break;
+ }
}
-
- msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
- Integer.toString(serverID),
- serviceID,
- Long.toString(initializeMessage.getRequestorID()));
- logError(msg);
+ return protocolVersion;
}
/**
@@ -1887,15 +2578,7 @@
if (debugEnabled())
TRACER.debugInfo(
"Server id " + serverID + " and domain " + serviceID
- + "resetGenerationId" + generationIdNewValue);
-
- if (!isConnected())
- {
- ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
- throw new DirectoryException(
- resultCode, message);
- }
+ + " resetGenerationId " + generationIdNewValue);
ResetGenerationIdMsg genIdMessage = null;
@@ -1907,6 +2590,16 @@
{
genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
}
+
+ if (!isConnected())
+ {
+ ResultCode resultCode = ResultCode.OTHER;
+ Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID,
+ Integer.toString(serverID),
+ Long.toString(genIdMessage.getGenerationId()));
+ throw new DirectoryException(
+ resultCode, message);
+ }
broker.publish(genIdMessage);
// check that at least one ReplicationServer did change its generation-id
@@ -2410,6 +3103,7 @@
// Wait for the listener thread to stop
if (listenerThread != null)
listenerThread.waitForShutdown();
+
}
/**
--
Gitblit v1.10.0