mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.08.2013 5f803832687ee9d56deec9946d6be7f3772e7688
opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,19 @@
  }
  /**
   * Returns the ServerState associated to the provided replication domain's
   * baseDN.
   *
   * @param baseDN
   *          the replication domain's baseDN
   * @return the associated ServerState
   */
  public ServerState getServerState(DN baseDN)
  {
    return list.get(baseDN);
  }
  /**
   * Returns the CSN associated to the provided replication domain's baseDN and
   * serverId.
   *
@@ -216,7 +229,7 @@
   *          the replication domain's baseDN
   * @param serverId
   *          the serverId
   * @return the associated ServerState
   * @return the associated CSN
   */
  public CSN getCSN(DN baseDN, int serverId)
  {
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -167,8 +167,6 @@
   */
  private int assuredTimeoutTimerPurgeCounter = 0;
  private ServerState ctHeartbeatState;
  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
@@ -2558,19 +2556,6 @@
  }
  /**
   * Return the state that contain for each server the time of eligibility.
   * @return the state.
   */
  public ServerState getChangeTimeHeartbeatState()
  {
    if (ctHeartbeatState == null)
    {
      ctHeartbeatState = getLatestServerState().duplicate();
    }
    return ctHeartbeatState;
  }
  /**
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
@@ -2593,31 +2578,13 @@
   *
   * @return the eligible CSN.
   */
  public CSN getEligibleCSN()
  CSN getEligibleCSN()
  {
    CSN eligibleCSN = null;
    final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
    for (final CSN replicaNewestCSN : newestCSNs)
    for (final CSN lastAliveCSN : domainDB.getDomainLastAliveCSNs(baseDN))
    {
      // Should it be considered for eligibility ?
      int serverId = replicaNewestCSN.getServerId();
      CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
      /*
      if ((heartbeatLastDN != null) &&
          (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000))
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() +
            " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      */
      final int serverId = lastAliveCSN.getServerId();
      if (!isServerConnected(serverId))
      {
        if (debugEnabled())
@@ -2628,13 +2595,9 @@
        continue;
      }
      if (eligibleCSN == null || replicaNewestCSN.isNewerThan(eligibleCSN))
      if (eligibleCSN == null || lastAliveCSN.isNewerThan(eligibleCSN))
      {
        eligibleCSN = replicaNewestCSN;
      }
      if (heartbeatLastCSN != null && heartbeatLastCSN.isNewerThan(eligibleCSN))
      {
        eligibleCSN = heartbeatLastCSN;
        eligibleCSN = lastAliveCSN;
      }
    }
@@ -2671,7 +2634,7 @@
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg )
      ChangeTimeHeartbeatMsg msg)
  {
    try
    {
@@ -2689,7 +2652,7 @@
    try
    {
      storeReceivedCTHeartbeat(msg.getCSN());
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      if (senderHandler.isDataServer())
      {
        // If we are the first replication server warned,
@@ -2722,17 +2685,6 @@
  }
  /**
   * Store a change time value received from a data server.
   * @param csn The provided change time.
   */
  public void storeReceivedCTHeartbeat(CSN csn)
  {
    // TODO:Maybe we can spare processing by only storing CSN (timestamp)
    // instead of a server state.
    getChangeTimeHeartbeatState().update(csn);
  }
  /**
   * This methods count the changes, server by server :
   * - from a serverState start point
   * - to (inclusive) an end point (the provided endCSN).
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -47,8 +47,8 @@
  long getDomainChangesCount(DN baseDN);
  /**
   * Returns the oldest {@link CSN}s of each serverId for the specified
   * replication domain.
   * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in
   * the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
@@ -59,8 +59,8 @@
  ServerState getDomainOldestCSNs(DN baseDN);
  /**
   * Returns the newest {@link CSN}s of each serverId for the specified
   * replication domain.
   * Returns the newest {@link CSN}s from the replicaDBs for each serverId in
   * the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
@@ -71,6 +71,18 @@
  ServerState getDomainNewestCSNs(DN baseDN);
  /**
   * Returns the last time each serverId was seen alive for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => CSN} Map. Can be
   *         null if the config that computes change numbers is set to false or
   *         if domain is not replicated.
   */
  ServerState getDomainLastAliveCSNs(DN baseDN);
  /**
   * Retrieves the latest trim date for the specified replication domain.
   * <p>
   * FIXME will be removed when ECLServerHandler will not be responsible anymore
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -101,12 +101,13 @@
  private volatile CSN mediumConsistencyCSN;
  /**
   * Holds the most recent changes or heartbeats received for each serverIds
   * cross domain. changes are stored in the replicaDBs and hence persistent,
   * heartbeats are transient because they are easily constructed on normal
   * operations.
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeats received. Data is held for each serverId cross domain.
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
   */
  private final MultiDomainServerState lastSeenUpdates =
  private final MultiDomainServerState lastAliveCSNs =
      new MultiDomainServerState();
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
@@ -174,7 +175,7 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    lastSeenUpdates.update(baseDN, heartbeatCSN);
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify(baseDN);
  }
@@ -192,13 +193,27 @@
      throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    lastSeenUpdates.update(baseDN, csn);
    lastAliveCSNs.update(baseDN, csn);
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    tryNotify(baseDN);
  }
  /**
   * Returns the last time each serverId was seen alive for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => CSN} Map. Can be
   *         null if domain is not replicated.
   */
  public ServerState getDomainLastAliveCSNs(DN baseDN)
  {
    return lastAliveCSNs.getServerState(baseDN);
  }
  /**
   * Signals a replica went offline.
   *
   * @param baseDN
@@ -208,7 +223,7 @@
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    lastSeenUpdates.update(baseDN, offlineCSN);
    lastAliveCSNs.update(baseDN, offlineCSN);
    replicasOffline.update(baseDN, offlineCSN);
    tryNotify(baseDN);
  }
@@ -234,8 +249,8 @@
    if (mcCSN != null)
    {
      final int serverId = mcCSN.getServerId();
      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
      return mcCSN.isOlderThan(lastSeenSameServerId);
      CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    return true;
  }
@@ -269,7 +284,7 @@
      }
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      lastSeenUpdates.update(baseDN, latestKnownState);
      lastAliveCSNs.update(baseDN, latestKnownState);
    }
    resetNextChangeForInsertDBCursor();
@@ -491,7 +506,7 @@
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mediumConsistencyCSN)
        // If no new updates has been seen for this replica
        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
        && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
    {
      removeCursor(baseDN, csn);
      replicasOffline.removeCSN(baseDN, offlineCSN);
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -539,6 +539,23 @@
  /** {@inheritDoc} */
  @Override
  public ServerState getDomainLastAliveCSNs(DN baseDN)
  {
    final ChangeNumberIndexer indexer = this.cnIndexer.get();
    if (indexer != null)
    {
      final ServerState results = indexer.getDomainLastAliveCSNs(baseDN);
      if (results != null)
      {
        // return a copy to protect against concurrent modifications
        return results.duplicate();
      }
    }
    return null;
  }
  /** {@inheritDoc} */
  @Override
  public void removeDomain(DN baseDN) throws ChangelogException
  {
    // Remember the first exception because :
@@ -791,8 +808,11 @@
  @Override
  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    // TODO implement this when the changelogDB will be responsible for
    // maintaining the medium consistency point
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }
  /** {@inheritDoc} */
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -552,9 +552,8 @@
    debugInfo(tn, "Starting test\n\n");
    // root entry returned
    final InternalSearchOperation op = searchOnChangelog(
        "(objectclass=*)", Collections.<String>emptySet(), createControls(""), 1, tn);
    waitOpResult(op, ResultCode.SUCCESS);
    searchOnChangelog("(objectclass=*)", Collections.<String> emptySet(), createControls(""),
        1, ResultCode.SUCCESS, tn);
    debugInfo(tn, "Ending test successfully");
  }
@@ -585,8 +584,7 @@
  /** Add an entry in the database */
  private void addEntry(Entry entry) throws Exception
  {
    AddOperation addOp = connection.processAdd(entry);
    waitOpResult(addOp, ResultCode.SUCCESS);
    waitOpResult(connection.processAdd(entry), ResultCode.SUCCESS);
    assertNotNull(getEntry(entry.getDN(), 1000, true));
  }
@@ -868,10 +866,8 @@
      throws Exception
  {
    debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
    final InternalSearchOperation searchOp = searchOnChangelog(
        filterString, ALL_ATTRIBUTES, createControls(cookie), expectedNbEntries, testName);
    waitOpResult(searchOp, expectedResultCode);
    return searchOp;
    return searchOnChangelog(filterString, ALL_ATTRIBUTES, createControls(cookie),
        expectedNbEntries, expectedResultCode, testName);
  }
  private InternalSearchOperation searchOnChangelog(String filterString,
@@ -879,15 +875,13 @@
      throws Exception
  {
    debugInfo(testName, " Search: " + filterString);
    final InternalSearchOperation searchOp = searchOnChangelog(
        filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, testName);
    waitOpResult(searchOp, expectedResultCode);
    return searchOp;
    return searchOnChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL,
        expectedNbEntries, expectedResultCode, testName);
  }
  private InternalSearchOperation searchOnChangelog(String filterString,
      Set<String> attributes, List<Control> controls, int expectedNbEntries,
      String testName) throws Exception
      ResultCode expectedResultCode, String testName) throws Exception
  {
    InternalSearchOperation op = null;
    int cnt = 0;
@@ -912,6 +906,7 @@
    final List<SearchResultEntry> entries = op.getSearchEntries();
    assertThat(entries).hasSize(expectedNbEntries);
    debugAndWriteEntries(getLDIFWriter(), entries, testName);
    waitOpResult(op, expectedResultCode);
    return op;
  }
@@ -2104,7 +2099,6 @@
      ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
      debugInfo(tn, rsd1.getBaseDN()
          + " LatestServerState=" + rsd1.getLatestServerState()
          + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
          + " eligibleCSN=" + rsd1.getEligibleCSN()
          + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
      // FIXME:ECL Enable this test by adding an assert on the right value
@@ -2112,7 +2106,6 @@
      ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
      debugInfo(tn, rsd2.getBaseDN()
          + " LatestServerState=" + rsd2.getLatestServerState()
          + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
          + " eligibleCSN=" + rsd2.getEligibleCSN()
          + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
      // FIXME:ECL Enable this test by adding an assert on the right value
@@ -2882,13 +2875,10 @@
    }
    finally
    {
      final DeleteOperation delOp1 = connection.processDelete(
          DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
      waitOpResult(delOp1, ResultCode.SUCCESS);
      final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
      waitOpResult(delOp2, ResultCode.SUCCESS);
      final DeleteOperation delOp3 = connection.processDelete(baseDN3);
      waitOpResult(delOp3, ResultCode.SUCCESS);
      final DN fionaDN = DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
      waitOpResult(connection.processDelete(fionaDN), ResultCode.SUCCESS);
      waitOpResult(connection.processDelete(TEST_ROOT_DN2), ResultCode.SUCCESS);
      waitOpResult(connection.processDelete(baseDN3), ResultCode.SUCCESS);
      remove(domain21, domain2, domain3);
      removeTestBackend(backend2, backend3);