mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.25.2014 d989abc914626b0846a31d8d2a6e59dbbb6ead2a
OPENDJ-1441 (CR-4303) Persistent searches on external changelog do not return changes for new replicas and new domains

Fixing a deadlock situation with ExternalChangeLogTest.
When removing domains, the cursors were not properly closed by ChangeNumberIndexer.run() before calling Object.wait().
Relied on the fact that DBCursors are not thread safe so if client code calls MultiDomainDBCursor.removeDomain(), then the cursor is immediately removed.
I did not see the problem before because unit tests on trunk execute by default with file based changelog db.


ChangeNumberIndexer.java:
In run(), simplified the code that removes the domain cursors.

CompositeDBCursor.java:
Removed removeNoLongerNeededCursors() and removedCursorsIterator() + extracted method removeCursor() from it.
Added another removeCursor() which also removes from the exhaustedCursors.

MultiDomainDBCursor.java
Consequence of the change to CompositeDBCursor.
Removed removeDomains field and removedCursorsIterator().
In removeDomain(), now call CompositeDBCursor.removeCursor().

DomainDBCursor.java:
Consequence of the change to CompositeDBCursor.
5 files modified
101 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 25 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java 45 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java 9 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java 13 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 9 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -413,26 +413,13 @@
      {
        try
        {
          if (!domainsToClear.isEmpty())
          while (!domainsToClear.isEmpty())
          {
            final DN cursorData = nextChangeForInsertDBCursor.getData();
            final boolean callNextOnCursor =
                cursorData == null || domainsToClear.contains(cursorData);
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            if (callNextOnCursor)
            {
              // The next change to consume comes from a domain to be removed.
              // Call DBCursor.next() to ensure this domain is removed
              nextChangeForInsertDBCursor.next();
            }
            final DN baseDNToClear = domainsToClear.first();
            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
            // Only release the waiting thread
            // once this domain's state has been cleared.
            domainsToClear.remove(baseDNToClear);
          }
          // Do not call DBCursor.next() here
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -107,7 +107,6 @@
      addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
    }
    removeNoLongerNeededCursors();
    incorporateNewCursors();
    return !cursors.isEmpty();
  }
@@ -127,31 +126,31 @@
    }
  }
  private void removeNoLongerNeededCursors()
  /**
   * Removes the cursor matching the provided data.
   *
   * @param dataToFind
   *          the data for which the cursor must be found and removed
   */
  protected void removeCursor(final Data dataToFind)
  {
    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
    {
      final Data dataToFind = iter.next();
      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
          cursors.entrySet().iterator(); cursorIter.hasNext();)
      {
        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
        if (dataToFind.equals(entry.getValue()))
        {
          entry.getKey().close();
          cursorIter.remove();
        }
      }
      iter.remove();
    }
    removeCursor(this.cursors, dataToFind);
    removeCursor(this.exhaustedCursors, dataToFind);
  }
  /**
   * Returns an Iterator over the data associated to cursors that must be removed.
   *
   * @return an Iterator over the data associated to cursors that must be removed.
   */
  protected abstract Iterator<Data> removedCursorsIterator();
  private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind)
  {
    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
        cursors.entrySet().iterator(); cursorIter.hasNext();)
    {
      final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
      if (dataToFind.equals(entry.getValue()))
      {
        entry.getKey().close();
        cursorIter.remove();
      }
    }
  }
  /**
   * Adds a cursor to this composite cursor. It first calls
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,7 +24,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -118,14 +117,6 @@
  /** {@inheritDoc} */
  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<Void> removedCursorsIterator()
  {
    return Collections.EMPTY_LIST.iterator(); // nothing to remove
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    super.close();
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -27,7 +27,6 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -47,8 +46,6 @@
  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
      new ConcurrentSkipListMap<DN, ServerState>();
  private final ConcurrentSkipListSet<DN> removeDomains =
      new ConcurrentSkipListSet<DN>();
  private final PositionStrategy positionStrategy;
@@ -108,14 +105,7 @@
   */
  public void removeDomain(DN baseDN)
  {
    removeDomains.add(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  protected Iterator<DN> removedCursorsIterator()
  {
    return removeDomains.iterator();
    removeCursor(baseDN);
  }
  /** {@inheritDoc} */
@@ -125,7 +115,6 @@
    super.close();
    domainDB.unregisterCursor(this);
    newDomains.clear();
    removeDomains.clear();
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,9 +25,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -51,12 +48,6 @@
    protected void incorporateNewCursors() throws ChangelogException
    {
    }
    @Override
    protected Iterator<String> removedCursorsIterator()
    {
      return Collections.EMPTY_LIST.iterator();
    }
  }
  private UpdateMsg msg1;