mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
26.31.2007 71ebb3724c79a7d1218c36f080acd6ee162b9cd2
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
File was renamed from opends/src/server/org/opends/server/replication/plugin/ChangelogBroker.java
@@ -52,7 +52,7 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ChangelogStartMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
@@ -72,13 +72,13 @@
/**
 * The broker for Multi-master Replication.
 */
public class ChangelogBroker implements InternalSearchListener
public class ReplicationBroker implements InternalSearchListener
{
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
  private final Object lock = new Object();
  private String changelogServer = "Not connected";
  private String replicationServer = "Not connected";
  private TreeSet<FakeOperation> replayOperations;
  private ProtocolSession session = null;
  private final ServerState state;
@@ -114,26 +114,26 @@
  /**
   * Creates a new Changelog Broker for a particular ReplicationDomain.
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
   *
   * @param state The ServerState that should be used by this broker
   *              when negociating the session with the changelog servers.
   *              when negociating the session with the replicationServer.
   * @param baseDn The base DN that should be used by this broker
   *              when negociating the session with the changelog servers.
   *              when negociating the session with the replicationServer.
   * @param serverID The server ID that should be used by this broker
   *              when negociating the session with the changelog servers.
   *              when negociating the session with the replicationServer.
   * @param maxReceiveQueue The maximum size of the receive queue to use on
   *                         the changelog server.
   *                         the replicationServer.
   * @param maxReceiveDelay The maximum replication delay to use on the
   *                        changelog server.
   *                        replicationServer.
   * @param maxSendQueue The maximum size of the send queue to use on
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   *                     the replicationServer.
   * @param maxSendDelay The maximum send delay to use on the replicationServer.
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * changelog server, or zero if no heartbeats are requested.
   * replicationServer, or zero if no heartbeats are requested.
   */
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
  {
@@ -153,7 +153,7 @@
  }
  /**
   * Start the ChangelogBroker.
   * Start the ReplicationBroker.
   *
   * @param servers list of servers used
   * @throws Exception : in case of errors
@@ -162,7 +162,7 @@
                    throws Exception
  {
    /*
     * Open Socket to the Changelog
     * Open Socket to the ReplicationServer
     * Send the Start message
     */
    shutdown = false;
@@ -182,14 +182,14 @@
  /**
   * Connect to a Changelog server.
   * Connect to a ReplicationServer.
   *
   * @throws NumberFormatException address was invalid
   * @throws IOException error during connection phase
   */
  private void connect() throws NumberFormatException, IOException
  {
    ChangelogStartMessage startMsg;
    ReplServerStartMessage startMsg;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -230,30 +230,30 @@
          /*
           * Read the ChangelogStartMessage that should come back.
           * Read the ReplServerStartMessage that should come back.
           */
          session.setSoTimeout(1000);
          startMsg = (ChangelogStartMessage) session.receive();
          startMsg = (ReplServerStartMessage) session.receive();
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a changelog that has not
           * We must not publish changes to a replicationServer that has not
           * seen all our previous changes because this could cause some
           * other ldap servers to miss those changes.
           * Check that the Changelog has seen all our previous changes.
           * If not, try another changelog server.
           * If no other changelog server has seen all our changes, recover
           * those changes and send them again to any changelog server.
           * Check that the ReplicationServer has seen all our previous changes.
           * If not, try another replicationServer.
           * If no other replicationServer has seen all our changes, recover
           * those changes and send them again to any replicationServer.
           */
          ChangeNumber changelogMaxChangeNumber =
          ChangeNumber replServerMaxChangeNumber =
            startMsg.getServerState().getMaxChangeNumber(serverID);
          if (changelogMaxChangeNumber == null)
            changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          if (replServerMaxChangeNumber == null)
            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
          if ((ourMaxChangeNumber == null) ||
              (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
          {
            changelogServer = ServerAddr.toString();
            replicationServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
@@ -263,7 +263,7 @@
          {
            if (checkState == true)
            {
              /* This changelog server is missing some
              /* This replicationServer is missing some
               * of our changes, we are going to try another server
               * but before log a notice message
               */
@@ -277,14 +277,14 @@
            {
              replayOperations.clear();
              /*
               * Get all the changes that have not been seen by this changelog
               * server and update it
               * Get all the changes that have not been seen by this
               * replicationServer and update it
               */
              InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
              LDAPFilter filter = LDAPFilter.decode(
                  "("+ Historical.HISTORICALATTRIBUTENAME +
                  ">=dummy:" + changelogMaxChangeNumber + ")");
                  ">=dummy:" + replServerMaxChangeNumber + ")");
              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
              attrs.add(Historical.HISTORICALATTRIBUTENAME);
              InternalSearchOperation op = conn.processSearch(
@@ -308,7 +308,7 @@
              }
              else
              {
                changelogServer = ServerAddr.toString();
                replicationServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
@@ -325,7 +325,7 @@
        {
          /*
           * There was no server waiting on this host:port
           * Log a notice and try the next changelog server in the list
           * Log a notice and try the next replicationServer in the list
           */
          int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
          String message = getMessage(msgID, server);
@@ -363,9 +363,9 @@
        if (checkState == true)
        {
          /*
           * We could not find a changelog server that has seen all the
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any changelog server.
           * the loop looking for any replicationServer.
           */
          try
          {
@@ -385,7 +385,7 @@
        else
        {
          /*
           * This server could not find any changelog server
           * This server could not find any replicationServer
           * Let's wait a little and try again.
           */
          synchronized (this)
@@ -419,7 +419,7 @@
  /**
   * Restart the Changelog broker after a failure.
   * Restart the ReplicationServer broker after a failure.
   *
   * @param failingSession the socket which failed
   */
@@ -545,14 +545,14 @@
   */
  public void stop()
  {
    changelogServer = "stopped";
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
    try
    {
      if (debugEnabled())
      {
        debugInfo("ChangelogBroker Stop Closing session");
        debugInfo("ReplicationBroker Stop Closing session");
      }
      if (session != null)
@@ -599,15 +599,15 @@
  }
  /**
   * Get the name of the changelog server to which this broker is currently
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   *
   * @return the name of the changelog server to which this domain
   * @return the name of the replicationServer to which this domain
   *         is currently connected.
   */
  public String getChangelogServer()
  public String getReplicationServer()
  {
    return changelogServer;
    return replicationServer;
  }
  /**
   * {@inheritDoc}
@@ -621,7 +621,7 @@
     * TODO : implement code for ADD, DEL, MODDN operation
     *
     * Parse all ds-sync-hist attribute values
     *   - for each Changenumber>changelogMaxChangeNumber :
     *   - for each Changenumber > replication server MaxChangeNumber :
     *          build an attribute mod
     *
     */
@@ -699,7 +699,7 @@
  /**
   * Change some config parameters.
   *
   * @param changelogServers    The new list of changelog servers.
   * @param replicationServers    The new list of replication servers.
   * @param maxReceiveQueue     The max size of receive queue.
   * @param maxReceiveDelay     The max receive delay.
   * @param maxSendQueue        The max send queue.
@@ -707,11 +707,11 @@
   * @param window              The max window size.
   * @param heartbeatInterval   The heartbeat interval.
   */
  public void changeConfig(Collection<String> changelogServers,
  public void changeConfig(Collection<String> replicationServers,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
  {
    this.servers = changelogServers;
    this.servers = replicationServers;
    this.maxRcvWindow = window;
    this.heartbeatInterval = heartbeatInterval;
    this.maxReceiveDelay = maxReceiveDelay;
@@ -719,7 +719,7 @@
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    // TODO : Changing those parameters requires to either restart a new
    // session with the changelog server or renegociate the parameters that
    // session with the replicationServer or renegociate the parameters that
    // were sent in the ServerStart message
  }
}