mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.14.2014 5eb7b26eabf15d047fd913597aa508bb78c1d7e7
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -271,16 +271,32 @@
    }
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasArePastOldestPossibleCSN();
    return allInitialReplicasAreOfflineOrAlive();
  }
  private boolean allInitialReplicasArePastOldestPossibleCSN()
  /**
   * Returns true only if the initial replicas known from the changelog state DB
   * are either:
   * <ul>
   * <li>offline, so do not wait for them in order to compute medium consistency
   * </li>
   * <li>alive, because we received heartbeats or changes (so their last alive
   * CSN has been updated to something past the oldest possible CSN), we have
   * enough info to compute medium consistency</li>
   * </ul>
   * In this case, we have enough information to compute medium consistency
   * without waiting any more.
   */
  private boolean allInitialReplicasAreOfflineOrAlive()
  {
    for (DN baseDN : lastAliveCSNs)
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (csn.getTime() == 0)
        if (// oldest possible CSN?
            csn.getTime() == 0
            // replica is not offline
            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
        {
          return false;
        }
@@ -363,6 +379,9 @@
        if (isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
          // a replica offline message could also be the very last time
          // we heard from this replica :)
          lastAliveCSNs.update(baseDN, offlineCSN);
        }
      }
    }
@@ -528,10 +547,6 @@
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          moveForwardMediumConsistencyPoint(csn, baseDN);
          // advance the cursor we just read from,
          // success/failure will be checked later
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
        {
@@ -571,6 +586,7 @@
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    boolean callNextOnCursor = true;
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -586,17 +602,34 @@
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        /*
         * replica is not back online and Medium consistency point has gone past
         * its last offline time: remove everything known about it: cursor,
         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV.
         */
        removeCursor(mcBaseDN, mcCSN);
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
            pair = getCursor(mcBaseDN, mcCSN.getServerId());
        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
        if (iter != null && !iter.hasNext())
        {
          /*
           * replica is not back online, Medium consistency point has gone past
           * its last offline time, and there are no more changes after the
           * offline CSN in the cursor: remove everything known about it:
           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
           * this replica from the medium consistency RUV.
           */
          iter.remove();
          StaticUtils.close(pair.getFirst());
          resetNextChangeForInsertDBCursor();
          callNextOnCursor = false;
          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
        }
      }
    }
    if (callNextOnCursor)
    {
      // advance the cursor we just read from,
      // success/failure will be checked later
      nextChangeForInsertDBCursor.next();
    }
  }
  private void removeAllCursors()
@@ -614,8 +647,8 @@
    newCursors.clear();
  }
  private void removeCursor(final DN baseDN, final CSN csn)
      throws ChangelogException
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
      getCursor(final DN baseDN, final int serverId) throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
@@ -626,16 +659,14 @@
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (csn.getServerId() == entry2.getKey())
          if (serverId == entry2.getKey())
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            resetNextChangeForInsertDBCursor();
            return;
            return Pair.of(entry2.getValue(), iter);
          }
        }
      }
    }
    return Pair.empty();
  }
  private boolean recycleExhaustedCursors() throws ChangelogException