mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.26.2014 de36fa06856d8d04652401bb24e49c3259aef154
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -103,7 +103,8 @@
  /**
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeats received. Data is held for each serverId cross domain.
   * heartbeat notifications, or offline notifications. Data is held for each
   * serverId cross domain.
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
@@ -221,6 +222,7 @@
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   * @see MultimasterReplication#isECLEnabledDomain(DN)
   */
  protected boolean isECLEnabledDomain(DN baseDN)
  {
@@ -351,6 +353,20 @@
      nextChangeForInsertDBCursor.next();
    }
    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
        .entrySet())
    {
      final DN baseDN = entry.getKey();
      final List<CSN> offlineCSNs = entry.getValue();
      for (CSN offlineCSN : offlineCSNs)
      {
        if (isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
        }
      }
    }
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
@@ -554,20 +570,33 @@
  }
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN)
      final DN mcBaseDN) throws ChangelogException
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mcCSN)
        // If no new updates has been seen for this replica
        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
    if (offlineCSN != null)
    {
      removeCursor(mcBaseDN, mcCSN);
      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
      {
        // replica is back online, we can forget the last time it was offline
        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        /*
         * replica is not back online and Medium consistency point has gone past
         * its last offline time: remove everything known about it: cursor,
         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV.
         */
        removeCursor(mcBaseDN, mcCSN);
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }
  }
@@ -587,6 +616,7 @@
  }
  private void removeCursor(final DN baseDN, final CSN csn)
      throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
@@ -601,6 +631,7 @@
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            resetNextChangeForInsertDBCursor();
            return;
          }
        }