mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
17.47.2009 9b581fcaa5c5cab99677a3fd043c62260f4dfa7c

The changes contained in the RS were sent by the DS that connected to it: this DS compared its own state with the state of the RS and decided to update the RS with the
changes it was missing, which are recorded in the ds-sync-hist attribute of the changed entries.

As described at
https://www.opends.org/wiki/page/EffectsOfReplicationServerCrashes
updating an empty RS from a DS is not a realistic scenario: it is valid only in a corner case, and in all other cases it could lead to a massive, unexpected number of
changes being sent to the RS.

The fix therefore consists of having the DS check that the RS is not empty before deciding to send the missing changes.

1 files modified
132 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 132 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4192,84 +4192,90 @@
      ChangeNumber replServerMaxChangeNumber =
        replicationServerState.getMaxChangeNumber(serverId);
      if (replServerMaxChangeNumber == null)
      // we don't want to update from here (a DS) an empty RS because
      // normally the RS should have been updated by other RSes except for
      // very last changes lost if the local connection was broken
      // ... hence the RS we are connected to should not be empty
      // ... or if it is empty, it is due to a voluntary reset
      // and we don't want to update it with our changes that could be huge.
      if ((replServerMaxChangeNumber != null) &&
          (replServerMaxChangeNumber.getSeqnum()!=0))
      {
        replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
      }
      ChangeNumber ourMaxChangeNumber =
        state.getMaxChangeNumber(serverId);
        ChangeNumber ourMaxChangeNumber =
          state.getMaxChangeNumber(serverId);
      if ((ourMaxChangeNumber != null) &&
          (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
      {
        // Replication server is missing some of our changes: let's
        // send them to him.
        Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
        logError(message);
        /*
         * Get all the changes that have not been seen by this
         * replication server and populate the replayOperations
         * list.
         */
        InternalSearchOperation op = searchForChangedEntries(
            baseDn, replServerMaxChangeNumber, this);
        if (op.getResultCode() != ResultCode.SUCCESS)
        if ((ourMaxChangeNumber != null) &&
            (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
        {
          /*
           * An error happened trying to search for the updates
           * This server will start accepting again new updates but
           * some inconsistencies will stay between servers.
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(
              baseDn.toNormalizedString());
          // Replication server is missing some of our changes: let's
          // send them to him.
          Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
          logError(message);
        } else
        {
          for (FakeOperation replayOp :
            replayOperations.tailMap(replServerMaxChangeNumber).values())
          {
            ChangeNumber cn = replayOp.getChangeNumber();
            /*
             * Because the entry returned by the search operation
             * can contain old historical information, it is
             * possible that some of the FakeOperation are
             * actually older than the last ChangeNumber known by
             * the Replication Server.
             * In such case don't send the operation.
             */
            if (!cn.newer(replServerMaxChangeNumber))
            {
              continue;
            }
          /*
           * Get all the changes that have not been seen by this
           * replication server and populate the replayOperations
           * list.
           */
          InternalSearchOperation op = searchForChangedEntries(
              baseDn, replServerMaxChangeNumber, this);
          if (op.getResultCode() != ResultCode.SUCCESS)
          {
            /*
             * Check if the DeleteOperation has been abandoned before
             * being processed. This is necessary because the replayOperation
             *
             * An error happened trying to search for the updates
             * This server will start accepting again new updates but
             * some inconsistencies will stay between servers.
             * Log an error for the repair tool
             * that will need to re-synchronize the servers.
             */
            if (replayOp instanceof FakeDelOperation)
            message = ERR_CANNOT_RECOVER_CHANGES.get(
                baseDn.toNormalizedString());
            logError(message);
          } else
          {
            for (FakeOperation replayOp :
              replayOperations.tailMap(replServerMaxChangeNumber).values())
            {
              FakeDelOperation delOp = (FakeDelOperation) replayOp;
              if (findEntryDN(delOp.getUUID()) != null)
              ChangeNumber cn = replayOp.getChangeNumber();
              /*
               * Because the entry returned by the search operation
               * can contain old historical information, it is
               * possible that some of the FakeOperation are
               * actually older than the last ChangeNumber known by
               * the Replication Server.
               * In such case don't send the operation.
               */
              if (!cn.newer(replServerMaxChangeNumber))
              {
                continue;
              }
              /*
               * Check if the DeleteOperation has been abandoned before
               * being processed. This is necessary because the replayOperation
               *
               */
              if (replayOp instanceof FakeDelOperation)
              {
                FakeDelOperation delOp = (FakeDelOperation) replayOp;
                if (findEntryDN(delOp.getUUID()) != null)
                {
                  continue;
                }
              }
              message =
                DEBUG_SENDING_CHANGE.get(
                    replayOp.getChangeNumber().toString());
              logError(message);
              session.publish(replayOp.generateMessage());
            }
            message =
              DEBUG_SENDING_CHANGE.get(
                  replayOp.getChangeNumber().toString());
            message = DEBUG_CHANGES_SENT.get();
            logError(message);
            session.publish(replayOp.generateMessage());
          }
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
          replayOperations.clear();
        }
        replayOperations.clear();
      }
    } catch (Exception e)
    {