From 7adb93986ace907531875e25be1f94d735fbb068 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java |   48 +++++++++++++++++++++++++++---------------------
 1 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 513e41c..a284aae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,14 +26,19 @@
  */
 package org.opends.server.replication.plugin;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+
 import java.util.NoSuchElementException;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import java.util.concurrent.TimeoutException;
+import org.opends.messages.Message;
+import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
 import org.opends.server.types.operation.PluginOperation;
 
 /**
@@ -61,32 +66,23 @@
   private ChangeNumberGenerator changeNumberGenerator;
 
   /**
-   * The Replicationbroker that will be used to send UpdateMsg.
+   * The ReplicationDomain that will be used to send UpdateMsg.
    */
-  private ReplicationBroker broker;
-
-  /**
-   * The ServerState that will be updated when UpdateMsg are committed.
-   */
-  private ServerState state;
+  private ReplicationDomain domain;
 
   /**
    * Creates a new PendingChanges using the provided ChangeNumberGenerator.
    *
    * @param changeNumberGenerator The ChangeNumberGenerator to use to create
    *                               new unique ChangeNumbers.
-   * @param broker  The Replicationbroker that will be used to send
+   * @param domain  The ReplicationDomain that will be used to send
    *                UpdateMsg.
-   * @param state   The ServerState that will be updated when UpdateMsg
-   *                are committed.
    */
   public PendingChanges(
-      ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
-      ServerState state)
+      ChangeNumberGenerator changeNumberGenerator, ReplicationDomain domain)
   {
     this.changeNumberGenerator = changeNumberGenerator;
-    this.broker = broker;
-    this.state = state;
+    this.domain = domain;
   }
 
   /**
@@ -96,7 +92,7 @@
    *
    * @return The UpdateMsg that was just removed.
    */
-  public synchronized UpdateMsg remove(ChangeNumber changeNumber)
+  public synchronized LDAPUpdateMsg remove(ChangeNumber changeNumber)
   {
     return pendingChanges.remove(changeNumber).getMsg();
   }
@@ -119,7 +115,7 @@
    * @param msg          The message associated to the update.
    */
   public synchronized void commit(ChangeNumber changeNumber,
-      UpdateMsg msg)
+      LDAPUpdateMsg msg)
   {
     PendingChange curChange = pendingChanges.get(changeNumber);
     if (curChange == null)
@@ -186,9 +182,19 @@
           (firstChange.getOp().isSynchronizationOperation() == false))
       {
         numSentUpdates++;
-        broker.publish(firstChange.getMsg());
-      }
-      state.update(firstChangeNumber);
+        LDAPUpdateMsg updateMsg = firstChange.getMsg();
+          try
+          {
+            domain.publish(updateMsg);
+          } catch (TimeoutException ex) {
+            // This exception may only be raised if assured replication is
+            // enabled
+            Message errorMsg = ERR_DS_ACK_TIMEOUT.get(
+              domain.getServiceID(), Long.toString(domain.getAssuredTimeout()),
+              updateMsg.toString());
+            logError(errorMsg);
+          }
+        }
       pendingChanges.remove(firstChangeNumber);
 
       if (pendingChanges.isEmpty())

--
Gitblit v1.10.0