mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
06.52.2009 b891a41e31f548f0ef888cf8de090f4785715169

improvements for Issue 3640 : Generic replication Service

With these changes the ReplicationDomain.resetReplicationLog() interface
returns after the reset is fully done.
Before the caller had to sleep() for a while to avoid restarting the service
before the reset was completed.


5 files modified
164 ■■■■ changed files
opends/src/messages/messages/replication.properties 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 78 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 68 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -358,4 +358,5 @@
SEVERE_ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL_151=In replication server %s, \
 received a safe data assured update message with incoherent level: %s, this is \
 for domain %s. Message: %s
SEVERE_ERR_RESET_GENERATION_ID_FAILED_152=The generation ID could not be \
reset for domain %s
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -170,7 +170,7 @@
   * The ReplicationBroker that is used by this ReplicationDomain to
   * connect to the ReplicationService.
   */
  private ReplicationBroker broker;
  private ReplicationBroker broker = null;
  /**
   * This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    ReplInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
@@ -1553,7 +1551,6 @@
    ieContext.setCounters(
        initializeMessage.getEntryCount(),
        initializeMessage.getEntryCount());
    ieContext.ldifImportInputStream = new ReplInputStream(this);
    try
    {
@@ -1682,6 +1679,52 @@
  }
  /**
   * Check the value of the Replication Servers generation ID.
   *
   * @param generationID        The expected value of the generation ID.
   *
   * @throws DirectoryException When the generation ID of the Replication
   *                            Servers is not the expected value.
   */
  private void checkGenerationID(long generationID) throws DirectoryException
  {
    boolean flag = false;
    for (int i = 0; i< 10; i++)
    {
      for (RSInfo rsInfo : getRsList())
      {
        if (rsInfo.getGenerationId() == generationID)
        {
          flag = true;
          break;
        }
        else
        {
          try
          {
            Thread.sleep(i*100);
          } catch (InterruptedException e)
          {
          }
        }
      }
      if (flag)
      {
        break;
      }
    }
    if (!flag)
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
      throw new DirectoryException(
          resultCode, message);
    }
  }
  /**
   * Reset the Replication Log.
   * Calling this method will remove all the Replication information that
   * was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
   */
  public void resetReplicationLog() throws DirectoryException
  {
    // Reset the Generation ID to -1 to clean the ReplicationServers.
    resetGenerationId((long)-1);
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(-1);
    // Reconnect to the Replication Server so that it adopt our
    // GenerationID.
    disableService();
    enableService();
    resetGenerationId(getGenerationID());
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(getGenerationID());
  }
  /**
@@ -1715,8 +1772,7 @@
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
          serviceID);
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
      throw new DirectoryException(
         resultCode, message);
    }
@@ -2088,6 +2144,8 @@
      Collection<String> replicationServers, int window,
      long heartbeatInterval) throws ConfigException
  {
    if (broker == null)
    {
    /*
     * create the broker object used to publish and receive changes
     */
@@ -2109,15 +2167,21 @@
   DirectoryServer.registerMonitorProvider(monitor);
  }
  }
  /**
   * Starts the receiver side of the Replication Service.
   * <p>
   * After this method has been called, the Replication Service will start
   * calling the {@link #processUpdate(UpdateMsg)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(Collection, int, long)}.
   *
   */
  public void startListenService()
  {
    //
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -739,7 +739,7 @@
    /**
     * Connect to RS
     * Returns true if connection was made successfuly
     * Returns true if connection was made successfully
     */
    public boolean connect()
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -56,6 +56,8 @@
  private int exportedEntryCount;
  private long generationID = 1;
  public FakeReplicationDomain(
      String serviceID,
      short serverID,
@@ -114,7 +116,7 @@
  @Override
  public long getGenerationID()
  {
    return 1;
    return generationID;
  }
  @Override
@@ -146,4 +148,9 @@
      queue.add(updateMsg);
    return true;
  }
  public void setGenerationID(long newGenerationID)
  {
    generationID = newGenerationID;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -59,8 +59,10 @@
  public void publishAndReceive() throws Exception
  {
    String testService = "test";
    ReplicationServer replServer = null;
    int replServerID = 10;
    ReplicationServer replServer1 = null;
    ReplicationServer replServer2 = null;
    int replServerID1 = 10;
    int replServerID2 = 20;
    FakeReplicationDomain domain1 = null;
    FakeReplicationDomain domain2 = null;
@@ -68,17 +70,33 @@
    {
      // find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      int replServerPort1 = socket.getLocalPort();
      socket.close();
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            replServerPort, "ReplicationDomainTestDb",
            0, replServerID, 0, 100, null);
      socket = TestCaseUtils.bindFreePort();
      int replServerPort2 = socket.getLocalPort();
      socket.close();
      replServer = new ReplicationServer(conf);
      TreeSet<String> replserver1 = new TreeSet<String>();
      replserver1.add("localhost:" + replServerPort1);
      TreeSet<String> replserver2 = new TreeSet<String>();
      replserver2.add("localhost:" + replServerPort2);
      ReplServerFakeConfiguration conf1 =
        new ReplServerFakeConfiguration(
            replServerPort1, "ReplicationDomainTestDb",
            0, replServerID1, 0, 100, replserver2);
      ReplServerFakeConfiguration conf2 =
        new ReplServerFakeConfiguration(
            replServerPort2, "ReplicationDomainTestDb",
            0, replServerID2, 0, 100, replserver1);
      replServer1 = new ReplicationServer(conf1);;
      replServer2 = new ReplicationServer(conf2);
      ArrayList<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      servers.add("localhost:" + replServerPort1);
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
@@ -99,32 +117,27 @@
      assertNotNull(rcvdMsg);
      assertEquals(test, rcvdMsg.getPayload());
      /*
       * Now test the resetReplicationLog() method.
       */
      List<RSInfo> replServers = domain1.getRsList();
      // There should be one and only one server in the list.
      assertTrue(replServers.size() == 1);
      RSInfo replServerInfo = replServers.get(0);
      for (RSInfo replServerInfo : replServers)
      {
      // The generation Id of the remote should be 1
      assertTrue(replServerInfo.getGenerationId() == 1);
      }
      domain1.setGenerationID(2);
      domain1.resetReplicationLog();
      Thread.sleep(1000);
      replServers = domain1.getRsList();
      // There should be one and only one server in the list.
      assertTrue(replServers.size() == 1);
      replServerInfo = replServers.get(0);
      // The generation Id of the remote should now be -1
      assertTrue(replServerInfo.getGenerationId() == -1);
      for (RSInfo replServerInfo : replServers)
      {
        // The generation Id of the remote should now be 2
        assertTrue(replServerInfo.getGenerationId() == 2);
      }
    }
    finally
    {
@@ -134,8 +147,11 @@
      if (domain2 != null)
        domain2.disableService();
      if (replServer != null)
        replServer.remove();
      if (replServer1 != null)
        replServer1.remove();
      if (replServer2 != null)
        replServer2.remove();
    }
  }