mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.19.2014 f948474a8031c24160da4b31f0b97354456b40ad
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -41,13 +41,13 @@
import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -67,9 +67,10 @@
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
  private ReplicationServerListener replicationServerListener = null;
  private ReplicationServerListener replicationServerListener;
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
  /**
   * The queue of received update messages, to be treated by the ReplayThread
@@ -113,8 +114,7 @@
   *                   Can be null is the request has no associated operation.
   * @return           The domain for this DN.
   */
  public static LDAPReplicationDomain findDomain(
      DN dn, PluginOperation pluginOp)
  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
  {
    /*
     * Don't run the special replication code on Operation that are
@@ -124,7 +124,9 @@
    {
        final Operation op = (Operation) pluginOp;
        if (op.dontSynchronize())
        {
          return null;
        }
        /*
         * Check if the provided operation is a repair operation and set the
@@ -181,8 +183,8 @@
  {
    try
    {
      LDAPReplicationDomain domain =
          new LDAPReplicationDomain(configuration, updateToReplayQueue);
      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
          configuration, updateToReplayQueue, dsrsShutdownSync);
      if (domains.size() == 0)
      {
        // Create the threads that will process incoming update messages
@@ -218,9 +220,8 @@
      BlockingQueue<UpdateToReplay> queue)
      throws ConfigException
  {
    LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue);
    final LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
    domains.put(domain.getBaseDN(), domain);
    return domain;
  }
@@ -246,35 +247,30 @@
  /** {@inheritDoc} */
  @Override
  public void initializeSynchronizationProvider(
      ReplicationSynchronizationProviderCfg configuration)
  throws ConfigException
      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
  {
    domains.clear();
    replicationServerListener = new ReplicationServerListener(configuration);
    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);
    cfg.addReplicationDomainAddListener(this);
    cfg.addReplicationDomainDeleteListener(this);
    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);
    cfg.addReplicationChangeListener(this);
    replayThreadNumber = configuration.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
        Integer.MAX_VALUE);
    replayThreadNumber = cfg.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    for (String name : cfg.listReplicationDomains())
    {
      createNewDomain(configuration.getReplicationDomain(name));
      createNewDomain(cfg.getReplicationDomain(name));
    }
    /*
     * If any schema changes were made with the server offline, then handle them
     * now.
     */
    // If any schema changes were made with the server offline, then handle them now.
    List<Modification> offlineSchemaChanges =
         DirectoryServer.getOfflineSchemaChanges();
    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -402,12 +398,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyOperation modifyOperation)
  {
    LDAPReplicationDomain domain =
      findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyOperation);
    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -415,12 +411,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationAddOperation addOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(addOperation);
    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(addOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -428,12 +424,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(deleteOperation);
    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(deleteOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -441,12 +437,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyDNOperation);
    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyDNOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -505,7 +501,9 @@
    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
    if (domain == null || !domain.solveConflict())
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // The historical object is retrieved from the attachment created
    // in the HandleConflictResolution phase.
@@ -537,11 +535,15 @@
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // For LOCAL op only, generate CSN and attach Context
    if (!addOperation.isSynchronizationOperation())
    {
      domain.doPreOperation(addOperation);
    }
    // Add to the operation the historical attribute : "dn:changeNumber:add"
    EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -564,7 +566,9 @@
    stopReplayThreads();
    if (replicationServerListener != null)
    {
      replicationServerListener.shutdown();
    }
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
@@ -585,10 +589,11 @@
  @Override
  public void processSchemaChange(List<Modification> modifications)
  {
    LDAPReplicationDomain domain =
      findDomain(DirectoryServer.getSchemaDN(), null);
    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
    if (domain != null)
    {
      domain.synchronizeModifications(modifications);
    }
  }
  /** {@inheritDoc} */
@@ -599,7 +604,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -612,7 +619,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }
@@ -624,7 +633,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -637,7 +648,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -649,7 +662,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -662,7 +677,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -674,7 +691,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -687,7 +706,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }