mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
17.42.2007 3eb214ba900a7f3e4550f6a047db26e7f40b82a8
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import com.sleepycat.je.DatabaseException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.InitializeRequestMessage;
import org.opends.server.synchronization.protocol.ErrorMessage;
import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import com.sleepycat.je.DatabaseException;
/**
 * This class define an in-memory cache that will be used to store
 * the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
    }
  }
  /**
   * Send back an ack to the server that sent the change.
   * Retrieves the destination handlers for a routable message.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param msg The message to route.
   * @param senderHandler The handler of the server that published this message.
   * @return The list of destination handlers.
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  {
    short serverId = changeNumber.getServerId();
    sendAck(changeNumber, isLDAPserver, serverId);
  }
  /**
   *
   * Send back an ack to a server that sent the change.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a Changelog server.
   * @param serverId     The identifier of the server from which we
   *                     received the change..
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
                      short serverId)
  {
    ServerHandler handler;
    if (isLDAPserver)
      handler = connectedServers.get(serverId);
    else
      handler = changelogServers.get(serverId);
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    // TODO : check for null handler and log error
    try
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      handler.sendAck(changeNumber);
    } catch (IOException e)
    {
      /*
       * An error happened trying the send back an ack to this server.
       * Log an error and close the connection to this server.
       */
      int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
      String message = getMessage(msgID, this.toString())
                                  + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      handler.shutdown();
      // TODO Import from the "closest server" to be implemented
    }
  }
  /**
   * Shutdown this ChangelogCache.
   */
  public void shutdown()
  {
    // Close session with other changelogs
    for (ServerHandler serverHandler : changelogServers.values())
    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
    {
      serverHandler.shutdown();
    }
    // Close session with other LDAP servers
    for (ServerHandler serverHandler : connectedServers.values())
    {
      serverHandler.shutdown();
    }
    // Shutdown the dbHandlers
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      if (!senderHandler.isChangelogServer())
      {
        dbHandler.shutdown();
        // Send to all changelogServers
        for (ServerHandler destinationHandler : changelogServers.values())
        {
          servers.add(destinationHandler);
        }
      }
      sourceDbHandlers.clear();
      // Send to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
          continue;
        servers.add(destinationHandler);
      }
    }
    else
    {
      // Destination is one server
      ServerHandler destinationHandler =
        connectedServers.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
      }
      else
      {
        // the targeted server is NOT connected
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(changelogServers.values());
        }
      }
    }
    return servers;
  }
  /**
   * Returns the ServerState describing the last change from this replica.
   * Process an InitializeRequestMessage.
   *
   * @return The ServerState describing the last change from this replica.
   * @param msg The message received and to be processed.
   * @param senderHandler The server handler of the server that emitted
   * the message.
   */
  public ServerState getDbServerState()
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    ServerState serverState = new ServerState();
    for (DbHandler db : sourceDbHandlers.values())
    {
      serverState.update(db.getLastChange());
    }
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    return "ChangelogCache " + baseDn;
  }
    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    if (servers.isEmpty())
    {
      handler.checkWindow();
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
        }
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        }
      }
      return;
    }
    for (ServerHandler handler : connectedServers.values())
    for (ServerHandler targetHandler : servers)
    {
      handler.checkWindow();
      try
      {
        targetHandler.send(msg);
      }
      catch(IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
      }
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    /**
     * Send back an ack to the server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
    {
      if (!handler.restartAfterSaturation(sourceHandler))
      short serverId = changeNumber.getServerId();
      sendAck(changeNumber, isLDAPserver, serverId);
    }
    /**
     *
     * Send back an ack to a server that sent the change.
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     * @param serverId     The identifier of the server from which we
     *                     received the change..
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
        short serverId)
    {
      ServerHandler handler;
      if (isLDAPserver)
        handler = connectedServers.get(serverId);
      else
        handler = changelogServers.get(serverId);
      // TODO : check for null handler and log error
      try
      {
        handler.sendAck(changeNumber);
      } catch (IOException e)
      {
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_ACK;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        handler.shutdown();
      }
    }
    /**
     * Shutdown this ChangelogCache.
     */
    public void shutdown()
    {
      // Close session with other changelogs
      for (ServerHandler serverHandler : changelogServers.values())
      {
        serverHandler.shutdown();
      }
      // Close session with other LDAP servers
      for (ServerHandler serverHandler : connectedServers.values())
      {
        serverHandler.shutdown();
      }
      // Shutdown the dbHandlers
      synchronized (sourceDbHandlers)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        {
          dbHandler.shutdown();
        }
        sourceDbHandlers.clear();
      }
    }
    /**
     * Returns the ServerState describing the last change from this replica.
     *
     * @return The ServerState describing the last change from this replica.
     */
    public ServerState getDbServerState()
    {
      ServerState serverState = new ServerState();
      for (DbHandler db : sourceDbHandlers.values())
      {
        serverState.update(db.getLastChange());
      }
      return serverState;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      return "ChangelogCache " + baseDn;
    }
    /**
     * Check if some server Handler should be removed from flow control state.
     * @throws IOException If an error happened.
     */
    public void checkAllSaturation() throws IOException
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.checkWindow();
      }
      for (ServerHandler handler : connectedServers.values())
      {
        handler.checkWindow();
      }
    }
    /**
     * Check if a server that was in flow control can now restart
     * sending updates.
     * @param sourceHandler The server that must be checked.
     * @return true if the server can restart sending changes.
     *         false if the server can't restart sending changes.
     */
    public boolean restartAfterSaturation(ServerHandler sourceHandler)
    {
      for (ServerHandler handler : changelogServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
      }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
      for (ServerHandler handler : connectedServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
      }
      return true;
    }
    return true;
  }
}