From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508  These changes implement a window mechanism in the sycnhronization protocol.

---
 opends/src/server/org/opends/server/changelog/ServerHandler.java |  122 ++++++++++++++++++++++++++++++++--------
 1 files changed, 98 insertions(+), 24 deletions(-)

diff --git a/opends/src/server/org/opends/server/changelog/ServerHandler.java b/opends/src/server/org/opends/server/changelog/ServerHandler.java
index d4cc9d6..6934954 100644
--- a/opends/src/server/org/opends/server/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/changelog/ServerHandler.java
@@ -39,6 +39,7 @@
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
 
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigEntry;
@@ -58,6 +59,7 @@
 import org.opends.server.synchronization.ServerState;
 import org.opends.server.synchronization.SynchronizationMessage;
 import org.opends.server.synchronization.UpdateMessage;
+import org.opends.server.synchronization.WindowMessage;
 import org.opends.server.util.TimeThread;
 
 /**
@@ -71,7 +73,7 @@
   private MsgQueue msgQueue = new MsgQueue();
   private MsgQueue lateQueue = new MsgQueue();
   private Map<ChangeNumber, AckMessageList> waitingAcks  =
-          new HashMap<ChangeNumber, AckMessageList>();;
+          new HashMap<ChangeNumber, AckMessageList>();
   private ChangelogCache changelogCache = null;
   private String serverURL;
   private int outCount = 0; // number of update sent to the server
@@ -93,6 +95,12 @@
   private ServerWriter writer = null;
   private DN baseDn = null;
   private String serverAddressURL;
+  private int rcvWindow;
+  private int rcvWindowSizeHalf;
+  private int maxRcvWindow;
+  private ServerReader reader;
+  private Semaphore sendWindow;
+  private int sendWindowSize;
 
   private static Map<ChangeNumber, ChangelogAckMessageList>
    changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -116,20 +124,29 @@
    *
    * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
    *               null if this is an incoming connection.
+   * @param changelogId The identifier of the changelog that creates this
+   *                    server handler.
+   * @param changelogURL The URL of the changelog that creates this
+   *                    server handler.
+   * @param windowSize the window size that this server handler must use.
+   * @param changelog the Changelog that created this server handler.
    */
-  public void start(DN baseDn)
+  public void start(DN baseDn, short changelogId, String changelogURL,
+                    int windowSize, Changelog changelog)
   {
+    rcvWindowSizeHalf = windowSize/2;
+    maxRcvWindow = windowSize;
+    rcvWindow = windowSize;
     try
     {
       if (baseDn != null)
       {
         this.baseDn = baseDn;
-        changelogCache = Changelog.getChangelogCache(baseDn);
+        changelogCache = changelog.getChangelogCache(baseDn);
         ServerState localServerState = changelogCache.getDbServerState();
         ChangelogStartMessage msg =
-          new ChangelogStartMessage(Changelog.getServerId(),
-                                    Changelog.getServerURL(),
-                                    baseDn, localServerState);
+          new ChangelogStartMessage(changelogId, changelogURL,
+                                    baseDn, windowSize, localServerState);
 
         session.publish(msg);
       }
@@ -175,16 +192,15 @@
           restartSendDelay = 0;
         serverIsLDAPserver = true;
 
-        changelogCache = Changelog.getChangelogCache(this.baseDn);
+        changelogCache = changelog.getChangelogCache(this.baseDn);
         ServerState localServerState = changelogCache.getDbServerState();
         ChangelogStartMessage myStartMsg =
-          new ChangelogStartMessage(Changelog.getServerId(),
-                                    Changelog.getServerURL(),
-                                    this.baseDn, localServerState);
+          new ChangelogStartMessage(changelogId, changelogURL,
+                                    this.baseDn, windowSize, localServerState);
         session.publish(myStartMsg);
+        sendWindowSize = receivedMsg.getWindowSize();
       }
-      else if (msg.getClass() == Class.forName(
-      "org.opends.server.synchronization.ChangelogStartMessage"))
+      else if (msg instanceof ChangelogStartMessage)
       {
         ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
         serverId = receivedMsg.getServerId();
@@ -195,17 +211,17 @@
         this.baseDn = receivedMsg.getBaseDn();
         if (baseDn == null)
         {
-          changelogCache = Changelog.getChangelogCache(this.baseDn);
+          changelogCache = changelog.getChangelogCache(this.baseDn);
           ServerState serverState = changelogCache.getDbServerState();
           ChangelogStartMessage outMsg =
-            new ChangelogStartMessage(Changelog.getServerId(),
-                                      Changelog.getServerURL(),
-                                      this.baseDn, serverState);
+            new ChangelogStartMessage(changelogId, changelogURL,
+                                      this.baseDn, windowSize, serverState);
           session.publish(outMsg);
         }
         else
           this.baseDn = baseDn;
         this.serverState = receivedMsg.getServerState();
+        sendWindowSize = receivedMsg.getWindowSize();
       }
       else
       {
@@ -213,7 +229,7 @@
         return;   // we did not recognize the message, ignore it
       }
 
-      changelogCache = Changelog.getChangelogCache(this.baseDn);
+      changelogCache = changelog.getChangelogCache(this.baseDn);
 
       if (serverIsLDAPserver)
       {
@@ -226,7 +242,7 @@
 
       writer = new ServerWriter(session, serverId, this, changelogCache);
 
-      ServerReader reader = new ServerReader(session, serverId, this,
+      reader = new ServerReader(session, serverId, this,
                                              changelogCache);
 
       reader.start();
@@ -251,7 +267,7 @@
         // ignore
       }
     }
-
+    sendWindow = new Semaphore(sendWindowSize);
   }
 
   /**
@@ -576,6 +592,30 @@
    */
   public UpdateMessage take()
   {
+    boolean interrupted = true;
+    UpdateMessage msg = getnextMessage();
+    do {
+      try
+      {
+        sendWindow.acquire();
+        interrupted = false;
+      } catch (InterruptedException e)
+      {
+        // loop until not interrupted
+      }
+    } while (interrupted);
+    this.incrementOutCount();
+    return msg;
+  }
+
+  /**
+   * Get the next update that must be sent to the server
+   * from the message queue or from the database.
+   *
+   * @return The next update that must be sent to the server.
+   */
+  private UpdateMessage getnextMessage()
+  {
     UpdateMessage msg;
     do
     {
@@ -668,7 +708,6 @@
                   msg1 = msgQueue.removeFirst();
                 } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                 this.updateServerState(msg);
-                this.incrementOutCount();
                 return msg;
               }
             }
@@ -679,7 +718,6 @@
           /* get the next change from the lateQueue */
           msg = lateQueue.removeFirst();
           this.updateServerState(msg);
-          this.incrementOutCount();
           return msg;
         }
       }
@@ -707,7 +745,6 @@
              * by the other server.
              * Otherwise just loop to select the next message.
              */
-            this.incrementOutCount();
             return msg;
           }
         }
@@ -927,7 +964,7 @@
   @Override
   public String getMonitorInstanceName()
   {
-    String str = changelogCache.getBaseDn().toString() +
+    String str = baseDn.toString() +
                  " " + serverURL + " " + String.valueOf(serverId);
 
     if (serverIsLDAPserver)
@@ -985,7 +1022,7 @@
     attributes.add(new Attribute("server-id",
                                  String.valueOf(serverId)));
     attributes.add(new Attribute("base-dn",
-                                 changelogCache.getBaseDn().toString()));
+                                 baseDn.toString()));
     attributes.add(new Attribute("waiting-changes",
                                  String.valueOf(getRcvMsgQueueSize())));
     attributes.add(new Attribute("update-waiting-acks",
@@ -999,6 +1036,14 @@
                                  String.valueOf(getInAckCount())));
     attributes.add(new Attribute("approximate-delay",
                                  String.valueOf(getApproxDelay())));
+    attributes.add(new Attribute("max-send-window",
+                                 String.valueOf(sendWindowSize)));
+    attributes.add(new Attribute("current-send-window",
+                                String.valueOf(sendWindow.availablePermits())));
+    attributes.add(new Attribute("max-rcv-window",
+                                 String.valueOf(maxRcvWindow)));
+    attributes.add(new Attribute("current-rcv-window",
+                                 String.valueOf(rcvWindow)));
     long olderUpdateTime = getOlderUpdateTime();
     if (olderUpdateTime != 0)
     {
@@ -1058,4 +1103,33 @@
 
     return localString;
   }
+
+  /**
+   * Check the protocol window and send WindowMessage if necessary.
+   *
+   * @throws IOException when the session becomes unavailable.
+   */
+  public synchronized void checkWindow() throws IOException
+  {
+    rcvWindow--;
+    if (rcvWindow < rcvWindowSizeHalf)
+    {
+      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+      session.publish(msg);
+      outAckCount++;
+      rcvWindow += rcvWindowSizeHalf;
+    }
+  }
+
+  /**
+   * Update the send window size based on the credit specified in the
+   * given window message.
+   *
+   * @param windowMsg The Window Message containing the information
+   *                  necessary for updating the window size.
+   */
+  public void updateWindow(WindowMessage windowMsg)
+  {
+    sendWindow.release(windowMsg.getNumAck());
+  }
 }

--
Gitblit v1.10.0