From b891a41e31f548f0ef888cf8de090f4785715169 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 06 Jan 2009 08:52:11 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |  112 ++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7c4df2b..1f54b7f 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.service;
 
@@ -170,7 +170,7 @@
    * The ReplicationBroker that is used by this ReplicationDomain to
    * connect to the ReplicationService.
    */
-  private ReplicationBroker broker;
+  private ReplicationBroker broker = null;
 
   /**
    * This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
   {
     // The task that initiated the operation.
     Task initializeTask;
-    // The input stream for the import
-    ReplInputStream ldifImportInputStream = null;
     // The target in the case of an export
     short exportTarget = RoutableMsg.UNKNOWN_SERVER;
     // The source in the case of an import
@@ -1553,7 +1551,6 @@
     ieContext.setCounters(
         initializeMessage.getEntryCount(),
         initializeMessage.getEntryCount());
-    ieContext.ldifImportInputStream = new ReplInputStream(this);
 
     try
     {
@@ -1682,6 +1679,52 @@
   }
 
   /**
+   * Check the value of the Replication Servers generation ID.
+   *
+   * @param generationID        The expected value of the generation ID.
+   *
+   * @throws DirectoryException When the generation ID of the Replication
+   *                            Servers is not the expected value.
+   */
+  private void checkGenerationID(long generationID) throws DirectoryException
+  {
+    boolean flag = false;
+
+    for (int i = 0; i< 10; i++)
+    {
+      for (RSInfo rsInfo : getRsList())
+      {
+        if (rsInfo.getGenerationId() == generationID)
+        {
+          flag = true;
+          break;
+        }
+        else
+        {
+          try
+          {
+            Thread.sleep(i*100);
+          } catch (InterruptedException e)
+          {
+          }
+        }
+      }
+      if (flag)
+      {
+        break;
+      }
+    }
+
+    if (!flag)
+    {
+      ResultCode resultCode = ResultCode.OTHER;
+      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
+      throw new DirectoryException(
+          resultCode, message);
+    }
+  }
+
+  /**
    * Reset the Replication Log.
    * Calling this method will remove all the Replication information that
    * was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
    */
   public void resetReplicationLog() throws DirectoryException
   {
+    // Reset the Generation ID to -1 to clean the ReplicationServers.
     resetGenerationId((long)-1);
+
+    // check that at least one ReplicationServer did change its generation-id
+    checkGenerationID(-1);
+
+    // Reconnect to the Replication Server so that it adopt our
+    // GenerationID.
+    disableService();
+    enableService();
+
+    resetGenerationId(getGenerationID());
+
+    // check that at least one ReplicationServer did change its generation-id
+    checkGenerationID(getGenerationID());
   }
 
   /**
@@ -1715,8 +1772,7 @@
     if (!isConnected())
     {
       ResultCode resultCode = ResultCode.OTHER;
-      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
-          serviceID);
+      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
       throw new DirectoryException(
          resultCode, message);
     }
@@ -2088,26 +2144,29 @@
       Collection<String> replicationServers, int window,
       long heartbeatInterval) throws ConfigException
   {
-    /*
-     * create the broker object used to publish and receive changes
-     */
-    broker = new ReplicationBroker(
-        this, state, serviceID,
-        serverID, window,
-        getGenerationID(),
-        heartbeatInterval,
-        new ReplSessionSecurity(),
-        getGroupId());
+    if (broker == null)
+    {
+      /*
+       * create the broker object used to publish and receive changes
+       */
+      broker = new ReplicationBroker(
+          this, state, serviceID,
+          serverID, window,
+          getGenerationID(),
+          heartbeatInterval,
+          new ReplSessionSecurity(),
+          getGroupId());
 
-    broker.start(replicationServers);
+      broker.start(replicationServers);
 
-   /*
-    * Create a replication monitor object responsible for publishing
-    * monitoring information below cn=monitor.
-    */
-   monitor = new ReplicationMonitor(this);
+      /*
+       * Create a replication monitor object responsible for publishing
+       * monitoring information below cn=monitor.
+       */
+      monitor = new ReplicationMonitor(this);
 
-   DirectoryServer.registerMonitorProvider(monitor);
+      DirectoryServer.registerMonitorProvider(monitor);
+    }
   }
 
   /**
@@ -2115,9 +2174,14 @@
    * <p>
    * After this method has been called, the Replication Service will start
    * calling the {@link #processUpdate(UpdateMsg)}.
+   * <p>
+   * This method must be called once and must be called after the
+   * {@link #startPublishService(Collection, int, long)}.
+   *
    */
   public void startListenService()
   {
+    //
     // Create the listener thread
     listenerThread = new ListenerThread(this);
     listenerThread.start();

--
Gitblit v1.10.0