mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
06.52.2009 b891a41e31f548f0ef888cf8de090f4785715169

improvements for Issue 3640 : Generic replication Service

With these changes the ReplicationDomain.resetReplicationLog() interface
returns after the reset is fully done.
Before the caller had to sleep() for a while to avoid restarting the service
before the reset was completed.


5 files modified
232 ■■■■■ changed files
opends/src/messages/messages/replication.properties 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 112 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 34 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 72 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -358,4 +358,5 @@
SEVERE_ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL_151=In replication server %s, \
 received a safe data assured update message with incoherent level: %s, this is \
 for domain %s. Message: %s
SEVERE_ERR_RESET_GENERATION_ID_FAILED_152=The generation ID could not be \
reset for domain %s
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -170,7 +170,7 @@
   * The ReplicationBroker that is used by this ReplicationDomain to
   * connect to the ReplicationService.
   */
  private ReplicationBroker broker;
  private ReplicationBroker broker = null;
  /**
   * This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    ReplInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
@@ -1553,7 +1551,6 @@
    ieContext.setCounters(
        initializeMessage.getEntryCount(),
        initializeMessage.getEntryCount());
    ieContext.ldifImportInputStream = new ReplInputStream(this);
    try
    {
@@ -1682,6 +1679,52 @@
  }
  /**
   * Check the value of the Replication Servers generation ID.
   *
   * @param generationID        The expected value of the generation ID.
   *
   * @throws DirectoryException When the generation ID of the Replication
   *                            Servers is not the expected value.
   */
  private void checkGenerationID(long generationID) throws DirectoryException
  {
    boolean flag = false;
    for (int i = 0; i< 10; i++)
    {
      for (RSInfo rsInfo : getRsList())
      {
        if (rsInfo.getGenerationId() == generationID)
        {
          flag = true;
          break;
        }
        else
        {
          try
          {
            Thread.sleep(i*100);
          } catch (InterruptedException e)
          {
          }
        }
      }
      if (flag)
      {
        break;
      }
    }
    if (!flag)
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
      throw new DirectoryException(
          resultCode, message);
    }
  }
  /**
   * Reset the Replication Log.
   * Calling this method will remove all the Replication information that
   * was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
   */
  public void resetReplicationLog() throws DirectoryException
  {
    // Reset the Generation ID to -1 to clean the ReplicationServers.
    resetGenerationId((long)-1);
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(-1);
    // Reconnect to the Replication Server so that it adopt our
    // GenerationID.
    disableService();
    enableService();
    resetGenerationId(getGenerationID());
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(getGenerationID());
  }
  /**
@@ -1715,8 +1772,7 @@
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
          serviceID);
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
      throw new DirectoryException(
         resultCode, message);
    }
@@ -2088,26 +2144,29 @@
      Collection<String> replicationServers, int window,
      long heartbeatInterval) throws ConfigException
  {
    /*
     * create the broker object used to publish and receive changes
     */
    broker = new ReplicationBroker(
        this, state, serviceID,
        serverID, window,
        getGenerationID(),
        heartbeatInterval,
        new ReplSessionSecurity(),
        getGroupId());
    if (broker == null)
    {
      /*
       * create the broker object used to publish and receive changes
       */
      broker = new ReplicationBroker(
          this, state, serviceID,
          serverID, window,
          getGenerationID(),
          heartbeatInterval,
          new ReplSessionSecurity(),
          getGroupId());
    broker.start(replicationServers);
      broker.start(replicationServers);
   /*
    * Create a replication monitor object responsible for publishing
    * monitoring information below cn=monitor.
    */
   monitor = new ReplicationMonitor(this);
      /*
       * Create a replication monitor object responsible for publishing
       * monitoring information below cn=monitor.
       */
      monitor = new ReplicationMonitor(this);
   DirectoryServer.registerMonitorProvider(monitor);
      DirectoryServer.registerMonitorProvider(monitor);
    }
  }
  /**
@@ -2115,9 +2174,14 @@
   * <p>
   * After this method has been called, the Replication Service will start
   * calling the {@link #processUpdate(UpdateMsg)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(Collection, int, long)}.
   *
   */
  public void startListenService()
  {
    //
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -231,7 +231,7 @@
      fakeRs2.shutdown();
      fakeRs2 = null;
    }
    if (fakeRs3 != null)
    {
      fakeRs3.shutdown();
@@ -400,7 +400,7 @@
      {
        port = rs3Port;
        if (testCase.equals("testSafeDataManyRealRSs"))
        {
        {
          // Every 3 RSs connected together
          replServers.add("localhost:" + rs1Port);
          replServers.add("localhost:" + rs2Port);
@@ -603,7 +603,7 @@
        debugInfo("Fake DS " + getServerId() + " received update assured sd level is wrong: " + updateMsg);
        ok = false;
      }
      if (ok)
        debugInfo("Fake DS " + getServerId() + " received update assured parameters are ok: " + updateMsg);
      else
@@ -739,7 +739,7 @@
    /**
     * Connect to RS
     * Returns true if connection was made successfuly
     * Returns true if connection was made successfully
     */
    public boolean connect()
    {
@@ -941,7 +941,7 @@
    {
      return everyUpdatesAreOk;
    }
    public int nReceivedUpdates()
    {
      return nReceivedUpdates;
@@ -1166,7 +1166,7 @@
  {
    return new Object[][]
    {
      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
@@ -1329,7 +1329,7 @@
    if (objectArrayList.size() == 0)
    {
      // First time we add some parameters, create first object arrays
      // First time we add some parameters, create first object arrays
      // Add each possible parameter as initial parameter lists
      for (Object possibleParameter : possibleParameters)
      {
@@ -1388,7 +1388,7 @@
    String testCase = "testSafeDataLevelHigh";
    debugInfo("Starting " + testCase);
    assertTrue(sdLevel > 1);
    int nWishedServers = sdLevel - 1; // Number of fake RSs we want an ack from
@@ -1449,7 +1449,7 @@
        fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
        fakeRs3Scen);
      assertNotNull(fakeRs3);
      // Wait for connections to be finished
      // DS must see expected numbers of fake DSs and RSs
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
@@ -1477,7 +1477,7 @@
        fail("No timeout is expected here");
      }
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1551,7 +1551,7 @@
        fail("No timeout is expected here");
      }
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1587,8 +1587,8 @@
      {
        fail("No timeout is expected here");
      }
      sendUpdateTime = System.currentTimeMillis() - startTime;
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1962,7 +1962,7 @@
      /*
       * Send an assured update using configured assured parameters
       */
      long startTime = System.currentTimeMillis();
      AckMsg ackMsg = null;
      boolean timeout = false;
@@ -1997,7 +1997,7 @@
        assertFalse(ackMsg.hasWrongStatus());
        assertEquals(ackMsg.getFailedServers().size(), 0);
      }
   } finally
    {
      endTest();
@@ -2054,7 +2054,7 @@
      rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase);
      assertNotNull(rs3);
      /*
       * Start DS that will send updates
       */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -56,6 +56,8 @@
  private int exportedEntryCount;
  private long generationID = 1;
  public FakeReplicationDomain(
      String serviceID,
      short serverID,
@@ -114,7 +116,7 @@
  @Override
  public long getGenerationID()
  {
    return 1;
    return generationID;
  }
  @Override
@@ -146,4 +148,9 @@
      queue.add(updateMsg);
    return true;
  }
  public void setGenerationID(long newGenerationID)
  {
    generationID = newGenerationID;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -59,8 +59,10 @@
  public void publishAndReceive() throws Exception
  {
    String testService = "test";
    ReplicationServer replServer = null;
    int replServerID = 10;
    ReplicationServer replServer1 = null;
    ReplicationServer replServer2 = null;
    int replServerID1 = 10;
    int replServerID2 = 20;
    FakeReplicationDomain domain1 = null;
    FakeReplicationDomain domain2 = null;
@@ -68,17 +70,33 @@
    {
      // find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      int replServerPort1 = socket.getLocalPort();
      socket.close();
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            replServerPort, "ReplicationDomainTestDb",
            0, replServerID, 0, 100, null);
      socket = TestCaseUtils.bindFreePort();
      int replServerPort2 = socket.getLocalPort();
      socket.close();
      replServer = new ReplicationServer(conf);
      TreeSet<String> replserver1 = new TreeSet<String>();
      replserver1.add("localhost:" + replServerPort1);
      TreeSet<String> replserver2 = new TreeSet<String>();
      replserver2.add("localhost:" + replServerPort2);
      ReplServerFakeConfiguration conf1 =
        new ReplServerFakeConfiguration(
            replServerPort1, "ReplicationDomainTestDb",
            0, replServerID1, 0, 100, replserver2);
      ReplServerFakeConfiguration conf2 =
        new ReplServerFakeConfiguration(
            replServerPort2, "ReplicationDomainTestDb",
            0, replServerID2, 0, 100, replserver1);
      replServer1 = new ReplicationServer(conf1);;
      replServer2 = new ReplicationServer(conf2);
      ArrayList<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      servers.add("localhost:" + replServerPort1);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
@@ -99,32 +117,27 @@
      assertNotNull(rcvdMsg);
      assertEquals(test, rcvdMsg.getPayload());
      /*
       * Now test the resetReplicationLog() method.
       */
      List<RSInfo> replServers = domain1.getRsList();
      // There should be one and only one server in the list.
      assertTrue(replServers.size() == 1);
      for (RSInfo replServerInfo : replServers)
      {
        // The generation Id of the remote should be 1
        assertTrue(replServerInfo.getGenerationId() == 1);
      }
      RSInfo replServerInfo = replServers.get(0);
      // The generation Id of the remote should be 1
      assertTrue(replServerInfo.getGenerationId() == 1);
      domain1.setGenerationID(2);
      domain1.resetReplicationLog();
      Thread.sleep(1000);
      replServers = domain1.getRsList();
      // There should be one and only one server in the list.
      assertTrue(replServers.size() == 1);
      replServerInfo = replServers.get(0);
      // The generation Id of the remote should now be -1
      assertTrue(replServerInfo.getGenerationId() == -1);
      for (RSInfo replServerInfo : replServers)
      {
        // The generation Id of the remote should now be 2
        assertTrue(replServerInfo.getGenerationId() == 2);
      }
    }
    finally
    {
@@ -134,8 +147,11 @@
      if (domain2 != null)
        domain2.disableService();
      if (replServer != null)
        replServer.remove();
      if (replServer1 != null)
        replServer1.remove();
      if (replServer2 != null)
        replServer2.remove();
    }
  }