mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
18.13.2008 a719d21181a3b1c98c16bc677e892cf67fed4e7f
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
@@ -318,26 +319,8 @@
    configDn = configuration.dn();
    this.updateToReplayQueue = updateToReplayQueue;
    /*
     * Fill assured configuration properties
     */
    AssuredType assuredType = configuration.getAssuredType();
    switch (assuredType)
    {
      case NOT_ASSURED:
        setAssured(false);
        break;
      case SAFE_DATA:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
        break;
      case SAFE_READ:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_READ_MODE);
        break;
    }
    setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
    setAssuredTimeout(configuration.getAssuredTimeout());
    // Get assured configuration
    readAssuredConfig(configuration);
    setGroupId((byte)configuration.getGroupId());
    setURLs(configuration.getReferralsUrl());
@@ -405,6 +388,72 @@
  }
  /**
   * Gets and stores the assured replication configuration parameters. Returns
   * a boolean indicating if the passed configuration has changed compared to
   * previous values and the changes require a reconnection.
   * @param configuration The configuration object
   * @return True if the assured configuration changed and we need to reconnect
   */
  private boolean readAssuredConfig(ReplicationDomainCfg configuration)
  {
    boolean needReconnect = false;
    byte newSdLevel = (byte) configuration.getAssuredSdLevel();
    if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
      (newSdLevel != getAssuredSdLevel()))
    {
      needReconnect = true;
    }
    AssuredType newAssuredType = configuration.getAssuredType();
    switch (newAssuredType)
    {
      case NOT_ASSURED:
        if (isAssured())
        {
          needReconnect = true;
        }
        break;
      case SAFE_DATA:
        if (!isAssured() ||
          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
        {
          needReconnect = true;
        }
        break;
      case SAFE_READ:
        if (!isAssured() ||
          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
        {
          needReconnect = true;
        }
        break;
    }
    switch (newAssuredType)
    {
      case NOT_ASSURED:
        setAssured(false);
        break;
      case SAFE_DATA:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
        break;
      case SAFE_READ:
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_READ_MODE);
        break;
    }
    setAssuredSdLevel(newSdLevel);
    // Changing timeout does not require restart as it is not sent in
    // StartSessionMsg
    setAssuredTimeout(configuration.getAssuredTimeout());
    return needReconnect;
  }
  /**
   * Returns the base DN of this ReplicationDomain.
   *
   * @return The base DN of this ReplicationDomain
@@ -836,7 +885,27 @@
    if (!op.isSynchronizationOperation())
    {
      // If assured replication is configured, this will prepare blocking
      // mechanism. If assured replication is disabled, this returns
      // immediately
      prepareWaitForAckIfAssuredEnabled(msg);
      pendingChanges.pushCommittedChanges();
      // If assured replication is enabled, this will wait for the matching
      // ack or time out. If assured replication is disabled, this returns
      // immediately
      try
      {
        waitForAckIfAssuredEnabled(msg);
      } catch (TimeoutException ex)
      {
        // This exception may only be raised if assured replication is
        // enabled
        Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
          Long.toString(getAssuredTimeout()), msg.toString());
        logError(errorMsg);
      }
    }
  }
@@ -2583,7 +2652,18 @@
    changeConfig(
        configuration.getReplicationServer(),
        configuration.getWindowSize(),
        configuration.getHeartbeatInterval());
        configuration.getHeartbeatInterval(),
        (byte)configuration.getGroupId());
    // Get assured configuration
    boolean needReconnect = readAssuredConfig(configuration);
    // Reconnect if required
    if (needReconnect)
    {
      disableService();
      enableService();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }