mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -28,6 +28,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -47,6 +49,7 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import com.sleepycat.je.DatabaseException;
@@ -106,6 +109,15 @@
    new ConcurrentHashMap<Short, DbHandler>();
  private ReplicationServer replicationServer;
  /* GenerationId management */
  private long generationId = -1;
  private boolean generationIdSavedStatus = false;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new ReplicationCache associated to the DN baseDn.
   *
@@ -117,7 +129,13 @@
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
  }
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " Created Cache for " + baseDn + " " +
        stackTraceToSingleLineString(new Exception()));
}
  /**
   * Add an update that has been received to the list of
@@ -138,6 +156,7 @@
     * other replication server before pushing it to the LDAP servers
     */
    short id  = update.getChangeNumber().getServerId();
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
@@ -158,19 +177,21 @@
      }
    }
    // look for the dbHandler that is responsible for the master server which
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler = null;
    synchronized (sourceDbHandlers)
    {
      short id  = update.getChangeNumber().getServerId();
      dbHandler   = sourceDbHandlers.get(id);
      if (dbHandler == null)
      {
        try
        {
          dbHandler = replicationServer.newDbHandler(id, baseDn);
        } catch (DatabaseException e)
          dbHandler = replicationServer.newDbHandler(id,
              baseDn, generationId);
          generationIdSavedStatus = true;
        }
        catch (DatabaseException e)
        {
          /*
           * Because of database problem we can't save any more changes
@@ -250,6 +271,15 @@
      }
      connectedServers.put(handler.getServerId(), handler);
      // It can be that the server that connects here is the
      // first server connected for a domain.
      // In that case, we will establish the appriopriate connections
      // to the other repl servers for this domain and receive
      // their ReplServerInfo messages.
      // FIXME: Is it necessary to end this above processing BEFORE listening
      //        to incoming messages for that domain ? But the replica
      //        would raise Read Timeout for replica that connects.
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
@@ -265,17 +295,90 @@
   */
  public void stopServer(ServerHandler handler)
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In RS " + this.replicationServer.getMonitorInstanceName() +
        " for " + baseDn + " " +
        " stopServer " + handler.getMonitorInstanceName());
    handler.stopHandler();
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
    }
    else
    {
      connectedServers.remove(handler.getServerId());
    }
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
    mayResetGenerationId();
    // Update the remote replication servers with our list
    // of connected LDAP servers
    sendReplServerInfo();
  }
  /**
   * Resets the generationId for this domain if there is no LDAP
   * server currently connected and if the generationId has never
   * been saved.
   */
  protected void mayResetGenerationId()
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In RS " + this.replicationServer.getMonitorInstanceName() +
        " for " + baseDn + " " +
        " mayResetGenerationId generationIdSavedStatus=" +
        generationIdSavedStatus);
    // If there is no more any LDAP server connected to this domain in the
    // topology and the generationId has never been saved, then we can reset
    // it and the next LDAP server to connect will become the new reference.
    boolean lDAPServersConnectedInTheTopology = false;
    if (connectedServers.isEmpty())
    {
      for (ServerHandler rsh : replicationServers.values())
      {
        if (generationId != rsh.getGenerationId())
        {
          if (debugEnabled())
            TRACER.debugInfo(
                "In RS " + this.replicationServer.getMonitorInstanceName() +
                " for " + baseDn + " " +
                " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
                " thas different genId");
        }
        else
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          {
            lDAPServersConnectedInTheTopology = true;
            if (debugEnabled())
              TRACER.debugInfo(
                  "In RS " + this.replicationServer.getMonitorInstanceName() +
                  " for " + baseDn + " " +
                  " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
              " has servers connected to it - will not reset generationId");
          }
        }
      }
    }
    else
    {
      lDAPServersConnectedInTheTopology = true;
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + this.replicationServer.getMonitorInstanceName() +
          " for " + baseDn + " " +
          " has servers connected to it - will not reset generationId");
    }
    if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus))
    {
      setGenerationId(-1, false);
    }
  }
@@ -321,7 +424,7 @@
      // Update this server with the list of LDAP servers
      // already connected
      handler.sendInfo(
          new ReplServerInfoMessage(getConnectedLDAPservers()));
          new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
      return true;
    }
@@ -437,17 +540,20 @@
  }
  /**
   * creates a new ReplicationDB with specified identifier.
   * @param id the identifier of the new ReplicationDB.
   * @param db the new db.
   * Sets the provided DbHandler associated to the provided serverId.
   *
   * @param serverId  the serverId for the server to which is
   *                  associated the Dbhandler.
   * @param dbHandler the dbHandler associated to the serverId.
   *
   * @throws DatabaseException If a database error happened.
   */
  public void newDb(short id, DbHandler db) throws DatabaseException
  public void setDbHandler(short serverId, DbHandler dbHandler)
  throws DatabaseException
  {
    synchronized (sourceDbHandlers)
    {
      sourceDbHandlers.put(id , db);
      sourceDbHandlers.put(serverId , dbHandler);
    }
  }
@@ -557,7 +663,8 @@
  }
  /**
   * Process an InitializeRequestMessage.
   * Processes a message coming from one server in the topology
   * and potentially forwards it to one or all other servers.
   *
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
@@ -565,6 +672,23 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // A replication server is not expected to be the destination
    // of a routable message except for an error message.
    if (msg.getDestination() == this.replicationServer.getServerId())
    {
      if (msg instanceof ErrorMessage)
      {
        ErrorMessage errorMsg = (ErrorMessage)msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(
                   errorMsg.getDetails()));
      }
      else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
            msg.getClass().getCanonicalName()));
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
@@ -572,9 +696,13 @@
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
      mb.append("serverID:" + msg.getDestination());
      mb.append(" unreachable server ID=" + msg.getDestination());
      mb.append(" unroutable message =" + msg);
      ErrorMessage errMsg = new ErrorMessage(
              msg.getsenderID(), mb.toMessage());
          this.replicationServer.getServerId(),
          msg.getsenderID(),
          mb.toMessage());
      try
      {
        senderHandler.send(errMsg);
@@ -583,8 +711,8 @@
      {
        // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying the send back an error to this server.
         * Log an error and close the connection to the sender server.
         * An error happened trying to send an error msg to this server.
         * Log an error and close the connection to this server.
         */
        MessageBuilder mb2 = new MessageBuilder();
        mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -788,7 +916,7 @@
    private void sendReplServerInfo()
    {
      ReplServerInfoMessage info =
        new ReplServerInfoMessage(getConnectedLDAPservers());
        new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
      for (ServerHandler handler : replicationServers.values())
      {
        try
@@ -811,6 +939,26 @@
    }
    /**
     * Get the generationId associated to this domain.
     *
     * @return The generationId
     */
    public long getGenerationId()
    {
      return generationId;
    }
    /**
     * Get the generationId saved status.
     *
     * @return The generationId saved status.
     */
    public boolean getGenerationIdSavedStatus()
    {
      return generationIdSavedStatus;
    }
    /**
     * Sets the replication server informations for the provided
     * handler from the provided ReplServerInfoMessage.
     *
@@ -822,4 +970,162 @@
    {
      handler.setReplServerInfo(infoMsg);
    }
    /**
     * Sets the provided value as the new in memory generationId.
     *
     * @param generationId The new value of generationId.
     * @param savedStatus  The saved status of the generationId.
     */
    synchronized public void setGenerationId(long generationId,
        boolean savedStatus)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          " RCache.set GenerationId=" + generationId);
      if (generationId == this.generationId)
        return;
      if (this.generationId>0)
      {
        for (ServerHandler handler : connectedServers.values())
        {
          handler.resetGenerationId();
        }
      }
      this.generationId = generationId;
      this.generationIdSavedStatus = savedStatus;
    }
    /**
     * Resets the generationID.
     *
     * @param senderHandler The handler associated to the server
     *        that requested to reset the generationId.
     */
    public void resetGenerationId(ServerHandler senderHandler)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          " RCache.resetGenerationId");
      // Notifies the others LDAP servers that from now on
      // they have the bad generationId
      for (ServerHandler handler : connectedServers.values())
      {
        handler.resetGenerationId();
      }
      // Propagates the reset message to the others replication servers
      // dealing with the same domain.
      if (senderHandler.isLDAPserver())
      {
        for (ServerHandler handler : replicationServers.values())
        {
          try
          {
            handler.sendGenerationId(new ResetGenerationId());
          }
          catch (IOException e)
          {
            logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
                get(handler.getMonitorInstanceName()));
           }
        }
      }
      // Reset the localchange and state db for the current domain
      synchronized (sourceDbHandlers)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        {
          try
          {
            dbHandler.clear();
          }
          catch (Exception e)
          {
            // TODO: i18n
            logError(Message.raw(
                "Exception caught while clearing dbHandler:" +
                e.getLocalizedMessage()));
          }
        }
        sourceDbHandlers.clear();
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " baseDN=" + baseDn +
              " The source db handler has been cleared");
      }
      try
      {
        replicationServer.clearGenerationId(baseDn);
      }
      catch (Exception e)
      {
        // TODO: i18n
        logError(Message.raw(
            "Exception caught while clearing generationId:" +
            e.getLocalizedMessage()));
      }
      // Reset the in memory domain generationId
      generationId = -1;
    }
    /**
     * Returns whether the provided server is in degraded
     * state due to the fact that the peer server has an invalid
     * generationId for this domain.
     *
     * @param serverId The serverId for which we want to know the
     *                 the state.
     * @return Whether it is degraded or not.
     */
    public boolean isDegradedDueToGenerationId(short serverId)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDN=" + baseDn +
            " isDegraded serverId=" + serverId +
            " given local generation Id=" + this.generationId);
      ServerHandler handler = replicationServers.get(serverId);
      if (handler == null)
      {
        handler = connectedServers.get(serverId);
        if (handler == null)
        {
          return false;
        }
      }
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDN=" + baseDn +
            " Compute degradation of serverId=" + serverId +
            " LS server generation Id=" + handler.getGenerationId());
      return (handler.getGenerationId() != this.generationId);
    }
    /**
     * Return the associated replication server.
     * @return The replication server.
     */
    public ReplicationServer getReplicationServer()
    {
      return replicationServer;
    }
}