From 0a9135e3444bbefde6188f456b9c9772a816096d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 18 Sep 2013 15:17:14 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |   82 ++++++++++++++++++++++-------------------
 1 files changed, 44 insertions(+), 38 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index bffdd04..577c766 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -50,6 +50,7 @@
 import org.opends.server.tasks.InitializeTargetTask;
 import org.opends.server.tasks.InitializeTask;
 import org.opends.server.types.Attribute;
+import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 
@@ -79,7 +80,7 @@
  *   and which can start receiving updates.
  * <p>
  *   When updates are received the Replication Service calls the
- *   {@link #processUpdate(UpdateMsg)} method.
+ *   {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
  *   ReplicationDomain implementation should implement the appropriate code
  *   for replaying the update on the local repository.
  *   When fully done the subclass must call the
@@ -121,7 +122,7 @@
    *  All Replication Domain using this baseDN will be connected
    *  through the Replication Service.
    */
-  private final String baseDN;
+  private final DN baseDN;
 
   /**
    * The identifier of this Replication Domain inside the
@@ -161,8 +162,8 @@
   /**
    * A Map used to store all the ReplicationDomains created on this server.
    */
-  private static Map<String, ReplicationDomain> domains =
-    new HashMap<String, ReplicationDomain>();
+  private static Map<DN, ReplicationDomain> domains =
+      new HashMap<DN, ReplicationDomain>();
 
   /*
    * Assured mode properties
@@ -335,7 +336,7 @@
    *                   is participating to a given Replication Domain.
    * @param initWindow Window used during initialization.
    */
-  public ReplicationDomain(String baseDN, int serverID,int initWindow)
+  public ReplicationDomain(DN baseDN, int serverID, int initWindow)
   {
     this.baseDN = baseDN;
     this.serverID = serverID;
@@ -358,8 +359,7 @@
    *                   is participating to a given Replication Domain.
    * @param serverState The serverState to use
    */
-  public ReplicationDomain(String baseDN, int serverID,
-    ServerState serverState)
+  public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
   {
     this.baseDN = baseDN;
     this.serverID = serverID;
@@ -397,7 +397,7 @@
     if (!isValidInitialStatus(initStatus))
     {
       Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
-        baseDN, Integer.toString(serverID));
+          getBaseDNString(), Integer.toString(serverID));
       logError(msg);
     } else
     {
@@ -426,7 +426,7 @@
     if (event == StatusMachineEvent.INVALID_EVENT)
     {
       Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
-        baseDN, Integer.toString(serverID));
+          getBaseDNString(), Integer.toString(serverID));
       logError(msg);
       return;
     }
@@ -482,13 +482,23 @@
   }
 
   /**
+   * Returns the base DN of this ReplicationDomain.
+   *
+   * @return The base DN of this ReplicationDomain
+   */
+  public DN getBaseDN()
+  {
+    return baseDN;
+  }
+
+  /**
    * Gets the baseDN of this domain.
    *
    * @return The baseDN for this domain.
    */
   public String getBaseDNString()
   {
-    return baseDN;
+    return baseDN.toNormalizedString();
   }
 
   /**
@@ -943,7 +953,7 @@
         requested servers. Log problem
         */
         Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
-            baseDN, Integer.toString(serverID),
+            getBaseDNString(), Integer.toString(serverID),
             update.toString(), ack.errorsToString());
         logError(errorMsg);
 
@@ -1387,7 +1397,7 @@
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
       Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
-          countEntries(), baseDN, serverID);
+          countEntries(), getBaseDNString(), serverID);
       logError(msg);
 
       for (DSInfo dsi : getReplicasList())
@@ -1403,7 +1413,7 @@
     else
     {
       Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
-          countEntries(), baseDN, serverID, serverToInitialize);
+          countEntries(), getBaseDNString(), serverID, serverToInitialize);
       logError(msg);
 
       ieContext.startList.add(serverToInitialize);
@@ -1434,8 +1444,8 @@
 
         // Send start message to the peer
         InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
-            baseDN, serverID, serverToInitialize, serverRunningTheTask,
-            ieContext.entryCount, initWindow);
+            getBaseDNString(), serverID, serverToInitialize,
+            serverRunningTheTask, ieContext.entryCount, initWindow);
 
         broker.publish(initTargetMsg);
 
@@ -1457,7 +1467,6 @@
         // Notify the peer of the success
         DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
         broker.publish(doneMsg);
-
       }
       catch(DirectoryException exportException)
       {
@@ -1558,14 +1567,14 @@
         .getLocalizedMessage() : "";
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
-      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL
-          .get(baseDN, serverID, cause);
+      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
+          getBaseDNString(), serverID, cause);
       logError(msg);
     }
     else
     {
       Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
-          baseDN, serverID, serverToInitialize, cause);
+          getBaseDNString(), serverID, serverToInitialize, cause);
       logError(msg);
     }
 
@@ -1922,7 +1931,7 @@
             Message errMsg =
               Message.raw(Category.SYNC, Severity.NOTICE,
                   ERR_INIT_EXPORTER_DISCONNECTION.get(
-                      this.baseDN,
+                      getBaseDNString(),
                       Integer.toString(this.serverID),
                       Integer.toString(ieContext.importSource)));
             if (ieContext.getException()==null)
@@ -2200,7 +2209,7 @@
       ieContext.initializeTask = initTask;
       ieContext.attemptCnt = 0;
       ieContext.initReqMsgSent = new InitializeRequestMsg(
-          baseDN, serverID, source, this.initWindow);
+          getBaseDNString(), serverID, source, this.initWindow);
 
       // Publish Init request msg
       broker.publish(ieContext.initReqMsgSent);
@@ -2261,7 +2270,7 @@
     {
       // Log starting
       Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
-          baseDN, initTargetMsgReceived.getSenderID(), serverID);
+          getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID);
       logError(msg);
 
       // Go into full update status
@@ -2395,7 +2404,7 @@
       finally
       {
         Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
-            baseDN, initTargetMsgReceived.getSenderID(), serverID,
+            getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
             (ieContext.getException() != null ? ieContext
                 .getException().getLocalizedMessage() : ""));
         logError(msg);
@@ -2436,7 +2445,7 @@
 
     if (newStatus == ServerStatus.INVALID_STATUS)
     {
-      Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDN,
+      Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
           Integer.toString(serverID), status.toString(), event.toString());
       logError(msg);
       return;
@@ -2514,10 +2523,8 @@
     }
     if (!allSet)
     {
-      ResultCode resultCode = ResultCode.OTHER;
-      Message message = ERR_RESET_GENERATION_ID_FAILED.get(baseDN);
-      throw new DirectoryException(
-          resultCode, message);
+      Message message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDNString());
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
   }
 
@@ -2592,7 +2599,7 @@
     if (!isConnected())
     {
       ResultCode resultCode = ResultCode.OTHER;
-      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(baseDN,
+      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
           Integer.toString(serverID),
           Long.toString(genIdMessage.getGenerationId()));
       throw new DirectoryException(
@@ -2987,11 +2994,10 @@
    * Starts the receiver side of the Replication Service.
    * <p>
    * After this method has been called, the Replication Service will start
-   * calling the {@link #processUpdate(UpdateMsg)}.
+   * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
    * <p>
    * This method must be called once and must be called after the
    * {@link #startPublishService(Collection, int, long, long)}.
-   *
    */
   public void startListenService()
   {
@@ -3234,8 +3240,8 @@
         } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
         {
           Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
-              Integer.toString(serverID), msgAssuredMode.toString(), baseDN,
-            msg.toString());
+              Integer.toString(serverID), msgAssuredMode.toString(),
+              getBaseDNString(), msg.toString());
           logError(errorMsg);
         }
         // Nothing to do in Assured safe data mode, only RS ack updates.
@@ -3401,9 +3407,9 @@
    * Publish an {@link UpdateMsg} to the Replication Service.
    * <p>
    * The Replication Service will handle the delivery of this {@link UpdateMsg}
-   * to all the participants of this Replication Domain.
-   * These members will be receive this {@link UpdateMsg} through a call
-   * of the {@link #processUpdate(UpdateMsg)} message.
+   * to all the participants of this Replication Domain. These members will be
+   * receive this {@link UpdateMsg} through a call of the
+   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
    *
    * @param msg The UpdateMsg that should be pushed.
    */
@@ -3449,8 +3455,8 @@
     {
       // This exception may only be raised if assured replication is
       // enabled
-      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(baseDN, Long.toString(
-        assuredTimeout), update.toString());
+      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
+          Long.toString(assuredTimeout), update.toString());
       logError(errorMsg);
     }
   }

--
Gitblit v1.10.0