From 40cef7d36084fbe86d34cfa497628d8972c4c9e7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 29 Mar 2007 17:53:41 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 63 ++++++++++++++++++++++++++-----
1 files changed, 52 insertions(+), 11 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 3cee58d..a4e1ae8 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -44,6 +46,18 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ChangelogStartMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.ServerStartMessage;
+import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -51,17 +65,6 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.ProtocolSession;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
-import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +111,7 @@
// flow controled and should
// be stopped from sending messsages.
private int saturationCount = 0;
+ private short changelogId;
/**
* The time in milliseconds between heartbeats from the synchronization
@@ -155,6 +159,7 @@
public void start(DN baseDn, short changelogId, String changelogURL,
int windowSize, Changelog changelog)
{
+ this.changelogId = changelogId;
rcvWindowSizeHalf = windowSize/2;
maxRcvWindow = windowSize;
rcvWindow = windowSize;
@@ -1263,4 +1268,40 @@
{
return heartbeatInterval;
}
+
+ /**
+ * Processes a routable message.
+ *
+ * @param msg The message to be processed.
+ */
+ public void process(RoutableMessage msg)
+ {
+ if (debugEnabled())
+ debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
+
+ changelogCache.process(msg, this);
+ }
+
+ /**
+ * Send an InitializeRequestMessage to the server connected through this
+ * handler.
+ *
+ * @param msg The message to be processed
+ * @throws IOException when raised by the underlying session
+ */
+ public void send(RoutableMessage msg) throws IOException
+ {
+ if (debugEnabled())
+ debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
+
+ session.publish(msg);
+ }
}
--
Gitblit v1.10.0