mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
25.35.2007 44789f3979a2303ba8501b6b97b880bb53f321fa
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/plugin/SynchronizationDomain.java
@@ -36,7 +36,7 @@
import static org.opends.server.messages.ConfigMessages.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.messages.SynchronizationMessages.*;
import static org.opends.server.messages.ReplicationMessages.*;
import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.StaticUtils.createEntry;
@@ -62,7 +62,6 @@
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
@@ -92,7 +91,7 @@
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.SynchronizationMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
@@ -118,16 +117,16 @@
/**
 *  This class implements the bulk part of the.of the Directory Server side
 *  of the synchronization code.
 *  of the replication code.
 *  It contains the root method for publishing a change,
 *  processing a change received from the changelog service,
 *  handle conflict resolution,
 *  handle protocol messages from the changelog server.
 */
public class SynchronizationDomain extends DirectoryThread
public class ReplicationDomain extends DirectoryThread
       implements ConfigurationChangeListener<MultimasterDomainCfg>
{
  private SynchronizationMonitor monitor;
  private ReplicationMonitor monitor;
  private ChangeNumberGenerator changeNumberGenerator;
  private ChangelogBroker broker;
@@ -151,7 +150,7 @@
  private int maxSendDelay = 0;
  /**
   * The time in milliseconds between heartbeats from the synchronization
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
@@ -166,7 +165,7 @@
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    SynchroLDIFInputStream ldifImportInputStream = null;
    ReplLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
    // The source in the case of an import
@@ -253,9 +252,6 @@
  private DN baseDN;
  private List<ConfigAttribute> configAttributes =
                                          new ArrayList<ConfigAttribute>();
  private boolean shutdown = false;
  private InternalClientConnection conn =
@@ -270,15 +266,15 @@
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration    The configuration of this SynchronizationDomain.
   * @param configuration    The configuration of this ReplicationDomain.
   * @throws ConfigException In case of invalid configuration.
   */
  public SynchronizationDomain(MultimasterDomainCfg configuration)
  public ReplicationDomain(MultimasterDomainCfg configuration)
    throws ConfigException
  {
    super("Synchronization flush");
    super("replication flush");
    // Read the configuration parameters.
    changelogServers = configuration.getChangelogServer();
@@ -314,15 +310,15 @@
    state = new PersistentServerState(baseDN);
    /*
     * Create a Synchronization monitor object responsible for publishing
     * Create a replication monitor object responsible for publishing
     * monitoring information below cn=monitor.
     */
    monitor = new SynchronizationMonitor(this);
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the synchronization domain.
     * for each operation done on the replication domain.
     */
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
@@ -358,9 +354,9 @@
  /**
   * Returns the base DN of this SynchronizationDomain.
   * Returns the base DN of this ReplicationDomain.
   *
   * @return The base DN of this SynchronizationDomain
   * @return The base DN of this ReplicationDomain
   */
  public DN getBaseDN()
  {
@@ -384,7 +380,7 @@
    if (ctx != null)
    {
      /*
       * This is a synchronization operation
       * This is a replication operation
       * Check that the modified entry has the same entryuuid
       * has was in the original message.
       */
@@ -409,8 +405,8 @@
    }
    else
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      // There is no replication context attached to the operation
      // so this is not a replication operation.
      ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
      ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
@@ -460,7 +456,7 @@
            && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
        {
          // parentEntry has been renamed
          // Synchronization name conflict resolution is expected to fix that
          // replication name conflict resolution is expected to fix that
          // later in the flow
          addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
          return new SynchronizationProviderResult(false);
@@ -485,7 +481,7 @@
    if (ctx != null)
    {
      /*
       * This is a synchronization operation
       * This is a replication operation
       * Check that the modified entry has the same entryuuid
       * as was in the original message.
       */
@@ -524,8 +520,8 @@
    }
    else
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      // There is no replication context attached to the operation
      // so this is not a replication operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
      String newParentId = null;
      if (modifyDNOperation.getNewSuperior() != null)
@@ -557,8 +553,8 @@
    Entry modifiedEntry = modifyOperation.getModifiedEntry();
    if (ctx == null)
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      // There is no replication context attached to the operation
      // so this is not a replication operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
      if (modifiedEntryUUID == null)
@@ -614,7 +610,7 @@
  /**
   * The preOperation phase for the add Operation.
   * Its job is to generate the Synchronization context associated to the
   * Its job is to generate the replication context associated to the
   * operation. It is necessary to do it in this phase because contrary to
   * the other operations, the entry uid is not set when the handleConflict
   * phase is called.
@@ -642,7 +638,7 @@
      UpdateMessage update = null;
      while (update == null)
      {
        SynchronizationMessage msg;
        ReplicationMessage msg;
        try
        {
          msg = broker.receive();
@@ -784,7 +780,7 @@
    }
    UpdateMessage msg = null;
    // Note that a failed non-synchronization operation might not have a change
    // Note that a failed non-replication operation might not have a change
    // number.
    ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
@@ -792,7 +788,7 @@
    if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
    {
      // Generate a synchronization message for a successful non-synchronization
      // Generate a replication message for a successful non-replication
      // operation.
      msg = UpdateMessage.generateMsg(op, isAssured);
@@ -847,7 +843,7 @@
      }
      else if (!op.isSynchronizationOperation())
      {
        // Remove an unsuccessful non-synchronization operation from the pending
        // Remove an unsuccessful non-replication operation from the pending
        // changes list.
        if (curChangeNumber != null)
        {
@@ -878,7 +874,7 @@
  }
  /**
   * get the number of updates received by the synchronization plugin.
   * get the number of updates received by the replication plugin.
   *
   * @return the number of updates received
   */
@@ -888,7 +884,7 @@
  }
  /**
   * Get the number of updates sent by the synchronization plugin.
   * Get the number of updates sent by the replication plugin.
   *
   * @return the number of updates sent
   */
@@ -916,9 +912,9 @@
  }
  /**
   * get the number of updates replayed by the synchronization.
   * get the number of updates replayed by the replication.
   *
   * @return The number of updates replayed by the synchronization
   * @return The number of updates replayed by the replication
   */
  public int getNumProcessedUpdates()
  {
@@ -926,7 +922,7 @@
  }
  /**
   * get the number of updates replayed successfully by the synchronization.
   * get the number of updates replayed successfully by the replication.
   *
   * @return The number of updates replayed successfully
   */
@@ -1012,7 +1008,7 @@
  }
  /**
   * Shutdown this SynchronizationDomain.
   * Shutdown this ReplicationDomain.
   */
  public void shutdown()
  {
@@ -1684,8 +1680,8 @@
  }
  /**
   * Get the number of times the synchronization connection was lost.
   * @return The number of times the synchronization connection was lost.
   * Get the number of times the replication connection was lost.
   * @return The number of times the replication connection was lost.
   */
  public int getNumLostConnections()
  {
@@ -1703,8 +1699,8 @@
  }
  /**
   * Disable the Synchronization on this domain.
   * The session to the Synchronization server will be stopped.
   * Disable the replication on this domain.
   * The session to the replication server will be stopped.
   * The domain will not be destroyed but call to the pre-operation
   * methods will result in failure.
   * The listener threads will be destroyed.
@@ -1725,7 +1721,7 @@
  /**
   * Enable back the domain after a previous disable.
   * The domain will connect back to a Synchronization Server and
   * The domain will connect back to a replication Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * server.
   * The ServerState will also be read again from the local database.
@@ -1774,13 +1770,13 @@
  /**
   * Receives bytes related to an entry in the context of an import to
   * initialize the domain (called by SynchronizationDomainLDIFInputStream).
   * initialize the domain (called by ReplLDIFInputStream).
   *
   * @return The bytes. Null when the Done or Err message has been received
   */
  public byte[] receiveEntryBytes()
  {
    SynchronizationMessage msg;
    ReplicationMessage msg;
    while (true)
    {
      try
@@ -1926,7 +1922,7 @@
      int    msgID   = MSGID_UNKNOWN_TYPE;
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE,
          "SynchronizationDomain/ " + message, msgID);
          "ReplicationDomain/ " + message, msgID);
    }
  }
@@ -1966,7 +1962,7 @@
          ResultCode.OTHER, message, msgID, null);
    }
    SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
    ReplLDIFOutputStream os = new ReplLDIFOutputStream(this);
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
@@ -2604,7 +2600,7 @@
      preBackendImport(this.backend, this.backendConfigEntry);
      DN[] baseDNs = {baseDN};
      ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
      ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
      importConfig =
        new LDIFImportConfig(ieContext.ldifImportInputStream);
      importConfig.setIncludeBranches(this.branches);
@@ -2679,23 +2675,23 @@
  }
  /**
   * Retrieves a synchronization domain based on the baseDN.
   * Retrieves a replication domain based on the baseDN.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @return The domain retrieved
   * @throws DirectoryException When an error occured.
   */
  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
  public static ReplicationDomain retrievesReplicationDomain(DN baseDN)
  throws DirectoryException
  {
    SynchronizationDomain synchronizationDomain = null;
    ReplicationDomain replicationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterSynchronization))
      if (!( provider instanceof MultimasterReplication))
      {
        int msgID = MSGID_INVALID_PROVIDER;
        String message = getMessage(msgID);
@@ -2703,9 +2699,9 @@
            message, msgID);
      }
      // From the domainDN retrieves the synchronization domain
      SynchronizationDomain sdomain =
        MultimasterSynchronization.findDomain(baseDN, null);
      // From the domainDN retrieves the replication domain
      ReplicationDomain sdomain =
        MultimasterReplication.findDomain(baseDN, null);
      if (sdomain == null)
      {
        int msgID = MSGID_NO_MATCHING_DOMAIN;
@@ -2714,7 +2710,7 @@
            message, msgID);
      }
      if (synchronizationDomain != null)
      if (replicationDomain != null)
      {
        // Should never happen
        int msgID = MSGID_MULTIPLE_MATCHING_DOMAIN;
@@ -2722,9 +2718,9 @@
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      synchronizationDomain = sdomain;
      replicationDomain = sdomain;
    }
    return synchronizationDomain;
    return replicationDomain;
  }
  /**
@@ -2791,7 +2787,7 @@
    // Check that there is not already a domain with the same DN
    // TODO : Check that the server id is a short
    DN dn = configuration.getSynchronizationDN();
    if (MultimasterSynchronization.findDomain(dn,null) != null)
    if (MultimasterReplication.findDomain(dn,null) != null)
    {
      String message = getMessage(MSGID_SYNC_INVALID_DN, dn.toString());
      unacceptableReasons.add(message);