mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
13.32.2013 5e495b3c867de5b83117834386859da67cbbedc5
OPENDJ-1116 Introduce abstraction for the changelog DB

Extracted interface org.opends.server.replication.server.changelog.api.ReplicationIterator from org.opends.server.replication.server.ReplicationIterator and used it in all the places where the class was used.
Renamed ReplicationIterator class to JEReplicationIterator.
Moved ReplicationIteratorComparator from package org.opends.server.replication.server to org.opends.server.replication.server.changelog.api.

MessageHandler.java:
Minimal code cleanup.
1 file added
2 files renamed
5 files modified
127 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java 28 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java 60 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java 4 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -42,6 +42,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -276,7 +277,7 @@
    {
      flush();
    }
    return new ReplicationIterator(db, changeNumber, this);
    return new JEReplicationIterator(db, changeNumber, this);
  }
  /**
opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
File was renamed from opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -32,12 +32,12 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
/**
 * This class allows to iterate through the changes received from a given
 * LDAP Server Identifier.
 * Berkeley DB JE implementation of ReplicationIterator.
 */
public class ReplicationIterator
public class JEReplicationIterator implements ReplicationIterator
{
  private UpdateMsg currentChange = null;
  private ReplServerDBCursor cursor = null;
@@ -55,7 +55,7 @@
   * @param dbHandler The associated DbHandler.
   * @throws ChangelogException if a database problem happened.
   */
  public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
  public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
      DbHandler dbHandler) throws ChangelogException
  {
    this.db = db;
@@ -86,20 +86,15 @@
    }
  }
  /**
   * Get the UpdateMsg where the iterator is currently set.
   * @return The UpdateMsg where the iterator is currently set.
   */
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getChange()
  {
    return currentChange;
  }
  /**
   * Go to the next change in the ReplicationDB or in the server Queue.
   * @return false if the iterator is already on the last change before
   *         this call.
   */
  /** {@inheritDoc} */
  @Override
  public boolean next()
  {
    currentChange = cursor.next();
@@ -136,11 +131,8 @@
    return currentChange != null;
  }
  /**
   * Release the resources and locks used by this Iterator.
   * This method must be called when the iterator is no longer used.
   * Failure to do it could cause DB deadlock.
   */
  /** {@inheritDoc} */
  @Override
  public void releaseCursor()
  {
    synchronized (this)
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,9 +27,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
@@ -45,8 +42,12 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * This class implements a buffering/producer/consumer mechanism of
 * replication changes (UpdateMsg) used inside the replication server.
@@ -281,17 +282,15 @@
           *   load this change on the delayList
           *
           */
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
              new TreeSet<ReplicationIterator>(
                  new ReplicationIteratorComparator());
          try
          {
            /* fill the lateQueue */
            for (int serverId : replicationServerDomain.getServers())
            {
              ChangeNumber lastCsn = serverState
                  .getChangeNumber(serverId);
              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
              ReplicationIterator iterator = replicationServerDomain
                  .getChangelogIterator(serverId, lastCsn);
              if (iterator != null)
@@ -318,8 +317,7 @@
                && (lateQueue.count() < 100)
                && (lateQueue.bytesCount() < 50000))
            {
              ReplicationIterator iterator = iteratorSortedSet
                  .first();
              ReplicationIterator iterator = iteratorSortedSet.first();
              iteratorSortedSet.remove(iterator);
              lateQueue.add(iterator.getChange());
              if (iterator.next())
@@ -376,7 +374,7 @@
                {
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                this.updateServerState(msg);
                updateServerState(msg);
                return msg1;
              }
            }
@@ -388,7 +386,7 @@
          {
            msg = lateQueue.removeFirst();
          }
          this.updateServerState(msg);
          updateServerState(msg);
          return msg;
        }
      }
@@ -412,7 +410,7 @@
          }
          msg = msgQueue.removeFirst();
          if (this.updateServerState(msg))
          if (updateServerState(msg))
          {
            /*
             * Only push the message if it has not yet been seen
@@ -462,10 +460,9 @@
          there. So let's take the last change not sent directly from
          the db.
          */
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
              new TreeSet<ReplicationIterator>(
                  new ReplicationIteratorComparator());
          try
          {
            // Build a list of candidates iterator (i.e. db i.e. server)
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -72,6 +72,7 @@
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,6 +47,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java
New file
@@ -0,0 +1,60 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
import org.opends.server.replication.protocol.UpdateMsg;
/**
 * Iterator over the changes received from a given LDAP Server Identifier.
 * <p>
 * An iterator holds resources and locks until {@link #releaseCursor()} is
 * called, so callers must always release it once they are done with it.
 */
public interface ReplicationIterator
{
  /**
   * Returns the UpdateMsg on which the iterator is currently set.
   * <p>
   * NOTE(review): implementations may return {@code null} once
   * {@link #next()} has returned {@code false} - confirm per implementation.
   *
   * @return The UpdateMsg where the iterator is currently set.
   */
  UpdateMsg getChange();
  /**
   * Moves to the next change in the ReplicationDB or in the server Queue.
   *
   * @return {@code false} if the iterator was already on the last change
   *         before this call.
   */
  boolean next();
  /**
   * Releases the resources and locks used by this iterator.
   * <p>
   * This method must be called when the iterator is no longer used. Failure
   * to do so could cause a DB deadlock.
   */
  void releaseCursor();
}
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
File was renamed from opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java
@@ -23,8 +23,9 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server;
package org.opends.server.replication.server.changelog.api;
import java.util.Comparator;
@@ -45,6 +46,7 @@
   * @param o2 second ReplicationIterator.
   * @return result of the comparison.
   */
  @Override
  public int compare(ReplicationIterator o1, ReplicationIterator o2)
  {
    ChangeNumber csn1 = o1.getChange().getChangeNumber();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -42,6 +42,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.testng.annotations.Test;
/**