mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.54.2014 aea0892feca2fd3d56c9c810debed6d22389454e
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -43,14 +43,14 @@
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.*;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -73,9 +73,10 @@
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private ReplicationServerListener replicationServerListener = null;
  private ReplicationServerListener replicationServerListener;
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
  /**
   * The queue of received update messages, to be treated by the ReplayThread
@@ -119,8 +120,7 @@
   *                   Can be null is the request has no associated operation.
   * @return           The domain for this DN.
   */
  public static LDAPReplicationDomain findDomain(
      DN dn, PluginOperation pluginOp)
  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
  {
    /*
     * Don't run the special replication code on Operation that are
@@ -130,7 +130,9 @@
    {
        final Operation op = (Operation) pluginOp;
        if (op.dontSynchronize())
        {
          return null;
        }
        /*
         * Check if the provided operation is a repair operation and set the
@@ -187,8 +189,8 @@
  {
    try
    {
      LDAPReplicationDomain domain =
          new LDAPReplicationDomain(configuration, updateToReplayQueue);
      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
          configuration, updateToReplayQueue, dsrsShutdownSync);
      if (domains.size() == 0)
      {
        // Create the threads that will process incoming update messages
@@ -223,9 +225,8 @@
      BlockingQueue<UpdateToReplay> queue)
      throws ConfigException
  {
    LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue);
    final LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
    domains.put(domain.getBaseDN(), domain);
    return domain;
  }
@@ -251,35 +252,30 @@
  /** {@inheritDoc} */
  @Override
  public void initializeSynchronizationProvider(
      ReplicationSynchronizationProviderCfg configuration)
  throws org.forgerock.opendj.config.server.ConfigException
      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
  {
    domains.clear();
    replicationServerListener = new ReplicationServerListener(configuration);
    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);
    cfg.addReplicationDomainAddListener(this);
    cfg.addReplicationDomainDeleteListener(this);
    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);
    cfg.addReplicationChangeListener(this);
    replayThreadNumber = configuration.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
        Integer.MAX_VALUE);
    replayThreadNumber = cfg.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    for (String name : cfg.listReplicationDomains())
    {
      createNewDomain(configuration.getReplicationDomain(name));
      createNewDomain(cfg.getReplicationDomain(name));
    }
    /*
     * If any schema changes were made with the server offline, then handle them
     * now.
     */
    // If any schema changes were made with the server offline, then handle them now.
    List<Modification> offlineSchemaChanges =
         DirectoryServer.getOfflineSchemaChanges();
    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -407,12 +403,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyOperation modifyOperation)
  {
    LDAPReplicationDomain domain =
      findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyOperation);
    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -420,12 +416,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationAddOperation addOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(addOperation);
    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(addOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -433,12 +429,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(deleteOperation);
    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(deleteOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -446,12 +442,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyDNOperation);
    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyDNOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -510,7 +506,9 @@
    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
    if (domain == null || !domain.solveConflict())
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // The historical object is retrieved from the attachment created
    // in the HandleConflictResolution phase.
@@ -542,11 +540,15 @@
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // For LOCAL op only, generate CSN and attach Context
    if (!addOperation.isSynchronizationOperation())
    {
      domain.doPreOperation(addOperation);
    }
    // Add to the operation the historical attribute : "dn:changeNumber:add"
    EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -569,7 +571,9 @@
    stopReplayThreads();
    if (replicationServerListener != null)
    {
      replicationServerListener.shutdown();
    }
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
@@ -590,10 +594,11 @@
  @Override
  public void processSchemaChange(List<Modification> modifications)
  {
    LDAPReplicationDomain domain =
      findDomain(DirectoryServer.getSchemaDN(), null);
    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
    if (domain != null)
    {
      domain.synchronizeModifications(modifications);
    }
  }
  /** {@inheritDoc} */
@@ -604,7 +609,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -617,7 +624,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }
@@ -629,7 +638,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -642,7 +653,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -654,7 +667,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -667,7 +682,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -679,7 +696,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -692,7 +711,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }