mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
13.06.2013 d24edab25bfe93a3e3ea4eb38fb9860cef7aa034
OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB 

CompositeDBCursor.java: ADDED
Extracted and generalized from JEChangelogDB.CrossReplicaDBCursor.
Added exhaustedCursors field.
In next(), try to reuse exhausted cursors.
1 files added
1 files modified
272 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java 144 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 128 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
New file
@@ -0,0 +1,144 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.util.StaticUtils;
/**
 * {@link DBCursor} implementation that iterates across a Collection of
 * {@link DBCursor}s, advancing from the oldest to the newest change cross all
 * cursors.
 */
final class CompositeDBCursor implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentChange;
  private final List<DBCursor<UpdateMsg>> exhaustedCursors =
      new ArrayList<DBCursor<UpdateMsg>>();
  /**
   * The cursors are sorted based on the current change of each cursor to
   * consider the next change across all available cursors.
   */
  private final NavigableSet<DBCursor<UpdateMsg>> cursors =
      new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
      {
        @Override
        public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
        {
          final CSN csn1 = o1.getRecord().getCSN();
          final CSN csn2 = o2.getRecord().getCSN();
          return CSN.compare(csn1, csn2);
        }
      });
  /**
   * Builds a CompositeDBCursor using the provided collection of cursors.
   *
   * @param cursors
   *          the cursors that will be iterated upon.
   */
  public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
  {
    for (DBCursor<UpdateMsg> cursor : cursors)
    {
      add(cursor);
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    // try to recycle empty cursors in case the underlying ReplicaDBs received
    // new changes
    for (Iterator<DBCursor<UpdateMsg>> iter = exhaustedCursors.iterator(); iter
        .hasNext();)
    {
      DBCursor<UpdateMsg> cursor = iter.next();
      iter.remove();
      cursor.next();
      add(cursor);
    }
    if (cursors.isEmpty())
    {
      // no cursors are left with changes.
      currentChange = null;
      return false;
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and eventually add again a cursor (after moving it forward).
    final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
    currentChange = cursor.getRecord();
    cursor.next();
    add(cursor);
    return true;
  }
  private void add(DBCursor<UpdateMsg> cursor)
  {
    if (cursor.getRecord() != null)
    {
      this.cursors.add(cursor);
    }
    else
    {
      this.exhaustedCursors.add(cursor);
    }
  }
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return currentChange;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    StaticUtils.close(cursors);
    StaticUtils.close(exhaustedCursors);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentChange=" + currentChange
        + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@
  protected static final DebugTracer TRACER = getTracer();
  /**
   * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
   * a replication domain, advancing from the oldest to the newest change cross
   * all replicaDBs.
   */
  private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
  {
    private final DN baseDN;
    private UpdateMsg currentChange;
    /**
     * The cursors are sorted based on the current change of each cursor to
     * consider the next change across all replicaDBs.
     */
    private final NavigableSet<DBCursor<UpdateMsg>> cursors =
        new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
        {
          @Override
          public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
          {
            final CSN csn1 = o1.getRecord().getCSN();
            final CSN csn2 = o2.getRecord().getCSN();
            return CSN.compare(csn1, csn2);
          }
        });
    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
        throws ChangelogException
    {
      this.baseDN = baseDN;
      for (int serverId : getDomainMap(baseDN).keySet())
      {
        // get the last already sent CSN from that server to get a cursor
        final CSN lastCSN = startAfterServerState.getCSN(serverId);
        addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
      }
    }
    private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
        CSN startAfterCSN) throws ChangelogException
    {
      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
      if (replicaDB != null)
      {
        DBCursor<UpdateMsg> cursor =
            replicaDB.generateCursorFrom(startAfterCSN);
        cursor.next();
        return cursor;
      }
      return EMPTY_CURSOR;
    }
    @Override
    public boolean next() throws ChangelogException
    {
      if (cursors.isEmpty())
      {
        currentChange = null;
        return false;
      }
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove and eventually add again a cursor (after moving it forward).
      final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
      currentChange = cursor.getRecord();
      cursor.next();
      addCursorIfNotEmpty(cursor);
      return true;
    }
    void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
    {
      if (cursor.getRecord() != null)
      {
        cursors.add(cursor);
      }
      else
      {
        StaticUtils.close(cursor);
      }
    }
    @Override
    public UpdateMsg getRecord()
    {
      return currentChange;
    }
    @Override
    public void close()
    {
      StaticUtils.close(cursors);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return getClass().getSimpleName() + " baseDN=" + baseDN
          + " currentChange=" + currentChange + " open cursors=" + cursors;
    }
  }
  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
@@ -790,7 +686,29 @@
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState) throws ChangelogException
  {
    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final List<DBCursor<UpdateMsg>> cursors =
        new ArrayList<DBCursor<UpdateMsg>>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState.getCSN(serverId);
      cursors.add(getCursorFrom(baseDN, serverId, lastCSN));
    }
    return new CompositeDBCursor(cursors);
  }
  private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN) throws ChangelogException
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
      cursor.next();
      return cursor;
    }
    return EMPTY_CURSOR;
  }
  private ServerState buildServerState(DN baseDN, CSN startAfterCSN)