mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.48.2015 a89f7014aeb71dba5c94404dfea7eb89e7eeee74
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -157,30 +157,26 @@
    {
      boolean configurationChanged = false;
      Set<String> s1 = new HashSet<String>(includeAttributes);
      Set<String> s1 = new HashSet<>(includeAttributes);
      // Combine all+delete attributes.
      Set<String> s2 = new HashSet<String>(s1);
      Set<String> s2 = new HashSet<>(s1);
      s2.addAll(includeAttributesForDeletes);
      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesByServer = new HashMap<Integer, Set<String>>(
            this.includedAttrsByServer);
        eclIncludesByServer = new HashMap<>(this.includedAttrsByServer);
        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
      }
      Map<Integer, Set<String>> eclIncludesForDeletesByServer =
          this.includedAttrsForDeletesByServer;
      Map<Integer, Set<String>> eclIncludesForDeletesByServer = this.includedAttrsForDeletesByServer;
      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
      {
        configurationChanged = true;
        eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
                this.includedAttrsForDeletesByServer);
        eclIncludesForDeletesByServer.put(
            serverId, Collections.unmodifiableSet(s2));
        eclIncludesForDeletesByServer = new HashMap<>(this.includedAttrsForDeletesByServer);
        eclIncludesForDeletesByServer.put(serverId, Collections.unmodifiableSet(s2));
      }
      if (!configurationChanged)
@@ -189,13 +185,13 @@
      }
      // and rebuild the global list to be ready for usage
      Set<String> eclIncludesAllServer = new HashSet<String>();
      Set<String> eclIncludesAllServer = new HashSet<>();
      for (Set<String> attributes : eclIncludesByServer.values())
      {
        eclIncludesAllServer.addAll(attributes);
      }
      Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
      Set<String> eclIncludesForDeletesAllServer = new HashSet<>();
      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
      {
        eclIncludesForDeletesAllServer.addAll(attributes);
@@ -234,16 +230,12 @@
   * to be able to correlate all the coming back acks to the original
   * operation.
   */
  private final Map<CSN, UpdateMsg> waitingAckMsgs =
    new ConcurrentHashMap<CSN, UpdateMsg>();
  private final Map<CSN, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<>();
  /**
   * The context related to an import or export being processed
   * Null when none is being processed.
   */
  private final AtomicReference<ImportExportContext> importExportContext =
      new AtomicReference<ImportExportContext>();
  private final AtomicReference<ImportExportContext> importExportContext = new AtomicReference<>();
  /**
   * The Thread waiting for incoming update messages for this domain and pushing
@@ -252,9 +244,7 @@
   */
  private volatile DirectoryThread listenerThread;
  /**
   * A set of counters used for Monitoring.
   */
  /** A set of counters used for Monitoring. */
  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
@@ -273,8 +263,7 @@
   * successfully acknowledged (either because of timeout, wrong status or error
   * at replay).
   */
  private AtomicInteger assuredSrNotAcknowledgedUpdates =
    new AtomicInteger(0);
  private AtomicInteger assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of timeout.
@@ -297,8 +286,7 @@
   * <p>
   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
   */
  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Integer,Integer>();
  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<>();
  /** Number of updates received in Assured Mode, Safe Read request. */
  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
  /**
@@ -330,8 +318,7 @@
   * <p>
   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
   */
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<>();
  /* Status related monitoring fields */
@@ -354,8 +341,7 @@
   */
  private final CSNGenerator generator;
  private final AtomicReference<ECLIncludes> eclIncludes =
      new AtomicReference<ECLIncludes>(new ECLIncludes());
  private final AtomicReference<ECLIncludes> eclIncludes = new AtomicReference<>(new ECLIncludes());
  /**
   * An object used to protect the initialization of the underlying broker
@@ -1127,28 +1113,21 @@
     */
    private final long startTime;
    /**
     * List for replicas (DS) connected to the topology when initialization
     * started.
     */
    private final Set<Integer> startList = new HashSet<Integer>(0);
    /** List for replicas (DS) connected to the topology when initialization started. */
    private final Set<Integer> startList = new HashSet<>(0);
    /**
     * List for replicas (DS) with a failure (disconnected from the topology)
     * since the initialization started.
     */
    private final Set<Integer> failureList = new HashSet<Integer>(0);
    private final Set<Integer> failureList = new HashSet<>(0);
    /**
     * Flow control during initialization: for each remote server, counter of
     * messages received.
     */
    private final Map<Integer, Integer> ackVals =
      new HashMap<Integer, Integer>();
    /**
     * ServerId of the slowest server (the one with the smallest non null
     * counter).
     */
    private final Map<Integer, Integer> ackVals = new HashMap<>();
    /** ServerId of the slowest server (the one with the smallest non null counter). */
    private int slowestServerId = -1;
    private short exporterProtocolVersion = -1;
@@ -1644,8 +1623,7 @@
   */
  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieCtx.startList);
    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
    if (logger.isTraceEnabled())
      logger.trace(
@@ -1702,8 +1680,7 @@
   */
  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieCtx.startList);
    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
    if (logger.isTraceEnabled())
      logger.trace(
@@ -2774,8 +2751,7 @@
  {
    synchronized(assuredSrServerNotAcknowledgedUpdates)
    {
      return new HashMap<Integer, Integer>(
          assuredSrServerNotAcknowledgedUpdates);
      return new HashMap<>(assuredSrServerNotAcknowledgedUpdates);
    }
  }
@@ -2852,7 +2828,7 @@
  {
    synchronized(assuredSdServerTimeoutUpdates)
    {
      return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates);
      return new HashMap<>(assuredSdServerTimeoutUpdates);
    }
  }
@@ -3189,7 +3165,7 @@
            //   -> replay error occurred
            ackMsg.setHasReplayError(true);
            //   -> replay error occurred in our server
            List<Integer> idList = new ArrayList<Integer>();
            List<Integer> idList = new ArrayList<>();
            idList.add(getServerId());
            ackMsg.setFailedServers(idList);
          }
@@ -3427,7 +3403,7 @@
   */
  public Collection<Attribute> getAdditionalMonitoring()
  {
    return new ArrayList<Attribute>();
    return new ArrayList<>();
  }
  /**