mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.47.2014 a592fe71c4c2e29a136f9700a2981f3dcbd7e114
OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation

After OPENDJ-1206, I could remove all old External ChangeLog (or ECL for short) related classes. I could also remove annoyingly strange APIs from various places of replication code.



ECLUpdateMsg.java, ServerStartECLMsg.java, StartECLSessionMsg.java, ECLServerHandler.java, ECLServerWriter.java, ECLSearchOperation.java, ECLWorkflowElement.java, ExternalChangeLogTest.java: REMOVED
Also removed package src/server/org/opends/server/workflowelement/externalchangelog as part of these deletes.

replication*.properties:
Removed now unused SEVERE_ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.

ReplicationMsg.java:
Deprecated old ECL message types.

ReplicationBroker.java, ReplicationServer.java:
Removed all ECL related code.

ReplicationServerDomain.java:
Removed the now unused otherHandlers field, stopServer(MessageHandler), registerHandler(MessageHandler), unRegisterHandler(MessageHandler), unregisterOtherHandler(), getOldestState() and getLatestDomainTrimDate()

ServerHandler.java
Reduced visibility has much as possible after removing ECLServerHandler.



MultimasterReplication.java:
Renamed getECLDisabledDomains() to , included "cn=changelog" by default + changed client code to not add it anymore.

LastCookieVirtualProvider.java:
Consequence of the change to MultimasterReplication.

ChangelogBackend.java:
Consequence of the change to MultimasterReplication.getECLDisabledDomains().
Removed SearchParams.requestType and replaced its usage with the new isCookieBasedSearch()



MultiDomainServerState.java, ServerState.java, ReplicationDomainDB.java, FileChangelogDB.java, JEChangelogDB.java:
Removed now unused methods / fields.
Made private methods / fields that are now only used inside their declaration classes.



ChangelogBackendTestCase.java, SynchronizationMsgTest.java, FileReplicaDBTest.java:
Updated or removed tests as a consequences of the whole change.

9 files deleted
14 files modified
7440 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java 5 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java 16 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/common/MultiDomainServerState.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/common/ServerState.java 24 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 57 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java 185 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 10 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java 251 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java 377 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java 1465 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java 276 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java 6 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 140 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java 55 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 12 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 6 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 10 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java 120 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java 1255 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/workflowelement/externalchangelog/ECLWorkflowElement.java 153 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/workflowelement/externalchangelog/package-info.java 37 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 161 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 2817 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -48,8 +48,6 @@
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchScope;
import org.opends.messages.Category;
import org.opends.messages.Severity;
import org.opends.server.admin.Configuration;
import org.opends.server.api.Backend;
import org.opends.server.config.ConfigConstants;
@@ -114,7 +112,6 @@
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.replication.plugin.MultimasterReplication.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
@@ -795,7 +792,7 @@
  private static Set<DN> getExcludedBaseDNs() throws DirectoryException
  {
    final Set<DN> excludedDNs = new HashSet<DN>();
    for (String dn : getECLDisabledDomains())
    for (String dn : getExcludedChangelogDomains())
    {
      excludedDNs.add(DN.valueOf(dn));
    }
opendj3-server-dev/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
@@ -26,20 +26,20 @@
 */
package org.opends.server.replication.common;
import java.util.Set;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
import org.opends.server.core.SearchOperation;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.Entry;
import org.opends.server.types.VirtualAttributeRule;
import static org.opends.messages.ExtensionMessages.*;
import static org.opends.server.replication.plugin.MultimasterReplication.*;
/**
 * This class implements a virtual attribute provider in the root-dse entry
@@ -89,11 +89,7 @@
    {
      if (replicationServer != null)
      {
        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
        Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
        excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
        String newestCookie = replicationServer.getNewestECLCookie(excludedDomains).toString();
        String newestCookie = replicationServer.getNewestECLCookie(getExcludedChangelogDomains()).toString();
        return Attributes.create(rule.getAttributeType(), newestCookie);
      }
    }
opendj3-server-dev/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -381,7 +381,7 @@
   *              when an error occurs
   * @return the split state.
   */
  public static Map<DN, ServerState> splitGenStateToServerStates(
  private static Map<DN, ServerState> splitGenStateToServerStates(
      String multiDomainServerState) throws DirectoryException
  {
    Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
opendj3-server-dev/src/server/org/opends/server/replication/common/ServerState.java
@@ -421,28 +421,4 @@
    return saved;
  }
  /**
   * Build a copy of the ServerState with only CSNs older than a provided
   * timestamp. This is used when building the initial Cookie in the External
   * Changelog, to cope with purged changes.
   *
   * @param timestamp
   *          The timestamp to compare the ServerState against
   * @return a copy of the ServerState which only contains the CSNs older than
   *         csn.
   */
  public ServerState duplicateOnlyOlderThan(long timestamp)
  {
    final CSN csn = new CSN(timestamp, 0, 0);
    final ServerState newState = new ServerState();
    for (CSN change : serverIdToCSN.values())
    {
      if (change.isOlderThan(csn))
      {
        newState.serverIdToCSN.put(change.getServerId(), change);
      }
    }
    return newState;
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -26,31 +26,60 @@
 */
package org.opends.server.replication.plugin;
import java.util.*;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.*;
import org.opends.server.api.Backend;
import org.opends.server.api.BackupTaskListener;
import org.opends.server.api.ExportTaskListener;
import org.opends.server.api.ImportTaskListener;
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.types.operation.*;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -819,14 +848,14 @@
  }
  /**
   * Gets the Set of baseDN of the domains which are disabled for the external
   * changelog.
   * Gets the Set of domain baseDN which are disabled for the external changelog.
   *
   * @return The Set of baseDNs which are disabled for the external changelog.
   * @return The Set of domain baseDNs which are disabled for the external changelog.
   */
  public static Set<String> getECLDisabledDomains()
  public static Set<String> getExcludedChangelogDomains()
  {
    final Set<String> disabledBaseDNs = new HashSet<String>(domains.size());
    final Set<String> disabledBaseDNs = new HashSet<String>(domains.size() + 1);
    disabledBaseDNs.add(DN_EXTERNAL_CHANGELOG_ROOT);
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (!domain.isECLEnabled())
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -71,8 +71,11 @@
  static final byte MSG_TYPE_GENERIC_UPDATE = 29;
  // Added for protocol version 3
  @Deprecated
  static final byte MSG_TYPE_START_ECL = 30;
  @Deprecated
  static final byte MSG_TYPE_START_ECL_SESSION = 31;
  @Deprecated
  static final byte MSG_TYPE_ECL_UPDATE = 32;
  static final byte MSG_TYPE_CT_HEARTBEAT = 33;
@@ -190,11 +193,12 @@
    case MSG_TYPE_GENERIC_UPDATE:
      return new UpdateMsg(buffer);
    case MSG_TYPE_START_ECL:
      return new ServerStartECLMsg(buffer);
    case MSG_TYPE_START_ECL_SESSION:
      return new StartECLSessionMsg(buffer);
    case MSG_TYPE_ECL_UPDATE:
      return new ECLUpdateMsg(buffer);
      // Legacy versions never sent such messages to other instances (other JVMs).
      // They were only used in the combined DS-RS case.
      // It is safe to totally ignore these values since code now uses the ChangelogBackend.
      return null;
    case MSG_TYPE_CT_HEARTBEAT:
      return new ChangeTimeHeartbeatMsg(buffer, protocolVersion);
    case MSG_TYPE_REPL_SERVER_START_DS:
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -267,12 +267,6 @@
              session, queueSize, this, rcvWindow);
          rsHandler.startFromRemoteRS((ReplServerStartMsg) msg);
        }
        else if (msg instanceof ServerStartECLMsg)
        {
          ECLServerHandler eclHandler = new ECLServerHandler(
              session, queueSize, this, rcvWindow);
          eclHandler.startFromRemoteServer((ServerStartECLMsg) msg);
        }
        else
        {
          // We did not recognize the message, close session as what
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -41,12 +47,31 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.HostPort;
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
@@ -112,9 +137,6 @@
  private final Map<Integer, ReplicationServerHandler> connectedRSs =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  private final ReplicationDomainDB domainDB;
  /** The ReplicationServer that created the current instance. */
  private final ReplicationServer localReplicationServer;
@@ -368,11 +390,6 @@
        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
      }
    }
    // Push the message to the other subscribing handlers
    for (MessageHandler mHandler : otherHandlers) {
      mHandler.add(updateMsg);
    }
  }
  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1086,10 +1103,6 @@
        {
          unregisterServerHandler(sHandler, shutdown, true);
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
      }
      catch(Exception e)
      {
@@ -1105,12 +1118,6 @@
    }
  }
  private void unregisterOtherHandler(MessageHandler mHandler)
  {
    unRegisterHandler(mHandler);
    mHandler.shutdown();
  }
  private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
      boolean isDirectoryServer)
  {
@@ -1137,59 +1144,6 @@
  }
  /**
   * Stop the handler.
   * @param mHandler The handler to stop.
   */
  public void stopServer(MessageHandler mHandler)
  {
    // TODO JNR merge with stopServer(ServerHandler, boolean)
    if (logger.isTraceEnabled())
    {
      debug("stopServer() on the message handler " + mHandler);
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
     * ServerWriter at the same time, or from a thread that wants to shut down
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    if (!mHandler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      }
      catch (InterruptedException ex)
      {
        // We can't deal with this here, so re-interrupt thread so that it is
        // caught during subsequent IO.
        Thread.currentThread().interrupt();
        return;
      }
      try
      {
        if (otherHandlers.contains(mHandler))
        {
          unregisterOtherHandler(mHandler);
        }
      }
      catch(Exception e)
      {
        logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
      }
      finally
      {
        release();
      }
    }
  }
  /**
   * Unregister this handler from the list of handlers registered to this
   * domain.
   * @param sHandler the provided handler to unregister.
@@ -2395,25 +2349,6 @@
  }
  /**
   * Register in the domain an handler that subscribes to changes.
   * @param mHandler the provided subscribing handler.
   */
  public void registerHandler(MessageHandler mHandler)
  {
    this.otherHandlers.add(mHandler);
  }
  /**
   * Unregister from the domain an handler.
   * @param mHandler the provided unsubscribing handler.
   * @return Whether this handler has been unregistered with success.
   */
  public boolean unRegisterHandler(MessageHandler mHandler)
  {
    return this.otherHandlers.remove(mHandler);
  }
  /**
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
@@ -2427,8 +2362,7 @@
    return domainDB.getDomainOldestCSNs(baseDN);
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
@@ -2491,18 +2425,6 @@
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
   * @return The latest trim date.
   */
  public long getLatestDomainTrimDate()
  {
    return domainDB.getDomainLatestTrimDate(baseDN);
  }
  /**
   * Return the monitor instance name of the ReplicationServer that created the
   * current instance.
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -118,12 +118,12 @@
  /**
   * The associated ServerWriter that sends messages to the remote server.
   */
  protected ServerWriter writer;
  private ServerWriter writer;
  /**
   * The associated ServerReader that receives messages from the remote server.
   */
  protected ServerReader reader;
  private ServerReader reader;
  // window
  private int rcvWindow;
@@ -136,11 +136,11 @@
  /**
   * Semaphore that the writer uses to control the flow to the remote server.
   */
  protected Semaphore sendWindow;
  private Semaphore sendWindow;
  /**
   * The initial size of the sending window.
   */
  protected int sendWindowSize;
  private int sendWindowSize;
  /**
   * remote generation id.
   */
@@ -165,7 +165,7 @@
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  protected long heartbeatInterval = 0;
  protected long heartbeatInterval;
  /**
   * The thread that will send heartbeats.
@@ -175,7 +175,7 @@
  /**
   * Set when ServerWriter is stopping.
   */
  protected volatile boolean shutdownWriter = false;
  private volatile boolean shutdownWriter;
  /**
   * Weight of this remote server.
@@ -543,22 +543,16 @@
    }
    // Window stats
    attributes.add(Attributes.create("max-send-window", String
        .valueOf(sendWindowSize)));
    attributes.add(Attributes.create("current-send-window", String
        .valueOf(sendWindow.availablePermits())));
    attributes.add(Attributes.create("max-rcv-window", String
        .valueOf(maxRcvWindow)));
    attributes.add(Attributes.create("current-rcv-window", String
        .valueOf(rcvWindow)));
    attributes.add(Attributes.create("max-send-window", String.valueOf(sendWindowSize)));
    attributes.add(Attributes.create("current-send-window", String.valueOf(sendWindow.availablePermits())));
    attributes.add(Attributes.create("max-rcv-window", String.valueOf(maxRcvWindow)));
    attributes.add(Attributes.create("current-rcv-window", String.valueOf(rcvWindow)));
    // Encryption
    attributes.add(Attributes.create("ssl-encryption", String
        .valueOf(session.isEncrypted())));
    attributes.add(Attributes.create("ssl-encryption", String.valueOf(session.isEncrypted())));
    // Data generation
    attributes.add(Attributes.create("generation-id", String
        .valueOf(generationId)));
    attributes.add(Attributes.create("generation-id", String.valueOf(generationId)));
    return attributes;
  }
@@ -923,20 +917,12 @@
  }
  /**
   * Requests to shutdown the writer.
   */
  protected void shutdownWriter()
  {
    shutdownWriter = true;
  }
  /**
   * Shutdown This ServerHandler.
   */
  @Override
  public void shutdown()
  {
    shutdownWriter();
    shutdownWriter = true;
    setConsumerActive(false);
    super.shutdown();
@@ -1141,21 +1127,6 @@
  }
  /**
   * Log the messages involved in the Topology/StartSession handshake.
   * @param inStartECLSessionMsg The message received first.
   */
  protected void logStartECLSessionHandshake(
      StartECLSessionMsg inStartECLSessionMsg)
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
    }
  }
  /**
   * Process a Ack message received.
   * @param ack the message received.
   */
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -77,18 +77,6 @@
  ServerState getDomainNewestCSNs(DN baseDN);
  /**
   * Retrieves the latest trim date for the specified replication domain.
   * <p>
   * FIXME will be removed when ECLServerHandler will not be responsible anymore
   * for lazily building the ChangeNumberIndexDB.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return the domain latest trim date
   */
  long getDomainLatestTrimDate(DN baseDN);
  /**
   * Removes all the data relating to the specified replication domain and
   * shutdown all its replica databases. In particular, it will:
   * <ol>
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -78,12 +78,6 @@
 */
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
  {
    throw new RuntimeException("Not implemented");
  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -113,7 +113,6 @@
   */
  private long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
  private volatile long latestPurgeDate;
  /** The local replication server. */
  private final ReplicationServer replicationServer;
@@ -664,13 +663,6 @@
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(final DN baseDN)
  {
    return latestPurgeDate;
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    synchronized (cnIndexDBLock)
@@ -967,8 +959,6 @@
            }
          }
          latestPurgeDate = purgeTimestamp;
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -49,7 +49,6 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DN;
import org.opends.server.types.HostPort;
import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -279,7 +278,7 @@
    {
      shutdown = false;
      this.rcvWindow = getMaxRcvWindow();
      connect();
      connectAsDataServer();
    }
  }
@@ -692,19 +691,6 @@
    }
  }
  private void connect()
  {
    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
    {
      connectAsECL();
    }
    else
    {
      connectAsDataServer();
    }
  }
  /**
   * Contacts all replication servers to get information from them and being
   * able to choose the more suitable.
@@ -718,7 +704,7 @@
    for (String serverUrl : getReplicationServerUrls())
    {
      // Connect to server + get and store info about it
      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
      final ReplicationServerInfo rsInfo = rs.rsInfo;
      if (rsInfo != null)
      {
@@ -730,35 +716,6 @@
  }
  /**
   * Special aspects of connecting as ECL (External Change Log) compared to
   * connecting as data server are :
   * <ul>
   * <li>1 single RS configured</li>
   * <li>so no choice of the preferred RS</li>
   * <li>?? Heartbeat</li>
   * <li>Start handshake is :
   *
   * <pre>
   *    Broker ---> StartECLMsg       ---> RS
   *          <---- ReplServerStartMsg ---
   *           ---> StartSessionECLMsg --> RS
   * </pre>
   *
   * </li>
   * </ul>
   */
  private void connectAsECL()
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    final String bestServerURL = getReplicationServerUrls().iterator().next();
    final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
    if (rs.isConnected())
    {
      performECLPhaseTwoHandshake(bestServerURL, rs);
    }
  }
  /**
   * Connect to a ReplicationServer.
   *
   * Handshake sequences between a DS and a RS is divided into 2 logical
@@ -834,7 +791,7 @@
              + evals.getBestRS());
        final ConnectedRS electedRS = performPhaseOneHandshake(
            evals.getBestRS().getServerURL(), true, false);
            evals.getBestRS().getServerURL(), true);
        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
        if (electedRsInfo != null)
        {
@@ -1100,12 +1057,9 @@
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @param isECL
   *          Indicates whether or not the an ECL handshake is to be performed.
   * @return The answer from the server . Null if could not get an answer.
   */
  private ConnectedRS performPhaseOneHandshake(String serverURL,
      boolean keepSession, boolean isECL)
  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
  {
    Session newSession = null;
    Socket socket = null;
@@ -1124,8 +1078,7 @@
        socket.bind(local);
      }
      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
          timeoutMS);
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
      boolean isSslEncryption = replSessionSecurity.isSslEncryption();
@@ -1133,19 +1086,9 @@
      final HostPort hp = new HostPort(
          socket.getLocalAddress().getHostName(), socket.getLocalPort());
      final String url = hp.toString();
      final StartMsg serverStartMsg;
      if (!isECL)
      {
        serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      else
      {
        serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
          getGenerationID(), isSslEncryption, getGroupId());
      newSession.publish(serverStartMsg);
      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1233,44 +1176,6 @@
    return setConnectedRS(ConnectedRS.noConnectedRS());
  }
  /**
   * Performs the second phase handshake for External Change Log (send
   * StartSessionMsg and receive TopologyMsg messages exchange) and return the
   * reply message from the replication server.
   *
   * @param server Server we are connecting with.
   */
  private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
  {
    try
    {
      // Send our Start Session
      final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId("-1");
      rs.session.publish(startECLSessionMsg);
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (logger.isTraceEnabled())
      {
        debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
      }
      // Alright set the timeout to the desired value
      rs.session.setSoTimeout(timeout);
      setConnectedRS(rs);
    }
    catch (Exception e)
    {
      logger.warn(WARN_EXCEPTION_STARTING_SESSION_PHASE, getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      rs.session.close();
      setConnectedRS(ConnectedRS.noConnectedRS());
    }
  }
  /**
   * Performs the second phase handshake (send StartSessionMsg and receive
   * TopologyMsg messages exchange) and return the reply message from the
@@ -2273,7 +2178,7 @@
        try
        {
          connect();
          connectAsDataServer();
          rs = connectedRS.get();
        }
        catch (Exception e)
@@ -3077,7 +2982,8 @@
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      this.rsServerId = rsServerId;
      this.replicaInfos = dsInfosToKeep;
      this.replicaInfos = dsInfosToKeep == null
          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }
@@ -3284,7 +3190,9 @@
    @Override
    public String toString()
    {
      return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
      return getClass().getSimpleName()
          + " rsServerId=" + rsServerId
          + ", replicaInfos=" + replicaInfos.values()
          + ", rsInfos=" + rsInfos.values();
    }
  }
opendj3-server-dev/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/workflowelement/externalchangelog/ECLWorkflowElement.java
File was deleted
opendj3-server-dev/src/server/org/opends/server/workflowelement/externalchangelog/package-info.java
File was deleted
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -37,8 +37,6 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
import org.opends.server.replication.protocol.StartECLSessionMsg.Persistent;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendAddOperation;
@@ -631,52 +629,6 @@
        msg1.getBytes(getCurrentVersion()), getCurrentVersion());
  }
  @Test(enabled=true)
  public void eclUpdateMsg()
         throws Exception
  {
    // create a msg to put in the eclupdatemsg
    InternalClientConnection connection =
      InternalClientConnection.getRootConnection();
    DeleteOperation deleteOp =
      new DeleteOperationBasis(connection, 1, 1,null, DN.valueOf("cn=t1"));
    LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(deleteOp);
    CSN csn = new CSN(TimeThread.getTime(), 123, 45);
    op.setAttachment(SYNCHROCONTEXT, new DeleteContext(csn, "uniqueid"));
    DeleteMsg delmsg = new DeleteMsg(op);
    long changeNumber = 21;
    DN baseDN = DN.valueOf("dc=example,dc=com");
    // create a cookie
    MultiDomainServerState cookie =
      new MultiDomainServerState(
          "o=test:000001210b6f21e904b100000001 000001210b6f21e904b200000001;" +
          "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;");
    // Constructor test
    ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, baseDN, changeNumber);
    assertTrue(msg1.getCookie().equalsTo(cookie));
    assertEquals(msg1.getBaseDN(), baseDN);
    assertEquals(msg1.getChangeNumber(), changeNumber);
    DeleteMsg delmsg2 = (DeleteMsg)msg1.getUpdateMsg();
    assertEquals(delmsg.compareTo(delmsg2), 0);
    // Constructor test (with byte[])
    ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes(getCurrentVersion()));
    assertTrue(msg2.getCookie().equalsTo(msg2.getCookie()));
    assertTrue(msg2.getCookie().equalsTo(cookie));
    assertEquals(msg2.getBaseDN(), msg1.getBaseDN());
    assertEquals(msg2.getBaseDN(), baseDN);
    assertEquals(msg2.getChangeNumber(), msg1.getChangeNumber());
    assertEquals(msg2.getChangeNumber(), changeNumber);
    DeleteMsg delmsg1 = (DeleteMsg)msg1.getUpdateMsg();
    delmsg2 = (DeleteMsg)msg2.getUpdateMsg();
    assertEquals(delmsg2.compareTo(delmsg), 0);
    assertEquals(delmsg2.compareTo(delmsg1), 0);
  }
  @DataProvider(name="createServerStartData")
  public Object[][] createServerStartData() throws Exception
  {
@@ -767,55 +719,6 @@
                 newMsg.getDegradedStatusThreshold());
  }
  @DataProvider(name="createReplServerStartDSData")
  public Object[][] createReplServerStartDSData() throws Exception
  {
    DN baseDN = TEST_ROOT_DN;
    final ServerState state1 = new ServerState();
    state1.update(new CSN(0, 0, 0));
    final ServerState state2 = new ServerState();
    state2.update(new CSN(75, 5, 263));
    final ServerState state3 = new ServerState();
    state3.update(new CSN(123, 5, 98));
    return new Object[][]
    {
      {1, baseDN, 0, "localhost:8989", state1, 0L, (byte)0, 0, 0, 0},
      {16, baseDN, 100, "anotherHost:1025", state2, 1245L, (byte)25, 3456, 3, 31512},
      {36, baseDN, 100, "anotherHostAgain:8017", state3, 6841L, (byte)32, 2496, 630, 9524},
    };
  }
  /**
   * Test that ReplServerStartDSMsg encoding and decoding works
   * by checking that : msg == new ReplServerStartMsg(msg.getBytes()).
   */
  @Test(dataProvider="createReplServerStartDSData")
  public void replServerStartDSMsgTest(int serverId, DN baseDN, int window,
         String url, ServerState state, long genId, byte groupId, int degTh,
         int weight, int connectedDSNumber) throws Exception
  {
    ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
        url, baseDN, window, state, genId,
        true, groupId, degTh, weight, connectedDSNumber);
    ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getBaseDN(), newMsg.getBaseDN());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getCSN(1),
        newMsg.getServerState().getCSN(1));
    assertEquals(newMsg.getVersion(), getCurrentVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertEquals(msg.getGroupId(), newMsg.getGroupId());
    assertEquals(msg.getDegradedStatusThreshold(),
                 newMsg.getDegradedStatusThreshold());
    assertEquals(msg.getWeight(), newMsg.getWeight());
    assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber());
  }
  /**
   * Test that StopMsg encoding and decoding works
   * by checking that : msg == new StopMsg(msg.getBytes()).
@@ -1263,70 +1166,6 @@
    assertEquals(test.getBytes(), newMsg.getPayload());
  }
  /**
   * Test that ServerStartMsg encoding and decoding works
   * by checking that : msg == new ServerStartMsg(msg.getBytes()).
   */
  @Test(enabled=true,dataProvider="createServerStartData")
  public void startECLMsgTest(int serverId, DN baseDN, int window,
         ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception
  {
    ServerStartECLMsg msg = new ServerStartECLMsg(
        "localhost:1234", window, window, window, window, window, window, state,
        genId, sslEncryption, groupId);
    ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
    assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
    assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay());
    assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertEquals(msg.getServerState().getCSN(1),
        newMsg.getServerState().getCSN(1));
    assertEquals(newMsg.getVersion(), getCurrentVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.getGroupId(), newMsg.getGroupId());
  }
  /**
   * Test StartSessionMsg encoding and decoding.
   */
  @Test()
  public void startECLSessionMsgTest() throws Exception
  {
    // data
    CSN csn = new CSN(TimeThread.getTime(), 123, 45);
    ServerState state = new ServerState();
    assertTrue(state.update(new CSN(75, 5,263)));
    // create original
    StartECLSessionMsg msg = new StartECLSessionMsg();
    msg.setCSN(csn);
    msg.setCrossDomainServerState("fakegenstate");
    msg.setPersistent(Persistent.PERSISTENT);
    msg.setFirstChangeNumber(13);
    msg.setLastChangeNumber(14);
    msg.setECLRequestType(ECLRequestType.REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER);
    msg.setOperationId("fakeopid");
    String dn1 = "cn=admin data";
    String dn2 = "cn=config";
    msg.setExcludedDNs(newSet(dn1, dn2));
    // create copy
    StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes(getCurrentVersion()));
    // test equality between the two copies
    assertEquals(msg.getCSN(), newMsg.getCSN());
    assertEquals(msg.getPersistent(), newMsg.getPersistent());
    assertEquals(msg.getFirstChangeNumber(), newMsg.getFirstChangeNumber());
    assertEquals(msg.getECLRequestType(), newMsg.getECLRequestType());
    assertEquals(msg.getLastChangeNumber(), newMsg.getLastChangeNumber());
    assertTrue(msg.getCrossDomainServerState().equalsIgnoreCase(newMsg.getCrossDomainServerState()));
    assertTrue(msg.getOperationId().equalsIgnoreCase(newMsg.getOperationId()));
    Assertions.assertThat(newMsg.getExcludedBaseDNs()).containsOnly(dn1, dn2);
  }
  private int perfRep = 100000;
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
File was deleted