From 988ec833bcc9d5dcca7ea59611102a39f08b025c Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508 These changes implement a window mechanism in the sycnhronization protocol.
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java | 101 ++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 96 insertions(+), 5 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index c9a0605..2778e34 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -78,6 +79,12 @@
private int maxReceiveDelay;
private int maxSendQueue;
private int maxReceiveQueue;
+ private Semaphore sendWindow;
+ private int maxSendWindow;
+ private int rcvWindow;
+ private int halfRcvWindow;
+ private int maxRcvWindow;
+ private int timeout = 0;
/**
* Creates a new Changelog Broker for a particular SynchronizationDomain.
@@ -95,10 +102,11 @@
* @param maxSendQueue The maximum size of the send queue to use on
* the changelog server.
* @param maxSendDelay The maximum send delay to use on the changelog server.
+ * @param window The size of the send and receive window to use.
*/
public ChangelogBroker(ServerState state, DN baseDn, short serverID,
int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay )
+ int maxSendDelay, int window)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -109,6 +117,9 @@
this.state = state;
replayOperations =
new TreeSet<FakeOperation>(new FakeOperationComparator());
+ this.rcvWindow = window;
+ this.maxRcvWindow = window;
+ this.halfRcvWindow = window/2;
}
/**
@@ -165,6 +176,7 @@
InetSocketAddress ServerAddr = new InetSocketAddress(
InetAddress.getByName(hostname), Integer.parseInt(port));
Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
socket.connect(ServerAddr, 500);
session = new SocketSession(socket);
@@ -173,7 +185,7 @@
*/
ServerStartMessage msg = new ServerStartMessage( serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- state);
+ halfRcvWindow*2, state);
session.publish(msg);
@@ -182,7 +194,7 @@
*/
session.setSoTimeout(1000);
startMsg = (ChangelogStartMessage) session.receive();
- session.setSoTimeout(0);
+ session.setSoTimeout(timeout);
/*
* We must not publish changes to a changelog that has not
@@ -202,6 +214,8 @@
(ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
{
changelogServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ this.sendWindow = new Semaphore(maxSendWindow);
connected = true;
break;
}
@@ -254,6 +268,8 @@
else
{
changelogServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ this.sendWindow = new Semaphore(maxSendWindow);
connected = true;
for (FakeOperation replayOp : replayOperations)
{
@@ -306,6 +322,14 @@
* changes that this server has already processed, start again
* the loop looking for any changelog server.
*/
+ try
+ {
+ Thread.sleep(500);
+ } catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
checkState = false;
int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
String message = getMessage(msgID);
@@ -393,13 +417,18 @@
{
if (this.connected == false)
this.reStart(failingSession);
-
+ if (msg instanceof UpdateMessage)
+ sendWindow.acquire();
session.publish(msg);
done = true;
} catch (IOException e)
{
this.reStart(failingSession);
}
+ catch (InterruptedException e)
+ {
+ this.reStart(failingSession);
+ }
}
}
}
@@ -418,7 +447,25 @@
ProtocolSession failingSession = session;
try
{
- return session.receive();
+ SynchronizationMessage msg = session.receive();
+ if (msg instanceof WindowMessage)
+ {
+ WindowMessage windowMsg = (WindowMessage) msg;
+ sendWindow.release(windowMsg.getNumAck());
+ }
+ else
+ {
+ if (msg instanceof UpdateMessage)
+ {
+ rcvWindow--;
+ if (rcvWindow < halfRcvWindow)
+ {
+ session.publish(new WindowMessage(halfRcvWindow));
+ rcvWindow += halfRcvWindow;
+ }
+ }
+ return msg;
+ }
} catch (Exception e)
{
if (e instanceof SocketTimeoutException)
@@ -485,6 +532,7 @@
*/
public void setSoTimeout(int timeout) throws SocketException
{
+ this.timeout = timeout;
session.setSoTimeout(timeout);
}
@@ -532,4 +580,47 @@
{
// TODO to be implemented
}
+
+ /**
+ * Get the maximum receive window size.
+ *
+ * @return The maximum receive window size.
+ */
+ public int getMaxRcvWindow()
+ {
+ return maxRcvWindow;
+ }
+
+ /**
+ * Get the current receive window size.
+ *
+ * @return The current receive window size.
+ */
+ public int getCurrentRcvWindow()
+ {
+ return rcvWindow;
+ }
+
+ /**
+ * Get the maximum send window size.
+ *
+ * @return The maximum send window size.
+ */
+ public int getMaxSendWindow()
+ {
+ return maxSendWindow;
+ }
+
+ /**
+ * Get the current send window size.
+ *
+ * @return The current send window size.
+ */
+ public int getCurrentSendWindow()
+ {
+ if (connected)
+ return sendWindow.availablePermits();
+ else
+ return 0;
+ }
}
--
Gitblit v1.10.0