mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,8 +25,7 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -93,6 +92,7 @@
  private int maxRcvWindow;
  private int timeout = 0;
  private short protocolVersion;
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  /**
@@ -143,12 +143,15 @@
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * replicationServer, or zero if no heartbeats are requested.
   *
   * @param generationId The generationId for the server associated to the
   * provided serverID and for the domain associated to the provided baseDN.
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval,
      ReplSessionSecurity replSessionSecurity)
      long generationId, ReplSessionSecurity replSessionSecurity)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -164,6 +167,7 @@
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.generationId = generationId;
    this.replSessionSecurity = replSessionSecurity;
  }
@@ -198,7 +202,7 @@
   */
  private void connect()
  {
    ReplServerStartMessage startMsg;
    ReplServerStartMessage replServerStartMsg;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -207,8 +211,24 @@
      heartbeatMonitor = null;
    }
    // checkState is true for the first loop on all replication servers
    // looking for one already up-to-date.
    // If we found some responding replication servers but none up-to-date
    // then we set check-state to false and do a second loop where the first
    // found will be the one elected and then we will update this replication
    // server.
    boolean checkState = true;
    boolean receivedResponse = true;
    // TODO: We are doing here 2 loops opening , closing , reopening session to
    // the same servers .. risk to have 'same server id' erros.
    // Would be better to do only one loop, keeping the best candidate while
    // traversing the list of replication servers to connect to.
    if (servers.size()==1)
    {
      checkState = false;
    }
    synchronized (connectPhaseLock)
    {
      while ((!connected) && (!shutdown) && (receivedResponse))
@@ -240,7 +260,7 @@
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion, isSslEncryption);
                protocolVersion, generationId, isSslEncryption);
            session.publish(msg);
@@ -248,7 +268,7 @@
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            startMsg = (ReplServerStartMessage) session.receive();
            replServerStartMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
@@ -257,7 +277,7 @@
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                startMsg.getVersion());
                replServerStartMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
@@ -276,7 +296,7 @@
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              startMsg.getServerState().getMaxChangeNumber(serverID);
              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            ChangeNumber ourMaxChangeNumber =
@@ -285,7 +305,7 @@
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              replicationServer = ServerAddr.toString();
              maxSendWindow = startMsg.getWindowSize();
              maxSendWindow = replServerStartMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
@@ -298,7 +318,8 @@
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server);
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
                    baseDn.toNormalizedString());
                logError(message);
              }
              else
@@ -341,7 +362,7 @@
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = startMsg.getWindowSize();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
@@ -371,8 +392,9 @@
          }
          catch (Exception e)
          {
            Message message =
                    ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage());
            Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            logError(message);
          }
          finally
@@ -392,7 +414,9 @@
              }
            }
          }
        }
        } // for servers
        // We have traversed all the replication servers
        if ((!connected) && (checkState == true) && receivedResponse)
        {
@@ -401,12 +425,16 @@
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get();
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
          checkState = false;
        }
      }
      // We have traversed all the replication servers as many times as needed
      // to find one if one is up and running.
      if (connected)
      {
        // This server has connected correctly.
@@ -462,9 +490,9 @@
  /**
   * restart the ReplicationBroker.
   */
  private void reStart()
  public void reStart()
  {
    reStart(null);
    reStart(this.session);
  }
  /**
@@ -472,7 +500,7 @@
   *
   * @param failingSession the socket which failed
   */
  private void reStart(ProtocolSession failingSession)
  public void reStart(ProtocolSession failingSession)
  {
    try
    {
@@ -498,7 +526,8 @@
      } catch (Exception e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()));
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
         baseDn.toNormalizedString(), e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
@@ -533,6 +562,13 @@
        // choice than to return without sending the ReplicationMessage
        // and relying on the resend procedure of the connect phase to
        // fix the problem when we finally connect.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() Publishing a " +
              " message is not possible due to existing connection error.");
        }
        return;
      }
@@ -601,12 +637,22 @@
          } catch (InterruptedException e1)
          {
            // ignore
            if (debugEnabled())
            {
              debugInfo("ReplicationBroker.publish() " +
                  "IO exception raised : " + e.getLocalizedMessage());
            }
          }
        }
      }
      catch (InterruptedException e)
      {
        // just loop.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() " +
              "Interrupted exception raised." + e.getLocalizedMessage());
        }
      }
    }
  }
@@ -628,7 +674,7 @@
    {
      if (!connected)
      {
        reStart();
        reStart(null);
      }
      ProtocolSession failingSession = session;
@@ -663,6 +709,9 @@
          Message message =
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
          logError(message);
          debugInfo("ReplicationBroker.receive() " + baseDn +
              " Exception raised." + e + e.getLocalizedMessage());
          this.reStart(failingSession);
        }
      }
@@ -683,7 +732,8 @@
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("ReplicationBroker Stop Closing session");
        debugInfo("ReplicationBroker is stopping. and will" +
          "close the connection");
      }
      if (session != null)
@@ -713,6 +763,20 @@
  }
  /**
   * Set the value of the generationId for that broker. Normally the
   * generationId is set through the constructor but there are cases
   * where the value of the generationId must be changed while the broker
   * already exist for example after an on-line import.
   *
   * @param generationId The value of the generationId.
   *
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   *
@@ -857,6 +921,13 @@
    return !connectionError;
  }
  private boolean debugEnabled() { return true; }
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    TRACER.debugInfo(s);
  }
  /**
   * Determine whether the connection to the replication server is encrypted.
   * @return true if the connection is encrypted, false otherwise.