mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.19.2013 8d567305382a771caffae063adcd7a42af2c7b3e
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -225,19 +225,7 @@
  private final SortedMap<CSN, FakeOperation> replayOperations =
    new TreeMap<CSN, FakeOperation>();
  /**
   * The isolation policy that this domain is going to use.
   * This field describes the behavior of the domain when an update is
   * attempted and the domain could not connect to any Replication Server.
   * Possible values are accept-updates or deny-updates, but other values
   * may be added in the future.
   */
  private IsolationPolicy isolationPolicy;
  /**
   * The DN of the configuration entry of this domain.
   */
  private final DN configDn;
  private ReplicationDomainCfg config;
  private ExternalChangelogDomain eclDomain;
  /**
@@ -337,21 +325,6 @@
  private static final int FRACTIONAL_BECOME_NO_OP = 3;
  /**
   * This configuration boolean indicates if this ReplicationDomain should log
   * CSNs.
   */
  private boolean logCSN = false;
  /**
   * This configuration integer indicates the time the domain keeps the
   * historical information necessary to solve conflicts.<br>
   * When a change stored in the historical part of the user entry has a date
   * (from its replication CSN) older than this delay, it is candidate to be
   * purged.
   */
  private long histPurgeDelayInMilliSec = 0;
  /**
   * The last CSN purged in this domain. Allows to have a continuous purging
   * process from one purge processing (task run) to the next one. Values 0 when
   * the server starts.
@@ -485,29 +458,14 @@
   * @throws ConfigException In case of invalid configuration.
   */
  public LDAPReplicationDomain(ReplicationDomainCfg configuration,
    BlockingQueue<UpdateToReplay> updateToReplayQueue)
    throws ConfigException
      BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
  {
    super(configuration.getBaseDN(),
          configuration.getServerId(),
          configuration.getInitializationWindowSize());
    // Read the configuration parameters.
    Set<String> replicationServers = configuration.getReplicationServer();
    int window  = configuration.getWindowSize();
    /**
     * The time in milliseconds between heartbeats from the replication
     * server.  Zero means heartbeats are off.
     */
    long heartbeatInterval = configuration.getHeartbeatInterval();
    this.isolationPolicy = configuration.getIsolationPolicy();
    this.configDn = configuration.dn();
    this.logCSN = configuration.isLogChangenumber();
    this.config = configuration;
    this.updateToReplayQueue = updateToReplayQueue;
    this.histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
    // Get assured configuration
    readAssuredConfig(configuration, false);
@@ -566,8 +524,7 @@
    // register as an AlertGenerator
    DirectoryServer.registerAlertGenerator(this);
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    startPublishService(configuration);
  }
  /**
@@ -1663,6 +1620,8 @@
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
    }
    // FIXME should the next call use the initWindow parameter rather than the
    // instance variable?
    super.initializeRemote(target, requestorID, initTask, this.initWindow);
  }
@@ -1850,6 +1809,7 @@
   */
  private boolean brokerIsConnected()
  {
    final IsolationPolicy isolationPolicy = config.getIsolationPolicy();
    if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
    {
      // this policy imply that we always accept updates.
@@ -2131,7 +2091,7 @@
    // Note that a failed non-replication operation might not have a change
    // number.
    CSN curCSN = OperationContext.getCSN(op);
    if (curCSN != null && logCSN)
    if (curCSN != null && config.isLogChangenumber())
    {
      op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(),
          "replicationCSN", curCSN));
@@ -3526,7 +3486,7 @@
      {
        // If the base entry does not exist, save the generation
        // ID in the config entry
        result = runSaveGenerationId(configDn, generationId);
        result = runSaveGenerationId(config.dn(), generationId);
      }
      if (result != ResultCode.SUCCESS)
@@ -3573,7 +3533,7 @@
    {
      // if the base entry does not exist look for the generationID
      // in the config entry.
      search = conn.processSearch(configDn.toString(),
      search = conn.processSearch(config.dn().toString(),
          SearchScope.BASE_OBJECT,
          DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
          filter,attributes);
@@ -4125,16 +4085,8 @@
  public ConfigChangeResult applyConfigurationChange(
         ReplicationDomainCfg configuration)
  {
    isolationPolicy = configuration.getIsolationPolicy();
    logCSN = configuration.isLogChangenumber();
    histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
    changeConfig(
        configuration.getReplicationServer(),
        configuration.getWindowSize(),
        configuration.getHeartbeatInterval(),
        (byte)configuration.getGroupId());
    this.config = configuration;
    changeConfig(configuration);
    // Read assured + fractional configuration and each time reconnect if needed
    readAssuredConfig(configuration, true);
@@ -4211,7 +4163,7 @@
  @Override
  public DN getComponentEntryDN()
  {
    return configDn;
    return config.dn();
  }
  /**
@@ -4234,7 +4186,7 @@
  {
    try
    {
      DN eclConfigEntryDN = DN.decode("cn=external changeLog," + configDn);
      DN eclConfigEntryDN = DN.decode("cn=external changeLog," + config.dn());
      if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
      {
        DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null);
@@ -4264,6 +4216,7 @@
    // unit test cases
    try
    {
      DN configDn = config.dn();
      if (DirectoryServer.getConfigHandler().entryExists(configDn))
      {
        try
@@ -5272,14 +5225,14 @@
  }
  /**
   * Return the purge delay (in ms) for the historical information stored
   * in entries to solve conflicts for this domain.
   * Return the minimum time (in ms) that the domain keeps the historical
   * information necessary to solve conflicts.
   *
   * @return the purge delay.
   */
  public long getHistoricalPurgeDelay()
  {
    return histPurgeDelayInMilliSec;
    return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
  }
  /**
@@ -5348,7 +5301,7 @@
       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
       lastCSNPurgedFromHist = entryHist.getOldestCSN();
       entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
       entryHist.setPurgeDelay(getHistoricalPurgeDelay());
       Attribute attr = entryHist.encodeAndPurge();
       count += entryHist.getLastPurgedValuesCount();
       List<Modification> mods = new LinkedList<Modification>();