From ca243a420602b9f8b441e2d4d53b96601c756e97 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 29 May 2007 10:04:40 +0000
Subject: [PATCH] Fix for 1323 : Error message on startup with synchronization enabled

---
 opends/src/server/org/opends/server/messages/ReplicationMessages.java         |   18 ++++++
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |   22 +++----
 opends/src/server/org/opends/server/replication/server/ReplicationCache.java  |   46 +++++++++++++--
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |    3 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java     |   53 +++++++++++------
 5 files changed, 102 insertions(+), 40 deletions(-)

diff --git a/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 68bcf69..cd5fe07 100644
--- a/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -381,6 +381,20 @@
   public static final int MSGID_READER_EXCEPTION =
     CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53;
 
+  /**
+   * Two LDAP servers connected to the replication server
+   * are using the same server ID.
+   */
+  public static final int MSGID_DUPLICATE_SERVER_ID =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 54;
+
+  /**
+   * Two replication servers are using the same server ID.
+   * (this is reported as a severe error)
+   */
+  public static final int MSGID_DUPLICATE_REPLICATION_SERVER_ID =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 55;
+
 
 
   /**
@@ -520,6 +534,10 @@
         "The provider class does not allow the operation requested");
     registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME,
         "The hostname %s could not be resolved as an IP address");
+    registerMessage(MSGID_DUPLICATE_SERVER_ID,
+        "Servers %s and %s have the same ServerId : %d");
+    registerMessage(MSGID_DUPLICATE_REPLICATION_SERVER_ID,
+        "Replication Servers %s and %s have the same ServerId : %d");
     registerMessage(MSGID_READER_NULL_MSG,
         "Received a Null Msg from %s");
     registerMessage(MSGID_READER_EXCEPTION,
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a67fdc3..94cbe62 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -405,20 +405,16 @@
            * This server could not find any replicationServer
            * Let's wait a little and try again.
            */
-          synchronized (this)
+          checkState = false;
+          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
+          String message = getMessage(msgID);
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.NOTICE, message, msgID);
+          try
           {
-            checkState = false;
-            int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
-            String message = getMessage(msgID);
-            logError(ErrorLogCategory.SYNCHRONIZATION,
-                ErrorLogSeverity.NOTICE,
-                message, msgID);
-            try
-            {
-              this.wait(1000);
-            } catch (InterruptedException e)
-            {
-            }
+            Thread.sleep(1000);
+          } catch (InterruptedException e)
+          {
           }
         }
       }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 24aa6e4..2fbc4da 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -229,20 +229,31 @@
    *
    * @param handler handler for the server that must be started
    * @throws Exception when method has failed
+   * @return true if the handler was registered, false on a duplicate ID.
    */
-  public void startServer(ServerHandler handler) throws Exception
+  public boolean startServer(ServerHandler handler) throws Exception
   {
     /*
      * create the balanced tree that will be used to forward changes
      */
     synchronized (connectedServers)
     {
+      ServerHandler oldHandler = connectedServers.get(handler.getServerId());
+
       if (connectedServers.containsKey(handler.getServerId()))
       {
-        /* TODO : handle error properly */
-        throw new Exception("serverId already registered");
+        // looks like two LDAP servers have the same serverId
+        // log an error message and drop this connection.
+        int    msgID   = MSGID_DUPLICATE_SERVER_ID;
+        String message = getMessage(msgID, oldHandler.toString(),
+            handler.toString(), handler.getServerId());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+                 ErrorLogSeverity.SEVERE_ERROR,
+                 message, msgID);
+        return false;
       }
       connectedServers.put(handler.getServerId(), handler);
+      return true;
     }
   }
 
@@ -267,20 +278,41 @@
    *
    * @param handler the server ID to which we want to forward changes
    * @throws Exception in case of errors
+   * @return true if the handler was registered, false if the ID was taken.
    */
-  public void startReplicationServer(ServerHandler handler) throws Exception
+  public boolean startReplicationServer(ServerHandler handler) throws Exception
   {
     /*
      * create the balanced tree that will be used to forward changes
-     * TODO throw proper exception
      */
     synchronized (replicationServers)
     {
-      if (replicationServers.containsKey(handler.getServerId()))
+      ServerHandler oldHandler = replicationServers.get(handler.getServerId());
+      if ((oldHandler != null))
       {
-        throw new Exception("Replication Server Id already registered");
+        if (oldHandler.getServerAddressURL().equals(
+            handler.getServerAddressURL()))
+        {
+          // this is the same server, this means that our ServerStart messages
+          // have been sent at about the same time and 2 connections
+          // have been established.
+          // Silently drop this connection.
+        }
+        else
+        {
+          // looks like two replication servers have the same serverId
+          // log an error message and drop this connection.
+          int    msgID   = MSGID_DUPLICATE_REPLICATION_SERVER_ID;
+          String message = getMessage(msgID, oldHandler.getServerAddressURL(),
+                handler.getServerAddressURL(), handler.getServerId());
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+                   ErrorLogSeverity.SEVERE_ERROR,
+                   message, msgID);
+        }
+        return false;
       }
       replicationServers.put(handler.getServerId(), handler);
+      return true;
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2f70139..556e5da 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -288,7 +288,8 @@
         synchronized (this)
         {
           /* check if we are connected every second */
-          wait(1000);
+          int randomizer = (int) (Math.random() * 100); // 0..99 ms jitter
+          wait(1000 + randomizer);
         }
       } catch (InterruptedException e)
       {
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c9d2835..d308870 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -302,34 +302,49 @@
 
       replicationCache = replicationServer.getReplicationCache(this.baseDn);
 
+      boolean started;
       if (serverIsLDAPserver)
       {
-        replicationCache.startServer(this);
+        started = replicationCache.startServer(this);
       }
       else
       {
-        replicationCache.startReplicationServer(this);
+        started = replicationCache.startReplicationServer(this);
       }
 
-      writer = new ServerWriter(session, serverId, this, replicationCache);
-
-      reader = new ServerReader(session, serverId, this,
-                                             replicationCache);
-
-      reader.start();
-      writer.start();
-
-      // Create a thread to send heartbeat messages.
-      if (heartbeatInterval > 0)
+      if (started)
       {
-        heartbeatThread = new HeartbeatThread("replication Heartbeat",
-                                              session, heartbeatInterval);
-        heartbeatThread.start();
+        writer = new ServerWriter(session, serverId, this, replicationCache);
+
+        reader = new ServerReader(session, serverId, this,
+            replicationCache);
+
+        reader.start();
+        writer.start();
+
+        // Create a thread to send heartbeat messages.
+        if (heartbeatInterval > 0)
+        {
+          heartbeatThread = new HeartbeatThread("replication Heartbeat",
+              session, heartbeatInterval);
+          heartbeatThread.start();
+        }
+
+
+        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+        DirectoryServer.registerMonitorProvider(this);
       }
-
-
-      DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
-      DirectoryServer.registerMonitorProvider(this);
+      else
+      {
+        // the connection is not valid, close it.
+        try
+        {
+          session.close();
+        } catch (IOException e1)
+        {
+          // ignore
+        }
+      }
     }
     catch (Exception e)
     {

--
Gitblit v1.10.0