mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
10.05.2006 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed
opends/src/server/org/opends/server/changelog/ServerHandler.java
@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
@@ -58,6 +59,7 @@
import org.opends.server.synchronization.ServerState;
import org.opends.server.synchronization.SynchronizationMessage;
import org.opends.server.synchronization.UpdateMessage;
import org.opends.server.synchronization.WindowMessage;
import org.opends.server.util.TimeThread;
/**
@@ -71,7 +73,7 @@
  private MsgQueue msgQueue = new MsgQueue();
  private MsgQueue lateQueue = new MsgQueue();
  private Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();;
          new HashMap<ChangeNumber, AckMessageList>();
  private ChangelogCache changelogCache = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
@@ -93,6 +95,12 @@
  private ServerWriter writer = null;
  private DN baseDn = null;
  private String serverAddressURL;
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
  private ServerReader reader;
  private Semaphore sendWindow;
  private int sendWindowSize;
  private static Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -116,20 +124,29 @@
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection.
   * @param changelogId The identifier of the changelog that creates this
   *                    server handler.
   * @param changelogURL The URL of the changelog that creates this
   *                    server handler.
   * @param windowSize the window size that this server handler must use.
   * @param changelog the Changelog that created this server handler.
   */
  public void start(DN baseDn)
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  {
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    try
    {
      if (baseDn != null)
      {
        this.baseDn = baseDn;
        changelogCache = Changelog.getChangelogCache(baseDn);
        changelogCache = changelog.getChangelogCache(baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage msg =
          new ChangelogStartMessage(Changelog.getServerId(),
                                    Changelog.getServerURL(),
                                    baseDn, localServerState);
          new ChangelogStartMessage(changelogId, changelogURL,
                                    baseDn, windowSize, localServerState);
        session.publish(msg);
      }
@@ -175,16 +192,15 @@
          restartSendDelay = 0;
        serverIsLDAPserver = true;
        changelogCache = Changelog.getChangelogCache(this.baseDn);
        changelogCache = changelog.getChangelogCache(this.baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage myStartMsg =
          new ChangelogStartMessage(Changelog.getServerId(),
                                    Changelog.getServerURL(),
                                    this.baseDn, localServerState);
          new ChangelogStartMessage(changelogId, changelogURL,
                                    this.baseDn, windowSize, localServerState);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else if (msg.getClass() == Class.forName(
      "org.opends.server.synchronization.ChangelogStartMessage"))
      else if (msg instanceof ChangelogStartMessage)
      {
        ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
        serverId = receivedMsg.getServerId();
@@ -195,17 +211,17 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          changelogCache = Changelog.getChangelogCache(this.baseDn);
          changelogCache = changelog.getChangelogCache(this.baseDn);
          ServerState serverState = changelogCache.getDbServerState();
          ChangelogStartMessage outMsg =
            new ChangelogStartMessage(Changelog.getServerId(),
                                      Changelog.getServerURL(),
                                      this.baseDn, serverState);
            new ChangelogStartMessage(changelogId, changelogURL,
                                      this.baseDn, windowSize, serverState);
          session.publish(outMsg);
        }
        else
          this.baseDn = baseDn;
        this.serverState = receivedMsg.getServerState();
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else
      {
@@ -213,7 +229,7 @@
        return;   // we did not recognize the message, ignore it
      }
      changelogCache = Changelog.getChangelogCache(this.baseDn);
      changelogCache = changelog.getChangelogCache(this.baseDn);
      if (serverIsLDAPserver)
      {
@@ -226,7 +242,7 @@
      writer = new ServerWriter(session, serverId, this, changelogCache);
      ServerReader reader = new ServerReader(session, serverId, this,
      reader = new ServerReader(session, serverId, this,
                                             changelogCache);
      reader.start();
@@ -251,7 +267,7 @@
        // ignore
      }
    }
    sendWindow = new Semaphore(sendWindowSize);
  }
  /**
@@ -576,6 +592,30 @@
   */
  public UpdateMessage take()
  {
    boolean interrupted = true;
    UpdateMessage msg = getnextMessage();
    do {
      try
      {
        sendWindow.acquire();
        interrupted = false;
      } catch (InterruptedException e)
      {
        // loop until not interrupted
      }
    } while (interrupted);
    this.incrementOutCount();
    return msg;
  }
  /**
   * Get the next update that must be sent to the server
   * from the message queue or from the database.
   *
   * @return The next update that must be sent to the server.
   */
  private UpdateMessage getnextMessage()
  {
    UpdateMessage msg;
    do
    {
@@ -668,7 +708,6 @@
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                this.updateServerState(msg);
                this.incrementOutCount();
                return msg;
              }
            }
@@ -679,7 +718,6 @@
          /* get the next change from the lateQueue */
          msg = lateQueue.removeFirst();
          this.updateServerState(msg);
          this.incrementOutCount();
          return msg;
        }
      }
@@ -707,7 +745,6 @@
             * by the other server.
             * Otherwise just loop to select the next message.
             */
            this.incrementOutCount();
            return msg;
          }
        }
@@ -927,7 +964,7 @@
  @Override
  public String getMonitorInstanceName()
  {
    String str = changelogCache.getBaseDn().toString() +
    String str = baseDn.toString() +
                 " " + serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
@@ -985,7 +1022,7 @@
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
                                 changelogCache.getBaseDn().toString()));
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("update-waiting-acks",
@@ -999,6 +1036,14 @@
                                 String.valueOf(getInAckCount())));
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    attributes.add(new Attribute("max-send-window",
                                 String.valueOf(sendWindowSize)));
    attributes.add(new Attribute("current-send-window",
                                String.valueOf(sendWindow.availablePermits())));
    attributes.add(new Attribute("max-rcv-window",
                                 String.valueOf(maxRcvWindow)));
    attributes.add(new Attribute("current-rcv-window",
                                 String.valueOf(rcvWindow)));
    long olderUpdateTime = getOlderUpdateTime();
    if (olderUpdateTime != 0)
    {
@@ -1058,4 +1103,33 @@
    return localString;
  }
  /**
   * Check the protocol window and send WindowMessage if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    rcvWindow--;
    if (rcvWindow < rcvWindowSizeHalf)
    {
      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
      session.publish(msg);
      outAckCount++;
      rcvWindow += rcvWindowSizeHalf;
    }
  }
  /**
   * Update the send window size based on the credit specified in the
   * given window message.
   *
   * @param windowMsg The Window Message containing the information
   *                  necessary for updating the window size.
   */
  public void updateWindow(WindowMessage windowMsg)
  {
    sendWindow.release(windowMsg.getNumAck());
  }
}