From 93b35d25ca1affdaed002b9467c3a6b6a369eae4 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 03 Jan 2008 14:00:38 +0000
Subject: [PATCH]  fix for 2787 : Replication Server sessions fails when disconnecting and re-connecting   When a ReplicationBroker disconnect and reconnect quickly to a Replication Server the reconnection sometimes fails.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java                             |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                     |    8 -
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java |    6 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java                  |    9 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                         |   23 ++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java               |   99 ++++++++++++++++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                          |   49 ---------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java                      |    4 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java     |   69 +++++++++++--
 9 files changed, 175 insertions(+), 100 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index 5d48ea1..d523595 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -22,16 +22,14 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.util.ServerConstants;
 
 /**
@@ -41,11 +39,6 @@
 public class ReplLDIFOutputStream
        extends OutputStream
 {
-  /**
-   * The tracer object for the debug logger.
-   */
-  private static final DebugTracer TRACER = getTracer();
-
   // The synchronization domain on which the export is done
   ReplicationDomain domain;
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 1c1b59a..34c7882 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
@@ -114,12 +114,11 @@
    * @param baseDn the baseDn for which this DB was created.
    * @param replicationServer The ReplicationServer that creates this dbHandler.
    * @param dbenv the Database Env to use to create the ReplicationServer DB.
-   * @param generationId The generationId of the data contained in the LDAP
    * server for this domain.
    * @throws DatabaseException If a database problem happened
    */
   public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
-      ReplicationDbEnv dbenv, long generationId)
+      ReplicationDbEnv dbenv)
          throws DatabaseException
   {
     this.serverId = id;
@@ -369,7 +368,8 @@
         mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
         mb.append(stackTraceToSingleLineString(end));
         logError(mb.toMessage());
-        replicationServer.shutdown();
+        if (replicationServer != null)
+          replicationServer.shutdown();
         break;
       }
     }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index e1b7373..3df442d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.*;
@@ -257,7 +257,7 @@
               + " serverId=" + serverId);
 
           DbHandler dbHandler =
-            new DbHandler(serverId, baseDn, replicationServer, this, 1);
+            new DbHandler(serverId, baseDn, replicationServer, this);
 
           replicationServer.getReplicationServerDomain(baseDn, true).
           setDbHandler(serverId, dbHandler);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index a7d69e5..bf81c46 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import static org.opends.messages.ReplicationMessages.*;
@@ -544,16 +544,14 @@
    *
    * @param id The serverId for which the dbHandler must be created.
    * @param baseDn The DN for which the dbHandler muste be created.
-   * @param generationId The generationId for this server and this
-   *        replicationServerDomain.
    * @return The new DB handler for this ReplicationServer and the serverId and
    *         DN given in parameter.
    * @throws DatabaseException in case of underlying database problem.
    */
-  public DbHandler newDbHandler(short id, DN baseDn, long generationId)
+  public DbHandler newDbHandler(short id, DN baseDn)
   throws DatabaseException
   {
-    return new DbHandler(id, baseDn, this, dbEnv, generationId);
+    return new DbHandler(id, baseDn, this, dbEnv);
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6382ca3..d97d2ad 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -215,6 +215,11 @@
       }
     }
 
+    if (generationId < 0)
+    {
+      generationId = sourceHandler.getGenerationId();
+    }
+
     // look for the dbHandler that is responsible for the LDAP server which
     // generated the change.
     DbHandler dbHandler = null;
@@ -225,8 +230,7 @@
       {
         try
         {
-          dbHandler = replicationServer.newDbHandler(id,
-              baseDn, generationId);
+          dbHandler = replicationServer.newDbHandler(id, baseDn);
           generationIdSavedStatus = true;
         }
         catch (DatabaseException e)
@@ -277,8 +281,25 @@
       handler.add(update, sourceHandler);
     }
 
+  }
 
-
+  /**
+   * Wait a short while for ServerId disconnection.
+   *
+   * @param serverId the serverId to be checked.
+   */
+  public void waitDisconnection(short serverId)
+  {
+    if (connectedServers.containsKey(serverId))
+    {
+      // try again
+      try
+      {
+        Thread.sleep(100);
+      } catch (InterruptedException e)
+      {
+      }
+    }
   }
 
   /**
@@ -339,22 +360,27 @@
         " for " + baseDn + " " +
         " stopServer " + handler.getMonitorInstanceName());
 
-    handler.stopHandler();
 
-    if (handler.isReplicationServer())
-    {
-      replicationServers.remove(handler.getServerId());
-    }
-    else
-    {
-      connectedServers.remove(handler.getServerId());
-    }
+      if (handler.isReplicationServer())
+      {
+        if (replicationServers.containsValue(handler))
+        {
+          replicationServers.remove(handler.getServerId());
+          handler.stopHandler();
+        }
+      }
+      else
+      {
+        if (connectedServers.containsValue(handler))
+        {
+          connectedServers.remove(handler.getServerId());
+          handler.stopHandler();
+        }
+      }
 
-    mayResetGenerationId();
-
-    // Update the remote replication servers with our list
-    // of connected LDAP servers
-    sendReplServerInfo();
+      // Update the remote replication servers with our list
+      // of connected LDAP servers
+      sendReplServerInfo();
   }
 
   /**
@@ -1238,6 +1264,45 @@
       return replicationServer;
     }
 
+    /**
+     * Process reception of a ReplServerInfoMessage.
+     *
+     * @param infoMsg The received message.
+     * @param handler The handler that received the message.
+     * @throws IOException when raised by the underlying session.
+     */
+    public void receiveReplServerInfo(
+        ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException
+    {
+      if (debugEnabled())
+      {
+        if (handler.isReplicationServer())
+          TRACER.debugInfo(
+           "In RS " + getReplicationServer().getServerId() +
+           " Receiving replServerInfo from " + handler.getServerId() +
+           " baseDn=" + baseDn +
+           " genId=" + infoMsg.getGenerationId());
+      }
+
+      mayResetGenerationId();
+      if (generationId < 0)
+        generationId = handler.getGenerationId();
+      if (generationId > 0 && (generationId != infoMsg.getGenerationId()))
+      {
+        Message message = NOTE_BAD_GENERATION_ID.get(
+            baseDn.toNormalizedString(),
+            Short.toString(handler.getServerId()),
+            Long.toString(infoMsg.getGenerationId()),
+            Long.toString(generationId));
+
+        ErrorMessage errorMsg = new ErrorMessage(
+            getReplicationServer().getServerId(),
+            handler.getServerId(),
+            message);
+        handler.sendError(errorMsg);
+      }
+    }
+
     /*
      * Monitor Data generation
      */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index cd499e3..fb71b78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -283,6 +283,10 @@
         // Get or Create the ReplicationServerDomain
         replicationServerDomain =
                 replicationServer.getReplicationServerDomain(this.baseDn, true);
+
+        replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
+        replicationServerDomain.mayResetGenerationId();
+
         localGenerationId = replicationServerDomain.getGenerationId();
 
         ServerState localServerState =
@@ -1010,16 +1014,18 @@
       {
       }
     }
-    do {
+    boolean acquired = false;
+    do
+    {
       try
       {
-        sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
+        acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
         interrupted = false;
       } catch (InterruptedException e)
       {
         // loop until not interrupted
       }
-    } while ((interrupted) && (!shutdown));
+    } while (((interrupted) || (!acquired )) && (!shutdown));
     this.incrementOutCount();
     return msg;
   }
@@ -1800,6 +1806,17 @@
   }
 
   /**
+   * Send an ErrorMessage to the peer.
+   *
+   * @param errorMsg The message to be sent
+   * @throws IOException when raised by the underlying session
+   */
+  public void sendError(ErrorMessage errorMsg) throws IOException
+  {
+    session.publish(errorMsg);
+  }
+
+  /**
    * Process the reception of a WindowProbe message.
    *
    * @param  windowProbeMsg The message to process.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 9193c99..323751e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -199,54 +199,7 @@
         {
           ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
           handler.receiveReplServerInfo(infoMsg);
-
-          if (debugEnabled())
-          {
-            if (handler.isReplicationServer())
-              TRACER.debugInfo(
-               "In RS " + replicationServerDomain.getReplicationServer().
-               getServerId() +
-               " Receiving replServerInfo from " + handler.getServerId() +
-               " baseDn=" + replicationServerDomain.getBaseDn() +
-               " genId=" + infoMsg.getGenerationId());
-          }
-
-          if (replicationServerDomain.getGenerationId()<0)
-          {
-            // Here is the case where a ReplicationServer receives from
-            // another ReplicationServer the generationId for a domain
-            // for which the generation ID has never been set.
-            replicationServerDomain.
-                    setGenerationId(infoMsg.getGenerationId(),false);
-          }
-          else
-          {
-            if (infoMsg.getGenerationId()<0)
-            {
-              // Here is the case where another ReplicationServer
-              // signals that it has no generationId set for the domain.
-              // If we have generationId set locally and no server currently
-              // connected for that domain in the topology then we may also
-              // reset the generationId localy.
-              replicationServerDomain.mayResetGenerationId();
-            }
-
-            if (replicationServerDomain.getGenerationId() !=
-                    infoMsg.getGenerationId())
-            {
-              Message message = NOTE_BAD_GENERATION_ID.get(
-                  replicationServerDomain.getBaseDn().toNormalizedString(),
-                  Short.toString(handler.getServerId()),
-                  Long.toString(infoMsg.getGenerationId()),
-                  Long.toString(replicationServerDomain.getGenerationId()));
-
-              ErrorMessage errorMsg = new ErrorMessage(
-                  replicationServerDomain.getReplicationServer().getServerId(),
-                  handler.getServerId(),
-                  message);
-              session.publish(errorMsg);
-            }
-          }
+          replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
         }
         else if (msg instanceof MonitorRequestMessage)
         {
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index 5ba1a2d..7011c1f 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication;
 
@@ -721,7 +721,6 @@
       // Read generationId - should be not retrievable since no entry
       debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")");
       connectServer1ToChangelog(changelog1ID);
-      Thread.sleep(1000);
 
       debugInfo(testCase + " Expect genId attribute to be not retrievable");
       genId = readGenId();
@@ -848,13 +847,15 @@
       debugInfo("Create again replServer1");
       replServer1 = createReplicationServer(changelog1ID, false, testCase);
       debugInfo("Delay to allow DS to reconnect to replServer1");
-      Thread.sleep(200);
+      Thread.sleep(1000);
 
       long genIdAfterRestart = replServer1.getGenerationId(baseDn);
       debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
       assertTrue(replServer1!=null, "Replication server creation failed.");
       assertTrue(genIdBeforeShut == genIdAfterRestart,
-      "generationId is expected to have the same value after replServer1 restart");
+        "generationId is expected to have the same value" +
+        " after replServer1 restart. Before : " + genIdBeforeShut +
+        " after : " + genIdAfterRestart);
 
       try
       {
@@ -1072,12 +1073,6 @@
     disconnectFromReplServer(changelog1ID);
     Thread.sleep(1000);
 
-    debugInfo("Expect genId to be unset(-1) in all servers since no server is " +
-        " connected and no change ever occurred");
-    assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1");
-    assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2");
-    assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3");
-
     debugInfo("Add entries to DS");
     this.addTestEntriesToDB(updatedEntries);
 
@@ -1278,6 +1273,8 @@
       debugInfo(testCase + " Expect genId attribute to be retrievable");
       genId = readGenId();
       assertEquals(genId, 3211313L);
+      
+      disconnectFromReplServer(changelog1ID);
     }
     finally
     {
@@ -1285,11 +1282,63 @@
       debugInfo("Successfully ending " + testCase);
     }
   }
+  
+  /**
+   * Loop opening sessions to the Replication Server 
+   * to check that it handle correctly deconnection and reconnection. 
+   */
+  @Test(enabled=false, groups="slow")
+  public void testLoop() throws Exception
+  {
+    String testCase = "testLoop";
+    debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled());
+    long rgenId;
+    
+    ReplicationDomain.clearJEBackend(false,
+        "userRoot",
+        baseDn.toNormalizedString());
+
+    replServer1 = createReplicationServer(changelog1ID, false, testCase);
+    replServer1.clearDb();
+
+    ReplicationBroker broker = null;
+    try 
+    {
+      for (int i=0; i< 100; i++)
+      {
+        long generationId = 1000+i;
+        broker = openReplicationSession(baseDn,
+            server2ID, 100, getChangelogPort(changelog1ID),
+            1000, !emptyOldChanges, generationId);
+
+        debugInfo(testCase + " Expect genId to be set in memory on the replication " +
+        " server side even if not wrote on disk/db since no change occurred.");
+        rgenId = replServer1.getGenerationId(baseDn);
+        if (rgenId != generationId)
+        {
+          fail("replication server failed to set generation ID");
+          replServer1.getGenerationId(baseDn);
+        }
+        broker.stop();
+        broker = null;
+      }
+    } finally
+    {
+      if (broker != null)
+        broker.stop();
+    }
+  }
+  
+  /**
+   * This is used to make sure that the 3 tests are run in the 
+   * specified order since this is necessary.
+   */
   @Test(enabled=true, groups="slow")
   public void generationIdTest() throws Exception
   {
     testSingleRS();
     testMultiRS();
     testServerStop();
+    testLoop();
   }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 46f45bb..7af4ec5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -73,7 +73,7 @@
     ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
 
     DbHandler handler =
-      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
+      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
 
     ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
     ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +153,7 @@
     ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
 
     DbHandler handler =
-      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
+      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
 
     // Creates changes added to the dbHandler
     ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);

--
Gitblit v1.10.0