From 40cef7d36084fbe86d34cfa497628d8972c4c9e7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 29 Mar 2007 17:53:41 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java |   63 ++++++++++++++++++++++++++-----
 1 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 3cee58d..a4e1ae8 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@
 package org.opends.server.synchronization.changelog;
 
 import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -44,6 +46,18 @@
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigEntry;
 import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ChangelogStartMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.ServerStartMessage;
+import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.synchronization.protocol.WindowMessage;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
@@ -51,17 +65,6 @@
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.types.InitializationException;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.ProtocolSession;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
-import org.opends.server.synchronization.protocol.HeartbeatThread;
 import org.opends.server.util.TimeThread;
 
 /**
@@ -108,6 +111,7 @@
                                        // flow controled and should
                                        // be stopped from sending messsages.
   private int saturationCount = 0;
+  private short changelogId;
 
   /**
    * The time in milliseconds between heartbeats from the synchronization
@@ -155,6 +159,7 @@
   public void start(DN baseDn, short changelogId, String changelogURL,
                     int windowSize, Changelog changelog)
   {
+    this.changelogId = changelogId;
     rcvWindowSizeHalf = windowSize/2;
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
@@ -1263,4 +1268,40 @@
   {
     return heartbeatInterval;
   }
+
+  /**
+   * Processes a routable message.
+   *
+   * @param msg The message to be processed.
+   */
+  public void process(RoutableMessage msg)
+  {
+    if (debugEnabled())
+      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.SEVERE_ERROR,
+        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
+
+    changelogCache.process(msg, this);
+  }
+
+  /**
+   * Send an InitializeRequestMessage to the server connected through this
+   * handler.
+   *
+   * @param msg The message to be processed
+   * @throws IOException when raised by the underlying session
+   */
+  public void send(RoutableMessage msg) throws IOException
+  {
+    if (debugEnabled())
+      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.SEVERE_ERROR,
+        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
+
+    session.publish(msg);
+  }
 }

--
Gitblit v1.10.0