From ca243a420602b9f8b441e2d4d53b96601c756e97 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 29 May 2007 10:04:40 +0000
Subject: [PATCH] Fix for 1323 : Error message on startup with synchronization enabled
---
opends/src/server/org/opends/server/messages/ReplicationMessages.java | 18 ++++++
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 22 +++----
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 46 +++++++++++++--
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 3
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 53 +++++++++++------
5 files changed, 102 insertions(+), 40 deletions(-)
diff --git a/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 68bcf69..cd5fe07 100644
--- a/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -381,6 +381,20 @@
public static final int MSGID_READER_EXCEPTION =
CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53;
+ /**
+ * A replication server received a null messsage from
+ * another server.
+ */
+ public static final int MSGID_DUPLICATE_SERVER_ID =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 54;
+
+ /**
+ * A server disconnected from the replication server.
+ * (this is an informational message)
+ */
+ public static final int MSGID_DUPLICATE_REPLICATION_SERVER_ID =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 55;
+
/**
@@ -520,6 +534,10 @@
"The provider class does not allow the operation requested");
registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME,
"The hostname %s could not be resolved as an IP address");
+ registerMessage(MSGID_DUPLICATE_SERVER_ID,
+ "Servers %s and %s have the same ServerId : %d");
+ registerMessage(MSGID_DUPLICATE_REPLICATION_SERVER_ID,
+ "Replication Servers %s and %s have the same ServerId : %d");
registerMessage(MSGID_READER_NULL_MSG,
"Received a Null Msg from %s");
registerMessage(MSGID_READER_EXCEPTION,
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a67fdc3..94cbe62 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -405,20 +405,16 @@
* This server could not find any replicationServer
* Let's wait a little and try again.
*/
- synchronized (this)
+ checkState = false;
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE, message, msgID);
+ try
{
- checkState = false;
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
- String message = getMessage(msgID);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
- try
- {
- this.wait(1000);
- } catch (InterruptedException e)
- {
- }
+ Thread.sleep(1000);
+ } catch (InterruptedException e)
+ {
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 24aa6e4..2fbc4da 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -229,20 +229,31 @@
*
* @param handler handler for the server that must be started
* @throws Exception when method has failed
+ * @return A boolean indicating if the start was successfull.
*/
- public void startServer(ServerHandler handler) throws Exception
+ public boolean startServer(ServerHandler handler) throws Exception
{
/*
* create the balanced tree that will be used to forward changes
*/
synchronized (connectedServers)
{
+ ServerHandler oldHandler = connectedServers.get(handler.getServerId());
+
if (connectedServers.containsKey(handler.getServerId()))
{
- /* TODO : handle error properly */
- throw new Exception("serverId already registered");
+ // looks like two LDAP servers have the same serverId
+ // log an error message and drop this connection.
+ int msgID = MSGID_DUPLICATE_SERVER_ID;
+ String message = getMessage(msgID, oldHandler.toString(),
+ handler.toString(), handler.getServerId());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return false;
}
connectedServers.put(handler.getServerId(), handler);
+ return true;
}
}
@@ -267,20 +278,41 @@
*
* @param handler the server ID to which we want to forward changes
* @throws Exception in case of errors
+ * @return A boolean indicating if the start was successfull.
*/
- public void startReplicationServer(ServerHandler handler) throws Exception
+ public boolean startReplicationServer(ServerHandler handler) throws Exception
{
/*
* create the balanced tree that will be used to forward changes
- * TODO throw proper exception
*/
synchronized (replicationServers)
{
- if (replicationServers.containsKey(handler.getServerId()))
+ ServerHandler oldHandler = replicationServers.get(handler.getServerId());
+ if ((oldHandler != null))
{
- throw new Exception("Replication Server Id already registered");
+ if (oldHandler.getServerAddressURL().equals(
+ handler.getServerAddressURL()))
+ {
+ // this is the same server, this means that our ServerStart messages
+ // have been sent at about the same time and 2 connections
+ // have been established.
+ // Silently drop this connection.
+ }
+ else
+ {
+ // looks like two replication servers have the same serverId
+ // log an error message and drop this connection.
+ int msgID = MSGID_DUPLICATE_REPLICATION_SERVER_ID;
+ String message = getMessage(msgID, oldHandler.getServerAddressURL(),
+ handler.getServerAddressURL(), handler.getServerId());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ return false;
}
replicationServers.put(handler.getServerId(), handler);
+ return true;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2f70139..556e5da 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -288,7 +288,8 @@
synchronized (this)
{
/* check if we are connected every second */
- wait(1000);
+ int randomizer = (int) Math.random()*100;
+ wait(1000 + randomizer);
}
} catch (InterruptedException e)
{
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c9d2835..d308870 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -302,34 +302,49 @@
replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ boolean started;
if (serverIsLDAPserver)
{
- replicationCache.startServer(this);
+ started = replicationCache.startServer(this);
}
else
{
- replicationCache.startReplicationServer(this);
+ started = replicationCache.startReplicationServer(this);
}
- writer = new ServerWriter(session, serverId, this, replicationCache);
-
- reader = new ServerReader(session, serverId, this,
- replicationCache);
-
- reader.start();
- writer.start();
-
- // Create a thread to send heartbeat messages.
- if (heartbeatInterval > 0)
+ if (started)
{
- heartbeatThread = new HeartbeatThread("replication Heartbeat",
- session, heartbeatInterval);
- heartbeatThread.start();
+ writer = new ServerWriter(session, serverId, this, replicationCache);
+
+ reader = new ServerReader(session, serverId, this,
+ replicationCache);
+
+ reader.start();
+ writer.start();
+
+ // Create a thread to send heartbeat messages.
+ if (heartbeatInterval > 0)
+ {
+ heartbeatThread = new HeartbeatThread("replication Heartbeat",
+ session, heartbeatInterval);
+ heartbeatThread.start();
+ }
+
+
+ DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+ DirectoryServer.registerMonitorProvider(this);
}
-
-
- DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
- DirectoryServer.registerMonitorProvider(this);
+ else
+ {
+ // the connection is not valid, close it.
+ try
+ {
+ session.close();
+ } catch (IOException e1)
+ {
+ // ignore
+ }
+ }
}
catch (Exception e)
{
--
Gitblit v1.10.0