mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.47.2014 a592fe71c4c2e29a136f9700a2981f3dcbd7e114
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -41,12 +47,31 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.HostPort;
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
@@ -112,9 +137,6 @@
  private final Map<Integer, ReplicationServerHandler> connectedRSs =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  private final ReplicationDomainDB domainDB;
  /** The ReplicationServer that created the current instance. */
  private final ReplicationServer localReplicationServer;
@@ -368,11 +390,6 @@
        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
      }
    }
    // Push the message to the other subscribing handlers
    for (MessageHandler mHandler : otherHandlers) {
      mHandler.add(updateMsg);
    }
  }
  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1086,10 +1103,6 @@
        {
          unregisterServerHandler(sHandler, shutdown, true);
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
      }
      catch(Exception e)
      {
@@ -1105,12 +1118,6 @@
    }
  }
  private void unregisterOtherHandler(MessageHandler mHandler)
  {
    unRegisterHandler(mHandler);
    mHandler.shutdown();
  }
  private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
      boolean isDirectoryServer)
  {
@@ -1137,59 +1144,6 @@
  }
  /**
   * Stop the handler.
   * @param mHandler The handler to stop.
   */
  public void stopServer(MessageHandler mHandler)
  {
    // TODO JNR merge with stopServer(ServerHandler, boolean)
    if (logger.isTraceEnabled())
    {
      debug("stopServer() on the message handler " + mHandler);
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
     * ServerWriter at the same time, or from a thread that wants to shut down
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    if (!mHandler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      }
      catch (InterruptedException ex)
      {
        // We can't deal with this here, so re-interrupt thread so that it is
        // caught during subsequent IO.
        Thread.currentThread().interrupt();
        return;
      }
      try
      {
        if (otherHandlers.contains(mHandler))
        {
          unregisterOtherHandler(mHandler);
        }
      }
      catch(Exception e)
      {
        logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
      }
      finally
      {
        release();
      }
    }
  }
  /**
   * Unregister this handler from the list of handlers registered to this
   * domain.
   * @param sHandler the provided handler to unregister.
@@ -2395,25 +2349,6 @@
  }
  /**
   * Register in the domain an handler that subscribes to changes.
   * @param mHandler the provided subscribing handler.
   */
  public void registerHandler(MessageHandler mHandler)
  {
    this.otherHandlers.add(mHandler);
  }
  /**
   * Unregister from the domain an handler.
   * @param mHandler the provided unsubscribing handler.
   * @return Whether this handler has been unregistered with success.
   */
  public boolean unRegisterHandler(MessageHandler mHandler)
  {
    return this.otherHandlers.remove(mHandler);
  }
  /**
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
@@ -2427,8 +2362,7 @@
    return domainDB.getDomainOldestCSNs(baseDN);
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
@@ -2491,18 +2425,6 @@
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
   * @return The latest trim date.
   */
  public long getLatestDomainTrimDate()
  {
    return domainDB.getDomainLatestTrimDate(baseDN);
  }
  /**
   * Return the monitor instance name of the ReplicationServer that created the
   * current instance.