From 40cef7d36084fbe86d34cfa497628d8972c4c9e7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 29 Mar 2007 17:53:41 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java | 366 ++++++++++++++++++++++++++++++++++-----------------
1 files changed, 243 insertions(+), 123 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index d7e0e2a..51f099e 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
*/
package org.opends.server.synchronization.changelog;
+import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.loggers.Error.logError;
-
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import com.sleepycat.je.DatabaseException;
-import org.opends.server.types.DN;
-import org.opends.server.types.ErrorLogCategory;
-import org.opends.server.types.ErrorLogSeverity;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-
import java.io.IOException;
+import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
+import com.sleepycat.je.DatabaseException;
+
/**
* This class define an in-memory cache that will be used to store
* the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
}
}
-
/**
- * Send back an ack to the server that sent the change.
+ * Retrieves the destination handlers for a routable message.
*
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
+ * @param msg The message to route.
+ * @param senderHandler The handler of the server that published this message.
+ * @return The list of destination handlers.
*/
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+ protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
+ ServerHandler senderHandler)
{
- short serverId = changeNumber.getServerId();
- sendAck(changeNumber, isLDAPserver, serverId);
- }
- /**
- *
- * Send back an ack to a server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
- * @param serverId The identifier of the server from which we
- * received the change..
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
- short serverId)
- {
- ServerHandler handler;
- if (isLDAPserver)
- handler = connectedServers.get(serverId);
- else
- handler = changelogServers.get(serverId);
+ List<ServerHandler> servers =
+ new ArrayList<ServerHandler>();
- // TODO : check for null handler and log error
- try
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "getDestinationServers"
+ + " msgDest:" + msg.getDestination() , 1);
+
+ if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
{
- handler.sendAck(changeNumber);
- } catch (IOException e)
- {
- /*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
- */
- int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
- String message = getMessage(msgID, this.toString())
- + stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- handler.shutdown();
+ // TODO Import from the "closest server" to be implemented
}
- }
-
- /**
- * Shutdown this ChangelogCache.
- */
- public void shutdown()
- {
- // Close session with other changelogs
- for (ServerHandler serverHandler : changelogServers.values())
+ else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
{
- serverHandler.shutdown();
- }
-
- // Close session with other LDAP servers
- for (ServerHandler serverHandler : connectedServers.values())
- {
- serverHandler.shutdown();
- }
-
- // Shutdown the dbHandlers
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
+ if (!senderHandler.isChangelogServer())
{
- dbHandler.shutdown();
+ // Send to all changelogServers
+ for (ServerHandler destinationHandler : changelogServers.values())
+ {
+ servers.add(destinationHandler);
+ }
}
- sourceDbHandlers.clear();
+
+ // Send to all connected LDAP servers
+ for (ServerHandler destinationHandler : connectedServers.values())
+ {
+ // Don't loop on the sender
+ if (destinationHandler == senderHandler)
+ continue;
+ servers.add(destinationHandler);
+ }
}
+ else
+ {
+ // Destination is one server
+ ServerHandler destinationHandler =
+ connectedServers.get(msg.getDestination());
+ if (destinationHandler != null)
+ {
+ servers.add(destinationHandler);
+ }
+ else
+ {
+ // the targeted server is NOT connected
+ if (senderHandler.isLDAPserver())
+ {
+ // let's forward to the other changelogs
+ servers.addAll(changelogServers.values());
+ }
+ }
+ }
+
+ return servers;
}
/**
- * Returns the ServerState describing the last change from this replica.
+ * Process an InitializeRequestMessage.
*
- * @return The ServerState describing the last change from this replica.
+ * @param msg The message received and to be processed.
+ * @param senderHandler The server handler of the server that emitted
+ * the message.
*/
- public ServerState getDbServerState()
+ public void process(RoutableMessage msg, ServerHandler senderHandler)
{
- ServerState serverState = new ServerState();
- for (DbHandler db : sourceDbHandlers.values())
- {
- serverState.update(db.getLastChange());
- }
- return serverState;
- }
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- return "ChangelogCache " + baseDn;
- }
+ List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
- /**
- * Check if some server Handler should be removed from flow control state.
- * @throws IOException If an error happened.
- */
- public void checkAllSaturation() throws IOException
- {
- for (ServerHandler handler : changelogServers.values())
+ if (servers.isEmpty())
{
- handler.checkWindow();
+ if (!(msg instanceof InitializeRequestMessage))
+ {
+ // TODO A more elaborated policy is probably needed
+ }
+ else
+ {
+ ErrorMessage errMsg = new ErrorMessage(
+ msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+ "serverID:" + msg.getDestination());
+
+ try
+ {
+ senderHandler.send(errMsg);
+ }
+ catch(IOException ioe)
+ {
+ // TODO Handle error properly (sender timeout in addition)
+ }
+ }
+ return;
}
- for (ServerHandler handler : connectedServers.values())
+ for (ServerHandler targetHandler : servers)
{
- handler.checkWindow();
+ try
+ {
+ targetHandler.send(msg);
+ }
+ catch(IOException ioe)
+ {
+ // TODO Handle error properly (sender timeout in addition)
+ }
}
+
}
- /**
- * Check if a server that was in flow control can now restart
- * sending updates.
- * @param sourceHandler The server that must be checked.
- * @return true if the server can restart sending changes.
- * false if the server can't restart sending changes.
- */
- public boolean restartAfterSaturation(ServerHandler sourceHandler)
- {
- for (ServerHandler handler : changelogServers.values())
+ /**
+ * Send back an ack to the server that sent the change.
+ *
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
+ */
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
{
- if (!handler.restartAfterSaturation(sourceHandler))
+ short serverId = changeNumber.getServerId();
+ sendAck(changeNumber, isLDAPserver, serverId);
+ }
+
+ /**
+ *
+ * Send back an ack to a server that sent the change.
+ *
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
+ * @param serverId The identifier of the server from which we
+ * received the change..
+ */
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+ short serverId)
+ {
+ ServerHandler handler;
+ if (isLDAPserver)
+ handler = connectedServers.get(serverId);
+ else
+ handler = changelogServers.get(serverId);
+
+ // TODO : check for null handler and log error
+ try
+ {
+ handler.sendAck(changeNumber);
+ } catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ handler.shutdown();
+ }
+ }
+
+ /**
+ * Shutdown this ChangelogCache.
+ */
+ public void shutdown()
+ {
+ // Close session with other changelogs
+ for (ServerHandler serverHandler : changelogServers.values())
+ {
+ serverHandler.shutdown();
+ }
+
+ // Close session with other LDAP servers
+ for (ServerHandler serverHandler : connectedServers.values())
+ {
+ serverHandler.shutdown();
+ }
+
+ // Shutdown the dbHandlers
+ synchronized (sourceDbHandlers)
+ {
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ dbHandler.shutdown();
+ }
+ sourceDbHandlers.clear();
+ }
+ }
+
+ /**
+ * Returns the ServerState describing the last change from this replica.
+ *
+ * @return The ServerState describing the last change from this replica.
+ */
+ public ServerState getDbServerState()
+ {
+ ServerState serverState = new ServerState();
+ for (DbHandler db : sourceDbHandlers.values())
+ {
+ serverState.update(db.getLastChange());
+ }
+ return serverState;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ return "ChangelogCache " + baseDn;
+ }
+
+ /**
+ * Check if some server Handler should be removed from flow control state.
+ * @throws IOException If an error happened.
+ */
+ public void checkAllSaturation() throws IOException
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ handler.checkWindow();
+ }
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.checkWindow();
+ }
+ }
+
+ /**
+ * Check if a server that was in flow control can now restart
+ * sending updates.
+ * @param sourceHandler The server that must be checked.
+ * @return true if the server can restart sending changes.
+ * false if the server can't restart sending changes.
+ */
+ public boolean restartAfterSaturation(ServerHandler sourceHandler)
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
return false;
- }
+ }
- for (ServerHandler handler : connectedServers.values())
- {
- if (!handler.restartAfterSaturation(sourceHandler))
+ for (ServerHandler handler : connectedServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
return false;
+ }
+ return true;
}
- return true;
- }
}
--
Gitblit v1.10.0