From 1990fa43bdcee1079c3e7909a416f705282a476a Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 20 Feb 2009 08:44:40 +0000
Subject: [PATCH] Fix for 3804 : improve replication monitoring

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java             |    4 ++--
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java    |   17 +++++++++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |    3 +++
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java  |   12 +++++++++++-
 4 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index df09350..3b7e07c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -3038,6 +3038,9 @@
     ReplicationMonitor.addMonitorData(attributes, "unresolved-naming-conflicts",
         getNumUnresolvedNamingConflicts());
 
+    ReplicationMonitor.addMonitorData(attributes, "remote-pending-changes-size",
+        remotePendingChanges.getQueueSize());
+
     return attributes;
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index 9c7f7ae..dee8335 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2007-2008 Sun Microsystems, Inc.
+ *      Copyright 2007-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
@@ -91,6 +91,16 @@
   }
 
   /**
+   * Returns the number of changes currently in this list.
+   *
+   * @return The number of changes currently in this list.
+   */
+  public synchronized int getQueueSize()
+  {
+    return pendingChanges.size();
+  }
+
+  /**
    * Add a new LDAPUpdateMsg that was received from the replication server
    * to the pendingList.
    *
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index cca49f5..27a909b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -182,7 +182,7 @@
       {
         try
         {
-          msgQueue.wait(5000);
+          msgQueue.wait(500);
         } catch (InterruptedException e)
         {
           // simply loop to try again.
@@ -337,7 +337,7 @@
       }
       if ((msgQueue.size() < queueLowmark) &&
           (queueByteSize < queueLowmarkBytes))
-        msgQueue.notify();
+        msgQueue.notifyAll();
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index f79d4c3..78a036e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -137,6 +137,7 @@
   private List<RSInfo> rsList = new ArrayList<RSInfo>();
 
   private long generationID;
+  private int updateDoneCount = 0;
 
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -1363,6 +1364,13 @@
       try
       {
         ReplicationMsg msg = session.receive();
+        if (msg instanceof UpdateMsg)
+        {
+          synchronized (this)
+          {
+            rcvWindow--;
+          }
+        }
         if (msg instanceof WindowMsg)
         {
           WindowMsg windowMsg = (WindowMsg) msg;
@@ -1410,11 +1418,12 @@
   {
     try
     {
-      rcvWindow--;
-      if ((rcvWindow < halfRcvWindow) && (session != null))
+      updateDoneCount ++;
+      if ((updateDoneCount >= halfRcvWindow) && (session != null))
       {
-        session.publish(new WindowMsg(halfRcvWindow));
-        rcvWindow += halfRcvWindow;
+        session.publish(new WindowMsg(updateDoneCount));
+        rcvWindow += updateDoneCount;
+        updateDoneCount = 0;
       }
     } catch (IOException e)
     {

--
Gitblit v1.10.0