mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
04.12.2007 e2447ca29d7539529ef05a40a26abc2f7ae35d8c
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
  // Presumably set by the shutdown path (not visible in this chunk);
  // connect() stops looking for a replication server and receive() does not
  // restart the connection once it is true.
  private boolean shutdown = false;
  // Candidate replication servers as "host:port" strings; connect() splits
  // each entry on its last ':' and tries them in iteration order.
  private Collection<String> servers;
  // True once connect() has exchanged start messages with a replication
  // server and accepted it.
  private boolean connected = false;
  // Monitor that receive() synchronizes on around reStart() after a
  // disconnection.
  private final Object lock = new Object();
  // Address of the replication server in use, used in log messages;
  // "Not connected" until connect() succeeds.
  private String replicationServer = "Not connected";
  // Operations recovered by connect()'s search for changes that the
  // replication server has not seen yet; they are replayed to it after
  // connecting. NOTE(review): presumably filled by the internal-search
  // callback, which is not visible in this chunk.
  private TreeSet<FakeOperation> replayOperations;
  // Session to the current replication server; connect() closes it and
  // resets it to null when it finishes without being connected.
  private ProtocolSession session = null;
@@ -120,7 +121,7 @@
  private int numLostConnections = 0;
  /**
   * When the broker cannot connect to any replication server
   * it logs an error and keeps retrying every second.
   * This boolean is set when the first failure happens and is used
   * to avoid repeating the error message for further failures to connect
@@ -129,6 +130,8 @@
   */
  private boolean connectionError = false;
  /**
   * Monitor guarding the connection phase. connect() holds it for the whole
   * search for a replication server and may notify() it when that search
   * completes; publish() synchronizes on it to snapshot and re-check the
   * current session, and waits on it briefly when publishing fails.
   */
  private Object connectPhaseLock = new Object();
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
   *
@@ -217,234 +220,243 @@
    boolean checkState = true;
    boolean receivedResponse = true;
    while ((!connected) && (!shutdown) && (receivedResponse))
    synchronized (connectPhaseLock)
    {
      receivedResponse = false;
      for (String server : servers)
      while ((!connected) && (!shutdown) && (receivedResponse))
      {
        int separator = server.lastIndexOf(':');
        String port = server.substring(separator + 1);
        String hostname = server.substring(0, separator);
        try
        receivedResponse = false;
        for (String server : servers)
        {
          /*
           * Open a socket connection to the next candidate.
           */
          InetSocketAddress ServerAddr = new InetSocketAddress(
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.setTcpNoDelay(true);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
          /*
           * Send our ServerStartMessage.
           */
          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              halfRcvWindow*2, heartbeatInterval, state,
              protocolVersion);
          session.publish(msg);
          /*
           * Read the ReplServerStartMessage that should come back.
           */
          session.setSoTimeout(1000);
          startMsg = (ReplServerStartMessage) session.receive();
          receivedResponse = true;
          /*
           * We have sent our own protocol version to the replication server.
           * The replication server will use the same one (or an older one
           * if it is an old replication server).
           */
          protocolVersion = ProtocolVersion.minWithCurrent(
              startMsg.getVersion());
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a replicationServer that has not
           * seen all our previous changes because this could cause some
           * other ldap servers to miss those changes.
           * Check that the ReplicationServer has seen all our previous changes.
           * If not, try another replicationServer.
           * If no other replicationServer has seen all our changes, recover
           * those changes and send them again to any replicationServer.
           */
          ChangeNumber replServerMaxChangeNumber =
            startMsg.getServerState().getMaxChangeNumber(serverID);
          if (replServerMaxChangeNumber == null)
            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
          if ((ourMaxChangeNumber == null) ||
              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
          try
          {
            replicationServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            startHeartBeat();
            break;
          }
          else
          {
            if (checkState == true)
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = new SocketSession(socket);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            startMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                startMsg.getVersion());
            session.setSoTimeout(timeout);
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              startMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              /* This replicationServer is missing some
               * of our changes, we are going to try another server
               * but before log a notice message
               */
              int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
              String message = getMessage(msgID, server);
              logError(ErrorLogCategory.SYNCHRONIZATION,
                       ErrorLogSeverity.NOTICE,
                       message, msgID);
              replicationServer = ServerAddr.toString();
              maxSendWindow = startMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              replayOperations.clear();
              /*
               * Get all the changes that have not been seen by this
               * replicationServer and update it
               */
              InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
              LDAPFilter filter = LDAPFilter.decode(
                  "("+ Historical.HISTORICALATTRIBUTENAME +
                  ">=dummy:" + replServerMaxChangeNumber + ")");
              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
              attrs.add(Historical.HISTORICALATTRIBUTENAME);
              InternalSearchOperation op = conn.processSearch(
                  new ASN1OctetString(baseDn.toString()),
                  SearchScope.WHOLE_SUBTREE,
                  DereferencePolicy.NEVER_DEREF_ALIASES,
                  0, 0, false, filter,
                  attrs, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              if (checkState == true)
              {
                /*
                 * An error happened trying to search for the updates
                 * This server therefore can't start acepting new updates.
                 * TODO : should stop the LDAP server (how to ?)
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
                String message = getMessage(msgID);
                int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
                String message = getMessage(msgID, server);
                logError(ErrorLogCategory.SYNCHRONIZATION,
                         ErrorLogSeverity.FATAL_ERROR,
                         message, msgID);
                    ErrorLogSeverity.NOTICE,
                    message, msgID);
              }
              else
              {
                replicationServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
                for (FakeOperation replayOp : replayOperations)
                replayOperations.clear();
                logError(ErrorLogCategory.SYNCHRONIZATION,
                    ErrorLogSeverity.NOTICE,
                    "going to search for changes", 1);
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and update it
                 */
                InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
                LDAPFilter filter = LDAPFilter.decode(
                    "("+ Historical.HISTORICALATTRIBUTENAME +
                    ">=dummy:" + replServerMaxChangeNumber + ")");
                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
                attrs.add(Historical.HISTORICALATTRIBUTENAME);
                InternalSearchOperation op = conn.processSearch(
                    new ASN1OctetString(baseDn.toString()),
                    SearchScope.WHOLE_SUBTREE,
                    DereferencePolicy.NEVER_DEREF_ALIASES,
                    0, 0, false, filter,
                    attrs, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  publish(replayOp.generateMessage());
                  /*
                   * An error happened trying to search for the updates
                   * This server will start acepting again new updates but
                   * some inconsistencies will stay between servers.
                   * TODO : REPAIR : log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
                  String message = getMessage(msgID);
                  logError(ErrorLogCategory.SYNCHRONIZATION,
                      ErrorLogSeverity.FATAL_ERROR,
                      message, msgID);
                }
                startHeartBeat();
                break;
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = startMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(ErrorLogCategory.SYNCHRONIZATION,
                        ErrorLogSeverity.NOTICE,
                        "sendingChange", 1);
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(ErrorLogCategory.SYNCHRONIZATION,
                      ErrorLogSeverity.NOTICE,
                      "changes sent", 1);
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
          {
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
              String message = getMessage(msgID, server);
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  message, msgID);
            }
          }
          catch (Exception e)
          {
            int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
            String message = getMessage(msgID);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message + stackTraceToSingleLineString(e), msgID);
          }
          finally
          {
            if (connected == false)
            {
              if (session != null)
              {
                session.close();
                session = null;
              }
            }
          }
        }
        catch (ConnectException e)
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * There was no server waiting on this host:port
           * Log a notice and try the next replicationServer in the list
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          if (!connectionError )
          {
            // the error message is only logged once to avoid overflowing
            // the error log
            int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
            String message = getMessage(msgID, server);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
                message, msgID);
          }
        }
        catch (Exception e)
        {
          int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
          String message = getMessage(msgID)  + stackTraceToSingleLineString(e);
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
        }
        finally
        {
          if (connected == false)
          {
            if (session != null)
            {
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  "Broker : connect closing session" , 1);
              session.close();
              session = null;
            }
          }
              ErrorLogSeverity.NOTICE,
              message, msgID);
          checkState = false;
        }
      }
      if ((!connected) && (checkState == true) && receivedResponse)
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
          sendWindow.release(Integer.MAX_VALUE);
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
        String message =
          getMessage(msgID, replicationServer, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
      else
      {
        /*
         * We could not find a replicationServer that has seen all the
         * changes that this server has already processed, start again
         * the loop looking for any replicationServer.
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         */
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
        String message = getMessage(msgID);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            message, msgID);
        try
        if (!connectionError)
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
          String message = getMessage(msgID, baseDn.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE, message, msgID);
        }
        checkState = false;
      }
    }
    if (connected)
    {
      // This server has connected correctly.
      // let's check if it was previosuly on error, in this case log
      // a message to let the administratot know that the failure was resolved.
      if (connectionError)
      {
        connectionError = false;
        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
        String message = getMessage(msgID, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
    }
    else
    {
      /*
       * This server could not find any replicationServer
       * It's going to start in degraded mode.
       * Log a message
       */
      if (!connectionError)
      {
        checkState = false;
        connectionError = true;
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
        String message = getMessage(msgID, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
    }
  }
@@ -530,30 +542,90 @@
  public void publish(ReplicationMessage msg)
  {
    boolean done = false;
    ProtocolSession failingSession = session;
    while (!done)
    {
      if (connectionError)
        return;
      synchronized (lock)
      {
        try
        // It was not possible to connect to any replication server.
        // Since the operation was already processed, we have no other
        // choice than to return without sending the ReplicationMessage
        // and relying on the resend procedure of the connect phase to
        // fix the problem when we finally connect.
        return;
      }
      try
      {
        boolean credit;
        ProtocolSession current_session;
        Semaphore currentWindowSemaphore;
        // save the session at the time when we acquire the
        // sendwindow credit so that we can make sure later
        // that the session did not change in between.
        // This is necessary to make sure that we don't publish a message
        // on a session with a credit that was acquired from a previous
        // session.
        synchronized (connectPhaseLock)
        {
          if (this.connected == false)
            this.reStart(failingSession);
          if (msg instanceof UpdateMessage)
            sendWindow.acquire();
          session.publish(msg);
          done = true;
        } catch (IOException e)
        {
          this.reStart(failingSession);
          current_session = session;
          currentWindowSemaphore = sendWindow;
        }
        catch (InterruptedException e)
        if (msg instanceof UpdateMessage)
        {
          this.reStart(failingSession);
          // Acquiring the window credit must be done outside of the
          // connectPhaseLock because it can be blocking and we don't
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
        {
          credit = true;
        }
        if (credit)
        {
          synchronized (connectPhaseLock)
          {
            // check the session. If it has changed, some
            // deconnection/reconnection happened and we need to restart from
            // scratch.
            if (session == current_session)
            {
              session.publish(msg);
              done = true;
            }
          }
        }
        if (!credit)
        {
          // the window is still closed.
          // Send a WindowProbe message to wakeup the receiver in case the
          // window update message was lost somehow...
          // then loop to check again if connection was closed.
          session.publish(new WindowProbe());
        }
      } catch (IOException e)
      {
        // The receive threads should handle reconnection or
        // mark this broker in error. Just retry.
        synchronized (connectPhaseLock)
        {
          try
          {
            connectPhaseLock.wait(100);
          } catch (InterruptedException e1)
          {
            // ignore
          }
        }
      }
      catch (InterruptedException e)
      {
        // just loop.
      }
    }
  }
@@ -561,6 +633,10 @@
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
@@ -603,10 +679,12 @@
      {
        if (shutdown == false)
        {
          synchronized (lock)
          {
            this.reStart(failingSession);
          }
          int    msgID   = MSGID_DISCONNECTED_FROM_CHANGELOG;
          String message = getMessage(msgID, replicationServer);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE,
              message + " " + e.getMessage(), msgID);
          this.reStart(failingSession);
        }
      }
    }