From 4b02c5f2c1450b7d44022c6ea52260823ae39686 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 06 Mar 2013 13:03:44 +0000
Subject: [PATCH] OPENDJ-66 (CR-1368) DS does not failover between replication servers in different groups when configured explicitly for one of the groups
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 260 ++++++++++++++++++++++-----------------------------
1 files changed, 113 insertions(+), 147 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c182543..00a02d1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -874,7 +874,6 @@
stopChangeTimeHeartBeatPublishing();
mustRunBestServerCheckingAlgorithm = 0;
- boolean newServerWithSameGroupId = false;
synchronized (connectPhaseLock)
{
/*
@@ -889,164 +888,45 @@
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
- ReplicationServerInfo replicationServerInfo = null;
+ ReplicationServerInfo electedRsInfo = null;
if (replicationServerInfos.size() > 0)
{
// At least one server answered, find the best one.
- replicationServerInfo = computeBestReplicationServer(true, -1, state,
- replicationServerInfos, serverId, baseDn, groupId,
- this.getGenerationID());
+ electedRsInfo = computeBestReplicationServer(true, -1, state,
+ replicationServerInfos, serverId, baseDn, groupId, getGenerationID());
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
debugInfo("serverId: " + serverId +
" phase 2 : will perform PhaseOneH with the preferred RS="
- + replicationServerInfo);
- replicationServerInfo = performPhaseOneHandshake(
- replicationServerInfo.getServerURL(), true, false);
+ + electedRsInfo);
+ electedRsInfo = performPhaseOneHandshake(
+ electedRsInfo.getServerURL(), true, false);
- if (replicationServerInfo != null)
+ if (electedRsInfo != null)
{
// Update replication server info with potentially more up to date
// data (server state for instance may have changed)
- replicationServerInfos.put(replicationServerInfo.getServerId(),
- replicationServerInfo);
+ replicationServerInfos
+ .put(electedRsInfo.getServerId(), electedRsInfo);
// Handshake phase 1 exchange went well
// Compute in which status we are starting the session to tell the RS
ServerStatus initStatus =
- computeInitialServerStatus(replicationServerInfo.getGenerationId(),
- replicationServerInfo.getServerState(),
- replicationServerInfo.getDegradedStatusThreshold(),
- this.getGenerationID());
+ computeInitialServerStatus(electedRsInfo.getGenerationId(),
+ electedRsInfo.getServerState(),
+ electedRsInfo.getDegradedStatusThreshold(),
+ getGenerationID());
// Perform session start (handshake phase 2)
TopologyMsg topologyMsg = performPhaseTwoHandshake(
- replicationServerInfo.getServerURL(), initStatus);
+ electedRsInfo.getServerURL(), initStatus);
if (topologyMsg != null) // Handshake phase 2 exchange went well
{
- try
- {
- /*
- * If we just connected to a RS with a different group id than us
- * (because for instance a RS with our group id was unreachable
- * while connecting to each RS) but the just received TopologyMsg
- * shows that in the same time a RS with our group id connected,
- * we must give up the connection to force reconnection that will
- * certainly go back to a server with our group id as server with
- * our group id have a greater priority for connection (in
- * computeBestReplicationServer). In other words, we disconnect to
- * connect to a server with our group id. If a server with our
- * group id comes back later in the topology, we will be advised
- * upon reception of a new TopologyMsg message and we will force
- * reconnection at that time to retrieve a server with our group
- * id.
- */
- byte tmpRsGroupId = replicationServerInfo.getGroupId();
- boolean someServersWithSameGroupId =
- hasSomeServerWithSameGroupId(topologyMsg.getRsList());
-
- // Really no other server with our group id ?
- if ((tmpRsGroupId == groupId) || (!someServersWithSameGroupId))
- {
- replicationServer = session.getReadableRemoteAddress();
- maxSendWindow = replicationServerInfo.getWindowSize();
- rsGroupId = replicationServerInfo.getGroupId();
- rsServerId = replicationServerInfo.getServerId();
- rsServerUrl = replicationServerInfo.getServerURL();
-
- receiveTopo(topologyMsg);
-
- // Log a message to let the administrator know that the failure
- // was resolved.
- // Wakeup all the thread that were waiting on the window
- // on the previous connection.
- connectionError = false;
- if (sendWindow != null)
- {
- /*
- * Fix (hack) for OPENDJ-401: we want to ensure that no
- * threads holding this semaphore will get blocked when they
- * acquire it. However, we also need to make sure that we
- * don't overflow the semaphore by releasing too many permits.
- */
- final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
- if (sendWindow.availablePermits() < MAX_PERMITS)
- {
- /*
- * At least 2^29 acquisitions would need to occur for this
- * to be insufficient. In addition, at least 2^30 releases
- * would need to occur for this to potentially overflow.
- * Hopefully this is unlikely to happen.
- */
- sendWindow.release(MAX_PERMITS);
- }
- }
- sendWindow = new Semaphore(maxSendWindow);
- rcvWindow = maxRcvWindow;
- connected = true;
-
- // May have created a broker with null replication domain for
- // unit test purpose.
- if (domain != null)
- {
- domain.sessionInitiated(
- initStatus, replicationServerInfo.getServerState(),
- replicationServerInfo.getGenerationId(),
- session);
- }
-
- if (getRsGroupId() != groupId)
- {
- // Connected to replication server with wrong group id:
- // warn user and start poller to recover when a server with
- // right group id arrives...
- Message message =
- WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
- Byte.toString(groupId), Integer.toString(rsServerId),
- replicationServerInfo.getServerURL(),
- Byte.toString(getRsGroupId()),
- baseDn, Integer.toString(serverId));
- logError(message);
- }
- startRSHeartBeatMonitoring();
- if (replicationServerInfo.getProtocolVersion() >=
- ProtocolVersion.REPLICATION_PROTOCOL_V3)
- {
- startChangeTimeHeartBeatPublishing();
- }
- } else
- {
- // Detected new RS with our group id: log disconnection to
- // inform administrator
- Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
- Byte.toString(groupId), baseDn.toString(),
- Integer.toString(serverId));
- logError(message);
- // Do not log connection error
- newServerWithSameGroupId = true;
- }
- } catch (Exception e)
- {
- Message message = ERR_COMPUTING_FAKE_OPS.get(
- baseDn, replicationServerInfo.getServerURL(),
- e.getLocalizedMessage() + stackTraceToSingleLineString(e));
- logError(message);
- } finally
- {
- if (connected == false)
- {
- ProtocolSession localSession = session;
- if (localSession != null)
- {
- localSession.close();
- session = null;
- }
- }
- }
+ connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
} // Could perform handshake phase 2 with best
} // Could perform handshake phase 1 with best
@@ -1057,9 +937,8 @@
{
connectPhaseLock.notify();
- if ((replicationServerInfo.getGenerationId() ==
- this.getGenerationID()) ||
- (replicationServerInfo.getGenerationId() == -1))
+ if ((electedRsInfo.getGenerationId() == getGenerationID())
+ || (electedRsInfo.getGenerationId() == -1))
{
Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
.get(serverId, rsServerId, baseDn,
@@ -1072,7 +951,7 @@
.get(serverId, rsServerId, baseDn,
session.getReadableRemoteAddress(),
getGenerationID(),
- replicationServerInfo.getGenerationId());
+ electedRsInfo.getGenerationId());
logError(message);
}
} else
@@ -1081,7 +960,7 @@
* This server could not find any replicationServer. It's going to start
* in degraded mode. Log a message.
*/
- if (!connectionError && !newServerWithSameGroupId)
+ if (!connectionError)
{
connectionError = true;
connectPhaseLock.notify();
@@ -1107,17 +986,104 @@
}
/**
- * Has the passed RS info list some servers with our group id ?
- * @return true if at least one server has the same group id
+ * Connects to a replication server.
+ *
+ * @param rsInfo
+ * the Replication Server to connect to
+ * @param initStatus
+ * The status to enter the state machine with
+ * @param topologyMsg
+ * the message containing the topology information
*/
- private boolean hasSomeServerWithSameGroupId(List<RSInfo> rsInfos)
+ private void connectToReplicationServer(ReplicationServerInfo rsInfo,
+ ServerStatus initStatus, TopologyMsg topologyMsg)
{
- for (RSInfo rsInfo : rsInfos)
+ try
{
- if (rsInfo.getGroupId() == this.groupId)
- return true;
+ replicationServer = session.getReadableRemoteAddress();
+ maxSendWindow = rsInfo.getWindowSize();
+ rsGroupId = rsInfo.getGroupId();
+ rsServerId = rsInfo.getServerId();
+ rsServerUrl = rsInfo.getServerURL();
+
+ receiveTopo(topologyMsg);
+
+ // Log a message to let the administrator know that the failure
+ // was resolved.
+ // Wake up all the threads that were waiting on the window
+ // on the previous connection.
+ connectionError = false;
+ if (sendWindow != null)
+ {
+ /*
+ * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
+ * this semaphore will get blocked when they acquire it. However, we
+ * also need to make sure that we don't overflow the semaphore by
+ * releasing too many permits.
+ */
+ final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
+ if (sendWindow.availablePermits() < MAX_PERMITS)
+ {
+ /*
+ * At least 2^29 acquisitions would need to occur for this to be
+ * insufficient. In addition, at least 2^30 releases would need to
+ * occur for this to potentially overflow. Hopefully this is unlikely
+ * to happen.
+ */
+ sendWindow.release(MAX_PERMITS);
+ }
+ }
+ sendWindow = new Semaphore(maxSendWindow);
+ rcvWindow = maxRcvWindow;
+ connected = true;
+
+ // May have created a broker with null replication domain for
+ // unit test purpose.
+ if (domain != null)
+ {
+ domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
+ .getGenerationId(), session);
+ }
+
+ if (getRsGroupId() != groupId)
+ {
+ // Connected to replication server with wrong group id:
+ // warn user and start poller to recover when a server with
+ // right group id arrives...
+ Message message =
+ WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
+ .toString(groupId), Integer.toString(rsServerId), rsInfo
+ .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
+ .toString(serverId));
+ logError(message);
+ }
+ startRSHeartBeatMonitoring();
+ if (rsInfo.getProtocolVersion() >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ {
+ startChangeTimeHeartBeatPublishing();
+ }
}
- return false;
+ catch (Exception e)
+ {
+ Message message =
+ ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
+ .getLocalizedMessage()
+ + stackTraceToSingleLineString(e));
+ logError(message);
+ }
+ finally
+ {
+ if (connected == false)
+ {
+ ProtocolSession localSession = session;
+ if (localSession != null)
+ {
+ localSession.close();
+ session = null;
+ }
+ }
+ }
}
/**
--
Gitblit v1.10.0