mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.57.2013 de65d8f6f9832b272f2d707f76d35b8fab879197
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -800,7 +800,7 @@
      {
        if (allowUnknownDomains)
          for (String providedDomain : startStatesFromProvidedCookie.keySet())
            if (rs.getReplicationServerDomain(providedDomain, false) == null)
            if (rs.getReplicationServerDomain(providedDomain) == null)
              // the domain provided in the cookie is not replicated
              startStatesFromProvidedCookie.remove(providedDomain);
      }
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -189,18 +189,19 @@
  /**
   * Returns the Replication Server Domain to which belongs this handler.
   *
   * @param createIfNotExist    Creates the domain if it does not exist.
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain(
      boolean createIfNotExist, boolean waitConnections)
  public ReplicationServerDomain getDomain(boolean waitConnections)
  {
    if (replicationServerDomain==null)
    if (replicationServerDomain == null)
    {
      replicationServerDomain = replicationServer.getReplicationServerDomain(
            baseDN, createIfNotExist, waitConnections);
      replicationServerDomain =
          replicationServer.getReplicationServerDomain(baseDN, true);
      if (waitConnections) {
        replicationServer.waitConnections();
      }
    }
    return replicationServerDomain;
  }
@@ -476,7 +477,7 @@
        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
    // Build a list of candidates iterator (i.e. db i.e. server)
    for (int serverId : replicationServerDomain.getServers())
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server
      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
@@ -525,20 +526,19 @@
       * the number of updates to be sent is the size of the receive queue.
       */
      if (following)
        return msgQueue.count();
      else
      {
        /*
         * When the server  is not able to follow, the msgQueue
         * may become too large and therefore won't contain all the
         * changes. Some changes may only be stored in the backing DB
         * of the servers.
         * The total size of the receive queue is calculated by doing
         * the sum of the number of missing changes for every dbHandler.
         */
        ServerState dbState = replicationServerDomain.getDbServerState();
        return ServerState.diffChanges(dbState, serverState);
        return msgQueue.count();
      }
      /*
       * When the server is not able to follow, the msgQueue may become too
       * large and therefore won't contain all the changes. Some changes may
       * only be stored in the backing DB of the servers.
       * The total size of the receive queue is calculated by doing the sum of
       * the number of missing changes for every dbHandler.
       */
      ServerState dbState = replicationServerDomain.getDbServerState();
      return ServerState.diffChanges(dbState, serverState);
    }
  }
@@ -639,7 +639,7 @@
    {
      this.baseDN = baseDN;
      if (!baseDN.equalsIgnoreCase("cn=changelog"))
        this.replicationServerDomain = getDomain(true, isDataServer);
        this.replicationServerDomain = getDomain(isDataServer);
    }
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -624,7 +624,7 @@
      final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation, final ChangeNumber previousCN)
  {
    for (int serverId : rsd.getServers())
    for (int serverId : rsd.getServerIds())
    {
      if (exportConfig != null && exportConfig.isCancelled())
      { // Abort if cancelled
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -430,14 +430,9 @@
            // FIXME: this will need changing if we ever support listening on
            // specific addresses.
            if (isLocalAddress(inetAddress) && (port == replicationPort))
            {
              continue;
            }
            if ((isLocalAddress(inetAddress) && port == replicationPort)
            // Don't connect to a server if it is already connected.
            final String normalizedServerURL = normalizeServerURL(rsURL);
            if (connectedRSUrls.contains(normalizedServerURL))
                || connectedRSUrls.contains(normalizeServerURL(rsURL)))
            {
              continue;
            }
@@ -726,15 +721,12 @@
   *
   * @param baseDn The base Dn for which the ReplicationServerDomain must be
   * returned.
   * @param create Specifies whether to create the ReplicationServerDomain if
   *        it does not already exist.
   * @return The ReplicationServerDomain associated to the base DN given in
   *         parameter.
   */
  public ReplicationServerDomain getReplicationServerDomain(String baseDn,
          boolean create)
  public ReplicationServerDomain getReplicationServerDomain(String baseDn)
  {
    return getReplicationServerDomain(baseDn, create, false);
    return getReplicationServerDomain(baseDn, false);
  }
  /**
@@ -745,68 +737,62 @@
   * returned.
   * @param create Specifies whether to create the ReplicationServerDomain if
   *        it does not already exist.
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The ReplicationServerDomain associated to the base DN given in
   *         parameter.
   */
  public ReplicationServerDomain getReplicationServerDomain(String baseDn,
          boolean create, boolean waitConnections)
      boolean create)
  {
    ReplicationServerDomain domain;
    synchronized (baseDNs)
    {
      domain = baseDNs.get(baseDn);
      if (domain != null ||!create) {
        return domain;
      ReplicationServerDomain domain = baseDNs.get(baseDn);
      if (domain == null && create) {
        domain = new ReplicationServerDomain(baseDn, this);
        baseDNs.put(baseDn, domain);
      }
      domain = new ReplicationServerDomain(baseDn, this);
      baseDNs.put(baseDn, domain);
      return domain;
    }
  }
    if (waitConnections)
  /**
   * Waits for connections to this ReplicationServer.
   */
  public void waitConnections()
  {
    // Acquire a domain ticket and wait for a complete cycle of the connect
    // thread.
    final long myDomainTicket;
    synchronized (connectThreadLock)
    {
      // Acquire a domain ticket and wait for a complete cycle of the connect
      // thread.
      final long myDomainTicket;
      synchronized (connectThreadLock)
      {
        // Connect thread must be waiting.
        synchronized (domainTicketLock)
        {
          // Determine the ticket which will be used in the next connect thread
          // iteration.
          myDomainTicket = domainTicket + 1;
        }
        // Wake up connect thread.
        connectThreadLock.notify();
      }
      // Wait until the connect thread has processed next connect phase.
      // Connect thread must be waiting.
      synchronized (domainTicketLock)
      {
        // Condition.
        while (myDomainTicket > domainTicket && !shutdown)
        // Determine the ticket which will be used in the next connect thread
        // iteration.
        myDomainTicket = domainTicket + 1;
      }
      // Wake up connect thread.
      connectThreadLock.notify();
    }
    // Wait until the connect thread has processed next connect phase.
    synchronized (domainTicketLock)
    {
      while (myDomainTicket > domainTicket && !shutdown)
      {
        try
        {
          try
          {
            // Wait with timeout so that we detect shutdown.
            domainTicketLock.wait(500);
          }
          catch (InterruptedException e)
          {
            // Can't do anything with this.
            Thread.currentThread().interrupt();
          }
          // Wait with timeout so that we detect shutdown.
          domainTicketLock.wait(500);
        }
        catch (InterruptedException e)
        {
          // Can't do anything with this.
          Thread.currentThread().interrupt();
        }
      }
    }
    return domain;
  }
  /**
@@ -1158,7 +1144,7 @@
   */
  public long getGenerationId(String baseDN)
  {
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN, false);
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    if (rsd!=null)
      return rsd.getGenerationId();
    return -1;
@@ -1924,4 +1910,34 @@
    return "RS(" + serverId + ") on " + serverURL + ", domains="
        + baseDNs.keySet();
  }
  /**
   * Initializes the generationId for the specified replication domain.
   * <p>
   * The domain is looked up with the "create" flag set to {@code true}, so it
   * is created first if this replication server does not know it yet.
   *
   * @param baseDn
   *          the base DN identifying the replication domain
   * @param generationId
   *          the generationId value for initialization
   */
  public void initDomainGenerationID(String baseDn, long generationId)
  {
    getReplicationServerDomain(baseDn, true).initGenerationID(generationId);
  }
  /**
   * Adds the specified serverId to the specified replication domain.
   *
   * @param serverId
   *          the server Id to add to the replication domain
   * @param baseDn
   *          the replication domain where to add the serverId
   * @throws ChangelogException
   *           If a database error happened.
   */
  public void addServerIdToDomain(int serverId, String baseDn)
      throws ChangelogException
  {
    DbHandler dbHandler = newDbHandler(serverId, baseDn);
    getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler);
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -120,7 +120,7 @@
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<Integer, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Integer, DbHandler>();
      new ConcurrentHashMap<Integer, DbHandler>();
  /** The ReplicationServer that created the current instance. */
  private ReplicationServer localReplicationServer;
@@ -1269,12 +1269,13 @@
  }
  /**
   * Return a set containing the server that produced update and known by
   * this replicationServer from all over the topology,
   * whatever directly connected of connected to another RS.
   * @return a set containing the servers known by this replicationServer.
   * Returns a set containing the serverIds that produced updates and known by
   * this replicationServer from all over the topology, whether directly
   * connected or connected to another RS.
   *
   * @return a set containing the serverIds known by this replicationServer.
   */
  public Set<Integer> getServers()
  public Set<Integer> getServerIds()
  {
    return sourceDbHandlers.keySet();
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -562,7 +562,7 @@
    public String getMonitorInstanceName()
    {
      ReplicationServerDomain domain = replicationServer
          .getReplicationServerDomain(baseDn, false);
          .getReplicationServerDomain(baseDn);
      return "Changelog for DS(" + serverId + "),cn="
          + domain.getMonitorInstanceName();
    }
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -340,8 +340,8 @@
            continue;
          }
          final ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(baseDN, false);
          final ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(baseDN);
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -203,8 +203,7 @@
              + this.replicationServer.getMonitorInstanceName()
              + " Has read baseDn=" + baseDn + " generationId=" + generationId);
        replicationServer.getReplicationServerDomain(baseDn, true)
            .initGenerationID(generationId);
        replicationServer.initDomainGenerationID(baseDn, generationId);
      }
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
@@ -236,12 +235,7 @@
              + this.replicationServer.getMonitorInstanceName()
              + " Has read: baseDn=" + baseDn + " serverId=" + serverId);
        DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this,
                replicationServer.getQueueSize());
        replicationServer.getReplicationServerDomain(baseDn, true)
            .setDbHandler(serverId, dbHandler);
        replicationServer.addServerIdToDomain(serverId, baseDn);
      }
      status = cursor.getNext(key, data, LockMode.DEFAULT);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -830,14 +830,13 @@
      publishDeleteMsgInOTest(s2test, cn9, tn, 9);
      sleep(500);
      ReplicationServerDomain rsd =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
      ServerState startState = rsd.getStartState();
      assertEquals(startState.getChangeNumber(s1test.getServerId()).getSeqnum(), 1);
      assertTrue(startState.getChangeNumber(s2test.getServerId()) != null);
      assertEquals(startState.getChangeNumber(s2test.getServerId()).getSeqnum(), 7);
      rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false);
      rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
      startState = rsd.getStartState();
      assertEquals(startState.getChangeNumber(s2test2.getServerId()).getSeqnum(), 2);
      assertEquals(startState.getChangeNumber(s1test2.getServerId()).getSeqnum(), 6);
@@ -1012,8 +1011,8 @@
      // ---
      // 2. Now set up a very short purge delay on the replication changelogs
      // so that this test can play with a trimmed changelog.
      d1 = replicationServer.getReplicationServerDomain("o=test", false);
      d2 = replicationServer.getReplicationServerDomain("o=test2", false);
      d1 = replicationServer.getReplicationServerDomain("o=test");
      d2 = replicationServer.getReplicationServerDomain("o=test2");
      d1.setPurgeDelay(1);
      d2.setPurgeDelay(1);
@@ -2331,8 +2330,7 @@
      publishDeleteMsgInOTest(s2test, cn9, tn, 9);
      sleep(500);
      ReplicationServerDomain rsd1 =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
      rsd1.getDbServerState();
      rsd1.getChangeTimeHeartbeatState();
      debugInfo(tn, rsd1.getBaseDn()
@@ -2342,8 +2340,7 @@
          + " rs eligibleCN=" + replicationServer.getEligibleCN());
      // FIXME:ECL Enable this test by adding an assert on the right value
      ReplicationServerDomain rsd2 =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false);
      ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
      rsd2.getDbServerState();
      rsd2.getChangeTimeHeartbeatState();
      debugInfo(tn, rsd2.getBaseDn()
@@ -2900,8 +2897,7 @@
    final ChangeNumber cn2 = cns[1];
    final ChangeNumber cn3 = cns[2];
    ReplicationServerDomain rsdtest =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
    ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
    // this empty state will force to count from the start of the DB
    final ServerState fromStart = new ServerState();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -27,6 +27,11 @@
 */
package org.opends.server.replication;
import java.io.File;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -43,6 +48,7 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationBackend;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.*;
@@ -50,11 +56,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -910,23 +911,17 @@
      debugInfo("RS1 must have been cleared since it has not the proper generation ID");
      checkChangelogSize(0);
      assertTrue(!replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server1ID),
      "Expecting that DS1 status in RS1 is : not in bad gen id.");
      assertFalse(isDegradedDueToGenerationId(replServer1, server1ID),
          "Expecting that DS1 status in RS1 is : not in bad gen id.");
      //===============================================================
      debugInfo(testCase + " ** TEST ** Previous test set a new gen ID on the "+
          "topology, verify degradation of DS2 and DS3");
      assertTrue(replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server2ID),
      "Expecting that DS2 with old gen ID is in bad gen id from RS1");
      assertTrue(replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server3ID),
      "Expecting that DS3 with old gen ID is in bad gen id from RS1");
      assertTrue(isDegradedDueToGenerationId(replServer1, server2ID),
          "Expecting that DS2 with old gen ID is in bad gen id from RS1");
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID),
          "Expecting that DS3 with old gen ID is in bad gen id from RS1");
      debugInfo("Add entries to DS1, update should not be sent to DS2 and DS3 that are in bad gen id");
      String[] ent3 = { createEntry(UUID.randomUUID()) };
@@ -1013,18 +1008,12 @@
      checkChangelogSize(1);
      debugInfo("Verifying that DS2 is not in bad gen id any more");
      assertTrue(!replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server2ID),
      "Expecting that DS2 is not in bad gen id from RS1");
      assertFalse(isDegradedDueToGenerationId(replServer1, server2ID),
          "Expecting that DS2 is not in bad gen id from RS1");
      debugInfo("Verifying that DS3 is not in bad gen id any more");
      assertTrue(!replServer1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          isDegradedDueToGenerationId(server3ID),
      "Expecting that DS3 is not in bad gen id from RS1");
      assertFalse(isDegradedDueToGenerationId(replServer1, server3ID),
          "Expecting that DS3 is not in bad gen id from RS1");
      debugInfo("Verify that DS2 receives the add message stored in RS1 DB");
      try
@@ -1190,10 +1179,8 @@
        fail("Broker connection is expected to be accepted.");
      }
      debugInfo(
        "Expecting that broker2 is not in bad gen id since it has a correct genId");
      assertTrue(!replServer1.getReplicationServerDomain(baseDn.toNormalizedString(), false).
        isDegradedDueToGenerationId(server2ID));
      debugInfo("Expecting that broker2 is not in bad gen id since it has a correct genId");
      assertFalse(isDegradedDueToGenerationId(replServer1, server2ID));
      debugInfo("Disconnecting DS from replServer1");
      disconnectFromReplServer(changelog1ID);
@@ -1229,10 +1216,8 @@
        fail("Broker connection is expected to be accepted.");
      }
      debugInfo(
        "Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(replServer1.getReplicationServerDomain(baseDn.toNormalizedString(), false).
        isDegradedDueToGenerationId(server3ID));
      debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID));
      int found = testEntriesInDb();
      assertEquals(found, updatedEntries.length,
@@ -1302,6 +1287,12 @@
    }
  }
  private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId)
  {
    ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDn.toNormalizedString());
    return domain.isDegradedDueToGenerationId(serverId);
  }
  /**
   * Disconnect broker and remove entries from the local DB
   */
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -48,6 +48,7 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.*;
@@ -1072,34 +1073,21 @@
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      // Check that the list of connected LDAP servers is correct
      // in each replication servers
      Set<Integer> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l1).containsExactly(server1ID);
      Set<Integer> l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
      Set<Integer> l3 = changelog3.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l3).isEmpty();
      // Check that the list of connected LDAP servers is correct in each replication servers
      Assertions.assertThat(getConnectedDSServerIds(changelog1)).containsExactly(server1ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server2ID, server3ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog3)).isEmpty();
      // Test updates
      broker3.stop();
      Thread.sleep(1000);
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server2ID);
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      broker2.stop();
      Thread.sleep(1000);
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server3ID);
      Assertions.assertThat(getConnectedDSServerIds(changelog2)).containsExactly(server3ID);
    // TODO Test ReplicationServerDomain.getDestinationServers method.
@@ -1115,6 +1103,12 @@
    }
  }
  private Set<Integer> getConnectedDSServerIds(ReplicationServer changelog)
  {
    ReplicationServerDomain domain = changelog.getReplicationServerDomain(baseDn.toNormalizedString());
    return domain.getConnectedDSs().keySet();
  }
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportMultiSS() throws Exception
  {