mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.20.2013 856fdd0571358c660afaf379f8e774ab8b24f05c
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@
 */
package org.opends.server.replication.plugin;
import java.util.ArrayList;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.BackupTaskListener;
import org.opends.server.api.ExportTaskListener;
import org.opends.server.api.ImportTaskListener;
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import org.opends.server.types.*;
import org.opends.server.types.operation.*;
/**
 * This class is used to load the Replication code inside the JVM
@@ -104,8 +76,8 @@
   * The queue of received update messages, to be treated by the ReplayThread
   * threads.
   */
  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
    new LinkedBlockingQueue<UpdateToReplay>();
  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue =
      new LinkedBlockingQueue<UpdateToReplay>(10000);
  /**
   * The list of ReplayThread threads.
@@ -228,9 +200,9 @@
  }
  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and starts it so that it is
   * fully operational when this method returns.
   * Creates a new domain from its configEntry, do the necessary initialization
   * and starts it so that it is fully operational when this method returns. It
   * is only used for tests so far.
   *
   * @param configuration The entry with the configuration of this domain.
   * @param queue         The BlockingQueue that this domain will use.
@@ -239,13 +211,13 @@
   *
   * @throws ConfigException When the configuration is not valid.
   */
  public static LDAPReplicationDomain createNewDomain(
  static LDAPReplicationDomain createNewDomain(
      ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> queue)
      throws ConfigException
  {
    LDAPReplicationDomain domain;
    domain = new LDAPReplicationDomain(configuration, queue);
    LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue);
    domains.put(domain.getBaseDN(), domain);
    return domain;
@@ -362,6 +334,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationAddAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
@@ -372,6 +345,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationAdd(
     ReplicationDomainCfg configuration)
  {
@@ -652,6 +626,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -665,6 +640,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
@@ -679,6 +655,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -692,6 +669,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
@@ -706,6 +684,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -719,6 +698,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
@@ -733,6 +713,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    for (DN dn : backend.getBaseDNs())
@@ -746,6 +727,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
@@ -760,6 +742,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationDelete(
      ReplicationDomainCfg configuration)
  {
@@ -771,6 +754,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationDeleteAcceptable(
      ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
@@ -804,10 +788,10 @@
  /**
   * {@inheritDoc}
   */
  public boolean
    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
    configuration,
    List<Message> unacceptableReasons)
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationSynchronizationProviderCfg configuration,
      List<Message> unacceptableReasons)
  {
    return true;
  }
@@ -815,9 +799,9 @@
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult
    applyConfigurationChange
    (ReplicationSynchronizationProviderCfg configuration)
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationSynchronizationProviderCfg configuration)
  {
    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
@@ -838,6 +822,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void completeSynchronizationProvider()
  {
    isRegistered = true;