From bcf686add35bda4a6ac5c3d085abe151ea018e8e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 14 Jan 2009 08:29:50 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/replication/protocol/AckMsg.java | 6
opends/src/server/org/opends/server/replication/protocol/AddMsg.java | 55 +++--
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java | 21 ++
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java | 41 +++
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 6
opends/src/server/org/opends/server/replication/server/DbHandler.java | 128 ++++++++------
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java | 101 ++++++-----
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 55 ++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 4
opends/src/server/org/opends/server/replication/common/ChangeNumber.java | 32 +++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 2
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java | 11 +
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 22 +
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 6
14 files changed, 328 insertions(+), 162 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 8c8ef70..1a8a379 100644
--- a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -35,9 +35,13 @@
java.lang.Comparable<ChangeNumber>
{
private static final long serialVersionUID = -8802722277749190740L;
- private long timeStamp;
- private int seqnum;
- private short serverId;
+ private final long timeStamp;
+ private final int seqnum;
+ private final short serverId;
+
+ // A String representation of the ChangeNumber suitable for network
+ // transmission.
+ private String formatedString = null;;
/**
* Create a new ChangeNumber from a String.
@@ -54,6 +58,8 @@
temp = str.substring(20, 28);
seqnum = Integer.parseInt(temp, 16);
+
+ formatedString = str;
}
/**
@@ -141,11 +147,27 @@
*/
public String toString()
{
+ return format();
+ }
+
+ /**
+ * Convert the ChangeNumber to a String that is suitable for network
+ * transmission.
+ *
+ * @return the string
+ */
+ public String format()
+ {
+ if (formatedString != null)
+ return formatedString;
+
return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
}
/**
- * Convert the ChangeNumber to a printable String that is .
+ * Convert the ChangeNumber to a printable String with a user friendly
+ * format.
+ *
* @return the string
*/
public String toStringUI()
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index c6629eb..a0e0d85 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -49,6 +49,7 @@
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -869,7 +870,18 @@
}
else
{
- pendingChanges.commit(curChangeNumber, msg);
+ // If assured replication is configured, this will prepare blocking
+ // mechanism. If assured replication is disabled, this returns
+ // immediately
+ prepareWaitForAckIfAssuredEnabled(msg);
+ try
+ {
+ msg.encode();
+ } catch (UnsupportedEncodingException e)
+ {
+ // will be caught at publish time.
+ }
+ pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg);
}
}
catch (NoSuchElementException e)
@@ -892,18 +904,12 @@
if (curChangeNumber != null)
{
pendingChanges.remove(curChangeNumber);
+ pendingChanges.pushCommittedChanges();
}
}
if (!op.isSynchronizationOperation())
{
- // If assured replication is configured, this will prepare blocking
- // mechanism. If assured replication is disabled, this returns
- // immediately
- prepareWaitForAckIfAssuredEnabled(msg);
-
- pendingChanges.pushCommittedChanges();
-
// If assured replication is enabled, this will wait for the matching
// ack or time out. If assured replication is disabled, this returns
// immediately
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index b9b512f..ad1207d 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
@@ -113,6 +113,18 @@
public synchronized void commit(ChangeNumber changeNumber,
LDAPUpdateMsg msg)
{
+ _commit(changeNumber, msg);
+ }
+ /**
+ * Mark an update message as committed.
+ *
+ * @param changeNumber The ChangeNumber of the update message that must be
+ * set as committed.
+ * @param msg The message associated to the update.
+ */
+ public void _commit(ChangeNumber changeNumber,
+ LDAPUpdateMsg msg)
+ {
PendingChange curChange = pendingChanges.get(changeNumber);
if (curChange == null)
{
@@ -149,6 +161,18 @@
*/
public synchronized ChangeNumber putLocalOperation(PluginOperation operation)
{
+ return _putLocalOperation(operation);
+ }
+ /**
+ * Add a new UpdateMsg to the pending list from the provided local
+ * operation.
+ *
+ * @param operation The local operation for which an UpdateMsg must
+ * be added in the pending list.
+ * @return The ChangeNumber now associated to the operation.
+ */
+ public ChangeNumber _putLocalOperation(PluginOperation operation)
+ {
ChangeNumber changeNumber;
changeNumber = changeNumberGenerator.newChangeNumber();
@@ -165,6 +189,15 @@
*/
public synchronized int pushCommittedChanges()
{
+ return _pushCommittedChanges();
+ }
+ /**
+ * Push all committed local changes to the replicationServer service.
+ *
+ * @return The number of pushed updates.
+ */
+ public int _pushCommittedChanges()
+ {
int numSentUpdates = 0;
if (pendingChanges.isEmpty())
return numSentUpdates;
@@ -195,4 +228,24 @@
}
return numSentUpdates;
}
+
+ /**
+ * Mark an update message as committed, then
+ * push all committed local changes to the replicationServer service
+ * in a single atomic operation.
+ *
+ *
+ * @param changeNumber The ChangeNumber of the update message that must be
+ * set as committed.
+ * @param msg The message associated to the update.
+ *
+ * @return The number of pushed updates.
+ */
+ public synchronized int commitAndPushCommittedChanges(
+ ChangeNumber changeNumber,
+ LDAPUpdateMsg msg)
+ {
+ _commit(changeNumber, msg);
+ return _pushCommittedChanges();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index 3b3f53e..a0e8399 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -241,13 +241,13 @@
* error><failed server ids>
*/
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+ ByteArrayOutputStream oStream = new ByteArrayOutputStream(200);
/* Put the type of the operation */
oStream.write(MSG_TYPE_ACK);
/* Put the ChangeNumber */
- byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
+ byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8");
oStream.write(changeNumberByte);
oStream.write(0);
diff --git a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index 972a40c..9cb3e89 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -258,34 +258,41 @@
@Override
public byte[] getBytes() throws UnsupportedEncodingException
{
- int length = encodedAttributes.length;
- byte[] byteParentId = null;
- if (parentUniqueId != null)
+ if (bytes == null)
{
- byteParentId = parentUniqueId.getBytes("UTF-8");
- length += byteParentId.length + 1;
+ int length = encodedAttributes.length;
+ byte[] byteParentId = null;
+ if (parentUniqueId != null)
+ {
+ byteParentId = parentUniqueId.getBytes("UTF-8");
+ length += byteParentId.length + 1;
+ }
+ else
+ {
+ length += 1;
+ }
+
+ /* encode the header in a byte[] large enough to also contain the mods */
+ byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length);
+
+ int pos = resultByteArray.length - length;
+
+ if (byteParentId != null)
+ pos = addByteArray(byteParentId, resultByteArray, pos);
+ else
+ resultByteArray[pos++] = 0;
+
+ /* put the attributes */
+ for (int i=0; i<encodedAttributes.length; i++,pos++)
+ {
+ resultByteArray[pos] = encodedAttributes[i];
+ }
+ return resultByteArray;
}
else
{
- length += 1;
+ return bytes;
}
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length);
-
- int pos = resultByteArray.length - length;
-
- if (byteParentId != null)
- pos = addByteArray(byteParentId, resultByteArray, pos);
- else
- resultByteArray[pos++] = 0;
-
- /* put the attributes */
- for (int i=0; i<encodedAttributes.length; i++,pos++)
- {
- resultByteArray[pos] = encodedAttributes[i];
- }
- return resultByteArray;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
index df136f2..e365b07 100644
--- a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -106,7 +106,14 @@
@Override
public byte[] getBytes() throws UnsupportedEncodingException
{
- return encodeHeader(MSG_TYPE_DELETE, 0);
+ if (bytes == null)
+ {
+ return encodeHeader(MSG_TYPE_DELETE, 0);
+ }
+ else
+ {
+ return bytes;
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index c761af5..66624aa 100644
--- a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -58,6 +58,11 @@
protected String uniqueId;
/**
+ * Encoded form of the LDAPUpdateMsg.
+ */
+ protected byte[] bytes = null;
+
+ /**
* Creates a new UpdateMsg.
*/
public LDAPUpdateMsg()
@@ -158,6 +163,20 @@
return uniqueId;
}
+ /**
+ * Do all the work necessary for the encoding.
+ *
+ * This is useful in case when one wants to perform this outside
+ * of a synchronized portion of code.
+ *
+ * This method is not synchronized and therefore not MT safe.
+ *
+ * @throws UnsupportedEncodingException when encoding fails.
+ */
+ public void encode() throws UnsupportedEncodingException
+ {
+ bytes = getBytes();
+ }
/**
* Create and Operation from the message.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index c146b2a..e4f3b72 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -524,56 +524,63 @@
*/
public byte[] getBytes_V1() throws UnsupportedEncodingException
{
- byte[] byteNewRdn = newRDN.getBytes("UTF-8");
- byte[] byteNewSuperior = null;
- byte[] byteNewSuperiorId = null;
-
- // calculate the length necessary to encode the parameters
- int length = byteNewRdn.length + 1 + 1;
- if (newSuperior != null)
+ if (bytes == null)
{
- byteNewSuperior = newSuperior.getBytes("UTF-8");
- length += byteNewSuperior.length + 1;
+ byte[] byteNewRdn = newRDN.getBytes("UTF-8");
+ byte[] byteNewSuperior = null;
+ byte[] byteNewSuperiorId = null;
+
+ // calculate the length necessary to encode the parameters
+ int length = byteNewRdn.length + 1 + 1;
+ if (newSuperior != null)
+ {
+ byteNewSuperior = newSuperior.getBytes("UTF-8");
+ length += byteNewSuperior.length + 1;
+ }
+ else
+ length += 1;
+
+ if (newSuperiorId != null)
+ {
+ byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
+ length += byteNewSuperiorId.length + 1;
+ }
+ else
+ length += 1;
+
+ byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length);
+ int pos = resultByteArray.length - length;
+
+ /* put the new RDN and a terminating 0 */
+ pos = addByteArray(byteNewRdn, resultByteArray, pos);
+
+ /* put the newsuperior and a terminating 0 */
+ if (newSuperior != null)
+ {
+ pos = addByteArray(byteNewSuperior, resultByteArray, pos);
+ }
+ else
+ resultByteArray[pos++] = 0;
+
+ /* put the newsuperiorId and a terminating 0 */
+ if (newSuperiorId != null)
+ {
+ pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
+ }
+ else
+ resultByteArray[pos++] = 0;
+
+ /* put the deleteoldrdn flag */
+ if (deleteOldRdn)
+ resultByteArray[pos++] = 1;
+ else
+ resultByteArray[pos++] = 0;
+
+ return resultByteArray;
}
else
- length += 1;
-
- if (newSuperiorId != null)
{
- byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
- length += byteNewSuperiorId.length + 1;
+ return bytes;
}
- else
- length += 1;
-
- byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length);
- int pos = resultByteArray.length - length;
-
- /* put the new RDN and a terminating 0 */
- pos = addByteArray(byteNewRdn, resultByteArray, pos);
-
- /* put the newsuperior and a terminating 0 */
- if (newSuperior != null)
- {
- pos = addByteArray(byteNewSuperior, resultByteArray, pos);
- }
- else
- resultByteArray[pos++] = 0;
-
- /* put the newsuperiorId and a terminating 0 */
- if (newSuperiorId != null)
- {
- pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
- }
- else
- resultByteArray[pos++] = 0;
-
- /* put the deleteoldrdn flag */
- if (deleteOldRdn)
- resultByteArray[pos++] = 1;
- else
- resultByteArray[pos++] = 0;
-
- return resultByteArray;
}
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 9f6676f..8986f15 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -92,6 +92,8 @@
public ModifyMsg(byte[] in) throws DataFormatException,
UnsupportedEncodingException
{
+ bytes = in;
+
// Decode header
byte[] allowedPduTypes = new byte[2];
allowedPduTypes[0] = MSG_TYPE_MODIFY;
@@ -117,19 +119,44 @@
}
/**
+ * Creates a new Modify message from a V1 byte[].
+ *
+ * @param in The byte[] from which the operation must be read.
+ * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
+ * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
+ *
+ * @return The created ModifyMsg.
+ */
+ public static ModifyMsg createV1(byte[] in) throws DataFormatException,
+ UnsupportedEncodingException
+ {
+ ModifyMsg msg = new ModifyMsg(in);
+ msg.bytes = null;
+
+ return msg;
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
public byte[] getBytes() throws UnsupportedEncodingException
{
- /* encode the header in a byte[] large enough to also contain the mods */
- byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1);
+ if (bytes == null)
+ {
+ /* encode the header in a byte[] large enough to also contain the mods */
+ byte[] mybytes = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1);
- /* add the mods */
- int pos = encodedMsg.length - (encodedMods.length + 1);
- addByteArray(encodedMods, encodedMsg, pos);
+ /* add the mods */
+ int pos = mybytes.length - (encodedMods.length + 1);
+ addByteArray(encodedMods, mybytes, pos);
- return encodedMsg;
+ return mybytes;
+ }
+ else
+ {
+ return bytes;
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index b65af58..fdce792 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -143,8 +143,10 @@
throw new NotSupportedOldVersionPDUException("Replication Server Info",
ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
case MSG_TYPE_MODIFY:
+ msg = new ModifyMsg(buffer);
+ break;
case MSG_TYPE_MODIFY_V1:
- msg = new ModifyMsg(buffer);
+ msg = ModifyMsg.createV1(buffer);
break;
case MSG_TYPE_ADD:
case MSG_TYPE_ADD_V1:
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 097638d..1f21de3 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -86,13 +86,15 @@
// the threads calling add() method will be blocked if the size of
// msgQueue becomes larger than the queueHimark and will resume
// only when the size of the msgQueue goes below queueLowmark.
- int queueHimark = 5000;
- int queueLowmark = 4000;
+ int queueMaxSize = 5000;
+ int queueLowmark = 1000;
+ int queueHimark = 4000;
// The queue himark and lowmark in bytes, this is set to 100 times the
// himark and lowmark in number of updates.
- int queueHimarkBytes = 100 * queueHimark;
+ int queueMaxBytes = 100 * queueMaxSize;
int queueLowmarkBytes = 100 * queueLowmark;
+ int queueHimarkBytes = 100 * queueHimark;
// The number of bytes currently in the queue
int queueByteSize = 0;
@@ -140,10 +142,12 @@
serverId = id;
this.baseDn = baseDn;
trimage = replicationServer.getTrimage();
- queueHimark = queueSize;
- queueLowmark = queueSize * 4 / 5;
- queueHimarkBytes = 100 * queueHimark;
- queueLowmarkBytes = 100 * queueLowmark;
+ queueMaxSize = queueSize;
+ queueLowmark = queueSize * 1 / 5;
+ queueHimark = queueSize * 4 / 5;
+ queueMaxBytes = 200 * queueMaxSize;
+ queueLowmarkBytes = 200 * queueLowmark;
+ queueHimarkBytes = 200 * queueLowmark;
db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
firstChange = db.readFirstChange();
lastChange = db.readLastChange();
@@ -171,11 +175,14 @@
synchronized (msgQueue)
{
int size = msgQueue.size();
- while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+ if ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+ msgQueue.notify();
+
+ while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes))
{
try
{
- msgQueue.wait(500);
+ msgQueue.wait(5000);
} catch (InterruptedException e)
{
// simply loop to try again.
@@ -379,17 +386,22 @@
{
while (shutdown == false)
{
- try {
+ try
+ {
flush();
trim();
- synchronized (this)
+ synchronized (msgQueue)
{
- try
+ if ((msgQueue.size() < queueLowmark) &&
+ (queueByteSize < queueLowmarkBytes))
{
- this.wait(1000);
- } catch (InterruptedException e)
- { }
+ try
+ {
+ msgQueue.wait(10000);
+ } catch (InterruptedException e)
+ { }
+ }
}
} catch (Exception end)
{
@@ -434,56 +446,59 @@
int tries = 0;
while ((tries++ < DEADLOCK_RETRIES) && (!done))
{
- /* the trim is done by group in order to save some CPU and IO bandwidth
- * start the transaction then do a bunch of remove then commit
- */
- ReplServerDBCursor cursor;
- cursor = db.openDeleteCursor();
-
- try
+ synchronized (flushLock)
{
- while ((size < 5000 ) && (!finished))
+ /* the trim is done by group in order to save some CPU and IO bandwidth
+ * start the transaction then do a bunch of remove then commit
+ */
+ ReplServerDBCursor cursor;
+ cursor = db.openDeleteCursor();
+
+ try
{
- ChangeNumber changeNumber = cursor.nextChangeNumber();
- if (changeNumber != null)
+ while ((size < 5000 ) && (!finished))
{
- if ((!changeNumber.equals(lastChange))
- && (changeNumber.older(trimDate)))
+ ChangeNumber changeNumber = cursor.nextChangeNumber();
+ if (changeNumber != null)
{
- size++;
- cursor.delete();
+ if ((!changeNumber.equals(lastChange))
+ && (changeNumber.older(trimDate)))
+ {
+ size++;
+ cursor.delete();
+ }
+ else
+ {
+ firstChange = changeNumber;
+ finished = true;
+ }
}
else
- {
- firstChange = changeNumber;
finished = true;
- }
}
- else
- finished = true;
+ cursor.close();
+ done = true;
}
- cursor.close();
- done = true;
- }
- catch (DeadlockException e)
- {
- cursor.abort();
- if (tries == DEADLOCK_RETRIES)
+ catch (DeadlockException e)
{
- // could not handle the Deadlock after DEADLOCK_RETRIES tries.
- // shutdown the ReplicationServer.
+ cursor.abort();
+ if (tries == DEADLOCK_RETRIES)
+ {
+ // could not handle the Deadlock after DEADLOCK_RETRIES tries.
+ // shutdown the ReplicationServer.
+ shutdown = true;
+ throw (e);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ // mark shutdown for this db so that we don't try again to
+ // stop it from cursor.close() or methods called by cursor.close()
shutdown = true;
+ cursor.abort();
throw (e);
}
}
- catch (DatabaseException e)
- {
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
- shutdown = true;
- cursor.abort();
- throw (e);
- }
}
}
@@ -493,7 +508,7 @@
private void flush()
{
int size;
- int chunksize = (500 < queueHimark ? 500 : queueHimark);
+ int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
do
{
@@ -630,9 +645,10 @@
{
msgQueue.clear();
queueByteSize = 0;
+
+ db.clear();
+ firstChange = db.readFirstChange();
+ lastChange = db.readLastChange();
}
- db.clear();
- firstChange = db.readFirstChange();
- lastChange = db.readLastChange();
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6d82905..de44cd7 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -626,12 +626,12 @@
logError(errorMsg);
} else if (sourceGroupId != groupId)
{
- // Assured feature does not cross different group ids
+ // Assured feature does not cross different group IDS
} else
{
if ((generationId > 0) &&
(generationId == sourceHandler.getGenerationId()))
- // Ignore assured updates from wrong generationid servers
+ // Ignore assured updates from wrong generationId servers
{
if (sourceHandler.isLDAPserver())
{
@@ -662,7 +662,7 @@
}
}
} else
- { // A RS sent us the safe data message, for sure no futher acks to wait
+ { // A RS sent us the safe data message, for sure no further ack to wait
if (safeDataLevel == (byte) 1)
{
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index f2c0de2..53ef7fc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication;
@@ -978,7 +978,7 @@
Message expectedMessage)
{
TaskState taskState = null;
- int cpt=10;
+ int cpt=40;
try
{
SearchFilter filter =
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 7b56a53..c9ba1cb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
--
Gitblit v1.10.0