From 2caca3a5c55076f212fe2b2d725769737160c59c Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Mon, 31 May 2010 08:33:05 +0000
Subject: [PATCH] Fix for issue #4526. Fixes a race condition in Replication Server when resetting the GenerationID

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java |    6 ++-
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                             |   25 +++++++++++-
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                             |   84 +++++++++++++++++++++++++++--------------
 3 files changed, 82 insertions(+), 33 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c8f4e77..32a4d33 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1810,8 +1810,12 @@
 
   /**
    * Creates a new list that contains only replication servers that have the
-   * passed generation id, from a passed replication server list.
-   * @param bestServers The list of replication servers to filter
+   * provided generation id, from a provided replication server list.
+   * When none of the selected replication servers has any change (empty
+   * serverState), the 'empty' (generationId==-1) replication servers are also
+   * included in the result list.
+   *
+   * @param bestServers  The list of replication servers to filter
    * @param generationId The generation id that must match
    * @return The sub list of replication servers matching the requested
    * generation id (which may be empty)
@@ -1822,6 +1826,7 @@
   {
     Map<Integer, ReplicationServerInfo> result =
       new HashMap<Integer, ReplicationServerInfo>();
+    boolean emptyState = true;
 
     for (Integer rsId : bestServers.keySet())
     {
@@ -1829,6 +1834,22 @@
       if (replicationServerInfo.getGenerationId() == generationId)
       {
         result.put(rsId, replicationServerInfo);
+        if (!replicationServerInfo.serverState.isEmpty())
+          emptyState = false;
+      }
+    }
+
+    if (emptyState)
+    {
+      // If all the RSes with the requested generationId have an empty state,
+      // then the 'empty' (genId=-1) RSes are also candidates
+      for (Integer rsId : bestServers.keySet())
+      {
+        ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
+        if (replicationServerInfo.getGenerationId() == -1)
+        {
+          result.put(rsId, replicationServerInfo);
+        }
       }
     }
     return result;
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 2ee1ec0..de6d555 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -591,19 +591,21 @@
   }
 
   /**
-   * Check if a remote replica (DS) is connected to the topology based on
-   * the TopologyMsg we received when the remote replica connected or
-   * disconnected.
+   * Returns information about the DS server related to the provided serverId,
+   * based on the TopologyMsg we received when the remote replica connected or
+   * disconnected. Returns null when no server with the provided serverId is
+   * connected.
    *
-   * @param serverId The provided serverId of the remote replica
-   * @return whether the remote replica is connected or not.
+   * @param  serverId The provided serverId of the remote replica
+   * @return the info related to this remote server if it is connected,
+   *                  null if the server is NOT connected.
    */
-  public boolean isRemoteDSConnected(int serverId)
+  public DSInfo isRemoteDSConnected(int serverId)
   {
     for (DSInfo remoteDS : getReplicasList())
       if (remoteDS.getDsId() == serverId)
-        return true;
-    return false;
+        return remoteDS;
+    return null;
   }
 
   /**
@@ -1670,13 +1672,12 @@
   }
 
   /*
-   * For all remote servers in tht start list,
+   * For all remote servers in the start list,
    * - wait it has finished the import and present the expected generationID
    * - build the failureList
    */
   private void waitForRemoteEndOfInit()
   {
-    int waitResultAttempt = 0;
     Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(0);
 
     for (Integer sid : ieContext.startList)
@@ -1696,36 +1697,60 @@
     do
     {
       done = true;
-      for (DSInfo dsi : getReplicasList())
+      short reconnectMaxDelayInSec = 10;
+      short reconnectWait = 0;
+      for (int serverId : replicasWeAreWaitingFor)
       {
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "[IE] wait for end dsid " + dsi.getDsId()
-            + " " + dsi.getStatus()
-            + " " + dsi.getGenerationId()
-            + " " + this.getGenerationID());
-        if (!ieContext.failureList.contains(dsi.getDsId()))
+        if (ieContext.failureList.contains(serverId))
         {
-          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
+          // this server has already been in error during initialization
+          // don't wait for it
+          continue;
+        }
+
+        DSInfo dsInfo = null;
+        dsInfo = isRemoteDSConnected(serverId);
+        if (dsInfo == null)
+        {
+          // this server is disconnected
+          // may be for a long time if it crashed or had been stopped
+          // may be just the time to reconnect after import : should be short
+          if (++reconnectWait<reconnectMaxDelayInSec)
+          {
+            // let's still wait to give a chance to this server to reconnect
+            done = false;
+          }
+          else
+          {
+            // we left enough time to the servers to reconnect - now it's too
+            // late
+          }
+        }
+        else
+        {
+          // this server is connected
+          if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
           {
             // this one is still doing the Full Update ... retry later
             done = false;
-            try
-            { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s
-            waitResultAttempt++;
             break;
           }
           else
           {
             // this one is done with the Full Update
-            if (dsi.getGenerationId() == this.getGenerationID())
+            if (dsInfo.getGenerationId() == this.getGenerationID())
             {
               // and with the expected generationId
-              replicasWeAreWaitingFor.remove(dsi.getDsId());
+              replicasWeAreWaitingFor.remove(serverId);
             }
           }
         }
       }
+
+      // loop and wait
+      if (!done)
+        try { Thread.sleep(1000); } catch (InterruptedException e) {} // 1sec
+
     }
     while ((!done) && (!broker.shuttingDown())); // infinite wait
 
@@ -1921,7 +1946,7 @@
           // Other messages received during an import are trashed except
           // the topologyMsg.
           if ((msg instanceof TopologyMsg) &&
-              (!this.isRemoteDSConnected(ieContext.importSource)))
+              (isRemoteDSConnected(ieContext.importSource)==null))
           {
             Message errMsg =
               Message.raw(Category.SYNC, Severity.NOTICE,
@@ -2013,7 +2038,7 @@
         throw(new IOException(ieContext.getException().getMessage()));
 
       int slowestServerId = ieContext.getSlowestServer();
-      if (!isRemoteDSConnected(slowestServerId))
+      if (isRemoteDSConnected(slowestServerId)==null)
       {
         ieContext.setException(new DirectoryException(ResultCode.OTHER,
             ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
@@ -2491,12 +2516,14 @@
   {
     boolean allset = true;
 
-    for (int i = 0; i< 10; i++)
+    for (int i = 0; i< 50; i++)
     {
       allset = true;
       for (RSInfo rsInfo : getRsList())
       {
-        if (rsInfo.getGenerationId() != generationID)
+        // the 'empty' RSes (generationId==-1) are considered as good citizens
+        if ((rsInfo.getGenerationId() != -1) &&
+            (rsInfo.getGenerationId() != generationID))
         {
           try
           {
@@ -2513,7 +2540,6 @@
         break;
       }
     }
-
     if (!allset)
     {
       ResultCode resultCode = ResultCode.OTHER;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 756297a..a331d2d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -162,13 +162,15 @@
 
       domain1.setGenerationID(2);
       domain1.resetReplicationLog();
-
+      Thread.sleep(500);
       replServers = domain1.getRsList();
 
       for (RSInfo replServerInfo : replServers)
       {
         // The generation Id of the remote should now be 2
-        assertTrue(replServerInfo.getGenerationId() == 2);
+        assertEquals(replServerInfo.getGenerationId(), 2,
+            "Unexpected value of generationId in RSInfo for RS="
+            + replServerInfo.toString());
       }
 
       int sleepTime = 50;

--
Gitblit v1.10.0