opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -3038,6 +3038,9 @@ ReplicationMonitor.addMonitorData(attributes, "unresolved-naming-conflicts", getNumUnresolvedNamingConflicts()); ReplicationMonitor.addMonitorData(attributes, "remote-pending-changes-size", remotePendingChanges.getQueueSize()); return attributes; } } opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2007-2008 Sun Microsystems, Inc. * Copyright 2007-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; @@ -91,6 +91,16 @@ } /** * Returns the number of changes currently in this list. * * @return The number of changes currently in this list. */ public synchronized int getQueueSize() { return pendingChanges.size(); } /** * Add a new LDAPUpdateMsg that was received from the replication server * to the pendingList. * opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -182,7 +182,7 @@ { try { msgQueue.wait(5000); msgQueue.wait(500); } catch (InterruptedException e) { // simply loop to try again. @@ -337,7 +337,7 @@ } if ((msgQueue.size() < queueLowmark) && (queueByteSize < queueLowmarkBytes)) msgQueue.notify(); msgQueue.notifyAll(); } } opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -137,6 +137,7 @@ private List<RSInfo> rsList = new ArrayList<RSInfo>(); private long generationID; private int updateDoneCount = 0; /** * Creates a new ReplicationServer Broker for a particular ReplicationDomain. @@ -1363,6 +1364,13 @@ try { ReplicationMsg msg = session.receive(); if (msg instanceof UpdateMsg) { synchronized (this) { rcvWindow--; } } if (msg instanceof WindowMsg) { WindowMsg windowMsg = (WindowMsg) msg; @@ -1410,11 +1418,12 @@ { try { rcvWindow--; if ((rcvWindow < halfRcvWindow) && (session != null)) updateDoneCount ++; if ((updateDoneCount >= halfRcvWindow) && (session != null)) { session.publish(new WindowMsg(halfRcvWindow)); rcvWindow += halfRcvWindow; session.publish(new WindowMsg(updateDoneCount)); rcvWindow += updateDoneCount; updateDoneCount = 0; } } catch (IOException e) {