mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.30.2014 75fec93860ffcb666d6f1d6fce4472f63089e20c
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,12 +49,33 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.HostPort;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -114,9 +141,6 @@
  private final Map<Integer, ReplicationServerHandler> connectedRSs =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  private final ReplicationDomainDB domainDB;
  /** The ReplicationServer that created the current instance. */
  private final ReplicationServer localReplicationServer;
@@ -368,11 +392,6 @@
        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
      }
    }
    // Push the message to the other subscribing handlers
    for (MessageHandler mHandler : otherHandlers) {
      mHandler.add(updateMsg);
    }
  }
  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1097,10 +1116,6 @@
        {
          unregisterServerHandler(sHandler, shutdown, true);
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
      }
      catch(Exception e)
      {
@@ -1117,12 +1132,6 @@
    }
  }
  private void unregisterOtherHandler(MessageHandler mHandler)
  {
    unRegisterHandler(mHandler);
    mHandler.shutdown();
  }
  private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
      boolean isDirectoryServer)
  {
@@ -1149,60 +1158,6 @@
  }
  /**
   * Stop the handler.
   * @param mHandler The handler to stop.
   */
  public void stopServer(MessageHandler mHandler)
  {
    // TODO JNR merge with stopServer(ServerHandler, boolean)
    if (debugEnabled())
    {
      debug("stopServer() on the message handler " + mHandler);
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
     * ServerWriter at the same time, or from a thread that wants to shut down
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    if (!mHandler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      }
      catch (InterruptedException ex)
      {
        // We can't deal with this here, so re-interrupt thread so that it is
        // caught during subsequent IO.
        Thread.currentThread().interrupt();
        return;
      }
      try
      {
        if (otherHandlers.contains(mHandler))
        {
          unregisterOtherHandler(mHandler);
        }
      }
      catch(Exception e)
      {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            stackTraceToSingleLineString(e)));
      }
      finally
      {
        release();
      }
    }
  }
  /**
   * Unregister this handler from the list of handlers registered to this
   * domain.
   * @param sHandler the provided handler to unregister.
@@ -2427,39 +2382,6 @@
    return attributes;
  }
  /**
   * Register in the domain an handler that subscribes to changes.
   * @param mHandler the provided subscribing handler.
   */
  public void registerHandler(MessageHandler mHandler)
  {
    this.otherHandlers.add(mHandler);
  }
  /**
   * Unregister from the domain an handler.
   * @param mHandler the provided unsubscribing handler.
   * @return Whether this handler has been unregistered with success.
   */
  public boolean unRegisterHandler(MessageHandler mHandler)
  {
    return this.otherHandlers.remove(mHandler);
  }
  /**
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
   * Note: Because the replication changelogDB trimming always keep one change
   * whatever its date, the CSN contained in the returned state can be very old.
   *
   * @return the start state of the domain.
   */
  public ServerState getOldestState()
  {
    return domainDB.getDomainOldestCSNs(baseDN);
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  {
@@ -2524,18 +2446,6 @@
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
   * @return The latest trim date.
   */
  public long getLatestDomainTrimDate()
  {
    return domainDB.getDomainLatestTrimDate(baseDN);
  }
  /**
   * Return the monitor instance name of the ReplicationServer that created the
   * current instance.