From 6d84d49bcafa262c5b9bb545810778721597421a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 26 Aug 2013 13:51:36 +0000
Subject: [PATCH] Found problems in the replication ECL code. Also made the code more explicit.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                         |  106 +++++++++++---------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java                                 |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java                             |   52 ++++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                                          |   11 +-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java |   20 ++--
 5 files changed, 97 insertions(+), 96 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 67b977c..387b190 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -565,7 +565,7 @@
   private String findCookie(final int startDraftCN) throws ChangelogException,
       DirectoryException
   {
-    ChangelogDB changelogDB = replicationServer.getChangelogDB();
+    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
 
     if (startDraftCN <= 1)
     {
@@ -580,9 +580,10 @@
         return null;
       }
 
-      final int firstKey = changelogDB.getFirstKey();
-      String crossDomainStartState = changelogDB.getPreviousCookie(firstKey);
-      changelogDBIter = changelogDB.generateIterator(firstKey);
+      final int firstDraftCN = changelogDB.getFirstDraftCN();
+      final String crossDomainStartState =
+          changelogDB.getPreviousCookie(firstDraftCN);
+      changelogDBIter = changelogDB.generateIterator(firstDraftCN);
       return crossDomainStartState;
     }
 
@@ -633,7 +634,7 @@
         return null;
       }
 
-      final int lastKey = changelogDB.getLastKey();
+      final int lastKey = changelogDB.getLastDraftCN();
       crossDomainStartState = changelogDB.getPreviousCookie(lastKey);
       changelogDBIter = changelogDB.generateIterator(lastKey);
       return crossDomainStartState;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 7b16799..7455fa0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -89,8 +89,8 @@
   private Thread listenThread;
   private Thread connectThread;
 
-  /** The list of replication servers configured by the administrator. */
-  private Collection<String> replicationServers;
+  /** The list of replication server URLs configured by the administrator. */
+  private Collection<String> replicationServerUrls;
 
   /**
    * This table is used to store the list of dn for which we are currently
@@ -219,9 +219,9 @@
   {
     replicationPort = configuration.getReplicationPort();
     serverId = configuration.getReplicationServerId();
-    replicationServers = configuration.getReplicationServer();
-    if (replicationServers == null)
-      replicationServers = new ArrayList<String>();
+    replicationServerUrls = configuration.getReplicationServer();
+    if (replicationServerUrls == null)
+      replicationServerUrls = new ArrayList<String>();
     queueSize = configuration.getQueueSize();
     purgeDelay = configuration.getReplicationPurgeDelay();
     dbDirname = configuration.getReplicationDBDirectory();
@@ -259,8 +259,8 @@
     configuration.addChangeListener(this);
     try
     {
-      backendConfigEntryDN = DN.decode(
-      "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
+      backendConfigEntryDN =
+         DN.decode("ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
     } catch (Exception e) { /* do nothing */ }
 
     // Creates the backend associated to this ReplicationServer
@@ -404,14 +404,14 @@
 
           /*
            * check that all replication server in the config are in the
-           * connected Set. If not create the connection
+           * connected Set. If not, create the connection
            */
-          for (String aServerURL : replicationServers)
+          for (String rsURL : replicationServerUrls)
           {
-            final int separator = aServerURL.lastIndexOf(':');
-            final String portString = aServerURL.substring(separator + 1);
-            final int port = Integer.parseInt(portString);
-            final String hostname = aServerURL.substring(0, separator);
+            final int separator = rsURL.lastIndexOf(':');
+            final String hostname = rsURL.substring(0, separator);
+            final int port = Integer.parseInt(rsURL.substring(separator + 1));
+
             final InetAddress inetAddress;
             try
             {
@@ -436,13 +436,13 @@
             }
 
             // Don't connect to a server if it is already connected.
-            final String normalizedServerURL = normalizeServerURL(aServerURL);
+            final String normalizedServerURL = normalizeServerURL(rsURL);
             if (connectedRSUrls.contains(normalizedServerURL))
             {
               continue;
             }
 
-            connect(aServerURL, domain.getBaseDn());
+            connect(rsURL, domain.getBaseDn());
           }
         }
 
@@ -538,10 +538,7 @@
       listenSocket = new ServerSocket();
       listenSocket.bind(new InetSocketAddress(replicationPort));
 
-      /*
-       * creates working threads
-       * We must first connect, then start to listen.
-       */
+      // creates working threads: we must first connect, then start to listen.
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " creates connect thread");
@@ -559,7 +556,7 @@
       // can know me and really enableECL.
       if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) != null)
       {
-        // Already done . Nothing to do
+        // Already done. Nothing to do
         return;
       }
       eclwe = new ECLWorkflowElement(this);
@@ -567,7 +564,6 @@
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " successfully initialized");
-
     } catch (ChangelogException e)
     {
       Message message = ERR_COULD_NOT_READ_DB.get(
@@ -927,7 +923,7 @@
 
         try
         {
-          lastGeneratedDraftCN = changelogDB.getLastKey();
+          lastGeneratedDraftCN = changelogDB.getLastDraftCN();
         }
         catch (Exception ignored)
         {
@@ -995,9 +991,9 @@
 
     disconnectRemovedReplicationServers(configuration.getReplicationServer());
 
-    replicationServers = configuration.getReplicationServer();
-    if (replicationServers == null)
-      replicationServers = new ArrayList<String>();
+    replicationServerUrls = configuration.getReplicationServer();
+    if (replicationServerUrls == null)
+      replicationServerUrls = new ArrayList<String>();
 
     queueSize = configuration.getQueueSize();
     long newPurgeDelay = configuration.getReplicationPurgeDelay();
@@ -1088,8 +1084,8 @@
       broadcastConfigChange();
     }
 
-    if ((configuration.getReplicationDBDirectory() != null) &&
-        (!dbDirname.equals(configuration.getReplicationDBDirectory())))
+    final String newDir = configuration.getReplicationDBDirectory();
+    if (newDir != null && !dbDirname.equals(newDir))
     {
       return new ConfigChangeResult(ResultCode.SUCCESS, true);
     }
@@ -1109,25 +1105,24 @@
      * First try the set of configured replication servers to see if one of them
      * is this replication server (this should always be the case).
      */
-    for (String rs : replicationServers)
+    for (String rsUrl : replicationServerUrls)
     {
       /*
        * No need validate the string format because the admin framework has
        * already done it.
        */
-      final int index = rs.lastIndexOf(":");
-      final String hostname = rs.substring(0, index);
-      final int port = Integer.parseInt(rs.substring(index + 1));
+      final int index = rsUrl.lastIndexOf(":");
+      final String hostname = rsUrl.substring(0, index);
+      final int port = Integer.parseInt(rsUrl.substring(index + 1));
+
       if (port == replicationPort && isLocalAddress(hostname))
       {
-        serverURL = rs;
+        serverURL = rsUrl;
         return;
       }
     }
 
-    /*
-     * Fall-back to the machine hostname.
-     */
+    // Fall-back to the machine hostname.
     serverURL = InetAddress.getLocalHost().getHostName() + ":"
         + replicationPort;
   }
@@ -1249,8 +1244,7 @@
   public void remove()
   {
     if (debugEnabled())
-      TRACER.debugInfo("RS " +getMonitorInstanceName()+
-          " starts removing");
+      TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
 
     shutdown();
     removeBackend();
@@ -1471,23 +1465,20 @@
   {
     Collection<String> serversToDisconnect = new ArrayList<String>();
 
-    for (String server: replicationServers)
+    for (String rsUrl : replicationServerUrls)
     {
-      if (!newReplServers.contains(server))
+      if (!newReplServers.contains(rsUrl))
       {
         try
         {
-          // translate the server name into IP address
-          // and keep the port number
-          String[] host = server.split(":");
+          // translate the server name into IP address and keep the port number
+          String[] host = rsUrl.split(":");
           serversToDisconnect.add(
-              (InetAddress.getByName(host[0])).getHostAddress()
-              + ":" + host[1]);
+              InetAddress.getByName(host[0]).getHostAddress() + ":" + host[1]);
         }
         catch (IOException e)
         {
-          Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(server);
-          logError(message);
+          logError(ERR_COULD_NOT_SOLVE_HOSTNAME.get(rsUrl));
         }
       }
     }
@@ -1686,7 +1677,7 @@
     {
       if (changelogDB != null)
       {
-        return changelogDB.getFirstKey();
+        return changelogDB.getFirstDraftCN();
       }
       return 0;
     }
@@ -1702,7 +1693,7 @@
     {
       if (changelogDB != null)
       {
-        return changelogDB.getLastKey();
+        return changelogDB.getLastDraftCN();
       }
       return 0;
     }
@@ -1754,10 +1745,10 @@
 
     int lastDraftCN;
     boolean dbEmpty = false;
-    long newestDate = 0L;
-    ChangelogDB changelogDB = getChangelogDB();
+    long newestDate = 0;
+    final ChangelogDB changelogDB = getChangelogDB();
 
-    int firstDraftCN = changelogDB.getFirstKey();
+    int firstDraftCN = changelogDB.getFirstDraftCN();
     Map<String,ServerState> domainsServerStateForLastSeqnum = null;
     ChangeNumber changeNumberForLastSeqnum = null;
     String domainForLastSeqnum = null;
@@ -1769,7 +1760,7 @@
     }
     else
     {
-      lastDraftCN = changelogDB.getLastKey();
+      lastDraftCN = changelogDB.getLastDraftCN();
 
       // Get the generalized state associated with the current last DraftCN
       // and initializes from it the startStates table
@@ -1813,7 +1804,7 @@
         //  (may be this domain was disabled when this record was returned).
         // In that case, are counted the changes from
         //  the date of the most recent change from this last draft record
-        if (newestDate == 0L)
+        if (newestDate == 0)
         {
           newestDate = changeNumberForLastSeqnum.getTime();
         }
@@ -1900,7 +1891,6 @@
   }
 
 
-
   private String normalizeServerURL(final String url)
   {
     final int separator = url.lastIndexOf(':');
@@ -1909,16 +1899,12 @@
     try
     {
       final InetAddress inetAddress = InetAddress.getByName(hostname);
-
       if (isLocalAddress(inetAddress))
       {
-        // It doesn't matter whether we use an IP or hostname here.
+        // It does not matter whether we use an IP or hostname here.
         return InetAddress.getLocalHost().getHostAddress() + ":" + portString;
       }
-      else
-      {
-        return inetAddress.getHostAddress() + ":" + portString;
-      }
+      return inetAddress.getHostAddress() + ":" + portString;
     }
     catch (UnknownHostException e)
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index f5382d7..4883047 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -70,14 +70,14 @@
    *
    * @return Returns the first draftCN in the DB.
    */
-  int getFirstKey();
+  int getFirstDraftCN();
 
   /**
    * Get the lastChange.
    *
    * @return Returns the last draftCN in the DB
    */
-  int getLastKey();
+  int getLastDraftCN();
 
   /**
    * Add an update to the list of messages that must be saved to the db managed
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
index 6aac0d9..10e0b71 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -77,8 +77,16 @@
   private static int NO_KEY = 0;
 
   private DraftCNDB db;
-  private int firstkey = NO_KEY;
-  private int lastkey = NO_KEY;
+  /**
+   * FIXME Is this field that useful? {@link #getFirstDraftCN()} does not even
+   * use it!
+   */
+  private int firstDraftCN = NO_KEY;
+  /**
+   * FIXME Is this field that useful? {@link #getLastDraftCN()} does not even
+   * use it! It is not even updated.
+   */
+  private int lastDraftCN = NO_KEY;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
   private boolean shutdown = false;
   private boolean trimDone = false;
@@ -117,8 +125,8 @@
 
     // DB initialization
     db = new DraftCNDB(dbenv);
-    firstkey = db.readFirstDraftCN();
-    lastkey = db.readLastDraftCN();
+    firstDraftCN = db.readFirstDraftCN();
+    lastDraftCN = db.readLastDraftCN();
 
     // Trimming thread
     thread = new DirectoryThread(this, "Replication DraftCN db");
@@ -147,14 +155,14 @@
 
   /** {@inheritDoc} */
   @Override
-  public int getFirstKey()
+  public int getFirstDraftCN()
   {
     return db.readFirstDraftCN();
   }
 
   /** {@inheritDoc} */
   @Override
-  public int getLastKey()
+  public int getLastDraftCN()
   {
     return db.readLastDraftCN();
   }
@@ -309,11 +317,18 @@
       return;
     }
 
+    // FIXME is this correct?
+    // This code is not setting the excludedBaseDNs of the RS which means it
+    // could take any value set by one of the other methods!
+    // In addition, this code is not thread safe, but I suspect it is used in a
+    // multi-threaded way.
+    // The call to RS.getEligibleCN() is not reliable in any way and could
+    // return very different values even if the DB content did not change!!
     ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
 
     for (int i = 0; i < 100; i++)
     {
-      DraftCNDBCursor cursor = db.openDeleteCursor();
+      final DraftCNDBCursor cursor = db.openDeleteCursor();
       try
       {
         for (int j = 0; j < 50; j++)
@@ -326,16 +341,15 @@
           }
 
           // From the draftCNDb change record, get the domain and changeNumber
-          ChangeNumber cn = cursor.currentChangeNumber();
-          String baseDN = cursor.currentBaseDN();
-          if ((baseDNToClear != null)
-              && (baseDNToClear.equalsIgnoreCase(baseDN)))
+          final ChangeNumber cn = cursor.currentChangeNumber();
+          final String baseDN = cursor.currentBaseDN();
+          if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN))
           {
             cursor.delete();
             continue;
           }
 
-          ReplicationServerDomain domain = replicationServer
+          final ReplicationServerDomain domain = replicationServer
               .getReplicationServerDomain(baseDN, false);
           if (domain == null)
           {
@@ -346,14 +360,14 @@
             continue;
           }
 
-          ServerState startState = domain.getStartState();
+          final ServerState startState = domain.getStartState();
 
           // We don't use the returned endState but it's updating CN as reading
           domain.getEligibleState(crossDomainEligibleCN);
 
-          ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
+          final ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
 
-          int currentKey = cursor.currentKey();
+          final int currentDraftCN = cursor.currentKey();
 
           if (cn.older(fcn))
           {
@@ -391,7 +405,7 @@
             continue;
           }
 
-          firstkey = currentKey;
+          firstDraftCN = currentDraftCN;
           cursor.close();
           return;
         }
@@ -465,7 +479,7 @@
   @Override
   public String toString()
   {
-    return "draftCNdb:" + " " + firstkey + " " + lastkey;
+    return "draftCNdb:" + " " + firstDraftCN + " " + lastDraftCN;
   }
 
   /**
@@ -482,8 +496,8 @@
   public void clear() throws ChangelogException
   {
     db.clear();
-    firstkey = db.readFirstDraftCN();
-    lastkey = db.readLastDraftCN();
+    firstDraftCN = db.readFirstDraftCN();
+    lastDraftCN = db.readLastDraftCN();
   }
 
   private ReentrantLock lock = new ReentrantLock();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
index 73e3696..8d5450c 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -111,11 +111,11 @@
       handler.add(sn3, value3, baseDN3, changeNumber3);
 
       // The ChangeNumber should not get purged
-      int firstkey = handler.getFirstKey();
-      assertEquals(firstkey, sn1);
-      assertEquals(handler.getLastKey(), sn3);
+      final int firstDraftCN = handler.getFirstDraftCN();
+      assertEquals(firstDraftCN, sn1);
+      assertEquals(handler.getLastDraftCN(), sn3);
 
-      DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
+      DraftCNDBCursor dbc = handler.getReadCursor(firstDraftCN);
       try
       {
         assertEquals(dbc.currentChangeNumber(), changeNumber1);
@@ -149,8 +149,8 @@
       {
         Thread.sleep(200);
       }
-      assertEquals(handler.getFirstKey(), 0);
-      assertEquals(handler.getLastKey(), 0);
+      assertEquals(handler.getFirstDraftCN(), 0);
+      assertEquals(handler.getLastDraftCN(), 0);
 
 
     } finally
@@ -239,8 +239,8 @@
       Thread.sleep(500);
 
       // Checks
-      assertEquals(handler.getFirstKey(), sn1);
-      assertEquals(handler.getLastKey(), sn3);
+      assertEquals(handler.getFirstDraftCN(), sn1);
+      assertEquals(handler.getLastDraftCN(), sn3);
 
       assertEquals(handler.count(), 3, "Db count");
 
@@ -260,8 +260,8 @@
       handler.clear();
 
       // Check the db is cleared.
-      assertEquals(handler.getFirstKey(), 0);
-      assertEquals(handler.getLastKey(), 0);
+      assertEquals(handler.getFirstDraftCN(), 0);
+      assertEquals(handler.getLastDraftCN(), 0);
       assertEquals(handler.count(), 0);
     } finally
     {

--
Gitblit v1.10.0