From 988ec833bcc9d5dcca7ea59611102a39f08b025c Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508  These changes implement a window mechanism in the sycnhronization protocol.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java |  101 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index c9a0605..2778e34 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -34,6 +34,7 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -78,6 +79,12 @@
   private int maxReceiveDelay;
   private int maxSendQueue;
   private int maxReceiveQueue;
+  private Semaphore sendWindow;
+  private int maxSendWindow;
+  private int rcvWindow;
+  private int halfRcvWindow;
+  private int maxRcvWindow;
+  private int timeout = 0;
 
   /**
    * Creates a new Changelog Broker for a particular SynchronizationDomain.
@@ -95,10 +102,11 @@
    * @param maxSendQueue The maximum size of the send queue to use on
    *                     the changelog server.
    * @param maxSendDelay The maximum send delay to use on the changelog server.
+   * @param window The size of the send and receive window to use.
    */
   public ChangelogBroker(ServerState state, DN baseDn, short serverID,
       int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-      int maxSendDelay )
+      int maxSendDelay, int window)
   {
     this.baseDn = baseDn;
     this.serverID = serverID;
@@ -109,6 +117,9 @@
     this.state = state;
     replayOperations =
       new TreeSet<FakeOperation>(new FakeOperationComparator());
+    this.rcvWindow = window;
+    this.maxRcvWindow = window;
+    this.halfRcvWindow = window/2;
   }
 
   /**
@@ -165,6 +176,7 @@
           InetSocketAddress ServerAddr = new InetSocketAddress(
               InetAddress.getByName(hostname), Integer.parseInt(port));
           Socket socket = new Socket();
+          socket.setReceiveBufferSize(1000000);
           socket.connect(ServerAddr, 500);
           session = new SocketSession(socket);
 
@@ -173,7 +185,7 @@
            */
           ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
               maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              state);
+              halfRcvWindow*2, state);
           session.publish(msg);
 
 
@@ -182,7 +194,7 @@
            */
           session.setSoTimeout(1000);
           startMsg = (ChangelogStartMessage) session.receive();
-          session.setSoTimeout(0);
+          session.setSoTimeout(timeout);
 
           /*
            * We must not publish changes to a changelog that has not
@@ -202,6 +214,8 @@
               (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
           {
             changelogServer = ServerAddr.toString();
+            maxSendWindow = startMsg.getWindowSize();
+            this.sendWindow = new Semaphore(maxSendWindow);
             connected = true;
             break;
           }
@@ -254,6 +268,8 @@
               else
               {
                 changelogServer = ServerAddr.toString();
+                maxSendWindow = startMsg.getWindowSize();
+                this.sendWindow = new Semaphore(maxSendWindow);
                 connected = true;
                 for (FakeOperation replayOp : replayOperations)
                 {
@@ -306,6 +322,14 @@
            * changes that this server has already processed, start again
            * the loop looking for any changelog server.
            */
+          try
+          {
+            Thread.sleep(500);
+          } catch (InterruptedException e)
+          {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
           checkState = false;
           int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
           String message = getMessage(msgID);
@@ -393,13 +417,18 @@
         {
           if (this.connected == false)
             this.reStart(failingSession);
-
+          if (msg instanceof UpdateMessage)
+            sendWindow.acquire();
           session.publish(msg);
           done = true;
         } catch (IOException e)
         {
           this.reStart(failingSession);
         }
+        catch (InterruptedException e)
+        {
+          this.reStart(failingSession);
+        }
       }
     }
   }
@@ -418,7 +447,25 @@
       ProtocolSession failingSession = session;
       try
       {
-        return session.receive();
+        SynchronizationMessage msg = session.receive();
+        if (msg instanceof WindowMessage)
+        {
+          WindowMessage windowMsg = (WindowMessage) msg;
+          sendWindow.release(windowMsg.getNumAck());
+        }
+        else
+        {
+          if (msg instanceof UpdateMessage)
+          {
+            rcvWindow--;
+            if (rcvWindow < halfRcvWindow)
+            {
+              session.publish(new WindowMessage(halfRcvWindow));
+              rcvWindow += halfRcvWindow;
+            }
+          }
+          return msg;
+        }
       } catch (Exception e)
       {
         if (e instanceof SocketTimeoutException)
@@ -485,6 +532,7 @@
    */
   public void setSoTimeout(int timeout) throws SocketException
   {
+    this.timeout = timeout;
     session.setSoTimeout(timeout);
   }
 
@@ -532,4 +580,47 @@
   {
     // TODO to be implemented
   }
+
+  /**
+   * Get the maximum receive window size.
+   *
+   * @return The maximum receive window size.
+   */
+  public int getMaxRcvWindow()
+  {
+    return maxRcvWindow;
+  }
+
+  /**
+   * Get the current receive window size.
+   *
+   * @return The current receive window size.
+   */
+  public int getCurrentRcvWindow()
+  {
+    return rcvWindow;
+  }
+
+  /**
+   * Get the maximum send window size.
+   *
+   * @return The maximum send window size.
+   */
+  public int getMaxSendWindow()
+  {
+    return maxSendWindow;
+  }
+
+  /**
+   * Get the current send window size.
+   *
+   * @return The current send window size.
+   */
+  public int getCurrentSendWindow()
+  {
+    if (connected)
+      return sendWindow.availablePermits();
+    else
+      return 0;
+  }
 }

--
Gitblit v1.10.0