mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.11.2014 355365ea3f95fdfec5cac95588044b91354096c8
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,11 +25,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -50,6 +45,11 @@
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
 * Index DB (CNIndexDB for short). Only changes older than the medium
@@ -430,26 +430,13 @@
      {
        try
        {
          if (!domainsToClear.isEmpty())
          while (!domainsToClear.isEmpty())
          {
            final DN cursorData = nextChangeForInsertDBCursor.getData();
            final boolean callNextOnCursor =
                cursorData != null && domainsToClear.contains(cursorData);
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            if (callNextOnCursor)
            {
              // The next change to consume comes from a domain to be removed.
              // Call DBCursor.next() to ensure this domain is removed
              nextChangeForInsertDBCursor.next();
            }
            final DN baseDNToClear = domainsToClear.first();
            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
            // Only release the waiting thread
            // once this domain's state has been cleared.
            domainsToClear.remove(baseDNToClear);
          }
          // Do not call DBCursor.next() here
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -107,7 +107,6 @@
      addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
    }
    removeNoLongerNeededCursors();
    incorporateNewCursors();
    return !cursors.isEmpty();
  }
@@ -127,31 +126,31 @@
    }
  }
  private void removeNoLongerNeededCursors()
  /**
   * Removes the cursor matching the provided data.
   *
   * @param dataToFind
   *          the data for which the cursor must be found and removed
   */
  protected void removeCursor(final Data dataToFind)
  {
    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
    {
      final Data dataToFind = iter.next();
      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
          cursors.entrySet().iterator(); cursorIter.hasNext();)
      {
        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
        if (dataToFind.equals(entry.getValue()))
        {
          entry.getKey().close();
          cursorIter.remove();
        }
      }
      iter.remove();
    }
    removeCursor(this.cursors, dataToFind);
    removeCursor(this.exhaustedCursors, dataToFind);
  }
  /**
   * Returns an Iterator over the data associated to cursors that must be removed.
   *
   * @return an Iterator over the data associated to cursors that must be removed.
   */
  protected abstract Iterator<Data> removedCursorsIterator();
  private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind)
  {
    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
        cursors.entrySet().iterator(); cursorIter.hasNext();)
    {
      final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
      if (dataToFind.equals(entry.getValue()))
      {
        entry.getKey().close();
        cursorIter.remove();
      }
    }
  }
  /**
   * Adds a cursor to this composite cursor. It first calls
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,7 +24,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -118,14 +117,6 @@
  /** {@inheritDoc} */
  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<Void> removedCursorsIterator()
  {
    return Collections.EMPTY_LIST.iterator(); // nothing to remove
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    super.close();
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -27,7 +27,6 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -47,8 +46,6 @@
  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
      new ConcurrentSkipListMap<DN, ServerState>();
  private final ConcurrentSkipListSet<DN> removeDomains =
      new ConcurrentSkipListSet<DN>();
  private final PositionStrategy positionStrategy;
@@ -108,14 +105,7 @@
   */
  public void removeDomain(DN baseDN)
  {
    removeDomains.add(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  protected Iterator<DN> removedCursorsIterator()
  {
    return removeDomains.iterator();
    removeCursor(baseDN);
  }
  /** {@inheritDoc} */
@@ -125,7 +115,6 @@
    super.close();
    domainDB.unregisterCursor(this);
    newDomains.clear();
    removeDomains.clear();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,9 +25,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -51,12 +48,6 @@
    protected void incorporateNewCursors() throws ChangelogException
    {
    }
    @Override
    protected Iterator<String> removedCursorsIterator()
    {
      return Collections.EMPTY_LIST.iterator();
    }
  }
  private UpdateMsg msg1;