mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.17.2013 0a9135e3444bbefde6188f456b9c9772a816096d
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
@@ -79,7 +80,7 @@
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile Session session = null;
  private final ServerState state;
  private final String baseDn;
  private final DN baseDN;
  private final int serverId;
  private Semaphore sendWindow;
  private int maxSendWindow;
@@ -192,9 +193,9 @@
   * @param replicationDomain The replication domain that is creating us.
   * @param state The ServerState that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param baseDn The base DN that should be used by this broker
   * @param baseDN The base DN that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param serverID2 The server ID that should be used by this broker
   * @param serverId The server ID that should be used by this broker
   *        when negotiating the session with the replicationServer.
   * @param window The size of the send and receive window to use.
   * @param generationId The generationId for the server associated to the
@@ -208,14 +209,14 @@
   *        or zero if no CSN heartbeat should be sent.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, String baseDn, int serverID2, int window,
    ServerState state, DN baseDN, int serverId, int window,
    long generationId, long heartbeatInterval,
    ReplSessionSecurity replSessionSecurity, byte groupId,
    long changeTimeHeartbeatInterval)
  {
    this.domain = replicationDomain;
    this.baseDn = baseDn;
    this.serverId = serverID2;
    this.baseDN = baseDN;
    this.serverId = serverId;
    this.state = state;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.replSessionSecurity = replSessionSecurity;
@@ -245,7 +246,7 @@
    {
      shutdown = false;
      this.rcvWindow = this.maxRcvWindow;
      this.connect();
      connect();
    }
  }
@@ -269,7 +270,7 @@
      }
      this.rcvWindow = this.maxRcvWindow;
      this.connect();
      connect();
    }
  }
@@ -779,8 +780,8 @@
  private void connect()
  {
    if (this.baseDn.compareToIgnoreCase(
      ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
    if (this.baseDN.toNormalizedString().equalsIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
    {
      connectAsECL();
    } else
@@ -964,14 +965,14 @@
            || (electedRsInfo.getGenerationId() == -1))
        {
          Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDn,
              .get(serverId, rsServerId, baseDN.toNormalizedString(),
                  session.getReadableRemoteAddress(),
                  getGenerationID());
          logError(message);
        } else
        {
          Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDn,
              .get(serverId, rsServerId, baseDN.toNormalizedString(),
                  session.getReadableRemoteAddress(),
                  getGenerationID(),
                  electedRsInfo.getGenerationId());
@@ -995,15 +996,14 @@
          {
            Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
                serverId,
                baseDn,
                collectionToString(replicationServerInfos.keySet(),
                    ", "));
                baseDN.toNormalizedString(),
                collectionToString(replicationServerInfos.keySet(), ", "));
            logError(message);
          }
          else
          {
            Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
                serverId, baseDn);
                serverId, baseDN.toNormalizedString());
            logError(message);
          }
        }
@@ -1082,11 +1082,10 @@
        warn user and start heartbeat monitor to recover when a server
        with the right group id shows up.
        */
        Message message =
            WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
                .toString(groupId), Integer.toString(rsServerId), rsInfo
                .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
                .toString(serverId));
        Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
                Byte.toString(groupId), Integer.toString(rsServerId),
                rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
                baseDN.toNormalizedString(), Integer.toString(serverId));
        logError(message);
      }
      startRSHeartBeatMonitoring();
@@ -1098,10 +1097,9 @@
    }
    catch (Exception e)
    {
      Message message =
          ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
              .getLocalizedMessage()
              + stackTraceToSingleLineString(e));
      Message message = ERR_COMPUTING_FAKE_OPS.get(
          baseDN.toNormalizedString(), rsInfo.getServerURL(),
          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
      logError(message);
    }
    finally
@@ -1149,7 +1147,7 @@
        if (debugEnabled())
        {
          TRACER.debugInfo("RB for dn " + baseDn + " and with server id "
          TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
              + serverId + " computed " + nChanges + " changes late.");
        }
@@ -1211,6 +1209,8 @@
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    final String baseDn = this.baseDN.toNormalizedString();
    Session localSession = null;
    Socket socket = null;
    boolean hasConnected = false;
@@ -1218,9 +1218,7 @@
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      // Open a socket connection to the next candidate.
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
          InetAddress.getByName(hostname), intPort);
@@ -1239,15 +1237,15 @@
      StartMsg serverStartMsg;
      if (!isECL)
      {
        serverStartMsg = new ServerStartMsg(serverId, url, baseDn,
            maxRcvWindow, heartbeatInterval, state,
            this.getGenerationID(), isSslEncryption, groupId);
        serverStartMsg = new ServerStartMsg(serverId, url,
            baseDN.toNormalizedString(), maxRcvWindow, heartbeatInterval, state,
            getGenerationID(), isSslEncryption, groupId);
      }
      else
      {
        serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
            maxRcvWindow, heartbeatInterval, state,
            this.getGenerationID(), isSslEncryption, groupId);
            getGenerationID(), isSslEncryption, groupId);
      }
      localSession.publish(serverStartMsg);
@@ -1256,7 +1254,7 @@
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
            + serverStartMsg + "\nAND RECEIVED:\n" + msg);
      }
@@ -1266,10 +1264,9 @@
      // Sanity check
      String repDn = replServerInfo.getBaseDn();
      if (!this.baseDn.equals(repDn))
      if (!baseDn.equals(repDn))
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn,
            this.baseDn);
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, baseDn);
        return null;
      }
@@ -1324,22 +1321,8 @@
    {
      if (!hasConnected || !keepConnection)
      {
        if (localSession != null)
        {
          localSession.close();
        }
        if (socket != null)
        {
          try
          {
            socket.close();
          }
          catch (IOException e)
          {
            // Ignore.
          }
        }
        close(localSession);
        close(socket);
      }
      if (!hasConnected && errorMessage != null)
@@ -1372,13 +1355,9 @@
   * reply message from the replication server.
   *
   * @param server Server we are connecting with.
   * @return The ReplServerStartMsg the server replied. Null if could not
   *         get an answer.
   */
  private TopologyMsg performECLPhaseTwoHandshake(String server)
  private void performECLPhaseTwoHandshake(String server)
  {
    TopologyMsg topologyMsg = null;
    try
    {
      // Send our Start Session
@@ -1386,32 +1365,24 @@
      startECLSessionMsg.setOperationId("-1");
      session.publish(startECLSessionMsg);
      /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
       * Read the TopologyMsg that should come back.
      topologyMsg = (TopologyMsg) session.receive();
       */
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
            + startECLSessionMsg);
      }
      // Alright set the timeout to the desired value
      session.setSoTimeout(timeout);
      connected = true;
    } catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
      logError(message);
      setSession(null);
      // Be sure to return null.
      topologyMsg = null;
    }
    return topologyMsg;
  }
  /**
@@ -1464,7 +1435,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
        TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
            + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
      }
@@ -1474,7 +1445,7 @@
    } catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
          server, baseDn, stackTraceToSingleLineString(e));
          server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
      logError(message);
      setSession(null);
@@ -2118,8 +2089,8 @@
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor = new HeartbeatMonitor(getServerId(),
          getRsServerId(), baseDn, session, heartbeatInterval);
      heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
          baseDN.toNormalizedString(), session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -2185,8 +2156,8 @@
        catch (Exception e)
        {
          MessageBuilder mb = new MessageBuilder();
          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
              e.getLocalizedMessage()));
          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
              baseDN.toNormalizedString(), e.getLocalizedMessage()));
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
        }
@@ -2210,7 +2181,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(this + " end restart : connected=" + connected
        + " with RSid=" + this.getRsServerId() + " genid=" + this.generationID);
        + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
    }
  }
@@ -2476,17 +2447,14 @@
        }
        else if (msg instanceof StopMsg)
        {
          /*
           * RS performs a proper disconnection
           */
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
              .get(replicationServerID,
                  savedSession.getReadableRemoteAddress(),
              serverId, baseDn);
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              replicationServerID, savedSession.getReadableRemoteAddress(),
              serverId, baseDN.toNormalizedString());
          logError(message);
          // Try to find a suitable RS
          this.reStart(savedSession, true);
          reStart(savedSession, true);
        }
        else if (msg instanceof MonitorMsg)
        {
@@ -2547,14 +2515,15 @@
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      savedSession.getReadableRemoteAddress(),
                      baseDn);
                      baseDN.toNormalizedString());
                }
                else
                {
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, replicationServerID,
                      savedSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(), baseDn);
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
                }
                logError(message);
                reStart(true);
@@ -2586,12 +2555,10 @@
          final Session tmpSession = session;
          if (tmpSession == null || !tmpSession.closeInitiated())
          {
            /*
             * We did not initiate the close on our side, log an error message.
             */
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
                .get(serverId, baseDn, replicationServerID,
                    savedSession.getReadableRemoteAddress());
            // We did not initiate the close on our side, log an error message.
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                serverId, baseDN.toNormalizedString(), replicationServerID,
                savedSession.getReadableRemoteAddress());
            logError(message);
          }
@@ -2678,7 +2645,7 @@
    if (debugEnabled())
      TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will"
        + " close the connection to replication server " + rsServerId + " for"
        + " domain " + baseDn);
        + " domain " + baseDN);
    synchronized (startStopLock)
    {
@@ -2767,10 +2734,8 @@
    if (connected)
    {
      return sendWindow.availablePermits();
    } else
    {
      return 0;
    }
    return 0;
  }
  /**
@@ -2864,9 +2829,9 @@
    } catch (IOException ex)
    {
      Message message = ERR_EXCEPTION_SENDING_CS.get(
        baseDn,
        baseDN.toNormalizedString(),
        Integer.toString(serverId),
        ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
        ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
      logError(message);
    }
  }
@@ -3022,10 +2987,9 @@
    // Start a CSN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      String threadName = "Replica DS("
          + this.getServerId()
      String threadName = "Replica DS(" + getServerId()
          + ") change time heartbeat publisher for domain \""
          + this.baseDn + "\" to RS(" + this.getRsServerId()
          + this.baseDN + "\" to RS(" + getRsServerId()
          + ") at " + session.getReadableRemoteAddress();
      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(