mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
10.05.2006 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed
opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -78,6 +79,12 @@
  private int maxReceiveDelay;
  private int maxSendQueue;
  private int maxReceiveQueue;
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow;
  private int halfRcvWindow;
  private int maxRcvWindow;
  private int timeout = 0;
  /**
   * Creates a new Changelog Broker for a particular SynchronizationDomain.
@@ -95,10 +102,11 @@
   * @param maxSendQueue The maximum size of the send queue to use on
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   * @param window The size of the send and receive window to use.
   */
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay )
      int maxSendDelay, int window)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -109,6 +117,9 @@
    this.state = state;
    replayOperations =
      new TreeSet<FakeOperation>(new FakeOperationComparator());
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
  }
  /**
@@ -165,6 +176,7 @@
          InetSocketAddress ServerAddr = new InetSocketAddress(
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
@@ -173,7 +185,7 @@
           */
          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              state);
              halfRcvWindow*2, state);
          session.publish(msg);
@@ -182,7 +194,7 @@
           */
          session.setSoTimeout(1000);
          startMsg = (ChangelogStartMessage) session.receive();
          session.setSoTimeout(0);
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a changelog that has not
@@ -202,6 +214,8 @@
              (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
          {
            changelogServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            break;
          }
@@ -254,6 +268,8 @@
              else
              {
                changelogServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
                for (FakeOperation replayOp : replayOperations)
                {
@@ -306,6 +322,14 @@
           * changes that this server has already processed, start again
           * the loop looking for any changelog server.
           */
          try
          {
            Thread.sleep(500);
          } catch (InterruptedException e)
          {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
@@ -393,13 +417,18 @@
        {
          if (this.connected == false)
            this.reStart(failingSession);
          if (msg instanceof UpdateMessage)
            sendWindow.acquire();
          session.publish(msg);
          done = true;
        } catch (IOException e)
        {
          this.reStart(failingSession);
        }
        catch (InterruptedException e)
        {
          this.reStart(failingSession);
        }
      }
    }
  }
@@ -418,7 +447,25 @@
      ProtocolSession failingSession = session;
      try
      {
        return session.receive();
        SynchronizationMessage msg = session.receive();
        if (msg instanceof WindowMessage)
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else
        {
          if (msg instanceof UpdateMessage)
          {
            rcvWindow--;
            if (rcvWindow < halfRcvWindow)
            {
              session.publish(new WindowMessage(halfRcvWindow));
              rcvWindow += halfRcvWindow;
            }
          }
          return msg;
        }
      } catch (Exception e)
      {
        if (e instanceof SocketTimeoutException)
@@ -485,6 +532,7 @@
   */
  public void setSoTimeout(int timeout) throws SocketException
  {
    this.timeout = timeout;
    session.setSoTimeout(timeout);
  }
@@ -532,4 +580,47 @@
  {
    // TODO to be implemented
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
   */
  public int getMaxRcvWindow()
  {
    return maxRcvWindow;
  }
  /**
   * Get the current receive window size.
   *
   * @return The current receive window size.
   */
  public int getCurrentRcvWindow()
  {
    return rcvWindow;
  }
  /**
   * Get the maximum send window size.
   *
   * @return The maximum send window size.
   */
  public int getMaxSendWindow()
  {
    return maxSendWindow;
  }
  /**
   * Get the current send window size.
   *
   * @return The current send window size.
   */
  public int getCurrentSendWindow()
  {
    if (connected)
      return sendWindow.availablePermits();
    else
      return 0;
  }
}