mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.01.2013 891159050af4aa3fe47c67e3ba7d3f21299027a4
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -30,6 +30,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
@@ -43,6 +44,9 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -57,6 +61,11 @@
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
   * If this is true, then the {@link #run()} method must clear its state.
   * Otherwise the run method executes normally.
   */
  private final AtomicBoolean doClear = new AtomicBoolean();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
@@ -101,12 +110,12 @@
      new MultiDomainServerState();
  /**
   * Composite cursor across all the replicaDBs for all the replication domains.
   * It is volatile to ensure it supports concurrent update. Each time it is
   * used more than once in a method, the method must take a local copy to
   * ensure the cursor does not get updated in the middle of the method.
   * Cursor across all the replicaDBs for all the replication domains. It is
   * positioned on the next change that needs to be inserted in the CNIndexDB.
   * <p>
   * Note: it is only accessed from the {@link #run()} method.
   */
  private volatile CompositeDBCursor<DN> crossDomainDBCursor;
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
  /**
   * New cursors for this Map must be created from the {@link #run()} method,
@@ -116,9 +125,27 @@
   */
  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
  /** This map can be updated by multiple threads. */
  private ConcurrentMap<CSN, DN> newCursors =
      new ConcurrentSkipListMap<CSN, DN>();
  /**
   * Holds the newCursors that will have to be created in the next iteration
   * inside the {@link #run()} method.
   * <p>
   * This map can be updated by multiple threads.
   */
  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
          new Comparator<Pair<DN, Integer>>()
          {
            @Override
            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
            {
              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
              if (compareBaseDN == 0)
              {
                return o1.getSecond().compareTo(o2.getSecond());
              }
              return compareBaseDN;
            }
          });
  /**
   * Builds a ChangeNumberIndexer object.
@@ -164,7 +191,8 @@
  {
    final CSN csn = updateMsg.getCSN();
    lastSeenUpdates.update(baseDN, csn);
    newCursors.put(csn, baseDN);
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    tryNotify(baseDN);
  }
@@ -210,17 +238,23 @@
    return true;
  }
  /**
   * Restores in memory data needed to build the CNIndexDB, including the medium
   * consistency point.
   */
  private void initialize() throws ChangelogException, DirectoryException
  {
    final ChangeNumberIndexRecord newestRecord =
        changelogDB.getChangeNumberIndexDB().getNewestRecord();
    if (newestRecord != null)
    {
      // restore the mediumConsistencyRUV from DB
      mediumConsistencyRUV.update(
          new MultiDomainServerState(newestRecord.getPreviousCookie()));
    }
    // initialize the cross domain DB cursor
    // initialize the DB cursor and the last seen updates
    // to ensure the medium consistency CSN can move forward
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
    for (Entry<DN, List<Integer>> entry
        : changelogState.getDomainToServerIds().entrySet())
@@ -235,12 +269,12 @@
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      lastSeenUpdates.update(baseDN, latestKnownState);
    }
    resetNextChangeForInsertDBCursor();
    crossDomainDBCursor = newCompositeDBCursor();
    if (newestRecord != null)
    {
      // restore the "previousCookie" state before shutdown
      final UpdateMsg record = crossDomainDBCursor.getRecord();
      final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        // TODO JNR i18n safety check, should never happen
@@ -248,14 +282,14 @@
            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
      }
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
      crossDomainDBCursor.next();
      nextChangeForInsertDBCursor.next();
    }
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException
  private void resetNextChangeForInsertDBCursor() throws ChangelogException
  {
    final Map<DBCursor<UpdateMsg>, DN> cursors =
        new HashMap<DBCursor<UpdateMsg>, DN>();
@@ -270,7 +304,7 @@
    }
    final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
    result.next();
    return result;
    nextChangeForInsertDBCursor = result;
  }
  private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn)
@@ -286,13 +320,27 @@
    if (cursor == null)
    {
      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
      cursor = domainDB.getCursorFrom(baseDN, serverId, csn);
      // use an older CSN because getCursorFrom() starts after the given CSN
      final CSN anOlderCSN = getPrecedingCSN(csn);
      cursor = domainDB.getCursorFrom(baseDN, serverId, anOlderCSN);
      map.put(serverId, cursor);
      return false;
    }
    return true;
  }
  /**
   * Returns the immediately preceding CSN.
   */
  private CSN getPrecedingCSN(CSN csn)
  {
    if (csn.getSeqnum() > 0)
    {
      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
    }
    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
  }
  /** {@inheritDoc} */
  @Override
  public void run()
@@ -305,83 +353,96 @@
       * used.
       */
      initialize();
    }
    catch (DirectoryException e)
    {
      // TODO JNR error message i18n
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return;
      while (!isShutdownInitiated())
      {
        try
        {
          if (doClear.get())
          {
            removeAllCursors();
            // No need to use CAS here because it is only for unit tests and at
            // this point all will have been cleaned up anyway.
            doClear.set(false);
          }
          else
          {
            createNewCursors();
          }
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
          if (msg == null)
          {
            synchronized (this)
            {
              wait();
            }
            // advance cursor, success/failure will be checked later
            nextChangeForInsertDBCursor.next();
            // loop to check whether new changes have been added to the
            // ReplicaDBs
            continue;
          }
          final CSN csn = msg.getCSN();
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, thread will be blocked
          if (!canMoveForwardMediumConsistencyPoint(baseDN))
          {
            // the oldest record to insert is newer than the medium consistency
            // point. Let's wait for a change that can be published.
            synchronized (this)
            {
              // double check to protect against a missed call to notify()
              if (!canMoveForwardMediumConsistencyPoint(baseDN))
              {
                wait();
                // loop to check if changes older than the medium consistency
                // point have been added to the ReplicaDBs
                continue;
              }
            }
          }
          // OK, the oldest change is older than the medium consistency point
          // let's publish it to the CNIndexDB.
          // Next if statement is ugly but ensures the first change will not be
          // immediately trimmed from the CNIndexDB. Yuck!
          if (mediumConsistencyRUV.isEmpty())
          {
            mediumConsistencyRUV.replace(baseDN, new ServerState());
          }
          final String previousCookie = mediumConsistencyRUV.toString();
          final ChangeNumberIndexRecord record =
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          moveForwardMediumConsistencyPoint(csn, baseDN);
          // advance cursor, success/failure will be checked later
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
        {
          // was shutdown called? loop to figure it out.
          Thread.currentThread().interrupt();
        }
      }
      removeAllCursors();
    }
    catch (ChangelogException e)
    {
      // TODO JNR error message i18n
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return;
      // TODO JNR error message i18n
    }
    while (!isShutdownInitiated())
    catch (DirectoryException e)
    {
      try
      {
        createNewCursors();
        final UpdateMsg msg = crossDomainDBCursor.getRecord();
        if (msg == null)
        {
          synchronized (this)
          {
            wait();
          }
          // advance cursor, success/failure will be checked later
          crossDomainDBCursor.next();
          // loop to check whether new changes have been added to the ReplicaDBs
          continue;
        }
        final CSN csn = msg.getCSN();
        final DN baseDN = crossDomainDBCursor.getData();
        // FIXME problem: what if the serverId is not part of the ServerState?
        // right now, thread will be blocked
        if (!canMoveForwardMediumConsistencyPoint(baseDN))
        {
          // the oldest record to insert is newer than the medium consistency
          // point. Let's wait for a change that can be published.
          synchronized (this)
          {
            // double check to protect against a missed call to notify()
            if (!canMoveForwardMediumConsistencyPoint(baseDN))
            {
              wait();
              // loop to check if changes older than the medium consistency
              // point have been added to the ReplicaDBs
              continue;
            }
          }
        }
        // OK, the oldest change is older than the medium consistency point
        // let's publish it to the CNIndexDB
        final String previousCookie = mediumConsistencyRUV.toString();
        final ChangeNumberIndexRecord record =
            new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
        changelogDB.getChangeNumberIndexDB().addRecord(record);
        moveForwardMediumConsistencyPoint(csn, baseDN);
        // advance cursor, success/failure will be checked later
        crossDomainDBCursor.next();
      }
      catch (ChangelogException e)
      {
        if (debugEnabled())
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        // TODO JNR error message i18n
      }
      catch (InterruptedException ignored)
      {
        // was shutdown called?
      }
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // TODO JNR error message i18n
    }
  }
@@ -402,20 +463,32 @@
    }
  }
  private void removeAllCursors() throws ChangelogException
  {
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    {
      StaticUtils.close(map.values());
    }
    allCursors.clear();
    newCursors.clear();
    resetNextChangeForInsertDBCursor();
  }
  private void removeCursor(final DN baseDN, final CSN csn)
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
        .entrySet())
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
    {
      if (baseDN.equals(entry.getKey()))
      if (baseDN.equals(entry1.getKey()))
      {
        final Set<Integer> serverIds = entry.getValue().keySet();
        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final int serverId = iter.next();
          if (csn.getServerId() == serverId)
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (csn.getServerId() == entry2.getKey())
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            return;
          }
        }
@@ -428,12 +501,13 @@
    if (!newCursors.isEmpty())
    {
      boolean newCursorAdded = false;
      for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
          iter.hasNext();)
      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
          newCursors.entrySet().iterator(); iter.hasNext();)
      {
        final Entry<CSN, DN> entry = iter.next();
        final CSN csn = entry.getKey();
        if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
        final DN baseDN = entry.getKey().getFirst();
        final CSN csn = entry.getValue();
        if (!ensureCursorExists(baseDN, csn.getServerId(), csn))
        {
          newCursorAdded = true;
        }
@@ -441,9 +515,29 @@
      }
      if (newCursorAdded)
      {
        crossDomainDBCursor = newCompositeDBCursor();
        resetNextChangeForInsertDBCursor();
      }
    }
  }
  /**
   * Asks the current thread to clear its state.
   * <p>
   * This method is only useful for unit tests.
   */
  public void clear()
  {
    doClear.set(true);
    synchronized (this)
    {
      notify();
    }
    while (doClear.get())
    {
      // wait until clear() has been done by thread
      // ensures unit tests wait that this thread's state is cleaned up
      Thread.yield();
    }
  }
}