From 7adb93986ace907531875e25be1f94d735fbb068 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 48 +++++++++++++++++++++++++++---------------------
1 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 513e41c..a284aae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,14 +26,19 @@
*/
package org.opends.server.replication.plugin;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+import org.opends.messages.Message;
+import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.types.operation.PluginOperation;
/**
@@ -61,32 +66,23 @@
private ChangeNumberGenerator changeNumberGenerator;
/**
- * The Replicationbroker that will be used to send UpdateMsg.
+ * The ReplicationDomain that will be used to send UpdateMsg.
*/
- private ReplicationBroker broker;
-
- /**
- * The ServerState that will be updated when UpdateMsg are committed.
- */
- private ServerState state;
+ private ReplicationDomain domain;
/**
* Creates a new PendingChanges using the provided ChangeNumberGenerator.
*
* @param changeNumberGenerator The ChangeNumberGenerator to use to create
* new unique ChangeNumbers.
- * @param broker The Replicationbroker that will be used to send
+ * @param domain The ReplicationDomain that will be used to send
* UpdateMsg.
- * @param state The ServerState that will be updated when UpdateMsg
- * are committed.
*/
public PendingChanges(
- ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
- ServerState state)
+ ChangeNumberGenerator changeNumberGenerator, ReplicationDomain domain)
{
this.changeNumberGenerator = changeNumberGenerator;
- this.broker = broker;
- this.state = state;
+ this.domain = domain;
}
/**
@@ -96,7 +92,7 @@
*
* @return The UpdateMsg that was just removed.
*/
- public synchronized UpdateMsg remove(ChangeNumber changeNumber)
+ public synchronized LDAPUpdateMsg remove(ChangeNumber changeNumber)
{
return pendingChanges.remove(changeNumber).getMsg();
}
@@ -119,7 +115,7 @@
* @param msg The message associated to the update.
*/
public synchronized void commit(ChangeNumber changeNumber,
- UpdateMsg msg)
+ LDAPUpdateMsg msg)
{
PendingChange curChange = pendingChanges.get(changeNumber);
if (curChange == null)
@@ -186,9 +182,19 @@
(firstChange.getOp().isSynchronizationOperation() == false))
{
numSentUpdates++;
- broker.publish(firstChange.getMsg());
- }
- state.update(firstChangeNumber);
+ LDAPUpdateMsg updateMsg = firstChange.getMsg();
+ try
+ {
+ domain.publish(updateMsg);
+ } catch (TimeoutException ex) {
+ // This exception may only be raised if assured replication is
+ // enabled
+ Message errorMsg = ERR_DS_ACK_TIMEOUT.get(
+ domain.getServiceID(), Long.toString(domain.getAssuredTimeout()),
+ updateMsg.toString());
+ logError(errorMsg);
+ }
+ }
pendingChanges.remove(firstChangeNumber);
if (pendingChanges.isEmpty())
--
Gitblit v1.10.0