From bcf686add35bda4a6ac5c3d085abe151ea018e8e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 14 Jan 2009 08:29:50 +0000
Subject: [PATCH] 

---
 opends/src/server/org/opends/server/replication/protocol/AckMsg.java                                      |    6 
 opends/src/server/org/opends/server/replication/protocol/AddMsg.java                                      |   55 +++--
 opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java                               |   21 ++
 opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java                                   |   41 +++
 opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java                              |    6 
 opends/src/server/org/opends/server/replication/server/DbHandler.java                                     |  128 ++++++++------
 opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java                                 |  101 ++++++-----
 opends/src/server/org/opends/server/replication/plugin/PendingChanges.java                                |   55 ++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java          |    4 
 opends/src/server/org/opends/server/replication/common/ChangeNumber.java                                  |   32 +++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java |    2 
 opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java                                   |   11 +
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                         |   22 +
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                       |    6 
 14 files changed, 328 insertions(+), 162 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 8c8ef70..1a8a379 100644
--- a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.common;
 
@@ -35,9 +35,13 @@
                                      java.lang.Comparable<ChangeNumber>
 {
   private static final long serialVersionUID = -8802722277749190740L;
-  private long timeStamp;
-  private int seqnum;
-  private short serverId;
+  private final long timeStamp;
+  private final int seqnum;
+  private final short serverId;
+
+  // A String representation of the ChangeNumber suitable for network
+  // transmission.
+  private String formatedString = null;;
 
   /**
    * Create a new ChangeNumber from a String.
@@ -54,6 +58,8 @@
 
     temp = str.substring(20, 28);
     seqnum = Integer.parseInt(temp, 16);
+
+    formatedString = str;
   }
 
   /**
@@ -141,11 +147,27 @@
    */
   public String toString()
   {
+    return format();
+  }
+
+  /**
+   * Convert the ChangeNumber to a String that is suitable for network
+   * transmission.
+   *
+   * @return the string
+   */
+  public String format()
+  {
+    if (formatedString != null)
+      return formatedString;
+
     return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
   }
 
   /**
-   * Convert the ChangeNumber to a printable String that is .
+   * Convert the ChangeNumber to a printable String with a user friendly
+   * format.
+   *
    * @return the string
    */
   public String toStringUI()
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index c6629eb..a0e0d85 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -49,6 +49,7 @@
 import java.io.File;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -869,7 +870,18 @@
         }
         else
         {
-          pendingChanges.commit(curChangeNumber, msg);
+          // If assured replication is configured, this will prepare blocking
+          // mechanism. If assured replication is disabled, this returns
+          // immediately
+          prepareWaitForAckIfAssuredEnabled(msg);
+          try
+          {
+            msg.encode();
+          } catch (UnsupportedEncodingException e)
+          {
+            // will be caught at publish time.
+          }
+          pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg);
         }
       }
       catch  (NoSuchElementException e)
@@ -892,18 +904,12 @@
       if (curChangeNumber != null)
       {
         pendingChanges.remove(curChangeNumber);
+        pendingChanges.pushCommittedChanges();
       }
     }
 
     if (!op.isSynchronizationOperation())
     {
-      // If assured replication is configured, this will prepare blocking
-      // mechanism. If assured replication is disabled, this returns
-      // immediately
-      prepareWaitForAckIfAssuredEnabled(msg);
-
-      pendingChanges.pushCommittedChanges();
-
       // If assured replication is enabled, this will wait for the matching
       // ack or time out. If assured replication is disabled, this returns
       // immediately
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index b9b512f..ad1207d 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
@@ -113,6 +113,18 @@
   public synchronized void commit(ChangeNumber changeNumber,
       LDAPUpdateMsg msg)
   {
+    _commit(changeNumber, msg);
+  }
+  /**
+   * Mark an update message as committed.
+   *
+   * @param changeNumber The ChangeNumber of the update message that must be
+   *                     set as committed.
+   * @param msg          The message associated to the update.
+   */
+  public void _commit(ChangeNumber changeNumber,
+      LDAPUpdateMsg msg)
+  {
     PendingChange curChange = pendingChanges.get(changeNumber);
     if (curChange == null)
     {
@@ -149,6 +161,18 @@
    */
   public synchronized ChangeNumber putLocalOperation(PluginOperation operation)
   {
+    return _putLocalOperation(operation);
+  }
+  /**
+   * Add a new UpdateMsg to the pending list from the provided local
+   * operation.
+   *
+   * @param operation The local operation for which an UpdateMsg must
+   *                  be added in the pending list.
+   * @return The ChangeNumber now associated to the operation.
+   */
+  public  ChangeNumber _putLocalOperation(PluginOperation operation)
+  {
     ChangeNumber changeNumber;
 
     changeNumber = changeNumberGenerator.newChangeNumber();
@@ -165,6 +189,15 @@
    */
   public synchronized int pushCommittedChanges()
   {
+    return _pushCommittedChanges();
+  }
+  /**
+   * Push all committed local changes to the replicationServer service.
+   *
+   * @return The number of pushed updates.
+   */
+  public int _pushCommittedChanges()
+  {
     int numSentUpdates = 0;
     if (pendingChanges.isEmpty())
       return numSentUpdates;
@@ -195,4 +228,24 @@
     }
     return numSentUpdates;
   }
+
+  /**
+   * Mark an update message as committed, then
+   * push all committed local changes to the replicationServer service
+   * in a single atomic operation.
+   *
+   *
+   * @param changeNumber The ChangeNumber of the update message that must be
+   *                     set as committed.
+   * @param msg          The message associated to the update.
+   *
+   * @return The number of pushed updates.
+   */
+  public synchronized int commitAndPushCommittedChanges(
+      ChangeNumber changeNumber,
+      LDAPUpdateMsg msg)
+  {
+    _commit(changeNumber, msg);
+    return _pushCommittedChanges();
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index 3b3f53e..a0e8399 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -241,13 +241,13 @@
        * error><failed server ids>
        */
 
-      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+      ByteArrayOutputStream oStream = new ByteArrayOutputStream(200);
 
       /* Put the type of the operation */
       oStream.write(MSG_TYPE_ACK);
 
       /* Put the ChangeNumber */
-      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
+      byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8");
       oStream.write(changeNumberByte);
       oStream.write(0);
 
diff --git a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index 972a40c..9cb3e89 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -258,34 +258,41 @@
   @Override
   public byte[] getBytes() throws UnsupportedEncodingException
   {
-    int length = encodedAttributes.length;
-    byte[] byteParentId = null;
-    if (parentUniqueId != null)
+    if (bytes == null)
     {
-      byteParentId = parentUniqueId.getBytes("UTF-8");
-      length += byteParentId.length + 1;
+      int length = encodedAttributes.length;
+      byte[] byteParentId = null;
+      if (parentUniqueId != null)
+      {
+        byteParentId = parentUniqueId.getBytes("UTF-8");
+        length += byteParentId.length + 1;
+      }
+      else
+      {
+        length += 1;
+      }
+
+      /* encode the header in a byte[] large enough to also contain the mods */
+      byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length);
+
+      int pos = resultByteArray.length - length;
+
+      if (byteParentId != null)
+        pos = addByteArray(byteParentId, resultByteArray, pos);
+      else
+        resultByteArray[pos++] = 0;
+
+      /* put the attributes */
+      for (int i=0; i<encodedAttributes.length; i++,pos++)
+      {
+        resultByteArray[pos] = encodedAttributes[i];
+      }
+      return resultByteArray;
     }
     else
     {
-      length += 1;
+      return bytes;
     }
-
-    /* encode the header in a byte[] large enough to also contain the mods */
-    byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length);
-
-    int pos = resultByteArray.length - length;
-
-    if (byteParentId != null)
-      pos = addByteArray(byteParentId, resultByteArray, pos);
-    else
-      resultByteArray[pos++] = 0;
-
-    /* put the attributes */
-    for (int i=0; i<encodedAttributes.length; i++,pos++)
-    {
-      resultByteArray[pos] = encodedAttributes[i];
-    }
-    return resultByteArray;
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
index df136f2..e365b07 100644
--- a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -106,7 +106,14 @@
   @Override
   public byte[] getBytes() throws UnsupportedEncodingException
   {
-    return encodeHeader(MSG_TYPE_DELETE, 0);
+    if (bytes == null)
+    {
+     return encodeHeader(MSG_TYPE_DELETE, 0);
+    }
+    else
+    {
+      return bytes;
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index c761af5..66624aa 100644
--- a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -58,6 +58,11 @@
   protected String uniqueId;
 
   /**
+   * Encoded form of the LDAPUpdateMsg.
+   */
+  protected byte[] bytes = null;
+
+  /**
    * Creates a new UpdateMsg.
    */
   public LDAPUpdateMsg()
@@ -158,6 +163,20 @@
     return uniqueId;
   }
 
+  /**
+   * Do all the work necessary for the encoding.
+   *
+   * This is useful in case when one wants to perform this outside
+   * of a synchronized portion of code.
+   *
+   * This method is not synchronized and therefore not MT safe.
+   *
+   * @throws UnsupportedEncodingException when encoding fails.
+   */
+  public void encode() throws UnsupportedEncodingException
+  {
+    bytes = getBytes();
+  }
 
   /**
    * Create and Operation from the message.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index c146b2a..e4f3b72 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -524,56 +524,63 @@
    */
   public byte[] getBytes_V1() throws UnsupportedEncodingException
   {
-    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
-    byte[] byteNewSuperior = null;
-    byte[] byteNewSuperiorId = null;
-
-    // calculate the length necessary to encode the parameters
-    int length = byteNewRdn.length + 1 + 1;
-    if (newSuperior != null)
+    if (bytes == null)
     {
-      byteNewSuperior = newSuperior.getBytes("UTF-8");
-      length += byteNewSuperior.length + 1;
+      byte[] byteNewRdn = newRDN.getBytes("UTF-8");
+      byte[] byteNewSuperior = null;
+      byte[] byteNewSuperiorId = null;
+
+      // calculate the length necessary to encode the parameters
+      int length = byteNewRdn.length + 1 + 1;
+      if (newSuperior != null)
+      {
+        byteNewSuperior = newSuperior.getBytes("UTF-8");
+        length += byteNewSuperior.length + 1;
+      }
+      else
+        length += 1;
+
+      if (newSuperiorId != null)
+      {
+        byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
+        length += byteNewSuperiorId.length + 1;
+      }
+      else
+        length += 1;
+
+      byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length);
+      int pos = resultByteArray.length - length;
+
+      /* put the new RDN and a terminating 0 */
+      pos = addByteArray(byteNewRdn, resultByteArray, pos);
+
+      /* put the newsuperior and a terminating 0 */
+      if (newSuperior != null)
+      {
+        pos = addByteArray(byteNewSuperior, resultByteArray, pos);
+      }
+      else
+        resultByteArray[pos++] = 0;
+
+      /* put the newsuperiorId and a terminating 0 */
+      if (newSuperiorId != null)
+      {
+        pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
+      }
+      else
+        resultByteArray[pos++] = 0;
+
+      /* put the deleteoldrdn flag */
+      if (deleteOldRdn)
+        resultByteArray[pos++] = 1;
+      else
+        resultByteArray[pos++] = 0;
+
+      return resultByteArray;
     }
     else
-      length += 1;
-
-    if (newSuperiorId != null)
     {
-      byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
-      length += byteNewSuperiorId.length + 1;
+      return bytes;
     }
-    else
-      length += 1;
-
-    byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length);
-    int pos = resultByteArray.length - length;
-
-    /* put the new RDN and a terminating 0 */
-    pos = addByteArray(byteNewRdn, resultByteArray, pos);
-
-    /* put the newsuperior and a terminating 0 */
-    if (newSuperior != null)
-    {
-      pos = addByteArray(byteNewSuperior, resultByteArray, pos);
-    }
-    else
-      resultByteArray[pos++] = 0;
-
-    /* put the newsuperiorId and a terminating 0 */
-    if (newSuperiorId != null)
-    {
-      pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
-    }
-    else
-      resultByteArray[pos++] = 0;
-
-    /* put the deleteoldrdn flag */
-    if (deleteOldRdn)
-      resultByteArray[pos++] = 1;
-    else
-      resultByteArray[pos++] = 0;
-
-    return resultByteArray;
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 9f6676f..8986f15 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -92,6 +92,8 @@
   public ModifyMsg(byte[] in) throws DataFormatException,
                                      UnsupportedEncodingException
   {
+    bytes = in;
+
     // Decode header
     byte[] allowedPduTypes = new byte[2];
     allowedPduTypes[0] = MSG_TYPE_MODIFY;
@@ -117,19 +119,44 @@
   }
 
   /**
+   * Creates a new Modify message from a V1 byte[].
+   *
+   * @param in The byte[] from which the operation must be read.
+   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
+   * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
+   *
+   * @return The created ModifyMsg.
+   */
+  public static ModifyMsg createV1(byte[] in) throws DataFormatException,
+                                     UnsupportedEncodingException
+  {
+    ModifyMsg msg = new ModifyMsg(in);
+    msg.bytes = null;
+
+    return msg;
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
   public byte[] getBytes() throws UnsupportedEncodingException
   {
-    /* encode the header in a byte[] large enough to also contain the mods */
-    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1);
+    if (bytes == null)
+    {
+      /* encode the header in a byte[] large enough to also contain the mods */
+      byte[] mybytes = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1);
 
-    /* add the mods */
-    int pos = encodedMsg.length - (encodedMods.length + 1);
-    addByteArray(encodedMods, encodedMsg, pos);
+      /* add the mods */
+      int pos = mybytes.length - (encodedMods.length + 1);
+      addByteArray(encodedMods, mybytes, pos);
 
-    return encodedMsg;
+      return mybytes;
+    }
+    else
+    {
+      return bytes;
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index b65af58..fdce792 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -143,8 +143,10 @@
         throw new NotSupportedOldVersionPDUException("Replication Server Info",
           ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
       case MSG_TYPE_MODIFY:
+        msg = new ModifyMsg(buffer);
+      break;
       case MSG_TYPE_MODIFY_V1:
-          msg = new ModifyMsg(buffer);
+          msg = ModifyMsg.createV1(buffer);
       break;
       case MSG_TYPE_ADD:
       case MSG_TYPE_ADD_V1:
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 097638d..1f21de3 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
@@ -86,13 +86,15 @@
   // the threads calling add() method will be blocked if the size of
   // msgQueue becomes larger than the  queueHimark and will resume
   // only when the size of the msgQueue goes below queueLowmark.
-  int queueHimark = 5000;
-  int queueLowmark = 4000;
+  int queueMaxSize = 5000;
+  int queueLowmark = 1000;
+  int queueHimark = 4000;
 
   // The queue himark and lowmark in bytes, this is set to 100 times the
   // himark and lowmark in number of updates.
-  int queueHimarkBytes = 100 * queueHimark;
+  int queueMaxBytes = 100 * queueMaxSize;
   int queueLowmarkBytes = 100 * queueLowmark;
+  int queueHimarkBytes = 100 * queueHimark;
 
   // The number of bytes currently in the queue
   int queueByteSize = 0;
@@ -140,10 +142,12 @@
     serverId = id;
     this.baseDn = baseDn;
     trimage = replicationServer.getTrimage();
-    queueHimark = queueSize;
-    queueLowmark = queueSize * 4 / 5;
-    queueHimarkBytes = 100 * queueHimark;
-    queueLowmarkBytes = 100 * queueLowmark;
+    queueMaxSize = queueSize;
+    queueLowmark = queueSize * 1 / 5;
+    queueHimark = queueSize * 4 / 5;
+    queueMaxBytes = 200 * queueMaxSize;
+    queueLowmarkBytes = 200 * queueLowmark;
+    queueHimarkBytes = 200 * queueLowmark;
     db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
     firstChange = db.readFirstChange();
     lastChange = db.readLastChange();
@@ -171,11 +175,14 @@
     synchronized (msgQueue)
     {
       int size = msgQueue.size();
-      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+      if ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
+        msgQueue.notify();
+
+      while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes))
       {
         try
         {
-          msgQueue.wait(500);
+          msgQueue.wait(5000);
         } catch (InterruptedException e)
         {
           // simply loop to try again.
@@ -379,17 +386,22 @@
   {
     while (shutdown == false)
     {
-      try {
+      try
+      {
         flush();
         trim();
 
-        synchronized (this)
+        synchronized (msgQueue)
         {
-          try
+          if ((msgQueue.size() < queueLowmark) &&
+              (queueByteSize < queueLowmarkBytes))
           {
-            this.wait(1000);
-          } catch (InterruptedException e)
-          { }
+            try
+            {
+              msgQueue.wait(10000);
+            } catch (InterruptedException e)
+            { }
+          }
         }
       } catch (Exception end)
       {
@@ -434,56 +446,59 @@
     int tries = 0;
     while ((tries++ < DEADLOCK_RETRIES) && (!done))
     {
-      /* the trim is done by group in order to save some CPU and IO bandwidth
-       * start the transaction then do a bunch of remove then commit
-       */
-      ReplServerDBCursor cursor;
-      cursor = db.openDeleteCursor();
-
-      try
+      synchronized (flushLock)
       {
-        while ((size < 5000 ) &&  (!finished))
+        /* the trim is done by group in order to save some CPU and IO bandwidth
+         * start the transaction then do a bunch of remove then commit
+         */
+        ReplServerDBCursor cursor;
+        cursor = db.openDeleteCursor();
+
+        try
         {
-          ChangeNumber changeNumber = cursor.nextChangeNumber();
-          if (changeNumber != null)
+          while ((size < 5000 ) &&  (!finished))
           {
-            if ((!changeNumber.equals(lastChange))
-                && (changeNumber.older(trimDate)))
+            ChangeNumber changeNumber = cursor.nextChangeNumber();
+            if (changeNumber != null)
             {
-              size++;
-              cursor.delete();
+              if ((!changeNumber.equals(lastChange))
+                  && (changeNumber.older(trimDate)))
+              {
+                size++;
+                cursor.delete();
+              }
+              else
+              {
+                firstChange = changeNumber;
+                finished = true;
+              }
             }
             else
-            {
-              firstChange = changeNumber;
               finished = true;
-            }
           }
-          else
-            finished = true;
+          cursor.close();
+          done = true;
         }
-        cursor.close();
-        done = true;
-      }
-      catch (DeadlockException e)
-      {
-        cursor.abort();
-        if (tries == DEADLOCK_RETRIES)
+        catch (DeadlockException e)
         {
-          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
-          // shutdown the ReplicationServer.
+          cursor.abort();
+          if (tries == DEADLOCK_RETRIES)
+          {
+            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
+            // shutdown the ReplicationServer.
+            shutdown = true;
+            throw (e);
+          }
+        }
+        catch (DatabaseException e)
+        {
+          // mark shutdown for this db so that we don't try again to
+          // stop it from cursor.close() or methods called by cursor.close()
           shutdown = true;
+          cursor.abort();
           throw (e);
         }
       }
-      catch (DatabaseException e)
-      {
-        // mark shutdown for this db so that we don't try again to
-        // stop it from cursor.close() or methods called by cursor.close()
-        shutdown = true;
-        cursor.abort();
-        throw (e);
-      }
     }
   }
 
@@ -493,7 +508,7 @@
   private void flush()
   {
     int size;
-    int chunksize = (500 < queueHimark ? 500 : queueHimark);
+    int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
 
     do
     {
@@ -630,9 +645,10 @@
     {
       msgQueue.clear();
       queueByteSize = 0;
+
+      db.clear();
+      firstChange = db.readFirstChange();
+      lastChange = db.readLastChange();
     }
-    db.clear();
-    firstChange = db.readFirstChange();
-    lastChange = db.readLastChange();
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6d82905..de44cd7 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -626,12 +626,12 @@
       logError(errorMsg);
     } else if (sourceGroupId != groupId)
     {
-      // Assured feature does not cross different group ids
+      // Assured feature does not cross different group IDS
     } else
     {
       if ((generationId > 0) &&
         (generationId == sourceHandler.getGenerationId()))
-        // Ignore assured updates from wrong generationid servers
+        // Ignore assured updates from wrong generationId servers
       {
         if (sourceHandler.isLDAPserver())
         {
@@ -662,7 +662,7 @@
             }
           }
         } else
-        { // A RS sent us the safe data message, for sure no futher acks to wait
+        { // A RS sent us the safe data message, for sure no further ack to wait
           if (safeDataLevel == (byte) 1)
           {
             /**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index f2c0de2..53ef7fc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication;
 
@@ -978,7 +978,7 @@
       Message expectedMessage)
   {
     TaskState taskState = null;
-    int cpt=10;
+    int cpt=40;
     try
     {
       SearchFilter filter =
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 7b56a53..c9ba1cb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 

--
Gitblit v1.10.0