mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.03.2013 4b02c5f2c1450b7d44022c6ea52260823ae39686
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -874,7 +874,6 @@
    stopChangeTimeHeartBeatPublishing();
    mustRunBestServerCheckingAlgorithm = 0;
    boolean newServerWithSameGroupId = false;
    synchronized (connectPhaseLock)
    {
      /*
@@ -889,164 +888,45 @@
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      ReplicationServerInfo replicationServerInfo = null;
      ReplicationServerInfo electedRsInfo = null;
      if (replicationServerInfos.size() > 0)
      {
        // At least one server answered, find the best one.
        replicationServerInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, baseDn, groupId,
          this.getGenerationID());
        electedRsInfo = computeBestReplicationServer(true, -1, state,
          replicationServerInfos, serverId, baseDn, groupId, getGenerationID());
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          debugInfo("serverId: " + serverId +
            " phase 2 : will perform PhaseOneH with the preferred RS="
              + replicationServerInfo);
        replicationServerInfo = performPhaseOneHandshake(
          replicationServerInfo.getServerURL(), true, false);
              + electedRsInfo);
        electedRsInfo = performPhaseOneHandshake(
          electedRsInfo.getServerURL(), true, false);
        if (replicationServerInfo != null)
        if (electedRsInfo != null)
        {
          // Update replication server info with potentially more up to date
          // data (server state for instance may have changed)
          replicationServerInfos.put(replicationServerInfo.getServerId(),
              replicationServerInfo);
          replicationServerInfos
              .put(electedRsInfo.getServerId(), electedRsInfo);
          // Handshake phase 1 exchange went well
          // Compute in which status we are starting the session to tell the RS
          ServerStatus initStatus =
            computeInitialServerStatus(replicationServerInfo.getGenerationId(),
            replicationServerInfo.getServerState(),
            replicationServerInfo.getDegradedStatusThreshold(),
            this.getGenerationID());
            computeInitialServerStatus(electedRsInfo.getGenerationId(),
            electedRsInfo.getServerState(),
            electedRsInfo.getDegradedStatusThreshold(),
            getGenerationID());
          // Perform session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(
            replicationServerInfo.getServerURL(), initStatus);
            electedRsInfo.getServerURL(), initStatus);
          if (topologyMsg != null) // Handshake phase 2 exchange went well
          {
            try
            {
              /*
               * If we just connected to a RS with a different group id than us
               * (because for instance a RS with our group id was unreachable
               * while connecting to each RS) but the just received TopologyMsg
               * shows that in the same time a RS with our group id connected,
               * we must give up the connection to force reconnection that will
               * certainly go back to a server with our group id as server with
               * our group id have a greater priority for connection (in
               * computeBestReplicationServer). In other words, we disconnect to
               * connect to a server with our group id. If a server with our
               * group id comes back later in the topology, we will be advised
               * upon reception of a new TopologyMsg message and we will force
               * reconnection at that time to retrieve a server with our group
               * id.
               */
              byte tmpRsGroupId = replicationServerInfo.getGroupId();
              boolean someServersWithSameGroupId =
                hasSomeServerWithSameGroupId(topologyMsg.getRsList());
              // Really no other server with our group id ?
              if ((tmpRsGroupId == groupId) || (!someServersWithSameGroupId))
              {
                replicationServer = session.getReadableRemoteAddress();
                maxSendWindow = replicationServerInfo.getWindowSize();
                rsGroupId = replicationServerInfo.getGroupId();
                rsServerId = replicationServerInfo.getServerId();
                rsServerUrl = replicationServerInfo.getServerURL();
                receiveTopo(topologyMsg);
                // Log a message to let the administrator know that the failure
                // was resolved.
                // Wakeup all the thread that were waiting on the window
                // on the previous connection.
                connectionError = false;
                if (sendWindow != null)
                {
                  /*
                   * Fix (hack) for OPENDJ-401: we want to ensure that no
                   * threads holding this semaphore will get blocked when they
                   * acquire it. However, we also need to make sure that we
                   * don't overflow the semaphore by releasing too many permits.
                   */
                  final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
                  if (sendWindow.availablePermits() < MAX_PERMITS)
                  {
                    /*
                     * At least 2^29 acquisitions would need to occur for this
                     * to be insufficient. In addition, at least 2^30 releases
                     * would need to occur for this to potentially overflow.
                     * Hopefully this is unlikely to happen.
                     */
                    sendWindow.release(MAX_PERMITS);
                  }
                }
                sendWindow = new Semaphore(maxSendWindow);
                rcvWindow = maxRcvWindow;
                connected = true;
                // May have created a broker with null replication domain for
                // unit test purpose.
                if (domain != null)
                {
                  domain.sessionInitiated(
                    initStatus, replicationServerInfo.getServerState(),
                    replicationServerInfo.getGenerationId(),
                    session);
                }
                if (getRsGroupId() != groupId)
                {
                  // Connected to replication server with wrong group id:
                  // warn user and start poller to recover when a server with
                  // right group id arrives...
                  Message message =
                    WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
                    Byte.toString(groupId), Integer.toString(rsServerId),
                    replicationServerInfo.getServerURL(),
                    Byte.toString(getRsGroupId()),
                    baseDn, Integer.toString(serverId));
                  logError(message);
                }
                startRSHeartBeatMonitoring();
                if (replicationServerInfo.getProtocolVersion() >=
                  ProtocolVersion.REPLICATION_PROTOCOL_V3)
                {
                  startChangeTimeHeartBeatPublishing();
                }
              } else
              {
                // Detected new RS with our group id: log disconnection to
                // inform administrator
                Message message = NOTE_NEW_SERVER_WITH_SAME_GROUP_ID.get(
                  Byte.toString(groupId), baseDn.toString(),
                  Integer.toString(serverId));
                logError(message);
                // Do not log connection error
                newServerWithSameGroupId = true;
              }
            } catch (Exception e)
            {
              Message message = ERR_COMPUTING_FAKE_OPS.get(
                baseDn, replicationServerInfo.getServerURL(),
                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
              logError(message);
            } finally
            {
              if (connected == false)
              {
                ProtocolSession localSession = session;
                if (localSession != null)
                {
                  localSession.close();
                  session = null;
                }
              }
            }
            connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
          } // Could perform handshake phase 2 with best
        } // Could perform handshake phase 1 with best
@@ -1057,9 +937,8 @@
      {
        connectPhaseLock.notify();
        if ((replicationServerInfo.getGenerationId() ==
          this.getGenerationID()) ||
          (replicationServerInfo.getGenerationId() == -1))
        if ((electedRsInfo.getGenerationId() == getGenerationID())
            || (electedRsInfo.getGenerationId() == -1))
        {
          Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDn,
@@ -1072,7 +951,7 @@
              .get(serverId, rsServerId, baseDn,
                  session.getReadableRemoteAddress(),
                  getGenerationID(),
                  replicationServerInfo.getGenerationId());
                  electedRsInfo.getGenerationId());
          logError(message);
        }
      } else
@@ -1081,7 +960,7 @@
         * This server could not find any replicationServer. It's going to start
         * in degraded mode. Log a message.
         */
        if (!connectionError && !newServerWithSameGroupId)
        if (!connectionError)
        {
          connectionError = true;
          connectPhaseLock.notify();
@@ -1107,17 +986,104 @@
  }
  /**
   * Has the passed RS info list some servers with our group id ?
   * @return true if at least one server has the same group id
   * Connects to a replication server.
   *
   * @param rsInfo
   *          the Replication Server to connect to
   * @param initStatus
   *          The status to enter the state machine with
   * @param topologyMsg
   *          the message containing the topology information
   */
  private boolean hasSomeServerWithSameGroupId(List<RSInfo> rsInfos)
  private void connectToReplicationServer(ReplicationServerInfo rsInfo,
      ServerStatus initStatus, TopologyMsg topologyMsg)
  {
    for (RSInfo rsInfo : rsInfos)
    try
    {
      if (rsInfo.getGroupId() == this.groupId)
        return true;
      replicationServer = session.getReadableRemoteAddress();
      maxSendWindow = rsInfo.getWindowSize();
      rsGroupId = rsInfo.getGroupId();
      rsServerId = rsInfo.getServerId();
      rsServerUrl = rsInfo.getServerURL();
      receiveTopo(topologyMsg);
      // Log a message to let the administrator know that the failure
      // was resolved.
      // Wakeup all the thread that were waiting on the window
      // on the previous connection.
      connectionError = false;
      if (sendWindow != null)
      {
        /*
         * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
         * this semaphore will get blocked when they acquire it. However, we
         * also need to make sure that we don't overflow the semaphore by
         * releasing too many permits.
         */
        final int MAX_PERMITS = (Integer.MAX_VALUE >>> 2);
        if (sendWindow.availablePermits() < MAX_PERMITS)
        {
          /*
           * At least 2^29 acquisitions would need to occur for this to be
           * insufficient. In addition, at least 2^30 releases would need to
           * occur for this to potentially overflow. Hopefully this is unlikely
           * to happen.
           */
          sendWindow.release(MAX_PERMITS);
        }
      }
      sendWindow = new Semaphore(maxSendWindow);
      rcvWindow = maxRcvWindow;
      connected = true;
      // May have created a broker with null replication domain for
      // unit test purpose.
      if (domain != null)
      {
        domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
            .getGenerationId(), session);
      }
      if (getRsGroupId() != groupId)
      {
        // Connected to replication server with wrong group id:
        // warn user and start poller to recover when a server with
        // right group id arrives...
        Message message =
            WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
                .toString(groupId), Integer.toString(rsServerId), rsInfo
                .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
                .toString(serverId));
        logError(message);
      }
      startRSHeartBeatMonitoring();
      if (rsInfo.getProtocolVersion() >=
        ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        startChangeTimeHeartBeatPublishing();
      }
    }
    return false;
    catch (Exception e)
    {
      Message message =
          ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
              .getLocalizedMessage()
              + stackTraceToSingleLineString(e));
      logError(message);
    }
    finally
    {
      if (connected == false)
      {
        ProtocolSession localSession = session;
        if (localSession != null)
        {
          localSession.close();
          session = null;
        }
      }
    }
  }
  /**