From 46fd9423ab622d7f9531aa1564846ec52fe09534 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Wed, 24 Apr 2013 12:44:51 +0000
Subject: [PATCH] Replication Cleanup.
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 589 +++++++++++++++++++++++++++++-----------------------------
1 files changed, 292 insertions(+), 297 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index f9ab3eb..8ceab31 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -165,8 +166,8 @@
* to be able to correlate all the coming back acks to the original
* operation.
*/
- private final SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
- new TreeMap<ChangeNumber, UpdateMsg>();
+ private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs =
+ new ConcurrentHashMap<ChangeNumber, UpdateMsg>();
/**
@@ -243,7 +244,7 @@
// that have not been successfully acknowledged (either because of timeout,
// wrong status or error at replay) for a particular server (DS or RS). String
// format: <server id>:<number of failed updates>
- private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
+ private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
new HashMap<Integer,Integer>();
// Number of updates received in Assured Mode, Safe Read request
private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
@@ -264,7 +265,7 @@
// Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
// that have not been successfully acknowledged because of timeout for a
// particular RS. String format: <server id>:<number of failed updates>
- private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
+ private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
new HashMap<Integer,Integer>();
/**
@@ -278,10 +279,12 @@
/* Status related monitoring fields */
- // Indicates the date when the status changed. This may be used to indicate
- // the date the session with the current replication server started (when
- // status is NORMAL for instance). All the above assured monitoring fields
- // are also reset each time the status is changed
+ /**
+ * Indicates the date when the status changed. This may be used to indicate
+ * the date the session with the current replication server started (when
+ * status is NORMAL for instance). All the above assured monitoring fields
+ * are also reset each time the status is changed
+ */
private Date lastStatusChangeDate = new Date();
/**
@@ -583,7 +586,7 @@
}
/**
- * Returns informations about the DS server related to the provided serverId.
+ * Returns information about the DS server related to the provided serverId.
* based on the TopologyMsg we received when the remote replica connected or
* disconnected. Return null when no server with the provided serverId is
* connected.
@@ -696,8 +699,7 @@
*/
public void setURLs(Set<String> referralsUrl)
{
- for (String url : referralsUrl)
- this.refUrls.add(url);
+ this.refUrls.addAll(referralsUrl);
}
/**
@@ -793,11 +795,12 @@
// Another server is exporting its entries to us
InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
+ /*
+ This must be done while we are still holding the broker lock
+ because we are now going to receive a bunch of entries from the
+ remote server and we want the import thread to catch them and
+ not the ListenerThread.
+ */
initialize(initTargetMsg, initTargetMsg.getSenderID());
}
else if (msg instanceof ErrorMsg)
@@ -805,15 +808,16 @@
ErrorMsg errorMsg = (ErrorMsg)msg;
if (ieContext != null)
{
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find the import source.
- //
- // A remote error during the import will be received in the
- // receiveEntryBytes() method.
- //
+ /*
+ This is an error termination for the 2 following cases :
+ - either during an export
+ - or before an import really started
+ For example, when we publish a request and the
+ replicationServer did not find the import source.
+
+ A remote error during the import will be received in the
+ receiveEntryBytes() method.
+ */
if (debugEnabled())
TRACER.debugInfo(
"[IE] processErrorMsg:" + this.serverID +
@@ -827,9 +831,11 @@
}
else
{
- // Simply log - happen when the ErrorMsg relates to a previous
- // attempt of initialization while we have started a new one
- // on this side.
+ /*
+ Simply log - happen when the ErrorMsg relates to a previous
+ attempt of initialization while we have started a new one
+ on this side.
+ */
logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
}
}
@@ -864,15 +870,17 @@
{
// just retry
}
- // Test if we have received and export request message and
- // if that's the case handle it now.
- // This must be done outside of the portion of code protected
- // by the broker lock so that we keep receiveing update
- // when we are doing and export and so that a possible
- // closure of the socket happening when we are publishing the
- // entries to the remote can be handled by the other
- // replay thread when they call this method and therefore the
- // broker.receive() method.
+ /*
+ Test if we have received and export request message and
+ if that's the case handle it now.
+ This must be done outside of the portion of code protected
+ by the broker lock so that we keep receiving update
+ when we are doing and export and so that a possible
+ closure of the socket happening when we are publishing the
+ entries to the remote can be handled by the other
+ replay thread when they call this method and therefore the
+ broker.receive() method.
+ */
if (initReqMsg != null)
{
// Do this work in a thread to allow replay thread continue working
@@ -898,8 +906,8 @@
* particular server in the list. This increments the counter of error for the
* passed server, or creates an initial value of 1 error for it if the server
* is not yet present in the map.
- * @param errorList
- * @param sid
+ * @param errorsByServer map of number of errors per serverID
+ * @param sid the ID of the server which produced an error
*/
private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
Integer sid)
@@ -916,7 +924,7 @@
{
// Server already present in list, just increment number of
// errors for the server
- int val = serverErrCount.intValue();
+ int val = serverErrCount;
val++;
errorsByServer.put(sid, val);
}
@@ -935,10 +943,7 @@
// Remove the message for pending ack list (this may already make the thread
// that is waiting for the ack be aware of its reception)
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(changeNumber);
- }
+ update = waitingAckMsgs.remove(changeNumber);
// Signal waiting thread ack has been received
if (update != null)
@@ -957,8 +962,10 @@
if ( hasTimeout || hasReplayErrors || hasWrongStatus)
{
- // Some problems detected: message not correclty reached every requested
- // servers. Log problem
+ /*
+ Some problems detected: message did not correctly reach every
+ requested servers. Log problem
+ */
Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
serviceID, Integer.toString(serverID),
update.toString(), ack.errorsToString());
@@ -1021,27 +1028,6 @@
}
}
- /**
- * Retrieves a replication domain based on the baseDn.
- *
- * @param serviceID The identifier of the domain to retrieve.
- *
- * @return The domain retrieved.
- *
- * @throws DirectoryException When an error occurred or no domain
- * match the provided baseDn.
- */
- static ReplicationDomain retrievesReplicationDomain(String serviceID)
- throws DirectoryException
- {
- ReplicationDomain replicationDomain = domains.get(serviceID);
- if (replicationDomain == null)
- {
- throw new DirectoryException(ResultCode.OTHER,
- ERR_NO_MATCHING_DOMAIN.get(serviceID));
- }
- return replicationDomain;
- }
/*
* After this point the code is related to the Total Update.
@@ -1051,7 +1037,7 @@
* This thread is launched when we want to export data to another server.
*
* When a task is created locally (so this local server is the initiator)
- * of the export (Exemple: dsreplication initialize-all),
+ * of the export (Example: dsreplication initialize-all),
* this thread is NOT used but the task thread is running the export instead).
*/
private class ExportThread extends DirectoryThread
@@ -1095,9 +1081,11 @@
initWindow);
} catch (DirectoryException de)
{
- // An error message has been sent to the peer
- // This server is not the initiator of the export so there is
- // nothing more to do locally.
+ /*
+ An error message has been sent to the peer
+ This server is not the initiator of the export so there is
+ nothing more to do locally.
+ */
}
if (debugEnabled())
@@ -1211,29 +1199,6 @@
/**
* Update the counters of the task for each entry processed during
* an import or export.
- * @throws DirectoryException if an error occurred.
- */
- public void updateCounters()
- throws DirectoryException
- {
- entryLeftCount--;
-
- if (initializeTask != null)
- {
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setLeft(entryLeftCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
- }
- }
- }
-
- /**
- * Update the counters of the task for each entry processed during
- * an import or export.
*
* @param entriesDone The number of entries that were processed
* since the last time this method was called.
@@ -1379,7 +1344,7 @@
* on this server, and the {@code importBackend(InputStream)}
* will be called on the remote server.
* <p>
- * The InputStream and OutpuStream given as a parameter to those
+ * The InputStream and OutputStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param target The server-id of the server that should be initialized.
@@ -1394,10 +1359,7 @@
public void initializeRemote(int target, Task initTask)
throws DirectoryException
{
-
initializeRemote(target, this.serverID, initTask, this.initWindow);
-
-
}
/**
@@ -1426,10 +1388,12 @@
// Acquire and initialize the export context
acquireIEContext(false);
- // We manage the list of servers to initialize in order :
- // - to test at the end that all expected servers have reconnected
- // after their import and with the right genId
- // - to update the task with the server(s) where this test failed
+ /*
+ We manage the list of servers to initialize in order :
+ - to test at the end that all expected servers have reconnected
+ after their import and with the right genId
+ - to update the task with the server(s) where this test failed
+ */
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
@@ -1526,14 +1490,15 @@
{
try
{
- // Handling the errors during export
+ /*
+ Handling the errors during export
- // Note: we could have lost the connection and another thread
- // the listener one) has already managed to reconnect.
- // So we MUST rely on the test broker.isConnected()
- // ONLY to do 'wait to be reconnected by another thread'
- // (if not yet reconnected already).
-
+ Note: we could have lost the connection and another thread
+ the listener one) has already managed to reconnect.
+ So we MUST rely on the test broker.isConnected()
+ ONLY to do 'wait to be reconnected by another thread'
+ (if not yet reconnected already).
+ */
if (!broker.isConnected())
{
// We are still disconnected, so we wait for the listener thread
@@ -1550,14 +1515,16 @@
if ((initTask != null) && broker.isConnected() &&
(serverToInitialize != RoutableMsg.ALL_SERVERS))
{
- // NewAttempt case : In the case where
- // - it's not an InitializeAll
- // - AND the previous export attempt failed
- // - AND we are (now) connected
- // - and we own the task and this task is not an InitializeAll
- // Let's :
- // - sleep to let time to the other peer to reconnect if needed
- // - and launch another attempt
+ /*
+ NewAttempt case : In the case where
+ - it's not an InitializeAll
+ - AND the previous export attempt failed
+ - AND we are (now) connected
+ - and we own the task and this task is not an InitializeAll
+ Let's :
+ - sleep to let time to the other peer to reconnect if needed
+ - and launch another attempt
+ */
try { Thread.sleep(1000); } catch(Exception e){}
logError(NOTE_RESENDING_INIT_TARGET.get(
exportRootException.getLocalizedMessage()));
@@ -1632,8 +1599,7 @@
int waitResultAttempt = 0;
Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
- for (Integer sid : ieContext.startList)
- replicasWeAreWaitingFor.add(sid);
+ replicasWeAreWaitingFor.addAll(ieContext.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1657,8 +1623,11 @@
{
// this one is still not doing the Full Update ... retry later
done = false;
- try
- { Thread.sleep(100); } catch (InterruptedException e) {}
+ try { Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
waitResultAttempt++;
break;
}
@@ -1673,8 +1642,7 @@
while ((!done) && (waitResultAttempt<1200) // 2mn
&& (!broker.shuttingDown()));
- ieContext.failureList.addAll(
- Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
+ ieContext.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
@@ -1697,9 +1665,11 @@
TRACER.debugInfo(
"[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
- // In case some new servers appear during the init, we want them to be
- // considered in the processing of sorting the successfully initialized
- // and the others
+ /*
+ In case some new servers appear during the init, we want them to be
+ considered in the processing of sorting the successfully initialized
+ and the others
+ */
for (DSInfo dsi : getReplicasList())
replicasWeAreWaitingFor.add(dsi.getDsId());
@@ -1709,22 +1679,25 @@
done = true;
short reconnectMaxDelayInSec = 10;
short reconnectWait = 0;
- Integer[] servers = replicasWeAreWaitingFor.toArray(new Integer[0]);
- for (int serverId : servers)
+ for (int serverId : replicasWeAreWaitingFor)
{
if (ieContext.failureList.contains(serverId))
{
- // this server has already been in error during initialization
- // dont't wait for it
+ /*
+ this server has already been in error during initialization
+ don't wait for it
+ */
continue;
}
DSInfo dsInfo = isRemoteDSConnected(serverId);
if (dsInfo == null)
{
- // this server is disconnected
- // may be for a long time if it crashed or had been stopped
- // may be just the time to reconnect after import : should be short
+ /*
+ this server is disconnected
+ may be for a long time if it crashed or had been stopped
+ may be just the time to reconnect after import : should be short
+ */
if (++reconnectWait<reconnectMaxDelayInSec)
{
// let's still wait to give a chance to this server to reconnect
@@ -1764,8 +1737,7 @@
}
while ((!done) && (!broker.shuttingDown())); // infinite wait
- ieContext.failureList.addAll(
- Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
+ ieContext.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
@@ -1839,8 +1811,10 @@
}
else
{
- // When we are the exporter in the case of initializeAll
- // exporting must not be stopped on the first error.
+ /*
+ When we are the exporter in the case of initializeAll
+ exporting must not be stopped on the first error.
+ */
}
}
}
@@ -1889,7 +1863,7 @@
}
}
- // Check good sequentiality of msg received
+ // Check good ordering of msg received
if (msg instanceof EntryMsg)
{
EntryMsg entryMsg = (EntryMsg)msg;
@@ -1899,7 +1873,7 @@
if (ieContext.exporterProtocolVersion >=
ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- // check the msgCnt of the msg received to check sequenciality
+ // check the msgCnt of the msg received to check ordering
if (++ieContext.msgCnt != entryMsg.getMsgId())
{
if (ieContext.getException() == null)
@@ -1928,16 +1902,20 @@
}
else if (msg instanceof DoneMsg)
{
- // This is the normal termination of the import
- // No error is stored and the import is ended
- // by returning null
+ /*
+ This is the normal termination of the import
+ No error is stored and the import is ended
+ by returning null
+ */
return null;
}
else if (msg instanceof ErrorMsg)
{
- // This is an error termination during the import
- // The error is stored and the import is ended
- // by returning null
+ /*
+ This is an error termination during the import
+ The error is stored and the import is ended
+ by returning null
+ */
if (ieContext.getException() == null)
{
ErrorMsg errMsg = (ErrorMsg)msg;
@@ -1971,7 +1949,6 @@
}
catch(Exception e)
{
- // TODO: i18n
if (ieContext.getException() == null)
ieContext.setException(new DirectoryException(ResultCode.OTHER,
ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
@@ -1984,7 +1961,7 @@
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
- * @param entryBytes
+ * @param entryBytes the set of bytes containing one or more entries.
* @return The number of entries in the provided byte[].
*/
private int countEntryLimits(byte[] entryBytes)
@@ -1997,7 +1974,7 @@
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
- * @param entryBytes
+ * @param entryBytes the set of bytes containing one or more entries.
* @return The number of entries in the provided byte[].
*/
private int countEntryLimits(byte[] entryBytes, int pos, int length)
@@ -2029,7 +2006,8 @@
throws IOException
{
if (debugEnabled())
- TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" +
+ Arrays.toString(lDIFEntry));
// build the message
EntryMsg entryMessage = new EntryMsg(
@@ -2039,9 +2017,11 @@
// Waiting the slowest loop
while (!broker.shuttingDown())
{
- // If an error was raised - like receiving an ErrorMsg from a remote
- // server that have been stored by the listener thread in the ieContext,
- // we just abandon the export by throwing an exception.
+ /*
+ If an error was raised - like receiving an ErrorMsg from a remote
+ server that have been stored by the listener thread in the ieContext,
+ we just abandon the export by throwing an exception.
+ */
if (ieContext.getException() != null)
throw(new IOException(ieContext.getException().getMessage()));
@@ -2094,7 +2074,8 @@
} // Waiting the slowest loop
if (debugEnabled())
- TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
+ + Arrays.toString(lDIFEntry));
// publish the message
boolean sent = broker.publish(entryMessage, false);
@@ -2212,18 +2193,22 @@
errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
}
- // We must not test here whether the remote source is connected to
- // the topology by testing if it stands in the replicas list since.
- // In the case of a re-attempt of initialization, the listener thread is
- // running this method directly coming from initailize() method and did
- // not processed any topology message in between the failure and the
- // new attempt.
+ /*
+ We must not test here whether the remote source is connected to
+ the topology by testing if it stands in the replicas list since.
+ In the case of a re-attempt of initialization, the listener thread is
+ running this method directly coming from initialize() method and did
+ not processed any topology message in between the failure and the
+ new attempt.
+ */
try
{
- // We must immediatly acquire a context to store the task inside
- // The context will be used when we (the listener thread) will receive
- // the InitializeTargetMsg, process the import, and at the end
- // update the task.
+ /*
+ We must immediately acquire a context to store the task inside
+ The context will be used when we (the listener thread) will receive
+ the InitializeTargetMsg, process the import, and at the end
+ update the task.
+ */
acquireIEContext(true); //test and set if no import already in progress
ieContext.initializeTask = initTask;
@@ -2234,11 +2219,13 @@
// Publish Init request msg
broker.publish(ieContext.initReqMsgSent);
- // The normal success processing is now to receive InitTargetMsg then
- // entries from the remote server.
- // The error cases are :
- // - either local error immediatly caught below
- // - a remote error we will receive as an ErrorMsg
+ /*
+ The normal success processing is now to receive InitTargetMsg then
+ entries from the remote server.
+ The error cases are :
+ - either local error immediately caught below
+ - a remote error we will receive as an ErrorMsg
+ */
}
catch(DirectoryException de)
{
@@ -2272,15 +2259,15 @@
*
* @param initTargetMsgReceived The message received from the remote server.
*
- * @param requestorServerId The serverId of the server that requested the
+ * @param requesterServerId The serverId of the server that requested the
* initialization meaning the server where the
* task has initially been created (this server,
* or the remote server).
*/
void initialize(InitializeTargetMsg initTargetMsgReceived,
- int requestorServerId)
+ int requesterServerId)
{
- InitializeTask initFromtask = null;
+ InitializeTask initFromTask = null;
if (debugEnabled())
TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
@@ -2300,16 +2287,20 @@
// Acquire an import context if no already done (and initialize).
if (initTargetMsgReceived.getInitiatorID() == this.serverID)
{
- // The initTargetMsgReceived received is the answer to a request that
- // we (this server) sent previously. In this case, so the IEContext
- // has been already acquired when the request was published in order
- // to store the task (to be updated with the status at the end).
+ /*
+ The initTargetMsgReceived received is the answer to a request that
+ we (this server) sent previously. In this case, so the IEContext
+ has been already acquired when the request was published in order
+ to store the task (to be updated with the status at the end).
+ */
}
else
{
- // The initTargetMsgReceived is for an import initiated by the remote
- // server.
- // Test and set if no import already in progress
+ /*
+ The initTargetMsgReceived is for an import initiated by the remote
+ server.
+ Test and set if no import already in progress
+ */
acquireIEContext(true);
}
@@ -2319,16 +2310,18 @@
ieContext.initWindow = initTargetMsgReceived.getInitWindow();
// Protocol version is -1 when not known.
ieContext.exporterProtocolVersion = getProtocolVersion(source);
- initFromtask = (InitializeTask)ieContext.initializeTask;
+ initFromTask = (InitializeTask)ieContext.initializeTask;
- // Lauch the import
+ // Launch the import
importBackend(new ReplInputStream(this));
}
catch (DirectoryException e)
{
- // Store the exception raised. It will be considered if no other exception
- // has been previously stored in the context
+ /*
+ Store the exception raised. It will be considered if no other exception
+ has been previously stored in the context
+ */
if (ieContext.getException() == null)
ieContext.setException(e);
}
@@ -2339,30 +2332,37 @@
+ " ends import with exception=" + ieContext.getException()
+ " connected=" + broker.isConnected());
- // It is necessary to restart (reconnect to RS) for different reasons
- // - when everything went well, reconnect in order to exchange
- // new state, new generation ID
- // - when we have connection failure, reconnect to retry a new import
- // right here, right now
- // we never want retryOnFailure if we fails reconnecting in the restart.
+ /*
+ It is necessary to restart (reconnect to RS) for different reasons
+ - when everything went well, reconnect in order to exchange
+ new state, new generation ID
+ - when we have connection failure, reconnect to retry a new import
+ right here, right now
+ we never want retryOnFailure if we fails reconnecting in the restart.
+ */
broker.reStart(false);
if (ieContext.getException() != null)
{
- if (broker.isConnected() && (initFromtask != null)
+ if (broker.isConnected() && (initFromTask != null)
&& (++ieContext.attemptCnt<2))
{
- // Worth a new attempt
- // since initFromtask is in this server, connection is ok
+ /*
+ Worth a new attempt
+ since initFromTask is in this server, connection is ok
+ */
try
{
-
- // Wait for the exporter to stabilize - eventually reconnect as
- // well if it was connected to the same RS than the one we lost ...
+ /*
+ Wait for the exporter to stabilize - eventually reconnect as
+ well if it was connected to the same RS than the one we lost ...
+ */
Thread.sleep(1000);
- // Restart the whole import protocol exchange by sending again
- // the request
+ /*
+ Restart the whole import protocol exchange by sending again
+ the request
+ */
logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
ieContext.getException().getLocalizedMessage()));
@@ -2378,8 +2378,10 @@
}
catch(Exception e)
{
- // An error occurs when sending a new request for a new import.
- // This error is not stored, prefering to keep the initial one.
+ /*
+ An error occurs when sending a new request for a new import.
+ This error is not stored, prefering to keep the initial one.
+ */
logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
e.getLocalizedMessage(),
ieContext.getException().getLocalizedMessage()));
@@ -2394,7 +2396,7 @@
TRACER.debugInfo("[IE] Domain=" + this
+ " ends initialization with exception=" + ieContext.getException()
+ " connected=" + broker.isConnected()
- + " task=" + initFromtask
+ + " task=" + initFromTask
+ " attempt=" + ieContext.attemptCnt);
try
@@ -2402,24 +2404,28 @@
if (broker.isConnected() && (ieContext.getException() != null))
{
// Let's notify the exporter
- ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
+ ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
ieContext.getException().getMessageObject());
broker.publish(errorMsg);
}
else // !broker.isConnected()
{
- // Don't try to reconnect here.
- // The current running thread is the listener thread and will loop on
- // receive() that is expected to manage reconnects attempt.
+ /*
+ Don't try to reconnect here.
+ The current running thread is the listener thread and will loop on
+ receive() that is expected to manage reconnects attempt.
+ */
}
- // Update the task that initiated the import must be the last thing.
- // Particularly, broker.restart() after import success must be done
- // before some other operations/tasks to be launched,
- // like resetting the generation ID.
- if (initFromtask != null)
+ /*
+ Update the task that initiated the import must be the last thing.
+ Particularly, broker.restart() after import success must be done
+ before some other operations/tasks to be launched,
+ like resetting the generation ID.
+ */
+ if (initFromTask != null)
{
- initFromtask.updateTaskCompletionState(ieContext.getException());
+ initFromTask.updateTaskCompletionState(ieContext.getException());
}
}
finally
@@ -2435,10 +2441,10 @@
}
/**
- * Return the protocol version of the DS related to the provided serverid.
+ * Return the protocol version of the DS related to the provided serverId.
* Returns -1 when the protocol version is not known.
- * @param dsServerId The provided serverid.
- * @return The procotol version.
+ * @param dsServerId The provided serverId.
+ * @return The protocol version.
*/
short getProtocolVersion(int dsServerId)
{
@@ -2515,11 +2521,11 @@
private void checkGenerationID(long generationID)
throws DirectoryException
{
- boolean allset = true;
+ boolean allSet = true;
for (int i = 0; i< 50; i++)
{
- allset = true;
+ allSet = true;
for (RSInfo rsInfo : getRsList())
{
// the 'empty' RSes (generationId==-1) are considered as good citizens
@@ -2532,16 +2538,16 @@
} catch (InterruptedException e)
{
}
- allset = false;
+ allSet = false;
break;
}
}
- if (allset)
+ if (allSet)
{
break;
}
}
- if (!allset)
+ if (!allSet)
{
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
@@ -2735,9 +2741,11 @@
*/
void processUpdateDoneSynchronous(UpdateMsg msg)
{
- // Warning: in synchronous mode, no way to tell the replay of an update went
- // wrong Just put null in processUpdateDone so that if assured replication
- // is used the ack is sent without error at replay flag.
+ /*
+ Warning: in synchronous mode, no way to tell the replay of an update went
+ wrong Just put null in processUpdateDone so that if assured replication
+ is used the ack is sent without error at replay flag.
+ */
processUpdateDone(msg, null);
state.update(msg.getChangeNumber());
}
@@ -2749,10 +2757,7 @@
*/
public boolean isConnected()
{
- if (broker != null)
- return broker.isConnected();
- else
- return false;
+ return broker != null && broker.isConnected();
}
/**
@@ -2764,10 +2769,7 @@
*/
public boolean hasConnectionError()
{
- if (broker != null)
- return broker.hasConnectionError();
- else
- return true;
+ return broker == null || broker.hasConnectionError();
}
/**
@@ -2852,24 +2854,16 @@
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged per server.
- * @return The number of updates sent in assured safe read mode that have not
- * been acknowledged per server.
+ * @return A copy of the map that contains the number of updates sent in
+ * assured safe read mode that have not been acknowledged per server.
*/
public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
{
- // Clone a snapshot with synchronized section to have a consistent view in
- // monitoring
- Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
synchronized(assuredSrServerNotAcknowledgedUpdates)
{
- Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
- for (Integer serverId : keySet)
- {
- Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
- snapshot.put(serverId, i);
- }
+ return new HashMap<Integer, Integer>(
+ assuredSrServerNotAcknowledgedUpdates);
}
- return snapshot;
}
/**
@@ -2937,24 +2931,16 @@
/**
* Gets the number of updates sent in assured safe data mode that have not
* been acknowledged due to timeout error per server.
- * @return The number of updates sent in assured safe data mode that have not
- * been acknowledged due to timeout error per server.
+ * @return A copy of the map that contains the number of updates sent in
+ * assured safe data mode that have not been acknowledged due to timeout
+ * error per server.
*/
public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
{
- // Clone a snapshot with synchronized section to have a consistent view in
- // monitoring
- Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
synchronized(assuredSdServerTimeoutUpdates)
{
- Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet();
- for (Integer serverId : keySet)
- {
- Integer i = assuredSdServerTimeoutUpdates.get(serverId);
- snapshot.put(serverId, i);
- }
+ return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates);
}
- return snapshot;
}
/**
@@ -2981,14 +2967,20 @@
assuredSrTimeoutUpdates = new AtomicInteger(0);
assuredSrWrongStatusUpdates = new AtomicInteger(0);
assuredSrReplayErrorUpdates = new AtomicInteger(0);
- assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>();
+ synchronized (assuredSrServerNotAcknowledgedUpdates)
+ {
+ assuredSrServerNotAcknowledgedUpdates.clear();
+ }
assuredSrReceivedUpdates = new AtomicInteger(0);
assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
assuredSdSentUpdates = new AtomicInteger(0);
assuredSdAcknowledgedUpdates = new AtomicInteger(0);
assuredSdTimeoutUpdates = new AtomicInteger(0);
- assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>();
+ synchronized (assuredSdServerTimeoutUpdates)
+ {
+ assuredSdServerTimeoutUpdates.clear();
+ }
}
/*
@@ -3080,8 +3072,10 @@
{
synchronized (sessionLock)
{
- // Stop the broker first in order to prevent the listener from
- // reconnecting - see OPENDJ-457.
+ /*
+ Stop the broker first in order to prevent the listener from
+ reconnecting - see OPENDJ-457.
+ */
if (broker != null)
{
broker.stop();
@@ -3263,8 +3257,10 @@
{
broker.updateWindowAfterReplay();
- // Send an ack if it was requested and the group id is the same of the RS
- // one. Only Safe Read mode makes sense in DS for returning an ack.
+ /*
+ Send an ack if it was requested and the group id is the same of the RS
+ one. Only Safe Read mode makes sense in DS for returning an ack.
+ */
byte rsGroupId = broker.getRsGroupId();
if (msg.isAssured())
{
@@ -3282,9 +3278,9 @@
if (replayErrorMsg != null)
{
// Mark the error in the ack
- // -> replay error occured
+ // -> replay error occurred
ackMsg.setHasReplayError(true);
- // -> replay error occured in our server
+ // -> replay error occurred in our server
List<Integer> idList = new ArrayList<Integer>();
idList.add(serverID);
ackMsg.setFailedServers(idList);
@@ -3306,8 +3302,10 @@
logError(errorMsg);
} else
{
- // In safe data mode assured update that comes up to a DS requires no
- // ack from a destinator DS. Safe data mode is based on RS acks only
+ /*
+ In safe data mode assured update that comes up to a DS requires no
+ ack from a recipient DS. Safe data mode is based on RS acks only
+ */
}
}
}
@@ -3343,7 +3341,7 @@
* If assured configured, set message accordingly to request an ack in the
* right assured mode.
* No ack requested for a RS with a different group id. Assured
- * replication suported for the same locality, i.e: a topology working in
+ * replication supported for the same locality, i.e: a topology working in
* the same
* geographical location). If we are connected to a RS which is not in our
* locality, no need to ask for an ack.
@@ -3355,12 +3353,11 @@
if (assuredMode == AssuredMode.SAFE_DATA_MODE)
msg.setSafeDataLevel(assuredSdLevel);
- // Add the assured message to the list of update that are
- // waiting for acks
- synchronized (waitingAckMsgs)
- {
- waitingAckMsgs.put(msg.getChangeNumber(), msg);
- }
+ /*
+ Add the assured message to the list of update that are
+ waiting for acks
+ */
+ waitingAckMsgs.put(msg.getChangeNumber(), msg);
}
}
@@ -3410,8 +3407,10 @@
{
try
{
- // WARNING: this timeout may be difficult to optimize: too low, it
- // may use too much CPU, too high, it may penalize performance...
+ /*
+ WARNING: this timeout may be difficult to optimize: too low, it
+ may use too much CPU, too high, it may penalize performance...
+ */
msg.wait(10);
} catch (InterruptedException e)
{
@@ -3425,14 +3424,13 @@
// Timeout ?
if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
{
- // Timeout occured, be sure that ack is not being received and if so,
- // remove the update from the wait list, log the timeout error and
- // also update assured monitoring counters
+ /*
+ Timeout occurred, be sure that ack is not being received and if so,
+ remove the update from the wait list, log the timeout error and
+ also update assured monitoring counters
+ */
UpdateMsg update;
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(cn);
- }
+ update = waitingAckMsgs.remove(cn);
if (update != null)
{
@@ -3490,9 +3488,9 @@
}
/**
- * Publish informations to the Replication Service (not assured mode).
+ * Publish information to the Replication Service (not assured mode).
*
- * @param msg The byte array containing the informations that should
+ * @param msg The byte array containing the information that should
* be sent to the remote entities.
*/
public void publish(byte[] msg)
@@ -3501,10 +3499,11 @@
synchronized (this)
{
update = new UpdateMsg(generator.newChangeNumber(), msg);
-
- // If assured replication is configured, this will prepare blocking
- // mechanism. If assured replication is disabled, this returns
- // immediately
+ /*
+ If assured replication is configured, this will prepare blocking
+ mechanism. If assured replication is disabled, this returns
+ immediately
+ */
prepareWaitForAckIfAssuredEnabled(update);
publish(update);
@@ -3512,16 +3511,18 @@
try
{
- // If assured replication is enabled, this will wait for the matching
- // ack or time out. If assured replication is disabled, this returns
- // immediately
+ /*
+ If assured replication is enabled, this will wait for the matching
+ ack or time out. If assured replication is disabled, this returns
+ immediately
+ */
waitForAckIfAssuredEnabled(update);
} catch (TimeoutException ex)
{
// This exception may only be raised if assured replication is
// enabled
Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
- assuredTimeout), msg.toString());
+ assuredTimeout), update.toString());
logError(errorMsg);
}
}
@@ -3557,10 +3558,7 @@
*/
public boolean importInProgress()
{
- if (ieContext == null)
- return false;
- else
- return ieContext.importInProgress;
+ return ieContext != null && ieContext.importInProgress;
}
/**
@@ -3572,10 +3570,7 @@
*/
public boolean exportInProgress()
{
- if (ieContext == null)
- return false;
- else
- return !ieContext.importInProgress;
+ return ieContext != null && !ieContext.importInProgress;
}
/**
--
Gitblit v1.10.0