mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.25.2014 b6ccb560e9056cc9c028812f5f63ff2e80c95c87
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,15 +29,8 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.messages.Message;
@@ -47,18 +40,15 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
@@ -84,7 +74,7 @@
  private ChangelogState changelogState;
  /*
   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
   * The following MultiDomainServerState fields must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently.
   */
@@ -130,39 +120,7 @@
   *
   * @NonNull
   */
  @SuppressWarnings("unchecked")
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
  /**
   * New cursors for this Map must be created from the {@link #run()} method,
   * i.e. from the same thread that will make use of them. If this rule is not
   * obeyed, then a JE exception will be thrown about
   * "Non-transactional Cursors may not be used in multiple threads;".
   */
  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
  /**
   * Holds the newCursors that will have to be created in the next iteration
   * inside the {@link #run()} method.
   * <p>
   * This map can be updated by multiple threads.
   */
  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
          new Comparator<Pair<DN, Integer>>()
          {
            @Override
            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
            {
              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
              if (compareBaseDN == 0)
              {
                return o1.getSecond().compareTo(o2.getSecond());
              }
              return compareBaseDN;
            }
          });
  private MultiDomainDBCursor nextChangeForInsertDBCursor;
  /**
   * Builds a ChangeNumberIndexer object.
@@ -232,11 +190,8 @@
      return;
    }
    final CSN csn = updateMsg.getCSN();
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, csn);
    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
    tryNotify(oldestCSNBefore);
  }
@@ -381,28 +336,25 @@
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (!isECLEnabledDomain(baseDN))
      if (isECLEnabledDomain(baseDN))
      {
        continue;
      }
        for (Integer serverId : entry.getValue())
        {
          /*
           * initialize with the oldest possible CSN in order for medium
           * consistency to wait for all replicas to be alive before moving
           * forward
           */
          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        }
      for (Integer serverId : entry.getValue())
      {
        /*
         * initialize with the oldest possible CSN in order for medium
         * consistency to wait for all replicas to be alive before moving
         * forward
         */
        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        // start after the actual CSN when initializing from the previous cookie
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        ensureCursorExists(baseDN, serverId, csn);
        ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
        lastAliveCSNs.update(baseDN, latestKnownState);
      }
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      lastAliveCSNs.update(baseDN, latestKnownState);
    }
    resetNextChangeForInsertDBCursor();
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
    {
@@ -445,68 +397,6 @@
    return new CSN(0, 0, serverId);
  }
  private void resetNextChangeForInsertDBCursor() throws ChangelogException
  {
    final Map<DBCursor<UpdateMsg>, DN> cursors =
        new HashMap<DBCursor<UpdateMsg>, DN>();
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
        : this.allCursors.entrySet())
    {
      for (Entry<Integer, DBCursor<UpdateMsg>> entry2
          : entry.getValue().entrySet())
      {
        cursors.put(entry2.getValue(), entry.getKey());
      }
    }
    // CNIndexer manages the cursor itself,
    // so do not try to recycle exhausted cursors
    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
    result.next();
    nextChangeForInsertDBCursor = result;
  }
  private boolean ensureCursorExists(DN baseDN, Integer serverId,
      CSN startAfterCSN) throws ChangelogException
  {
    Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
    if (map == null)
    {
      map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
      allCursors.put(baseDN, map);
    }
    DBCursor<UpdateMsg> cursor = map.get(serverId);
    if (cursor == null)
    {
      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      cursor.next();
      map.put(serverId, cursor);
      return false;
    }
    return true;
  }
  /**
   * Returns the immediately preceding CSN.
   *
   * @param csn
   *          the CSN to use
   * @return the immediately preceding CSN or null if the provided CSN is null.
   */
  CSN getPrecedingCSN(CSN csn)
  {
    if (csn == null)
    {
      return null;
    }
    if (csn.getSeqnum() > 0)
    {
      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
    }
    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
  }
  /** {@inheritDoc} */
  @Override
  public void initiateShutdown()
@@ -535,28 +425,18 @@
      {
        try
        {
          if (!domainsToClear.isEmpty())
          while (!domainsToClear.isEmpty())
          {
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              removeCursors(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            resetNextChangeForInsertDBCursor();
          }
          else
          {
            final boolean createdCursors = createNewCursors();
            final boolean recycledCursors = recycleExhaustedCursors();
            if (createdCursors || recycledCursors)
            {
              resetNextChangeForInsertDBCursor();
            }
            final DN baseDNToClear = domainsToClear.first();
            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
            // Only release the waiting thread
            // once this domain's state has been cleared.
            domainsToClear.remove(baseDNToClear);
          }
          // Do not call DBCursor.next() here
          // because we might not have consumed the last record,
          // for example if we could not move the MCP forward
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
          if (msg == null)
          {
@@ -568,8 +448,13 @@
              }
              wait();
            }
            // loop to check whether new changes have been added to the
            // ReplicaDBs
            // check whether new changes have been added to the ReplicaDBs
            nextChangeForInsertDBCursor.next();
            continue;
          }
          else if (msg instanceof ReplicaOfflineMsg)
          {
            nextChangeForInsertDBCursor.next();
            continue;
          }
@@ -615,39 +500,44 @@
    }
    catch (RuntimeException e)
    {
      // Nothing can be done about it.
      // Rely on the DirectoryThread uncaught exceptions handler
      // for logging error + alert.
      // Message logged here gives corrective information to the administrator.
      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
          getClass().getSimpleName(), stackTraceToSingleLineString(e));
      TRACER.debugError(msg.toString());
      logUnexpectedException(e);
      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
      throw e;
    }
    catch (Exception e)
    {
      // Nothing can be done about it.
      // Rely on the DirectoryThread uncaught exceptions handler
      // for logging error + alert.
      // Message logged here gives corrective information to the administrator.
      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
          getClass().getSimpleName(), stackTraceToSingleLineString(e));
      TRACER.debugError(msg.toString());
      logUnexpectedException(e);
      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
      throw new RuntimeException(e);
    }
    finally
    {
      removeCursors(DN.NULL_DN);
      nextChangeForInsertDBCursor.close();
      nextChangeForInsertDBCursor = null;
    }
  }
  /**
   * Nothing can be done about it.
   * <p>
   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
   * alert.
   * <p>
   * Message logged here gives corrective information to the administrator.
   */
  private void logUnexpectedException(Exception e)
  {
    Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
        getClass().getSimpleName(), stackTraceToSingleLineString(e));
    TRACER.debugError(msg.toString());
  }
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    boolean callNextOnCursor = true;
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -660,133 +550,22 @@
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
            pair = getCursor(mcBaseDN, mcCSN.getServerId());
        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
        if (iter != null && !iter.hasNext())
        {
          /*
           * replica is not back online, Medium consistency point has gone past
           * its last offline time, and there are no more changes after the
           * offline CSN in the cursor: remove everything known about it:
           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
           * this replica from the medium consistency RUV.
           */
          iter.remove();
          StaticUtils.close(pair.getFirst());
          resetNextChangeForInsertDBCursor();
          callNextOnCursor = false;
          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
        }
        /*
         * replica is not back online, Medium consistency point has gone past
         * its last offline time, and there are no more changes after the
         * offline CSN in the cursor: remove everything known about it:
         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
         * this replica from the medium consistency RUV.
         */
        // TODO JNR how to close cursor for offline replica?
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }
    if (callNextOnCursor)
    {
      // advance the cursor we just read from,
      // success/failure will be checked later
      nextChangeForInsertDBCursor.next();
    }
  }
  private void removeCursors(DN baseDN)
  {
    if (nextChangeForInsertDBCursor != null)
    {
      nextChangeForInsertDBCursor.close();
      nextChangeForInsertDBCursor = null;
    }
    if (DN.NULL_DN.equals(baseDN))
    {
      // close all cursors
      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
      {
        StaticUtils.close(map.values());
      }
      allCursors.clear();
      newCursors.clear();
    }
    else
    {
      // close cursors for this DN
      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
      if (map != null)
      {
        StaticUtils.close(map.values());
      }
      for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
      {
        if (it.next().getFirst().equals(baseDN))
        {
          it.remove();
        }
      }
    }
  }
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
      getCursor(final DN baseDN, final int serverId) throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
    {
      if (baseDN.equals(entry1.getKey()))
      {
        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (serverId == entry2.getKey())
          {
            return Pair.of(entry2.getValue(), iter);
          }
        }
      }
    }
    return Pair.empty();
  }
  private boolean recycleExhaustedCursors() throws ChangelogException
  {
    boolean succesfullyRecycled = false;
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    {
      for (DBCursor<UpdateMsg> cursor : map.values())
      {
        // try to recycle it by calling next()
        if (cursor.getRecord() == null && cursor.next())
        {
          succesfullyRecycled = true;
        }
      }
    }
    return succesfullyRecycled;
  }
  private boolean createNewCursors() throws ChangelogException
  {
    if (!newCursors.isEmpty())
    {
      boolean newCursorAdded = false;
      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
          newCursors.entrySet().iterator(); iter.hasNext();)
      {
        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
        final DN baseDN = entry.getKey().getFirst();
        final CSN csn = entry.getValue();
        // start after preceding CSN so the first CSN read will exactly be the
        // current one
        final CSN startFromCSN = getPrecedingCSN(csn);
        if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
        {
          newCursorAdded = true;
        }
        iter.remove();
      }
      return newCursorAdded;
    }
    return false;
    // advance the cursor we just read from,
    // success/failure will be checked later
    nextChangeForInsertDBCursor.next();
  }
  /**