From faa556c351fae0bf73a939962426876944dddd25 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Jul 2014 10:19:11 +0000
Subject: [PATCH] Code cleanup.
---
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 135 ++++++------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 294 ++++++++++++++---------------
opends/src/server/org/opends/server/replication/server/ServerReader.java | 148 ++++++++------
3 files changed, 286 insertions(+), 291 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index fbedaa5..434fa85 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,7 +27,6 @@
package org.opends.server.replication.server;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -288,16 +287,15 @@
* Add an update that has been received to the list of
* updates that must be forwarded to all other servers.
*
- * @param update The update that has been received.
+ * @param updateMsg The update that has been received.
* @param sourceHandler The ServerHandler for the server from which the
* update was received
* @throws IOException When an IO exception happens during the update
* processing.
*/
- public void put(UpdateMsg update, ServerHandler sourceHandler)
- throws IOException
+ public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException
{
- sourceHandler.updateServerState(update);
+ sourceHandler.updateServerState(updateMsg);
sourceHandler.incrementInCount();
setGenerationIdIfUnset(sourceHandler.getGenerationId());
@@ -319,81 +317,14 @@
* data centers, but one will request and wait acks only from servers of the
* data center 1.
*/
- boolean assuredMessage = update.isAssured();
- PreparedAssuredInfo preparedAssuredInfo = null;
- if (assuredMessage)
- {
- // Assured feature is supported starting from replication protocol V2
- if (sourceHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V2)
- {
- // According to assured sub-mode, prepare structures to keep track of
- // the acks we are interested in.
- AssuredMode assuredMode = update.getAssuredMode();
- if (assuredMode == AssuredMode.SAFE_DATA_MODE)
- {
- sourceHandler.incrementAssuredSdReceivedUpdates();
- preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
- } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
- {
- sourceHandler.incrementAssuredSrReceivedUpdates();
- preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
- } else
- {
- // Unknown assured mode: should never happen
- Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
- Integer.toString(localReplicationServer.getServerId()),
- assuredMode.toString(), baseDN.toNormalizedString(),
- update.toString());
- logError(errorMsg);
- assuredMessage = false;
- }
- } else
- {
- assuredMessage = false;
- }
- }
+ final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler);
- if (!publishUpdateMsg(update))
+ if (!publishUpdateMsg(updateMsg))
{
return;
}
- List<Integer> expectedServers = null;
- if (assuredMessage)
- {
- expectedServers = preparedAssuredInfo.expectedServers;
- if (expectedServers != null)
- {
- // Store the expected acks info into the global map.
- // The code for processing reception of acks for this update will update
- // info kept in this object and if enough acks received, it will send
- // back the final ack to the requester and remove the object from this
- // map
- // OR
- // The following timer will time out and send an timeout ack to the
- // requester if the acks are not received in time. The timer will also
- // remove the object from this map.
- CSN csn = update.getCSN();
- waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
-
- // Arm timer for this assured update message (wait for acks until it
- // times out)
- AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
- assuredTimeoutTimer.schedule(assuredTimeoutTask,
- localReplicationServer.getAssuredTimeout());
- // Purge timer every 100 treated messages
- assuredTimeoutTimerPurgeCounter++;
- if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
- {
- assuredTimeoutTimer.purge();
- }
- }
- }
-
- if (expectedServers == null)
- {
- expectedServers = Collections.emptyList();
- }
+ final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo);
/**
* The update message equivalent to the originally received update message,
@@ -407,7 +338,8 @@
* references posted on every writer queues. That is why we need a message
* version with assured flag on and another one with assured flag off.
*/
- NotAssuredUpdateMsg notAssuredUpdate = null;
+ final NotAssuredUpdateMsg notAssuredUpdateMsg =
+ preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null;
// Push the message to the replication servers
if (sourceHandler.isDataServer())
@@ -418,80 +350,147 @@
* Ignore updates to RS with bad gen id
* (no system managed status for a RS)
*/
- if (isDifferentGenerationId(rsHandler.getGenerationId()))
+ if (!isDifferentGenerationId(rsHandler, updateMsg))
{
- if (debugEnabled())
- {
- debug("update " + update.getCSN()
- + " will not be sent to replication server "
- + rsHandler.getServerId() + " with generation id "
- + rsHandler.getGenerationId() + " different from local "
- + "generation id " + generationId);
- }
-
- continue;
+ addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
}
-
- notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate,
- assuredMessage, expectedServers);
}
}
// Push the message to the LDAP servers
for (DataServerHandler dsHandler : connectedDSs.values())
{
- // Don't forward the change to the server that just sent it
- if (dsHandler == sourceHandler)
+ // Do not forward the change to the server that just sent it
+ if (dsHandler != sourceHandler
+ && !isUpdateMsgFiltered(updateMsg, dsHandler))
{
- continue;
+ addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
}
-
- /**
- * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
- *
- * The RSD lock should not be taken here as it is acceptable to have a
- * delay between the time the server has a wrong status and the fact we
- * detect it: the updates that succeed to pass during this time will have
- * no impact on remote server. But it is interesting to not saturate
- * uselessly the network if the updates are not necessary so this check to
- * stop sending updates is interesting anyway. Not taking the RSD lock
- * allows to have better performances in normal mode (most of the time).
- */
- ServerStatus dsStatus = dsHandler.getStatus();
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
- || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- {
- if (debugEnabled())
- {
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
- {
- debug("update " + update.getCSN()
- + " will not be sent to directory server "
- + dsHandler.getServerId() + " with generation id "
- + dsHandler.getGenerationId() + " different from local "
- + "generation id " + generationId);
- }
- if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- {
- debug("update " + update.getCSN()
- + " will not be sent to directory server "
- + dsHandler.getServerId() + " as it is in full update");
- }
- }
-
- continue;
- }
-
- notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate,
- assuredMessage, expectedServers);
}
// Push the message to the other subscribing handlers
for (MessageHandler mHandler : otherHandlers) {
- mHandler.add(update);
+ mHandler.add(updateMsg);
}
}
+ private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
+ UpdateMsg updateMsg)
+ {
+ final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId());
+ if (isDifferent && debugEnabled())
+ {
+ debug("updateMsg " + updateMsg.getCSN()
+ + " will not be sent to replication server "
+ + rsHandler.getServerId() + " with generation id "
+ + rsHandler.getGenerationId() + " different from local "
+ + "generation id " + generationId);
+ }
+ return isDifferent;
+ }
+
+ /**
+ * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS.
+ * <p>
+ * The RSD lock should not be taken here as it is acceptable to have a delay
+ * between the time the server has a wrong status and the fact we detect it:
+ * the updates that succeed to pass during this time will have no impact on
+ * remote server. But it is interesting to not saturate uselessly the network
+ * if the updates are not necessary so this check to stop sending updates is
+ * interesting anyway. Not taking the RSD lock allows to have better
+ * performances in normal mode (most of the time).
+ */
+ private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler)
+ {
+ final ServerStatus dsStatus = dsHandler.getStatus();
+ if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+ {
+ if (debugEnabled())
+ {
+ debug("updateMsg " + updateMsg.getCSN()
+ + " will not be sent to directory server "
+ + dsHandler.getServerId() + " with generation id "
+ + dsHandler.getGenerationId() + " different from local "
+ + "generation id " + generationId);
+ }
+ return true;
+ }
+ else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+ {
+ if (debugEnabled())
+ {
+ debug("updateMsg " + updateMsg.getCSN()
+ + " will not be sent to directory server "
+ + dsHandler.getServerId() + " as it is in full update");
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg,
+ ServerHandler sourceHandler) throws IOException
+ {
+ // Assured feature is supported starting from replication protocol V2
+ if (!updateMsg.isAssured()
+ || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2)
+ {
+ return null;
+ }
+
+ // According to assured sub-mode, prepare structures to keep track of
+ // the acks we are interested in.
+ switch (updateMsg.getAssuredMode())
+ {
+ case SAFE_DATA_MODE:
+ sourceHandler.incrementAssuredSdReceivedUpdates();
+ return processSafeDataUpdateMsg(updateMsg, sourceHandler);
+
+ case SAFE_READ_MODE:
+ sourceHandler.incrementAssuredSrReceivedUpdates();
+ return processSafeReadUpdateMsg(updateMsg, sourceHandler);
+
+ default:
+ // Unknown assured mode: should never happen
+ logError(ERR_RS_UNKNOWN_ASSURED_MODE.get(
+ Integer.toString(localReplicationServer.getServerId()),
+ updateMsg.getAssuredMode().toString(), baseDN.toNormalizedString(),
+ updateMsg.toString()));
+ return null;
+ }
+ }
+
+ private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo)
+ {
+ List<Integer> expectedServers = null;
+ if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null)
+ {
+ expectedServers = preparedAssuredInfo.expectedServers;
+ // Store the expected acks info into the global map.
+ // The code for processing reception of acks for this update will update
+ // info kept in this object and if enough acks received, it will send
+ // back the final ack to the requester and remove the object from this map
+ // OR
+ // The following timer will time out and send an timeout ack to the
+ // requester if the acks are not received in time. The timer will also
+ // remove the object from this map.
+ final CSN csn = updateMsg.getCSN();
+ waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
+
+ // Arm timer for this assured update message (wait for acks until it times out)
+ final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
+ assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout());
+ // Purge timer every 100 treated messages
+ assuredTimeoutTimerPurgeCounter++;
+ if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
+ {
+ assuredTimeoutTimer.purge();
+ }
+ }
+
+ return expectedServers != null ? expectedServers : Collections.<Integer> emptyList();
+ }
+
private boolean publishUpdateMsg(UpdateMsg updateMsg)
{
try
@@ -538,33 +537,20 @@
}
}
- private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
- UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
- boolean assuredMessage, List<Integer> expectedServers)
- throws UnsupportedEncodingException
+ private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg,
+ NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers)
{
- if (assuredMessage)
+ // Assured mode: post an assured or not assured matching update message
+ // according to what has been computed for the destination server
+ if (notAssuredUpdateMsg != null
+ && !assuredServers.contains(sHandler.getServerId()))
{
- // Assured mode: post an assured or not assured matching update
- // message according to what has been computed for the destination server
- if (expectedServers.contains(sHandler.getServerId()))
- {
- sHandler.add(update);
- }
- else
- {
- if (notAssuredUpdate == null)
- {
- notAssuredUpdate = new NotAssuredUpdateMsg(update);
- }
- sHandler.add(notAssuredUpdate);
- }
+ sHandler.add(notAssuredUpdateMsg);
}
else
{
- sHandler.add(update);
+ sHandler.add(updateMsg);
}
- return notAssuredUpdate;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 6e85261..5684511 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -83,7 +83,6 @@
@Override
public void run()
{
- Message errMessage = null;
if (debugEnabled())
{
TRACER.debugInfo(getName() + " starting");
@@ -93,91 +92,34 @@
* grab all incoming messages and publish them to the
* replicationServerDomain
*/
+ Message errMessage = null;
try
{
while (true)
{
try
{
- ReplicationMsg msg = session.receive();
+ final ReplicationMsg msg = session.receive();
if (debugEnabled())
+ {
TRACER.debugInfo("In " + getName() + " receives " + msg);
+ }
if (msg instanceof AckMsg)
{
handler.checkWindow();
handler.processAck((AckMsg) msg);
- } else if (msg instanceof UpdateMsg)
+ }
+ else if (msg instanceof UpdateMsg)
{
- UpdateMsg updateMsg = (UpdateMsg) msg;
-
- boolean filtered = false;
- // Ignore updates in some cases
- if (handler.isDataServer())
- {
- /**
- * Ignore updates from DS in bad BAD_GENID_STATUS or
- * FULL_UPDATE_STATUS
- *
- * The RSD lock should not be taken here as it is acceptable to
- * have a delay between the time the server has a wrong status and
- * the fact we detect it: the updates that succeed to pass during
- * this time will have no impact on remote server. But it is
- * interesting to not saturate uselessly the network if the
- * updates are not necessary so this check to stop sending updates
- * is interesting anyway. Not taking the RSD lock allows to have
- * better performances in normal mode (most of the time).
- */
- ServerStatus dsStatus = handler.getStatus();
- if (dsStatus == BAD_GEN_ID_STATUS
- || dsStatus == FULL_UPDATE_STATUS)
- {
- long referenceGenerationId = handler.getReferenceGenId();
- if (dsStatus == BAD_GEN_ID_STATUS)
- logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
- handler.getReplicationServerId(),
- updateMsg.getCSN().toString(),
- handler.getBaseDNString(), handler.getServerId(),
- session.getReadableRemoteAddress(),
- handler.getGenerationId(),
- referenceGenerationId));
- if (dsStatus == FULL_UPDATE_STATUS)
- logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
- handler.getReplicationServerId(),
- updateMsg.getCSN().toString(),
- handler.getBaseDNString(), handler.getServerId(),
- session.getReadableRemoteAddress()));
- filtered = true;
- }
- } else
- {
- /**
- * Ignore updates from RS with bad gen id
- * (no system managed status for a RS)
- */
- long referenceGenerationId = handler.getReferenceGenId();
- if (referenceGenerationId > 0
- && referenceGenerationId != handler.getGenerationId())
- {
- logError(
- WARN_IGNORING_UPDATE_FROM_RS.get(
- handler.getReplicationServerId(),
- updateMsg.getCSN().toString(),
- handler.getBaseDNString(),
- handler.getServerId(),
- session.getReadableRemoteAddress(),
- handler.getGenerationId(),
- referenceGenerationId));
- filtered = true;
- }
- }
-
- if (!filtered)
+ final UpdateMsg updateMsg = (UpdateMsg) msg;
+ if (!isUpdateMsgFiltered(updateMsg))
{
handler.put(updateMsg);
}
- } else if (msg instanceof WindowMsg)
+ }
+ else if (msg instanceof WindowMsg)
{
handler.updateWindow((WindowMsg) msg);
}
@@ -301,6 +243,76 @@
}
}
+ /**
+ * Returns whether the update message is filtered in one of those cases:
+ * <ul>
+ * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li>
+ * <li>Ignore updates from RS with bad gen id</li>
+ * </ul>
+ */
+ private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
+ {
+ if (handler.isDataServer())
+ {
+ /**
+ * Ignore updates from DS in bad BAD_GENID_STATUS or
+ * FULL_UPDATE_STATUS
+ *
+ * The RSD lock should not be taken here as it is acceptable to
+ * have a delay between the time the server has a wrong status and
+ * the fact we detect it: the updates that succeed to pass during
+ * this time will have no impact on remote server. But it is
+ * interesting to not saturate uselessly the network if the
+ * updates are not necessary so this check to stop sending updates
+ * is interesting anyway. Not taking the RSD lock allows to have
+ * better performances in normal mode (most of the time).
+ */
+ final ServerStatus dsStatus = handler.getStatus();
+ if (dsStatus == BAD_GEN_ID_STATUS)
+ {
+ logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+ handler.getReplicationServerId(),
+ updateMsg.getCSN().toString(),
+ handler.getBaseDNString(), handler.getServerId(),
+ session.getReadableRemoteAddress(),
+ handler.getGenerationId(),
+ handler.getReferenceGenId()));
+ return true;
+ }
+ else if (dsStatus == FULL_UPDATE_STATUS)
+ {
+ logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
+ handler.getReplicationServerId(),
+ updateMsg.getCSN().toString(),
+ handler.getBaseDNString(), handler.getServerId(),
+ session.getReadableRemoteAddress()));
+ return true;
+ }
+ }
+ else
+ {
+ /**
+ * Ignore updates from RS with bad gen id
+ * (no system managed status for a RS)
+ */
+ long referenceGenerationId = handler.getReferenceGenId();
+ if (referenceGenerationId > 0
+ && referenceGenerationId != handler.getGenerationId())
+ {
+ logError(WARN_IGNORING_UPDATE_FROM_RS.get(
+ handler.getReplicationServerId(),
+ updateMsg.getCSN().toString(),
+ handler.getBaseDNString(),
+ handler.getServerId(),
+ session.getReadableRemoteAddress(),
+ handler.getGenerationId(),
+ referenceGenerationId));
+ return true;
+ }
+ }
+ return false;
+ }
+
private void logException(Exception e)
{
if (debugEnabled())
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 23a2880..1de2e40 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -38,6 +38,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -99,82 +100,19 @@
{
while (true)
{
- UpdateMsg update = replicationServerDomain.take(this.handler);
- if (update == null)
+ final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
+ if (updateMsg == null)
{
// this connection is closing
errMessage = Message.raw(
"Connection closure: null update returned by domain.");
return;
}
-
- // Ignore updates in some cases
- long referenceGenerationId = replicationServerDomain.getGenerationId();
- if (handler.isDataServer())
+ else if (!isUpdateMsgFiltered(updateMsg))
{
- /**
- * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
- *
- * The RSD lock should not be taken here as it is acceptable to have a
- * delay between the time the server has a wrong status and the fact
- * we detect it: the updates that succeed to pass during this time
- * will have no impact on remote server. But it is interesting to not
- * saturate uselessly the network if the updates are not necessary so
- * this check to stop sending updates is interesting anyway. Not
- * taking the RSD lock allows to have better performances in normal
- * mode (most of the time).
- */
- ServerStatus dsStatus = handler.getStatus();
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
- || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- {
- if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
- {
- logError(WARN_IGNORING_UPDATE_TO_DS_BADGENID.get(
- handler.getReplicationServerId(),
- update.getCSN().toString(),
- handler.getBaseDNString(), handler.getServerId(),
- session.getReadableRemoteAddress(),
- handler.getGenerationId(),
- referenceGenerationId));
- }
- else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- {
- logError(WARN_IGNORING_UPDATE_TO_DS_FULLUP.get(
- handler.getReplicationServerId(),
- update.getCSN().toString(),
- handler.getBaseDNString(), handler.getServerId(),
- session.getReadableRemoteAddress()));
- }
- continue;
- }
+ // Publish the update to the remote server using a protocol version it supports
+ session.publish(updateMsg);
}
- else
- {
- /**
- * Ignore updates to RS with bad gen id
- * (no system managed status for a RS)
- */
- if (referenceGenerationId != handler.getGenerationId()
- || referenceGenerationId == -1
- || handler.getGenerationId() == -1)
- {
- logError(
- WARN_IGNORING_UPDATE_TO_RS.get(
- handler.getReplicationServerId(),
- update.getCSN().toString(),
- handler.getBaseDNString(),
- handler.getServerId(),
- session.getReadableRemoteAddress(),
- handler.getGenerationId(),
- referenceGenerationId));
- continue;
- }
- }
-
- // Publish the update to the remote server using a protocol version he
- // it supports
- session.publish(update);
}
}
catch (SocketException e)
@@ -205,4 +143,63 @@
}
}
}
+
+ private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
+ {
+ if (handler.isDataServer())
+ {
+ /**
+ * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
+ *
+ * The RSD lock should not be taken here as it is acceptable to have a delay
+ * between the time the server has a wrong status and the fact we detect it:
+ * the updates that succeed to pass during this time will have no impact on remote server.
+ * But it is interesting to not saturate uselessly the network
+ * if the updates are not necessary so this check to stop sending updates is interesting anyway.
+ * Not taking the RSD lock allows to have better performances in normal mode (most of the time).
+ */
+ final ServerStatus dsStatus = handler.getStatus();
+ if (dsStatus == BAD_GEN_ID_STATUS)
+ {
+ logError(WARN_IGNORING_UPDATE_TO_DS_BADGENID.get(
+ handler.getReplicationServerId(),
+ updateMsg.getCSN().toString(),
+ handler.getBaseDNString(), handler.getServerId(),
+ session.getReadableRemoteAddress(),
+ handler.getGenerationId(),
+ replicationServerDomain.getGenerationId()));
+ return true;
+ }
+ else if (dsStatus == FULL_UPDATE_STATUS)
+ {
+ logError(WARN_IGNORING_UPDATE_TO_DS_FULLUP.get(
+ handler.getReplicationServerId(),
+ updateMsg.getCSN().toString(),
+ handler.getBaseDNString(), handler.getServerId(),
+ session.getReadableRemoteAddress()));
+ return true;
+ }
+ }
+ else
+ {
+ /**
+ * Ignore updates to RS with bad gen id
+ * (no system managed status for a RS)
+ */
+ final long referenceGenerationId = replicationServerDomain.getGenerationId();
+ if (referenceGenerationId != handler.getGenerationId()
+ || referenceGenerationId == -1 || handler.getGenerationId() == -1)
+ {
+ logError(WARN_IGNORING_UPDATE_TO_RS.get(
+ handler.getReplicationServerId(),
+ updateMsg.getCSN().toString(),
+ handler.getBaseDNString(), handler.getServerId(),
+ session.getReadableRemoteAddress(),
+ handler.getGenerationId(),
+ referenceGenerationId));
+ return true;
+ }
+ }
+ return false;
+ }
}
--
Gitblit v1.10.0