From a00e725a364e9eb859ca8ed2c914e6637c94cb9d Mon Sep 17 00:00:00 2001
From: neil_a_wilson <neil_a_wilson@localhost>
Date: Thu, 29 Mar 2007 21:12:30 +0000
Subject: [PATCH] Back out the commit included in revision 1539 because it does not build properly (it appears to reference classes that are not in the repository).
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java | 374 ++++++++++++++++++-----------------------------------
1 files changed, 127 insertions(+), 247 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index 51f099e..d7e0e2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,31 +26,26 @@
*/
package org.opends.server.synchronization.changelog;
-import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.loggers.Error.logError;
+
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.InitializeRequestMessage;
-import org.opends.server.synchronization.protocol.ErrorMessage;
-import org.opends.server.synchronization.protocol.RoutableMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
+import com.sleepycat.je.DatabaseException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
-import com.sleepycat.je.DatabaseException;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* This class define an in-memory cache that will be used to store
@@ -430,263 +425,148 @@
}
}
+
/**
- * Retrieves the destination handlers for a routable message.
+ * Send back an ack to the server that sent the change.
*
- * @param msg The message to route.
- * @param senderHandler The handler of the server that published this message.
- * @return The list of destination handlers.
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
*/
- protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
- ServerHandler senderHandler)
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
{
+ short serverId = changeNumber.getServerId();
+ sendAck(changeNumber, isLDAPserver, serverId);
+ }
- List<ServerHandler> servers =
- new ArrayList<ServerHandler>();
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "getDestinationServers"
- + " msgDest:" + msg.getDestination() , 1);
-
- if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
- {
- // TODO Import from the "closest server" to be implemented
- }
- else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
- {
- if (!senderHandler.isChangelogServer())
- {
- // Send to all changelogServers
- for (ServerHandler destinationHandler : changelogServers.values())
- {
- servers.add(destinationHandler);
- }
- }
-
- // Send to all connected LDAP servers
- for (ServerHandler destinationHandler : connectedServers.values())
- {
- // Don't loop on the sender
- if (destinationHandler == senderHandler)
- continue;
- servers.add(destinationHandler);
- }
- }
+ /**
+ *
+ * Send back an ack to a server that sent the change.
+ *
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
+ * @param serverId The identifier of the server from which we
+ * received the change..
+ */
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+ short serverId)
+ {
+ ServerHandler handler;
+ if (isLDAPserver)
+ handler = connectedServers.get(serverId);
else
- {
- // Destination is one server
- ServerHandler destinationHandler =
- connectedServers.get(msg.getDestination());
- if (destinationHandler != null)
- {
- servers.add(destinationHandler);
- }
- else
- {
- // the targeted server is NOT connected
- if (senderHandler.isLDAPserver())
- {
- // let's forward to the other changelogs
- servers.addAll(changelogServers.values());
- }
- }
- }
+ handler = changelogServers.get(serverId);
- return servers;
+ // TODO : check for null handler and log error
+ try
+ {
+ handler.sendAck(changeNumber);
+ } catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ handler.shutdown();
+ }
}
/**
- * Process an InitializeRequestMessage.
- *
- * @param msg The message received and to be processed.
- * @param senderHandler The server handler of the server that emitted
- * the message.
+ * Shutdown this ChangelogCache.
*/
- public void process(RoutableMessage msg, ServerHandler senderHandler)
+ public void shutdown()
{
-
- List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
-
- if (servers.isEmpty())
+ // Close session with other changelogs
+ for (ServerHandler serverHandler : changelogServers.values())
{
- if (!(msg instanceof InitializeRequestMessage))
- {
- // TODO A more elaborated policy is probably needed
- }
- else
- {
- ErrorMessage errMsg = new ErrorMessage(
- msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
- "serverID:" + msg.getDestination());
-
- try
- {
- senderHandler.send(errMsg);
- }
- catch(IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- }
- }
- return;
+ serverHandler.shutdown();
}
- for (ServerHandler targetHandler : servers)
+ // Close session with other LDAP servers
+ for (ServerHandler serverHandler : connectedServers.values())
{
- try
- {
- targetHandler.send(msg);
- }
- catch(IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- }
+ serverHandler.shutdown();
}
+ // Shutdown the dbHandlers
+ synchronized (sourceDbHandlers)
+ {
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ dbHandler.shutdown();
+ }
+ sourceDbHandlers.clear();
+ }
}
- /**
- * Send back an ack to the server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+ /**
+ * Returns the ServerState describing the last change from this replica.
+ *
+ * @return The ServerState describing the last change from this replica.
+ */
+ public ServerState getDbServerState()
+ {
+ ServerState serverState = new ServerState();
+ for (DbHandler db : sourceDbHandlers.values())
{
- short serverId = changeNumber.getServerId();
- sendAck(changeNumber, isLDAPserver, serverId);
+ serverState.update(db.getLastChange());
+ }
+ return serverState;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ return "ChangelogCache " + baseDn;
+ }
+
+ /**
+ * Check if some server Handler should be removed from flow control state.
+ * @throws IOException If an error happened.
+ */
+ public void checkAllSaturation() throws IOException
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ handler.checkWindow();
}
- /**
- *
- * Send back an ack to a server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
- * @param serverId The identifier of the server from which we
- * received the change..
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
- short serverId)
+ for (ServerHandler handler : connectedServers.values())
{
- ServerHandler handler;
- if (isLDAPserver)
- handler = connectedServers.get(serverId);
- else
- handler = changelogServers.get(serverId);
-
- // TODO : check for null handler and log error
- try
- {
- handler.sendAck(changeNumber);
- } catch (IOException e)
- {
- /*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
- */
- int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
- String message = getMessage(msgID, this.toString())
- + stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- handler.shutdown();
- }
+ handler.checkWindow();
}
+ }
- /**
- * Shutdown this ChangelogCache.
- */
- public void shutdown()
+ /**
+ * Check if a server that was in flow control can now restart
+ * sending updates.
+ * @param sourceHandler The server that must be checked.
+ * @return true if the server can restart sending changes.
+ * false if the server can't restart sending changes.
+ */
+ public boolean restartAfterSaturation(ServerHandler sourceHandler)
+ {
+ for (ServerHandler handler : changelogServers.values())
{
- // Close session with other changelogs
- for (ServerHandler serverHandler : changelogServers.values())
- {
- serverHandler.shutdown();
- }
-
- // Close session with other LDAP servers
- for (ServerHandler serverHandler : connectedServers.values())
- {
- serverHandler.shutdown();
- }
-
- // Shutdown the dbHandlers
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- dbHandler.shutdown();
- }
- sourceDbHandlers.clear();
- }
- }
-
- /**
- * Returns the ServerState describing the last change from this replica.
- *
- * @return The ServerState describing the last change from this replica.
- */
- public ServerState getDbServerState()
- {
- ServerState serverState = new ServerState();
- for (DbHandler db : sourceDbHandlers.values())
- {
- serverState.update(db.getLastChange());
- }
- return serverState;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- return "ChangelogCache " + baseDn;
- }
-
- /**
- * Check if some server Handler should be removed from flow control state.
- * @throws IOException If an error happened.
- */
- public void checkAllSaturation() throws IOException
- {
- for (ServerHandler handler : changelogServers.values())
- {
- handler.checkWindow();
- }
-
- for (ServerHandler handler : connectedServers.values())
- {
- handler.checkWindow();
- }
- }
-
- /**
- * Check if a server that was in flow control can now restart
- * sending updates.
- * @param sourceHandler The server that must be checked.
- * @return true if the server can restart sending changes.
- * false if the server can't restart sending changes.
- */
- public boolean restartAfterSaturation(ServerHandler sourceHandler)
- {
- for (ServerHandler handler : changelogServers.values())
- {
- if (!handler.restartAfterSaturation(sourceHandler))
+ if (!handler.restartAfterSaturation(sourceHandler))
return false;
- }
-
- for (ServerHandler handler : connectedServers.values())
- {
- if (!handler.restartAfterSaturation(sourceHandler))
- return false;
- }
- return true;
}
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
+ return false;
+ }
+ return true;
+ }
}
--
Gitblit v1.10.0