mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.11.2014 355365ea3f95fdfec5cac95588044b91354096c8
OPENDJ-1441 (CR-4303) Persistent searches on external changelog do not return changes for new replicas and new domains

Fixing a deadlock situation with ExternalChangeLogTest.
When removing domains, the cursors were not properly closed by ChangeNumberIndexer.run() before calling Object.wait().
The fix relies on the fact that DBCursors are not thread-safe: when client code calls MultiDomainDBCursor.removeDomain(), the cursor is now removed immediately.
I did not see the problem before because unit tests on trunk run with the file-based changelog DB by default.


ChangeNumberIndexer.java:
In run(), simplified the code that removes the domain cursors.

CompositeDBCursor.java:
Removed removeNoLongerNeededCursors() and removedCursorsIterator(), and extracted method removeCursor() from the former.
Added another removeCursor() overload which removes the matching cursor from both the cursors and the exhaustedCursors maps.

MultiDomainDBCursor.java:
Consequence of the change to CompositeDBCursor.
Removed removeDomains field and removedCursorsIterator().
In removeDomain(), now call CompositeDBCursor.removeCursor().

DomainDBCursor.java:
Consequence of the change to CompositeDBCursor.
5 files modified
79 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java 13 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,11 +25,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -50,6 +45,11 @@
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
 * Index DB (CNIndexDB for short). Only changes older than the medium
@@ -430,11 +430,6 @@
      {
        try
        {
          if (!domainsToClear.isEmpty())
          {
            final DN cursorData = nextChangeForInsertDBCursor.getData();
            final boolean callNextOnCursor =
                cursorData != null && domainsToClear.contains(cursorData);
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
@@ -444,14 +439,6 @@
              domainsToClear.remove(baseDNToClear);
            }
            if (callNextOnCursor)
            {
              // The next change to consume comes from a domain to be removed.
              // Call DBCursor.next() to ensure this domain is removed
              nextChangeForInsertDBCursor.next();
            }
          }
          // Do not call DBCursor.next() here
          // because we might not have consumed the last record,
          // for example if we could not move the MCP forward
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -107,7 +107,6 @@
      addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
    }
    removeNoLongerNeededCursors();
    incorporateNewCursors();
    return !cursors.isEmpty();
  }
@@ -127,11 +126,20 @@
    }
  }
  private void removeNoLongerNeededCursors()
  /**
   * Removes the cursor matching the provided data.
   *
   * @param dataToFind
   *          the data for which the cursor must be found and removed
   */
  protected void removeCursor(final Data dataToFind)
  {
    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
    removeCursor(this.cursors, dataToFind);
    removeCursor(this.exhaustedCursors, dataToFind);
  }
  private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind)
    {
      final Data dataToFind = iter.next();
      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
          cursors.entrySet().iterator(); cursorIter.hasNext();)
      {
@@ -142,16 +150,7 @@
          cursorIter.remove();
        }
      }
      iter.remove();
    }
  }
  /**
   * Returns an Iterator over the data associated to cursors that must be removed.
   *
   * @return an Iterator over the data associated to cursors that must be removed.
   */
  protected abstract Iterator<Data> removedCursorsIterator();
  /**
   * Adds a cursor to this composite cursor. It first calls
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,7 +24,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -118,14 +117,6 @@
  /**
   * {@inheritDoc}
   * <p>
   * This implementation always returns an iterator over an empty list,
   * i.e. it never reports any cursor to remove.
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<Void> removedCursorsIterator()
  {
    // Collections.EMPTY_LIST is a raw List, hence the "unchecked" suppression
    // above when its iterator is returned as an Iterator<Void>.
    return Collections.EMPTY_LIST.iterator(); // nothing to remove
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    super.close();
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -27,7 +27,6 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -47,8 +46,6 @@
  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
      new ConcurrentSkipListMap<DN, ServerState>();
  private final ConcurrentSkipListSet<DN> removeDomains =
      new ConcurrentSkipListSet<DN>();
  private final PositionStrategy positionStrategy;
@@ -108,14 +105,7 @@
   */
  public void removeDomain(DN baseDN)
  {
    removeDomains.add(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  protected Iterator<DN> removedCursorsIterator()
  {
    return removeDomains.iterator();
    removeCursor(baseDN);
  }
  /** {@inheritDoc} */
@@ -125,7 +115,6 @@
    super.close();
    domainDB.unregisterCursor(this);
    newDomains.clear();
    removeDomains.clear();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,9 +25,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -51,12 +48,6 @@
    protected void incorporateNewCursors() throws ChangelogException
    {
      // Intentionally empty: this implementation incorporates no new cursors.
    }
    /**
     * Always returns an iterator over an empty list, i.e. this implementation
     * never reports any cursor to remove.
     */
    @Override
    protected Iterator<String> removedCursorsIterator()
    {
      // Raw Collections.EMPTY_LIST: the returned iterator has no elements.
      return Collections.EMPTY_LIST.iterator();
    }
  }
  private UpdateMsg msg1;