mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.24.2013 9a13d05fcb1b17c52c7b91b8445d334bce3f9e28
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -64,8 +64,7 @@
  /*
   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently. This solution also avoids
   * using a queue that could fill up before we have consumed all its content.
   * updates 2) many updates can happen concurrently.
   */
  /**
   * Holds the cross domain medium consistency Replication Update Vector for the
@@ -98,6 +97,8 @@
   */
  private final MultiDomainServerState lastSeenUpdates =
      new MultiDomainServerState();
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
  /**
   * Composite cursor across all the replicaDBs for all the replication domains.
@@ -168,6 +169,21 @@
  }
  /**
   * Signals a replica went offline.
   *
   * @param baseDN
   *          the replica's replication domain
   * @param offlineCSN
   *          the serverId and time of the replica that went offline
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    lastSeenUpdates.update(baseDN, offlineCSN);
    replicasOffline.update(baseDN, offlineCSN);
    tryNotify(baseDN);
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
@@ -187,8 +203,8 @@
    final CSN mcCSN = mediumConsistencyCSN;
    if (mcCSN != null)
    {
      final CSN lastSeenSameServerId =
          lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
      final int serverId = mcCSN.getServerId();
      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
      return mcCSN.isOlderThan(lastSeenSameServerId);
    }
    return true;
@@ -374,6 +390,37 @@
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(baseDN, csn);
    mediumConsistencyCSN = csn;
    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mediumConsistencyCSN)
        // If no new updates has been seen for this replica
        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
    {
      removeCursor(baseDN, csn);
      replicasOffline.removeCSN(baseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
    }
  }
  private void removeCursor(final DN baseDN, final CSN csn)
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
        .entrySet())
    {
      if (baseDN.equals(entry.getKey()))
      {
        final Set<Integer> serverIds = entry.getValue().keySet();
        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
        {
          final int serverId = iter.next();
          if (csn.getServerId() == serverId)
          {
            iter.remove();
            return;
          }
        }
      }
    }
  }
  private void createNewCursors() throws ChangelogException