From 2a1a1bec32261f04e304eea6c7a1f045a3bedba5 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 02 Aug 2013 14:31:00 +0000
Subject: [PATCH] serviceId => baseDN (To make the code less confusing)
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 483 ++++++++++++++++++++++++++---------------------------
1 files changed, 237 insertions(+), 246 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4505ed6..79b67d2 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -117,11 +117,11 @@
private static final DebugTracer TRACER = getTracer();
/**
- * An identifier for the Replication Service.
- * All Replication Domain using this identifier will be connected
+ * The baseDN for the Replication Service.
+ * All Replication Domain using this baseDN will be connected
* through the Replication Service.
*/
- private final String serviceID;
+ private final String baseDN;
/**
* The identifier of this Replication Domain inside the
@@ -167,19 +167,22 @@
/*
* Assured mode properties
*/
- // Is assured mode enabled or not for this domain ?
+ /** Whether assured mode is enabled for this domain. */
private boolean assured = false;
- // Assured sub mode (used when assured is true)
+ /** Assured sub mode (used when assured is true). */
private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- // Safe Data level (used when assuredMode is SAFE_DATA)
- private byte assuredSdLevel = (byte)1;
- // The timeout in ms that should be used, when waiting for assured acks
+ /** Safe Data level (used when assuredMode is SAFE_DATA). */
+ private byte assuredSdLevel = 1;
+ /** The timeout in ms that should be used, when waiting for assured acks. */
private long assuredTimeout = 2000;
- // Group id
- private byte groupId = (byte)1;
- // Referrals urls to be published to other servers of the topology
- // TODO: fill that with all currently opened urls if no urls configured
+ /** Group id. */
+ private byte groupId = 1;
+ /**
+ * Referrals urls to be published to other servers of the topology.
+ * <p>
+ * TODO: fill that with all currently opened urls if no urls configured
+ */
private final List<String> refUrls = new ArrayList<String>();
/**
@@ -189,52 +192,77 @@
private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
private AtomicInteger numSentUpdates = new AtomicInteger(0);
- /* Assured replication monitoring counters */
+ /** Assured replication monitoring counters. */
- // Number of updates sent in Assured Mode, Safe Read
+ /** Number of updates sent in Assured Mode, Safe Read. */
private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Read, that have been
- // successfully acknowledged
+ /**
+ * Number of updates sent in Assured Mode, Safe Read, that have been
+ * successfully acknowledged.
+ */
private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Read, that have not been
- // successfully acknowledged (either because of timeout, wrong status or error
- // at replay)
+ /**
+ * Number of updates sent in Assured Mode, Safe Read, that have not been
+ * successfully acknowledged (either because of timeout, wrong status or error
+ * at replay).
+ */
private AtomicInteger assuredSrNotAcknowledgedUpdates =
new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Read, that have not been
- // successfully acknowledged because of timeout
+ /**
+ * Number of updates sent in Assured Mode, Safe Read, that have not been
+ * successfully acknowledged because of timeout.
+ */
private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Read, that have not been
- // successfully acknowledged because of wrong status
+ /**
+ * Number of updates sent in Assured Mode, Safe Read, that have not been
+ * successfully acknowledged because of wrong status.
+ */
private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Read, that have not been
- // successfully acknowledged because of replay error
+ /**
+ * Number of updates sent in Assured Mode, Safe Read, that have not been
+ * successfully acknowledged because of replay error.
+ */
private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0);
- // Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
- // that have not been successfully acknowledged (either because of timeout,
- // wrong status or error at replay) for a particular server (DS or RS). String
- // format: <server id>:<number of failed updates>
+ /**
+ * Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
+ * that have not been successfully acknowledged (either because of timeout,
+ * wrong status or error at replay) for a particular server (DS or RS).
+ * <p>
+ * String format: <server id>:<number of failed updates>
+ */
private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
new HashMap<Integer,Integer>();
- // Number of updates received in Assured Mode, Safe Read request
+ /** Number of updates received in Assured Mode, Safe Read request. */
private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
- // Number of updates received in Assured Mode, Safe Read request that we have
- // acked without errors
+ /**
+ * Number of updates received in Assured Mode, Safe Read request that we have
+ * acked without errors.
+ */
private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
- // Number of updates received in Assured Mode, Safe Read request that we have
- // acked with errors
+ /**
+ * Number of updates received in Assured Mode, Safe Read request that we have
+ * acked with errors.
+ */
private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Data
+ /** Number of updates sent in Assured Mode, Safe Data. */
private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Data, that have been
- // successfully acknowledged
+ /**
+ * Number of updates sent in Assured Mode, Safe Data, that have been
+ * successfully acknowledged.
+ */
private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0);
- // Number of updates sent in Assured Mode, Safe Data, that have not been
- // successfully acknowledged because of timeout
+ /**
+ * Number of updates sent in Assured Mode, Safe Data, that have not been
+ * successfully acknowledged because of timeout.
+ */
private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0);
- // Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
- // that have not been successfully acknowledged because of timeout for a
- // particular RS. String format: <server id>:<number of failed updates>
+ /**
+ * Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
+ * that have not been successfully acknowledged because of timeout for a
+ * particular RS.
+ * <p>
+ * String format: <server id>:<number of failed updates>
+ */
private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
new HashMap<Integer,Integer>();
@@ -299,7 +327,7 @@
/**
* Creates a ReplicationDomain with the provided parameters.
*
- * @param serviceID The identifier of the Replication Domain to which
+ * @param baseDN The identifier of the Replication Domain to which
* this object is participating.
* @param serverID The identifier of the server that is participating
* to the Replication Domain.
@@ -307,42 +335,22 @@
* is participating to a given Replication Domain.
* @param initWindow Window used during initialization.
*/
- public ReplicationDomain(String serviceID, int serverID,int initWindow)
+ public ReplicationDomain(String baseDN, int serverID,int initWindow)
{
- this.serviceID = serviceID;
+ this.baseDN = baseDN;
this.serverID = serverID;
this.initWindow = initWindow;
this.state = new ServerState();
this.generator = new ChangeNumberGenerator(serverID, state);
- domains.put(serviceID, this);
- }
-
- /**
- * Creates a ReplicationDomain with the provided parameters.
- *
- * @param serviceID The identifier of the Replication Domain to which
- * this object is participating.
- * @param serverID The identifier of the server that is participating
- * to the Replication Domain.
- * This identifier should be different for each server that
- * is participating to a given Replication Domain.
- */
- public ReplicationDomain(String serviceID, int serverID)
- {
- this.serviceID = serviceID;
- this.serverID = serverID;
- this.state = new ServerState();
- this.generator = new ChangeNumberGenerator(serverID, state);
-
- domains.put(serviceID, this);
+ domains.put(baseDN, this);
}
/**
* Creates a ReplicationDomain with the provided parameters.
* (for unit test purpose only)
*
- * @param serviceID The identifier of the Replication Domain to which
+ * @param baseDN The identifier of the Replication Domain to which
* this object is participating.
* @param serverID The identifier of the server that is participating
* to the Replication Domain.
@@ -350,15 +358,15 @@
* is participating to a given Replication Domain.
* @param serverState The serverState to use
*/
- public ReplicationDomain(String serviceID, int serverID,
+ public ReplicationDomain(String baseDN, int serverID,
ServerState serverState)
{
- this.serviceID = serviceID;
+ this.baseDN = baseDN;
this.serverID = serverID;
this.state = serverState;
this.generator = new ChangeNumberGenerator(serverID, state);
- domains.put(serviceID, this);
+ domains.put(baseDN, this);
}
/**
@@ -389,7 +397,7 @@
if (!isValidInitialStatus(initStatus))
{
Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
- serviceID, Integer.toString(serverID));
+ baseDN, Integer.toString(serverID));
logError(msg);
} else
{
@@ -408,7 +416,7 @@
private void receiveChangeStatus(ChangeStatusMsg csMsg)
{
if (debugEnabled())
- TRACER.debugInfo("Replication domain " + serviceID +
+ TRACER.debugInfo("Replication domain " + baseDN +
" received change status message:\n" + csMsg);
ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -418,7 +426,7 @@
if (event == StatusMachineEvent.INVALID_EVENT)
{
Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
- serviceID, Integer.toString(serverID));
+ baseDN, Integer.toString(serverID));
logError(msg);
return;
}
@@ -474,13 +482,13 @@
}
/**
- * Gets the identifier of this domain.
+ * Gets the baseDN of this domain.
*
- * @return The identifier for this domain.
+ * @return The baseDN for this domain.
*/
- public String getServiceID()
+ public String getBaseDNString()
{
- return serviceID;
+ return baseDN;
}
/**
@@ -631,8 +639,7 @@
{
if (numProcessedUpdates != null)
return numProcessedUpdates.get();
- else
- return 0;
+ return 0;
}
/**
@@ -644,8 +651,7 @@
{
if (numRcvdUpdates != null)
return numRcvdUpdates.get();
- else
- return 0;
+ return 0;
}
/**
@@ -657,8 +663,7 @@
{
if (numSentUpdates != null)
return numSentUpdates.get();
- else
- return 0;
+ return 0;
}
/**
@@ -745,9 +750,10 @@
return null;
}
- if (debugEnabled())
- if (!(msg instanceof HeartbeatMsg))
- TRACER.debugVerbose("Message received <" + msg + ">");
+ if (debugEnabled() && !(msg instanceof HeartbeatMsg))
+ {
+ TRACER.debugVerbose("Message received <" + msg + ">");
+ }
if (msg instanceof AckMsg)
{
@@ -791,7 +797,7 @@
if (debugEnabled())
TRACER.debugInfo(
"[IE] processErrorMsg:" + this.serverID +
- " serviceID: " + this.serviceID +
+ " baseDN: " + this.baseDN +
" Error Msg received: " + errorMsg);
if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -862,8 +868,9 @@
numRcvdUpdates.incrementAndGet();
byte rsGroupId = broker.getRsGroupId();
- if ( update.isAssured() && (update.getAssuredMode() ==
- AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) )
+ if (update.isAssured()
+ && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
+ && rsGroupId == groupId)
{
assuredSrReceivedUpdates.incrementAndGet();
}
@@ -937,7 +944,7 @@
requested servers. Log problem
*/
Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
- serviceID, Integer.toString(serverID),
+ baseDN, Integer.toString(serverID),
update.toString(), ack.errorsToString());
logError(errorMsg);
@@ -1012,8 +1019,8 @@
*/
private class ExportThread extends DirectoryThread
{
- // Id of server that will be initialized
- private final int serverToInitialize;
+ /** Id of server that will be initialized. */
+ private final int serverIdToInitialize;
private final int initWindow;
@@ -1021,17 +1028,17 @@
/**
* Constructor for the ExportThread.
*
- * @param serverToInitialize
+ * @param serverIdToInitialize
* serverId of server that will receive entries
* @param initWindow
* The value of the initialization window for flow control between
* the importer and the exporter.
*/
- public ExportThread(int serverToInitialize, int initWindow)
+ public ExportThread(int serverIdToInitialize, int initWindow)
{
super("Export thread from serverId=" + serverID + " to serverId="
- + serverToInitialize);
- this.serverToInitialize = serverToInitialize;
+ + serverIdToInitialize);
+ this.serverIdToInitialize = serverIdToInitialize;
this.initWindow = initWindow;
}
@@ -1047,7 +1054,7 @@
TRACER.debugInfo("[IE] starting " + this.getName());
try
{
- initializeRemote(serverToInitialize, serverToInitialize, null,
+ initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
initWindow);
} catch (DirectoryException de)
{
@@ -1069,61 +1076,75 @@
*/
protected class IEContext
{
- // The private task that initiated the operation.
- Task initializeTask;
- // The destination in the case of an export
- int exportTarget = RoutableMsg.UNKNOWN_SERVER;
- // The source in the case of an import
- int importSource = RoutableMsg.UNKNOWN_SERVER;
+ /** The private task that initiated the operation. */
+ private Task initializeTask;
+ /** The destination in the case of an export. */
+ private int exportTarget = RoutableMsg.UNKNOWN_SERVER;
+ /** The source in the case of an import. */
+ private int importSource = RoutableMsg.UNKNOWN_SERVER;
- // The total entry count expected to be processed
- long entryCount = 0;
- // The count for the entry not yet processed
- long entryLeftCount = 0;
+ /** The total entry count expected to be processed. */
+ private long entryCount = 0;
+ /** The count for the entry not yet processed. */
+ private long entryLeftCount = 0;
- // Exception raised during the initialization.
- DirectoryException exception = null;
+ /** Exception raised during the initialization. */
+ private DirectoryException exception = null;
- // Whether the context is related to an import or an export.
- boolean importInProgress;
+ /** Whether the context is related to an import or an export. */
+ private boolean importInProgress;
- // Current counter of messages exchanged during the initialization
- int msgCnt = 0;
+ /** Current counter of messages exchanged during the initialization. */
+ private int msgCnt = 0;
- // Number of connections lost when we start the initialization.
- // Will help counting connections lost during initialization,
- int initNumLostConnections = 0;
+ /**
+ * Number of connections lost when we start the initialization. Will help
+ * counting connections lost during initialization,
+ */
+ private int initNumLostConnections = 0;
- // Request message sent when this server has the initializeFromRemote task.
- InitializeRequestMsg initReqMsgSent = null;
+ /**
+ * Request message sent when this server has the initializeFromRemote task.
+ */
+ private InitializeRequestMsg initReqMsgSent = null;
- // Start time of the initialization process. ErrorMsg timestamped
- // before thi startTime will be ignored.
- long startTime;
+ /**
+ * Start time of the initialization process. ErrorMsg timestamped before
+ * this startTime will be ignored.
+ */
+ private long startTime;
- // List fo replicas (DS) connected to the topology when
- // initialization started.
- Set<Integer> startList = new HashSet<Integer>(0);
+ /**
+ * List for replicas (DS) connected to the topology when initialization
+ * started.
+ */
+ private Set<Integer> startList = new HashSet<Integer>(0);
- // List fo replicas (DS) with a failure (disconnected from the topology)
- // since the initialization started.
- Set<Integer> failureList = new HashSet<Integer>(0);
+ /**
+ * List for replicas (DS) with a failure (disconnected from the topology)
+ * since the initialization started.
+ */
+ private Set<Integer> failureList = new HashSet<Integer>(0);
- // Flow control during initialization
- // - for each remote server, counter of messages received
+ /**
+ * Flow control during initialization: for each remote server, counter of
+ * messages received.
+ */
private final HashMap<Integer, Integer> ackVals =
new HashMap<Integer, Integer>();
- // - serverId of the slowest server (the one with the smallest non null
- // counter)
+ /**
+ * ServerId of the slowest server (the one with the smallest non null
+ * counter).
+ */
private int slowestServerId = -1;
- short exporterProtocolVersion = -1;
+ private short exporterProtocolVersion = -1;
- // Window used during this initialization
- int initWindow;
+ /** Window used during this initialization. */
+ private int initWindow;
- // Number of attempt already done for this initialization
- short attemptCnt;
+ /** Number of attempt already done for this initialization. */
+ private short attemptCnt;
/**
* Creates a new IEContext.
@@ -1137,7 +1158,6 @@
this.importInProgress = importInProgress;
this.startTime = System.currentTimeMillis();
this.attemptCnt = 0;
-
}
/**
@@ -1368,7 +1388,7 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
- countEntries(), serviceID, serverID);
+ countEntries(), baseDN, serverID);
logError(msg);
for (DSInfo dsi : getReplicasList())
@@ -1384,7 +1404,7 @@
else
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
- countEntries(), serviceID, serverID, serverToInitialize);
+ countEntries(), baseDN, serverID, serverToInitialize);
logError(msg);
ieContext.startList.add(serverToInitialize);
@@ -1392,8 +1412,8 @@
// We manage the list of servers with which a flow control can be enabled
for (DSInfo dsi : getReplicasList())
{
- if (dsi.getDsId() == serverToInitialize)
- if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ if (dsi.getDsId() == serverToInitialize &&
+ dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
ieContext.setAckVal(dsi.getDsId(), 0);
}
}
@@ -1401,7 +1421,7 @@
// loop for the case where the exporter is the initiator
int attempt = 0;
boolean done = false;
- while ((!done) && (++attempt<2)) // attempt loop
+ while (!done && ++attempt < 2) // attempt loop
{
try
{
@@ -1415,7 +1435,7 @@
// Send start message to the peer
InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
- serviceID, serverID, serverToInitialize, serverRunningTheTask,
+ baseDN, serverID, serverToInitialize, serverRunningTheTask,
ieContext.entryCount, initWindow);
broker.publish(initTargetMsg);
@@ -1475,14 +1495,14 @@
TRACER.debugInfo(
"[IE] Exporter wait for reconnection by the listener thread");
int att=0;
- while ((!broker.shuttingDown()) &&
- (!broker.isConnected())&& (++att<100))
+ while (!broker.shuttingDown() && !broker.isConnected()
+ && ++att < 100)
try { Thread.sleep(100); }
catch(Exception e){ /* do nothing */ }
}
- if ((initTask != null) && broker.isConnected() &&
- (serverToInitialize != RoutableMsg.ALL_SERVERS))
+ if (initTask != null && broker.isConnected()
+ && serverToInitialize != RoutableMsg.ALL_SERVERS)
{
/*
NewAttempt case : In the case where
@@ -1524,13 +1544,12 @@
// Servers that left in the list are those for which we could not test
// that they have been successfully initialized.
- if (!ieContext.failureList.isEmpty())
+ if (!ieContext.failureList.isEmpty() && exportRootException == null)
{
- if (exportRootException == null)
- exportRootException = new DirectoryException(ResultCode.OTHER,
- ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
- Long.toString(getGenerationID()),
- ieContext.failureList.toString()));
+ exportRootException = new DirectoryException(ResultCode.OTHER,
+ ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
+ Long.toString(getGenerationID()),
+ ieContext.failureList.toString()));
}
// Don't forget to release IEcontext acquired at beginning.
@@ -1541,22 +1560,21 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL
- .get(serviceID, serverID, cause);
+ .get(baseDN, serverID, cause);
logError(msg);
}
else
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
- serviceID, serverID, serverToInitialize, cause);
+ baseDN, serverID, serverToInitialize, cause);
logError(msg);
}
if (exportRootException != null)
{
- throw(exportRootException);
+ throw exportRootException;
}
-
}
private String getReplicationMonitorInstanceName()
@@ -1564,10 +1582,10 @@
return broker.getReplicationMonitor().getMonitorInstanceName();
}
- /*
- * For all remote servers in tht start list,
- * - wait it has finished the import and present the expected generationID
- * - build the failureList
+ /**
+ * For all remote servers in the start list:
+ * - wait it has finished the import and present the expected generationID,
+ * - build the failureList.
*/
private void waitForRemoteStartOfInit()
{
@@ -1614,8 +1632,7 @@
}
}
}
- while ((!done) && (waitResultAttempt<1200) // 2mn
- && (!broker.shuttingDown()));
+ while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
ieContext.failureList.addAll(replicasWeAreWaitingFor);
@@ -1624,17 +1641,15 @@
"[IE] wait for start ends with " + ieContext.failureList);
}
- /*
- * For all remote servers in the start list,
- * - wait it has finished the import and present the expected generationID
- * - build the failureList
+ /**
+ * For all remote servers in the start list:
+ * - wait it has finished the import and present the expected generationID,
+ * - build the failureList.
*/
private void waitForRemoteEndOfInit()
{
- Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
-
- for (Integer sid : ieContext.startList)
- replicasWeAreWaitingFor.add(sid);
+ Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(
+ ieContext.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1710,7 +1725,7 @@
} // 1sec
}
- while ((!done) && (!broker.shuttingDown())); // infinite wait
+ while (!done && !broker.shuttingDown()); // infinite wait
ieContext.failureList.addAll(replicasWeAreWaitingFor);
@@ -1908,7 +1923,7 @@
Message errMsg =
Message.raw(Category.SYNC, Severity.NOTICE,
ERR_INIT_EXPORTER_DISCONNECTION.get(
- this.serviceID,
+ this.baseDN,
Integer.toString(this.serverID),
Integer.toString(ieContext.importSource)));
if (ieContext.getException()==null)
@@ -1994,7 +2009,7 @@
we just abandon the export by throwing an exception.
*/
if (ieContext.getException() != null)
- throw(new IOException(ieContext.getException().getMessage()));
+ throw new IOException(ieContext.getException().getMessage());
int slowestServerId = ieContext.getSlowestServer();
if (isRemoteDSConnected(slowestServerId)==null)
@@ -2162,7 +2177,7 @@
if (!broker.isConnected())
{
- errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
+ errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString());
}
/*
@@ -2186,7 +2201,7 @@
ieContext.initializeTask = initTask;
ieContext.attemptCnt = 0;
ieContext.initReqMsgSent = new InitializeRequestMsg(
- serviceID, serverID, source, this.initWindow);
+ baseDN, serverID, source, this.initWindow);
// Publish Init request msg
broker.publish(ieContext.initReqMsgSent);
@@ -2217,10 +2232,7 @@
// No need to call here updateTaskCompletionState - will be done
// by the caller
releaseIEContext();
- DirectoryException de = new DirectoryException(
- ResultCode.OTHER,
- errMsg);
- throw (de);
+ throw new DirectoryException(ResultCode.OTHER, errMsg);
}
}
@@ -2250,7 +2262,7 @@
{
// Log starting
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
- serviceID, initTargetMsgReceived.getSenderID(), serverID);
+ baseDN, initTargetMsgReceived.getSenderID(), serverID);
logError(msg);
// Go into full update status
@@ -2305,11 +2317,11 @@
*/
broker.reStart(false);
- if (ieContext.getException() != null)
+ if (ieContext.getException() != null
+ && broker.isConnected()
+ && initFromTask != null
+ && ++ieContext.attemptCnt < 2)
{
- if (broker.isConnected() && (initFromTask != null)
- && (++ieContext.attemptCnt<2))
- {
/*
Worth a new attempt
since initFromTask is in this server, connection is ok
@@ -2343,13 +2355,12 @@
{
/*
An error occurs when sending a new request for a new import.
- This error is not stored, prefering to keep the initial one.
+ This error is not stored, preferring to keep the initial one.
*/
logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
e.getLocalizedMessage(),
ieContext.getException().getLocalizedMessage()));
}
- }
}
// ===================
@@ -2364,7 +2375,7 @@
try
{
- if (broker.isConnected() && (ieContext.getException() != null))
+ if (broker.isConnected() && ieContext.getException() != null)
{
// Let's notify the exporter
ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
@@ -2385,7 +2396,7 @@
finally
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
- serviceID, initTargetMsgReceived.getSenderID(), serverID,
+ baseDN, initTargetMsgReceived.getSenderID(), serverID,
(ieContext.getException() != null ? ieContext
.getException().getLocalizedMessage() : ""));
logError(msg);
@@ -2426,7 +2437,7 @@
if (newStatus == ServerStatus.INVALID_STATUS)
{
- Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(serviceID,
+ Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDN,
Integer.toString(serverID), status.toString(), event.toString());
logError(msg);
return;
@@ -2444,8 +2455,8 @@
status = newStatus;
if (debugEnabled())
- TRACER.debugInfo("Replication domain " + serviceID +
- " new status is: " + status);
+ TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
+ + status);
// Perform whatever actions are needed to apply properties for being
// compliant with new status
@@ -2461,7 +2472,7 @@
*/
public boolean ieRunning()
{
- return (ieContext != null);
+ return ieContext != null;
}
/**
@@ -2483,8 +2494,8 @@
for (RSInfo rsInfo : getRsList())
{
// the 'empty' RSes (generationId==-1) are considered as good citizens
- if ((rsInfo.getGenerationId() != -1) &&
- (rsInfo.getGenerationId() != generationID))
+ if (rsInfo.getGenerationId() != -1 &&
+ rsInfo.getGenerationId() != generationID)
{
try
{
@@ -2505,7 +2516,7 @@
if (!allSet)
{
ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
+ Message message = ERR_RESET_GENERATION_ID_FAILED.get(baseDN);
throw new DirectoryException(
resultCode, message);
}
@@ -2536,7 +2547,7 @@
// wait for the domain to reconnect.
int count = 0;
- while (!isConnected() && (count < 10))
+ while (!isConnected() && count < 10)
{
try
{
@@ -2565,8 +2576,7 @@
throws DirectoryException
{
if (debugEnabled())
- TRACER.debugInfo(
- "Server id " + serverID + " and domain " + serviceID
+ TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
+ " resetGenerationId " + generationIdNewValue);
ResetGenerationIdMsg genIdMessage;
@@ -2583,7 +2593,7 @@
if (!isConnected())
{
ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID,
+ Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(baseDN,
Integer.toString(serverID),
Long.toString(genIdMessage.getGenerationId()));
throw new DirectoryException(
@@ -2620,8 +2630,7 @@
{
if (broker != null)
return broker.getMaxRcvWindow();
- else
- return 0;
+ return 0;
}
/**
@@ -2633,8 +2642,7 @@
{
if (broker != null)
return broker.getCurrentRcvWindow();
- else
- return 0;
+ return 0;
}
/**
@@ -2646,8 +2654,7 @@
{
if (broker != null)
return broker.getMaxSendWindow();
- else
- return 0;
+ return 0;
}
/**
@@ -2659,8 +2666,7 @@
{
if (broker != null)
return broker.getCurrentSendWindow();
- else
- return 0;
+ return 0;
}
/**
@@ -2671,8 +2677,7 @@
{
if (broker != null)
return broker.getNumLostConnections();
- else
- return 0;
+ return 0;
}
/**
@@ -2736,8 +2741,7 @@
{
if (broker != null)
return broker.getReplicationServer();
- else
- return ReplicationBroker.NO_CONNECTED_SERVER;
+ return ReplicationBroker.NO_CONNECTED_SERVER;
}
/**
@@ -2965,11 +2969,9 @@
{
if (broker == null)
{
- /*
- * create the broker object used to publish and receive changes
- */
+ // create the broker object used to publish and receive changes
broker = new ReplicationBroker(
- this, state, serviceID,
+ this, state, baseDN,
serverID, window,
getGenerationID(),
heartbeatInterval,
@@ -2996,7 +2998,6 @@
{
synchronized (sessionLock)
{
- //
// Create the listener thread
listenerThread = new ListenerThread(this);
listenerThread.start();
@@ -3066,7 +3067,7 @@
public void stopDomain()
{
disableService();
- domains.remove(serviceID);
+ domains.remove(baseDN);
}
/**
@@ -3087,14 +3088,12 @@
{
this.groupId = groupId;
- if (broker != null)
+ if (broker != null
+ && broker.changeConfig(replicationServers, windowSize,
+ heartbeatInterval, groupId))
{
- if (broker.changeConfig(
- replicationServers, windowSize, heartbeatInterval, groupId))
- {
- disableService();
- enableService();
- }
+ disableService();
+ enableService();
}
}
@@ -3112,14 +3111,11 @@
public void changeConfig(Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
{
- if (setEclIncludes(serverID, includeAttributes,
- includeAttributesForDeletes))
+ if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
+ && broker != null)
{
- if (broker != null)
- {
- disableService();
- enableService();
- }
+ disableService();
+ enableService();
}
}
@@ -3239,7 +3235,7 @@
} else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
{
Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
- Integer.toString(serverID), msgAssuredMode.toString(), serviceID,
+ Integer.toString(serverID), msgAssuredMode.toString(), baseDN,
msg.toString());
logError(errorMsg);
}
@@ -3283,17 +3279,14 @@
* geographical location). If we are connected to a RS which is not in our
* locality, no need to ask for an ack.
*/
- if (assured && (rsGroupId == groupId))
+ if (assured && rsGroupId == groupId)
{
msg.setAssured(true);
msg.setAssuredMode(assuredMode);
if (assuredMode == AssuredMode.SAFE_DATA_MODE)
msg.setSafeDataLevel(assuredSdLevel);
- /*
- Add the assured message to the list of update that are
- waiting for acks
- */
+ // Add the assured message to the list of update that are waiting for acks
waitingAckMsgs.put(msg.getChangeNumber(), msg);
}
}
@@ -3315,7 +3308,7 @@
// If assured mode configured, wait for acknowledgement for the just sent
// message
- if (assured && (rsGroupId == groupId))
+ if (assured && rsGroupId == groupId)
{
// Increment assured replication monitoring counters
switch (assuredMode)
@@ -3354,7 +3347,7 @@
if (debugEnabled())
{
TRACER.debugInfo("waitForAck method interrupted for replication " +
- "serviceID: " + serviceID);
+ "baseDN: " + baseDN);
}
break;
}
@@ -3394,7 +3387,7 @@
}
throw new TimeoutException("No ack received for message cn: " + cn +
- " and replication servceID: " + serviceID + " after " +
+ " and replication servceID: " + baseDN + " after " +
assuredTimeout + " ms.");
} else
{
@@ -3458,7 +3451,7 @@
{
// This exception may only be raised if assured replication is
// enabled
- Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
+ Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(baseDN, Long.toString(
assuredTimeout), update.toString());
logError(errorMsg);
}
@@ -3521,8 +3514,7 @@
{
if (ieContext != null)
return ieContext.entryLeftCount;
- else
- return 0;
+ return 0;
}
/**
@@ -3548,8 +3540,7 @@
{
if (ieContext != null)
return ieContext.entryCount;
- else
- return 0;
+ return 0;
}
--
Gitblit v1.10.0