From cb1bb5d131addd27e2927ec90cc572a8c4d40f80 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 09 Jan 2014 09:52:23 +0000
Subject: [PATCH] Front-port of r10098.
---
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 503 +++++++++++++++++++++++++++----------------------------
1 files changed, 250 insertions(+), 253 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index b01feaa..7f41ec9 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
@@ -56,6 +57,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.common.StatusMachine.*;
/**
@@ -100,7 +102,7 @@
* implementation using methods {@link #initializeRemote(int)}
* or {@link #initializeFromRemote(int)}.
* <p>
- * At shutdown time, the {@link #stopDomain()} method should be called to
+ * At shutdown time, the {@link #disableService()} method should be called to
* cleanly stop the replication service.
*/
public abstract class ReplicationDomain
@@ -115,25 +117,21 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /** The configuration of the replication domain. */
+ protected volatile ReplicationDomainCfg config;
/**
- * The baseDN for the Replication Service.
- * All Replication Domain using this baseDN will be connected
- * through the Replication Service.
+ * The assured configuration of the replication domain. It is a duplicate of
+ * {@link #config} because of its update model.
+ *
+ * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
*/
- private final DN baseDN;
-
- /**
- * The identifier of this Replication Domain inside the
- * Replication Service.
- * Each Domain must use a unique ServerID.
- */
- private final int serverID;
+ private volatile ReplicationDomainCfg assuredConfig;
/**
* The ReplicationBroker that is used by this ReplicationDomain to
* connect to the ReplicationService.
*/
- protected ReplicationBroker broker = null;
+ protected ReplicationBroker broker;
/**
* This Map is used to store all outgoing assured messages in order
@@ -158,33 +156,6 @@
private volatile DirectoryThread listenerThread = null;
/**
- * A Map used to store all the ReplicationDomains created on this server.
- */
- private static Map<DN, ReplicationDomain> domains =
- new HashMap<DN, ReplicationDomain>();
-
- /*
- * Assured mode properties
- */
- /** Whether assured mode is enabled for this domain. */
- private boolean assured = false;
- /** Assured sub mode (used when assured is true). */
- private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- /** Safe Data level (used when assuredMode is SAFE_DATA). */
- private byte assuredSdLevel = 1;
- /** The timeout in ms that should be used, when waiting for assured acks. */
- private long assuredTimeout = 2000;
-
- /** Group id. */
- private byte groupId = 1;
- /**
- * Referrals urls to be published to other servers of the topology.
- * <p>
- * TODO: fill that with all currently opened urls if no urls configured
- */
- private final List<String> refUrls = new ArrayList<String>();
-
- /**
* A set of counters used for Monitoring.
*/
private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
@@ -265,15 +236,6 @@
private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
new HashMap<Integer,Integer>();
- /**
- * Window size used during initialization .. between
- * - the initializer/exporter DS that listens/waits acknowledges and that
- * slows down data msg publishing based on the slowest server
- * - and each initialized/importer DS that publishes acknowledges each
- * WINDOW/2 data msg received.
- */
- protected final int initWindow;
-
/* Status related monitoring fields */
/**
@@ -311,6 +273,12 @@
private final Object sessionLock = new Object();
/**
+ * The generationId for this replication domain. It is made of a hash of the
+ * 1000 first entries for this domain.
+ */
+ protected volatile long generationId;
+
+ /**
* Returns the {@link CSNGenerator} that will be used to
* generate {@link CSN} for this domain.
*
@@ -325,46 +293,39 @@
/**
* Creates a ReplicationDomain with the provided parameters.
*
- * @param baseDN The identifier of the Replication Domain to which
- * this object is participating.
- * @param serverID The identifier of the server that is participating
- * to the Replication Domain.
- * This identifier should be different for each server that
- * is participating to a given Replication Domain.
- * @param initWindow Window used during initialization.
+ * @param config
+ * The configuration object for this ReplicationDomain
+ * @param generationId
+ * the generation of this ReplicationDomain
*/
- public ReplicationDomain(DN baseDN, int serverID, int initWindow)
+ public ReplicationDomain(ReplicationDomainCfg config, long generationId)
{
- this.baseDN = baseDN;
- this.serverID = serverID;
- this.initWindow = initWindow;
+ this.config = config;
+ this.assuredConfig = config;
+ this.generationId = generationId;
this.state = new ServerState();
- this.generator = new CSNGenerator(serverID, state);
-
- domains.put(baseDN, this);
+ this.generator = new CSNGenerator(getServerId(), state);
}
/**
- * Creates a ReplicationDomain with the provided parameters.
- * (for unit test purpose only)
+ * Creates a ReplicationDomain with the provided parameters. (for unit test
+ * purpose only)
*
- * @param baseDN The identifier of the Replication Domain to which
- * this object is participating.
- * @param serverID The identifier of the server that is participating
- * to the Replication Domain.
- * This identifier should be different for each server that
- * is participating to a given Replication Domain.
- * @param serverState The serverState to use
+ * @param config
+ * The configuration object for this ReplicationDomain
+ * @param generationId
+ * the generation of this ReplicationDomain
+ * @param serverState
+ * The serverState to use
*/
- public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
+ public ReplicationDomain(ReplicationDomainCfg config, long generationId,
+ ServerState serverState)
{
- this.baseDN = baseDN;
- this.serverID = serverID;
- this.initWindow = 100;
+ this.config = config;
+ this.assuredConfig = config;
+ this.generationId = generationId;
this.state = serverState;
- this.generator = new CSNGenerator(serverID, state);
-
- domains.put(baseDN, this);
+ this.generator = new CSNGenerator(getServerId(), state);
}
/**
@@ -387,7 +348,7 @@
if (!isValidInitialStatus(initStatus))
{
logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
- getBaseDNString(), Integer.toString(serverID)));
+ getBaseDNString(), Integer.toString(getServerId())));
}
else
{
@@ -406,7 +367,7 @@
private void receiveChangeStatus(ChangeStatusMsg csMsg)
{
if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDN +
+ TRACER.debugInfo("Replication domain " + getBaseDN() +
" received change status message:\n" + csMsg);
ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -416,7 +377,7 @@
if (event == StatusMachineEvent.INVALID_EVENT)
{
logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
- getBaseDNString(), Integer.toString(serverID)));
+ getBaseDNString(), Integer.toString(getServerId())));
return;
}
@@ -468,13 +429,24 @@
}
/**
- * Returns the base DN of this ReplicationDomain.
+ * Returns the current config of this ReplicationDomain.
+ *
+ * @return the config
+ */
+ protected ReplicationDomainCfg getConfig()
+ {
+ return config;
+ }
+
+ /**
+ * Returns the base DN of this ReplicationDomain. All Replication Domain using
+ * this baseDN will be connected through the Replication Service.
*
* @return The base DN of this ReplicationDomain
*/
public DN getBaseDN()
{
- return baseDN;
+ return config.getBaseDN();
}
/**
@@ -484,16 +456,32 @@
*/
public String getBaseDNString()
{
- return baseDN.toNormalizedString();
+ return getBaseDN().toNormalizedString();
}
/**
- * Get the server ID.
+ * Get the server ID. The identifier of this Replication Domain inside the
+ * Replication Service. Each Domain must use a unique ServerID.
+ *
* @return The server ID.
*/
public int getServerId()
{
- return serverID;
+ return config.getServerId();
+ }
+
+ /**
+ * Window size used during initialization .. between - the
+ * initializer/exporter DS that listens/waits acknowledges and that slows down
+ * data msg publishing based on the slowest server - and each
+ * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
+ * received.
+ *
+ * @return the initWindow
+ */
+ protected int getInitWindow()
+ {
+ return config.getInitializationWindowSize();
}
/**
@@ -502,25 +490,38 @@
*/
public boolean isAssured()
{
- return assured;
+ return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
+ || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
}
/**
- * Gives the mode for the assured replication of the domain.
+ * Gives the mode for the assured replication of the domain. Only used when
+ * assured is true).
+ *
* @return The mode for the assured replication of the domain.
*/
public AssuredMode getAssuredMode()
{
- return assuredMode;
+ switch (assuredConfig.getAssuredType())
+ {
+ case SAFE_DATA:
+ case NOT_ASSURED: // The assured mode will be ignored in that case anyway
+ return AssuredMode.SAFE_DATA_MODE;
+ case SAFE_READ:
+ return AssuredMode.SAFE_READ_MODE;
+ }
+ return null; // should never happen
}
/**
- * Gives the assured level of the replication of the domain.
+ * Gives the assured Safe Data level of the replication of the domain. (used
+ * when assuredMode is SAFE_DATA).
+ *
* @return The assured level of the replication of the domain.
*/
public byte getAssuredSdLevel()
{
- return assuredSdLevel;
+ return (byte) assuredConfig.getAssuredSdLevel();
}
/**
@@ -529,7 +530,7 @@
*/
public long getAssuredTimeout()
{
- return assuredTimeout;
+ return assuredConfig.getAssuredTimeout();
}
/**
@@ -538,16 +539,20 @@
*/
public byte getGroupId()
{
- return groupId;
+ return (byte) config.getGroupId();
}
/**
- * Gets the referrals URLs this domain publishes.
+ * Gets the referrals URLs this domain publishes. Referrals urls to be
+ * published to other servers of the topology.
+ * <p>
+ * TODO: fill that with all currently opened urls if no urls configured
+ *
* @return The referrals URLs this domain publishes.
*/
- public List<String> getRefUrls()
+ public Set<String> getRefUrls()
{
- return refUrls;
+ return config.getReferralsUrl();
}
/**
@@ -673,67 +678,6 @@
}
/**
- * Set the list of Referrals that should be returned when an
- * operation needs to be redirected to this server.
- *
- * @param referralsUrl The list of referrals.
- */
- public void setURLs(Set<String> referralsUrl)
- {
- this.refUrls.addAll(referralsUrl);
- }
-
- /**
- * Set the timeout of the assured replication.
- *
- * @param assuredTimeout the timeout of the assured replication.
- */
- public void setAssuredTimeout(long assuredTimeout)
- {
- this.assuredTimeout = assuredTimeout;
- }
-
- /**
- * Sets the groupID.
- *
- * @param groupId The groupID.
- */
- public void setGroupId(byte groupId)
- {
- this.groupId = groupId;
- }
-
- /**
- * Sets the level of assured replication.
- *
- * @param assuredSdLevel The level of assured replication.
- */
- public void setAssuredSdLevel(byte assuredSdLevel)
- {
- this.assuredSdLevel = assuredSdLevel;
- }
-
- /**
- * Sets the assured replication mode.
- *
- * @param dataMode The assured replication mode.
- */
- public void setAssuredMode(AssuredMode dataMode)
- {
- this.assuredMode = dataMode;
- }
-
- /**
- * Sets assured replication.
- *
- * @param assured A boolean indicating if assured replication should be used.
- */
- public void setAssured(boolean assured)
- {
- this.assured = assured;
- }
-
- /**
* Receives an update message from the replicationServer.
* The other types of messages are processed in an opaque way for the caller.
* Also responsible for updating the list of pending changes
@@ -802,8 +746,8 @@
*/
if (debugEnabled())
TRACER.debugInfo(
- "[IE] processErrorMsg:" + this.serverID +
- " baseDN: " + this.baseDN +
+ "[IE] processErrorMsg:" + getServerId() +
+ " baseDN: " + getBaseDN() +
" Error Msg received: " + errorMsg);
if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -873,10 +817,9 @@
}
numRcvdUpdates.incrementAndGet();
- byte rsGroupId = broker.getRsGroupId();
if (update.isAssured()
- && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
- && rsGroupId == groupId)
+ && broker.getRsGroupId() == getGroupId()
+ && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
assuredSrReceivedUpdates.incrementAndGet();
}
@@ -949,7 +892,7 @@
requested servers. Log problem
*/
logError(NOTE_DS_RECEIVED_ACK_ERROR.get(
- getBaseDNString(), Integer.toString(serverID),
+ getBaseDNString(), Integer.toString(getServerId()),
update.toString(), ack.errorsToString()));
List<Integer> failedServers = ack.getFailedServers();
@@ -1048,7 +991,7 @@
*/
public ExportThread(int serverIdToInitialize, int initWindow)
{
- super("Export thread from serverId=" + serverID + " to serverId="
+ super("Export thread from serverId=" + getServerId() + " to serverId="
+ serverIdToInitialize);
this.serverIdToInitialize = serverIdToInitialize;
this.initWindow = initWindow;
@@ -1379,7 +1322,7 @@
public void initializeRemote(int target, Task initTask)
throws DirectoryException
{
- initializeRemote(target, this.serverID, initTask, this.initWindow);
+ initializeRemote(target, getServerId(), initTask, getInitWindow());
}
/**
@@ -1418,7 +1361,7 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
- countEntries(), getBaseDNString(), serverID));
+ countEntries(), getBaseDNString(), getServerId()));
for (DSInfo dsi : getReplicasList())
{
@@ -1436,8 +1379,8 @@
}
else
{
- logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
- countEntries(), getBaseDNString(), serverID, serverToInitialize));
+ logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
+ getBaseDNString(), getServerId(), serverToInitialize));
ieContext.startList.add(serverToInitialize);
@@ -1471,7 +1414,7 @@
// Send start message to the peer
InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
- getBaseDN(), serverID, serverToInitialize,
+ getBaseDN(), getServerId(), serverToInitialize,
serverRunningTheTask, ieContext.entryCount, initWindow);
broker.publish(initTargetMsg);
@@ -1492,8 +1435,8 @@
exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
// Notify the peer of the success
- DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
- broker.publish(doneMsg);
+ broker.publish(
+ new DoneMsg(getServerId(), initTargetMsg.getDestination()));
}
catch(DirectoryException exportException)
{
@@ -1595,12 +1538,12 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
- getBaseDNString(), serverID, cause));
+ getBaseDNString(), getServerId(), cause));
}
else
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
- getBaseDNString(), serverID, serverToInitialize, cause));
+ getBaseDNString(), getServerId(), serverToInitialize, cause));
}
@@ -1894,10 +1837,8 @@
// send the ack of flow control mgmt
if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
{
- InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
- this.serverID,
- entryMsg.getSenderID(),
- ieContext.msgCnt);
+ final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
+ getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
broker.publish(amsg, false);
if (debugEnabled())
{
@@ -1945,7 +1886,7 @@
Message.raw(Category.SYNC, Severity.NOTICE,
ERR_INIT_EXPORTER_DISCONNECTION.get(
getBaseDNString(),
- Integer.toString(this.serverID),
+ Integer.toString(getServerId()),
Integer.toString(ieContext.importSource)));
ieContext.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, errMsg));
@@ -2017,7 +1958,7 @@
// build the message
EntryMsg entryMessage = new EntryMsg(
- serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
+ getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
++ieContext.msgCnt);
// Waiting the slowest loop
@@ -2219,7 +2160,7 @@
ieContext.initializeTask = initTask;
ieContext.attemptCnt = 0;
ieContext.initReqMsgSent = new InitializeRequestMsg(
- getBaseDN(), serverID, source, this.initWindow);
+ getBaseDN(), getServerId(), source, getInitWindow());
// Publish Init request msg
broker.publish(ieContext.initReqMsgSent);
@@ -2281,14 +2222,14 @@
try
{
// Log starting
- logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
- getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID));
+ logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(),
+ initTargetMsgReceived.getSenderID(), getServerId()));
// Go into full update status
setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
// Acquire an import context if no already done (and initialize).
- if (initTargetMsgReceived.getInitiatorID() != this.serverID)
+ if (initTargetMsgReceived.getInitiatorID() != getServerId())
{
/*
The initTargetMsgReceived is for an import initiated by the remote
@@ -2418,7 +2359,8 @@
finally
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
- getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
+ getBaseDNString(), initTargetMsgReceived.getSenderID(),
+ getServerId(),
(ieContext.getException() == null ? ""
: ieContext.getException().getLocalizedMessage()));
logError(msg);
@@ -2458,7 +2400,7 @@
if (newStatus == ServerStatus.INVALID_STATUS)
{
logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
- Integer.toString(serverID), status.toString(), event.toString()));
+ String.valueOf(getServerId()), status.toString(), event.toString()));
return;
}
@@ -2472,13 +2414,11 @@
resetMonitoringCounters();
}
- // Store new status
status = newStatus;
-
if (debugEnabled())
{
- TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
- + status);
+ TRACER.debugInfo("Replication domain " + getBaseDN()
+ + " new status is: " + status);
}
// Perform whatever actions are needed to apply properties for being
@@ -2560,10 +2500,8 @@
// check that at least one ReplicationServer did change its generation-id
checkGenerationID(-1);
- // Reconnect to the Replication Server so that it adopt our
- // GenerationID.
- disableService();
- enableService();
+ // Reconnect to the Replication Server so that it adopts our GenerationID.
+ restartService();
// wait for the domain to reconnect.
int count = 0;
@@ -2597,8 +2535,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
- + " resetGenerationId " + generationIdNewValue);
+ TRACER.debugInfo("Server id " + getServerId() + " and domain "
+ + getBaseDN() + " resetGenerationId " + generationIdNewValue);
}
ResetGenerationIdMsg genIdMessage =
@@ -2607,7 +2545,7 @@
if (!isConnected())
{
Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
- Integer.toString(serverID),
+ Integer.toString(getServerId()),
Long.toString(genIdMessage.getGenerationId()));
throw new DirectoryException(ResultCode.OTHER, message);
}
@@ -3110,33 +3048,19 @@
}
/**
- * Definitively stops the Replication Service.
- */
- public void stopDomain()
- {
- disableService();
- domains.remove(baseDN);
- }
-
- /**
* Change some ReplicationDomain parameters.
*
* @param config
* The new configuration that this domain should now use.
*/
- public void changeConfig(ReplicationDomainCfg config)
+ protected void changeConfig(ReplicationDomainCfg config)
{
- this.groupId = (byte) config.getGroupId();
-
if (broker != null && broker.changeConfig(config))
{
- disableService();
- enableService();
+ restartService();
}
}
-
-
/**
* Applies a configuration change to the attributes which should be be
* included in the ECL.
@@ -3149,15 +3073,19 @@
public void changeConfig(Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
{
- if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
- && broker != null)
+ final boolean attrsModified = setEclIncludes(
+ getServerId(), includeAttributes, includeAttributesForDeletes);
+ if (attrsModified && broker != null)
{
- disableService();
- enableService();
+ restartService();
}
}
-
+ private void restartService()
+ {
+ disableService();
+ enableService();
+ }
/**
* This method should trigger an export of the replicated data.
@@ -3236,15 +3164,13 @@
Send an ack if it was requested and the group id is the same of the RS
one. Only Safe Read mode makes sense in DS for returning an ack.
*/
- byte rsGroupId = broker.getRsGroupId();
// Assured feature is supported starting from replication protocol V2
if (msg.isAssured()
&& broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
- AssuredMode msgAssuredMode = msg.getAssuredMode();
- if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+ if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
- if (rsGroupId == groupId)
+ if (broker.getRsGroupId() == getGroupId())
{
// Send the ack
AckMsg ackMsg = new AckMsg(msg.getCSN());
@@ -3255,7 +3181,7 @@
ackMsg.setHasReplayError(true);
// -> replay error occurred in our server
List<Integer> idList = new ArrayList<Integer>();
- idList.add(serverID);
+ idList.add(getServerId());
ackMsg.setFailedServers(idList);
}
broker.publish(ackMsg);
@@ -3269,10 +3195,11 @@
}
}
}
- else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+ else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
{
- logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
- msgAssuredMode.toString(), getBaseDNString(), msg.toString()));
+ logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()),
+ msg.getAssuredMode().toString(), getBaseDNString(),
+ msg.toString()));
}
// Nothing to do in Assured safe data mode, only RS ack updates.
}
@@ -3303,23 +3230,22 @@
*/
protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
{
- byte rsGroupId = broker.getRsGroupId();
/*
* If assured configured, set message accordingly to request an ack in the
* right assured mode.
- * No ack requested for a RS with a different group id. Assured
- * replication supported for the same locality, i.e: a topology working in
- * the same
- * geographical location). If we are connected to a RS which is not in our
- * locality, no need to ask for an ack.
+ * No ack requested for a RS with a different group id.
+ * Assured replication supported for the same locality,
+ * i.e: a topology working in the same geographical location).
+ * If we are connected to a RS which is not in our locality,
+ * no need to ask for an ack.
*/
- if (assured && rsGroupId == groupId)
+ if (needsAck())
{
msg.setAssured(true);
- msg.setAssuredMode(assuredMode);
- if (assuredMode == AssuredMode.SAFE_DATA_MODE)
+ msg.setAssuredMode(getAssuredMode());
+ if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
{
- msg.setSafeDataLevel(assuredSdLevel);
+ msg.setSafeDataLevel(getAssuredSdLevel());
}
// Add the assured message to the list of update that are waiting for acks
@@ -3327,6 +3253,11 @@
}
}
+ private boolean needsAck()
+ {
+ return isAssured() && broker.getRsGroupId() == getGroupId();
+ }
+
/**
* Wait for the processing of an assured message after it has been sent, if
* assured replication is configured, otherwise, do nothing.
@@ -3340,14 +3271,10 @@
protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
throws TimeoutException
{
- byte rsGroupId = broker.getRsGroupId();
-
- // If assured mode configured, wait for acknowledgment for the just sent
- // message
- if (assured && rsGroupId == groupId)
+ if (needsAck())
{
// Increment assured replication monitoring counters
- switch (assuredMode)
+ switch (getAssuredMode())
{
case SAFE_READ_MODE:
assuredSrSentUpdates.incrementAndGet();
@@ -3383,12 +3310,12 @@
if (debugEnabled())
{
TRACER.debugInfo("waitForAck method interrupted for replication " +
- "baseDN: " + baseDN);
+ "baseDN: " + getBaseDN());
}
break;
}
// Timeout ?
- if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+ if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout())
{
/*
Timeout occurred, be sure that ack is not being received and if so,
@@ -3424,8 +3351,8 @@
}
throw new TimeoutException("No ack received for message csn: " + csn
- + " and replication domain: " + baseDN + " after "
- + assuredTimeout + " ms.");
+ + " and replication domain: " + getBaseDN() + " after "
+ + getAssuredTimeout() + " ms.");
}
}
}
@@ -3481,7 +3408,7 @@
{
// This exception may only be raised if assured replication is enabled
logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
- Long.toString(assuredTimeout), update.toString()));
+ Long.toString(getAssuredTimeout()), update.toString()));
}
}
@@ -3493,11 +3420,25 @@
*
* @return The GenerationID.
*/
- public abstract long getGenerationID();
+ public long getGenerationID()
+ {
+ return generationId;
+ }
/**
- * Subclasses should use this method to add additional monitoring
- * information in the ReplicationDomain.
+ * Sets the generationId for this replication domain.
+ *
+ * @param generationId
+ * the generationId to set
+ */
+ public void setGenerationID(long generationId)
+ {
+ this.generationId = generationId;
+ }
+
+ /**
+ * Subclasses should use this method to add additional monitoring information
+ * in the ReplicationDomain.
*
* @return Additional monitoring attributes that will be added in the
* ReplicationDomain monitoring entry.
@@ -3711,13 +3652,69 @@
*/
public CSN getLastLocalChange()
{
- return state.getCSN(serverID);
+ return state.getCSN(getServerId());
+ }
+
+ /**
+ * Gets and stores the assured replication configuration parameters. Returns a
+ * boolean indicating if the passed configuration has changed compared to
+ * previous values and the changes require a reconnection.
+ *
+ * @param config
+ * The configuration object
+ * @param allowReconnection
+ * Tells if one must reconnect if significant changes occurred
+ */
+ protected void readAssuredConfig(ReplicationDomainCfg config,
+ boolean allowReconnection)
+ {
+ // Disconnect if required: changing configuration values before
+ // disconnection would make assured replication used immediately and
+ // disconnection could cause some timeouts error.
+ if (needReconnection(config) && allowReconnection)
+ {
+ disableService();
+
+ assuredConfig = config;
+
+ enableService();
+ }
+ }
+
+ private boolean needReconnection(ReplicationDomainCfg cfg)
+ {
+ final AssuredMode assuredMode = getAssuredMode();
+ switch (cfg.getAssuredType())
+ {
+ case NOT_ASSURED:
+ if (isAssured())
+ {
+ return true;
+ }
+ break;
+ case SAFE_DATA:
+ if (!isAssured() || assuredMode == SAFE_READ_MODE)
+ {
+ return true;
+ }
+ break;
+ case SAFE_READ:
+ if (!isAssured() || assuredMode == SAFE_DATA_MODE)
+ {
+ return true;
+ }
+ break;
+ }
+
+ return isAssured()
+ && assuredMode == SAFE_DATA_MODE
+ && cfg.getAssuredSdLevel() != getAssuredSdLevel();
}
/** {@inheritDoc} */
@Override
public String toString()
{
- return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
+ return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
}
}
--
Gitblit v1.10.0