mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -51,6 +51,8 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -67,6 +69,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -143,6 +146,7 @@
  private short replicationServerId;
  private short protocolVersion;
  private long generationId=-1;
  /**
@@ -189,7 +193,7 @@
   * Then create the reader and writer thread.
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection.
   *               null if this is an incoming connection (listen).
   * @param replicationServerId The identifier of the replicationServer that
   *                            creates this server handler.
   * @param replicationServerURL The URL of the replicationServer that creates
@@ -206,22 +210,34 @@
                    int windowSize, boolean sslEncryption,
                    ReplicationServer replicationServer)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
                " starts a new LS or RS " +
                ((baseDn == null)?"incoming connection":"outgoing connection"));
    this.replicationServerId = replicationServerId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    long localGenerationId=-1;
    try
    {
      if (baseDn != null)
      {
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        replicationCache = replicationServer.getReplicationCache(baseDn);
        // Get or create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(baseDn, true);
        localGenerationId = replicationCache.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState,
                                    protocolVersion, sslEncryption);
                                    protocolVersion, localGenerationId,
                                    sslEncryption);
        session.publish(msg);
      }
@@ -229,9 +245,10 @@
      ReplicationMessage msg = session.receive();
      if (msg instanceof ServerStartMessage)
      {
        // The remote server is an LDAP Server
        // The remote server is an LDAP Server.
        ServerStartMessage receivedMsg = (ServerStartMessage) msg;
        generationId = receivedMsg.getGenerationId();
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        serverId = receivedMsg.getServerId();
@@ -281,15 +298,69 @@
        serverIsLDAPserver = true;
        // This an incoming connection. Publish our start message
        replicationCache = replicationServer.getReplicationCache(this.baseDn);
        // Get or Create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(this.baseDn,
            true);
        localGenerationId = replicationCache.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        // This an incoming connection. Publish our start message
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    this.baseDn, windowSize, localServerState,
                                    protocolVersion, sslEncryption);
                                    protocolVersion, localGenerationId,
                                    sslEncryption);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
        /*
         * Up to here the session is encrypted; from now on it depends on
         * the negotiation.
         */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
                   ", SH received START from LS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
                   " localGenerationId=" + localGenerationId +
                   " state=" + ss +
                   " and sent ReplServerStart with state=" + lss);
        }
        /*
         * If a generationID is already set for the domain
         * then
         *   if the connecting replica does not have the same one
         *   then it is degraded locally and notified by an error message
         * else
         *   we set the generationID from the one received
         *   (not saved on disk yet; it will be set with the first change
         *   received)
         */
        if (localGenerationId>0)
        {
          if (generationId != localGenerationId)
          {
            Message message = NOTE_BAD_GENERATION_ID.get(
                receivedMsg.getBaseDn().toNormalizedString(),
                Short.toString(receivedMsg.getServerId()),
                Long.toString(generationId),
                Long.toString(localGenerationId));
            ErrorMessage errorMsg =
              new ErrorMessage(replicationServerId, serverId, message);
            session.publish(errorMsg);
          }
        }
        else
        {
          replicationCache.setGenerationId(generationId, false);
        }
      }
      else if (msg instanceof ReplServerStartMessage)
      {
@@ -297,6 +368,7 @@
        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        generationId = receivedMsg.getGenerationId();
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        int separator = serverURL.lastIndexOf(':');
@@ -306,7 +378,10 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          replicationCache = replicationServer.getReplicationCache(this.baseDn);
          // Get or create the ReplicationCache
          replicationCache = replicationServer.getReplicationCache(this.baseDn,
              true);
          localGenerationId = replicationCache.getGenerationId();
          ServerState serverState = replicationCache.getDbServerState();
          // The session initiator decides whether to use SSL.
@@ -317,7 +392,9 @@
            new ReplServerStartMessage(replicationServerId,
                                       replicationServerURL,
                                       this.baseDn, windowSize, serverState,
                                       protocolVersion, sslEncryption);
                                       protocolVersion,
                                       localGenerationId,
                                       sslEncryption);
          session.publish(outMsg);
        }
        else
@@ -326,6 +403,107 @@
        }
        this.serverState = receivedMsg.getServerState();
        sendWindowSize = receivedMsg.getWindowSize();
        /*
         * Up to here the session is encrypted; from now on it depends on
         * the negotiation.
         */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
                   ", SH received START from RS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
                   " localGenerationId=" + localGenerationId +
                   " state=" + ss +
                   " and sent ReplServerStart with state=" + lss);
        }
        // if the remote RS and the local RS have the same genID
        // then it's ok and nothing else to do
        if (generationId == localGenerationId)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() + " RS with serverID=" + serverId +
              " is connected with the right generation ID");
          }
        }
        else
        {
          if (localGenerationId>0)
          {
            // if the local RS is initialized
            if (generationId>0)
            {
              // if the remote RS is initialized
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationCache.getGenerationIdSavedStatus())
                {
                  // if the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
                  // then we are just degrading the peer.
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                  ErrorMessage errorMsg =
                    new ErrorMessage(replicationServerId, serverId, message);
                  session.publish(errorMsg);
                }
                else
                {
                  // The present RS has never received changes regarding its
                  // gen ID.
                  //
                  // Example case:
                  // - we are in RS1
                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                  // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
                  // - we are in RS1 and we receive a START msg from RS2
                  // - Each RS keeps its genID / is degraded and when LS2 will
                  //   be populated from LS1 everything will become ok.
                  //
                  // Issue:
                  // FIXME : Would it be a good idea in some cases to just
                  //         set the gen ID received from the peer RS
                  //         specially if the peer has a non nul state and
                  //         we have a nul state ?
                  // replicationCache.setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                  ErrorMessage errorMsg =
                    new ErrorMessage(replicationServerId, serverId, message);
                  session.publish(errorMsg);
                }
              }
            }
            else
            {
              // The remote has no genId. We don't change anything for the
              // current RS.
            }
          }
          else
          {
            // The local RS is not initialized - take the one received
            replicationCache.setGenerationId(generationId, false);
          }
        }
      }
      else
      {
@@ -333,12 +511,9 @@
        return;   // we did not recognize the message, ignore it
      }
      if (!sslEncryption)
      {
        session.stopEncryption();
      }
      replicationCache = replicationServer.getReplicationCache(this.baseDn);
      // Get or create the ReplicationCache
      replicationCache = replicationServer.getReplicationCache(this.baseDn,
          true);
      boolean started;
      if (serverIsLDAPserver)
@@ -352,10 +527,11 @@
      if (started)
      {
        writer = new ServerWriter(session, serverId, this, replicationCache);
        // sendWindow MUST be created before starting the writer
        sendWindow = new Semaphore(sendWindowSize);
        reader = new ServerReader(session, serverId, this,
            replicationCache);
        writer = new ServerWriter(session, serverId, this, replicationCache);
        reader = new ServerReader(session, serverId, this, replicationCache);
        reader.start();
        writer.start();
@@ -377,6 +553,12 @@
        // the connection is not valid, close it.
        try
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() + " RS failed to start locally " +
              " the connection from serverID="+serverId);
          }
          session.close();
        } catch (IOException e1)
        {
@@ -388,7 +570,8 @@
    {
      // some problem happened, reject the connection
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString()));
      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
          this.getMonitorInstanceName()));
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      try
@@ -399,7 +582,6 @@
        // ignore
      }
    }
    sendWindow = new Semaphore(sendWindowSize);
  }
  /**
@@ -720,6 +902,21 @@
   */
  public void add(UpdateMessage update, ServerHandler sourceHandler)
  {
    /*
     * Ignore updates from a server that is degraded due to
     * its inconsistent generationId
     */
    long referenceGenerationId = replicationCache.getGenerationId();
    if ((referenceGenerationId>0) &&
        (referenceGenerationId != generationId))
    {
      logError(ERR_IGNORING_UPDATE_TO.get(
               update.getDn(),
               this.getMonitorInstanceName()));
      return;
    }
    synchronized (msgQueue)
    {
      /*
@@ -1164,7 +1361,7 @@
    if (serverIsLDAPserver)
      return "Remote LDAP Server " + str;
    else
      return "Remote Replication Server " + str;
      return "Remote Repl Server " + str;
  }
  /**
@@ -1261,7 +1458,10 @@
    attributes.add(attr);
    attributes.add(new Attribute("ssl-encryption",
                                 String.valueOf(session.isEncrypted())));
        String.valueOf(session.isEncrypted())));
    attributes.add(new Attribute("generation-id",
        String.valueOf(generationId)));
    return attributes;
  }
@@ -1385,9 +1585,10 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
                 msg + " from " + serverId);
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                 getMonitorInstanceName() +
                 " SH for remote server " + this.getMonitorInstanceName() +
                 " processes received msg=" + msg);
    replicationCache.process(msg, this);
  }
@@ -1401,6 +1602,12 @@
   public void sendInfo(ReplServerInfoMessage info)
   throws IOException
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sends message=" + info);
     session.publish(info);
   }
@@ -1412,7 +1619,13 @@
    */
   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
     remoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
   }
   /**
@@ -1458,8 +1671,10 @@
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " sends message=" + msg);
    session.publish(msg);
  }
@@ -1492,4 +1707,48 @@
      checkWindow();
    }
  }
  /**
   * Returns the value of generationId for that handler.
   * @return The value of the generationId.
   */
  public long getGenerationId()
  {
    return generationId;
  }
  /**
   * Resets the generationId for this domain.
   */
  public void resetGenerationId()
  {
    // Notify the peer that it is now invalid regarding the generationId
    // We are now waiting a startServer message from this server with
    // a valid generationId.
    try
    {
      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
      ErrorMessage errorMsg =
        new ErrorMessage(serverId, replicationServerId, message);
      session.publish(errorMsg);
    }
    catch (Exception e)
    {
      // FIXME Log exception when sending reset error message
    }
  }
  /**
   * Sends a message containing a generationId to a peer server.
   * The peer is expected to be a replication server.
   *
   * @param  msg         The GenerationIdMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
   public void sendGenerationId(ResetGenerationId msg)
   throws IOException
   {
     session.publish(msg);
   }
}