mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
13.01.2008 5589e80c0344aafd9db1b24ef4a820486620322a
opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumberGenerator.java
@@ -100,7 +100,7 @@
  }
  /**
   * Adjust the lastTime and seqnum of this Changenumber generator with
   * Adjust the lastTime of this Changenumber generator with
   * a ChangeNumber that we have received from another server.
   * This is necessary because we need that the changenumber generated
   * after processing an update received from other hosts to be larger
@@ -110,10 +110,17 @@
   */
  public void adjust(ChangeNumber number)
  {
    if (number==null)
    {
      lastTime = TimeThread.getTime();
      seqnum = 0;
      return;
    }
    long rcvdTime = number.getTime();
    /* need to synchronize with NewChangeNumber method so that we
     * protect writing of seqnum and lastTime fields
     * protect writing lastTime fields
     */
    synchronized(this)
    {
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -477,13 +477,16 @@
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     * for each operation done on this replication domain.
     *
     * The generator time is adjusted to the time of the last CN received from
     * remote other servers.
     */
    ChangeNumberGenerator generator =
      new ChangeNumberGenerator(serverId, state);
    pendingChanges =
      new PendingChanges(new ChangeNumberGenerator(serverId, state),
      new PendingChanges(generator,
                         broker, state);
    remotePendingChanges = new RemotePendingChanges(generator, state);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -28,12 +28,15 @@
package org.opends.server.replication;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import org.opends.server.loggers.debug.DebugTracer;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.opends.messages.Category;
@@ -54,6 +57,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.LDAPException;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
@@ -62,6 +66,7 @@
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.opends.server.types.Attribute;
/**
 * Test the contructors, encoders and decoders of the Replication AckMsg,
@@ -147,7 +152,7 @@
      Thread.sleep(500);
      // check that the replicationServer only sent WINDOW_SIZE messages
      assertTrue(searchUpdateSent());
      searchUpdateSent();
      int rcvCount=0;
      try
@@ -209,15 +214,16 @@
   * And that the number of waiting changes is accurate.
   * Do this by checking the monitoring information.
   */
  private boolean searchUpdateSent() throws Exception
  private void searchUpdateSent() throws Exception
  {
    InternalSearchOperation op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    if (op.getEntriesSent() != 1)
      return false;
    assertEquals(op.getEntriesSent(), 1,
        "Entries#=" + op.getEntriesSent());
    op = connection.processSearch(
        new ASN1OctetString("cn=monitor"),
@@ -225,8 +231,21 @@
        LDAPFilter.decode("(missing-changes=" +
            (REPLICATION_QUEUE_SIZE + WINDOW_SIZE) + ")"));
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    return (op.getEntriesSent() == 1);
    Iterator<SearchResultEntry> entriesit = op.getSearchEntries().iterator();
    while(entriesit.hasNext())
    {
      SearchResultEntry e = entriesit.next();
      Iterator<Attribute> attit = e.getAttributes().iterator();
      while (attit.hasNext())
      {
        Attribute attr = attit.next();
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        e.getDN() + "= " + attr.getName() + " " + attr.getValues().iterator()
        .next().getStringValue()));
      }
    }
    assertEquals(op.getEntriesSent(), 1, "Entries#=" + op.getEntriesSent());
  }
  /**
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -98,6 +98,11 @@
  private Entry personWithUUIDEntry;
  private Entry personWithSecondUniqueID;
  private Entry  user3Entry;
  private String user3dn;
  private String user3lLDIFEntry;
  private String user3UUID;
  private String baseUUID;
  private String user1dn;
@@ -120,6 +125,8 @@
  private Entry domain1;
  private Entry domain2;
  private Entry domain3;
  Short domainSid = 55;
  /**
   * Set up the environment for performing the tests in this Class.
@@ -177,7 +184,8 @@
        + "cn: example\n"
        + "ds-cfg-base-dn: ou=People,dc=example,dc=com\n"
        + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
        + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
        + "ds-cfg-server-id: "+ domainSid +"\n"
        + "ds-cfg-receive-status: true\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
@@ -239,7 +247,25 @@
    personWithSecondUniqueID =
      TestCaseUtils.entryFromLdifString(entryWithSecondUUID);
    
    user3UUID = "44444444-4444-4444-4444-444444444444";
    user3dn = "uid=user3,ou=People,dc=example,dc=com";
    String user3LDIFEntry = "dn: "+ user3dn + "\n"
      + "objectClass: top\n" + "objectClass: person\n"
      + "objectClass: organizationalPerson\n"
      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
      + "homePhone: 951-245-7634\n"
      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
      + "mobile: 027-085-0537\n"
      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
      + "$Rockford, NC  85762\n" + "mail: user.3@example.com\n"
      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
      + "street: 17984 Thirteenth Street\n"
      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
      + "userPassword: password\n" + "initials: AA\n"
      + "entryUUID: " + user3UUID + "\n";
    user3Entry = TestCaseUtils.entryFromLdifString(user3LDIFEntry);
    domain1dn = "dc=domain1,ou=People,dc=example,dc=com";
    domain2dn = "dc=domain2,dc=domain1,ou=People,dc=example,dc=com";
    domain3dn = "dc=domain3,dc=domain1,ou=People,dc=example,dc=com";
@@ -1667,4 +1693,91 @@
      throw new RuntimeException("Cannot set receive status");
    }
  }
  /**
   * Test that the ReplicationDomain (plugin inside LDAP server) adjust
   * its internal change number generator to the last change number
   * received. Steps:
   * - create a domain with the current date in the CN generator
   * - make it receive an update with a CN in the future
   * - do a local operation replicated on that domain
   * - check that the update generated for that operation has a CN in the
   *   future.
   * @throws Exception
   */
  @Test(enabled=true)
  public void CNGeneratorAdjust() throws Exception
  {
    short serverId = 88;
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting synchronization test : CNGeneratorAdjust"));
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ReplicationBroker broker =
      openReplicationSession(baseDn, serverId, 100, replServerPort, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the replicationServer.
     */
    long inTheFutur = System.currentTimeMillis() + (3600*1000);
    ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, inTheFutur);
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(
        gen.newChangeNumber(),
        user3dn.toString(),
        user3UUID,
        baseUUID,
        user3Entry.getObjectClassAttribute(),
        user3Entry.getAttributes(),
        new ArrayList<Attribute>());
    broker.publish(addMsg);
    entryList.add(personWithUUIDEntry.getDN());
    Entry resultEntry;
    // Check that the entry has not been created in the directory server.
    resultEntry = getEntry(user3Entry.getDN(), 1000, true);
    assertNotNull(resultEntry, "The entry has not been created");
    // Modify the entry
    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
    ModifyOperationBasis modOp = new ModifyOperationBasis(
        connection,
        InternalClientConnection.nextOperationID(),
        InternalClientConnection.nextMessageID(),
        null,
        user3Entry.getDN(),
        mods);
    modOp.setInternalOperation(true);
    modOp.run();
    // See if the client has received the msg
    ReplicationMessage msg = broker.receive();
    assertTrue(msg instanceof ModifyMsg,
      "The received replication message is not a MODIFY msg");
    ModifyMsg modMsg = (ModifyMsg) msg;
    assertEquals(addMsg.getChangeNumber().getTimeSec(),
                 modMsg.getChangeNumber().getTimeSec(),
                "The MOD timestamp should have been adjusted to the ADD one");
    // Delete the entries to clean the database.
    DeleteMsg delMsg =
      new DeleteMsg(
          user3Entry.getDN().toString(),
          gen.newChangeNumber(),
          user3UUID);
    broker.publish(delMsg);
    // Check that the delete operation has been applied.
    resultEntry = getEntry(user3Entry.getDN(), 10000, false);
    assertNull(resultEntry,
        "The DELETE replication message was not replayed");
    broker.stop();
  }
}