From 27024516fd64857ad7f8ae6f364b09403f6dea8d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 06 Jan 2009 08:52:11 +0000
Subject: [PATCH] 

---
 opendj-sdk/opends/src/messages/messages/replication.properties                                                              |    3 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java       |   11 ++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |   34 ++++----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java       |   72 +++++++++++-------
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |  112 ++++++++++++++++++++++------
 5 files changed, 160 insertions(+), 72 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index ca2ee16..fa9597f 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -358,4 +358,5 @@
 SEVERE_ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL_151=In replication server %s, \
  received a safe data assured update message with incoherent level: %s, this is \
  for domain %s. Message: %s
-
+SEVERE_ERR_RESET_GENERATION_ID_FAILED_152=The generation ID could not be \
+reset for domain %s
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7c4df2b..1f54b7f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.service;
 
@@ -170,7 +170,7 @@
    * The ReplicationBroker that is used by this ReplicationDomain to
    * connect to the ReplicationService.
    */
-  private ReplicationBroker broker;
+  private ReplicationBroker broker = null;
 
   /**
    * This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
   {
     // The task that initiated the operation.
     Task initializeTask;
-    // The input stream for the import
-    ReplInputStream ldifImportInputStream = null;
     // The target in the case of an export
     short exportTarget = RoutableMsg.UNKNOWN_SERVER;
     // The source in the case of an import
@@ -1553,7 +1551,6 @@
     ieContext.setCounters(
         initializeMessage.getEntryCount(),
         initializeMessage.getEntryCount());
-    ieContext.ldifImportInputStream = new ReplInputStream(this);
 
     try
     {
@@ -1682,6 +1679,52 @@
   }
 
   /**
+   * Check the value of the Replication Servers generation ID.
+   *
+   * @param generationID        The expected value of the generation ID.
+   *
+   * @throws DirectoryException When the generation ID of the Replication
+   *                            Servers is not the expected value.
+   */
+  private void checkGenerationID(long generationID) throws DirectoryException
+  {
+    boolean flag = false;
+
+    for (int i = 0; i< 10; i++)
+    {
+      for (RSInfo rsInfo : getRsList())
+      {
+        if (rsInfo.getGenerationId() == generationID)
+        {
+          flag = true;
+          break;
+        }
+        else
+        {
+          try
+          {
+            Thread.sleep(i*100);
+          } catch (InterruptedException e)
+          {
+          }
+        }
+      }
+      if (flag)
+      {
+        break;
+      }
+    }
+
+    if (!flag)
+    {
+      ResultCode resultCode = ResultCode.OTHER;
+      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
+      throw new DirectoryException(
+          resultCode, message);
+    }
+  }
+
+  /**
    * Reset the Replication Log.
    * Calling this method will remove all the Replication information that
    * was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
    */
   public void resetReplicationLog() throws DirectoryException
   {
+    // Reset the Generation ID to -1 to clean the ReplicationServers.
     resetGenerationId((long)-1);
+
+    // check that at least one ReplicationServer did change its generation-id
+    checkGenerationID(-1);
+
+    // Reconnect to the Replication Server so that it adopt our
+    // GenerationID.
+    disableService();
+    enableService();
+
+    resetGenerationId(getGenerationID());
+
+    // check that at least one ReplicationServer did change its generation-id
+    checkGenerationID(getGenerationID());
   }
 
   /**
@@ -1715,8 +1772,7 @@
     if (!isConnected())
     {
       ResultCode resultCode = ResultCode.OTHER;
-      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
-          serviceID);
+      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
       throw new DirectoryException(
          resultCode, message);
     }
@@ -2088,26 +2144,29 @@
       Collection<String> replicationServers, int window,
       long heartbeatInterval) throws ConfigException
   {
-    /*
-     * create the broker object used to publish and receive changes
-     */
-    broker = new ReplicationBroker(
-        this, state, serviceID,
-        serverID, window,
-        getGenerationID(),
-        heartbeatInterval,
-        new ReplSessionSecurity(),
-        getGroupId());
+    if (broker == null)
+    {
+      /*
+       * create the broker object used to publish and receive changes
+       */
+      broker = new ReplicationBroker(
+          this, state, serviceID,
+          serverID, window,
+          getGenerationID(),
+          heartbeatInterval,
+          new ReplSessionSecurity(),
+          getGroupId());
 
-    broker.start(replicationServers);
+      broker.start(replicationServers);
 
-   /*
-    * Create a replication monitor object responsible for publishing
-    * monitoring information below cn=monitor.
-    */
-   monitor = new ReplicationMonitor(this);
+      /*
+       * Create a replication monitor object responsible for publishing
+       * monitoring information below cn=monitor.
+       */
+      monitor = new ReplicationMonitor(this);
 
-   DirectoryServer.registerMonitorProvider(monitor);
+      DirectoryServer.registerMonitorProvider(monitor);
+    }
   }
 
   /**
@@ -2115,9 +2174,14 @@
    * <p>
    * After this method has been called, the Replication Service will start
    * calling the {@link #processUpdate(UpdateMsg)}.
+   * <p>
+   * This method must be called once and must be called after the
+   * {@link #startPublishService(Collection, int, long)}.
+   *
    */
   public void startListenService()
   {
+    //
     // Create the listener thread
     listenerThread = new ListenerThread(this);
     listenerThread.start();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 3d4941d..e049d81 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -231,7 +231,7 @@
       fakeRs2.shutdown();
       fakeRs2 = null;
     }
-    
+
     if (fakeRs3 != null)
     {
       fakeRs3.shutdown();
@@ -400,7 +400,7 @@
       {
         port = rs3Port;
         if (testCase.equals("testSafeDataManyRealRSs"))
-        {          
+        {
           // Every 3 RSs connected together
           replServers.add("localhost:" + rs1Port);
           replServers.add("localhost:" + rs2Port);
@@ -603,7 +603,7 @@
         debugInfo("Fake DS " + getServerId() + " received update assured sd level is wrong: " + updateMsg);
         ok = false;
       }
-      
+
       if (ok)
         debugInfo("Fake DS " + getServerId() + " received update assured parameters are ok: " + updateMsg);
       else
@@ -739,7 +739,7 @@
 
     /**
      * Connect to RS
-     * Returns true if connection was made successfuly
+     * Returns true if connection was made successfully
      */
     public boolean connect()
     {
@@ -941,7 +941,7 @@
     {
       return everyUpdatesAreOk;
     }
-    
+
     public int nReceivedUpdates()
     {
       return nReceivedUpdates;
@@ -1166,7 +1166,7 @@
   {
     return new Object[][]
     {
-      
+
       { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
       { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
       { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
@@ -1329,7 +1329,7 @@
 
     if (objectArrayList.size() == 0)
     {
-      // First time we add some parameters, create first object arrays      
+      // First time we add some parameters, create first object arrays
       // Add each possible parameter as initial parameter lists
       for (Object possibleParameter : possibleParameters)
       {
@@ -1388,7 +1388,7 @@
     String testCase = "testSafeDataLevelHigh";
 
     debugInfo("Starting " + testCase);
-    
+
     assertTrue(sdLevel > 1);
     int nWishedServers = sdLevel - 1; // Number of fake RSs we want an ack from
 
@@ -1449,7 +1449,7 @@
         fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
         fakeRs3Scen);
       assertNotNull(fakeRs3);
-      
+
       // Wait for connections to be finished
       // DS must see expected numbers of fake DSs and RSs
       waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
@@ -1477,7 +1477,7 @@
         fail("No timeout is expected here");
       }
       long sendUpdateTime = System.currentTimeMillis() - startTime;
-      
+
       // Check
       sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1551,7 +1551,7 @@
         fail("No timeout is expected here");
       }
       sendUpdateTime = System.currentTimeMillis() - startTime;
-      
+
       // Check
       sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1587,8 +1587,8 @@
       {
         fail("No timeout is expected here");
       }
-      sendUpdateTime = System.currentTimeMillis() - startTime;      
-            
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+
       // Check
       sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
@@ -1962,7 +1962,7 @@
       /*
        * Send an assured update using configured assured parameters
        */
-      
+
       long startTime = System.currentTimeMillis();
       AckMsg ackMsg = null;
       boolean timeout = false;
@@ -1997,7 +1997,7 @@
         assertFalse(ackMsg.hasWrongStatus());
         assertEquals(ackMsg.getFailedServers().size(), 0);
       }
-      
+
    } finally
     {
       endTest();
@@ -2054,7 +2054,7 @@
       rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
         testCase);
       assertNotNull(rs3);
-      
+
       /*
        * Start DS that will send updates
        */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 76c2cae..a274a90 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.service;
 
@@ -56,6 +56,8 @@
 
   private int exportedEntryCount;
 
+  private long generationID = 1;
+
   public FakeReplicationDomain(
       String serviceID,
       short serverID,
@@ -114,7 +116,7 @@
   @Override
   public long getGenerationID()
   {
-    return 1;
+    return generationID;
   }
 
   @Override
@@ -146,4 +148,9 @@
       queue.add(updateMsg);
     return true;
   }
+
+  public void setGenerationID(long newGenerationID)
+  {
+    generationID = newGenerationID;
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 7261219..a3cdf79 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.service;
 
@@ -59,8 +59,10 @@
   public void publishAndReceive() throws Exception
   {
     String testService = "test";
-    ReplicationServer replServer = null;
-    int replServerID = 10;
+    ReplicationServer replServer1 = null;
+    ReplicationServer replServer2 = null;
+    int replServerID1 = 10;
+    int replServerID2 = 20;
     FakeReplicationDomain domain1 = null;
     FakeReplicationDomain domain2 = null;
 
@@ -68,17 +70,33 @@
     {
       // find  a free port for the replicationServer
       ServerSocket socket = TestCaseUtils.bindFreePort();
-      int replServerPort = socket.getLocalPort();
+      int replServerPort1 = socket.getLocalPort();
       socket.close();
 
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(
-            replServerPort, "ReplicationDomainTestDb",
-            0, replServerID, 0, 100, null);
+      socket = TestCaseUtils.bindFreePort();
+      int replServerPort2 = socket.getLocalPort();
+      socket.close();
 
-      replServer = new ReplicationServer(conf);
+      TreeSet<String> replserver1 = new TreeSet<String>();
+      replserver1.add("localhost:" + replServerPort1);
+
+      TreeSet<String> replserver2 = new TreeSet<String>();
+      replserver2.add("localhost:" + replServerPort2);
+
+      ReplServerFakeConfiguration conf1 =
+        new ReplServerFakeConfiguration(
+            replServerPort1, "ReplicationDomainTestDb",
+            0, replServerID1, 0, 100, replserver2);
+
+      ReplServerFakeConfiguration conf2 =
+        new ReplServerFakeConfiguration(
+            replServerPort2, "ReplicationDomainTestDb",
+            0, replServerID2, 0, 100, replserver1);
+
+      replServer1 = new ReplicationServer(conf1);;
+      replServer2 = new ReplicationServer(conf2);
       ArrayList<String> servers = new ArrayList<String>(1);
-      servers.add("localhost:" + replServerPort);
+      servers.add("localhost:" + replServerPort1);
 
       BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
       domain1 = new FakeReplicationDomain(
@@ -99,32 +117,27 @@
       assertNotNull(rcvdMsg);
       assertEquals(test, rcvdMsg.getPayload());
 
-
       /*
        * Now test the resetReplicationLog() method.
        */
       List<RSInfo> replServers = domain1.getRsList();
 
-      // There should be one and only one server in the list.
-      assertTrue(replServers.size() == 1);
+      for (RSInfo replServerInfo : replServers)
+      {
+        // The generation Id of the remote should be 1
+        assertTrue(replServerInfo.getGenerationId() == 1);
+      }
 
-      RSInfo replServerInfo = replServers.get(0);
-
-      // The generation Id of the remote should be 1
-      assertTrue(replServerInfo.getGenerationId() == 1);
-
+      domain1.setGenerationID(2);
       domain1.resetReplicationLog();
-      Thread.sleep(1000);
 
       replServers = domain1.getRsList();
 
-      // There should be one and only one server in the list.
-      assertTrue(replServers.size() == 1);
-
-      replServerInfo = replServers.get(0);
-
-      // The generation Id of the remote should now be -1
-      assertTrue(replServerInfo.getGenerationId() == -1);
+      for (RSInfo replServerInfo : replServers)
+      {
+        // The generation Id of the remote should now be 2
+        assertTrue(replServerInfo.getGenerationId() == 2);
+      }
     }
     finally
     {
@@ -134,8 +147,11 @@
       if (domain2 != null)
         domain2.disableService();
 
-      if (replServer != null)
-        replServer.remove();
+      if (replServer1 != null)
+        replServer1.remove();
+
+      if (replServer2 != null)
+        replServer2.remove();
     }
   }
 

--
Gitblit v1.10.0