mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
26.31.2007 98a5896dca14ea8ac850d94ebd46713da552601d
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,7 +50,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ChangelogStartMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMessage;
@@ -69,7 +69,7 @@
/**
 * This class defines a server handler, which handles all interaction with a
 * changelog server.
 * replication server.
 */
public class ServerHandler extends MonitorProvider
{
@@ -79,7 +79,7 @@
  private MsgQueue lateQueue = new MsgQueue();
  private final Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();
  private ChangelogCache changelogCache = null;
  private ReplicationCache replicationCache = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
  private int inCount = 0;  // number of updates received from the server
@@ -111,7 +111,7 @@
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private short changelogId;
  private short replicationServerId;
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -124,8 +124,9 @@
   */
  HeartbeatThread heartbeatThread = null;
  private static final Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
  private static final Map<ChangeNumber, ReplServerAckMessageList>
   changelogsWaitingAcks =
       new HashMap<ChangeNumber, ReplServerAckMessageList>();
  /**
   * Creates a new server handler instance with the provided socket.
@@ -144,22 +145,24 @@
  /**
   * Do the exchange of start messages to know if the remote
   * server is an LDAP or changelog server and to exchange serverID.
   * server is an LDAP or replication server and to exchange serverID.
   * Then create the reader and writer thread.
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection.
   * @param changelogId The identifier of the changelog that creates this
   *                    server handler.
   * @param changelogURL The URL of the changelog that creates this
   *                    server handler.
   * @param replicationServerId The identifier of the replicationServer that
   *                            creates this server handler.
   * @param replicationServerURL The URL of the replicationServer that creates
   *                             this server handler.
   * @param windowSize the window size that this server handler must use.
   * @param changelog the Changelog that created this server handler.
   * @param replicationServer the ReplicationServer that created this server
   *                          handler.
   */
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  public void start(DN baseDn, short replicationServerId,
                    String replicationServerURL,
                    int windowSize, ReplicationServer replicationServer)
  {
    this.changelogId = changelogId;
    this.replicationServerId = replicationServerId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
@@ -168,10 +171,10 @@
      if (baseDn != null)
      {
        this.baseDn = baseDn;
        changelogCache = changelog.getChangelogCache(baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage msg =
          new ChangelogStartMessage(changelogId, changelogURL,
        replicationCache = replicationServer.getReplicationCache(baseDn);
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState);
        session.publish(msg);
@@ -225,17 +228,17 @@
        serverIsLDAPserver = true;
        changelogCache = changelog.getChangelogCache(this.baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage myStartMsg =
          new ChangelogStartMessage(changelogId, changelogURL,
        replicationCache = replicationServer.getReplicationCache(this.baseDn);
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    this.baseDn, windowSize, localServerState);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else if (msg instanceof ChangelogStartMessage)
      else if (msg instanceof ReplServerStartMessage)
      {
        ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        String[] splittedURL = serverURL.split(":");
@@ -244,11 +247,12 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          changelogCache = changelog.getChangelogCache(this.baseDn);
          ServerState serverState = changelogCache.getDbServerState();
          ChangelogStartMessage outMsg =
            new ChangelogStartMessage(changelogId, changelogURL,
                                      this.baseDn, windowSize, serverState);
          replicationCache = replicationServer.getReplicationCache(this.baseDn);
          ServerState serverState = replicationCache.getDbServerState();
          ReplServerStartMessage outMsg =
            new ReplServerStartMessage(replicationServerId,
                                       replicationServerURL,
                                       this.baseDn, windowSize, serverState);
          session.publish(outMsg);
        }
        else
@@ -262,21 +266,21 @@
        return;   // we did not recognize the message, ignore it
      }
      changelogCache = changelog.getChangelogCache(this.baseDn);
      replicationCache = replicationServer.getReplicationCache(this.baseDn);
      if (serverIsLDAPserver)
      {
        changelogCache.startServer(this);
        replicationCache.startServer(this);
      }
      else
      {
        changelogCache.startChangelog(this);
        replicationCache.startReplicationServer(this);
      }
      writer = new ServerWriter(session, serverId, this, changelogCache);
      writer = new ServerWriter(session, serverId, this, replicationCache);
      reader = new ServerReader(session, serverId, this,
                                             changelogCache);
                                             replicationCache);
      reader.start();
      writer.start();
@@ -486,11 +490,12 @@
  }
  /**
   * Check if the server associated to this ServerHandler is a changelog server.
   * Check if the server associated to this ServerHandler is a replication
   * server.
   * @return true if the server associated to this ServerHandler is a
   *         changelog server.
   *         replication server.
   */
  public boolean isChangelogServer()
  public boolean isReplicationServer()
  {
    return (!serverIsLDAPserver);
  }
@@ -520,7 +525,7 @@
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = changelogCache.getDbServerState();
       ServerState dbState = replicationCache.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -554,7 +559,7 @@
   * Get an approximation of the delay by looking at the age of the odest
   * message that has not been sent to this server.
   * This is an approximation because the age is calculated using the
   * clock of the servee where the changelog is currently running
   * clock of the servee where the replicationServer is currently running
   * while it should be calculated using the clock of the server
   * that originally processed the change.
   *
@@ -686,7 +691,7 @@
      saturationCount = 0;
      try
      {
        changelogCache.checkAllSaturation();
        replicationCache.checkAllSaturation();
      }
      catch (IOException e)
      {
@@ -747,16 +752,16 @@
           *   load this change on the delayList
           *
           */
          ChangelogIteratorComparator comparator =
            new ChangelogIteratorComparator();
          SortedSet<ChangelogIterator> iteratorSortedSet =
            new TreeSet<ChangelogIterator>(comparator);
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (short serverId : changelogCache.getServers())
          for (short serverId : replicationCache.getServers())
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ChangelogIterator iterator =
              changelogCache.getChangelogIterator(serverId, lastCsn);
            ReplicationIterator iterator =
              replicationCache.getChangelogIterator(serverId, lastCsn);
            if ((iterator != null) && (iterator.getChange() != null))
            {
              iteratorSortedSet.add(iterator);
@@ -764,7 +769,7 @@
          }
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          {
            ChangelogIterator iterator = iteratorSortedSet.first();
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
            lateQueue.add(iterator.getChange());
            if (iterator.next())
@@ -772,7 +777,7 @@
            else
              iterator.releaseCursor();
          }
          for (ChangelogIterator iterator : iteratorSortedSet)
          for (ReplicationIterator iterator : iteratorSortedSet)
          {
            iterator.releaseCursor();
          }
@@ -928,13 +933,13 @@
    }
    if (completedFlag)
    {
      changelogCache.sendAck(changeNumber, true);
      replicationCache.sendAck(changeNumber, true);
    }
  }
  /**
   * Process reception of an for an update that was received from a
   * Changelog Server.
   * ReplicationServer.
   *
   * @param message the ack message that was received.
   * @param ackingServerId The  id of the server that acked the change.
@@ -942,7 +947,7 @@
  public static void ackChangelog(AckMessage message, short ackingServerId)
  {
    ChangeNumber changeNumber = message.getChangeNumber();
    ChangelogAckMessageList ackList;
    ReplServerAckMessageList ackList;
    boolean completedFlag;
    synchronized (changelogsWaitingAcks)
    {
@@ -958,9 +963,9 @@
    }
    if (completedFlag)
    {
      ChangelogCache changelogCache = ackList.getChangelogCache();
      changelogCache.sendAck(changeNumber, false,
                             ackList.getChangelogServerId());
      ReplicationCache replicationCache = ackList.getChangelogCache();
      replicationCache.sendAck(changeNumber, false,
                             ackList.getReplicationServerId());
    }
  }
@@ -982,24 +987,26 @@
  }
  /**
   * Add an update to the list of update received from a changelog server and
   * Add an update to the list of update received from a replicationServer and
   * waiting for acks.
   *
   * @param update The update that must be added to the list.
   * @param ChangelogServerId The identifier of the changelog that sent the
   *                          update.
   * @param changelogCache The ChangelogCache from which the change was
   *                       processed and to which the ack must later be sent.
   * @param ChangelogServerId The identifier of the replicationServer that sent
   *                          the update.
   * @param replicationCache The ReplicationCache from which the change was
   *                         processed and to which the ack must later be sent.
   * @param nbWaitedAck The number of ack that must be received before
   *                    the update is fully acked.
   */
  public static void addWaitingAck(UpdateMessage update,
      short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck)
  public static void addWaitingAck(
      UpdateMessage update,
      short ChangelogServerId, ReplicationCache replicationCache,
      int nbWaitedAck)
  {
    ChangelogAckMessageList ackList =
          new ChangelogAckMessageList(update.getChangeNumber(),
    ReplServerAckMessageList ackList =
          new ReplServerAckMessageList(update.getChangeNumber(),
                                      nbWaitedAck,
                                      ChangelogServerId, changelogCache);
                                      ChangelogServerId, replicationCache);
    synchronized(changelogsWaitingAcks)
    {
      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1031,7 +1038,7 @@
   * Check type of server handled.
   *
   * @return true if the handled server is an LDAP server.
   *         false if the handled server is a changelog server
   *         false if the handled server is a replicationServer
   */
  public boolean isLDAPserver()
  {
@@ -1074,7 +1081,7 @@
    if (serverIsLDAPserver)
      return "LDAP Server " + str;
    else
      return "Changelog Server " + str;
      return "Replication Server " + str;
  }
  /**
@@ -1122,7 +1129,7 @@
    if (serverIsLDAPserver)
      attributes.add(new Attribute("LDAP-Server", serverURL));
    else
      attributes.add(new Attribute("Changelog-Server", serverURL));
      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
@@ -1199,7 +1206,7 @@
      if (serverIsLDAPserver)
        localString = "Directory Server ";
      else
        localString = "Changelog Server ";
        localString = "Replication Server ";
      localString += serverId + " " + serverURL + " " + baseDn;
@@ -1233,7 +1240,7 @@
    {
      if (flowControl)
      {
        if (changelogCache.restartAfterSaturation(this))
        if (replicationCache.restartAfterSaturation(this))
        {
          flowControl = false;
        }
@@ -1277,13 +1284,15 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
      debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
        "SH(" + replicationServerId + ") receives " + msg +
            " from " + serverId, 1);
    changelogCache.process(msg, this);
    replicationCache.process(msg, this);
  }
  /**
@@ -1296,11 +1305,13 @@
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
      debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
        "SH(" + replicationServerId + ") forwards " +
             msg + " to " + serverId, 1);
    session.publish(msg);
  }