From 2b6d9196406b28e334b525e5e7642d71a9722a4f Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 16 Oct 2009 06:41:58 +0000
Subject: [PATCH] Fix for 4272 : Changes not replayed initially on adde

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                 |   28 +++++
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                |   83 +++++++---------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java |    8 +
 opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java                                       |   20 ++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java     |  126 +++++++++++++++++++++++++
 5 files changed, 216 insertions(+), 49 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 02aea22..61df06a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -366,6 +366,26 @@
   }
 
   /**
+   * Checks that the ChangeNumber given as a parameter is in this ServerState.
+   *
+   * @param   covered The ChangeNumber that should be checked.
+   * @return  A boolean indicating if this ServerState contains the ChangeNumber
+   *          given in parameter.
+   */
+  public boolean cover(ChangeNumber covered)
+  {
+    ChangeNumber change = this.list.get(covered.getServerId());
+    if ((change == null) || (change.older(covered)))
+    {
+      return false;
+    }
+    else
+    {
+      return true;
+    }
+  }
+
+  /**
    * Tests if the state is empty.
    *
    * @return True if the state is empty.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c093c86..b24c5e5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1503,6 +1503,34 @@
   }
 
   /**
+   * WARNING : only use this methods for tests purpose.
+   *
+   * Add the Replication Server given as a parameter in the list
+   * of local replication servers.
+   *
+   * @param server The server to be added.
+   */
+  public static void onlyForTestsAddlocalReplicationServer(String server)
+  {
+    int separator = server.lastIndexOf(':');
+    if (separator == -1)
+      return ;
+    int port = Integer.parseInt(server.substring(separator + 1));
+    localPorts.add(port);
+  }
+
+  /**
+   * WARNING : only use this methods for tests purpose.
+   *
+   * Clear the list of local Replication Servers
+   *
+   */
+  public static void onlyForTestsClearLocalReplicationServerList()
+  {
+    localPorts.clear();
+  }
+
+  /**
    * This method allows to check if the Replication Server given
    * as the parameter is running in the local JVM.
    *
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index e058301..983cffa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1375,13 +1375,14 @@
    * @param myState The local server state.
    * @param rsInfos The list of available replication servers and their
    *                 associated information (choice will be made among them).
-   * @param serverId2 The server id for the suffix we are working for.
+   * @param localServerId The server id for the suffix we are working for.
    * @param baseDn The suffix for which we are working for.
    * @param groupId The groupId we prefer being connected to if possible
    * @return The computed best replication server.
    */
   public static String computeBestReplicationServer(ServerState myState,
-    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
+    Map<String, ServerInfo> rsInfos, int localServerId,
+    String baseDn, byte groupId)
   {
     /*
      * Preference is given to servers with the requested group id:
@@ -1405,11 +1406,11 @@
     if (sameGroupIdRsInfos.size() > 0)
     {
       return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
-        serverId2, baseDn);
+        localServerId, baseDn);
     } else
     {
       return searchForBestReplicationServer(myState, rsInfos,
-        serverId2, baseDn);
+        localServerId, baseDn);
     }
   }
 
@@ -1422,12 +1423,12 @@
    * @param myState The local server state.
    * @param rsInfos The list of available replication servers and their
    *                 associated information (choice will be made among them).
-   * @param serverId2 The server id for the suffix we are working for.
+   * @param localServerID The server id for the suffix we are working for.
    * @param baseDn The suffix for which we are working for.
    * @return The computed best replication server.
    */
   private static String searchForBestReplicationServer(ServerState myState,
-    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
+    Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
   {
     /*
      * Find replication servers who are up to date (or more up to date than us,
@@ -1455,6 +1456,8 @@
     }
 
     String bestServer = null;
+    boolean bestServerIsLocal = false;
+
     // Servers up to dates with regard to our changes
     HashMap<String, ServerState> upToDateServers =
       new HashMap<String, ServerState>();
@@ -1464,19 +1467,19 @@
     /*
      * Start loop to differentiate up to date servers from late ones.
      */
-    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
+    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
     if (myChangeNumber == null)
     {
-      myChangeNumber = new ChangeNumber(0, 0, serverId2);
+      myChangeNumber = new ChangeNumber(0, 0, localServerID);
     }
     for (String repServer : rsInfos.keySet())
     {
 
       ServerState rsState = rsInfos.get(repServer).getServerState();
-      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId2);
+      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
       if (rsChangeNumber == null)
       {
-        rsChangeNumber = new ChangeNumber(0, 0, serverId2);
+        rsChangeNumber = new ChangeNumber(0, 0, localServerID);
       }
 
       // Store state in right list
@@ -1491,7 +1494,6 @@
 
     if (upToDateServers.size() > 0)
     {
-
       /*
        * Some up to date servers, among them, choose either :
        * - The local one
@@ -1504,35 +1506,10 @@
        */
 
       Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
-        upToDateServers.size(), baseDn, Integer.toString(serverId2));
+        upToDateServers.size(), baseDn, Integer.toString(localServerID));
       logError(message);
 
       /*
-       * If there are local Replication Servers, remove all the other one
-       * from the list so that we are sure that we choose a local one.
-       */
-      boolean localRS = false;
-      for (String upServer : upToDateServers.keySet())
-      {
-        if (ReplicationServer.isLocalReplicationServer(upServer))
-        {
-          localRS = true;
-          break;
-        }
-      }
-      if (localRS)
-      {
-        Iterator<String> it = upToDateServers.keySet().iterator();
-        while (it.hasNext())
-        {
-          if (!ReplicationServer.isLocalReplicationServer(it.next()))
-          {
-            it.remove();
-          }
-        }
-      }
-
-      /*
        * First of all, compute the virtual server state for the whole topology,
        * which is composed of the most up to date change numbers for
        * each server id in the topology.
@@ -1570,7 +1547,7 @@
          * number loops and comes back to 0 (computation would have becomen
          * meaningless).
          */
-        long shift = -1L;
+        long shift = 0;
         ServerState curState = upToDateServers.get(upServer);
         Iterator<Integer> it = curState.iterator();
         while (it.hasNext())
@@ -1586,17 +1563,21 @@
           // Cannot be negative as topoState computed as being the max CN
           // for each server id in the topology
           long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
-          if (tmpShift > shift)
-          {
-            shift = tmpShift;
-          }
+          shift +=tmpShift;
         }
 
+        boolean upServerIsLocal =
+          ReplicationServer.isLocalReplicationServer(upServer);
         if ((minShift < 0) // First time in loop
-          || (shift < minShift))
+            || ((shift < minShift) && upServerIsLocal)
+            || (((bestServerIsLocal == false) && (shift < minShift)))
+            || ((bestServerIsLocal == false) && (upServerIsLocal &&
+                                              (shift<(minShift + 60)) ))
+            || (shift+120 < minShift))
         {
           // This server is even closer to topo state
           bestServer = upServer;
+          bestServerIsLocal = upServerIsLocal;
           minShift = shift;
         }
       } // For up to date servers
@@ -1616,26 +1597,32 @@
       long minShift = -1L;
       for (String lateServer : lateOnes.keySet())
       {
-
         /*
          * Choose the server who is the closest to us regarding our server id
          * (this is the most up to date regarding our server id).
          */
         ServerState curState = lateOnes.get(lateServer);
-        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId2);
+        ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
         if (ourSidCn == null)
         {
-          ourSidCn = new ChangeNumber(0, 0, serverId2);
+          ourSidCn = new ChangeNumber(0, 0, localServerID);
         }
         // Cannot be negative as our Cn for our server id is strictly
         // greater than those of the servers in late server list
         long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
 
+        boolean lateServerisLocal =
+          ReplicationServer.isLocalReplicationServer(lateServer);
         if ((minShift < 0) // First time in loop
-          || (tmpShift < minShift))
+          || ((tmpShift < minShift) && lateServerisLocal)
+          || (((bestServerIsLocal == false) && (tmpShift < minShift)))
+          || ((bestServerIsLocal == false) && (lateServerisLocal &&
+                                            (tmpShift<(minShift + 60)) ))
+          || (tmpShift+120 < minShift))
         {
-          // This sever is even closer to topo state
+          // This server is even closer to topo state
           bestServer = lateServer;
+          bestServerIsLocal = lateServerisLocal;
           minShift = tmpShift;
         }
       } // For late servers
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
index 349767e..7588ade 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -41,6 +41,8 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.server.ReplicationServer;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -504,7 +506,7 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId2);
     aState.update(cn);
-    cn = new ChangeNumber(3L, 0, myId3);
+    cn = new ChangeNumber(4L, 0, myId3);
     aState.update(cn);
     replServerStartMsg =
       new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
@@ -920,4 +922,126 @@
 
     assertEquals(bestServer, WINNER, "Wrong best replication server.");
   }
+
+
+  @DataProvider(name = "create3ServersData")
+  public Object[][] create3ServersData() {
+    return new Object[][] {
+        // first RS is up to date, the others are late none is local
+        { 4, 2, 3, false, 1, 2, 3, false, 2, 3, 4, false},
+
+        // test that the local RS  is chosen first when all up to date
+        { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
+
+        // test that the local ServerID is more important than the others
+        { 3, 0, 0, false, 1, 100, 100, false, 2, 100, 100, false},
+
+        // test that the local RS is chosen first even if it is a bit late
+        { 4, 1, 1, true, 4, 2, 3, false, 4, 2, 3, false},
+
+        // test that the local RS is not chosen first when it is very late
+        { 4, 1000, 1000, false, 4, 2, 3, true, 4, 2, 1000, true},
+
+        // test that the local RS is not chosen first when it is missing
+        // local changes
+        { 4, 1, 1, false, 3, 2, 3, true, 1, 1, 1, false},
+
+        // test that the local RS is not chosen first when it is missing
+        // more local changes than another RS
+        { 4, 1, 1, false, 2, 2, 3, true, 1, 1, 1, false},
+
+        // test that the local RS is chosen first when it is missing
+        // the same local changes as the other RS
+        { 3, 1, 1, true, 3, 1, 1, false, 3, 1, 1, false},
+        };
+  }
+
+  /**
+   * Test with 3 replication servers (see data provider)
+   */
+  @Test(dataProvider =  "create3ServersData")
+  public void test3Servers(
+      long winnerT1, long winnerT2, long winnerT3, boolean winnerIsLocal,
+      long looser1T1, long looser1T2, long looser1T3, boolean looser1IsLocal,
+      long looser2T1, long looser2T2, long looser2T3, boolean looser2IsLocal)
+      throws Exception
+  {
+    String testCase = "test3ServersLate";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    int myId1 = 1;
+    int myId2 = 2;
+    int myId3 = 3;
+
+    // definitions for server names
+    final String WINNER  = "localhost:123";
+    final String LOOSER1 = "localhost:456";
+    final String LOOSER2 = "localhost:789";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers info list
+    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(looser1T1, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(looser1T2, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(looser1T3, 0, myId3);
+    aState.update(cn);
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
+    if (looser1IsLocal)
+      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
+
+    // State for server 2
+    aState = new ServerState();
+    cn = new ChangeNumber(winnerT1, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(winnerT2, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(winnerT3, 0, myId3);
+    aState.update(cn);
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
+    if (winnerIsLocal)
+      ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
+
+    // State for server 3
+    aState = new ServerState();
+    cn = new ChangeNumber(looser2T1, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(looser2T2, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(looser2T3, 0, myId3);
+    aState.update(cn);
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
+    if (looser2IsLocal)
+      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
+
+    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 9e975fd..ef39c00 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -165,6 +165,7 @@
    * Returns a bunch of single values for fractional-exclude configuration
    * attribute
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testExcludePrecommitProvider")
   private Object[][] testExcludePrecommitProvider()
   {
@@ -178,6 +179,7 @@
    * Returns a bunch of single values for fractional-exclude configuration
    * attribute
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testExcludeNightlyProvider")
   private Object[][] testExcludeNightlyProvider()
   {
@@ -305,6 +307,7 @@
    * Returns a bunch of single values for fractional-include configuration
    * attribute
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testIncludePrecommitProvider")
   private Object[][] testIncludePrecommitProvider()
   {
@@ -318,6 +321,7 @@
    * Returns a bunch of single values for fractional-include configuration
    * attribute
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testIncludeNightlyProvider")
   private Object[][] testIncludeNightlyProvider()
   {
@@ -1077,6 +1081,7 @@
    * Returns a bunch of single values for fractional configuration
    * attributes
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testInitWithFullUpdateExcludePrecommitProvider")
   private Object[][] testInitWithFullUpdateExcludePrecommitProvider()
   {
@@ -1090,6 +1095,7 @@
    * Returns a bunch of single values for fractional configuration
    * attributes
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testInitWithFullUpdateExcludeNightlyProvider")
   private Object[][] testInitWithFullUpdateExcludeNightlyProvider()
   {
@@ -1319,6 +1325,7 @@
    * Returns a bunch of single values for fractional configuration
    * attributes
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testInitWithFullUpdateIncludePrecommitProvider")
   private Object[][] testInitWithFullUpdateIncludePrecommitProvider()
   {
@@ -1332,6 +1339,7 @@
    * Returns a bunch of single values for fractional configuration
    * attributes
    */
+  @SuppressWarnings("unused")
   @DataProvider(name = "testInitWithFullUpdateIncludeNightlyProvider")
   private Object[][] testInitWithFullUpdateIncludeNightlyProvider()
   {

--
Gitblit v1.10.0