mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.07.2014 13f31d030c3b205931b63c29b0d6bc1d4eefd163
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
import org.opends.server.backends.ChangelogBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
@@ -56,6 +57,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
@@ -63,6 +65,7 @@
import org.opends.server.util.StaticUtils;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -95,11 +98,19 @@
  private final Map<DN, ReplicationServerDomain> baseDNs =
      new HashMap<DN, ReplicationServerDomain>();
  /** The database storing the changes. */
  private final ChangelogDB changelogDB;
  /** The backend that allow to search the changes (external changelog). */
  private ChangelogBackend changelogBackend;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private boolean stopListen = false;
  private final ReplSessionSecurity replSessionSecurity;
  /** To know whether a domain is enabled for the external changelog. */
  private final ECLEnabledDomainPredicate domainPredicate;
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
@@ -136,26 +147,41 @@
   */
  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
  {
    this(cfg, new DSRSShutdownSync());
    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
  }
  /**
   * Creates a new Replication server using the provided configuration entry.
   * Creates a new Replication server using the provided configuration entry and shutdown
   * synchronization object.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg cfg,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
  }
  /**
   * Creates a new Replication server using the provided configuration entry, shutdown
   * synchronization object and domain predicate.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @param predicate Indicates whether a domain is enabled for the external changelog.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
      final ECLEnabledDomainPredicate predicate) throws ConfigException
  {
    this.config = cfg;
    this.dsrsShutdownSync = dsrsShutdownSync;
    this.domainPredicate = predicate;
    ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
    if (DebugLogger.debugEnabled())
    {
      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
          + " as DB implementation for changelog DB");
      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB");
    }
    this.changelogDB = dbImpl == ReplicationDBImplementation.JE
        ? new JEChangelogDB(this, cfg)
@@ -165,6 +191,9 @@
    initialize();
    cfg.addChangeListener(this);
    // TODO : uncomment to branch changelog backend
    //enableExternalChangeLog();
    localPorts.add(getReplicationPort());
    // Keep track of this new instance
@@ -501,6 +530,57 @@
    registerVirtualAttributeRules();
  }
  /**
   * Enable the external changelog if it is not already enabled.
   * <p>
   * The external changelog is provided by the changelog backend.
   *
   * @throws ConfigException
   *            If an error occurs.
   */
  private void enableExternalChangeLog() throws ConfigException
  {
    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
    {
      // Backend has already been created and initialized
      // This can occurs in tests
      return;
    }
    try
    {
      changelogBackend = new ChangelogBackend(this, domainPredicate);
      changelogBackend.initializeBackend();
      try
      {
        DirectoryServer.registerBackend(changelogBackend);
      }
      catch (Exception e)
      {
        logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
            getExceptionMessage(e)));
      }
      registerVirtualAttributeRules();
    }
    catch (Exception e)
    {
      // TODO : I18N with correct message + what kind of exception should we really throw ?
      // (Directory/Initialization/Config Exception)
      throw new ConfigException(Message.raw("Error when enabling external changelog"), e);
    }
  }
  private void shutdownExternalChangelog()
  {
    if (changelogBackend != null)
    {
      DirectoryServer.deregisterBackend(changelogBackend);
      changelogBackend.finalizeBackend();
      changelogBackend = null;
    }
    deregisterVirtualAttributeRules();
  }
  private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
  {
    final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -609,6 +689,64 @@
    return getReplicationServerDomain(baseDN, false);
  }
  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
  {
    Set<DN> domains = null;
    synchronized (baseDNs)
    {
      domains = new HashSet<DN>(baseDNs.keySet());
    }
    domains.removeAll(excludedBaseDNs);
    return domains;
  }
  /**
   * Validate that provided state is coherent with this replication server,
   * when ignoring the provided set of DNs.
   * <p>
   * The state is coherent if and only if it exactly has the set of DNs corresponding to
   * the replication domains.
   *
   * @param state
   *            The multi domain state (cookie) to validate.
   * @param ignoredBaseDNs
   *            The set of DNs to ignore when validating
   * @throws DirectoryException
   *            If the state is not valid
   */
  public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
  {
    // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
    final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
    final Set<DN> stateDomains = state.getSnapshot().keySet();
    final Set<DN> domainsCopy = new HashSet<DN>(domains);
    final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
    domainsCopy.removeAll(stateDomains);
    if (!domainsCopy.isEmpty())
    {
      final StringBuilder missingDomains = new StringBuilder();
      for (DN dn : domainsCopy)
      {
        missingDomains.append(dn).append(":;");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
              missingDomains, "<" + state.toString() + missingDomains + ">"));
    }
    stateDomainsCopy.removeAll(domains);
    if (!stateDomainsCopy.isEmpty())
    {
      final StringBuilder startState = new StringBuilder();
      for (DN dn : domains) {
        startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
              stateDomainsCopy.toString(), startState));
    }
  }
  /**
   * Get the ReplicationServerDomain associated to the base DN given in
   * parameter.
@@ -706,7 +844,9 @@
      domain.shutdown();
    }
    // TODO : switch to second method when changelog backend is branched
    shutdownECL();
    //shutdownExternalChangelog();
    try
    {