mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
09.15.2006 7b84e53457bce1f0733afa87797afc9928568c52
opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -39,6 +39,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import org.opends.server.changelog.ProtocolSession;
import org.opends.server.changelog.SocketSession;
@@ -47,6 +49,7 @@
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -63,43 +66,64 @@
{
  private boolean shutdown = false;
  private List<String> servers;
  private Short identifier;
  private boolean connected = false;
  private SynchronizationDomain domain;
  private final Object lock = new Object();
  private String changelogServer = "Not connected";
  private TreeSet<FakeOperation> replayOperations;
  private ProtocolSession session = null;
  private final ServerState state;
  private final DN baseDn;
  private final short serverID;
  private int maxSendDelay;
  private int maxReceiveDelay;
  private int maxSendQueue;
  private int maxReceiveQueue;
  /**
   * Creates a new Changelog Broker for a particular SynchronizationDomain.
   *
   * @param domain The SynchronizationDomain for which the borker is created.
   * @param state The ServerState that should be used by this broker
   *              when negociating the session with the changelog servers.
   * @param baseDn The base DN that should be used by this broker
   *              when negociating the session with the changelog servers.
   * @param serverID The server ID that should be used by this broker
   *              when negociating the session with the changelog servers.
   * @param maxReceiveQueue The maximum size of the receive queue to use on
   *                         the changelog server.
   * @param maxReceiveDelay The maximum replication delay to use on the
   *                        changelog server.
   * @param maxSendQueue The maximum size of the send queue to use on
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   */
  public ChangelogBroker(SynchronizationDomain domain)
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay )
  {
    this.domain = domain;
    this.baseDn = baseDn;
    this.serverID = serverID;
    this.maxReceiveDelay = maxReceiveDelay;
    this.maxSendDelay = maxSendDelay;
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendQueue = maxSendQueue;
    this.state = state;
    replayOperations =
      new TreeSet<FakeOperation>(new FakeOperationComparator());
  }
  /**
   * Start the ChangelogBroker.
   *
   * @param identifier identifier of the changelog
   * @param servers list of servers used
   * @throws Exception : in case of errors
   */
  public void start(Short identifier,
                    List<String> servers)
  public void start(List<String> servers)
                    throws Exception
  {
    /*
     * Open Socket to the Changelog
     * Send the Start message
     */
    this.identifier = identifier;
    this.servers = servers;
    if (servers.size() < 1)
    {
@@ -147,7 +171,9 @@
          /*
           * Send our ServerStartMessage.
           */
          ServerStartMessage msg = domain.newServerStartMessage();
          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              state);
          session.publish(msg);
@@ -167,10 +193,10 @@
           * those changes and send them again to any changelog server.
           */
          ChangeNumber changelogMaxChangeNumber =
            startMsg.getServerState().getMaxChangeNumber(identifier);
            startMsg.getServerState().getMaxChangeNumber(serverID);
          if (changelogMaxChangeNumber == null)
            changelogMaxChangeNumber = new ChangeNumber(0, 0, identifier);
          ChangeNumber ourMaxChangeNumber =  domain.getMaxChangeNumber();
            changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
          if ((ourMaxChangeNumber == null) ||
              (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
          {
@@ -206,7 +232,7 @@
              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
              attrs.add(Historical.HISTORICALATTRIBUTENAME);
              InternalSearchOperation op = conn.processSearch(
                  new ASN1OctetString(domain.getBaseDN().toString()),
                  new ASN1OctetString(baseDn.toString()),
                  SearchScope.WHOLE_SUBTREE,
                  DereferencePolicy.NEVER_DEREF_ALIASES,
                  0, 0, false, filter,
@@ -381,8 +407,10 @@
  /**
   * Receive a message.
   * @return the received message
   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
   *         has expired
   */
  public SynchronizationMessage receive()
  public SynchronizationMessage receive() throws SocketTimeoutException
  {
    while (shutdown == false)
    {
@@ -392,6 +420,11 @@
        return session.receive();
      } catch (Exception e)
      {
        if (e instanceof SocketTimeoutException)
        {
          SocketTimeoutException e1 = (SocketTimeoutException) e;
          throw e1;
        }
        if (shutdown == false)
        {
          synchronized (lock)
@@ -439,6 +472,22 @@
  }
  /**
   * Set a timeout value.
   * With this option set to a non-zero value, calls to the receive() method
   * block for only this amount of time after which a
   * java.net.SocketTimeoutException is raised.
   * The Broker is valid and useable even after such an Exception is raised.
   *
   * @param timeout the specified timeout, in milliseconds.
   * @throws SocketException if there is an error in the underlying protocol,
   *         such as a TCP error.
   */
  public void setSoTimeout(int timeout) throws SocketException
  {
    session.setSoTimeout(timeout);
  }
  /**
   * Get the name of the changelog server to which this broker is currently
   * connected.
   *