From 55065c7531e93a725b02dc619f6c526228e768ce Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Oct 2013 14:19:46 +0000
Subject: [PATCH] LDAPReplicationDomain.java: Replaced instance fields with directly storing and using the ReplicationDomainCfg object.
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 313 ++++++++++++++++++++++-----------------------------
1 files changed, 135 insertions(+), 178 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4cb07d3..18d1730 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -41,6 +41,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
+import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
@@ -68,8 +69,7 @@
* The startup phase of the ReplicationDomain subclass,
* should read the list of replication servers from the configuration,
* instantiate a {@link ServerState} then start the publish service
- * by calling
- * {@link #startPublishService(Set, int, long, long)}.
+ * by calling {@link #startPublishService(ReplicationDomainCfg)}.
* At this point it can start calling the {@link #publish(UpdateMsg)}
* method if needed.
* <p>
@@ -274,7 +274,7 @@
* - and each initialized/importer DS that publishes acknowledges each
* WINDOW/2 data msg received.
*/
- protected int initWindow = 100;
+ protected final int initWindow;
/* Status related monitoring fields */
@@ -304,8 +304,7 @@
private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
new HashMap<Integer, Set<String>>();
- private Set<String> eclIncludesForDeletesAllServers = Collections
- .emptySet();
+ private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
/**
* An object used to protect the initialization of the underlying broker
@@ -363,6 +362,7 @@
{
this.baseDN = baseDN;
this.serverID = serverID;
+ this.initWindow = 100;
this.state = serverState;
this.generator = new CSNGenerator(serverID, state);
@@ -1060,7 +1060,7 @@
public void run()
{
if (debugEnabled())
- TRACER.debugInfo("[IE] starting " + this.getName());
+ TRACER.debugInfo("[IE] starting " + getName());
try
{
initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
@@ -1075,7 +1075,7 @@
}
if (debugEnabled())
- TRACER.debugInfo("[IE] ending " + this.getName());
+ TRACER.debugInfo("[IE] ending " + getName());
}
}
@@ -1313,7 +1313,7 @@
*/
public int decodeTarget(String targetString) throws DirectoryException
{
- if (targetString.equalsIgnoreCase("all"))
+ if ("all".equalsIgnoreCase(targetString))
{
return RoutableMsg.ALL_SERVERS;
}
@@ -1612,7 +1612,7 @@
"[IE] wait for start dsid " + dsi.getDsId()
+ " " + dsi.getStatus()
+ " " + dsi.getGenerationId()
- + " " + this.getGenerationID());
+ + " " + getGenerationID());
if (ieContext.startList.contains(dsi.getDsId()))
{
if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
@@ -1711,7 +1711,7 @@
}
else
{
- if (dsInfo.getGenerationId() == this.getGenerationID())
+ if (dsInfo.getGenerationId() == getGenerationID())
{ // and with the expected generationId
// We're done with this server
it.remove();
@@ -1757,8 +1757,7 @@
{
// Rejects 2 simultaneous exports
Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
- throw new DirectoryException(ResultCode.OTHER,
- message);
+ throw new DirectoryException(ResultCode.OTHER, message);
}
ieContext = new IEContext(importInProgress);
@@ -1777,34 +1776,30 @@
*/
private void processErrorMsg(ErrorMsg errorMsg)
{
- if (ieContext != null)
+ //Exporting must not be stopped on the first error, if we run initialize-all
+ if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
{
- /*
- Exporting must not be stopped on the first error, if we
- run initialize-all.
- */
- if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
+ // The ErrorMsg is received while we have started an initialization
+ if (ieContext.getException() == null)
{
- // The ErrorMsg is received while we have started an initialization
- if (ieContext.getException() == null)
- ieContext.setException(new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails()));
+ ieContext.setException(
+ new DirectoryException(ResultCode.OTHER, errorMsg.getDetails()));
+ }
- /*
- * This can happen :
- * - on the first InitReqMsg sent when source in not known for example
- * - on the next attempt when source crashed and did not reconnect
- * even after the nextInitAttemptDelay
- * During the import, the ErrorMsg will be received by receiveEntryBytes
- */
- if (ieContext.initializeTask instanceof InitializeTask)
- {
- // Update the task that initiated the import
- ((InitializeTask)ieContext.initializeTask).
- updateTaskCompletionState(ieContext.getException());
+ /*
+ * This can happen :
+ * - on the first InitReqMsg sent when source in not known for example
+ * - on the next attempt when source crashed and did not reconnect
+ * even after the nextInitAttemptDelay
+ * During the import, the ErrorMsg will be received by receiveEntryBytes
+ */
+ if (ieContext.initializeTask instanceof InitializeTask)
+ {
+ // Update the task that initiated the import
+ ((InitializeTask) ieContext.initializeTask)
+ .updateTaskCompletionState(ieContext.getException());
- releaseIEContext();
- }
+ releaseIEContext();
}
}
}
@@ -1894,8 +1889,7 @@
{
/*
This is the normal termination of the import
- No error is stored and the import is ended
- by returning null
+ No error is stored and the import is ended by returning null
*/
return null;
}
@@ -1903,8 +1897,7 @@
{
/*
This is an error termination during the import
- The error is stored and the import is ended
- by returning null
+ The error is stored and the import is ended by returning null
*/
if (ieContext.getException() == null)
{
@@ -1921,8 +1914,8 @@
{
// Other messages received during an import are trashed except
// the topologyMsg.
- if ((msg instanceof TopologyMsg) &&
- (isRemoteDSConnected(ieContext.importSource)==null))
+ if (msg instanceof TopologyMsg
+ && isRemoteDSConnected(ieContext.importSource) == null)
{
Message errMsg =
Message.raw(Category.SYNC, Severity.NOTICE,
@@ -2043,8 +2036,8 @@
catch(Exception e) { /* do nothing */ }
// process any connection error
- if ((broker.hasConnectionError())||
- (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
+ if (broker.hasConnectionError()
+ || broker.getNumLostConnections() != ieContext.initNumLostConnections)
{
// publish failed - store the error in the ieContext ...
DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2485,8 +2478,7 @@
* @throws DirectoryException When the generation ID of the Replication
* Servers is not the expected value.
*/
- private void checkGenerationID(long generationID)
- throws DirectoryException
+ private void checkGenerationID(long generationID) throws DirectoryException
{
boolean allSet = true;
@@ -2535,7 +2527,7 @@
public void resetReplicationLog() throws DirectoryException
{
// Reset the Generation ID to -1 to clean the ReplicationServers.
- resetGenerationId((long)-1);
+ resetGenerationId(-1L);
// check that at least one ReplicationServer did change its generation-id
checkGenerationID(-1);
@@ -2573,43 +2565,35 @@
* @throws DirectoryException When an error occurs
*/
public void resetGenerationId(Long generationIdNewValue)
- throws DirectoryException
+ throws DirectoryException
{
if (debugEnabled())
TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
+ " resetGenerationId " + generationIdNewValue);
- ResetGenerationIdMsg genIdMessage;
-
- if (generationIdNewValue == null)
- {
- genIdMessage = new ResetGenerationIdMsg(this.getGenerationID());
- }
- else
- {
- genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
- }
+ ResetGenerationIdMsg genIdMessage =
+ new ResetGenerationIdMsg(getGenId(generationIdNewValue));
if (!isConnected())
{
- ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
Integer.toString(serverID),
Long.toString(genIdMessage.getGenerationId()));
- throw new DirectoryException(
- resultCode, message);
+ throw new DirectoryException(ResultCode.OTHER, message);
}
broker.publish(genIdMessage);
// check that at least one ReplicationServer did change its generation-id
- if (generationIdNewValue == null)
+ checkGenerationID(getGenId(generationIdNewValue));
+ }
+
+ private long getGenId(Long generationIdNewValue)
+ {
+ if (generationIdNewValue != null)
{
- checkGenerationID(this.getGenerationID());
+ return generationIdNewValue;
}
- else
- {
- checkGenerationID(generationIdNewValue);
- }
+ return getGenerationID();
}
@@ -2945,24 +2929,17 @@
*/
/**
- * Start the publish mechanism of the Replication Service.
- * After this method has been called, the publish service can be used
- * by calling the {@link #publish(UpdateMsg)} method.
+ * Start the publish mechanism of the Replication Service. After this method
+ * has been called, the publish service can be used by calling the
+ * {@link #publish(UpdateMsg)} method.
*
- * @param replicationServers The replication servers that should be used.
- * @param window The window size of this replication domain.
- * @param heartbeatInterval The heartbeatInterval that should be used
- * to check the availability of the replication
- * servers.
- * @param changetimeHeartbeatInterval The interval used to send change
- * time heartbeat to the replication server.
- *
- * @throws ConfigException If the DirectoryServer configuration was
- * incorrect.
+ * @param config
+ * The configuration that should be used.
+ * @throws ConfigException
+ * If the DirectoryServer configuration was incorrect.
*/
- public void startPublishService(Set<String> replicationServers, int window,
- long heartbeatInterval, long changetimeHeartbeatInterval)
- throws ConfigException
+ public void startPublishService(ReplicationDomainCfg config)
+ throws ConfigException
{
synchronized (sessionLock)
{
@@ -2970,15 +2947,8 @@
{
// create the broker object used to publish and receive changes
broker = new ReplicationBroker(
- this, state, baseDN,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId(),
- changetimeHeartbeatInterval);
-
- broker.start(replicationServers);
+ this, state, config, getGenerationID(), new ReplSessionSecurity());
+ broker.start();
}
}
}
@@ -2990,7 +2960,7 @@
* calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
* <p>
* This method must be called once and must be called after the
- * {@link #startPublishService(Collection, int, long, long)}.
+ * {@link #startPublishService(ReplicationDomainCfg)}.
*/
public void startListenService()
{
@@ -3040,12 +3010,12 @@
* <p>
* The Replication Service will restart from the point indicated by the
* {@link ServerState} that was given as a parameter to the
- * {@link #startPublishService(Collection, int, long, long)}
- * at startup time.
+ * {@link #startPublishService(ReplicationDomainCfg)} at startup time.
+ * <p>
* If some data have changed in the repository during the period of time when
* the Replication Service was disabled, this {@link ServerState} should
- * therefore be updated by the Replication Domain subclass before calling
- * this method. This method is not MT safe.
+ * therefore be updated by the Replication Domain subclass before calling this
+ * method. This method is not MT safe.
*/
public void enableService()
{
@@ -3071,21 +3041,14 @@
/**
* Change some ReplicationDomain parameters.
*
- * @param replicationServers The new set of Replication Servers that this
- * domain should now use.
- * @param windowSize The window size that this domain should use.
- * @param heartbeatInterval The heartbeatInterval that this domain should
- * use.
- * @param groupId The new group id to use
+ * @param config
+ * The new configuration that this domain should now use.
*/
- public void changeConfig(Set<String> replicationServers, int windowSize,
- long heartbeatInterval, byte groupId)
+ public void changeConfig(ReplicationDomainCfg config)
{
- this.groupId = groupId;
+ this.groupId = (byte) config.getGroupId();
- if (broker != null
- && broker.changeConfig(replicationServers, windowSize,
- heartbeatInterval, groupId))
+ if (broker != null && broker.changeConfig(config))
{
disableService();
enableService();
@@ -3195,47 +3158,46 @@
one. Only Safe Read mode makes sense in DS for returning an ack.
*/
byte rsGroupId = broker.getRsGroupId();
- if (msg.isAssured())
+ // Assured feature is supported starting from replication protocol V2
+ if (msg.isAssured()
+ && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
- // Assured feature is supported starting from replication protocol V2
- if (broker.getProtocolVersion() >=
- ProtocolVersion.REPLICATION_PROTOCOL_V2)
+ AssuredMode msgAssuredMode = msg.getAssuredMode();
+ if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
{
- AssuredMode msgAssuredMode = msg.getAssuredMode();
- if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+ if (rsGroupId == groupId)
{
- if (rsGroupId == groupId)
+ // Send the ack
+ AckMsg ackMsg = new AckMsg(msg.getCSN());
+ if (replayErrorMsg != null)
{
- // Send the ack
- AckMsg ackMsg = new AckMsg(msg.getCSN());
- if (replayErrorMsg != null)
- {
- // Mark the error in the ack
- // -> replay error occurred
- ackMsg.setHasReplayError(true);
- // -> replay error occurred in our server
- List<Integer> idList = new ArrayList<Integer>();
- idList.add(serverID);
- ackMsg.setFailedServers(idList);
- }
- broker.publish(ackMsg);
- if (replayErrorMsg != null)
- {
- assuredSrReceivedUpdatesNotAcked.incrementAndGet();
- } else
- {
- assuredSrReceivedUpdatesAcked.incrementAndGet();
- }
+ // Mark the error in the ack
+ // -> replay error occurred
+ ackMsg.setHasReplayError(true);
+ // -> replay error occurred in our server
+ List<Integer> idList = new ArrayList<Integer>();
+ idList.add(serverID);
+ ackMsg.setFailedServers(idList);
}
- } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
- {
- Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
- Integer.toString(serverID), msgAssuredMode.toString(),
- getBaseDNString(), msg.toString());
- logError(errorMsg);
+ broker.publish(ackMsg);
+ if (replayErrorMsg != null)
+ {
+ assuredSrReceivedUpdatesNotAcked.incrementAndGet();
+ }
+ else
+ {
+ assuredSrReceivedUpdatesAcked.incrementAndGet();
+ }
}
- // Nothing to do in Assured safe data mode, only RS ack updates.
}
+ else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+ {
+ Message errorMsg =
+ ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
+ msgAssuredMode.toString(), getBaseDNString(), msg.toString());
+ logError(errorMsg);
+ }
+ // Nothing to do in Assured safe data mode, only RS ack updates.
}
incProcessedUpdates();
@@ -3301,7 +3263,7 @@
{
byte rsGroupId = broker.getRsGroupId();
- // If assured mode configured, wait for acknowledgement for the just sent
+ // If assured mode configured, wait for acknowledgment for the just sent
// message
if (assured && rsGroupId == groupId)
{
@@ -3354,40 +3316,37 @@
remove the update from the wait list, log the timeout error and
also update assured monitoring counters
*/
- UpdateMsg update = waitingAckMsgs.remove(csn);
-
- if (update != null)
- {
- // No luck, this is a real timeout
- // Increment assured replication monitoring counters
- switch (msg.getAssuredMode())
- {
- case SAFE_READ_MODE:
- assuredSrNotAcknowledgedUpdates.incrementAndGet();
- assuredSrTimeoutUpdates.incrementAndGet();
- // Increment number of errors for our RS
- updateAssuredErrorsByServer(
- assuredSrServerNotAcknowledgedUpdates,
- broker.getRsServerId());
- break;
- case SAFE_DATA_MODE:
- assuredSdTimeoutUpdates.incrementAndGet();
- // Increment number of errors for our RS
- updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
- broker.getRsServerId());
- break;
- default:
- // Should not happen
- }
-
- throw new TimeoutException("No ack received for message csn: "
- + csn + " and replication servceID: " + baseDN + " after "
- + assuredTimeout + " ms.");
- } else
+ final UpdateMsg update = waitingAckMsgs.remove(csn);
+ if (update == null)
{
// Ack received just before timeout limit: we can exit
break;
}
+
+ // No luck, this is a real timeout
+ // Increment assured replication monitoring counters
+ switch (msg.getAssuredMode())
+ {
+ case SAFE_READ_MODE:
+ assuredSrNotAcknowledgedUpdates.incrementAndGet();
+ assuredSrTimeoutUpdates.incrementAndGet();
+ // Increment number of errors for our RS
+ updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
+ broker.getRsServerId());
+ break;
+ case SAFE_DATA_MODE:
+ assuredSdTimeoutUpdates.incrementAndGet();
+ // Increment number of errors for our RS
+ updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+ broker.getRsServerId());
+ break;
+ default:
+ // Should not happen
+ }
+
+ throw new TimeoutException("No ack received for message csn: " + csn
+ + " and replication domain: " + baseDN + " after "
+ + assuredTimeout + " ms.");
}
}
}
@@ -3425,8 +3384,7 @@
update = new UpdateMsg(generator.newCSN(), msg);
/*
If assured replication is configured, this will prepare blocking
- mechanism. If assured replication is disabled, this returns
- immediately
+ mechanism. If assured replication is disabled, this returns immediately
*/
prepareWaitForAckIfAssuredEnabled(update);
@@ -3443,8 +3401,7 @@
waitForAckIfAssuredEnabled(update);
} catch (TimeoutException ex)
{
- // This exception may only be raised if assured replication is
- // enabled
+ // This exception may only be raised if assured replication is enabled
Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
Long.toString(assuredTimeout), update.toString());
logError(errorMsg);
--
Gitblit v1.10.0