From 7ed8164f91194d46c58290cb0887fcfa7d585da7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 11 Jun 2015 10:20:09 +0000
Subject: [PATCH] OPENDJ-2106 (CR-7228) Entry is not replicated on second instance in 2 RS topology
---
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java | 28 ++++---------
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java | 16 --------
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java | 4 +-
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java | 67 ++++++++++++++-------------------
4 files changed, 40 insertions(+), 75 deletions(-)
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index 18c955f..a13585c 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -31,7 +31,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
@@ -234,19 +233,14 @@
}
/**
- * Get the next update that must be sent to the consumer
- * from the message queue or from the database.
+ * Get the next update that must be sent to the consumer from the message queue or from the
+ * database.
*
- * @param connectedReplicaIds
- * Ids of replicas to accept when returning a message from
- * the late queue.
- * @param synchronous specifies what to do when the queue is empty.
- * when true, the method blocks; when false the method return null.
- *
- * @return The next update that must be sent to the consumer.
- * null when synchronous is false and queue is empty.
+ * @param sendToServerId
+ * serverId of the replica where changes will be sent
+ * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
*/
- protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
+ protected UpdateMsg getNextMessage(int sendToServerId)
{
while (activeConsumer)
{
@@ -279,7 +273,7 @@
* restart as usual
* load this change on the delayList
*/
- fillLateQueue(connectedReplicaIds);
+ fillLateQueue(sendToServerId);
if (lateQueue.isEmpty())
{
// we could not find any messages in the changelog
@@ -342,10 +336,6 @@
{
while (msgQueue.isEmpty() && following)
{
- if (!synchronous)
- {
- return null;
- }
msgQueue.wait(500);
if (!activeConsumer)
{
@@ -381,14 +371,14 @@
* Fills the late queue with the most recent changes, accepting only the
* messages from provided replica ids.
*/
- private void fillLateQueue(Set<Integer> connectedReplicaIds)
+ private void fillLateQueue(int sendToServerId)
{
try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
{
while (cursor.next() && isLateQueueBelowThreshold())
{
final UpdateMsg record = cursor.getRecord();
- if (connectedReplicaIds.contains(record.getCSN().getServerId()))
+ if (record.getCSN().getServerId() != sendToServerId)
{
lateQueue.add(record);
}
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
index 4a9e1d6..0ab175d 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,7 +31,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -1265,21 +1264,6 @@
}
/**
- * Get the next update that need to be sent to a given LDAP server.
- * This call is blocking when no update is available or when dependencies
- * do not allow to send the next available change
- *
- * @param sHandler The server handler for the target directory server.
- *
- * @return the update that must be forwarded
- */
- public UpdateMsg take(ServerHandler sHandler)
- {
- // Next message can only be taken from connected DSs
- return sHandler.take(new HashSet<Integer>(getConnectedDSs().keySet()));
- }
-
- /**
* Creates and returns a cursor across this replication domain.
* <p>
* Client code must call {@link DBCursor#next()} to advance the cursor to the
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
index 21eb5fb..6656852 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -603,15 +602,6 @@
/**
* Increment the number of updates sent to the server in assured safe data
- * mode.
- */
- private void incrementAssuredSdSentUpdates()
- {
- assuredSdSentUpdates++;
- }
-
- /**
- * Increment the number of updates sent to the server in assured safe data
* mode that timed out.
*/
public void incrementAssuredSdSentUpdatesTimeout()
@@ -639,15 +629,6 @@
/**
* Increment the number of updates sent to the server in assured safe read
- * mode.
- */
- private void incrementAssuredSrSentUpdates()
- {
- assuredSrSentUpdates++;
- }
-
- /**
- * Increment the number of updates sent to the server in assured safe read
* mode that timed out.
*/
public void incrementAssuredSrSentUpdatesTimeout()
@@ -942,17 +923,31 @@
* Select the next update that must be sent to the server managed by this
* ServerHandler.
*
- * @param connectedReplicaIds
- * Ids of replicas to accept when returning a message.
* @return the next update that must be sent to the server managed by this
* ServerHandler.
*/
- public UpdateMsg take(Set<Integer> connectedReplicaIds)
+ public UpdateMsg take()
{
- boolean interrupted = true;
- UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg
+ final UpdateMsg msg = getNextMessage(serverId);
+ acquirePermitInSendWindow();
+
+ if (msg != null)
+ {
+ incrementOutCount();
+ if (msg.isAssured())
+ {
+ incrementAssuredStats(msg);
+ }
+ return msg;
+ }
+ return null;
+ }
+
+ private void acquirePermitInSendWindow()
+ {
boolean acquired = false;
+ boolean interrupted = true;
do
{
try
@@ -964,22 +959,18 @@
// loop until not interrupted
}
} while ((interrupted || !acquired) && !shutdownWriter);
- if (msg != null)
- {
- incrementOutCount();
+ }
- if (msg.isAssured())
- {
- if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
- {
- incrementAssuredSrSentUpdates();
- } else if (!isDataServer())
- {
- incrementAssuredSdSentUpdates();
- }
- }
+ private void incrementAssuredStats(final UpdateMsg msg)
+ {
+ if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
+ {
+ assuredSrSentUpdates++;
}
- return msg;
+ else if (!isDataServer())
+ {
+ assuredSdSentUpdates++;
+ }
}
/**
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
index 343e2d5..f4fbe2a 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2014 ForgeRock AS
+ * Portions Copyright 2011-2015 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -101,7 +101,7 @@
while (!shutdown
|| !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
{
- final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
+ final UpdateMsg updateMsg = this.handler.take();
if (updateMsg == null)
{
// this connection is closing
--
Gitblit v1.10.0