mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
07.59.2009 3a32db2207756321c5dd661a64dde85e09b92dc0
Fix for 4165: Replica can be slow to replay changes when using a large window size

When checking dependencies during the replay of operations on the consumer,
replication goes through the list of pending changes while holding the
pending changes lock.
When using large values for the window size, this list can be large,
and this can therefore cause a large replication delay on the replica.

The solution is to limit the scan to the pending changes that are older than the current change, and to stop as soon as a newer change is reached.
2 files modified
116 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java 36 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 80 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -265,6 +265,15 @@
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
@@ -312,6 +321,15 @@
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
@@ -389,6 +407,15 @@
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
@@ -465,6 +492,15 @@
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -197,6 +197,86 @@
  }
  /**
   * Publish performance test.
   * The test loops calling the publish methods of the ReplicationDomain.
   * It should not be enabled by default as it will use a lot of time.
   * Its call is only to investigate performance issues with the replication.
   */
  @Test(enabled=false)
  public void publishPerf() throws Exception
  {
    String testService = "test";
    ReplicationServer replServer1 = null;
    int replServerID1 = 10;
    FakeReplicationDomain domain1 = null;
    short domain1ServerId = 1;
    try
    {
      // find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      TreeSet<String> replserver = new TreeSet<String>();
      replserver.add("localhost:" + replServerPort);
      ReplServerFakeConfiguration conf1 =
        new ReplServerFakeConfiguration(
            replServerPort, "ReplicationDomainTestDb",
            0, replServerID1, 0, 100000, replserver);
      replServer1 = new ReplicationServer(conf1);;
      ArrayList<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
          testService, domain1ServerId, servers, 1000, 100000, rcvQueue1);
      /*
       * Publish a message from domain1,
       * Check that domain2 receives it shortly after.
       */
      byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5};
      long timeStart = System.nanoTime();
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      long timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeStart = timeNow;
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeStart = timeNow;
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeStart = timeNow;
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
    }
    finally
    {
      if (domain1 != null)
        domain1.disableService();
      if (replServer1 != null)
        replServer1.remove();
    }
  }
  /**
   * Test that a ReplicationDomain is able to export and import its database.
   */
  @Test(enabled=true)