From 66f48672bc7953e77364a9a7ae41f1e70d83534f Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 18 Sep 2008 06:40:07 +0000
Subject: [PATCH] Fix for issue 3477 : OpenDS runs out of synchronization or crashes
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 43 +++++++++++++++++++++++++++++--------------
1 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a99ad1b..f85e133 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
- private int maxQueueSize = 10000;
+ private int maxQueueSize = 5000;
+ private int maxQueueBytesSize = maxQueueSize * 100;
private int restartReceiveQueue;
private int restartSendQueue;
private int restartReceiveDelay;
@@ -115,8 +116,8 @@
private Semaphore sendWindow;
private int sendWindowSize;
private boolean flowControl = false; // indicate that the server is
- // flow controled and should
- // be stopped from sending messsages.
+ // flow controlled and should
+ // be stopped from sending messages.
private int saturationCount = 0;
private short replicationServerId;
@@ -165,6 +166,7 @@
super("Server Handler");
this.session = session;
this.maxQueueSize = queueSize;
+ this.maxQueueBytesSize = queueSize * 100;
this.protocolVersion = ProtocolVersion.currentVersion();
}
@@ -305,7 +307,7 @@
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
- /* Until here session is encrypted then it depends on the negociation */
+ /* Until here session is encrypted then it depends on the negotiation */
if (!sslEncryption)
{
session.stopEncryption();
@@ -352,7 +354,7 @@
}
else
{
- // We are an empty Replicationserver
+ // We are an empty Replication Server
if ((generationId>0)&&(!serverState.isEmpty()))
{
// If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
{
synchronized (msgQueue)
{
- int size = msgQueue.size();
+ int size = msgQueue.count();
if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
return true;
@@ -764,7 +766,7 @@
{
synchronized (msgQueue)
{
- int queueSize = msgQueue.size();
+ int queueSize = msgQueue.count();
if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
return false;
if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
* the number of updates to be sent is the size of the receive queue.
*/
if (isFollowing())
- return msgQueue.size();
+ return msgQueue.count();
else
{
/*
@@ -1034,7 +1036,8 @@
/* TODO : size should be configurable
* and larger than max-receive-queue-size
*/
- while (msgQueue.size() > maxQueueSize)
+ while ((msgQueue.count() > maxQueueSize) ||
+ (msgQueue.bytesCount() > maxQueueBytesSize))
{
setFollowing(false);
msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
// The loop below relies on the fact that it is sorted based
// on the currentChange of each iterator to consider the next
- // change accross all servers.
+ // change across all servers.
// Hence it is necessary to remove and eventual add again an iterator
// when looping in order to keep consistent the order of the
// iterators (see ReplicationIteratorComparator.
- while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
+ while (!iteratorSortedSet.isEmpty() &&
+ (lateQueue.count()<100) &&
+ (lateQueue.bytesCount()<50000) )
{
ReplicationIterator iterator = iteratorSortedSet.first();
iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
{
synchronized (msgQueue)
{
- if (msgQueue.size() < maxQueueSize)
+ if ((msgQueue.count() < maxQueueSize) &&
+ (msgQueue.bytesCount() < maxQueueBytesSize))
{
setFollowing(true);
}
@@ -1203,7 +1209,7 @@
{
if (msgQueue.contains(msg))
{
- /* we finally catched up with the regular queue */
+ /* we finally catch up with the regular queue */
setFollowing(true);
lateQueue.clear();
UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
* Update the serverState with the last message sent.
*
* @param msg the last update sent.
- * @return boolean indicating if the update was meaningfull.
+ * @return boolean indicating if the update was meaningful.
*/
public boolean updateServerState(UpdateMessage msg)
{
@@ -1599,6 +1605,15 @@
}
}
+ attributes.add(
+ new Attribute("queue-size", String.valueOf(msgQueue.count())));
+ attributes.add(
+ new Attribute(
+ "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+ attributes.add(
+ new Attribute(
+ "following", String.valueOf(following)));
+
// Deprecated
attributes.add(new Attribute("max-waiting-changes",
String.valueOf(maxQueueSize)));
--
Gitblit v1.10.0