mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.55.2009 d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
 */
public class LDAPReplicationDomain extends ReplicationDomain
       implements ConfigurationChangeListener<ReplicationDomainCfg>,
                  AlertGenerator, InternalSearchListener
                  AlertGenerator
{
  /**
   * This class is used in the session establishment phase
   * when no Replication Server with all the local changes has been found
   * and we therefore need to recover them.
   * A search is then performed on the database using this
   * internalSearchListener.
   */
  private class ScanSearchListener implements InternalSearchListener
  {
    private ChangeNumber startingChangeNumber = null;
    private ChangeNumber endChangeNumber = null;
    public ScanSearchListener(
        ChangeNumber startingChangeNumber,
        ChangeNumber endChangeNumber)
    {
      this.startingChangeNumber = startingChangeNumber;
      this.endChangeNumber = endChangeNumber;
    }
    @Override
    public void handleInternalSearchEntry(
        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
        throws DirectoryException
    {
      // Build the list of Operations that happened on this entry
      // after startingChangeNumber and before endChangeNumber and
      // add them to the replayOperations list
      Iterable<FakeOperation> updates =
        Historical.generateFakeOperations(searchEntry);
      for (FakeOperation op : updates)
      {
        ChangeNumber cn = op.getChangeNumber();
        if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
        {
          synchronized (replayOperations)
          {
            replayOperations.put(cn, op);
          }
        }
      }
    }
    @Override
    public void handleInternalSearchReference(
        InternalSearchOperation searchOperation,
        SearchResultReference searchReference) throws DirectoryException
    {
       // Nothing to do.
    }
  }
  /**
   * The fully-qualified name of this class.
   */
  private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
  }
  /**
   * The thread that is responsible to update the RS to which this domain is
   * connected in case it is late and there is no RS which is up to date.
   */
  private class RSUpdater extends DirectoryThread
  {
    private ChangeNumber startChangeNumber;
    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
    {
      super("Replication Server Updater for server id " +
            serverId + " and domain " + baseDn.toString());
      this.startChangeNumber = replServerMaxChangeNumber;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // Replication server is missing some of our changes: let's
      // send them to him.
      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
      logError(message);
      /*
       * Get all the changes that have not been seen by this
       * replication server and publish them.
       */
      try
      {
        if (buildAndPublishMissingChanges(startChangeNumber, broker))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
          synchronized(replayOperations)
          {
            replayOperations.clear();
          }
        }
        else
        {
          /*
           * An error happened trying to search for the updates
           * This server will start accepting again new updates but
           * some inconsistencies will stay between servers.
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
        }
      } catch (Exception e)
      {
        /*
         * An error happened trying to search for the updates
         * This server will start accepting again new updates but
         * some inconsistencies will stay between servers.
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        message = ERR_CANNOT_RECOVER_CHANGES.get(
            baseDn.toNormalizedString());
        logError(message);
      }
      finally
      {
        broker.setRecoveryRequired(false);
      }
    }
  }
  /**
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration    The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
      saveGenerationId(generationId);
    }
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on this replication domain.
@@ -505,6 +630,9 @@
    pendingChanges =
      new PendingChanges(generator, this);
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    remotePendingChanges = new RemotePendingChanges(getServerState());
    // listen for changes on the configuration
@@ -4356,74 +4484,9 @@
        if ((ourMaxChangeNumber != null) &&
            (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
        {
          // Replication server is missing some of our changes: let's
          // send them to him.
          Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
          logError(message);
          /*
           * Get all the changes that have not been seen by this
           * replication server and populate the replayOperations
           * list.
           */
          InternalSearchOperation op = searchForChangedEntries(
              baseDn, replServerMaxChangeNumber, this);
          if (op.getResultCode() != ResultCode.SUCCESS)
          {
            /*
             * An error happened trying to search for the updates
             * This server will start accepting again new updates but
             * some inconsistencies will stay between servers.
             * Log an error for the repair tool
             * that will need to re-synchronize the servers.
             */
            message = ERR_CANNOT_RECOVER_CHANGES.get(
                baseDn.toNormalizedString());
            logError(message);
          } else
          {
            for (FakeOperation replayOp :
              replayOperations.tailMap(replServerMaxChangeNumber).values())
            {
              ChangeNumber cn = replayOp.getChangeNumber();
              /*
               * Because the entry returned by the search operation
               * can contain old historical information, it is
               * possible that some of the FakeOperation are
               * actually older than the last ChangeNumber known by
               * the Replication Server.
               * In such case don't send the operation.
               */
              if (!cn.newer(replServerMaxChangeNumber))
              {
                continue;
              }
              /*
               * Check if the DeleteOperation has been abandoned before
               * being processed. This is necessary because the replayOperation
               *
               */
              if (replayOp instanceof FakeDelOperation)
              {
                FakeDelOperation delOp = (FakeDelOperation) replayOp;
                if (findEntryDN(delOp.getUUID()) != null)
                {
                  continue;
                }
              }
              message =
                DEBUG_SENDING_CHANGE.get(
                    replayOp.getChangeNumber().toString());
              logError(message);
              session.publish(replayOp.generateMessage());
            }
            message = DEBUG_CHANGES_SENT.get();
            logError(message);
          }
          replayOperations.clear();
          pendingChanges.setRecovering(true);
          broker.setRecoveryRequired(true);
          new RSUpdater(replServerMaxChangeNumber).start();
        }
      }
    } catch (Exception e)
@@ -4437,19 +4500,124 @@
  }
  /**
   * Build the list of changes that have been processed by this server
   * after the ChangeNumber given as a parameter and publish them
   * using the given session.
   *
   * @param startingChangeNumber  The ChangeNumber whe we need to start the
   *                              search
   * @param session               The session to use to publish the changes
   *
   * @return                      A boolean indicating he success of the
   *                              operation.
   * @throws Exception            if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(
      ChangeNumber startingChangeNumber,
      ReplicationBroker session)
      throws Exception
  {
    // Trim the changes in replayOperations that are older than
    // the startingChangeNumber.
    synchronized (replayOperations)
    {
      Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (it.next().olderOrEqual(startingChangeNumber))
        {
          it.remove();
        }
        else
        {
          break;
        }
      }
    }
    ChangeNumber lastRetrievedChange = null;
    long missingChangesDelta;
    InternalSearchOperation op;
    ChangeNumber currentStartChangeNumber = startingChangeNumber;
    do
    {
      lastRetrievedChange = null;
      // We can't do the search in one go because we need to
      // store the results so that we are sure we send the operations
      // in order and because the list might be large
      // So we search by interval of 10 seconds
      // and store the results in the replayOperations list
      // so that they are sorted before sending them.
      missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
      ChangeNumber endChangeNumber =
        new ChangeNumber(
            missingChangesDelta, 0xffffffff, serverId);
      ScanSearchListener listener =
        new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
      op = searchForChangedEntries(
          baseDn, currentStartChangeNumber, endChangeNumber, listener);
      // Publish and remove all the changes from the replayOperations list
      // that are older than the endChangeNumber.
      LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      synchronized (replayOperations)
      {
        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
        while (itOp.hasNext())
        {
          FakeOperation fakeOp = itOp.next();
          if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
              && state.cover(fakeOp.getChangeNumber()))
          {
            lastRetrievedChange = fakeOp.getChangeNumber();
            opsToSend.add(fakeOp);
            itOp.remove();
          }
          else
          {
            break;
          }
        }
      }
      for (FakeOperation opToSend : opsToSend)
      {
          session.publishRecovery(opToSend.generateMessage());
      }
      opsToSend.clear();
      if (lastRetrievedChange != null)
      {
        currentStartChangeNumber = lastRetrievedChange;
      }
      else
      {
        currentStartChangeNumber = endChangeNumber;
      }
    } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
             (op.getResultCode().equals(ResultCode.SUCCESS)));
    return op.getResultCode().equals(ResultCode.SUCCESS);
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @param fromChangeNumber The ChangeNumber from which we want the changes
   * @param lastChangeNumber The max ChangeNumber that the search should return
   * @param resultListener   The listener that will process the entries returned
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    ChangeNumber lastChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
@@ -4457,8 +4625,16 @@
      InternalClientConnection.getRootConnection();
    Integer serverId = fromChangeNumber.getServerId();
    String maxValueForId = "ffffffffffffffff" +
      String.format("%04x", serverId) + "ffffffff";
    String maxValueForId;
    if (lastChangeNumber == null)
    {
      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
                      + "ffffffff";
    }
    else
    {
      maxValueForId = lastChangeNumber.toString();
    }
    LDAPFilter filter = LDAPFilter.decode(
       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
  }
  /**
   * {@inheritDoc}
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public void handleInternalSearchEntry(
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    /*
     * This call back is called at session establishment phase
     * for each entry that has been changed by this server and the changes
     * have not been sent to any Replication Server.
     * The role of this method is to build equivalent operation from
     * the historical information and add them in the replayOperations
     * table.
     */
    Iterable<FakeOperation> updates =
      Historical.generateFakeOperations(searchEntry);
    for (FakeOperation op : updates)
    {
      replayOperations.put(op.getChangeNumber(), op);
    }
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
    return searchForChangedEntries(
        baseDn, fromChangeNumber, null, resultListener);
  }