From a592fe71c4c2e29a136f9700a2981f3dcbd7e114 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Sep 2014 19:47:33 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation

---
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java |  120 +++++++-----------------------------------------------------
 1 files changed, 14 insertions(+), 106 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c37bdda..eecfc65 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -49,7 +49,6 @@
 import org.opends.server.replication.protocol.*;
 import org.opends.server.types.DN;
 import org.opends.server.types.HostPort;
-import org.opends.server.util.ServerConstants;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -279,7 +278,7 @@
     {
       shutdown = false;
       this.rcvWindow = getMaxRcvWindow();
-      connect();
+      connectAsDataServer();
     }
   }
 
@@ -692,19 +691,6 @@
     }
   }
 
-  private void connect()
-  {
-    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
-        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
-    {
-      connectAsECL();
-    }
-    else
-    {
-      connectAsDataServer();
-    }
-  }
-
   /**
    * Contacts all replication servers to get information from them and being
    * able to choose the more suitable.
@@ -718,7 +704,7 @@
     for (String serverUrl : getReplicationServerUrls())
     {
       // Connect to server + get and store info about it
-      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
+      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
       final ReplicationServerInfo rsInfo = rs.rsInfo;
       if (rsInfo != null)
       {
@@ -730,35 +716,6 @@
   }
 
   /**
-   * Special aspects of connecting as ECL (External Change Log) compared to
-   * connecting as data server are :
-   * <ul>
-   * <li>1 single RS configured</li>
-   * <li>so no choice of the preferred RS</li>
-   * <li>?? Heartbeat</li>
-   * <li>Start handshake is :
-   *
-   * <pre>
-   *    Broker ---> StartECLMsg       ---> RS
-   *          <---- ReplServerStartMsg ---
-   *           ---> StartSessionECLMsg --> RS
-   * </pre>
-   *
-   * </li>
-   * </ul>
-   */
-  private void connectAsECL()
-  {
-    // FIXME:ECL List of RS to connect is for now limited to one RS only
-    final String bestServerURL = getReplicationServerUrls().iterator().next();
-    final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
-    if (rs.isConnected())
-    {
-      performECLPhaseTwoHandshake(bestServerURL, rs);
-    }
-  }
-
-  /**
    * Connect to a ReplicationServer.
    *
    * Handshake sequences between a DS and a RS is divided into 2 logical
@@ -834,7 +791,7 @@
               + evals.getBestRS());
 
         final ConnectedRS electedRS = performPhaseOneHandshake(
-            evals.getBestRS().getServerURL(), true, false);
+            evals.getBestRS().getServerURL(), true);
         final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
         if (electedRsInfo != null)
         {
@@ -1100,12 +1057,9 @@
    *          Do we keep session opened or not after handshake. Use true if want
    *          to perform handshake phase 2 with the same session and keep the
    *          session to create as the current one.
-   * @param isECL
-   *          Indicates whether or not the an ECL handshake is to be performed.
    * @return The answer from the server . Null if could not get an answer.
    */
-  private ConnectedRS performPhaseOneHandshake(String serverURL,
-      boolean keepSession, boolean isECL)
+  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
   {
     Session newSession = null;
     Socket socket = null;
@@ -1124,8 +1078,7 @@
         socket.bind(local);
       }
       int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
-      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
-          timeoutMS);
+      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
       newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
       boolean isSslEncryption = replSessionSecurity.isSslEncryption();
 
@@ -1133,19 +1086,9 @@
       final HostPort hp = new HostPort(
           socket.getLocalAddress().getHostName(), socket.getLocalPort());
       final String url = hp.toString();
-      final StartMsg serverStartMsg;
-      if (!isECL)
-      {
-        serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
-            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
-            getGenerationID(), isSslEncryption, getGroupId());
-      }
-      else
-      {
-        serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
-            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
-            getGenerationID(), isSslEncryption, getGroupId());
-      }
+      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
+          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
+          getGenerationID(), isSslEncryption, getGroupId());
       newSession.publish(serverStartMsg);
 
       // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1233,44 +1176,6 @@
     return setConnectedRS(ConnectedRS.noConnectedRS());
   }
 
-
-
-  /**
-   * Performs the second phase handshake for External Change Log (send
-   * StartSessionMsg and receive TopologyMsg messages exchange) and return the
-   * reply message from the replication server.
-   *
-   * @param server Server we are connecting with.
-   */
-  private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
-  {
-    try
-    {
-      // Send our Start Session
-      final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
-      startECLSessionMsg.setOperationId("-1");
-      rs.session.publish(startECLSessionMsg);
-
-      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
-      if (logger.isTraceEnabled())
-      {
-        debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
-      }
-
-      // Alright set the timeout to the desired value
-      rs.session.setSoTimeout(timeout);
-      setConnectedRS(rs);
-    }
-    catch (Exception e)
-    {
-      logger.warn(WARN_EXCEPTION_STARTING_SESSION_PHASE, getServerId(), server, getBaseDN().toNormalizedString(),
-          stackTraceToSingleLineString(e));
-
-      rs.session.close();
-      setConnectedRS(ConnectedRS.noConnectedRS());
-    }
-  }
-
   /**
    * Performs the second phase handshake (send StartSessionMsg and receive
    * TopologyMsg messages exchange) and return the reply message from the
@@ -2273,7 +2178,7 @@
 
         try
         {
-          connect();
+          connectAsDataServer();
           rs = connectedRS.get();
         }
         catch (Exception e)
@@ -3077,7 +2982,8 @@
         Map<Integer, ReplicationServerInfo> previousRsInfos)
     {
       this.rsServerId = rsServerId;
-      this.replicaInfos = dsInfosToKeep;
+      this.replicaInfos = dsInfosToKeep == null
+          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
       this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
           previousRsInfos, configuredReplicationServerUrls);
     }
@@ -3284,7 +3190,9 @@
     @Override
     public String toString()
     {
-      return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
+      return getClass().getSimpleName()
+          + " rsServerId=" + rsServerId
+          + ", replicaInfos=" + replicaInfos.values()
           + ", rsInfos=" + rsInfos.values();
     }
   }

--
Gitblit v1.10.0