From 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:08:01 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR

---
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java |  355 ++++++++++++++++++++++++++++------------------------------
 1 files changed, 171 insertions(+), 184 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 60a2b4a..1323777 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -95,15 +95,118 @@
  * <p>
  *   Full Initialization of a replica can be triggered by LDAP clients
  *   by creating InitializeTasks or InitializeTargetTask.
- *   Full initialization can also by triggered from the ReplicationDomain
- *   implementation using methods {@link #initializeRemote(int)}
- *   or {@link #initializeFromRemote(int)}.
+ *   Full initialization can also be triggered from the ReplicationDomain
+ *   implementation using methods {@link #initializeRemote(int, Task)}
+ *   or {@link #initializeFromRemote(int, Task)}.
  * <p>
  *   At shutdown time, the {@link #disableService()} method should be called to
  *   cleanly stop the replication service.
  */
 public abstract class ReplicationDomain
 {
+
+  /**
+   * Contains all the attributes included for the ECL (External Changelog).
+   */
+  // @Immutable
+  private final static class ECLIncludes
+  {
+
+    final Map<Integer, Set<String>> includedAttrsByServer;
+    final Set<String> includedAttrsAllServers;
+
+    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
+    final Set<String> includedAttrsForDeletesAllServers;
+
+    private ECLIncludes(
+        Map<Integer, Set<String>> includedAttrsByServer,
+        Set<String> includedAttrsAllServers,
+        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
+        Set<String> includedAttrsForDeletesAllServers)
+    {
+      this.includedAttrsByServer = includedAttrsByServer;
+      this.includedAttrsAllServers = includedAttrsAllServers;
+
+      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
+      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
+    }
+
+    @SuppressWarnings("unchecked")
+    public ECLIncludes()
+    {
+      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
+          Collections.EMPTY_SET);
+    }
+
+    /**
+     * Add attributes to be included in the ECL.
+     *
+     * @param serverId
+     *          Server where these attributes are configured.
+     * @param includeAttributes
+     *          Attributes to be included with all change records, may include
+     *          wild-cards.
+     * @param includeAttributesForDeletes
+     *          Additional attributes to be included with delete change records,
+     *          may include wild-cards.
+     * @return a new {@link ECLIncludes} object if included attributes have
+     *         changed, or the current object otherwise.
+     */
+    public ECLIncludes addIncludedAttributes(int serverId,
+        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
+    {
+      boolean configurationChanged = false;
+
+      Set<String> s1 = new HashSet<String>(includeAttributes);
+
+      // Combine all+delete attributes.
+      Set<String> s2 = new HashSet<String>(s1);
+      s2.addAll(includeAttributesForDeletes);
+
+      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
+      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
+      {
+        configurationChanged = true;
+        eclIncludesByServer = new HashMap<Integer, Set<String>>(
+            this.includedAttrsByServer);
+        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
+      }
+
+      Map<Integer, Set<String>> eclIncludesForDeletesByServer =
+          this.includedAttrsForDeletesByServer;
+      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
+      {
+        configurationChanged = true;
+        eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
+                this.includedAttrsForDeletesByServer);
+        eclIncludesForDeletesByServer.put(
+            serverId, Collections.unmodifiableSet(s2));
+      }
+
+      if (!configurationChanged)
+      {
+        return this;
+      }
+
+      // and rebuild the global list to be ready for usage
+      Set<String> eclIncludesAllServer = new HashSet<String>();
+      for (Set<String> attributes : eclIncludesByServer.values())
+      {
+        eclIncludesAllServer.addAll(attributes);
+      }
+
+      Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
+      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
+      {
+        eclIncludesForDeletesAllServer.addAll(attributes);
+      }
+      return new ECLIncludes(eclIncludesByServer,
+          Collections.unmodifiableSet(eclIncludesAllServer),
+          eclIncludesForDeletesByServer,
+          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
+    }
+  }
+
   /**
    * Current status for this replicated domain.
    */
@@ -251,14 +354,8 @@
    */
   private final CSNGenerator generator;
 
-  private final Object eclIncludesLock = new Object();
-  private final Map<Integer, Set<String>> eclIncludesByServer =
-    new HashMap<Integer, Set<String>>();
-  private Set<String> eclIncludesAllServers = Collections.emptySet();
-
-  private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
-    new HashMap<Integer, Set<String>>();
-  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
+  private final AtomicReference<ECLIncludes> eclIncludes =
+      new AtomicReference<ECLIncludes>(new ECLIncludes());
 
   /**
    * An object used to protect the initialization of the underlying broker
@@ -551,9 +648,9 @@
    * Gets the info for Replicas in the topology (except us).
    * @return The info for Replicas in the topology (except us)
    */
-  public List<DSInfo> getReplicasList()
+  public Map<Integer, DSInfo> getReplicaInfos()
   {
-    return broker.getDsList();
+    return broker.getReplicaInfos();
   }
 
   /**
@@ -562,20 +659,13 @@
    * disconnected. Return null when no server with the provided serverId is
    * connected.
    *
-   * @param  serverId The provided serverId of the remote replica
+   * @param  dsId The provided serverId of the remote replica
    * @return the info related to this remote server if it is connected,
    *                  null is the server is NOT connected.
    */
-  public DSInfo isRemoteDSConnected(int serverId)
+  private DSInfo isRemoteDSConnected(int dsId)
   {
-    for (DSInfo remoteDS : getReplicasList())
-    {
-      if (remoteDS.getDsId() == serverId)
-      {
-        return remoteDS;
-      }
-    }
-    return null;
+    return getReplicaInfos().get(dsId);
   }
 
   /**
@@ -601,9 +691,9 @@
    * @return The info for RSs in the topology (except the one we are connected
    * to)
    */
-  public List<RSInfo> getRsList()
+  public List<RSInfo> getRsInfos()
   {
-    return broker.getRsList();
+    return broker.getRsInfos();
   }
 
 
@@ -1100,7 +1190,7 @@
      *                         for and import, false if the IEContext
      *                         will be used for and export.
      */
-    public IEContext(boolean importInProgress)
+    private IEContext(boolean importInProgress)
     {
       this.importInProgress = importInProgress;
       this.startTime = System.currentTimeMillis();
@@ -1114,7 +1204,7 @@
      * @return A boolean indicating if a total update import is currently in
      *         Progress.
      */
-    public boolean importInProgress()
+    boolean importInProgress()
     {
       return importInProgress;
     }
@@ -1153,18 +1243,17 @@
       entryCount = total;
       entryLeftCount = total;
 
-      if (initializeTask != null)
+      if (initializeTask instanceof InitializeTask)
       {
-        if (initializeTask instanceof InitializeTask)
-        {
-          ((InitializeTask)initializeTask).setTotal(entryCount);
-          ((InitializeTask)initializeTask).setLeft(entryCount);
-        }
-        else if (initializeTask instanceof InitializeTargetTask)
-        {
-          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
-          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
-        }
+        final InitializeTask task = (InitializeTask) initializeTask;
+        task.setTotal(entryCount);
+        task.setLeft(entryCount);
+      }
+      else if (initializeTask instanceof InitializeTargetTask)
+      {
+        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
+        task.setTotal(entryCount);
+        task.setLeft(entryCount);
       }
     }
 
@@ -1177,7 +1266,7 @@
      *
      * @throws DirectoryException if an error occurred.
      */
-    public void updateCounters(int entriesDone) throws DirectoryException
+    private void updateCounters(int entriesDone) throws DirectoryException
     {
       entryLeftCount -= entriesDone;
 
@@ -1198,7 +1287,7 @@
     @Override
     public String toString()
     {
-      return "[ Entry count=" + this.entryCount +
+      return "[Entry count=" + this.entryCount +
              ", Entry left count=" + this.entryLeftCount + "]";
     }
 
@@ -1258,7 +1347,7 @@
      * @param serverId serverId of the acknowledger/receiver/importer server.
      * @param numAck   id of the message received.
      */
-    public void setAckVal(int serverId, int numAck)
+    private void setAckVal(int serverId, int numAck)
     {
       if (logger.isTraceEnabled())
         logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
@@ -1315,6 +1404,7 @@
       if (target >= 0)
       {
         // FIXME Could we check now that it is a know server in the domain ?
+        // JNR: Yes please
       }
       return target;
     }
@@ -1338,7 +1428,7 @@
    *
    * @param target   The server-id of the server that should be initialized.
    *                 The target can be discovered using the
-   *                 {@link #getReplicasList()} method.
+   *                 {@link #getReplicaInfos()} method.
    * @param initTask The task that triggers this initialization and that should
    *                 be updated with its progress.
    *
@@ -1386,13 +1476,10 @@
       logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
           countEntries(), getBaseDNString(), getServerId());
 
-      for (DSInfo dsi : getReplicasList())
-      {
-        ieCtx.startList.add(dsi.getDsId());
-      }
+      ieCtx.startList.addAll(getReplicaInfos().keySet());
 
       // We manage the list of servers with which a flow control can be enabled
-      for (DSInfo dsi : getReplicasList())
+      for (DSInfo dsi : getReplicaInfos().values())
       {
         if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
@@ -1408,7 +1495,7 @@
       ieCtx.startList.add(serverToInitialize);
 
       // We manage the list of servers with which a flow control can be enabled
-      for (DSInfo dsi : getReplicasList())
+      for (DSInfo dsi : getReplicaInfos().values())
       {
         if (dsi.getDsId() == serverToInitialize &&
             dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
@@ -1453,7 +1540,7 @@
         {
           throw new DirectoryException(
               ResultCode.OTHER,
-              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
+              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(),
                   ieCtx.failureList));
         }
 
@@ -1472,7 +1559,7 @@
 
       if (logger.isTraceEnabled())
         logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
-            + " export ends with " + " connected=" + broker.isConnected()
+            + " export ends with connected=" + broker.isConnected()
             + " exportRootException=" + exportRootException);
 
       if (exportRootException != null)
@@ -1592,7 +1679,7 @@
     do
     {
       done = true;
-      for (DSInfo dsi : getReplicasList())
+      for (DSInfo dsi : getReplicaInfos().values())
       {
         if (logger.isTraceEnabled())
           logger.trace(
@@ -1650,10 +1737,7 @@
     considered in the processing of sorting the successfully initialized
     and the others
     */
-    for (DSInfo dsi : getReplicasList())
-    {
-      replicasWeAreWaitingFor.add(dsi.getDsId());
-    }
+    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
 
     boolean done;
     do
@@ -1698,13 +1782,11 @@
             done = false;
             break;
           }
-          else
-          {
-            if (dsInfo.getGenerationId() == getGenerationID())
-            { // and with the expected generationId
-              // We're done with this server
-              it.remove();
-            }
+
+          if (dsInfo.getGenerationId() == getGenerationID())
+          { // and with the expected generationId
+            // We're done with this server
+            it.remove();
           }
         }
       }
@@ -1717,7 +1799,6 @@
           Thread.currentThread().interrupt();
         } // 1sec
       }
-
     }
     while (!done && !broker.shuttingDown()); // infinite wait
 
@@ -1967,8 +2048,8 @@
    *
    * @throws IOException when an error occurred.
    */
-  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
-  throws IOException
+  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
+      throws IOException
   {
     if (logger.isTraceEnabled())
       logger.trace("[IE] Entering exportLDIFEntry entry=" +
@@ -2072,53 +2153,6 @@
   }
 
   /**
-   * Initializes this domain from another source server.
-   * <p>
-   * When this method is called, a request for initialization will
-   * be sent to the source server asking for initialization.
-   * <p>
-   * The {@code exportBackend(OutputStream)} will therefore be called
-   * on the source server, and the {@code importBackend(InputStream)}
-   * will be called on his server.
-   * <p>
-   * The InputStream and OutpuStream given as a parameter to those
-   * methods will be connected through the replication protocol.
-   *
-   * @param source   The server-id of the source from which to initialize.
-   *                 The source can be discovered using the
-   *                 {@link #getReplicasList()} method.
-   *
-   * @throws DirectoryException If it was not possible to publish the
-   *                            Initialization message to the Topology.
-   */
-  public void initializeFromRemote(int source) throws DirectoryException
-  {
-    initializeFromRemote(source, null);
-  }
-
-  /**
-   * Initializes a remote server from this server.
-   * <p>
-   * The {@code exportBackend(OutputStream)} will therefore be called
-   * on this server, and the {@code importBackend(InputStream)}
-   * will be called on the remote server.
-   * <p>
-   * The InputStream and OutpuStream given as a parameter to those
-   * methods will be connected through the replication protocol.
-   *
-   * @param target   The server-id of the server that should be initialized.
-   *                 The target can be discovered using the
-   *                 {@link #getReplicasList()} method.
-   *
-   * @throws DirectoryException If it was not possible to publish the
-   *                            Initialization message to the Topology.
-   */
-  public void initializeRemote(int target) throws DirectoryException
-  {
-    initializeRemote(target, null);
-  }
-
-  /**
    * Initializes asynchronously this domain from a remote source server.
    * Before returning from this call, for the provided task :
    * - the progressing counters are updated during the initialization using
@@ -2131,7 +2165,7 @@
    *
    * @param source   The server-id of the source from which to initialize.
    *                 The source can be discovered using the
-   *                 {@link #getReplicasList()} method.
+   *                 {@link #getReplicaInfos()} method.
    *
    * @param initTask The task that launched the initialization
    *                 and should be updated of its progress.
@@ -2217,7 +2251,7 @@
    *                          task has initially been created (this server,
    *                          or the remote server).
    */
-  void initialize(InitializeTargetMsg initTargetMsgReceived,
+  private void initialize(InitializeTargetMsg initTargetMsgReceived,
       int requesterServerId)
   {
     InitializeTask initFromTask = null;
@@ -2254,7 +2288,6 @@
       ieCtx.importSource = source;
       ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
       ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
-      // Protocol version is -1 when not known.
       ieCtx.exporterProtocolVersion = getProtocolVersion(source);
       initFromTask = (InitializeTask) ieCtx.initializeTask;
 
@@ -2382,18 +2415,14 @@
    * @param dsServerId The provided serverId.
    * @return The protocol version.
    */
-  short getProtocolVersion(int dsServerId)
+  private short getProtocolVersion(int dsServerId)
   {
-    short protocolVersion = -1;
-    for (DSInfo dsi : getReplicasList())
+    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
+    if (dsInfo != null)
     {
-      if (dsi.getDsId() == dsServerId)
-      {
-        protocolVersion = dsi.getProtocolVersion();
-        break;
-      }
+      return dsInfo.getProtocolVersion();
     }
-    return protocolVersion;
+    return -1;
   }
 
   /**
@@ -2459,7 +2488,7 @@
     for (int i = 0; i< 50; i++)
     {
       allSet = true;
-      for (RSInfo rsInfo : getRsList())
+      for (RSInfo rsInfo : getRsInfos())
       {
         // the 'empty' RSes (generationId==-1) are considered as good citizens
         if (rsInfo.getGenerationId() != -1 &&
@@ -2498,7 +2527,7 @@
    *                           connected to a Replication Server or it
    *                           was not possible to contact it.
    */
-  public void resetReplicationLog() throws DirectoryException
+  void resetReplicationLog() throws DirectoryException
   {
     // Reset the Generation ID to -1 to clean the ReplicationServers.
     resetGenerationId(-1L);
@@ -2913,7 +2942,7 @@
       {
         // create the broker object used to publish and receive changes
         broker = new ReplicationBroker(
-            this, state, config, getGenerationID(), new ReplSessionSecurity());
+            this, state, config, new ReplSessionSecurity());
         broker.start();
       }
     }
@@ -3067,8 +3096,8 @@
   }
 
   /**
-   * Applies a configuration change to the attributes which should be be
-   * included in the ECL.
+   * Applies a configuration change to the attributes which should be included
+   * in the ECL.
    *
    * @param includeAttributes
    *          attributes to be included with all change records.
@@ -3385,7 +3414,7 @@
    * @param msg  The byte array containing the information that should
    *             be sent to the remote entities.
    */
-  public void publish(byte[] msg)
+  void publish(byte[] msg)
   {
     UpdateMsg update;
     synchronized (this)
@@ -3489,46 +3518,16 @@
       Set<String> includeAttributes,
       Set<String> includeAttributesForDeletes)
   {
-    boolean configurationChanged = false;
-
-    synchronized (eclIncludesLock)
+    ECLIncludes current;
+    ECLIncludes updated;
+    do
     {
-      Set<String> s1 = new HashSet<String>(includeAttributes);
-
-      // Combine all+delete attributes.
-      Set<String> s2 = new HashSet<String>(s1);
-      s2.addAll(includeAttributesForDeletes);
-
-      if (!s1.equals(eclIncludesByServer.get(serverId)))
-      {
-        configurationChanged = true;
-        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
-      }
-
-      if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
-      {
-        configurationChanged = true;
-        eclIncludesForDeletesByServer.put(serverId,
-            Collections.unmodifiableSet(s2));
-      }
-
-      // and rebuild the global list to be ready for usage
-      Set<String> s = new HashSet<String>();
-      for (Set<String> attributes : eclIncludesByServer.values())
-      {
-        s.addAll(attributes);
-      }
-      eclIncludesAllServers = Collections.unmodifiableSet(s);
-
-      s = new HashSet<String>();
-      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
-      {
-        s.addAll(attributes);
-      }
-      eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s);
+      current = this.eclIncludes.get();
+      updated = current.addIncludedAttributes(
+          serverId, includeAttributes, includeAttributesForDeletes);
     }
-
-    return configurationChanged;
+    while (!this.eclIncludes.compareAndSet(current, updated));
+    return current != updated;
   }
 
 
@@ -3540,10 +3539,7 @@
    */
   public Set<String> getEclIncludes()
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesAllServers;
-    }
+    return eclIncludes.get().includedAttrsAllServers;
   }
 
 
@@ -3555,10 +3551,7 @@
    */
   public Set<String> getEclIncludesForDeletes()
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesForDeletesAllServers;
-    }
+    return eclIncludes.get().includedAttrsForDeletesAllServers;
   }
 
 
@@ -3573,10 +3566,7 @@
    */
   Set<String> getEclIncludes(int serverId)
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesByServer.get(serverId);
-    }
+    return eclIncludes.get().includedAttrsByServer.get(serverId);
   }
 
 
@@ -3591,10 +3581,7 @@
    */
   Set<String> getEclIncludesForDeletes(int serverId)
   {
-    synchronized (eclIncludesLock)
-    {
-      return eclIncludesForDeletesByServer.get(serverId);
-    }
+    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
   }
 
   /**

--
Gitblit v1.10.0