From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495,  519

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  309 +++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 294 insertions(+), 15 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c349289..a53ca20 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,15 +26,9 @@
  */
 package org.opends.server.replication.service;
 
-import org.opends.server.replication.protocol.HeartbeatMonitor;
-
-import org.opends.messages.*;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
-import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
@@ -52,13 +46,33 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
 import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.DSInfo;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.HeartbeatMonitor;
+import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartECLMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.StartECLSessionMsg;
+import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.util.ServerConstants;
 import org.opends.server.replication.server.ReplicationServer;
 
 /**
@@ -300,6 +314,43 @@
     }
   }
 
+  private void connect()
+  {
+    if (this.baseDn.compareToIgnoreCase(
+        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
+    {
+      connectAsECL();
+    }
+    else
+    {
+      connectAsDataServer();
+    }
+  }
+
+  /**
+   * Special aspects of connecting as ECL compared to connecting as data server
+   * are :
+   * - 1 single RS configured
+   * - so no choice of the prefered RS
+   * - No same groupID polling
+   * - ?? Heartbeat
+   * - Start handshake is :
+   *    Broker ---> StartECLMsg       ---> RS
+   *          <---- ReplServerStartMsg ---
+   *           ---> StartSessionECLMsg --> RS
+   */
+  private void connectAsECL()
+  {
+    // FIXME:ECL List of RS to connect is for now limited to one RS only
+    String bestServer = this.servers.iterator().next();
+
+    ReplServerStartMsg inReplServerStartMsg
+      = performECLPhaseOneHandshake(bestServer, true);
+
+    if (inReplServerStartMsg!=null)
+      performECLPhaseTwoHandshake(bestServer);
+  }
+
   /**
    * Connect to a ReplicationServer.
    *
@@ -325,7 +376,7 @@
    *
    * @throws NumberFormatException address was invalid
    */
-  private void connect()
+  private void connectAsDataServer()
   {
     HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
 
@@ -350,6 +401,9 @@
        * Connect to each replication server and get their ServerState then find
        * out which one is the best to connect to.
        */
+      if (debugEnabled())
+        TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
+            " order to elect the prefered one");
       for (String server : servers)
       {
         // Connect to server and get reply message
@@ -375,6 +429,9 @@
           serverId, baseDn, groupId);
 
         // Best found, now initialize connection to this one (handshake phase 1)
+        if (debugEnabled())
+          TRACER.debugInfo(
+              "phase 2 : will perform PhaseOneH with the prefered RS.");
         replServerStartMsg = performPhaseOneHandshake(bestServer, true);
 
         if (replServerStartMsg != null) // Handshake phase 1 exchange went well
@@ -764,6 +821,8 @@
       {
         try
         {
+          if (debugEnabled())
+            TRACER.debugInfo("In RB, closing session after phase 1");
           localSession.close();
         } catch (IOException e)
         {
@@ -789,6 +848,225 @@
   }
 
   /**
+   * Connect to the provided server performing the first phase handshake
+   * (start messages exchange) and return the reply message from the replication
+   * server.
+   *
+   * @param server Server to connect to.
+   * @param keepConnection Do we keep session opened or not after handshake.
+   *        Use true if want to perform handshake phase 2 with the same session
+   *        and keep the session to create as the current one.
+   * @return The ReplServerStartMsg the server replied. Null if could not
+   *         get an answer.
+   */
+  private ReplServerStartMsg performECLPhaseOneHandshake(String server,
+    boolean keepConnection)
+  {
+    ReplServerStartMsg replServerStartMsg = null;
+
+    // Parse server string.
+    int separator = server.lastIndexOf(':');
+    String port = server.substring(separator + 1);
+    String hostname = server.substring(0, separator);
+    ProtocolSession localSession = null;
+
+    boolean error = false;
+    try
+    {
+      /*
+       * Open a socket connection to the next candidate.
+       */
+      int intPort = Integer.parseInt(port);
+      InetSocketAddress serverAddr = new InetSocketAddress(
+        InetAddress.getByName(hostname), intPort);
+      if (keepConnection)
+        tmpReadableServerName = serverAddr.toString();
+      Socket socket = new Socket();
+      socket.setReceiveBufferSize(1000000);
+      socket.setTcpNoDelay(true);
+      socket.connect(serverAddr, 500);
+      localSession = replSessionSecurity.createClientSession(server, socket,
+        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
+      boolean isSslEncryption =
+        replSessionSecurity.isSslEncryption(server);
+
+      // Send our start msg.
+      ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
+          baseDn, 0, 0, 0, 0,
+          maxRcvWindow, heartbeatInterval, state,
+          ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+          isSslEncryption,
+          groupId);
+      localSession.publish(serverStartECLMsg);
+
+      // Read the ReplServerStartMsg that should come back.
+      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("In RB for " + baseDn +
+          "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
+          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
+      }
+
+      // Sanity check
+      String repDn = replServerStartMsg.getBaseDn();
+      if (!(this.baseDn.equals(repDn)))
+      {
+        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
+          this.baseDn);
+        logError(message);
+        error = true;
+      }
+
+      /*
+       * We have sent our own protocol version to the replication server.
+       * The replication server will use the same one (or an older one
+       * if it is an old replication server).
+       */
+      if (keepConnection)
+        protocolVersion = ProtocolVersion.minWithCurrent(
+          replServerStartMsg.getVersion());
+
+      if (!isSslEncryption)
+      {
+        localSession.stopEncryption();
+      }
+    } catch (ConnectException e)
+    {
+      /*
+       * There was no server waiting on this host:port
+       * Log a notice and try the next replicationServer in the list
+       */
+      if (!connectionError)
+      {
+        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+        if (keepConnection) // Log error message only for final connection
+        {
+          // the error message is only logged once to avoid overflowing
+          // the error log
+          logError(message);
+        } else if (debugEnabled())
+        {
+          TRACER.debugInfo(message.toString());
+        }
+      }
+      error = true;
+    } catch (Exception e)
+    {
+      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+      {
+        TRACER.debugInfo("Timeout trying to connect to RS " + server +
+          " for dn: " + baseDn);
+      }
+      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
+        baseDn, server, e.getLocalizedMessage() +
+        stackTraceToSingleLineString(e));
+      if (keepConnection) // Log error message only for final connection
+      {
+        logError(message);
+      } else if (debugEnabled())
+      {
+        TRACER.debugInfo(message.toString());
+      }
+      error = true;
+    }
+
+    // Close session if requested
+    if (!keepConnection || error)
+    {
+      if (localSession != null)
+      {
+        try
+        {
+          if (debugEnabled())
+            TRACER.debugInfo("In RB, closing session after phase 1");
+          localSession.close();
+        } catch (IOException e)
+        {
+          // The session was already closed, just ignore.
+        }
+        localSession = null;
+      }
+      if (error)
+      {
+        replServerStartMsg = null;
+      } // Be sure to return null.
+
+    }
+
+    // If this connection as the one to use for sending and receiving updates,
+    // store it.
+    if (keepConnection)
+    {
+      session = localSession;
+    }
+
+    return replServerStartMsg;
+  }
+
+  /**
+   * Performs the second phase handshake (send StartSessionMsg and receive
+   * TopologyMsg messages exchange) and return the reply message from the
+   * replication server.
+   *
+   * @param server Server we are connecting with.
+   * @param initStatus The status we are starting with
+   * @return The ReplServerStartMsg the server replied. Null if could not
+   *         get an answer.
+   */
+  private TopologyMsg performECLPhaseTwoHandshake(String server)
+  {
+    TopologyMsg topologyMsg = null;
+
+    try
+    {
+      // Send our Start Session
+      StartECLSessionMsg startECLSessionMsg = null;
+      startECLSessionMsg = new StartECLSessionMsg();
+      startECLSessionMsg.setOperationId(Short.toString(serverId));
+      session.publish(startECLSessionMsg);
+
+      /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
+       * Read the TopologyMsg that should come back.
+      topologyMsg = (TopologyMsg) session.receive();
+       */
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("In RB for " + baseDn +
+          "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
+      //  +   "\nAND RECEIVED:\n" + topologyMsg.toString());
+      }
+
+      // Alright set the timeout to the desired value
+      session.setSoTimeout(timeout);
+      connected = true;
+
+    } catch (Exception e)
+    {
+      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
+        baseDn, server, e.getLocalizedMessage() +
+        stackTraceToSingleLineString(e));
+      logError(message);
+
+      if (session != null)
+      {
+        try
+        {
+          session.close();
+        } catch (IOException ex)
+        {
+          // The session was already closed, just ignore.
+        }
+        session = null;
+      }
+      // Be sure to return null.
+      topologyMsg = null;
+    }
+    return topologyMsg;
+  }
+
+  /**
    * Performs the second phase handshake (send StartSessionMsg and receive
    * TopologyMsg messages exchange) and return the reply message from the
    * replication server.
@@ -1720,6 +1998,12 @@
     {
       boolean done = false;
 
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
+          " started.");
+      }
+
       while ((!done) && (!sameGroupIdPollershutdown))
       {
         // Sleep some time between checks
@@ -1850,11 +2134,6 @@
    */
   public void receiveTopo(TopologyMsg topoMsg)
   {
-
-    if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDn
-        + " received topology info update:\n" + topoMsg);
-
     // Store new lists
     synchronized(getDsList())
     {

--
Gitblit v1.10.0