mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.51.2013 69ebea4017b17653a1b966b7a83372eb9ce0dcdc
ECLServerHandler.java, ReplicationServerDomain.java:
Renamed variables and changed comments to match the new terminology.
Added more comments to explain what the code is doing.

ReplicationBroker.java
Renamed replicationServerInfo => rsInfo.

replication.properties
Removed space characters at the end of each lines.

RSInfo.java:
In toString(), removed newline + improved formatting.
5 files modified
175 ■■■■ changed files
opends/src/messages/messages/replication.properties 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/RSInfo.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 88 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 58 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -500,7 +500,7 @@
NOTICE_RS_NOT_LOCALLY_CONFIGURED_219=RS(%d) was not configured locally on DS(%d), \
 but at least one other RS was
NOTICE_RS_HAS_NO_GENERATION_ID_220=RS(%d) has no generation Id, but at least one \
 other RS has the same generation Id %d as DS(%d)
 other RS has the same generation Id %d as DS(%d)
NOTICE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS_221=RS(%d) generation Id %d does not \
 match DS(%d) generation Id %d, but at least another RS does
NOTICE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS_222=RS(%d) groupId %d does not match \
@@ -525,5 +525,5 @@
 biggest weight among all the replication servers
NOTICE_AVOID_YOYO_EFFECT_232=DS(%d) stayed connected to RS(%d) to avoid the yoyo effect
NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
 for DS(%d) to connect to because it was the only one standing after all tests
 for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -167,16 +167,11 @@
  public String toString()
  {
    StringBuilder sb = new StringBuilder();
    sb.append("\nId: ");
    sb.append(id);
    sb.append(" ; Server URL: ");
    sb.append(serverUrl);
    sb.append(" ; Generation id: ");
    sb.append(generationId);
    sb.append(" ; Group id: ");
    sb.append(groupId);
    sb.append(" ; Weight: ");
    sb.append(weight);
    sb.append("Id: ").append(id);
    sb.append(" ; Server URL: ").append(serverUrl);
    sb.append(" ; Generation id: ").append(generationId);
    sb.append(" ; Group id: ").append(groupId);
    sb.append(" ; Weight: ").append(weight);
    return sb.toString();
  }
}
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -58,13 +58,33 @@
public final class ECLServerHandler extends ServerHandler
{
  /**
   * Marks the end of the search on the External Change Log.
   */
  private static int UNDEFINED_PHASE = 0;
  /**
   * Constant used to indicate the handler is in the ECL initialization phase.
   * The External Change Log initialization phase currently reads the changes
   * from all the ReplicaDBs from oldest to newest and then:
   * <ul>
   * <li>Matches each ReplicaDBs change with the corresponding record in
   * {@link ChangeNumberIndexDB}, then assign its changeNumber in memory before
   * returning the change to the client</li>
   * <li>Once it reaches the end of the {@link ChangeNumberIndexDB}, it inserts
   * each ReplicaDBs change in the {@link ChangeNumberIndexDB} and gets back and
   * assign the changeNumber in memory to the ReplicaDBs change.</li>
   * </ul>
   * Once this phase is over the current search moves to the
   * {@link #UNDEFINED_PHASE} or the {@link #PERSISTENT_PHASE} depending on the
   * search type.
   *
   * @see #getSearchPhase()
   */
  public static int INIT_PHASE = 1;
  /**
   * The persistent phase is only used for persistent searches on the External
   * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the
   * client newly added changes to the {@link ChangeNumberIndexDB}.
   */
  private static int PERSISTENT_PHASE = 2;
  /**
@@ -111,8 +131,8 @@
  private Set<String> excludedBaseDNs = new HashSet<String>();
  /**
   * Eligible CSN - only changes older or equal to eligibleCSN * are published
   * in the ECL.
   * Eligible CSN - only changes older or equal to eligibleCSN are published in
   * the ECL.
   */
  private CSN eligibleCSN;
@@ -1203,6 +1223,9 @@
        if (draftCompat)
        {
          continueLooping = !assignChangeNumber(change);
          // if we encounter a change that has been trimmed from the replicaDBs,
          // we will skip it and loop to the next oldest change from the
          // replicaDBs
        }
        // here we have the right oldest change
@@ -1288,34 +1311,34 @@
   * Either retrieves a change number from the DB, or assign a new change number
   * and store in the DB.
   *
   * @param oldestChange
   *          the oldestChange where to assign the change number
   * @param replicaDBChange
   *          the replica DB change to find in the {@link ChangeNumberIndexDB}
   *          where to assign the change number
   * @return <code>true</code> if a change number has been assigned to the
   *         provided oldestChange, <code>false</code> otherwise
   *         provided replicaDBChange, <code>false</code> otherwise
   * @throws DirectoryException
   *           if any problem occur
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
  private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange)
      throws ChangelogException
  {
    // We also need to check if the CNIndexDB is consistent with the
    // changelogDB. If not, 2 potential reasons:
    // a/ changelog has been purged (trim) let's traverse the CNIndexDB
    // We also need to check if the CNIndexDB is consistent with the replicaDBs.
    // If not, 2 potential reasons:
    // a/ replicaDBs have been purged (trim) let's traverse the CNIndexDB
    // b/ changelog is late ... let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn in the 2 dbs
    // replogCSN : the oldest change from the changelog db
    CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
    DN dnFromChangelogDb = oldestChange.getBaseDN();
    CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN();
    DN dnFromReplicaDB = replicaDBChange.getBaseDN();
    while (true)
    {
      if (isEndOfCNIndexDBReached)
      {
        // we are at the end of the CNIndexDB in the append mode
        assignNewChangeNumberAndStore(oldestChange);
        assignNewChangeNumberAndStore(replicaDBChange);
        return true;
      }
@@ -1324,43 +1347,44 @@
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
            + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
            + asDate(csnFromChangelogDb) + " ?older"
        TRACER.debugInfo("assignChangeNumber() comparing the replicaDB's and"
            + " CNIndexDB's DNs :" + dnFromReplicaDB + "?=" + dnFromCNIndexDB
            + " timestamps:" + asDate(csnFromReplicaDB) + " ?older"
            + asDate(csnFromCNIndexDB));
      if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
          csnFromCNIndexDB, dnFromCNIndexDB))
      if (areSameChange(csnFromReplicaDB, dnFromReplicaDB, csnFromCNIndexDB,
          dnFromCNIndexDB))
      {
        // We matched the ReplicaDB change with a record in the CNIndexDB
        // => set the changeNumber in memory and return the change to the client
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
              + currentRecord.getChangeNumber() + " to change=" + oldestChange);
              + currentRecord.getChangeNumber() + " to change="
              + replicaDBChange);
        oldestChange.setChangeNumber(currentRecord.getChangeNumber());
        replicaDBChange.setChangeNumber(currentRecord.getChangeNumber());
        return true;
      }
      if (!csnFromCNIndexDB.isOlderThan(csnFromChangelogDb))
      if (!csnFromCNIndexDB.isOlderThan(csnFromReplicaDB))
      {
        // the change from the changelogDb is older
        // the change from the replicaDB is older
        // it should have been stored lately
        // let's continue to traverse the changelogDB
        // let's continue to traverse the replicaDBs
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() will skip "
              + csnFromChangelogDb
          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromReplicaDB
              + " and read next change from the regular changelog.");
        return false; // TO BE CHECKED
      }
      // the change from the CNIndexDB is older
      // that means that the change has been purged from the
      // changelogDb (and CNIndexDB not yet been trimmed)
      // The change from the CNIndexDB is older.
      // It means that the CNIndexDB change has been purged from the replicaDB
      // and CNIndexDB has not been trimmed yet.
      try
      {
        // let's traverse the CNIndexDB searching for the change
        // found in the changelogDB
        // keep traversing the CNIndexDB searching for the replicaDB change
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
              + " and read next change from the CNIndexDB.");
@@ -1380,7 +1404,7 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        // FIXME There is an opportunity for an infinite loop here if the DB
        // continuously throws DatabaseExceptions
        // continuously throws ChangelogExceptions
      }
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2595,10 +2595,10 @@
    CSN eligibleCSN = null;
    final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
    for (final CSN changelogNewestCSN : newestCSNs)
    for (final CSN replicaNewestCSN : newestCSNs)
    {
      // Should it be considered for eligibility ?
      int serverId = changelogNewestCSN.getServerId();
      int serverId = replicaNewestCSN.getServerId();
      CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
@@ -2625,11 +2625,11 @@
        continue;
      }
      if (changelogNewestCSN != null
      if (replicaNewestCSN != null
          && (eligibleCSN == null ||
              changelogNewestCSN.isNewerThan(eligibleCSN)))
              replicaNewestCSN.isNewerThan(eligibleCSN)))
      {
        eligibleCSN = changelogNewestCSN;
        eligibleCSN = replicaNewestCSN;
      }
      if (heartbeatLastCSN != null
          && (eligibleCSN == null || heartbeatLastCSN.isNewerThan(eligibleCSN)))
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -299,20 +299,19 @@
  /**
   * Sets the locally configured flag for the passed ReplicationServerInfo
   * object, analyzing the local configuration.
   * @param replicationServerInfo the Replication server to check and update
   * @param rsInfo the Replication server to check and update
   */
  private void updateRSInfoLocallyConfiguredStatus(
    ReplicationServerInfo replicationServerInfo)
  private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo)
  {
    // Determine if the passed ReplicationServerInfo has a URL that is present
    // in the locally configured replication servers
    String rsUrl = replicationServerInfo.getServerURL();
    String rsUrl = rsInfo.getServerURL();
    if (rsUrl == null)
    {
      // The ReplicationServerInfo has been generated from a server with
      // no URL in TopologyMsg (i.e: with replication protocol version < 4):
      // ignore this server as we do not know how to connect to it
      replicationServerInfo.setLocallyConfigured(false);
      rsInfo.setLocallyConfigured(false);
      return;
    }
    for (String serverUrl : getReplicationServerUrls())
@@ -320,12 +319,12 @@
      if (isSameReplicationServerUrl(serverUrl, rsUrl))
      {
        // This RS is locally configured, mark this
        replicationServerInfo.setLocallyConfigured(true);
        replicationServerInfo.serverURL = serverUrl;
        rsInfo.setLocallyConfigured(true);
        rsInfo.serverURL = serverUrl;
        return;
      }
    }
    replicationServerInfo.setLocallyConfigured(false);
    rsInfo.setLocallyConfigured(false);
  }
  /**
@@ -704,14 +703,12 @@
    for (String serverUrl : getReplicationServerUrls())
    {
      // Connect to server and get info about it
      ReplicationServerInfo replicationServerInfo =
        performPhaseOneHandshake(serverUrl, false, false);
      // Store server info in list
      if (replicationServerInfo != null)
      // Connect to server + get and store info about it
      ReplicationServerInfo rsInfo =
          performPhaseOneHandshake(serverUrl, false, false);
      if (rsInfo != null)
      {
        rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
        rsInfos.put(rsInfo.getServerId(), rsInfo);
      }
    }
@@ -1943,10 +1940,10 @@
    int sumOfWeights = 0;
    // Sum of the connected DSs
    int sumOfConnectedDSs = 0;
    for (ReplicationServerInfo replicationServerInfo : bestServers.values())
    for (ReplicationServerInfo rsInfo : bestServers.values())
    {
      sumOfWeights += replicationServerInfo.getWeight();
      sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
      sumOfWeights += rsInfo.getWeight();
      sumOfConnectedDSs += rsInfo.getConnectedDSNumber();
    }
    // Distance (difference) of the current loads to the load goals of each RS:
@@ -3036,27 +3033,25 @@
    // Update replication server info list with the received topology
    // information
    List<Integer> rsToKeepList = new ArrayList<Integer>();
    final Set<Integer> rssToKeep = new HashSet<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    {
      int rsId = rsInfo.getId();
      rsToKeepList.add(rsId); // Mark this server as still existing
      rssToKeep.add(rsId); // Mark this server as still existing
      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      ReplicationServerInfo replicationServerInfo =
        replicationServerInfos.get(rsId);
      if (replicationServerInfo == null)
      ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
      if (rsInfo2 == null)
      {
        // New replication server, create info for it add it to the list
        replicationServerInfo =
          new ReplicationServerInfo(rsInfo, connectedDSs);
        rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
        // Set the locally configured flag for this new RS only if it is
        // configured
        updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
        replicationServerInfos.put(rsId, replicationServerInfo);
        updateRSInfoLocallyConfiguredStatus(rsInfo2);
        replicationServerInfos.put(rsId, rsInfo2);
      } else
      {
        // Update the existing info for the replication server
        replicationServerInfo.update(rsInfo, connectedDSs);
        rsInfo2.update(rsInfo, connectedDSs);
      }
    }
@@ -3064,12 +3059,11 @@
     * Now remove any replication server that may have disappeared from the
     * topology.
     */
    Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
      replicationServerInfos.entrySet().iterator();
    Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator();
    while (rsInfoIt.hasNext())
    {
      Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
      if (!rsToKeepList.contains(rsInfoEntry.getKey()))
      final Integer rsId = rsInfoIt.next();
      if (!rssToKeep.contains(rsId))
      {
        // This replication server has quit the topology, remove it from the
        // list