mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
16.03.2008 ee5658e776839088da75a481df7a99f224aa8d14
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -276,10 +276,10 @@
        // According to assured sub-mode, prepare structures to keep track of
        // the acks we are interested in.
        AssuredMode assuredMode = update.getAssuredMode();
        if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        if (assuredMode == AssuredMode.SAFE_DATA_MODE)
        {
          preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
        } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
        } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
        {
          preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
        } else
@@ -525,7 +525,15 @@
        for (ServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            expectedServers.add(handler.getServerId());
            // No ack expected from a RS with different group id
          {
            if ((generationId > 0) &&
              (generationId == handler.getGenerationId()))
              // No ack expected from a RS with bad gen id
            {
              expectedServers.add(handler.getServerId());
            }
          }
        }
      }
@@ -538,13 +546,29 @@
          continue;
        }
        if (handler.getGroupId() == groupId)
          // No ack expected from a DS with different group id
        {
          if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
          ServerStatus serverStatus = handler.getStatus();
          if (serverStatus == ServerStatus.NORMAL_STATUS)
          {
            expectedServers.add(handler.getServerId());
          } else
            // No ack expected from a DS with wrong status
          {
            wrongStatusServers.add(handler.getServerId());
            if (serverStatus == ServerStatus.DEGRADED_STATUS)
            {
              wrongStatusServers.add(handler.getServerId());
            } else
            {
              /*
               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
               * We do not want this to be reported as an error to the update
               * maker -> no pollution or potential missunderstanding when
               * reading logs or monitoring and it was just administration (for
               * instance new server is being configured in topo: it goes in bad
               * gen then then full full update).
               */
            }
          }
        }
      }
@@ -589,7 +613,7 @@
    UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    boolean interestedInAcks = true;
    boolean interestedInAcks = false;
    byte safeDataLevel = update.getSafeDataLevel();
    byte groupId = replicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
@@ -600,47 +624,55 @@
        Short.toString(replicationServer.getServerId()),
        Byte.toString(safeDataLevel), baseDn, update.toString());
      logError(errorMsg);
      interestedInAcks = false;
    } else if (sourceGroupId != groupId)
    {
      // Assured feature does not cross different group ids
      interestedInAcks = false;
    } else
    {
      if (sourceHandler.isLDAPserver())
      if ((generationId > 0) &&
        (generationId == sourceHandler.getGenerationId()))
        // Ignore assured updates from wrong generationid servers
      {
        if (safeDataLevel == (byte) 1)
        if (sourceHandler.isLDAPserver())
        {
          // Immediatly return the ack for an assured message in safe data mode
          // with safe data level 1, coming from a DS. No need to wait for more
          // acks
          AckMsg ack = new AckMsg(cn);
          sourceHandler.sendAck(ack);
          interestedInAcks = false; // No further acks to obtain
          if (safeDataLevel == (byte) 1)
          {
            // Immediatly return the ack for an assured message in safe data
            // mode with safe data level 1, coming from a DS. No need to wait
            // for more acks
            AckMsg ack = new AckMsg(cn);
            sourceHandler.sendAck(ack);
          } else
          {
            if (safeDataLevel != (byte) 0)
            {
              // level > 1 : We need further acks
              // The message will be posted in assured mode to elligible
              // servers. The embedded safe data level is not changed, and his
              // value will be used by a remote RS to determine if he must send
              // an ack (level > 1) or not (level = 1)
              interestedInAcks = true;
            } else
            {
              // Should never happen
            }
          }
        } else
        {
          // level > 1 : We need further acks
          // The message will be posted in assured mode to elligible servers.
          // The embedded safe data level is not changed, and his value will be
          // used by a remote RS to determine if he must send an ack (level > 1)
          // or not (level = 1)
        }
      } else
      { // A RS sent us the safe data message, for sure no futher acks to wait
        interestedInAcks = false;
        if (safeDataLevel == (byte) 1)
        {
          // The original level was 1 so the RS that sent us this message should
          // have already sent his ack to the sender DS. Level 1 has already
          // been reached so no further acks to wait
          // This should not happen in theory as the sender RS server should
          // have sent us a matching not assured message so we should not come
          // to here.
        } else
        {
          // level > 1, so Ack this message to originator RS
          AckMsg ack = new AckMsg(cn);
          sourceHandler.sendAck(ack);
        { // A RS sent us the safe data message, for sure no futher acks to wait
          if (safeDataLevel == (byte) 1)
          {
            // The original level was 1 so the RS that sent us this message
            // should have already sent his ack to the sender DS. Level 1 has
            // already been reached so no further acks to wait.
            // This should not happen in theory as the sender RS server should
            // have sent us a matching not assured message so we should not come
            // to here.
          } else
          {
            // level > 1, so Ack this message to originator RS
            AckMsg ack = new AckMsg(cn);
            sourceHandler.sendAck(ack);
          }
        }
      }
    }
@@ -654,39 +686,46 @@
        for (ServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            expectedServers.add(handler.getServerId());
            // No ack expected from a RS with different group id
          {
            if ((generationId > 0) &&
              (generationId == handler.getGenerationId()))
              // No ack expected from a RS with bad gen id
            {
              expectedServers.add(handler.getServerId());
            }
          }
        }
      }
      // Look for DS elligible for assured
      for (ServerHandler handler : directoryServers.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
        {
          continue;
        }
        if (handler.getGroupId() == groupId)
          expectedServers.add(handler.getServerId());
      }
    }
    // Return computed structures
    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
    if (interestedInAcks && (expectedServers.size() > 0))
    int nExpectedServers = expectedServers.size();
    if (interestedInAcks) // interestedInAcks so level > 1
    {
      // Some other acks to wait for
      preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
        sourceHandler, update.getSafeDataLevel());
      preparedAssuredInfo.expectedServers = expectedServers;
    }
    if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
    {
      // level > 1 and source is a DS but no elligible servers found, send the
      // ack immediatly
      AckMsg ack = new AckMsg(cn);
      sourceHandler.sendAck(ack);
      if (nExpectedServers > 0)
      {
        // Some other acks to wait for
        int sdl = update.getSafeDataLevel();
        int neededAdditionalServers = sdl - 1;
        // Change the number of expected acks if not enough available elligible
        // servers: the level is a best effort thing, we do not want to timeout
        // at every assured SD update for instance if a RS has had his gen id
        // resetted
        byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
          (byte)sdl : // Keep level as it was
          (byte)(nExpectedServers+1)); // Change level to match what's available
        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
          sourceHandler, finalSdl, expectedServers);
        preparedAssuredInfo.expectedServers = expectedServers;
      } else
      {
        // level > 1 and source is a DS but no elligible servers found, send the
        // ack immediatly
        AckMsg ack = new AckMsg(cn);
        sourceHandler.sendAck(ack);
      }
    }
    return preparedAssuredInfo;
@@ -1436,6 +1475,9 @@
   */
  public void shutdown()
  {
    // Terminate the assured timer
    assuredTimeoutTimer.cancel();
    // Close session with other changelogs
    for (ServerHandler serverHandler : replicationServers.values())
    {