mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.19.2014 137d1b4ba1992acdd880b61b1a03dc31f0cc1839
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -66,8 +66,6 @@
    return csn;
  }
  /**
   * Creates a message from a provided byte array.
   *
@@ -113,11 +111,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -141,4 +135,10 @@
    }
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -947,17 +947,6 @@
  }
  /**
   * Get the queueSize for this replication server.
   *
   * @return The maximum size of the queues for this Replication Server
   *
   */
  public int getQueueSize()
  {
    return this.config.getQueueSize();
  }
  /**
   * Creates the backend associated to this replication server.
   */
  private void createBackend() throws ConfigException
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -62,8 +62,6 @@
  private static int NO_KEY = 0;
  private DraftCNDB db;
  /** FIXME What is this field used for? */
  private volatile long oldestChangeNumber = NO_KEY;
  /**
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
@@ -96,14 +94,11 @@
  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
  {
    db = new DraftCNDB(dbEnv);
    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
    oldestChangeNumber = getChangeNumber(oldestRecord);
    final long newestCN = getChangeNumber(newestRecord);
    newestChangeNumber = newestCN;
    newestChangeNumber = getChangeNumber(newestRecord);
    // initialization of the lastGeneratedChangeNumber from the DB content
    // if DB is empty => last record does not exist => default to 0
    lastGeneratedChangeNumber = new AtomicLong(newestCN);
    lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -117,7 +112,7 @@
    {
      return record.getChangeNumber();
    }
    return 0;
    return NO_KEY;
  }
  /** {@inheritDoc} */
@@ -198,19 +193,11 @@
   */
  public void shutdown()
  {
    if (shutdown.get())
    if (shutdown.compareAndSet(false, true))
    {
      return;
      db.shutdown();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
    shutdown.set(true);
    synchronized (this)
    {
      notifyAll();
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
@@ -236,11 +223,6 @@
      while (!mustShutdown(shutdown) && cursor.next())
      {
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (record.getChangeNumber() != oldestChangeNumber)
        {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() != newestChangeNumber
            && record.getCSN().isOlderThan(purgeCSN))
        {
@@ -293,14 +275,9 @@
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      boolean isOldestRecord = true;
      while (!mustShutdown(shutdown) && cursor.next())
      {
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
        {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
@@ -312,10 +289,6 @@
        {
          cursor.delete();
        }
        else
        {
          isOldestRecord = false;
        }
      }
    }
    catch (ChangelogException e)
@@ -398,7 +371,7 @@
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ": " + oldestChangeNumber + " "
    return getClass().getSimpleName() + ", newestChangeNumber="
        + newestChangeNumber;
  }
@@ -411,8 +384,7 @@
  public void clear() throws ChangelogException
  {
    db.clear();
    oldestChangeNumber = getChangeNumber(db.readFirstRecord());
    newestChangeNumber = getChangeNumber(db.readLastRecord());
    newestChangeNumber = NO_KEY;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -385,11 +385,13 @@
    if (indexer != null)
    {
      indexer.initiateShutdown();
      indexer.interrupt();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
      purger.interrupt();
    }
    try
@@ -417,6 +419,23 @@
    if (dbEnv != null)
    {
      // wait for shutdown of the threads holding cursors
      try
      {
        if (indexer != null)
        {
          indexer.join();
        }
        if (purger != null)
        {
          purger.join();
        }
      }
      catch (InterruptedException e)
      {
        // do nothing: we are already shutting down
      }
      dbEnv.shutdown();
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
@@ -52,8 +48,6 @@
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used for managing the replicationServer database for each
@@ -67,7 +61,7 @@
 * <p>
 * This class publish some monitoring information below cn=monitor.
 */
public class JEReplicaDB implements Runnable
public class JEReplicaDB
{
  /**
@@ -89,29 +83,7 @@
  }
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread
   * flushes this blocking queue.
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
    new LinkedBlockingQueue<UpdateMsg>();
  /**
   * Semaphore used to limit the number of bytes used in memory by the queue.
   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
   * size of msgQueue becomes larger than the available permits and will resume
   * only when the number of available permits allow it.
   */
  private final Semaphore queueSizeBytes;
  private final int queueMaxBytes;
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private ReplicationDB db;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -122,12 +94,6 @@
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #flush()}.
   * This can happen with the thread flushing the queue, or else on shutdown.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  /**
@@ -147,15 +113,8 @@
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
            + replicationServer.getServerId()
            + ") flusher thread for Replica DS(" + serverId
            + ") for domain \"" + baseDN + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -174,34 +133,14 @@
   */
  public void add(UpdateMsg updateMsg) throws ChangelogException
  {
    if (thread.isShutdownInitiated())
    if (shutdown.get())
    {
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
    final int msgSize = updateMsg.size();
    if (msgSize < queueMaxBytes)
    {
      try
      {
        queueSizeBytes.acquire(msgSize);
      }
      catch (InterruptedException e)
      {
        throw new ChangelogException(
            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
                stackTraceToSingleLineString(e)));
      }
    }
    else
    {
      // edge case with a very large message
      collectAllPermits();
    }
    msgQueue.add(updateMsg);
    db.addEntry(updateMsg);
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
@@ -215,22 +154,6 @@
    }
  }
  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
  private void collectAllPermits()
  {
    int collectedPermits = queueSizeBytes.drainPermits();
    while (collectedPermits != queueMaxBytes)
    {
      // Do not use Thread.sleep() because:
      // 1) it is expected the permits will be released very soon
      // 2) we want to collect all the permits, so do not leave a chance to
      // other threads to steal them from us.
      // 3) we want to keep low latency
      Thread.yield();
      collectedPermits += queueSizeBytes.drainPermits();
    }
  }
  /**
   * Get the oldest CSN that has not been purged yet.
   *
@@ -289,80 +212,10 @@
   */
  public void shutdown()
  {
    if (thread.isShutdownInitiated())
    if (shutdown.compareAndSet(false, true))
    {
      return;
    }
    thread.initiateShutdown();
    while (msgQueue.size() != 0)
    {
      try
      {
        flush();
      }
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(e.getMessageObject());
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
   * Flushes the replicaDB queue from memory to stable storage.
   */
  @Override
  public void run()
  {
    thread.startWork();
    try
    {
      while (!thread.isShutdownInitiated())
      {
        try
        {
          flush();
        }
        catch (ChangelogException end)
        {
          stop(end);
          break;
        }
      }
      try
      {
        // call flush a last time before exiting to make sure that
        // no change was forgotten in the msgQueue
        flush();
      }
      catch (ChangelogException e)
      {
        stop(e);
      }
    }
    finally
    {
      thread.stopWork();
    }
  }
  private void stop(Exception e)
  {
    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
        .get(stackTraceToSingleLineString(e)));
    thread.initiateShutdown();
    if (replicationServer != null)
    {
      replicationServer.shutdown();
      db.shutdown();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
@@ -401,7 +254,7 @@
      {
        for (int j = 0; j < 50; j++)
        {
          if (thread.isShutdownInitiated())
          if (shutdown.get())
          {
            return;
          }
@@ -428,7 +281,7 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        thread.initiateShutdown();
        shutdown.set(true);
        throw e;
      }
      finally
@@ -439,47 +292,6 @@
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   * <p>
   * @GuardedBy("flushLock")
   * @throws ChangelogException
   *           If a database problem happened
   */
  private void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, check if shutdown was invoked
          return;
        }
        // Try to see if there are more changes and persist them all.
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        changes.add(change);
        msgQueue.drainTo(changes);
        int totalSize = db.addEntries(changes);
        // do not release more than queue max size permits
        // (be careful of the edge case with the very large message)
        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
      }
    }
    catch (InterruptedException e)
    {
      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
          .get(stackTraceToSingleLineString(e)));
    }
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   */
@@ -501,9 +313,6 @@
      {
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
      create(attributes, "queue-size-bytes",
          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
      return attributes;
    }
@@ -552,8 +361,6 @@
   */
  public void clear() throws ChangelogException
  {
    collectAllPermits();
    msgQueue.clear(); // this call should not do anything at all
    db.clear();
    csnLimits = new CSNLimits(null, null);
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -28,7 +28,6 @@
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -182,15 +181,14 @@
  }
  /**
   * add a list of changes to the underlying db.
   * add one change to the underlying db.
   *
   * @param changes
   *          The list of changes to add to the underlying db.
   * @return the total size of all the changes
   * @param change
   *          The change to add to the underlying db.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public int addEntries(List<UpdateMsg> changes) throws ChangelogException
  public void addEntry(UpdateMsg change) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -198,26 +196,22 @@
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return 0;
        return;
      }
      int totalSize = 0;
      for (UpdateMsg change : changes)
      {
        final DatabaseEntry key = createReplicationKey(change.getCSN());
        final DatabaseEntry data = new ReplicationData(change);
      final DatabaseEntry key = createReplicationKey(change.getCSN());
      final DatabaseEntry data = new ReplicationData(change);
        insertCounterRecordIfNeeded(change.getCSN());
        db.put(null, key, data);
        counterCurrValue++;
        totalSize += change.size();
      }
      return totalSize;
      insertCounterRecordIfNeeded(change.getCSN());
      db.put(null, key, data);
      counterCurrValue++;
    }
    catch (DatabaseException e)
    {
      throw new ChangelogException(e);
      throw new ChangelogException(
          ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(
              change.toString(), String.valueOf(baseDN),
              String.valueOf(serverId), stackTraceToSingleLineString(e)));
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -48,8 +48,7 @@
import static org.testng.Assert.*;
/**
 * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : -
 * periodic trim - call to clear method()
 * Test the JEChangeNumberIndexDB class.
 */
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
@@ -71,13 +70,13 @@
   * <li>create the db</li>
   * <li>add records</li>
   * <li>read them with a cursor</li>
   * <li>set a very short trim period</li>
   * <li>wait for the db to be trimmed / here since the changes are not stored
   * <li>set a very short purge period</li>
   * <li>wait for the db to be purged / here since the changes are not stored
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   */
  @Test
  void testPurge() throws Exception
  public void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
@@ -172,8 +171,8 @@
   * <li>clear the db</li>
   * </ol>
   */
  @Test()
  void testClear() throws Exception
  @Test
  public void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,8 +97,6 @@
      //--
      // Iterator tests with changes persisted
      waitChangesArePersisted(replicaDB, 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
@@ -108,7 +106,6 @@
      //--
      // Cursor tests with changes persisted
      replicaDB.add(update4);
      waitChangesArePersisted(replicaDB, 4);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      // Test cursor from existing CSN
@@ -141,29 +138,6 @@
    }
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted) throws Exception
  {
    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted, int counterWindow) throws Exception
  {
    // one counter record is inserted every time "counterWindow"
    // records have been inserted
    int expectedNbRecords =
        nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
    int count = 0;
    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
    {
      Thread.sleep(10);
      count++;
    }
    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
@@ -301,7 +275,6 @@
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
      }
      waitChangesArePersisted(replicaDB, 4);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
@@ -378,7 +351,6 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      waitChangesArePersisted(replicaDB, max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -402,7 +374,6 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");