From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java |  144 ++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 122 insertions(+), 22 deletions(-)

diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 6bf5821..93193b2 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,13 +28,13 @@
 
 import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
 import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
-import org.opends.server.config.ConfigException;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -42,13 +42,14 @@
 import java.util.NoSuchElementException;
 import java.util.concurrent.locks.Lock;
 
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
-import org.opends.messages.MessageBuilder;
-import org.opends.messages.Message;
-import org.opends.messages.Category;
-import org.opends.messages.Severity;
 import org.opends.server.backends.task.TaskState;
+import org.opends.server.config.ConfigException;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperationBasis;
 import org.opends.server.core.DirectoryServer;
@@ -58,7 +59,10 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.plugin.PersistentServerState;
 import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.ErrorMessage;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
 import org.opends.server.schema.DirectoryStringSyntax;
 import org.opends.server.schema.IntegerSyntax;
 import org.opends.server.types.Attribute;
@@ -136,13 +140,50 @@
   }
 
   /**
-   * Open a replicationServer session to the local ReplicationServer.
+   * Retrieves the domain associated to the baseDn, and the value of the generationId
+   * of this domain. If the domain does not exist, returns the default hard-coded\
+   * value of the generationId corresponding to 'no entry'.
    *
+   * @param baseDn The baseDn for which we want the generationId
+   * @return The value of the generationId.
+   */
+  static protected long getGenerationId(DN baseDn)
+  {
+    // This is the value of the generationId computed by the server when the
+    // suffix is empty.
+    long genId = 3276850;
+    try
+    {
+      ReplicationDomain replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
+      genId = replDomain.getGenerationId();
+    }
+    catch(Exception e) {}  
+    return genId;
+  }
+
+  /**
+   * Open a replicationServer session to the local ReplicationServer.
+   * The generation is read from the replicationDomain object. If it
+   * does not exist, take the 'empty backend' generationID.
    */
   protected ReplicationBroker openReplicationSession(
       final DN baseDn, short serverId, int window_size,
       int port, int timeout, boolean emptyOldChanges)
-          throws Exception
+          throws Exception, SocketException
+  {    
+    return openReplicationSession(baseDn, serverId, window_size,
+        port, timeout, emptyOldChanges, getGenerationId(baseDn));
+  }
+  
+  /**
+   * Open a replicationServer session to the local ReplicationServer
+   * providing the generationId.
+   */
+  protected ReplicationBroker openReplicationSession(
+        final DN baseDn, short serverId, int window_size,
+        int port, int timeout, boolean emptyOldChanges,
+        long generationId)
+  throws Exception, SocketException
   {
     ServerState state;
     if (emptyOldChanges)
@@ -151,8 +192,8 @@
        state = new ServerState();
 
     ReplicationBroker broker = new ReplicationBroker(
-        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
-        getReplSessionSecurity());
+        state, baseDn, serverId, 0, 0, 0, 0, 
+        window_size, 0, generationId, getReplSessionSecurity());
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
     broker.start(servers);
@@ -170,7 +211,15 @@
       {
         while (true)
         {
-          broker.receive();
+          ReplicationMessage rMsg = broker.receive();
+          if (rMsg instanceof ErrorMessage)
+          {
+            ErrorMessage eMsg = (ErrorMessage)rMsg;
+            logError(new MessageBuilder(
+                "ReplicationTestCase/openReplicationSession ").append(
+                " received ErrorMessage when emptying old changes ").append(
+                eMsg.getDetails()).toMessage());
+          }
         }
       }
       catch (Exception e)
@@ -184,16 +233,30 @@
   }
 
   /**
+   * Open a replicationServer session to the local ReplicationServer
+   * with a default value generationId.
+   *
+   */
+  protected ReplicationBroker openReplicationSession(
+      final DN baseDn, short serverId, int window_size,
+      int port, int timeout, ServerState state)
+    throws Exception, SocketException
+  {   
+    return openReplicationSession(baseDn, serverId, window_size,
+        port, timeout, state, getGenerationId(baseDn));
+  }
+  
+  /**
    * Open a new session to the ReplicationServer
    * starting with a given ServerState.
    */
   protected ReplicationBroker openReplicationSession(
       final DN baseDn, short serverId, int window_size,
-      int port, int timeout, ServerState state)
-          throws Exception
+      int port, int timeout, ServerState state, long generationId)
+          throws Exception, SocketException
   {
     ReplicationBroker broker = new ReplicationBroker(
-        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
+        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, generationId,
         getReplSessionSecurity());
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
@@ -213,7 +276,18 @@
       final DN baseDn, short serverId, int window_size,
       int port, int timeout, int maxSendQueue, int maxRcvQueue,
       boolean emptyOldChanges)
-          throws Exception
+      throws Exception, SocketException
+  {
+    return openReplicationSession(baseDn, serverId, window_size,
+        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges, 
+        getGenerationId(baseDn));
+  }
+  
+  protected ReplicationBroker openReplicationSession(
+      final DN baseDn, short serverId, int window_size,
+        int port, int timeout, int maxSendQueue, int maxRcvQueue,
+        boolean emptyOldChanges, long generationId)
+            throws Exception, SocketException
   {
     ServerState state;
     if (emptyOldChanges)
@@ -223,7 +297,7 @@
 
     ReplicationBroker broker = new ReplicationBroker(
         state, baseDn, serverId, maxRcvQueue, 0,
-        maxSendQueue, 0, window_size, 0,
+        maxSendQueue, 0, window_size, 0, generationId,
         getReplSessionSecurity());
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
@@ -240,7 +314,15 @@
       {
         while (true)
         {
-          broker.receive();
+          ReplicationMessage rMsg = broker.receive();
+          if (rMsg instanceof ErrorMessage)
+          {
+            ErrorMessage eMsg = (ErrorMessage)rMsg;
+            logError(new MessageBuilder(
+                "ReplicationTestCase/openReplicationSession ").append(
+                " received ErrorMessage when emptying old changes ").append(
+                eMsg.getDetails()).toMessage());
+          }
         }
       }
       catch (Exception e)
@@ -264,13 +346,22 @@
       while (true)
       {
         DN dn = configEntryList.removeLast();
-             logError(Message.raw(Category.SYNC, Severity.NOTICE,
+        
+        logError(Message.raw(Category.SYNC, Severity.NOTICE,
                  "cleaning config entry " + dn));
 
         op = new DeleteOperationBasis(connection, InternalClientConnection
             .nextOperationID(), InternalClientConnection.nextMessageID(), null,
             dn);
         op.run();
+        if ((op.getResultCode() != ResultCode.SUCCESS) &&
+            (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
+        {
+          logError(Message.raw(Category.SYNC, Severity.NOTICE,
+                   "ReplicationTestCase/Cleaning config entries" +
+                   "DEL " + dn +
+                   " failed " + op.getResultCode().getResultCodeName()));
+        }
       }
     }
     catch (NoSuchElementException e) {
@@ -293,14 +384,23 @@
       while (true)
       {
         DN dn = entryList.removeLast();
-        logError(Message.raw(Category.SYNC, Severity.NOTICE,
-            "cleaning entry " + dn));
 
-        op = new DeleteOperationBasis(connection, InternalClientConnection
-            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
-            dn);
+        op = new DeleteOperationBasis(connection, 
+               InternalClientConnection.nextOperationID(), 
+               InternalClientConnection.nextMessageID(), 
+               null,
+               dn);
 
         op.run();
+        
+        if ((op.getResultCode() != ResultCode.SUCCESS) &&
+            (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
+        {
+          logError(Message.raw(Category.SYNC, Severity.NOTICE,
+                   "ReplicationTestCase/Cleaning entries" +
+                   "DEL " + dn +
+                   " failed " + op.getResultCode().getResultCodeName()));
+        }
       }
     }
     catch (NoSuchElementException e) {

--
Gitblit v1.10.0