From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508 These changes implement a window mechanism in the sycnhronization protocol.
---
opends/src/server/org/opends/server/changelog/ServerHandler.java | 122 ++++++++++++++++++++++++++++++++--------
1 files changed, 98 insertions(+), 24 deletions(-)
diff --git a/opends/src/server/org/opends/server/changelog/ServerHandler.java b/opends/src/server/org/opends/server/changelog/ServerHandler.java
index d4cc9d6..6934954 100644
--- a/opends/src/server/org/opends/server/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/changelog/ServerHandler.java
@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
@@ -58,6 +59,7 @@
import org.opends.server.synchronization.ServerState;
import org.opends.server.synchronization.SynchronizationMessage;
import org.opends.server.synchronization.UpdateMessage;
+import org.opends.server.synchronization.WindowMessage;
import org.opends.server.util.TimeThread;
/**
@@ -71,7 +73,7 @@
private MsgQueue msgQueue = new MsgQueue();
private MsgQueue lateQueue = new MsgQueue();
private Map<ChangeNumber, AckMessageList> waitingAcks =
- new HashMap<ChangeNumber, AckMessageList>();;
+ new HashMap<ChangeNumber, AckMessageList>();
private ChangelogCache changelogCache = null;
private String serverURL;
private int outCount = 0; // number of update sent to the server
@@ -93,6 +95,12 @@
private ServerWriter writer = null;
private DN baseDn = null;
private String serverAddressURL;
+ private int rcvWindow;
+ private int rcvWindowSizeHalf;
+ private int maxRcvWindow;
+ private ServerReader reader;
+ private Semaphore sendWindow;
+ private int sendWindowSize;
private static Map<ChangeNumber, ChangelogAckMessageList>
changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -116,20 +124,29 @@
*
* @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
* null if this is an incoming connection.
+ * @param changelogId The identifier of the changelog that creates this
+ * server handler.
+ * @param changelogURL The URL of the changelog that creates this
+ * server handler.
+ * @param windowSize the window size that this server handler must use.
+ * @param changelog the Changelog that created this server handler.
*/
- public void start(DN baseDn)
+ public void start(DN baseDn, short changelogId, String changelogURL,
+ int windowSize, Changelog changelog)
{
+ rcvWindowSizeHalf = windowSize/2;
+ maxRcvWindow = windowSize;
+ rcvWindow = windowSize;
try
{
if (baseDn != null)
{
this.baseDn = baseDn;
- changelogCache = Changelog.getChangelogCache(baseDn);
+ changelogCache = changelog.getChangelogCache(baseDn);
ServerState localServerState = changelogCache.getDbServerState();
ChangelogStartMessage msg =
- new ChangelogStartMessage(Changelog.getServerId(),
- Changelog.getServerURL(),
- baseDn, localServerState);
+ new ChangelogStartMessage(changelogId, changelogURL,
+ baseDn, windowSize, localServerState);
session.publish(msg);
}
@@ -175,16 +192,15 @@
restartSendDelay = 0;
serverIsLDAPserver = true;
- changelogCache = Changelog.getChangelogCache(this.baseDn);
+ changelogCache = changelog.getChangelogCache(this.baseDn);
ServerState localServerState = changelogCache.getDbServerState();
ChangelogStartMessage myStartMsg =
- new ChangelogStartMessage(Changelog.getServerId(),
- Changelog.getServerURL(),
- this.baseDn, localServerState);
+ new ChangelogStartMessage(changelogId, changelogURL,
+ this.baseDn, windowSize, localServerState);
session.publish(myStartMsg);
+ sendWindowSize = receivedMsg.getWindowSize();
}
- else if (msg.getClass() == Class.forName(
- "org.opends.server.synchronization.ChangelogStartMessage"))
+ else if (msg instanceof ChangelogStartMessage)
{
ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
serverId = receivedMsg.getServerId();
@@ -195,17 +211,17 @@
this.baseDn = receivedMsg.getBaseDn();
if (baseDn == null)
{
- changelogCache = Changelog.getChangelogCache(this.baseDn);
+ changelogCache = changelog.getChangelogCache(this.baseDn);
ServerState serverState = changelogCache.getDbServerState();
ChangelogStartMessage outMsg =
- new ChangelogStartMessage(Changelog.getServerId(),
- Changelog.getServerURL(),
- this.baseDn, serverState);
+ new ChangelogStartMessage(changelogId, changelogURL,
+ this.baseDn, windowSize, serverState);
session.publish(outMsg);
}
else
this.baseDn = baseDn;
this.serverState = receivedMsg.getServerState();
+ sendWindowSize = receivedMsg.getWindowSize();
}
else
{
@@ -213,7 +229,7 @@
return; // we did not recognize the message, ignore it
}
- changelogCache = Changelog.getChangelogCache(this.baseDn);
+ changelogCache = changelog.getChangelogCache(this.baseDn);
if (serverIsLDAPserver)
{
@@ -226,7 +242,7 @@
writer = new ServerWriter(session, serverId, this, changelogCache);
- ServerReader reader = new ServerReader(session, serverId, this,
+ reader = new ServerReader(session, serverId, this,
changelogCache);
reader.start();
@@ -251,7 +267,7 @@
// ignore
}
}
-
+ sendWindow = new Semaphore(sendWindowSize);
}
/**
@@ -576,6 +592,30 @@
*/
public UpdateMessage take()
{
+ boolean interrupted = true;
+ UpdateMessage msg = getnextMessage();
+ do {
+ try
+ {
+ sendWindow.acquire();
+ interrupted = false;
+ } catch (InterruptedException e)
+ {
+ // loop until not interrupted
+ }
+ } while (interrupted);
+ this.incrementOutCount();
+ return msg;
+ }
+
+ /**
+ * Get the next update that must be sent to the server
+ * from the message queue or from the database.
+ *
+ * @return The next update that must be sent to the server.
+ */
+ private UpdateMessage getnextMessage()
+ {
UpdateMessage msg;
do
{
@@ -668,7 +708,6 @@
msg1 = msgQueue.removeFirst();
} while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
this.updateServerState(msg);
- this.incrementOutCount();
return msg;
}
}
@@ -679,7 +718,6 @@
/* get the next change from the lateQueue */
msg = lateQueue.removeFirst();
this.updateServerState(msg);
- this.incrementOutCount();
return msg;
}
}
@@ -707,7 +745,6 @@
* by the other server.
* Otherwise just loop to select the next message.
*/
- this.incrementOutCount();
return msg;
}
}
@@ -927,7 +964,7 @@
@Override
public String getMonitorInstanceName()
{
- String str = changelogCache.getBaseDn().toString() +
+ String str = baseDn.toString() +
" " + serverURL + " " + String.valueOf(serverId);
if (serverIsLDAPserver)
@@ -985,7 +1022,7 @@
attributes.add(new Attribute("server-id",
String.valueOf(serverId)));
attributes.add(new Attribute("base-dn",
- changelogCache.getBaseDn().toString()));
+ baseDn.toString()));
attributes.add(new Attribute("waiting-changes",
String.valueOf(getRcvMsgQueueSize())));
attributes.add(new Attribute("update-waiting-acks",
@@ -999,6 +1036,14 @@
String.valueOf(getInAckCount())));
attributes.add(new Attribute("approximate-delay",
String.valueOf(getApproxDelay())));
+ attributes.add(new Attribute("max-send-window",
+ String.valueOf(sendWindowSize)));
+ attributes.add(new Attribute("current-send-window",
+ String.valueOf(sendWindow.availablePermits())));
+ attributes.add(new Attribute("max-rcv-window",
+ String.valueOf(maxRcvWindow)));
+ attributes.add(new Attribute("current-rcv-window",
+ String.valueOf(rcvWindow)));
long olderUpdateTime = getOlderUpdateTime();
if (olderUpdateTime != 0)
{
@@ -1058,4 +1103,33 @@
return localString;
}
+
+ /**
+ * Check the protocol window and send WindowMessage if necessary.
+ *
+ * @throws IOException when the session becomes unavailable.
+ */
+ public synchronized void checkWindow() throws IOException
+ {
+ rcvWindow--;
+ if (rcvWindow < rcvWindowSizeHalf)
+ {
+ WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+ session.publish(msg);
+ outAckCount++;
+ rcvWindow += rcvWindowSizeHalf;
+ }
+ }
+
+ /**
+ * Update the send window size based on the credit specified in the
+ * given window message.
+ *
+ * @param windowMsg The Window Message containing the information
+ * necessary for updating the window size.
+ */
+ public void updateWindow(WindowMessage windowMsg)
+ {
+ sendWindow.release(windowMsg.getNumAck());
+ }
}
--
Gitblit v1.10.0