From 0a9135e3444bbefde6188f456b9c9772a816096d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 18 Sep 2013 15:17:14 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  125 +++++++++++++++++++++--------------------
 1 files changed, 65 insertions(+), 60 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index eef08e4..e0ed2c1 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -75,7 +75,7 @@
  */
 public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
 {
-  private final String baseDn;
+  private final DN baseDN;
 
   /**
    * The Status analyzer that periodically verifies whether the connected DSs
@@ -172,21 +172,21 @@
   private ServerState ctHeartbeatState;
 
   /**
-   * Creates a new ReplicationServerDomain associated to the DN baseDn.
+   * Creates a new ReplicationServerDomain associated to the baseDN.
    *
-   * @param baseDn
-   *          The baseDn associated to the ReplicationServerDomain.
+   * @param baseDN
+   *          The baseDN associated to the ReplicationServerDomain.
    * @param localReplicationServer
    *          the ReplicationServer that created this instance.
    */
-  public ReplicationServerDomain(String baseDn,
+  public ReplicationServerDomain(DN baseDN,
       ReplicationServer localReplicationServer)
   {
-    this.baseDn = baseDn;
+    this.baseDN = baseDN;
     this.localReplicationServer = localReplicationServer;
     this.assuredTimeoutTimer = new Timer("Replication server RS("
         + localReplicationServer.getServerId()
-        + ") assured timer for domain \"" + baseDn + "\"", true);
+        + ") assured timer for domain \"" + baseDN + "\"", true);
     this.changelogDB = localReplicationServer.getChangelogDB();
 
     DirectoryServer.registerMonitorProvider(this);
@@ -253,7 +253,8 @@
           // Unknown assured mode: should never happen
           Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
             Integer.toString(localReplicationServer.getServerId()),
-            assuredMode.toString(), baseDn, update.toString());
+            assuredMode.toString(), baseDN.toNormalizedString(),
+            update.toString());
           logError(errorMsg);
           assuredMessage = false;
         }
@@ -405,7 +406,7 @@
   {
     try
     {
-      if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
+      if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg))
       {
         /*
          * JNR: Matt and I had a hard time figuring out where to put this
@@ -608,7 +609,8 @@
       // Should never happen
       Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
         Integer.toString(localReplicationServer.getServerId()),
-        Byte.toString(safeDataLevel), baseDn, update.toString());
+        Byte.toString(safeDataLevel), baseDN.toNormalizedString(),
+        update.toString());
       logError(errorMsg);
     } else if (sourceGroupId == groupId
     // Assured feature does not cross different group IDS
@@ -760,7 +762,7 @@
             mb.append(ERR_RS_ERROR_SENDING_ACK.get(
               Integer.toString(localReplicationServer.getServerId()),
               Integer.toString(origServer.getServerId()),
-              csn.toString(), baseDn));
+              csn.toString(), baseDN.toNormalizedString()));
             mb.append(" ");
             mb.append(stackTraceToSingleLineString(e));
             logError(mb.toMessage());
@@ -838,7 +840,7 @@
             mb.append(ERR_RS_ERROR_SENDING_ACK.get(
                 Integer.toString(localReplicationServer.getServerId()),
                 Integer.toString(origServer.getServerId()),
-                csn.toString(), baseDn));
+                csn.toString(), baseDN.toNormalizedString()));
             mb.append(" ");
             mb.append(stackTraceToSingleLineString(e));
             logError(mb.toMessage());
@@ -1275,7 +1277,7 @@
    */
   public Set<Integer> getServerIds()
   {
-    return changelogDB.getDomainServerIds(baseDn);
+    return changelogDB.getDomainServerIds(baseDN);
   }
 
   /**
@@ -1292,7 +1294,7 @@
    */
   public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
   {
-    return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
+    return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN);
   }
 
  /**
@@ -1305,7 +1307,7 @@
   */
   public long getCount(int serverId, CSN from, CSN to)
   {
-    return changelogDB.getCount(baseDn, serverId, from, to);
+    return changelogDB.getCount(baseDN, serverId, from, to);
   }
 
   /**
@@ -1315,16 +1317,17 @@
    */
   public long getChangesCount()
   {
-    return changelogDB.getDomainChangesCount(baseDn);
+    return changelogDB.getDomainChangesCount(baseDN);
   }
 
   /**
-   * Get the baseDn.
-   * @return Returns the baseDn.
+   * Get the baseDN.
+   *
+   * @return Returns the baseDN.
    */
-  public String getBaseDn()
+  public DN getBaseDN()
   {
-    return baseDn;
+    return baseDN;
   }
 
   /**
@@ -1520,7 +1523,7 @@
   {
     MessageBuilder mb = new MessageBuilder();
     mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
-        this.baseDn, Integer.toString(msg.getDestination())));
+        baseDN.toNormalizedString(), Integer.toString(msg.getDestination())));
     mb.append(" In Replication Server=").append(
       this.localReplicationServer.getMonitorInstanceName());
     mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
@@ -1567,7 +1570,8 @@
          */
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
-            this.baseDn, Integer.toString(msg.getDestination())));
+            baseDN.toNormalizedString(),
+            Integer.toString(msg.getDestination())));
         mb.append(" unroutable message =" + msg.getClass().getSimpleName());
         mb.append(" Details: " + ioe.getLocalizedMessage());
         final Message message = mb.toMessage();
@@ -1698,7 +1702,7 @@
 
     stopAllServers(true);
 
-    changelogDB.shutdownDomain(baseDn);
+    changelogDB.shutdownDomain(baseDN);
   }
 
   /**
@@ -1709,7 +1713,7 @@
   public ServerState getDbServerState()
   {
     ServerState serverState = new ServerState();
-    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
+    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDN).values())
     {
       serverState.update(lastCSN);
     }
@@ -1722,7 +1726,7 @@
   @Override
   public String toString()
   {
-    return "ReplicationServerDomain " + baseDn;
+    return "ReplicationServerDomain " + baseDN;
   }
 
   /**
@@ -1755,10 +1759,9 @@
             {
               if (i == 2)
               {
-                Message message =
-                    ERR_EXCEPTION_SENDING_TOPO_INFO
-                        .get(baseDn, "directory", Integer.toString(dsHandler
-                            .getServerId()), e.getMessage());
+                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+                    baseDN.toNormalizedString(), "directory",
+                    Integer.toString(dsHandler.getServerId()), e.getMessage());
                 logError(message);
               }
             }
@@ -1793,7 +1796,7 @@
             if (i == 2)
             {
               Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
-                  baseDn, "replication",
+                  baseDN.toNormalizedString(), "replication",
                   Integer.toString(rsHandler.getServerId()), e.getMessage());
               logError(message);
             }
@@ -1934,9 +1937,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from "
-          + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n"
-          + genIdMsg);
+      debug("Receiving ResetGenerationIdMsg from "
+          + senderHandler.getServerId() + ":\n" + genIdMsg);
     }
 
     try
@@ -1965,10 +1967,8 @@
         // Order to take a gen id we already have, just ignore
         if (debugEnabled())
         {
-          TRACER.debugInfo("In " + this
-              + " Reset generation id requested for baseDn " + baseDn
-              + " but generation id was already " + this.generationId + ":\n"
-              + genIdMsg);
+          debug("Reset generation id requested but generationId was already "
+              + this.generationId + ":\n" + genIdMsg);
         }
       }
 
@@ -1987,8 +1987,8 @@
           }
         } catch (IOException e)
         {
-          logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn,
-              e.getMessage()));
+          logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(
+              baseDN.toNormalizedString(), e.getMessage()));
         }
       }
 
@@ -2001,7 +2001,8 @@
           dsHandler.changeStatusForResetGenId(newGenId);
         } catch (IOException e)
         {
-          logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn,
+          logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(
+              baseDN.toNormalizedString(),
               Integer.toString(dsHandler.getServerId()),
               e.getMessage()));
         }
@@ -2014,7 +2015,8 @@
       // treatment.
       sendTopoInfoToAll();
 
-      logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId));
+      logError(NOTE_RESET_GENERATION_ID.get(baseDN.toNormalizedString(),
+          newGenId));
     }
     catch(Exception e)
     {
@@ -2069,7 +2071,8 @@
       sendTopoInfoToAllExcept(senderHandler);
 
       Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
-          senderHandler.getServerId(), baseDn, newStatus.toString());
+          senderHandler.getServerId(), baseDN.toNormalizedString(),
+          newStatus.toString());
       logError(message);
     }
     catch(Exception e)
@@ -2114,7 +2117,7 @@
       // StatusAnalyzer.
       if (debugEnabled())
       {
-        TRACER.debugInfo("Status analyzer for domain " + baseDn
+        TRACER.debugInfo("Status analyzer for domain " + baseDN
             + " has been interrupted when"
             + " trying to acquire domain lock for changing the status of DS "
             + dsHandler.getServerId());
@@ -2133,7 +2136,7 @@
       catch (IOException e)
       {
         logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
-            .get(baseDn,
+            .get(baseDN.toNormalizedString(),
                 Integer.toString(dsHandler.getServerId()),
                 e.getMessage()));
       }
@@ -2186,10 +2189,10 @@
   public void clearDbs()
   {
     // Reset the localchange and state db for the current domain
-    changelogDB.clearDomain(baseDn);
+    changelogDB.clearDomain(baseDN);
     try
     {
-      localReplicationServer.clearGenerationId(baseDn);
+      localReplicationServer.clearGenerationId(baseDN);
     }
     catch (Exception e)
     {
@@ -2285,7 +2288,7 @@
             rsHandler.getServerId(),
             rsHandler.session.getReadableRemoteAddress(),
             rsHandler.getGenerationId(),
-            baseDn, getLocalRSServerId(), generationId);
+            baseDN.toNormalizedString(), getLocalRSServerId(), generationId);
         logError(message);
 
         ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
@@ -2494,7 +2497,8 @@
   {
     return "Replication server RS(" + localReplicationServer.getServerId()
         + ") " + localReplicationServer.getServerURL() + ",cn="
-        + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication";
+        + baseDN.toNormalizedString().replace(',', '_').replace('=', '_')
+        + ",cn=Replication";
   }
 
   /**
@@ -2509,9 +2513,10 @@
         String.valueOf(localReplicationServer.getServerId())));
     attributes.add(Attributes.create("replication-server-port",
         String.valueOf(localReplicationServer.getReplicationPort())));
-    attributes.add(Attributes.create("domain-name", baseDn));
+    attributes.add(Attributes.create("domain-name",
+        baseDN.toNormalizedString()));
     attributes.add(Attributes.create("generation-id",
-        baseDn + " " + generationId));
+        baseDN + " " + generationId));
 
     // Missing changes
     long missingChanges = getDomainMonitorData().getMissingChangesRS(
@@ -2595,7 +2600,7 @@
           if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
           {
             // let's try to seek the first change <= eligibleCSN
-            CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
+            CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN);
             result.update(newCSN);
           } else {
             // for this serverId, all changes in the ChangelogDb are holder
@@ -2612,8 +2617,7 @@
 
     if (debugEnabled())
     {
-      TRACER
-          .debugInfo("In " + this + " getEligibleState() result is " + result);
+      debug("getEligibleState() result is " + result);
     }
     return result;
   }
@@ -2629,7 +2633,7 @@
   public ServerState getStartState()
   {
     ServerState domainStartState = new ServerState();
-    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
+    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDN).values())
     {
       domainStartState.update(firstCSN);
     }
@@ -2650,7 +2654,7 @@
     CSN eligibleCSN = null;
 
     for (Entry<Integer, CSN> entry :
-      changelogDB.getDomainLastCSNs(baseDn).entrySet())
+      changelogDB.getDomainLastCSNs(baseDN).entrySet())
     {
       // Consider this producer (DS/db).
       final int serverId = entry.getKey();
@@ -2767,7 +2771,7 @@
             logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                 .get("Replication Server "
                     + localReplicationServer.getReplicationPort() + " "
-                    + baseDn + " " + localReplicationServer.getServerId()));
+                    + baseDN + " " + localReplicationServer.getServerId()));
             stopServer(rsHandler, false);
           }
         }
@@ -2844,7 +2848,7 @@
    */
   public long getLatestDomainTrimDate()
   {
-    return changelogDB.getDomainLatestTrimDate(baseDn);
+    return changelogDB.getDomainLatestTrimDate(baseDN);
   }
 
   /**
@@ -2962,8 +2966,9 @@
 
   private void debug(String message)
   {
-    TRACER.debugInfo("In RS serverId=" + localReplicationServer.getServerId()
-        + " for baseDn=" + baseDn + " and port="
-        + localReplicationServer.getReplicationPort() + ": " + message);
+    TRACER.debugInfo("In ReplicationServerDomain serverId="
+        + localReplicationServer.getServerId() + " for baseDN=" + baseDN
+        + " and port=" + localReplicationServer.getReplicationPort()
+        + ": " + message);
   }
 }

--
Gitblit v1.10.0