mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
12.41.2007 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private Map<Short, ServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ServerHandler>();
@@ -253,6 +254,11 @@
        return false;
      }
      connectedServers.put(handler.getServerId(), handler);
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
      return true;
    }
  }
@@ -269,7 +275,13 @@
    if (handler.isReplicationServer())
      replicationServers.remove(handler.getServerId());
    else
    {
      connectedServers.remove(handler.getServerId());
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
    }
  }
  /**
@@ -312,6 +324,12 @@
        return false;
      }
      replicationServers.put(handler.getServerId(), handler);
      // Update this server with the list of LDAP servers
      // already connected
      handler.sendInfo(
          new ReplServerInfoMessage(getConnectedLDAPservers()));
      return true;
    }
  }
@@ -376,6 +394,22 @@
    return sourceDbHandlers.keySet();
  }
  /**
   * Returns as a set of String the list of LDAP servers connected to us.
   * Each string is the serverID of a connected LDAP server.
   *
   * @return The set of connected LDAP servers
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> mySet = new ArrayList<String>(0);
    for (ServerHandler handler : connectedServers.values())
    {
      mySet.add(String.valueOf(handler.getServerId()));
    }
    return mySet;
  }
  /**
   * Creates and returns an iterator.
@@ -473,15 +507,9 @@
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  {
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      // TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
        }
      }
      // Send to all connected LDAP servers
      // Sends to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
@@ -518,14 +546,20 @@
      else
      {
        // the targeted server is NOT connected
        // Let's search for THE changelog server that MAY
        // have the targeted server connected.
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(replicationServers.values());
          for (ServerHandler h : replicationServers.values())
          {
            if (h.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
            }
          }
        }
      }
    }
    return servers;
  }
@@ -543,37 +577,53 @@
    if (servers.isEmpty())
    {
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
        }
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        }
      }
      return;
    }
    for (ServerHandler targetHandler : servers)
    {
      ErrorMessage errMsg = new ErrorMessage(
          msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
          "serverID:" + msg.getDestination());
      try
      {
        targetHandler.send(msg);
        senderHandler.send(errMsg);
      }
      catch(IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(ioe);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        senderHandler.shutdown();
      }
    }
    else
    {
      for (ServerHandler targetHandler : servers)
      {
        try
        {
          targetHandler.send(msg);
        }
        catch(IOException ioe)
        {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           */
          int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
          String message = getMessage(msgID, this.toString())
          + stackTraceToSingleLineString(ioe) + " "
          + msg.getClass().getCanonicalName();
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          senderHandler.shutdown();
          // TODO Handle error properly (sender timeout in addition)
        }
      }
    }
@@ -722,4 +772,48 @@
      }
      return true;
    }
    /**
     * Send a ReplServerInfoMessage to all the connected replication servers
     * in order to let them know our connected LDAP servers.
     */
    private void sendReplServerInfo()
    {
      ReplServerInfoMessage info =
        new ReplServerInfoMessage(getConnectedLDAPservers());
      for (ServerHandler handler : replicationServers.values())
      {
        try
        {
          handler.sendInfo(info);
        }
        catch (IOException e)
        {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           */
          int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_INFO;
          String message = getMessage(msgID, this.toString())
          + stackTraceToSingleLineString(e);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          handler.shutdown();
        }
      }
    }
    /**
     * Sets the replication server informations for the provided
     * handler from the provided ReplServerInfoMessage.
     *
     * @param handler The server handler from which the info was received.
     * @param infoMsg The information message that was received.
     */
    public void setReplServerInfo(
        ServerHandler handler, ReplServerInfoMessage infoMsg)
    {
      handler.setReplServerInfo(infoMsg);
    }
}