mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.57.2013 a5f828cdb6549933c9f49783f99be1de1085a75e
Made code more encapsulated and more explicit.
Removed parameters obscuring the code.


ReplicationServerDomain.java:
Renamed getServers() to getServerIds().

MessageHandler.java:
In getDomain(), removed the createIfNotExist parameter, which was always true in calling code, and moved the call to ReplicationServer.waitConnections() here.
In getRcvMsgQueueSize(), used early exit.

ReplicationServer.java:
In getReplicationServerDomain() methods, removed the create parameter.
Also extracted the waitConnections() method and called it higher up in the call hierarchy.
Created the initDomainGenerationID() and addServerIdToDomain() methods here from code in ReplicationDbEnv.java.
In runConnect(), collapsed if statements.


*.java
Consequence of the changes to:
- ReplicationServerDomain.getServers()
- ReplicationServer.getReplicationServerDomain()
- ReplicationServer.initDomainGenerationID() and addServerIdToDomain()

GenerationIdTest.java:
Extracted method isDegradedDueToGenerationId().

InitOnLineTest.java:
Extracted method getConnectedDSServerIds().
11 files modified
238 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 32 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 74 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 10 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 18 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 51 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -800,7 +800,7 @@
      {
        if (allowUnknownDomains)
          for (String providedDomain : startStatesFromProvidedCookie.keySet())
            if (rs.getReplicationServerDomain(providedDomain, false) == null)
            if (rs.getReplicationServerDomain(providedDomain) == null)
              // the domain provided in the cookie is not replicated
              startStatesFromProvidedCookie.remove(providedDomain);
      }
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -189,18 +189,19 @@
  /**
   * Returns the Replication Server Domain to which belongs this handler.
   *
   * @param createIfNotExist    Creates the domain if it does not exist.
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain(
      boolean createIfNotExist, boolean waitConnections)
  public ReplicationServerDomain getDomain(boolean waitConnections)
  {
    if (replicationServerDomain==null)
    {
      replicationServerDomain = replicationServer.getReplicationServerDomain(
            baseDN, createIfNotExist, waitConnections);
      replicationServerDomain =
          replicationServer.getReplicationServerDomain(baseDN, true);
      if (waitConnections) {
        replicationServer.waitConnections();
      }
    }
    return replicationServerDomain;
  }
@@ -476,7 +477,7 @@
        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
    // Build a list of candidates iterator (i.e. db i.e. server)
    for (int serverId : replicationServerDomain.getServers())
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server
      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
@@ -525,22 +526,21 @@
       * the number of updates to be sent is the size of the receive queue.
       */
      if (following)
        return msgQueue.count();
      else
      {
        return msgQueue.count();
      }
        /*
         * When the server  is not able to follow, the msgQueue
         * may become too large and therefore won't contain all the
         * changes. Some changes may only be stored in the backing DB
         * of the servers.
         * The total size of the receive queue is calculated by doing
         * the sum of the number of missing changes for every dbHandler.
       * When the server is not able to follow, the msgQueue may become too
       * large and therefore won't contain all the changes. Some changes may
       * only be stored in the backing DB of the servers.
       * The total size of the receive queue is calculated by doing the sum of
       * the number of missing changes for every dbHandler.
         */
        ServerState dbState = replicationServerDomain.getDbServerState();
        return ServerState.diffChanges(dbState, serverState);
      }
    }
  }
  /**
   * Get the state of this server.
@@ -639,7 +639,7 @@
    {
      this.baseDN = baseDN;
      if (!baseDN.equalsIgnoreCase("cn=changelog"))
        this.replicationServerDomain = getDomain(true, isDataServer);
        this.replicationServerDomain = getDomain(isDataServer);
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -624,7 +624,7 @@
      final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation, final ChangeNumber previousCN)
  {
    for (int serverId : rsd.getServers())
    for (int serverId : rsd.getServerIds())
    {
      if (exportConfig != null && exportConfig.isCancelled())
      { // Abort if cancelled
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -430,14 +430,9 @@
            // FIXME: this will need changing if we ever support listening on
            // specific addresses.
            if (isLocalAddress(inetAddress) && (port == replicationPort))
            {
              continue;
            }
            if ((isLocalAddress(inetAddress) && port == replicationPort)
            // Don't connect to a server if it is already connected.
            final String normalizedServerURL = normalizeServerURL(rsURL);
            if (connectedRSUrls.contains(normalizedServerURL))
                || connectedRSUrls.contains(normalizeServerURL(rsURL)))
            {
              continue;
            }
@@ -726,15 +721,12 @@
   *
   * @param baseDn The base Dn for which the ReplicationServerDomain must be
   * returned.
   * @param create Specifies whether to create the ReplicationServerDomain if
   *        it does not already exist.
   * @return The ReplicationServerDomain associated to the base DN given in
   *         parameter.
   */
  public ReplicationServerDomain getReplicationServerDomain(String baseDn,
          boolean create)
  public ReplicationServerDomain getReplicationServerDomain(String baseDn)
  {
    return getReplicationServerDomain(baseDn, create, false);
    return getReplicationServerDomain(baseDn, false);
  }
  /**
@@ -745,29 +737,27 @@
   * returned.
   * @param create Specifies whether to create the ReplicationServerDomain if
   *        it does not already exist.
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The ReplicationServerDomain associated to the base DN given in
   *         parameter.
   */
  public ReplicationServerDomain getReplicationServerDomain(String baseDn,
          boolean create, boolean waitConnections)
      boolean create)
  {
    ReplicationServerDomain domain;
    synchronized (baseDNs)
    {
      domain = baseDNs.get(baseDn);
      if (domain != null ||!create) {
        return domain;
      }
      ReplicationServerDomain domain = baseDNs.get(baseDn);
      if (domain == null && create) {
      domain = new ReplicationServerDomain(baseDn, this);
      baseDNs.put(baseDn, domain);
    }
      return domain;
    }
  }
    if (waitConnections)
  /**
   * Waits for connections to this ReplicationServer.
   */
  public void waitConnections()
    {
      // Acquire a domain ticket and wait for a complete cycle of the connect
      // thread.
@@ -789,7 +779,6 @@
      // Wait until the connect thread has processed next connect phase.
      synchronized (domainTicketLock)
      {
        // Condition.
        while (myDomainTicket > domainTicket && !shutdown)
        {
          try
@@ -806,9 +795,6 @@
      }
    }
    return domain;
  }
  /**
   * Shutdown the Replication Server service and all its connections.
   */
@@ -1158,7 +1144,7 @@
   */
  public long getGenerationId(String baseDN)
  {
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN, false);
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    if (rsd!=null)
      return rsd.getGenerationId();
    return -1;
@@ -1924,4 +1910,34 @@
    return "RS(" + serverId + ") on " + serverURL + ", domains="
        + baseDNs.keySet();
  }
  /**
   * Initializes the generationId for the specified replication domain.
   * <p>
   * The domain is looked up with creation enabled, so it is created first if
   * this replication server does not know it yet.
   *
   * @param baseDn
   *          the replication domain
   * @param generationId
   *          the generationId value for initialization
   */
  public void initDomainGenerationID(String baseDn, long generationId)
  {
    getReplicationServerDomain(baseDn, true).initGenerationID(generationId);
  }
  /**
   * Adds the specified serverId to the specified replication domain.
   * <p>
   * A DbHandler is obtained from {@code newDbHandler()} for the serverId and
   * baseDn, then registered on the domain with {@code setDbHandler()}. The
   * domain is looked up with creation enabled, so it is created first if this
   * replication server does not know it yet.
   *
   * @param serverId
   *          the server Id to add to the replication domain
   * @param baseDn
   *          the replication domain where to add the serverId
   * @throws ChangelogException
   *           If a database error happened.
   */
  public void addServerIdToDomain(int serverId, String baseDn)
      throws ChangelogException
  {
    // The handler is built before the domain lookup, so a failure here
    // happens before getReplicationServerDomain() is called.
    DbHandler dbHandler = newDbHandler(serverId, baseDn);
    getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler);
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1269,12 +1269,13 @@
  }
  /**
   * Return a set containing the server that produced update and known by
   * this replicationServer from all over the topology,
   * whatever directly connected of connected to another RS.
   * @return a set containing the servers known by this replicationServer.
   * Returns a set containing the serverIds that produced updates and known by
   * this replicationServer from all over the topology, whether directly
   * connected or connected to another RS.
   *
   * @return a set containing the serverIds known by this replicationServer.
   */
  public Set<Integer> getServers()
  public Set<Integer> getServerIds()
  {
    return sourceDbHandlers.keySet();
  }
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -562,7 +562,7 @@
    public String getMonitorInstanceName()
    {
      ReplicationServerDomain domain = replicationServer
          .getReplicationServerDomain(baseDn, false);
          .getReplicationServerDomain(baseDn);
      return "Changelog for DS(" + serverId + "),cn="
          + domain.getMonitorInstanceName();
    }
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -340,8 +340,8 @@
            continue;
          }
          final ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(baseDN, false);
          final ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(baseDN);
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -203,8 +203,7 @@
              + this.replicationServer.getMonitorInstanceName()
              + " Has read baseDn=" + baseDn + " generationId=" + generationId);
        replicationServer.getReplicationServerDomain(baseDn, true)
            .initGenerationID(generationId);
        replicationServer.initDomainGenerationID(baseDn, generationId);
      }
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
@@ -236,12 +235,7 @@
              + this.replicationServer.getMonitorInstanceName()
              + " Has read: baseDn=" + baseDn + " serverId=" + serverId);
        DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this,
                replicationServer.getQueueSize());
        replicationServer.getReplicationServerDomain(baseDn, true)
            .setDbHandler(serverId, dbHandler);
        replicationServer.addServerIdToDomain(serverId, baseDn);
      }
      status = cursor.getNext(key, data, LockMode.DEFAULT);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -830,14 +830,13 @@
      publishDeleteMsgInOTest(s2test, cn9, tn, 9);
      sleep(500);
      ReplicationServerDomain rsd =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
      ServerState startState = rsd.getStartState();
      assertEquals(startState.getChangeNumber(s1test.getServerId()).getSeqnum(), 1);
      assertTrue(startState.getChangeNumber(s2test.getServerId()) != null);
      assertEquals(startState.getChangeNumber(s2test.getServerId()).getSeqnum(), 7);
      rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false);
      rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
      startState = rsd.getStartState();
      assertEquals(startState.getChangeNumber(s2test2.getServerId()).getSeqnum(), 2);
      assertEquals(startState.getChangeNumber(s1test2.getServerId()).getSeqnum(), 6);
@@ -1012,8 +1011,8 @@
      // ---
      // 2. Now set up a very short purge delay on the replication changelogs
      // so that this test can play with a trimmed changelog.
      d1 = replicationServer.getReplicationServerDomain("o=test", false);
      d2 = replicationServer.getReplicationServerDomain("o=test2", false);
      d1 = replicationServer.getReplicationServerDomain("o=test");
      d2 = replicationServer.getReplicationServerDomain("o=test2");
      d1.setPurgeDelay(1);
      d2.setPurgeDelay(1);
@@ -2331,8 +2330,7 @@
      publishDeleteMsgInOTest(s2test, cn9, tn, 9);
      sleep(500);
      ReplicationServerDomain rsd1 =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
      rsd1.getDbServerState();
      rsd1.getChangeTimeHeartbeatState();
      debugInfo(tn, rsd1.getBaseDn()
@@ -2342,8 +2340,7 @@
          + " rs eligibleCN=" + replicationServer.getEligibleCN());
      // FIXME:ECL Enable this test by adding an assert on the right value
      ReplicationServerDomain rsd2 =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false);
      ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
      rsd2.getDbServerState();
      rsd2.getChangeTimeHeartbeatState();
      debugInfo(tn, rsd2.getBaseDn()
@@ -2900,8 +2897,7 @@
    final ChangeNumber cn2 = cns[1];
    final ChangeNumber cn3 = cns[2];
    ReplicationServerDomain rsdtest =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
    ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
    // this empty state will force to count from the start of the DB
    final ServerState fromStart = new ServerState();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -27,6 +27,11 @@
 */
package org.opends.server.replication;
import java.io.File;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -43,6 +48,7 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationBackend;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.*;
@@ -50,11 +56,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -910,22 +911,16 @@
      debugInfo("RS1 must have been cleared since it has not the proper generation ID");
      checkChangelogSize(0);
      assertTrue(!replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server1ID),
      assertFalse(isDegradedDueToGenerationId(replServer1, server1ID),
      "Expecting that DS1 status in RS1 is : not in bad gen id.");
      //===============================================================
      debugInfo(testCase + " ** TEST ** Previous test set a new gen ID on the "+
          "topology, verify degradation of DS2 and DS3");
      assertTrue(replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server2ID),
      assertTrue(isDegradedDueToGenerationId(replServer1, server2ID),
      "Expecting that DS2 with old gen ID is in bad gen id from RS1");
      assertTrue(replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server3ID),
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID),
      "Expecting that DS3 with old gen ID is in bad gen id from RS1");
      debugInfo("Add entries to DS1, update should not be sent to DS2 and DS3 that are in bad gen id");
@@ -1013,17 +1008,11 @@
      checkChangelogSize(1);
      debugInfo("Verifying that DS2 is not in bad gen id any more");
      assertTrue(!replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server2ID),
      assertFalse(isDegradedDueToGenerationId(replServer1, server2ID),
      "Expecting that DS2 is not in bad gen id from RS1");
      debugInfo("Verifying that DS3 is not in bad gen id any more");
      assertTrue(!replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server3ID),
      assertFalse(isDegradedDueToGenerationId(replServer1, server3ID),
      "Expecting that DS3 is not in bad gen id from RS1");
      debugInfo("Verify that DS2 receives the add message stored in RS1 DB");
@@ -1190,10 +1179,8 @@
        fail("Broker connection is expected to be accepted.");
      }
      debugInfo(
        "Expecting that broker2 is not in bad gen id since it has a correct genId");
      assertTrue(!replServer1.getReplicationServerDomain(baseDn.toNormalizedString(), false).
        isDegradedDueToGenerationId(server2ID));
      debugInfo("Expecting that broker2 is not in bad gen id since it has a correct genId");
      assertFalse(isDegradedDueToGenerationId(replServer1, server2ID));
      debugInfo("Disconnecting DS from replServer1");
      disconnectFromReplServer(changelog1ID);
@@ -1229,10 +1216,8 @@
        fail("Broker connection is expected to be accepted.");
      }
      debugInfo(
        "Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(replServer1.getReplicationServerDomain(baseDn.toNormalizedString(), false).
        isDegradedDueToGenerationId(server3ID));
      debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID));
      int found = testEntriesInDb();
      assertEquals(found, updatedEntries.length,
@@ -1302,6 +1287,12 @@
    }
  }
  /**
   * Asks the domain that the given replication server holds for this test's
   * baseDn whether the specified server is degraded due to its generation id.
   * <p>
   * NOTE(review): the looked-up domain is dereferenced without a null check;
   * presumably the domain always exists when the tests call this - confirm.
   *
   * @param rs
   *          the replication server whose domain is queried
   * @param serverId
   *          the id of the server to check
   * @return the value returned by the domain's isDegradedDueToGenerationId()
   */
  private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId)
  {
    ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDn.toNormalizedString());
    return domain.isDegradedDueToGenerationId(serverId);
  }
  /**
   * Disconnect broker and remove entries from the local DB
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -48,6 +48,7 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.*;
@@ -1072,34 +1073,21 @@
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      // Check that the list of connected LDAP servers is correct
      // in each replication servers
      Set<Integer> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l1).containsExactly(server1ID);
      Set<Integer> l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
      Set<Integer> l3 = changelog3.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l3).isEmpty();
      // Check that the list of connected LDAP servers is correct in each replication servers
      Assertions.assertThat(getConnectedDSServerIds(changelog1)).containsExactly(server1ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server2ID, server3ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog3)).isEmpty();
      // Test updates
      broker3.stop();
      Thread.sleep(1000);
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server2ID);
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      broker2.stop();
      Thread.sleep(1000);
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server3ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server3ID);
    // TODO Test ReplicationServerDomain.getDestinationServers method.
@@ -1115,6 +1103,12 @@
    }
  }
  /**
   * Returns the key set of the connected DSs map of the domain that the given
   * replication server holds for this test's baseDn.
   * <p>
   * NOTE(review): the looked-up domain is dereferenced without a null check;
   * presumably the domain always exists when the tests call this - confirm.
   *
   * @param changelog
   *          the replication server whose domain is queried
   * @return the keys of the domain's connected DSs map (the callers compare
   *         them against server ids)
   */
  private Set<Integer> getConnectedDSServerIds(ReplicationServer changelog)
  {
    ReplicationServerDomain domain = changelog.getReplicationServerDomain(baseDn.toNormalizedString());
    return domain.getConnectedDSs().keySet();
  }
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportMultiSS() throws Exception
  {