From 60f8d8d4575206697f47c040d4272dee27251bab Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Apr 2014 13:56:22 +0000
Subject: [PATCH] OPENDJ-1430 Some changes are missing from the external changelog changeNumber

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                   |    4 
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   40 ++++++--
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   83 ++++++++++++++--
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java   |   40 +++----
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java                               |   92 +++++++++++++-----
 5 files changed, 188 insertions(+), 71 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 99870ed..3b44c84 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -373,7 +373,10 @@
         cursors.put(entry2.getValue(), entry.getKey());
       }
     }
-    final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
+
+    // CNIndexer manages the cursor itself,
+    // so do not try to recycle exhausted cursors
+    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
     result.next();
     nextChangeForInsertDBCursor = result;
   }
@@ -456,7 +459,12 @@
           }
           else
           {
-            createNewCursors();
+            final boolean createdCursors = createNewCursors();
+            final boolean recycledCursors = recycleExhaustedCursors();
+            if (createdCursors || recycledCursors)
+            {
+              resetNextChangeForInsertDBCursor();
+            }
           }
 
           final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
@@ -470,9 +478,6 @@
               }
               wait();
             }
-            // try to recycle the exhausted cursors,
-            // success/failure will be checked later
-            nextChangeForInsertDBCursor.next();
             // loop to check whether new changes have been added to the
             // ReplicaDBs
             continue;
@@ -603,7 +608,24 @@
     }
   }
 
-  private void createNewCursors() throws ChangelogException
+  private boolean recycleExhaustedCursors() throws ChangelogException
+  {
+    boolean succesfullyRecycled = false;
+    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+    {
+      for (DBCursor<UpdateMsg> cursor : map.values())
+      {
+        // try to recycle it by calling next()
+        if (cursor.getRecord() == null && cursor.next())
+        {
+          succesfullyRecycled = true;
+        }
+      }
+    }
+    return succesfullyRecycled;
+  }
+
+  private boolean createNewCursors() throws ChangelogException
   {
     if (!newCursors.isEmpty())
     {
@@ -623,11 +645,9 @@
         }
         iter.remove();
       }
-      if (newCursorAdded)
-      {
-        resetNextChangeForInsertDBCursor();
-      }
+      return newCursorAdded;
     }
+    return false;
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 46994c5..52ef4c6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
@@ -45,8 +45,23 @@
 final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
-  private UpdateMsg currentRecord;
-  private Data currentData;
+  private static final byte UNINITIALIZED = 0;
+  private static final byte READY = 1;
+  private static final byte CLOSED = 2;
+
+  /**
+   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
+   * {@link #CLOSED}
+   */
+  private byte state = UNINITIALIZED;
+
+  /** Whether this composite should try to recycle exhausted cursors. */
+  private final boolean recycleExhaustedCursors;
+  /**
+   * These cursors are considered exhausted because they had no new changes the
+   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
+   * might be recycled at some point when they start returning changes again.
+   */
   private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
       new HashMap<DBCursor<UpdateMsg>, Data>();
   /**
@@ -71,9 +86,14 @@
    *
    * @param cursors
    *          the cursors that will be iterated upon.
+   * @param recycleExhaustedCursors
+   *          whether a call to {@link #next()} tries to recycle exhausted
+   *          cursors
    */
-  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
+  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
+      boolean recycleExhaustedCursors)
   {
+    this.recycleExhaustedCursors = recycleExhaustedCursors;
     for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
     {
       put(entry);
@@ -84,10 +104,16 @@
   @Override
   public boolean next() throws ChangelogException
   {
-    if (!exhaustedCursors.isEmpty())
+    if (state == CLOSED)
+    {
+      return false;
+    }
+    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
+    state = READY;
+    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
     {
       // try to recycle empty cursors in case the underlying ReplicaDBs received
-      // new changes. Copy the List to avoid ConcurrentModificationExceptions.
+      // new changes.
       final Map<DBCursor<UpdateMsg>, Data> copy =
           new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
@@ -96,25 +122,30 @@
         entry.getKey().next();
         put(entry);
       }
-    }
-
-    if (cursors.isEmpty())
-    {
-      // no cursors are left with changes.
-      currentRecord = null;
-      currentData = null;
-      return false;
+      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
+      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
+      {
+        // if the first cursor was previously an exhausted cursor,
+        // then we have already called next() on it.
+        // Avoid calling it again because we know new changes have been found.
+        return true;
+      }
     }
 
     // To keep consistent the cursors' order in the SortedSet, it is necessary
-    // to remove and eventually add again a cursor (after moving it forward).
-    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
-    final DBCursor<UpdateMsg> cursor = entry.getKey();
-    currentRecord = cursor.getRecord();
-    currentData = entry.getValue();
-    cursor.next();
-    put(entry);
-    return true;
+    // to remove and add again the cursor after moving it forward.
+    if (advanceNonExhaustedCursors)
+    {
+      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
+      if (firstEntry != null)
+      {
+        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
+        cursor.next();
+        put(firstEntry);
+      }
+    }
+    // no cursors are left with changes.
+    return !cursors.isEmpty();
   }
 
   private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
   @Override
   public UpdateMsg getRecord()
   {
-    return currentRecord;
+    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+    if (entry != null)
+    {
+      return entry.getKey().getRecord();
+    }
+    return null;
   }
 
   /**
@@ -145,7 +181,12 @@
    */
   public Data getData()
   {
-    return currentData;
+    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+    if (entry != null)
+    {
+      return entry.getValue();
+    }
+    return null;
   }
 
   /** {@inheritDoc} */
@@ -160,8 +201,7 @@
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " currentRecord=" + currentRecord
-        + " currentData=" + currentData + " openCursors=" + cursors
+    return getClass().getSimpleName() + " openCursors=" + cursors
         + " exhaustedCursors=" + exhaustedCursors;
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fed23b8..b1e2d63 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -728,7 +728,9 @@
           startAfterServerState.getCSN(serverId) : null;
       cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
     }
-    return new CompositeDBCursor<Void>(cursors);
+    // recycle exhausted cursors,
+    // because client code will not manage the cursors itself
+    return new CompositeDBCursor<Void>(cursors, true);
   }
 
   /** {@inheritDoc} */
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 49e1994..62fe230 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -155,7 +155,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBOneInitialDS() throws Exception
+  public void emptyDBOneDS() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
     startCNIndexer(BASE_DN1);
@@ -166,7 +166,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void nonEmptyDBOneInitialDS() throws Exception
+  public void nonEmptyDBOneDS() throws Exception
   {
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     addReplica(BASE_DN1, serverId1);
@@ -179,7 +179,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBTwoInitialDSs() throws Exception
+  public void emptyDBTwoDSs() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
@@ -196,7 +196,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBTwoInitialDSsDifferentDomains() throws Exception
+  public void emptyDBTwoDSsDifferentDomains() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN2, serverId2);
@@ -211,8 +211,53 @@
     assertExternalChangelogContent(msg1, msg2);
   }
 
+  /**
+   * This test tries to reproduce a very subtle implementation bug where:
+   * <ol>
+   * <li>the change number indexer has no more records to proceed, because all
+   * cursors are exhausted, so it calls wait()<li>
+   * <li>a new change Upd1 comes in for an exhausted cursor,
+   * medium consistency cannot move<li>
+   * <li>a new change Upd2 comes in for a cursor that is not already opened,
+   * medium consistency can move, so wake up the change number indexer<li>
+   * <li>on wake up, the change number indexer calls next(),
+   * advancing the CompositeDBCursor, which recycles the exhausted cursor,
+   * then calls next() on it, making it lose its change.
+   * CompositeDBCursor currentRecord == Upd1.<li>
+   * <li>on the next iteration of the loop in run(), a new cursor is created,
+   * triggering the creation of a new CompositeDBCursor => Upd1 is lost.
+   * CompositeDBCursor currentRecord == Upd2.<li>
+   * </ol>
+   */
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void nonEmptyDBTwoInitialDSs() throws Exception
+  public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
+  {
+    addReplica(BASE_DN1, serverId1);
+    startCNIndexer(BASE_DN1);
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+    publishUpdateMsg(msg1);
+    assertExternalChangelogContent(msg1);
+
+    addReplica(BASE_DN1, serverId2);
+    sendHeartbeat(BASE_DN1, serverId2, 2);
+    assertExternalChangelogContent(msg1);
+    // publish change that will not trigger a wake up of change number indexer,
+    // but will make it open a cursor on next wake up
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+    publishUpdateMsg(msg2);
+    assertExternalChangelogContent(msg1);
+    // wake up change number indexer
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+    publishUpdateMsg(msg3);
+    assertExternalChangelogContent(msg1, msg2);
+    sendHeartbeat(BASE_DN1, serverId2, 4);
+    // assert no changes have been lost
+    assertExternalChangelogContent(msg1, msg2, msg3);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void nonEmptyDBTwoDSs() throws Exception
   {
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -252,7 +297,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
+  public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
   {
     addReplica(ADMIN_DATA_DN, serverId1);
     addReplica(BASE_DN1, serverId2);
@@ -291,7 +336,25 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
+  public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
+  {
+    addReplica(BASE_DN1, serverId1);
+    startCNIndexer(BASE_DN1);
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+    publishUpdateMsg(msg1);
+
+    addReplica(BASE_DN1, serverId2);
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+    publishUpdateMsg(msg2);
+    assertExternalChangelogContent(msg1);
+
+    sendHeartbeat(BASE_DN1, serverId1, 3);
+    assertExternalChangelogContent(msg1, msg2);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
@@ -307,7 +370,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
+  public void emptyDBTwoDSsOneGoingOffline() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
@@ -477,7 +540,7 @@
   }
 
   @DataProvider
-  public Object[][] precedingCSNData()
+  public Object[][] precedingCSNDataProvider()
   {
     final int serverId = 42;
     final int t = 1000;
@@ -490,7 +553,7 @@
     };
   }
 
-  @Test(dataProvider = "precedingCSNData")
+  @Test(dataProvider = "precedingCSNDataProvider")
   public void getPrecedingCSN(CSN start, CSN expected)
   {
     CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index a708775..1a933e8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -21,7 +21,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
@@ -125,34 +125,26 @@
   public void recycleTwoElementCursors() throws Exception
   {
     final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
-        of(new SequentialDBCursor(msg2, null, msg3), baseDN1),
-        of(new SequentialDBCursor(null, msg1, msg4), baseDN2));
+        of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
+        of(new SequentialDBCursor(null, msg1, msg3), baseDN2));
     assertInOrder(compCursor,
         of(msg1, baseDN2),
         of(msg2, baseDN1),
-        of(msg3, baseDN1),
-        of(msg4, baseDN2));
+        of(msg3, baseDN2),
+        of(msg4, baseDN1));
   }
 
-  @Test
-  public void recycleTwoElementCursorsTODOJNR() throws Exception
+  private UpdateMsg newUpdateMsg(final int t)
   {
-    SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3);
-    SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4);
-    cursor1.next();
-    cursor2.next();
-    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
-        of(cursor1, baseDN1),
-        of(cursor2, baseDN2));
-    assertInOrder(compCursor,
-        of(msg1, baseDN2),
-        of(msg3, baseDN1),
-        of(msg4, baseDN2));
-  }
-
-  private UpdateMsg newUpdateMsg(int t)
-  {
-    return new UpdateMsg(new CSN(t, t, t), new byte[t]);
+    return new UpdateMsg(new CSN(t, t, t), new byte[t])
+    {
+      /** {@inheritDoc} */
+      @Override
+      public String toString()
+      {
+        return "UpdateMsg(" + t + ")";
+      }
+    };
   }
 
   private CompositeDBCursor<String> newCompositeDBCursor(
@@ -164,7 +156,7 @@
     {
       cursorsMap.put(pair.getFirst(), pair.getSecond());
     }
-    return new CompositeDBCursor<String>(cursorsMap);
+    return new CompositeDBCursor<String>(cursorsMap, true);
   }
 
   private void assertInOrder(final CompositeDBCursor<String> compCursor,

--
Gitblit v1.10.0