From 0a9135e3444bbefde6188f456b9c9772a816096d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 18 Sep 2013 15:17:14 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  166 +++++++++++++++++++++---------------------------------
 1 files changed, 65 insertions(+), 101 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 2195c79..e1a7a00 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -45,6 +45,7 @@
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.ServerConstants;
 
@@ -79,7 +80,7 @@
   private volatile String replicationServer = NO_CONNECTED_SERVER;
   private volatile Session session = null;
   private final ServerState state;
-  private final String baseDn;
+  private final DN baseDN;
   private final int serverId;
   private Semaphore sendWindow;
   private int maxSendWindow;
@@ -192,9 +193,9 @@
    * @param replicationDomain The replication domain that is creating us.
    * @param state The ServerState that should be used by this broker
    *        when negotiating the session with the replicationServer.
-   * @param baseDn The base DN that should be used by this broker
+   * @param baseDN The base DN that should be used by this broker
    *        when negotiating the session with the replicationServer.
-   * @param serverID2 The server ID that should be used by this broker
+   * @param serverId The server ID that should be used by this broker
    *        when negotiating the session with the replicationServer.
    * @param window The size of the send and receive window to use.
    * @param generationId The generationId for the server associated to the
@@ -208,14 +209,14 @@
    *        or zero if no CSN heartbeat should be sent.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
-    ServerState state, String baseDn, int serverID2, int window,
+    ServerState state, DN baseDN, int serverId, int window,
     long generationId, long heartbeatInterval,
     ReplSessionSecurity replSessionSecurity, byte groupId,
     long changeTimeHeartbeatInterval)
   {
     this.domain = replicationDomain;
-    this.baseDn = baseDn;
-    this.serverId = serverID2;
+    this.baseDN = baseDN;
+    this.serverId = serverId;
     this.state = state;
     this.protocolVersion = ProtocolVersion.getCurrentVersion();
     this.replSessionSecurity = replSessionSecurity;
@@ -245,7 +246,7 @@
     {
       shutdown = false;
       this.rcvWindow = this.maxRcvWindow;
-      this.connect();
+      connect();
     }
   }
 
@@ -269,7 +270,7 @@
       }
 
       this.rcvWindow = this.maxRcvWindow;
-      this.connect();
+      connect();
     }
   }
 
@@ -779,8 +780,8 @@
 
   private void connect()
   {
-    if (this.baseDn.compareToIgnoreCase(
-      ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
+    if (this.baseDN.toNormalizedString().equalsIgnoreCase(
+        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
     {
       connectAsECL();
     } else
@@ -964,14 +965,14 @@
             || (electedRsInfo.getGenerationId() == -1))
         {
           Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
-              .get(serverId, rsServerId, baseDn,
+              .get(serverId, rsServerId, baseDN.toNormalizedString(),
                   session.getReadableRemoteAddress(),
                   getGenerationID());
           logError(message);
         } else
         {
           Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
-              .get(serverId, rsServerId, baseDn,
+              .get(serverId, rsServerId, baseDN.toNormalizedString(),
                   session.getReadableRemoteAddress(),
                   getGenerationID(),
                   electedRsInfo.getGenerationId());
@@ -995,15 +996,14 @@
           {
             Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
                 serverId,
-                baseDn,
-                collectionToString(replicationServerInfos.keySet(),
-                    ", "));
+                baseDN.toNormalizedString(),
+                collectionToString(replicationServerInfos.keySet(), ", "));
             logError(message);
           }
           else
           {
             Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
-                serverId, baseDn);
+                serverId, baseDN.toNormalizedString());
             logError(message);
           }
         }
@@ -1082,11 +1082,10 @@
         warn user and start heartbeat monitor to recover when a server
         with the right group id shows up.
         */
-        Message message =
-            WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
-                .toString(groupId), Integer.toString(rsServerId), rsInfo
-                .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
-                .toString(serverId));
+        Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+                Byte.toString(groupId), Integer.toString(rsServerId),
+                rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
+                baseDN.toNormalizedString(), Integer.toString(serverId));
         logError(message);
       }
       startRSHeartBeatMonitoring();
@@ -1098,10 +1097,9 @@
     }
     catch (Exception e)
     {
-      Message message =
-          ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
-              .getLocalizedMessage()
-              + stackTraceToSingleLineString(e));
+      Message message = ERR_COMPUTING_FAKE_OPS.get(
+          baseDN.toNormalizedString(), rsInfo.getServerURL(),
+          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
       logError(message);
     }
     finally
@@ -1149,7 +1147,7 @@
 
         if (debugEnabled())
         {
-          TRACER.debugInfo("RB for dn " + baseDn + " and with server id "
+          TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
               + serverId + " computed " + nChanges + " changes late.");
         }
 
@@ -1211,6 +1209,8 @@
     String port = server.substring(separator + 1);
     String hostname = server.substring(0, separator);
 
+    final String baseDn = this.baseDN.toNormalizedString();
+
     Session localSession = null;
     Socket socket = null;
     boolean hasConnected = false;
@@ -1218,9 +1218,7 @@
 
     try
     {
-      /*
-       * Open a socket connection to the next candidate.
-       */
+      // Open a socket connection to the next candidate.
       int intPort = Integer.parseInt(port);
       InetSocketAddress serverAddr = new InetSocketAddress(
           InetAddress.getByName(hostname), intPort);
@@ -1239,15 +1237,15 @@
       StartMsg serverStartMsg;
       if (!isECL)
       {
-        serverStartMsg = new ServerStartMsg(serverId, url, baseDn,
-            maxRcvWindow, heartbeatInterval, state,
-            this.getGenerationID(), isSslEncryption, groupId);
+        serverStartMsg = new ServerStartMsg(serverId, url,
+            baseDN.toNormalizedString(), maxRcvWindow, heartbeatInterval, state,
+            getGenerationID(), isSslEncryption, groupId);
       }
       else
       {
         serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
             maxRcvWindow, heartbeatInterval, state,
-            this.getGenerationID(), isSslEncryption, groupId);
+            getGenerationID(), isSslEncryption, groupId);
       }
       localSession.publish(serverStartMsg);
 
@@ -1256,7 +1254,7 @@
       ReplicationMsg msg = localSession.receive();
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
             + serverStartMsg + "\nAND RECEIVED:\n" + msg);
       }
 
@@ -1266,10 +1264,9 @@
 
       // Sanity check
       String repDn = replServerInfo.getBaseDn();
-      if (!this.baseDn.equals(repDn))
+      if (!baseDn.equals(repDn))
       {
-        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn,
-            this.baseDn);
+        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, baseDn);
         return null;
       }
 
@@ -1324,22 +1321,8 @@
     {
       if (!hasConnected || !keepConnection)
       {
-        if (localSession != null)
-        {
-          localSession.close();
-        }
-
-        if (socket != null)
-        {
-          try
-          {
-            socket.close();
-          }
-          catch (IOException e)
-          {
-            // Ignore.
-          }
-        }
+        close(localSession);
+        close(socket);
       }
 
       if (!hasConnected && errorMessage != null)
@@ -1372,13 +1355,9 @@
    * reply message from the replication server.
    *
    * @param server Server we are connecting with.
-   * @return The ReplServerStartMsg the server replied. Null if could not
-   *         get an answer.
    */
-  private TopologyMsg performECLPhaseTwoHandshake(String server)
+  private void performECLPhaseTwoHandshake(String server)
   {
-    TopologyMsg topologyMsg = null;
-
     try
     {
       // Send our Start Session
@@ -1386,32 +1365,24 @@
       startECLSessionMsg.setOperationId("-1");
       session.publish(startECLSessionMsg);
 
-      /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
-       * Read the TopologyMsg that should come back.
-      topologyMsg = (TopologyMsg) session.receive();
-       */
+      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
             + startECLSessionMsg);
       }
 
       // Alright set the timeout to the desired value
       session.setSoTimeout(timeout);
       connected = true;
-
     } catch (Exception e)
     {
       Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
-          server, baseDn, stackTraceToSingleLineString(e));
+          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
       logError(message);
 
       setSession(null);
-
-      // Be sure to return null.
-      topologyMsg = null;
     }
-    return topologyMsg;
   }
 
   /**
@@ -1464,7 +1435,7 @@
 
       if (debugEnabled())
       {
-        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
             + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
       }
 
@@ -1474,7 +1445,7 @@
     } catch (Exception e)
     {
       Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
-          server, baseDn, stackTraceToSingleLineString(e));
+          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
       logError(message);
 
       setSession(null);
@@ -2118,8 +2089,8 @@
     // Start a heartbeat monitor thread.
     if (heartbeatInterval > 0)
     {
-      heartbeatMonitor = new HeartbeatMonitor(getServerId(),
-          getRsServerId(), baseDn, session, heartbeatInterval);
+      heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
+          baseDN.toNormalizedString(), session, heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
@@ -2185,8 +2156,8 @@
         catch (Exception e)
         {
           MessageBuilder mb = new MessageBuilder();
-          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
-              e.getLocalizedMessage()));
+          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
+              baseDN.toNormalizedString(), e.getLocalizedMessage()));
           mb.append(stackTraceToSingleLineString(e));
           logError(mb.toMessage());
         }
@@ -2210,7 +2181,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo(this + " end restart : connected=" + connected
-        + " with RSid=" + this.getRsServerId() + " genid=" + this.generationID);
+        + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
     }
   }
 
@@ -2476,17 +2447,14 @@
         }
         else if (msg instanceof StopMsg)
         {
-          /*
-           * RS performs a proper disconnection
-           */
-          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
-              .get(replicationServerID,
-                  savedSession.getReadableRemoteAddress(),
-              serverId, baseDn);
+          // RS performs a proper disconnection
+          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
+              replicationServerID, savedSession.getReadableRemoteAddress(),
+              serverId, baseDN.toNormalizedString());
           logError(message);
 
           // Try to find a suitable RS
-          this.reStart(savedSession, true);
+          reStart(savedSession, true);
         }
         else if (msg instanceof MonitorMsg)
         {
@@ -2547,14 +2515,15 @@
                   message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                       serverId, replicationServerID,
                       savedSession.getReadableRemoteAddress(),
-                      baseDn);
+                      baseDN.toNormalizedString());
                 }
                 else
                 {
                   message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                       serverId, replicationServerID,
                       savedSession.getReadableRemoteAddress(),
-                      bestServerInfo.getServerId(), baseDn);
+                      bestServerInfo.getServerId(),
+                      baseDN.toNormalizedString());
                 }
                 logError(message);
                 reStart(true);
@@ -2586,12 +2555,10 @@
           final Session tmpSession = session;
           if (tmpSession == null || !tmpSession.closeInitiated())
           {
-            /*
-             * We did not initiate the close on our side, log an error message.
-             */
-            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
-                .get(serverId, baseDn, replicationServerID,
-                    savedSession.getReadableRemoteAddress());
+            // We did not initiate the close on our side, log an error message.
+            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
+                serverId, baseDN.toNormalizedString(), replicationServerID,
+                savedSession.getReadableRemoteAddress());
             logError(message);
           }
 
@@ -2678,7 +2645,7 @@
     if (debugEnabled())
       TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will"
         + " close the connection to replication server " + rsServerId + " for"
-        + " domain " + baseDn);
+        + " domain " + baseDN);
 
     synchronized (startStopLock)
     {
@@ -2767,10 +2734,8 @@
     if (connected)
     {
       return sendWindow.availablePermits();
-    } else
-    {
-      return 0;
     }
+    return 0;
   }
 
   /**
@@ -2864,9 +2829,9 @@
     } catch (IOException ex)
     {
       Message message = ERR_EXCEPTION_SENDING_CS.get(
-        baseDn,
+        baseDN.toNormalizedString(),
         Integer.toString(serverId),
-        ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
+        ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
       logError(message);
     }
   }
@@ -3022,10 +2987,9 @@
     // Start a CSN heartbeat thread.
     if (changeTimeHeartbeatSendInterval > 0)
     {
-      String threadName = "Replica DS("
-          + this.getServerId()
+      String threadName = "Replica DS(" + getServerId()
           + ") change time heartbeat publisher for domain \""
-          + this.baseDn + "\" to RS(" + this.getRsServerId()
+          + this.baseDN + "\" to RS(" + getRsServerId()
           + ") at " + session.getReadableRemoteAddress();
 
       ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(

--
Gitblit v1.10.0