mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.08.2014 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -95,15 +95,118 @@
 * <p>
 *   Full Initialization of a replica can be triggered by LDAP clients
 *   by creating InitializeTasks or InitializeTargetTask.
 *   Full initialization can also by triggered from the ReplicationDomain
 *   implementation using methods {@link #initializeRemote(int)}
 *   or {@link #initializeFromRemote(int)}.
 *   Full initialization can also be triggered from the ReplicationDomain
 *   implementation using methods {@link #initializeRemote(int, Task)}
 *   or {@link #initializeFromRemote(int, Task)}.
 * <p>
 *   At shutdown time, the {@link #disableService()} method should be called to
 *   cleanly stop the replication service.
 */
public abstract class ReplicationDomain
{
  /**
   * Contains all the attributes included for the ECL (External Changelog).
   */
  // @Immutable
  private final static class ECLIncludes
  {
    final Map<Integer, Set<String>> includedAttrsByServer;
    final Set<String> includedAttrsAllServers;
    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
    final Set<String> includedAttrsForDeletesAllServers;
    private ECLIncludes(
        Map<Integer, Set<String>> includedAttrsByServer,
        Set<String> includedAttrsAllServers,
        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
        Set<String> includedAttrsForDeletesAllServers)
    {
      this.includedAttrsByServer = includedAttrsByServer;
      this.includedAttrsAllServers = includedAttrsAllServers;
      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
    }
    @SuppressWarnings("unchecked")
    public ECLIncludes()
    {
      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
          Collections.EMPTY_SET);
    }
    /**
     * Add attributes to be included in the ECL.
     *
     * @param serverId
     *          Server where these attributes are configured.
     * @param includeAttributes
     *          Attributes to be included with all change records, may include
     *          wild-cards.
     * @param includeAttributesForDeletes
     *          Additional attributes to be included with delete change records,
     *          may include wild-cards.
     * @return a new {@link ECLIncludes} object if included attributes have
     *         changed, or the current object otherwise.
     */
    public ECLIncludes addIncludedAttributes(int serverId,
        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
    {
      boolean configurationChanged = false;
      Set<String> s1 = new HashSet<String>(includeAttributes);
      // Combine all+delete attributes.
      Set<String> s2 = new HashSet<String>(s1);
      s2.addAll(includeAttributesForDeletes);
      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer = new HashMap<Integer, Set<String>>(
            this.includedAttrsByServer);
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      }
      Map<Integer, Set<String>> eclIncludesForDeletesByServer =
          this.includedAttrsForDeletesByServer;
      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
                this.includedAttrsForDeletesByServer);
        eclIncludesForDeletesByServer.put(
            serverId, Collections.unmodifiableSet(s2));
      }
      if (!configurationChanged)
      {
        return this;
      }
      // and rebuild the global list to be ready for usage
      Set<String> eclIncludesAllServer = new HashSet<String>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        eclIncludesAllServer.addAll(attributes);
      }
      Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
      {
        eclIncludesForDeletesAllServer.addAll(attributes);
      }
      return new ECLIncludes(eclIncludesByServer,
          Collections.unmodifiableSet(eclIncludesAllServer),
          eclIncludesForDeletesByServer,
          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
    }
  }
  /**
   * Current status for this replicated domain.
   */
@@ -251,14 +354,8 @@
   */
  private final CSNGenerator generator;
  private final Object eclIncludesLock = new Object();
  private final Map<Integer, Set<String>> eclIncludesByServer =
    new HashMap<Integer, Set<String>>();
  private Set<String> eclIncludesAllServers = Collections.emptySet();
  private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
    new HashMap<Integer, Set<String>>();
  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
  private final AtomicReference<ECLIncludes> eclIncludes =
      new AtomicReference<ECLIncludes>(new ECLIncludes());
  /**
   * An object used to protect the initialization of the underlying broker
@@ -551,9 +648,9 @@
   * Gets the info for Replicas in the topology (except us).
   * @return The info for Replicas in the topology (except us)
   */
  public List<DSInfo> getReplicasList()
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return broker.getDsList();
    return broker.getReplicaInfos();
  }
  /**
@@ -562,20 +659,13 @@
   * disconnected. Return null when no server with the provided serverId is
   * connected.
   *
   * @param  serverId The provided serverId of the remote replica
   * @param  dsId The provided serverId of the remote replica
   * @return the info related to this remote server if it is connected,
   *                  null if the server is NOT connected.
   */
  public DSInfo isRemoteDSConnected(int serverId)
  private DSInfo isRemoteDSConnected(int dsId)
  {
    for (DSInfo remoteDS : getReplicasList())
    {
      if (remoteDS.getDsId() == serverId)
      {
        return remoteDS;
      }
    }
    return null;
    return getReplicaInfos().get(dsId);
  }
  /**
@@ -601,9 +691,9 @@
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  public List<RSInfo> getRsInfos()
  {
    return broker.getRsList();
    return broker.getRsInfos();
  }
@@ -1100,7 +1190,7 @@
     *                         for an import, false if the IEContext
     *                         will be used for an export.
     */
    public IEContext(boolean importInProgress)
    private IEContext(boolean importInProgress)
    {
      this.importInProgress = importInProgress;
      this.startTime = System.currentTimeMillis();
@@ -1114,7 +1204,7 @@
     * @return A boolean indicating if a total update import is currently in
     *         Progress.
     */
    public boolean importInProgress()
    boolean importInProgress()
    {
      return importInProgress;
    }
@@ -1153,18 +1243,17 @@
      entryCount = total;
      entryLeftCount = total;
      if (initializeTask != null)
      if (initializeTask instanceof InitializeTask)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
        final InitializeTask task = (InitializeTask) initializeTask;
        task.setTotal(entryCount);
        task.setLeft(entryCount);
      }
      else if (initializeTask instanceof InitializeTargetTask)
      {
        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
        task.setTotal(entryCount);
        task.setLeft(entryCount);
      }
    }
@@ -1177,7 +1266,7 @@
     *
     * @throws DirectoryException if an error occurred.
     */
    public void updateCounters(int entriesDone) throws DirectoryException
    private void updateCounters(int entriesDone) throws DirectoryException
    {
      entryLeftCount -= entriesDone;
@@ -1198,7 +1287,7 @@
    @Override
    public String toString()
    {
      return "[ Entry count=" + this.entryCount +
      return "[Entry count=" + this.entryCount +
             ", Entry left count=" + this.entryLeftCount + "]";
    }
@@ -1258,7 +1347,7 @@
     * @param serverId serverId of the acknowledger/receiver/importer server.
     * @param numAck   id of the message received.
     */
    public void setAckVal(int serverId, int numAck)
    private void setAckVal(int serverId, int numAck)
    {
      if (logger.isTraceEnabled())
        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
@@ -1315,6 +1404,7 @@
      if (target >= 0)
      {
        // FIXME Could we check now that it is a known server in the domain?
        // JNR: Yes please
      }
      return target;
    }
@@ -1338,7 +1428,7 @@
   *
   * @param target   The server-id of the server that should be initialized.
   *                 The target can be discovered using the
   *                 {@link #getReplicasList()} method.
   *                 {@link #getReplicaInfos()} method.
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   *
@@ -1386,13 +1476,10 @@
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
          countEntries(), getBaseDNString(), getServerId());
      for (DSInfo dsi : getReplicasList())
      {
        ieCtx.startList.add(dsi.getDsId());
      }
      ieCtx.startList.addAll(getReplicaInfos().keySet());
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
@@ -1408,7 +1495,7 @@
      ieCtx.startList.add(serverToInitialize);
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
@@ -1453,7 +1540,7 @@
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(),
                  ieCtx.failureList));
        }
@@ -1472,7 +1559,7 @@
      if (logger.isTraceEnabled())
        logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
            + " export ends with " + " connected=" + broker.isConnected()
            + " export ends with connected=" + broker.isConnected()
            + " exportRootException=" + exportRootException);
      if (exportRootException != null)
@@ -1592,7 +1679,7 @@
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (logger.isTraceEnabled())
          logger.trace(
@@ -1650,10 +1737,7 @@
    considered in the processing of sorting the successfully initialized
    and the others
    */
    for (DSInfo dsi : getReplicasList())
    {
      replicasWeAreWaitingFor.add(dsi.getDsId());
    }
    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
    boolean done;
    do
@@ -1698,13 +1782,11 @@
            done = false;
            break;
          }
          else
          {
            if (dsInfo.getGenerationId() == getGenerationID())
            { // and with the expected generationId
              // We're done with this server
              it.remove();
            }
          if (dsInfo.getGenerationId() == getGenerationID())
          { // and with the expected generationId
            // We're done with this server
            it.remove();
          }
        }
      }
@@ -1717,7 +1799,6 @@
          Thread.currentThread().interrupt();
        } // 1sec
      }
    }
    while (!done && !broker.shuttingDown()); // infinite wait
@@ -1967,8 +2048,8 @@
   *
   * @throws IOException when an error occurred.
   */
  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
  throws IOException
  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
      throws IOException
  {
    if (logger.isTraceEnabled())
      logger.trace("[IE] Entering exportLDIFEntry entry=" +
@@ -2072,53 +2153,6 @@
  }
  /**
   * Initializes this domain from another source server.
   * <p>
   * When this method is called, a request for initialization will
   * be sent to the source server asking for initialization.
   * <p>
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on the source server, and the {@code importBackend(InputStream)}
   * will be called on his server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param source   The server-id of the source from which to initialize.
   *                 The source can be discovered using the
   *                 {@link #getReplicasList()} method.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeFromRemote(int source) throws DirectoryException
  {
    initializeFromRemote(source, null);
  }
  /**
   * Initializes a remote server from this server.
   * <p>
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on this server, and the {@code importBackend(InputStream)}
   * will be called on the remote server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
   * methods will be connected through the replication protocol.
   *
   * @param target   The server-id of the server that should be initialized.
   *                 The target can be discovered using the
   *                 {@link #getReplicasList()} method.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeRemote(int target) throws DirectoryException
  {
    initializeRemote(target, null);
  }
  /**
   * Initializes asynchronously this domain from a remote source server.
   * Before returning from this call, for the provided task :
   * - the progressing counters are updated during the initialization using
@@ -2131,7 +2165,7 @@
   *
   * @param source   The server-id of the source from which to initialize.
   *                 The source can be discovered using the
   *                 {@link #getReplicasList()} method.
   *                 {@link #getReplicaInfos()} method.
   *
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
@@ -2217,7 +2251,7 @@
   *                          task has initially been created (this server,
   *                          or the remote server).
   */
  void initialize(InitializeTargetMsg initTargetMsgReceived,
  private void initialize(InitializeTargetMsg initTargetMsgReceived,
      int requesterServerId)
  {
    InitializeTask initFromTask = null;
@@ -2254,7 +2288,6 @@
      ieCtx.importSource = source;
      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask) ieCtx.initializeTask;
@@ -2382,18 +2415,14 @@
   * @param dsServerId The provided serverId.
   * @return The protocol version.
   */
  short getProtocolVersion(int dsServerId)
  private short getProtocolVersion(int dsServerId)
  {
    short protocolVersion = -1;
    for (DSInfo dsi : getReplicasList())
    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
    if (dsInfo != null)
    {
      if (dsi.getDsId() == dsServerId)
      {
        protocolVersion = dsi.getProtocolVersion();
        break;
      }
      return dsInfo.getProtocolVersion();
    }
    return protocolVersion;
    return -1;
  }
  /**
@@ -2459,7 +2488,7 @@
    for (int i = 0; i< 50; i++)
    {
      allSet = true;
      for (RSInfo rsInfo : getRsList())
      for (RSInfo rsInfo : getRsInfos())
      {
        // the 'empty' RSes (generationId==-1) are considered as good citizens
        if (rsInfo.getGenerationId() != -1 &&
@@ -2498,7 +2527,7 @@
   *                           connected to a Replication Server or it
   *                           was not possible to contact it.
   */
  public void resetReplicationLog() throws DirectoryException
  void resetReplicationLog() throws DirectoryException
  {
    // Reset the Generation ID to -1 to clean the ReplicationServers.
    resetGenerationId(-1L);
@@ -2913,7 +2942,7 @@
      {
        // create the broker object used to publish and receive changes
        broker = new ReplicationBroker(
            this, state, config, getGenerationID(), new ReplSessionSecurity());
            this, state, config, new ReplSessionSecurity());
        broker.start();
      }
    }
@@ -3067,8 +3096,8 @@
  }
  /**
   * Applies a configuration change to the attributes which should be be
   * included in the ECL.
   * Applies a configuration change to the attributes which should be included
   * in the ECL.
   *
   * @param includeAttributes
   *          attributes to be included with all change records.
@@ -3385,7 +3414,7 @@
   * @param msg  The byte array containing the information that should
   *             be sent to the remote entities.
   */
  public void publish(byte[] msg)
  void publish(byte[] msg)
  {
    UpdateMsg update;
    synchronized (this)
@@ -3489,46 +3518,16 @@
      Set<String> includeAttributes,
      Set<String> includeAttributesForDeletes)
  {
    boolean configurationChanged = false;
    synchronized (eclIncludesLock)
    ECLIncludes current;
    ECLIncludes updated;
    do
    {
      Set<String> s1 = new HashSet<String>(includeAttributes);
      // Combine all+delete attributes.
      Set<String> s2 = new HashSet<String>(s1);
      s2.addAll(includeAttributesForDeletes);
      if (!s1.equals(eclIncludesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      }
      if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer.put(serverId,
            Collections.unmodifiableSet(s2));
      }
      // and rebuild the global list to be ready for usage
      Set<String> s = new HashSet<String>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        s.addAll(attributes);
      }
      eclIncludesAllServers = Collections.unmodifiableSet(s);
      s = new HashSet<String>();
      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
      {
        s.addAll(attributes);
      }
      eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s);
      current = this.eclIncludes.get();
      updated = current.addIncludedAttributes(
          serverId, includeAttributes, includeAttributesForDeletes);
    }
    return configurationChanged;
    while (!this.eclIncludes.compareAndSet(current, updated));
    return current != updated;
  }
@@ -3540,10 +3539,7 @@
   */
  public Set<String> getEclIncludes()
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesAllServers;
    }
    return eclIncludes.get().includedAttrsAllServers;
  }
@@ -3555,10 +3551,7 @@
   */
  public Set<String> getEclIncludesForDeletes()
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesForDeletesAllServers;
    }
    return eclIncludes.get().includedAttrsForDeletesAllServers;
  }
@@ -3573,10 +3566,7 @@
   */
  Set<String> getEclIncludes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesByServer.get(serverId);
    }
    return eclIncludes.get().includedAttrsByServer.get(serverId);
  }
@@ -3591,10 +3581,7 @@
   */
  Set<String> getEclIncludesForDeletes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
      return eclIncludesForDeletesByServer.get(serverId);
    }
    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
  }
  /**