mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
13.01.2008 5589e80c0344aafd9db1b24ef4a820486620322a
An update sent by a replicated LDAP server is tagged with a ChangeNumber generated by a ChangeNumberGenerator. One component of the ChangeNumber is a timestamp.
Because we want the timestamps of the ChangeNumbers to grow consistently across the whole topology, and not only inside each server independently, the ChangeNumberGenerator must be adjusted with the timestamps of the updates
received from the other servers of the topology. The code for this adjustment already existed but was used incorrectly.

A unit test was written accordingly. This also required some changes to the ProtocolWindowTest unit test to isolate it from the other unit tests.


4 files modified
162 ■■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumberGenerator.java 11 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 7 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 29 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java 115 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumberGenerator.java
@@ -100,7 +100,7 @@
  }
  /**
   * Adjust the lastTime and seqnum of this Changenumber generator with
   * Adjust the lastTime of this Changenumber generator with
   * a ChangeNumber that we have received from another server.
   * This is necessary because we need that the changenumber generated
   * after processing an update received from other hosts to be larger
@@ -110,10 +110,17 @@
   */
  public void adjust(ChangeNumber number)
  {
    if (number==null)
    {
      lastTime = TimeThread.getTime();
      seqnum = 0;
      return;
    }
    long rcvdTime = number.getTime();
    /* need to synchronize with NewChangeNumber method so that we
     * protect writing of seqnum and lastTime fields
     * protect writing lastTime fields
     */
    synchronized(this)
    {
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -477,13 +477,16 @@
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     * for each operation done on this replication domain.
     *
     * The generator time is adjusted to the time of the last CN received from
     * the other (remote) servers.
     */
    ChangeNumberGenerator generator =
      new ChangeNumberGenerator(serverId, state);
    pendingChanges =
      new PendingChanges(new ChangeNumberGenerator(serverId, state),
      new PendingChanges(generator,
                         broker, state);
    remotePendingChanges = new RemotePendingChanges(generator, state);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -28,12 +28,15 @@
package org.opends.server.replication;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import org.opends.server.loggers.debug.DebugTracer;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.opends.messages.Category;
@@ -54,6 +57,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.LDAPException;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
@@ -62,6 +66,7 @@
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.opends.server.types.Attribute;
/**
 * Test the constructors, encoders and decoders of the Replication AckMsg,
@@ -147,7 +152,7 @@
      Thread.sleep(500);
      // check that the replicationServer only sent WINDOW_SIZE messages
      assertTrue(searchUpdateSent());
      searchUpdateSent();
      int rcvCount=0;
      try
@@ -209,15 +214,16 @@
   * And that the number of waiting changes is accurate.
   * Do this by checking the monitoring information.
   */
  private boolean searchUpdateSent() throws Exception
  private void searchUpdateSent() throws Exception
  {
    InternalSearchOperation op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    if (op.getEntriesSent() != 1)
      return false;
    assertEquals(op.getEntriesSent(), 1,
        "Entries#=" + op.getEntriesSent());
    op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
@@ -226,7 +232,20 @@
            (REPLICATION_QUEUE_SIZE + WINDOW_SIZE) + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 1);
    Iterator<SearchResultEntry> entriesit = op.getSearchEntries().iterator();
    while(entriesit.hasNext())
    {
      SearchResultEntry e = entriesit.next();
      Iterator<Attribute> attit = e.getAttributes().iterator();
      while (attit.hasNext())
      {
        Attribute attr = attit.next();
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        e.getDN() + "= " + attr.getName() + " " + attr.getValues().iterator()
        .next().getStringValue()));
      }
    }
    assertEquals(op.getEntriesSent(), 1, "Entries#=" + op.getEntriesSent());
  }
  /**
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -98,6 +98,11 @@
  private Entry personWithUUIDEntry;
  private Entry personWithSecondUniqueID;
  private Entry  user3Entry;
  private String user3dn;
  private String user3lLDIFEntry;
  private String user3UUID;
  private String baseUUID;
  private String user1dn;
@@ -121,6 +126,8 @@
  private Entry domain2;
  private Entry domain3;
  Short domainSid = 55;
  /**
   * Set up the environment for performing the tests in this Class.
   *
@@ -177,7 +184,8 @@
        + "cn: example\n"
        + "ds-cfg-base-dn: ou=People,dc=example,dc=com\n"
        + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
        + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
        + "ds-cfg-server-id: "+ domainSid +"\n"
        + "ds-cfg-receive-status: true\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
@@ -239,6 +247,24 @@
    personWithSecondUniqueID =
      TestCaseUtils.entryFromLdifString(entryWithSecondUUID);
    
    user3UUID = "44444444-4444-4444-4444-444444444444";
    user3dn = "uid=user3,ou=People,dc=example,dc=com";
    String user3LDIFEntry = "dn: "+ user3dn + "\n"
      + "objectClass: top\n" + "objectClass: person\n"
      + "objectClass: organizationalPerson\n"
      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
      + "homePhone: 951-245-7634\n"
      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
      + "mobile: 027-085-0537\n"
      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
      + "$Rockford, NC  85762\n" + "mail: user.3@example.com\n"
      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
      + "street: 17984 Thirteenth Street\n"
      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
      + "userPassword: password\n" + "initials: AA\n"
      + "entryUUID: " + user3UUID + "\n";
    user3Entry = TestCaseUtils.entryFromLdifString(user3LDIFEntry);
    
    domain1dn = "dc=domain1,ou=People,dc=example,dc=com";
    domain2dn = "dc=domain2,dc=domain1,ou=People,dc=example,dc=com";
@@ -1667,4 +1693,91 @@
      throw new RuntimeException("Cannot set receive status");
    }
  }
  /**
   * Test that the ReplicationDomain (plugin inside LDAP server) adjust
   * its internal change number generator to the last change number
   * received. Steps:
   * - create a domain with the current date in the CN generator
   * - make it receive an update with a CN in the future
   * - do a local operation replicated on that domain
   * - check that the update generated for that operation has a CN in the
   *   future.
   * @throws Exception
   */
  @Test(enabled=true)
  public void CNGeneratorAdjust() throws Exception
  {
    short serverId = 88;
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting synchronization test : CNGeneratorAdjust"));
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ReplicationBroker broker =
      openReplicationSession(baseDn, serverId, 100, replServerPort, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the replicationServer.
     */
    long inTheFutur = System.currentTimeMillis() + (3600*1000);
    ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, inTheFutur);
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(
        gen.newChangeNumber(),
        user3dn.toString(),
        user3UUID,
        baseUUID,
        user3Entry.getObjectClassAttribute(),
        user3Entry.getAttributes(),
        new ArrayList<Attribute>());
    broker.publish(addMsg);
    entryList.add(personWithUUIDEntry.getDN());
    Entry resultEntry;
    // Check that the entry has not been created in the directory server.
    resultEntry = getEntry(user3Entry.getDN(), 1000, true);
    assertNotNull(resultEntry, "The entry has not been created");
    // Modify the entry
    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
    ModifyOperationBasis modOp = new ModifyOperationBasis(
        connection,
        InternalClientConnection.nextOperationID(),
        InternalClientConnection.nextMessageID(),
        null,
        user3Entry.getDN(),
        mods);
    modOp.setInternalOperation(true);
    modOp.run();
    // See if the client has received the msg
    ReplicationMessage msg = broker.receive();
    assertTrue(msg instanceof ModifyMsg,
      "The received replication message is not a MODIFY msg");
    ModifyMsg modMsg = (ModifyMsg) msg;
    assertEquals(addMsg.getChangeNumber().getTimeSec(),
                 modMsg.getChangeNumber().getTimeSec(),
                "The MOD timestamp should have been adjusted to the ADD one");
    // Delete the entries to clean the database.
    DeleteMsg delMsg =
      new DeleteMsg(
          user3Entry.getDN().toString(),
          gen.newChangeNumber(),
          user3UUID);
    broker.publish(delMsg);
    // Check that the delete operation has been applied.
    resultEntry = getEntry(user3Entry.getDN(), 10000, false);
    assertNull(resultEntry,
        "The DELETE replication message was not replayed");
    broker.stop();
  }
}