From 4b02c5f2c1450b7d44022c6ea52260823ae39686 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 06 Mar 2013 13:03:44 +0000
Subject: [PATCH] OPENDJ-66 (CR-1368) DS does not failover between replication servers in different groups when configured explicitly for one of the groups 

---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  260 ++++++++++++++++++++++-----------------------------
 1 files changed, 113 insertions(+), 147 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c182543..00a02d1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -874,7 +874,6 @@
     stopChangeTimeHeartBeatPublishing();
     mustRunBestServerCheckingAlgorithm = 0;
 
-    boolean newServerWithSameGroupId = false;
     synchronized (connectPhaseLock)
     {
       /*
@@ -889,164 +888,45 @@
       // Get info from every available replication servers
       replicationServerInfos = collectReplicationServersInfo();
 
-      ReplicationServerInfo replicationServerInfo = null;
+      ReplicationServerInfo electedRsInfo = null;
 
       if (replicationServerInfos.size() > 0)
       {
         // At least one server answered, find the best one.
-        replicationServerInfo = computeBestReplicationServer(true, -1, state,
-          replicationServerInfos, serverId, baseDn, groupId,
-          this.getGenerationID());
+        electedRsInfo = computeBestReplicationServer(true, -1, state,
+          replicationServerInfos, serverId, baseDn, groupId, getGenerationID());
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
           debugInfo("serverId: " + serverId +
             " phase 2 : will perform PhaseOneH with the preferred RS="
-              + replicationServerInfo);
-        replicationServerInfo = performPhaseOneHandshake(
-          replicationServerInfo.getServerURL(), true, false);
+              + electedRsInfo);
+        electedRsInfo = performPhaseOneHandshake(
+          electedRsInfo.getServerURL(), true, false);
 
-        if (replicationServerInfo != null)
+        if (electedRsInfo != null)
         {
           // Update replication server info with potentially more up to date
           // data (server state for instance may have changed)
-          replicationServerInfos.put(replicationServerInfo.getServerId(),
-              replicationServerInfo);
+          replicationServerInfos
+              .put(electedRsInfo.getServerId(), electedRsInfo);
 
           // Handshake phase 1 exchange went well
 
           // Compute in which status we are starting the session to tell the RS
           ServerStatus initStatus =
-            computeInitialServerStatus(replicationServerInfo.getGenerationId(),
-            replicationServerInfo.getServerState(),
-            replicationServerInfo.getDegradedStatusThreshold(),
-            this.getGenerationID());
+            computeInitialServerStatus(electedRsInfo.getGenerationId(),
+            electedRsInfo.getServerState(),
+            electedRsInfo.getDegradedStatusThreshold(),
+            getGenerationID());
 
           // Perform session start (handshake phase 2)
           TopologyMsg topologyMsg = performPhaseTwoHandshake(
-            replicationServerInfo.getServerURL(), initStatus);
+            electedRsInfo.getServerURL(), initStatus);
 
           if (topologyMsg != null) // Handshake phase 2 exchange went well
           {
-            try
-            {
-              /*
-               * If we just connected to a RS with a different group id than us
-               * (because for instance a RS with our group id was unreachable
-               * while connecting to each RS) but the just received TopologyMsg
-               * shows that in the same time a RS with our group id connected,
-               * we must give up the connection to force reconnection that will
-               * certainly go back to a server with our group id as server with
-               * our group id have a greater priority for connection (in
-               * computeBestReplicationServer). In other words, we disconnect to
-               * connect to a server with our group id. If a server with our
-               * group id comes back later in the topology, we will be advised
-               * upon reception of a new TopologyMsg message and we will force
-               * reconnection at that time to retrieve a server with our group
-               * id.
-               */
-              byte tmpRsGroupId = replicationServerInfo.getGroupId();
-              boolean someServersWithSameGroupId =
-                hasSomeServerWithSameGroupId(topologyMsg.getRsList());
-
-              // Really no other server with our group id ?
-              if ((tmpRsGroupId == groupId) || (!someServersWithSameGroupId))
-              {
-                replicationServer = session.getReadableRemoteAddress();
-                maxSendWindow = replicationServerInfo.getWindowSize();
-                rsGroupId = replicationServerInfo.getGroupId();
-                rsServerId = replicationServerInfo.getServerId();
-                rsServerUrl = replicationServerInfo.getServerURL();
-
-                receiveTopo(topologyMsg);
-
-                // Log a message to let the administrator know that the failure
-                // was resolved.
-                // Wakeup all the thread that were waiting on the window
-                // on the previous connection.
-                connectionError = false;
-                if (sendWindow != null)
-                {
-                  /*
-                   * Fix (hack) for OPENDJ-401: we want to ensure that no
-                   * threads holding this semaphore will get blocked when they
-                   * acquire it. However, we also need to make sure that we
-                   * don't overflow the semaphore by releasing too many permits.
-                   */
-                  final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
-                  if (sendWindow.availablePermits() < MAX_PERMITS)
-                  {
-                    /*
-                     * At least 2^29 acquisitions would need to occur for this
-                     * to be insufficient. In addition, at least 2^30 releases
-                     * would need to occur for this to potentially overflow.
-                     * Hopefully this is unlikely to happen.
-                     */
-                    sendWindow.release(MAX_PERMITS);
-                  }
-                }
-                sendWindow = new Semaphore(maxSendWindow);
-                rcvWindow = maxRcvWindow;
-                connected = true;
-
-                // May have created a broker with null replication domain for
-                // unit test purpose.
-                if (domain != null)
-                {
-                  domain.sessionInitiated(
-                    initStatus, replicationServerInfo.getServerState(),
-                    replicationServerInfo.getGenerationId(),
-                    session);
-                }
-
-                if (getRsGroupId() != groupId)
-                {
-                  // Connected to replication server with wrong group id:
-                  // warn user and start poller to recover when a server with
-                  // right group id arrives...
-                  Message message =
-                    WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
-                    Byte.toString(groupId), Integer.toString(rsServerId),
-                    replicationServerInfo.getServerURL(),
-                    Byte.toString(getRsGroupId()),
-                    baseDn, Integer.toString(serverId));
-                  logError(message);
-                }
-                startRSHeartBeatMonitoring();
-                if (replicationServerInfo.getProtocolVersion() >=
-                  ProtocolVersion.REPLICATION_PROTOCOL_V3)
-                {
-                  startChangeTimeHeartBeatPublishing();
-                }
-              } else
-              {
-                // Detected new RS with our group id: log disconnection to
-                // inform administrator
-                Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
-                  Byte.toString(groupId), baseDn.toString(),
-                  Integer.toString(serverId));
-                logError(message);
-                // Do not log connection error
-                newServerWithSameGroupId = true;
-              }
-            } catch (Exception e)
-            {
-              Message message = ERR_COMPUTING_FAKE_OPS.get(
-                baseDn, replicationServerInfo.getServerURL(),
-                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
-              logError(message);
-            } finally
-            {
-              if (connected == false)
-              {
-                ProtocolSession localSession = session;
-                if (localSession != null)
-                {
-                  localSession.close();
-                  session = null;
-                }
-              }
-            }
+            connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
           } // Could perform handshake phase 2 with best
 
         } // Could perform handshake phase 1 with best
@@ -1057,9 +937,8 @@
       {
         connectPhaseLock.notify();
 
-        if ((replicationServerInfo.getGenerationId() ==
-          this.getGenerationID()) ||
-          (replicationServerInfo.getGenerationId() == -1))
+        if ((electedRsInfo.getGenerationId() == getGenerationID())
+            || (electedRsInfo.getGenerationId() == -1))
         {
           Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
               .get(serverId, rsServerId, baseDn,
@@ -1072,7 +951,7 @@
               .get(serverId, rsServerId, baseDn,
                   session.getReadableRemoteAddress(),
                   getGenerationID(),
-                  replicationServerInfo.getGenerationId());
+                  electedRsInfo.getGenerationId());
           logError(message);
         }
       } else
@@ -1081,7 +960,7 @@
          * This server could not find any replicationServer. It's going to start
          * in degraded mode. Log a message.
          */
-        if (!connectionError && !newServerWithSameGroupId)
+        if (!connectionError)
         {
           connectionError = true;
           connectPhaseLock.notify();
@@ -1107,17 +986,104 @@
   }
 
   /**
-   * Has the passed RS info list some servers with our group id ?
-   * @return true if at least one server has the same group id
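+   * Finalizes the connection to the elected replication server once both
+   * handshake phases succeeded: records the RS identity, resets the send and
+   * receive windows, notifies the domain and starts the heartbeats. If the
+   * broker is not marked connected on exit, the session is closed and nulled.
+   *
+   * @param rsInfo the replication server to connect to
+   * @param initStatus the status to enter the state machine with
+   * @param topologyMsg the message containing the topology information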
    */
-  private boolean hasSomeServerWithSameGroupId(List<RSInfo> rsInfos)
+  private void connectToReplicationServer(ReplicationServerInfo rsInfo,
+      ServerStatus initStatus, TopologyMsg topologyMsg)
   {
-    for (RSInfo rsInfo : rsInfos)
+    try
     {
-      if (rsInfo.getGroupId() == this.groupId)
-        return true;
+      replicationServer = session.getReadableRemoteAddress();
+      maxSendWindow = rsInfo.getWindowSize();
+      rsGroupId = rsInfo.getGroupId();
+      rsServerId = rsInfo.getServerId();
+      rsServerUrl = rsInfo.getServerURL();
+
+      receiveTopo(topologyMsg);
+
+      // Clear the error flag: the connection failure is now resolved.
+      // Then wake up all the threads that were waiting on the send window
+      // of the previous connection, before replacing that window with a
+      // fresh one.
+      connectionError = false;
+      if (sendWindow != null)
+      {
+        /*
+         * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
+         * this semaphore will get blocked when they acquire it. However, we
+         * also need to make sure that we don't overflow the semaphore by
+         * releasing too many permits.
+         */
+        final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
+        if (sendWindow.availablePermits() < MAX_PERMITS)
+        {
+          /*
+           * At least 2^29 acquisitions would need to occur for this to be
+           * insufficient. In addition, at least 2^30 releases would need to
+           * occur for this to potentially overflow. Hopefully this is unlikely
+           * to happen.
+           */
+          sendWindow.release(MAX_PERMITS);
+        }
+      }
+      sendWindow = new Semaphore(maxSendWindow);
+      rcvWindow = maxRcvWindow;
+      connected = true;
+
+      // May have created a broker with null replication domain for
+      // unit test purpose.
+      if (domain != null)
+      {
+        domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
+            .getGenerationId(), session);
+      }
+
+      if (getRsGroupId() != groupId)
+      {
+        // Connected to a replication server with a different group id:
+        // warn the user. Only a warning is logged in this branch; nothing
+        // here triggers a reconnection to a server with the right group id.
+        Message message =
+            WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
+                .toString(groupId), Integer.toString(rsServerId), rsInfo
+                .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
+                .toString(serverId));
+        logError(message);
+      }
+      startRSHeartBeatMonitoring();
+      if (rsInfo.getProtocolVersion() >=
+        ProtocolVersion.REPLICATION_PROTOCOL_V3)
+      {
+        startChangeTimeHeartBeatPublishing();
+      }
     }
-    return false;
+    catch (Exception e)
+    {
+      Message message =
+          ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
+              .getLocalizedMessage()
+              + stackTraceToSingleLineString(e));
+      logError(message);
+    }
+    finally
+    {
+      if (connected == false)
+      {
+        ProtocolSession localSession = session;
+        if (localSession != null)
+        {
+          localSession.close();
+          session = null;
+        }
+      }
+    }
   }
 
   /**

--
Gitblit v1.10.0