From 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 12 Jul 2007 15:41:32 +0000
Subject: [PATCH] Fix for 1895 Summary: Total update does not work with 3 servers that are also replication servers
---
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 166 +++++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 130 insertions(+), 36 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 2fbc4da..24a167f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
* We add new TreeSet in the HashMap when a new replication server register
* to this replication server.
*/
+
private Map<Short, ServerHandler> replicationServers =
new ConcurrentHashMap<Short, ServerHandler>();
@@ -253,6 +254,11 @@
return false;
}
connectedServers.put(handler.getServerId(), handler);
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
+
return true;
}
}
@@ -269,7 +275,13 @@
if (handler.isReplicationServer())
replicationServers.remove(handler.getServerId());
else
+ {
connectedServers.remove(handler.getServerId());
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
+ }
}
/**
@@ -312,6 +324,12 @@
return false;
}
replicationServers.put(handler.getServerId(), handler);
+
+ // Update this server with the list of LDAP servers
+ // already connected
+ handler.sendInfo(
+ new ReplServerInfoMessage(getConnectedLDAPservers()));
+
return true;
}
}
@@ -376,6 +394,22 @@
return sourceDbHandlers.keySet();
}
+ /**
+ * Returns as a set of String the list of LDAP servers connected to us.
+ * Each string is the serverID of a connected LDAP server.
+ *
+ * @return The set of connected LDAP servers
+ */
+ public List<String> getConnectedLDAPservers()
+ {
+ List<String> mySet = new ArrayList<String>(0);
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ mySet.add(String.valueOf(handler.getServerId()));
+ }
+ return mySet;
+ }
/**
* Creates and returns an iterator.
@@ -473,15 +507,9 @@
protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
ServerHandler senderHandler)
{
-
List<ServerHandler> servers =
new ArrayList<ServerHandler>();
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "getDestinationServers"
- + " msgDest:" + msg.getDestination() , 1);
-
if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
{
// TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
}
}
- // Send to all connected LDAP servers
+ // Sends to all connected LDAP servers
for (ServerHandler destinationHandler : connectedServers.values())
{
// Don't loop on the sender
@@ -518,14 +546,20 @@
else
{
// the targeted server is NOT connected
+ // Let's search for THE changelog server that MAY
+ // have the targeted server connected.
if (senderHandler.isLDAPserver())
{
- // let's forward to the other changelogs
- servers.addAll(replicationServers.values());
+ for (ServerHandler h : replicationServers.values())
+ {
+ if (h.isRemoteLDAPServer(msg.getDestination()))
+ {
+ servers.add(h);
+ }
+ }
}
}
}
-
return servers;
}
@@ -543,37 +577,53 @@
if (servers.isEmpty())
{
- if (!(msg instanceof InitializeRequestMessage))
- {
- // TODO A more elaborated policy is probably needed
- }
- else
- {
- ErrorMessage errMsg = new ErrorMessage(
- msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
- "serverID:" + msg.getDestination());
-
- try
- {
- senderHandler.send(errMsg);
- }
- catch(IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- }
- }
- return;
- }
-
- for (ServerHandler targetHandler : servers)
- {
+ ErrorMessage errMsg = new ErrorMessage(
+ msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+ "serverID:" + msg.getDestination());
try
{
- targetHandler.send(msg);
+ senderHandler.send(errMsg);
}
catch(IOException ioe)
{
// TODO Handle error properly (sender timeout in addition)
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(ioe);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ senderHandler.shutdown();
+ }
+ }
+ else
+ {
+ for (ServerHandler targetHandler : servers)
+ {
+ try
+ {
+ targetHandler.send(msg);
+ }
+ catch(IOException ioe)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(ioe) + " "
+ + msg.getClass().getCanonicalName();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ senderHandler.shutdown();
+ // TODO Handle error properly (sender timeout in addition)
+ }
}
}
@@ -722,4 +772,48 @@
}
return true;
}
+
+ /**
+ * Send a ReplServerInfoMessage to all the connected replication servers
+ * in order to let them know our connected LDAP servers.
+ */
+ private void sendReplServerInfo()
+ {
+ ReplServerInfoMessage info =
+ new ReplServerInfoMessage(getConnectedLDAPservers());
+ for (ServerHandler handler : replicationServers.values())
+ {
+ try
+ {
+ handler.sendInfo(info);
+ }
+ catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_INFO;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ handler.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Sets the replication server informations for the provided
+ * handler from the provided ReplServerInfoMessage.
+ *
+ * @param handler The server handler from which the info was received.
+ * @param infoMsg The information message that was received.
+ */
+ public void setReplServerInfo(
+ ServerHandler handler, ReplServerInfoMessage infoMsg)
+ {
+ handler.setReplServerInfo(infoMsg);
+ }
}
--
Gitblit v1.10.0