mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Fabio Pistolesi
11.39.2016 3c140af6c756b30b325ce3c6ed080e8898e2b7ec
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2015 ForgeRock AS
 *      Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
@@ -32,6 +32,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -49,6 +50,7 @@
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  private final ReentrantLock switchQueueLock;
  private AtomicBoolean shutdown = new AtomicBoolean(false);
  private static int count;
@@ -56,11 +58,13 @@
   * Constructor for the ReplayThread.
   *
   * @param updateToReplayQueue The queue of update messages we have to replay
   * @param switchQueueLock lock to ensure moving updates from one queue to another is atomic
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock)
  {
     super("Replica replay thread " + count++);
     this.updateToReplayQueue = updateToReplayQueue;
    super("Replica replay thread " + count++);
    this.updateToReplayQueue = updateToReplayQueue;
    this.switchQueueLock = switchQueueLock;
  }
  /**
@@ -86,19 +90,34 @@
    {
      try
      {
        UpdateToReplay updateToreplay;
        // Loop getting an updateToReplayQueue from the update message queue and
        // replaying matching changes
        while (!shutdown.get() &&
          ((updateToreplay = updateToReplayQueue.poll(1L,
          TimeUnit.SECONDS)) != null))
        if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS))
        {
          // Find replication domain for that update message
          LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
          LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
          LDAPReplicationDomain domain;
          LDAPUpdateMsg updateMsg;
          try
          {
            if (shutdown.get())
            {
              break;
            }
            UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
            if (updateToreplay == null)
            {
              continue;
            }
            // Find replication domain for that update message and mark it as "in progress"
            updateMsg = updateToreplay.getUpdateMessage();
            domain = updateToreplay.getReplicationDomain();
            domain.markInProgress(updateMsg);
          }
          finally
          {
            switchQueueLock.unlock();
          }
          domain.replay(updateMsg, shutdown);
        }
      } catch (Exception e)
      }
      catch (Exception e)
      {
        /*
         * catch all exceptions happening so that the thread never dies even