From 96eaa516a85e620a6b76a64ffbe71cdc6037e026 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 13 Apr 2011 16:23:40 +0000
Subject: [PATCH] Initial fix for OpenDJ-97: Very many minor problems with the error logging for replication

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  326 +++++++++++++++++++++++++++--------------------------
 1 files changed, 166 insertions(+), 160 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 22148e5..347158c 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.collectionToString;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
@@ -67,7 +68,6 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.HeartbeatMonitor;
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
@@ -871,7 +871,6 @@
 
       // Get info from every available replication servers
       replicationServerInfos = collectReplicationServersInfo();
-      String rsis = replicationServerInfos.toString();
 
       ReplicationServerInfo replicationServerInfo = null;
 
@@ -1030,22 +1029,18 @@
           this.getGenerationID()) ||
           (replicationServerInfo.getGenerationId() == -1))
         {
-          Message message =
-            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
-            baseDn.toString(),
-            Integer.toString(rsServerId),
-            replicationServer,
-            Integer.toString(serverId),
-            Long.toString(this.getGenerationID()));
+          Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
+              .get(serverId, rsServerId, baseDn,
+                  session.getReadableRemoteAddress(),
+                  getGenerationID());
           logError(message);
         } else
         {
-          Message message =
-            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
-            baseDn.toString(),
-            replicationServer,
-            Long.toString(this.getGenerationID()),
-            Long.toString(replicationServerInfo.getGenerationId()));
+          Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
+              .get(serverId, rsServerId, baseDn,
+                  session.getReadableRemoteAddress(),
+                  getGenerationID(),
+                  replicationServerInfo.getGenerationId());
           logError(message);
         }
       } else
@@ -1058,9 +1053,22 @@
         {
           connectionError = true;
           connectPhaseLock.notify();
-          Message message =
-            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis);
-          logError(message);
+
+          if (replicationServerInfos.size() > 0)
+          {
+            Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
+                serverId,
+                baseDn,
+                collectionToString(replicationServerInfos.keySet(),
+                    ", "));
+            logError(message);
+          }
+          else
+          {
+            Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
+                serverId, baseDn);
+            logError(message);
+          }
         }
       }
     }
@@ -1149,30 +1157,33 @@
     }
   }
 
+
+
   /**
-   * Connect to the provided server performing the first phase handshake
-   * (start messages exchange) and return the reply message from the replication
+   * Connect to the provided server performing the first phase handshake (start
+   * messages exchange) and return the reply message from the replication
    * server, wrapped in a ReplicationServerInfo object.
    *
-   * @param server Server to connect to.
-   * @param keepConnection Do we keep session opened or not after handshake.
-   *        Use true if want to perform handshake phase 2 with the same session
-   *        and keep the session to create as the current one.
-   * @return The answer from the server . Null if could not
-   *         get an answer.
+   * @param server
+   *          Server to connect to.
+   * @param keepConnection
+   *          Whether to keep the session open after the handshake. Use true to
+   *          perform handshake phase 2 with the same session and keep the
+   *          created session as the current one.
+   * @return The answer from the server, or null if no answer was received.
    */
-  private ReplicationServerInfo performPhaseOneHandshake(String server,
-    boolean keepConnection)
+  private ReplicationServerInfo performPhaseOneHandshake(
+      String server, boolean keepConnection)
   {
-    ReplicationServerInfo replServerInfo = null;
-
-    // Parse server string.
     int separator = server.lastIndexOf(':');
     String port = server.substring(separator + 1);
     String hostname = server.substring(0, separator);
-    ProtocolSession localSession = null;
 
-    boolean error = false;
+    ProtocolSession localSession = null;
+    Socket socket = null;
+    boolean hasConnected = false;
+    Message errorMessage = null;
+
     try
     {
       /*
@@ -1180,132 +1191,131 @@
        */
       int intPort = Integer.parseInt(port);
       InetSocketAddress serverAddr = new InetSocketAddress(
-        InetAddress.getByName(hostname), intPort);
-      Socket socket = new Socket();
+          InetAddress.getByName(hostname), intPort);
+      socket = new Socket();
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
       socket.connect(serverAddr, 500);
-      localSession = replSessionSecurity.createClientSession(server, socket,
-        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
-      boolean isSslEncryption =
-        replSessionSecurity.isSslEncryption(server);
-      /*
-       * Send our ServerStartMsg.
-       */
-      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
-        maxRcvWindow, heartbeatInterval, state,
-        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
-        isSslEncryption,
-        groupId);
+      localSession = replSessionSecurity.createClientSession(
+          socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT);
+      boolean isSslEncryption = replSessionSecurity
+          .isSslEncryption(server);
+
+      // Send our ServerStartMsg.
+      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
+          baseDn, maxRcvWindow, heartbeatInterval, state,
+          ProtocolVersion.getCurrentVersion(),
+          this.getGenerationID(), isSslEncryption, groupId);
       localSession.publish(serverStartMsg);
 
-      /*
-       * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
-       * back.
-       */
+      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
+      // come back.
       ReplicationMsg msg = localSession.receive();
-
       if (debugEnabled())
       {
-        debugInfo("In RB for " + baseDn +
-          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
-          "\nAND RECEIVED:\n" + msg.toString());
+        debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+            + serverStartMsg.toString() + "\nAND RECEIVED:\n"
+            + msg.toString());
       }
 
       // Wrap received message in a server info object
-      replServerInfo = ReplicationServerInfo.newInstance(msg);
+      ReplicationServerInfo replServerInfo = ReplicationServerInfo
+          .newInstance(msg);
 
       // Sanity check
       String repDn = replServerInfo.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
-        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
-          this.baseDn);
-        logError(message);
-        error = true;
+        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
+            this.baseDn);
+        return null;
       }
 
       /*
-       * We have sent our own protocol version to the replication server.
-       * The replication server will use the same one (or an older one
-       * if it is an old replication server).
+       * We have sent our own protocol version to the replication server. The
+       * replication server will use the same one (or an older one if it is an
+       * old replication server).
        */
-      protocolVersion = ProtocolVersion.minWithCurrent(
-        replServerInfo.getProtocolVersion());
+      protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo
+          .getProtocolVersion());
       localSession.setProtocolVersion(protocolVersion);
 
-
       if (!isSslEncryption)
       {
         localSession.stopEncryption();
       }
-    } catch (ConnectException e)
-    {
-      /*
-       * There was no server waiting on this host:port
-       * Log a notice and try the next replicationServer in the list
-       */
-      if (!connectionError)
-      {
-        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
-        if (keepConnection) // Log error message only for final connection
-        {
-          // the error message is only logged once to avoid overflowing
-          // the error log
-          logError(message);
-        } else if (debugEnabled())
-        {
-          debugInfo(message.toString());
-        }
-      }
-      error = true;
-    } catch (Exception e)
-    {
-      if ((e instanceof SocketTimeoutException) && debugEnabled())
-      {
-        debugInfo("Timeout trying to connect to RS " + server +
-          " for dn: " + baseDn);
-      }
-      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
-        baseDn, server, e.getLocalizedMessage() +
-        stackTraceToSingleLineString(e));
-      if (keepConnection) // Log error message only for final connection
-      {
-        logError(message);
-      } else if (debugEnabled())
-      {
-        debugInfo(message.toString());
-      }
-      error = true;
-    }
 
-    // Close session if requested
-    if (!keepConnection || error)
-    {
-      if (localSession != null)
+      hasConnected = true;
+
+      // If this connection is the one to use for sending and receiving
+      // updates, store it.
+      if (keepConnection)
       {
-        if (debugEnabled()) {
-          debugInfo("In RB, closing session after phase 1");
+        session = localSession;
+      }
+
+      return replServerInfo;
+    }
+    catch (ConnectException e)
+    {
+      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
+          server, baseDn);
+      return null;
+    }
+    catch (SocketTimeoutException e)
+    {
+      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(serverId,
+          server, baseDn);
+      return null;
+    }
+    catch (Exception e)
+    {
+      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+          server, baseDn, stackTraceToSingleLineString(e));
+      return null;
+    }
+    finally
+    {
+      if (!hasConnected || !keepConnection)
+      {
+        if (localSession != null)
+        {
+          localSession.close();
         }
 
-        localSession.close();
-        localSession = null;
+        if (socket != null)
+        {
+          try
+          {
+            socket.close();
+          }
+          catch (IOException e)
+          {
+            // Ignore.
+          }
+        }
       }
-      if (error)
+
+      if (!hasConnected && errorMessage != null)
       {
-        replServerInfo = null;
-      } // Be sure to return null.
+        // The handshake with this host:port failed. Log a notice and try
+        // the next replicationServer in the list.
+        if (!connectionError)
+        {
+          if (keepConnection) // Log error message only for final connection
+          {
+            // the error message is only logged once to avoid overflowing
+            // the error log
+            logError(errorMessage);
+          }
 
+          if (debugEnabled())
+          {
+            debugInfo(errorMessage.toString());
+          }
+        }
+      }
     }
-
-    // If this connection as the one to use for sending and receiving updates,
-    // store it.
-    if (keepConnection)
-    {
-      session = localSession;
-    }
-
-    return replServerInfo;
   }
 
   /**
@@ -1323,6 +1333,8 @@
   private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
     boolean keepConnection)
   {
+    // FIXME: this should be merged with performPhaseOneHandshake to avoid
+    // code/bug duplication.
     ReplServerStartDSMsg replServerStartDSMsg = null;
 
     // Parse server string.
@@ -1344,7 +1356,7 @@
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
       socket.connect(serverAddr, 500);
-      localSession = replSessionSecurity.createClientSession(server, socket,
+      localSession = replSessionSecurity.createClientSession(socket,
         ReplSessionSecurity.HANDSHAKE_TIMEOUT);
       boolean isSslEncryption =
         replSessionSecurity.isSslEncryption(server);
@@ -1400,7 +1412,9 @@
        */
       if (!connectionError)
       {
-        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+        Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
+            server, baseDn);
+
         if (keepConnection) // Log error message only for final connection
         {
           // the error message is only logged once to avoid overflowing
@@ -1419,9 +1433,8 @@
         debugInfo("Timeout trying to connect to RS " + server +
           " for dn: " + baseDn);
       }
-      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
-        baseDn, server, e.getLocalizedMessage() +
-        stackTraceToSingleLineString(e));
+      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+          server, baseDn, stackTraceToSingleLineString(e));
       if (keepConnection) // Log error message only for final connection
       {
         logError(message);
@@ -1499,9 +1512,8 @@
 
     } catch (Exception e)
     {
-      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
-        baseDn, server, e.getLocalizedMessage() +
-        stackTraceToSingleLineString(e));
+      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+          server, baseDn, stackTraceToSingleLineString(e));
       logError(message);
 
       if (session != null)
@@ -1573,9 +1585,8 @@
 
     } catch (Exception e)
     {
-      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
-        baseDn, server, e.getLocalizedMessage() +
-        stackTraceToSingleLineString(e));
+      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
+          server, baseDn, stackTraceToSingleLineString(e));
       logError(message);
 
       if (session != null)
@@ -2196,15 +2207,8 @@
     // Start a heartbeat monitor thread.
     if (heartbeatInterval > 0)
     {
-      String threadName = "Replica DS("
-          + this.getServerId() + ") heartbeat monitor for domain \""
-          + this.baseDn + "\" from RS(" + this.getRsServerId()
-          + ") at " + session.getReadableRemoteAddress();
-
-      heartbeatMonitor = new HeartbeatMonitor(
-          threadName,
-          session,
-          heartbeatInterval);
+      heartbeatMonitor = new HeartbeatMonitor(getServerId(),
+          getRsServerId(), baseDn, session, heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
@@ -2496,7 +2500,11 @@
         reStart(null, true);
       }
 
-      ProtocolSession failingSession = session;
+      // Save session information for later in case we need it for log messages
+      // after the session has been closed and/or failed.
+      final ProtocolSession failingSession = session;
+      final int replicationServerID = rsServerId;
+
       try
       {
         ReplicationMsg msg = session.receive();
@@ -2530,11 +2538,12 @@
           /*
            * RS performs a proper disconnection
            */
-          Message message =
-            NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
-            Integer.toString(rsServerId), baseDn.toString(),
-            Integer.toString(serverId));
+          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
+              .get(replicationServerID,
+                  failingSession.getReadableRemoteAddress(),
+                  serverId, baseDn);
           logError(message);
+
           // Try to find a suitable RS
           this.reStart(failingSession, true);
         } else if (msg instanceof MonitorMsg)
@@ -2597,12 +2606,10 @@
                 {
                   bestServerId = bestServerInfo.getServerId();
                 }
-                Message message =
-                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
-                  Integer.toString(serverId),
-                  Integer.toString(rsServerId),
-                  rsServerUrl,
-                  Integer.toString(bestServerId));
+                Message message = NOTE_NEW_BEST_REPLICATION_SERVER
+                    .get(serverId, replicationServerID,
+                        failingSession.getReadableRemoteAddress(),
+                        bestServerId, baseDn);
                 logError(message);
                 reStart(true);
               }
@@ -2632,10 +2639,9 @@
             /*
              * We did not initiate the close on our side, log an error message.
              */
-            Message message =
-              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
-              Integer.toString(rsServerId), baseDn.toString(),
-              Integer.toString(serverId));
+            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
+                .get(serverId, baseDn, replicationServerID,
+                    failingSession.getReadableRemoteAddress());
             logError(message);
           }
           if (reconnectOnFailure)

--
Gitblit v1.10.0