From 66f48672bc7953e77364a9a7ae41f1e70d83534f Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 18 Sep 2008 06:40:07 +0000
Subject: [PATCH] Fix for issue 3477 : OpenDS runs out of synchronization or crashes
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 15 +++
opends/src/server/org/opends/server/replication/server/MsgQueue.java | 18 ++++
opends/src/server/org/opends/server/replication/protocol/AddMsg.java | 9 ++
opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java | 7 +
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 43 +++++++---
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java | 11 ++
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java | 12 +++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java | 6 +
opends/src/server/org/opends/server/replication/server/DbHandler.java | 57 ++++++++++----
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 4
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java | 11 ++
11 files changed, 156 insertions(+), 37 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index c6044c9..db5dab6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -297,4 +297,13 @@
{
return parentUniqueId;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int size()
+ {
+ return encodedAttributes.length + 40;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
index 28deccd..ef24139 100644
--- a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -122,4 +122,15 @@
{
return ("DEL " + getDn() + " " + getChangeNumber());
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int size()
+ {
+ // The DeleteMsg size is mostly dependent on the DN and should never
+ // grow very large. It is therefore safe to assume an average of 40 bytes.
+ return 40;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index aede5ee..9dac7f6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -360,4 +360,15 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int size()
+ {
+ // The MODDN message size are mainly dependent on the
+ // size of the DN. let's assume that they average on 100 bytes
+ return 100;
+ }
+
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 0b0abda..d5397b8 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -235,4 +235,16 @@
{
return("MOD " + getDn() + " " + getChangeNumber());
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int size()
+ {
+ // The ModifyMsh can be very large when added or deleted attribute
+ // values are very large. We therefore need to count the
+ // whole encoded msg.
+ return encodedMsg.length;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
index cdc2a50..224190e 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -362,4 +362,11 @@
}
}
+
+ /**
+ * Return the number of bytes used by this message.
+ *
+ * @return The number of bytes used by this message.
+ */
+ public abstract int size();
}
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 14ec358..d27daca 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@
//
private final LinkedList<UpdateMessage> msgQueue =
new LinkedList<UpdateMessage>();
+
+ // The High and low water mark for the max size of the msgQueue.
+ // the threads calling add() method will be blocked if the size of
+ // msgQueue becomes larger than the queueHimark and will resume
+ // only when the size of the msgQueue goes below queueLowmark.
+ int queueHimark = 5000;
+ int queueLowmark = 4000;
+
+ // The queue himark and lowmark in bytes, this is set to 100 times the
+ // himark and lowmark in number of updates.
+ int queueHimarkBytes = 100 * queueHimark;
+ int queueLowmarkBytes = 100 * queueLowmark;
+
+ // The number of bytes currently in the queue
+ int queueByteSize = 0;
+
private ReplicationDB db;
private ChangeNumber firstChange = null;
private ChangeNumber lastChange = null;
@@ -93,14 +109,6 @@
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
-
- // The High and low water mark for the max size of the msgQueue.
- // the threads calling add() method will be blocked if the size of
- // msgQueue becomes larger than the MSG_QUEUE_HIMARK and will resume
- // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
- final static int MSG_QUEUE_HIMARK = 5000;
- final static int MSG_QUEUE_LOWMARK = 4000;
-
// The maximum number of retries in case of DatabaseDeadlock Exception.
private static final int DEADLOCK_RETRIES = 10;
@@ -120,16 +128,22 @@
* @param replicationServer The ReplicationServer that creates this dbHandler.
* @param dbenv the Database Env to use to create the ReplicationServer DB.
* server for this domain.
+ * @param queueSize The queueSize to use when creating the dbHandler.
* @throws DatabaseException If a database problem happened
*/
- public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
- ReplicationDbEnv dbenv)
+ public DbHandler(
+ short id, DN baseDn, ReplicationServer replicationServer,
+ ReplicationDbEnv dbenv, int queueSize)
throws DatabaseException
{
this.replicationServer = replicationServer;
- this.serverId = id;
+ serverId = id;
this.baseDn = baseDn;
- this.trimage = replicationServer.getTrimage();
+ trimage = replicationServer.getTrimage();
+ queueHimark = queueSize;
+ queueLowmark = queueSize * 4 / 5;
+ queueHimarkBytes = 100 * queueHimark;
+ queueLowmarkBytes = 100 * queueLowmark;
db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
firstChange = db.readFirstChange();
lastChange = db.readLastChange();
@@ -156,7 +170,7 @@
synchronized (msgQueue)
{
int size = msgQueue.size();
- while (size > MSG_QUEUE_HIMARK)
+ while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
{
try
{
@@ -168,6 +182,7 @@
size = msgQueue.size();
}
+ queueByteSize += update.size();
msgQueue.add(update);
if (lastChange == null || lastChange.older(update.getChangeNumber()))
{
@@ -308,10 +323,12 @@
int current = 0;
while ((current < number) && (!msgQueue.isEmpty()))
{
- msgQueue.remove();
+ UpdateMessage msg = msgQueue.remove();
+ queueByteSize -= msg.size();
current++;
}
- if (msgQueue.size() < MSG_QUEUE_LOWMARK)
+ if ((msgQueue.size() < queueLowmark) &&
+ (queueByteSize < queueLowmarkBytes))
msgQueue.notify();
}
}
@@ -475,13 +492,14 @@
private void flush()
{
int size;
+ int chunksize = (500 < queueHimark ? 500 : queueHimark);
do
{
synchronized(flushLock)
{
// get N messages to save in the DB
- List<UpdateMessage> changes = getChanges(500);
+ List<UpdateMessage> changes = getChanges(chunksize);
// if no more changes to save exit immediately.
if ((changes == null) || ((size = changes.size()) == 0))
@@ -493,7 +511,7 @@
// remove the changes from the list of changes to be saved.
clearQueue(changes.size());
}
- } while (size >=500);
+ } while (size >= chunksize);
}
/**
@@ -529,6 +547,10 @@
attributes.add(new Attribute("last-change",
lastChange.toString() + " " + lastTime.toString()));
}
+ attributes.add(
+ new Attribute("queue-size", String.valueOf(msgQueue.size())));
+ attributes.add(
+ new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
return attributes;
}
@@ -604,6 +626,7 @@
synchronized(flushLock)
{
msgQueue.clear();
+ queueByteSize = 0;
}
db.clear();
firstChange = db.readFirstChange();
diff --git a/opends/src/server/org/opends/server/replication/server/MsgQueue.java b/opends/src/server/org/opends/server/replication/server/MsgQueue.java
index d8b7295..3484633 100644
--- a/opends/src/server/org/opends/server/replication/server/MsgQueue.java
+++ b/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -42,6 +42,9 @@
private SortedMap<ChangeNumber, UpdateMessage> map =
new TreeMap<ChangeNumber, UpdateMessage>();
+ // The total number of bytes for all the message in the queue.
+ private int bytesCount = 0;
+
/**
* Return the first UpdateMessage in the MsgQueue.
*
@@ -67,12 +70,22 @@
*
* @return The number of elements in this MsgQueue.
*/
- public int size()
+ public int count()
{
return map.size();
}
/**
+ * Returns the number of bytes in this MsgQueue.
+ *
+ * @return The number of bytes in this MsgQueue.
+ */
+ public int bytesCount()
+ {
+ return bytesCount;
+ }
+
+ /**
* Returns <tt>true</tt> if this MsgQueue contains no UpdateMessage.
*
* @return <tt>true</tt> if this MsgQueue contains no UpdateMessage.
@@ -91,6 +104,7 @@
public void add(UpdateMessage update)
{
map.put(update.getChangeNumber(), update);
+ bytesCount += update.size();
}
/**
@@ -102,6 +116,7 @@
{
UpdateMessage msg = map.get(map.firstKey());
map.remove(msg.getChangeNumber());
+ bytesCount -= msg.size();
return msg;
}
@@ -126,5 +141,6 @@
public void clear()
{
map.clear();
+ bytesCount = 0;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index 48603d5..cca4d50 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -257,7 +257,9 @@
+ " serverId=" + serverId);
DbHandler dbHandler =
- new DbHandler(serverId, baseDn, replicationServer, this);
+ new DbHandler(
+ serverId, baseDn, replicationServer, this,
+ replicationServer.getQueueSize());
replicationServer.getReplicationServerDomain(baseDn, true).
setDbHandler(serverId, dbHandler);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 393a531..5fe6a1e 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -134,7 +134,7 @@
// ID of the backend
private static final String backendId = "replicationChanges";
- // At startup, the listen thread wait on this flag for the connet
+ // At startup, the listen thread wait on this flag for the connect
// thread to look for other servers in the topology.
private boolean connectedInTopology = false;
private final Object connectedInTopologyLock = new Object();
@@ -554,7 +554,7 @@
public DbHandler newDbHandler(short id, DN baseDn)
throws DatabaseException
{
- return new DbHandler(id, baseDn, this, dbEnv);
+ return new DbHandler(id, baseDn, this, dbEnv, queueSize);
}
/**
@@ -827,6 +827,17 @@
}
/**
+ * Get the queueSize for this replication server.
+ *
+ * @return The maximum size of the queues for this Replication Server
+ *
+ */
+ public int getQueueSize()
+ {
+ return queueSize;
+ }
+
+ /**
* Creates the backend associated to this replication server.
* @throws ConfigException
*/
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a99ad1b..f85e133 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -96,7 +96,8 @@
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
- private int maxQueueSize = 10000;
+ private int maxQueueSize = 5000;
+ private int maxQueueBytesSize = maxQueueSize * 100;
private int restartReceiveQueue;
private int restartSendQueue;
private int restartReceiveDelay;
@@ -115,8 +116,8 @@
private Semaphore sendWindow;
private int sendWindowSize;
private boolean flowControl = false; // indicate that the server is
- // flow controled and should
- // be stopped from sending messsages.
+ // flow controlled and should
+ // be stopped from sending messages.
private int saturationCount = 0;
private short replicationServerId;
@@ -165,6 +166,7 @@
super("Server Handler");
this.session = session;
this.maxQueueSize = queueSize;
+ this.maxQueueBytesSize = queueSize * 100;
this.protocolVersion = ProtocolVersion.currentVersion();
}
@@ -305,7 +307,7 @@
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
- /* Until here session is encrypted then it depends on the negociation */
+ /* Until here session is encrypted then it depends on the negotiation */
if (!sslEncryption)
{
session.stopEncryption();
@@ -352,7 +354,7 @@
}
else
{
- // We are an empty Replicationserver
+ // We are an empty Replication Server
if ((generationId>0)&&(!serverState.isEmpty()))
{
// If the LDAP server has already sent changes
@@ -722,7 +724,7 @@
{
synchronized (msgQueue)
{
- int size = msgQueue.size();
+ int size = msgQueue.count();
if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
return true;
@@ -764,7 +766,7 @@
{
synchronized (msgQueue)
{
- int queueSize = msgQueue.size();
+ int queueSize = msgQueue.count();
if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
return false;
if ((source != null) && (source.maxSendQueue > 0) &&
@@ -815,7 +817,7 @@
* the number of updates to be sent is the size of the receive queue.
*/
if (isFollowing())
- return msgQueue.size();
+ return msgQueue.count();
else
{
/*
@@ -1034,7 +1036,8 @@
/* TODO : size should be configurable
* and larger than max-receive-queue-size
*/
- while (msgQueue.size() > maxQueueSize)
+ while ((msgQueue.count() > maxQueueSize) ||
+ (msgQueue.bytesCount() > maxQueueBytesSize))
{
setFollowing(false);
msgQueue.removeFirst();
@@ -1164,11 +1167,13 @@
// The loop below relies on the fact that it is sorted based
// on the currentChange of each iterator to consider the next
- // change accross all servers.
+ // change across all servers.
// Hence it is necessary to remove and eventual add again an iterator
// when looping in order to keep consistent the order of the
// iterators (see ReplicationIteratorComparator.
- while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
+ while (!iteratorSortedSet.isEmpty() &&
+ (lateQueue.count()<100) &&
+ (lateQueue.bytesCount()<50000) )
{
ReplicationIterator iterator = iteratorSortedSet.first();
iteratorSortedSet.remove(iterator);
@@ -1190,7 +1195,8 @@
{
synchronized (msgQueue)
{
- if (msgQueue.size() < maxQueueSize)
+ if ((msgQueue.count() < maxQueueSize) &&
+ (msgQueue.bytesCount() < maxQueueBytesSize))
{
setFollowing(true);
}
@@ -1203,7 +1209,7 @@
{
if (msgQueue.contains(msg))
{
- /* we finally catched up with the regular queue */
+ /* we finally catch up with the regular queue */
setFollowing(true);
lateQueue.clear();
UpdateMessage msg1;
@@ -1266,7 +1272,7 @@
* Update the serverState with the last message sent.
*
* @param msg the last update sent.
- * @return boolean indicating if the update was meaningfull.
+ * @return boolean indicating if the update was meaningful.
*/
public boolean updateServerState(UpdateMessage msg)
{
@@ -1599,6 +1605,15 @@
}
}
+ attributes.add(
+ new Attribute("queue-size", String.valueOf(msgQueue.count())));
+ attributes.add(
+ new Attribute(
+ "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+ attributes.add(
+ new Attribute(
+ "following", String.valueOf(following)));
+
// Deprecated
attributes.add(new Attribute("max-waiting-changes",
String.valueOf(maxQueueSize)));
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index f2a1021..05ceca0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -73,7 +73,8 @@
ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
DbHandler handler =
- new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
+ new DbHandler(
+ (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +154,8 @@
ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
DbHandler handler =
- new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
+ new DbHandler(
+ (short) 1, DN.decode("o=test"), replicationServer, dbEnv, 5000);
// Creates changes added to the dbHandler
ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
--
Gitblit v1.10.0