mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
16.41.2009 422cba8af9837ba20b11a46a6e8a172e3b5a7558
Fix for 4272 : Changes not replayed initially on adde

This issue happens when a new OpenDS server is added to a topology which has been up and
running for some time and the new server is both a Replication Server and a Directory Server.

In such cases the Replication Server starts empty and is therefore very late with regards to
the other servers.
The Directory Server is initialized from an up to date DS already in the topology and is therefore
not late.

The problem is that the new DS incorrectly choose the RS that was just installed to be provided
with all the new changes in the topology.

The DS therefore as to wait for the RS to grab all the old changes before being provided with the
new change.

The fix is to change the algorithm that is used by the DS to select the RS and to give priority
to the RS that are more up to date.

Gary would like to see this in the 2.2 branch and would therefore have to be back-ported there.

I have also added unit tests for this case and other similar ones.
replica DS after init
5 files modified
265 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/ServerState.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 28 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 83 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 126 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -366,6 +366,26 @@
  }
  /**
   * Checks that the ChangeNumber given as a parameter is in this ServerState.
   *
   * @param   covered The ChangeNumber that should be checked.
   * @return  A boolean indicating if this ServerState contains the ChangeNumber
   *          given in parameter.
   */
  public boolean cover(ChangeNumber covered)
  {
    ChangeNumber change = this.list.get(covered.getServerId());
    if ((change == null) || (change.older(covered)))
    {
      return false;
    }
    else
    {
      return true;
    }
  }
  /**
   * Tests if the state is empty.
   *
   * @return True if the state is empty.
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1503,6 +1503,34 @@
  }
  /**
   * WARNING : only use this methods for tests purpose.
   *
   * Add the Replication Server given as a parameter in the list
   * of local replication servers.
   *
   * @param server The server to be added.
   */
  public static void onlyForTestsAddlocalReplicationServer(String server)
  {
    int separator = server.lastIndexOf(':');
    if (separator == -1)
      return ;
    int port = Integer.parseInt(server.substring(separator + 1));
    localPorts.add(port);
  }
  /**
   * WARNING : only use this methods for tests purpose.
   *
   * Clear the list of local Replication Servers
   *
   */
  public static void onlyForTestsClearLocalReplicationServerList()
  {
    localPorts.clear();
  }
  /**
   * This method allows to check if the Replication Server given
   * as the parameter is running in the local JVM.
   *
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1375,13 +1375,14 @@
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param serverId2 The server id for the suffix we are working for.
   * @param localServerId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @param groupId The groupId we prefer being connected to if possible
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
    Map<String, ServerInfo> rsInfos, int localServerId,
    String baseDn, byte groupId)
  {
    /*
     * Preference is given to servers with the requested group id:
@@ -1405,11 +1406,11 @@
    if (sameGroupIdRsInfos.size() > 0)
    {
      return searchForBestReplicationServer(myState, sameGroupIdRsInfos,
        serverId2, baseDn);
        localServerId, baseDn);
    } else
    {
      return searchForBestReplicationServer(myState, rsInfos,
        serverId2, baseDn);
        localServerId, baseDn);
    }
  }
@@ -1422,12 +1423,12 @@
   * @param myState The local server state.
   * @param rsInfos The list of available replication servers and their
   *                 associated information (choice will be made among them).
   * @param serverId2 The server id for the suffix we are working for.
   * @param localServerID The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
    Map<String, ServerInfo> rsInfos, int localServerID, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
@@ -1455,6 +1456,8 @@
    }
    String bestServer = null;
    boolean bestServerIsLocal = false;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
@@ -1464,19 +1467,19 @@
    /*
     * Start loop to differentiate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(localServerID);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, serverId2);
      myChangeNumber = new ChangeNumber(0, 0, localServerID);
    }
    for (String repServer : rsInfos.keySet())
    {
      ServerState rsState = rsInfos.get(repServer).getServerState();
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId2);
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(localServerID);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, serverId2);
        rsChangeNumber = new ChangeNumber(0, 0, localServerID);
      }
      // Store state in right list
@@ -1491,7 +1494,6 @@
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose either :
       * - The local one
@@ -1504,35 +1506,10 @@
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(), baseDn, Integer.toString(serverId2));
        upToDateServers.size(), baseDn, Integer.toString(localServerID));
      logError(message);
      /*
       * If there are local Replication Servers, remove all the other one
       * from the list so that we are sure that we choose a local one.
       */
      boolean localRS = false;
      for (String upServer : upToDateServers.keySet())
      {
        if (ReplicationServer.isLocalReplicationServer(upServer))
        {
          localRS = true;
          break;
        }
      }
      if (localRS)
      {
        Iterator<String> it = upToDateServers.keySet().iterator();
        while (it.hasNext())
        {
          if (!ReplicationServer.isLocalReplicationServer(it.next()))
          {
            it.remove();
          }
        }
      }
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
@@ -1570,7 +1547,7 @@
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = -1L;
        long shift = 0;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Integer> it = curState.iterator();
        while (it.hasNext())
@@ -1586,17 +1563,21 @@
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          if (tmpShift > shift)
          {
            shift = tmpShift;
          }
          shift +=tmpShift;
        }
        boolean upServerIsLocal =
          ReplicationServer.isLocalReplicationServer(upServer);
        if ((minShift < 0) // First time in loop
          || (shift < minShift))
            || ((shift < minShift) && upServerIsLocal)
            || (((bestServerIsLocal == false) && (shift < minShift)))
            || ((bestServerIsLocal == false) && (upServerIsLocal &&
                                              (shift<(minShift + 60)) ))
            || (shift+120 < minShift))
        {
          // This server is even closer to topo state
          bestServer = upServer;
          bestServerIsLocal = upServerIsLocal;
          minShift = shift;
        }
      } // For up to date servers
@@ -1616,26 +1597,32 @@
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId2);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(localServerID);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, serverId2);
          ourSidCn = new ChangeNumber(0, 0, localServerID);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        boolean lateServerisLocal =
          ReplicationServer.isLocalReplicationServer(lateServer);
        if ((minShift < 0) // First time in loop
          || (tmpShift < minShift))
          || ((tmpShift < minShift) && lateServerisLocal)
          || (((bestServerIsLocal == false) && (tmpShift < minShift)))
          || ((bestServerIsLocal == false) && (lateServerisLocal &&
                                            (tmpShift<(minShift + 60)) ))
          || (tmpShift+120 < minShift))
        {
          // This sever is even closer to topo state
          // This server is even closer to topo state
          bestServer = lateServer;
          bestServerIsLocal = lateServerisLocal;
          minShift = tmpShift;
        }
      } // For late servers
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -41,6 +41,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -504,7 +506,7 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(3L, 0, myId3);
    cn = new ChangeNumber(4L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
@@ -920,4 +922,126 @@
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  @DataProvider(name = "create3ServersData")
  public Object[][] create3ServersData() {
    return new Object[][] {
        // first RS is up to date, the others are late none is local
        { 4, 2, 3, false, 1, 2, 3, false, 2, 3, 4, false},
        // test that the local RS  is chosen first when all up to date
        { 4, 2, 3, true, 4, 2, 3, false, 4, 2, 3, false},
        // test that the local ServerID is more important than the others
        { 3, 0, 0, false, 1, 100, 100, false, 2, 100, 100, false},
        // test that the local RS is chosen first even if it is a bit late
        { 4, 1, 1, true, 4, 2, 3, false, 4, 2, 3, false},
        // test that the local RS is not chosen first when it is very late
        { 4, 1000, 1000, false, 4, 2, 3, true, 4, 2, 1000, true},
        // test that the local RS is not chosen first when it is missing
        // local changes
        { 4, 1, 1, false, 3, 2, 3, true, 1, 1, 1, false},
        // test that the local RS is not chosen first when it is missing
        // more local changes than another RS
        { 4, 1, 1, false, 2, 2, 3, true, 1, 1, 1, false},
        // test that the local RS is chosen first when it is missing
        // the same local changes as the other RS
        { 3, 1, 1, true, 3, 1, 1, false, 3, 1, 1, false},
        };
  }
  /**
   * Test with 3 replication servers (see data provider)
   */
  @Test(dataProvider =  "create3ServersData")
  public void test3Servers(
      long winnerT1, long winnerT2, long winnerT3, boolean winnerIsLocal,
      long looser1T1, long looser1T2, long looser1T3, boolean looser1IsLocal,
      long looser2T1, long looser2T2, long looser2T3, boolean looser2IsLocal)
      throws Exception
  {
    String testCase = "test3ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    int myId1 = 1;
    int myId2 = 2;
    int myId3 = 3;
    // definitions for server names
    final String WINNER  = "localhost:123";
    final String LOOSER1 = "localhost:456";
    final String LOOSER2 = "localhost:789";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers info list
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(looser1T1, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(looser1T2, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(looser1T3, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    if (looser1IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER1);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(winnerT1, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(winnerT2, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(winnerT3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    if (winnerIsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(WINNER);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(looser2T1, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(looser2T2, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(looser2T3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    if (looser2IsLocal)
      ReplicationServer.onlyForTestsAddlocalReplicationServer(LOOSER2);
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
    ReplicationServer.onlyForTestsClearLocalReplicationServerList();
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -165,6 +165,7 @@
   * Returns a bunch of single values for fractional-exclude configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testExcludePrecommitProvider")
  private Object[][] testExcludePrecommitProvider()
  {
@@ -178,6 +179,7 @@
   * Returns a bunch of single values for fractional-exclude configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testExcludeNightlyProvider")
  private Object[][] testExcludeNightlyProvider()
  {
@@ -305,6 +307,7 @@
   * Returns a bunch of single values for fractional-include configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testIncludePrecommitProvider")
  private Object[][] testIncludePrecommitProvider()
  {
@@ -318,6 +321,7 @@
   * Returns a bunch of single values for fractional-include configuration
   * attribute
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testIncludeNightlyProvider")
  private Object[][] testIncludeNightlyProvider()
  {
@@ -1077,6 +1081,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateExcludePrecommitProvider")
  private Object[][] testInitWithFullUpdateExcludePrecommitProvider()
  {
@@ -1090,6 +1095,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateExcludeNightlyProvider")
  private Object[][] testInitWithFullUpdateExcludeNightlyProvider()
  {
@@ -1319,6 +1325,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateIncludePrecommitProvider")
  private Object[][] testInitWithFullUpdateIncludePrecommitProvider()
  {
@@ -1332,6 +1339,7 @@
   * Returns a bunch of single values for fractional configuration
   * attributes
   */
  @SuppressWarnings("unused")
  @DataProvider(name = "testInitWithFullUpdateIncludeNightlyProvider")
  private Object[][] testInitWithFullUpdateIncludeNightlyProvider()
  {