mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
18.18.2008 4c5133331fbc83cb3fd98f0c100104b7f9bf0359
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -51,6 +51,7 @@
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
@@ -172,18 +173,16 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * on shutdown, the server will wait for existing threads to stop
   * during this timeout (in ms).
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  private ReplicationMonitor monitor;
  private ReplicationBroker broker;
  private List<ListenerThread> synchroThreads =
    new ArrayList<ListenerThread>();
  // Thread waiting for incoming update messages for this domain and pushing
  // them to the global incoming update message queue for later processing by
  // replay threads.
  private ListenerThread listenerThread;
  // The update to replay message queue where the listener thread is going to
  // push incoming update messages.
  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
  private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMessage>();
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
@@ -233,8 +232,6 @@
  // Null when none is being processed.
  private IEContext ieContext = null;
  private int listenerThreadNumber = 10;
  private Collection<String> replicationServers;
  private DN baseDN;
@@ -355,12 +352,59 @@
  }
  /**
   * This thread is launched when we want to export data to another server that
   * has requested to be initialized with the data of our backend.
   */
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will receive updates
    private short target;
    /**
     * Constructor for the ExportThread.
     *
     * @param target Id of server that will receive updates
     */
    public ExportThread(short target)
    {
      super("Export thread");
      this.target = target;
    }
    /**
     * Run method for this class.
     */
    public void run()
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread starting.");
      }
      try
      {
        initializeRemote(target, target, null);
      } catch (DirectoryException de)
      {
      // An error message has been sent to the peer
      // Nothing more to do locally
      }
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread stopping.");
      }
    }
  }
  /**
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration    The configuration of this ReplicationDomain.
   * @param updateToReplayQueue The queue for update messages to replay.
   * @throws ConfigException In case of invalid configuration.
   */
  public ReplicationDomain(ReplicationDomainCfg configuration)
  public ReplicationDomain(ReplicationDomainCfg configuration,
    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
    throws ConfigException
  {
    super("replicationDomain_" + configuration.getBaseDN());
@@ -373,6 +417,7 @@
    heartbeatInterval = configuration.getHeartbeatInterval();
    isolationpolicy = configuration.getIsolationPolicy();
    configDn = configuration.dn();
    this.updateToReplayQueue = updateToReplayQueue;
    /*
     * Modify conflicts are solved for all suffixes but the schema suffix
@@ -819,122 +864,112 @@
   */
  public UpdateMessage receive()
  {
    UpdateMessage update = remotePendingChanges.getNextUpdate();
    UpdateMessage update = null;
    if (update == null)
    while (update == null)
    {
      while (update == null)
      InitializeRequestMessage initMsg = null;
      synchronized (broker)
      {
        InitializeRequestMessage initMsg = null;
        synchronized (broker)
        ReplicationMessage msg;
        try
        {
          ReplicationMessage msg;
          try
          msg = broker.receive();
          if (msg == null)
          {
            msg = broker.receive();
            if (msg == null)
            {
              // The server is in the shutdown process
              return null;
            }
            // The server is in the shutdown process
            return null;
          }
            if (debugEnabled())
              if (!(msg instanceof HeartbeatMessage))
                TRACER.debugVerbose("Message received <" + msg + ">");
          if (debugEnabled())
            if (!(msg instanceof HeartbeatMessage))
              TRACER.debugVerbose("Message received <" + msg + ">");
            if (msg instanceof AckMessage)
            {
              AckMessage ack = (AckMessage) msg;
              receiveAck(ack);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
            }
            else if (msg instanceof InitializeRequestMessage)
            {
              // Another server requests us to provide entries
              // for a total update
          {
            // Another server requests us to provide entries
            // for a total update
              initMsg = (InitializeRequestMessage)msg;
            }
            else if (msg instanceof InitializeTargetMessage)
            {
              // Another server is exporting its entries to us
              InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
              try
              {
                // This must be done while we are still holding the
                // broker lock because we are now going to receive a
                // bunch of entries from the remote server and we
                // want the import thread to catch them and
                // not the ListenerThread.
                initialize(importMsg);
            try
            {
              // This must be done while we are still holding the
              // broker lock because we are now going to receive a
              // bunch of entries from the remote server and we
              // want the import thread to catch them and
              // not the ListenerThread.
              initialize(importMsg);
              }
              catch(DirectoryException de)
              {
                // Returns an error message to notify the sender
                ErrorMessage errorMsg =
                  new ErrorMessage(importMsg.getsenderID(),
                                   de.getMessageObject());
                MessageBuilder mb = new MessageBuilder();
                mb.append(de.getMessageObject());
                TRACER.debugInfo(Message.toString(mb.toMessage()));
                broker.publish(errorMsg);
              }
            {
              // Returns an error message to notify the sender
              ErrorMessage errorMsg =
                new ErrorMessage(importMsg.getsenderID(),
                de.getMessageObject());
              MessageBuilder mb = new MessageBuilder();
              mb.append(de.getMessageObject());
              TRACER.debugInfo(Message.toString(mb.toMessage()));
              broker.publish(errorMsg);
            }
            }
            else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              if (ieContext != null)
              {
                // This is an error termination for the 2 following cases :
                // - either during an export
                // - or before an import really started
                //   For example, when we publish a request and the
                //  replicationServer did not find any import source.
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
              else
              {
                /* We can receive an error message from the replication server
                 * in the following cases :
                 * - we connected with an incorrect generation id
                 */
            {
              /* We can receive an error message from the replication server
               * in the following cases :
               * - we connected with an incorrect generation id
               */
                ErrorMessage errorMsg = (ErrorMessage)msg;
                logError(ERR_ERROR_MSG_RECEIVED.get(
                           errorMsg.getDetails()));
              }
              logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
            }
            }
            else if (msg instanceof UpdateMessage)
            {
              update = (UpdateMessage) msg;
              receiveUpdate(update);
            }
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          }
          catch (SocketTimeoutException e)
          {
            // just retry
          }
        }
        // Test if we have received and export request message and
        // if that's the case handle it now.
        // This must be done outside of the portion of code protected
        // by the broker lock so that we keep receiveing update
        // when we are doing and export and so that a possible
        // closure of the socket happening when we are publishing the
        // entries to the remote can be handled by the other
        // ListenerThread when they call this method and therefore the
        // broker.receive() method.
        if (initMsg != null)
        {
          try
          {
            initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
                null);
          }
          catch(DirectoryException de)
          {
            // An error message has been sent to the peer
            // Nothing more to do locally
          }
        // just retry
        }
      }
      // Test if we have received and export request message and
      // if that's the case handle it now.
      // This must be done outside of the portion of code protected
      // by the broker lock so that we keep receiveing update
      // when we are doing and export and so that a possible
      // closure of the socket happening when we are publishing the
      // entries to the remote can be handled by the other
      // replay thread when they call this method and therefore the
      // broker.receive() method.
      if (initMsg != null)
      {
        // Do this work in a thread to allow replay thread continue working
        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
        exportThread.start();
      }
    }
    return update;
  }
@@ -1190,10 +1225,9 @@
  @Override
  public void run()
  {
    /*
     * create the threads that will wait for incoming changes.
     */
    createListeners();
    // Create the listener thread
    listenerThread = new ListenerThread(this, updateToReplayQueue);
    listenerThread.start();
    while (shutdown  == false)
    {
@@ -1217,28 +1251,6 @@
  }
  /**
   * create the threads that will wait for incoming changes.
   * TODO : should use a pool of threads shared between all the servers
   * TODO : need to make number of thread configurable
   */
  private void createListeners()
  {
    synchronized (synchroThreads)
    {
      if (!shutdown)
      {
        synchroThreads.clear();
        for (int i=0; i<listenerThreadNumber; i++)
        {
          ListenerThread myThread = new ListenerThread(this);
          myThread.start();
          synchroThreads.add(myThread);
        }
      }
    }
  }
  /**
   * Shutdown this ReplicationDomain.
   */
  public void shutdown()
@@ -1246,14 +1258,8 @@
    // stop the flush thread
    shutdown = true;
    synchronized (synchroThreads)
    {
      // stop the listener threads
      for (ListenerThread thread : synchroThreads)
      {
        thread.shutdown();
      }
    }
    // Stop the listener thread
    listenerThread.shutdown();
    synchronized (this)
    {
@@ -1267,11 +1273,8 @@
    // stop the ReplicationBroker
    broker.stop();
    //  wait for the listener thread to stop
    for (ListenerThread thread : synchroThreads)
    {
      thread.waitForShutdown();
    }
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
    // wait for completion of the persistentServerState thread.
    try
@@ -1315,140 +1318,148 @@
    int retryCount = 10;
    boolean firstTry = true;
    try
    // Try replay the operation, then flush (replaying) any pending operation
    // whose dependency has been replayed until no more left.
    do
    {
      while ((!dependency) && (!done) && (retryCount-- > 0))
      try
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        changeNumber = OperationContext.getChangeNumber(op);
        ((AbstractOperation)op).run();
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
        while ((!dependency) && (!done) && (retryCount-- > 0))
        {
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            dependency = remotePendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            dependency = remotePendingChanges.checkDependencies(newOp);
            if ((!dependency) && (!firstTry))
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            AddMsg addMsg = (AddMsg) msg;
            dependency = remotePendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, addMsg);
            }
          }
          else if (op instanceof ModifyDNOperationBasis)
          {
            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
            dependency = remotePendingChanges.checkDependencies(newMsg);
            if (!dependency)
            {
              ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
              done = solveNamingConflict(newOp, msg);
            }
          }
          else
          {
            done = true;  // unknown type of operation ?!
          }
          if (done)
          {
            // the update became a dummy update and the result
            // of the conflict resolution phase is to do nothing.
            // however we still need to push this change to the serverState
            updateError(changeNumber);
          }
        }
        else
        {
          done = true;
        }
        firstTry = false;
      }
          op = msg.createOperation(conn);
      if (!done && !dependency)
      {
        // Continue with the next change but the servers could now become
        // inconsistent.
        // Let the repair tool know about this.
        Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
          op.setInternalOperation(true);
          op.setSynchronizationOperation(true);
          changeNumber = OperationContext.getChangeNumber(op);
          ((AbstractOperation) op).run();
          // Try replay the operation
          ResultCode result = op.getResultCode();
          if (result != ResultCode.SUCCESS)
          {
            if (op instanceof ModifyOperation)
            {
              ModifyOperation newOp = (ModifyOperation) op;
              dependency = remotePendingChanges.checkDependencies(newOp);
              if (!dependency)
              {
                done = solveNamingConflict(newOp, msg);
              }
            } else if (op instanceof DeleteOperation)
            {
              DeleteOperation newOp = (DeleteOperation) op;
              dependency = remotePendingChanges.checkDependencies(newOp);
              if ((!dependency) && (!firstTry))
              {
                done = solveNamingConflict(newOp, msg);
              }
            } else if (op instanceof AddOperation)
            {
              AddOperation newOp = (AddOperation) op;
              AddMsg addMsg = (AddMsg) msg;
              dependency = remotePendingChanges.checkDependencies(newOp);
              if (!dependency)
              {
                done = solveNamingConflict(newOp, addMsg);
              }
            } else if (op instanceof ModifyDNOperationBasis)
            {
              ModifyDNMsg newMsg = (ModifyDNMsg) msg;
              dependency = remotePendingChanges.checkDependencies(newMsg);
              if (!dependency)
              {
                ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
                done = solveNamingConflict(newOp, msg);
              }
            } else
            {
              done = true;  // unknown type of operation ?!
            }
            if (done)
            {
              // the update became a dummy update and the result
              // of the conflict resolution phase is to do nothing.
              // however we still need to push this change to the serverState
              updateError(changeNumber);
            }
          } else
          {
            done = true;
          }
          firstTry = false;
        }
        if (!done && !dependency)
        {
          // Continue with the next change but the servers could now become
          // inconsistent.
          // Let the repair tool know about this.
          Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
            op.getErrorMessage().toString());
        logError(message);
        numUnresolvedNamingConflicts.incrementAndGet();
          logError(message);
          numUnresolvedNamingConflicts.incrementAndGet();
        updateError(changeNumber);
      }
    }
    catch (ASN1Exception e)
    {
      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
              String.valueOf(msg) + stackTraceToSingleLineString(e));
      logError(message);
    }
    catch (LDAPException e)
    {
      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
              String.valueOf(msg) + stackTraceToSingleLineString(e));
      logError(message);
    }
    catch (DataFormatException e)
    {
      Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
              String.valueOf(msg) + stackTraceToSingleLineString(e));
      logError(message);
    }
    catch (Exception e)
    {
      if (changeNumber != null)
      {
        /*
         * An Exception happened during the replay process.
         * Continue with the next change but the servers will now start
         * to be inconsistent.
         * Let the repair tool know about this.
         */
        Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
            stackTraceToSingleLineString(e), op.toString());
        logError(message);
        updateError(changeNumber);
      }
      else
          updateError(changeNumber);
        }
      } catch (ASN1Exception e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
                String.valueOf(msg) + stackTraceToSingleLineString(e));
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
      }
    }
    finally
    {
      if (!dependency)
      } catch (LDAPException e)
      {
        if (msg.isAssured())
          ack(msg.getChangeNumber());
        incProcessedUpdates();
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
      } catch (DataFormatException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
      } catch (Exception e)
      {
        if (changeNumber != null)
        {
          /*
           * An Exception happened during the replay process.
           * Continue with the next change but the servers will now start
           * to be inconsistent.
           * Let the repair tool know about this.
           */
          Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
            stackTraceToSingleLineString(e), op.toString());
          logError(message);
          updateError(changeNumber);
        } else
        {
          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
            String.valueOf(msg) + stackTraceToSingleLineString(e));
          logError(message);
        }
      } finally
      {
        if (!dependency)
        {
          if (msg.isAssured())
            ack(msg.getChangeNumber());
          incProcessedUpdates();
        }
      }
    }
      // Now replay any pending update that had a dependency and whose
      // dependency has been replayed, do that until no more updates of that
      // type left...
      msg = remotePendingChanges.getNextUpdate();
      // Prepare restart of loop
      done = false;
      dependency = false;
      changeNumber = null;
      retryCount = 10;
      firstTry = true;
    } while (msg != null);
  }
  /**
@@ -2224,7 +2235,7 @@
   * The session to the replication server will be stopped.
   * The domain will not be destroyed but call to the pre-operation
   * methods will result in failure.
   * The listener threads will be destroyed.
   * The listener thread will be destroyed.
   * The monitor informations will still be accessible.
   */
  public void disable()
@@ -2232,23 +2243,14 @@
    state.save();
    state.clearInMemory();
    disabled = true;
    //  stop the listener threads
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
    }
    broker.stop(); // this will cut the session and wake-up the listeners
    for (ListenerThread thread : synchroThreads)
    {
      try
      {
        thread.join(SHUTDOWN_JOIN_TIMEOUT);
      } catch (InterruptedException e)
      {
        // ignore
      }
    }
    // Stop the listener thread
    listenerThread.shutdown();
    broker.stop(); // This will cut the session and wake up the listener
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
  }
  /**
@@ -2265,7 +2267,6 @@
    state.loadState();
    disabled = false;
    try
    {
      generationId = loadGenerationId();
@@ -2288,7 +2289,9 @@
    broker.start(replicationServers);
    createListeners();
    // Create the listener thread
    listenerThread = new ListenerThread(this, updateToReplayQueue);
    listenerThread.start();
  }
  /**