mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

coulbeck
19.35.2007 d00ae6eb8ebca2516ab0543ee5d5b37d17befee3
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -431,6 +431,13 @@
        {
          /* was enabled and moved to disabled */
          broker.suspendReceive();
          // FIXME Need a way to stop these threads.
          // Setting the shutdown flag does not stop them until they have
          // consumed and discarded one more message each.
//          for (ListenerThread thread : synchroThreads)
//          {
//            thread.shutdown();
//          }
          synchroThreads.clear();
        }
        receiveStatus = newReceiveStatus;
@@ -1012,7 +1019,7 @@
     * TODO : need to make number of thread configurable
     * TODO : need to handle operation dependencies
     */
    for (int i=0; i<10; i++)
    for (int i=0; i<listenerThreadNumber; i++)
    {
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -181,7 +181,7 @@
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op.run();;
        op.run();
      }
    }
    catch (NoSuchElementException e) {
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -31,11 +31,8 @@
import static org.testng.Assert.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import org.opends.server.TestCaseUtils;
@@ -44,7 +41,6 @@
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.Historical;
import org.opends.server.synchronization.protocol.AddMsg;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
@@ -60,14 +56,16 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.types.*;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
 * Test the contructors, encoders and decoders of the synchronization AckMsg,
 * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
 * Test synchronization update operations on the directory server and through
 * the synchronization server broker interface.
 */
public class UpdateOperationTest extends SynchronizationTestCase
{
@@ -92,7 +90,6 @@
  /**
   * Set up the environment for performing the tests in this Class.
   * synchronization
   *
   * @throws Exception
   *           If the environment could not be set up.
@@ -231,6 +228,89 @@
  }
  /**
   * Tests whether the synchronization provider receive status can be disabled
   * then re-enabled.
   * FIXME Enable this test when broker suspend/resume receive are implemented.
   * @throws Exception
   */
  @Test(enabled=false)
  public void toggleReceiveStatus() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : toggleReceiveStatus" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the changelog server using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the changelog server.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
    // Disable the directory server receive status.
    setReceiveStatus(synchroServerEntry.getDN().toString(), false);
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(gen.NewChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID,
        baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    broker.publish(addMsg);
    entryList.add(personWithUUIDEntry.getDN());
    Entry resultEntry;
    // Check that the entry has not been created in the directory server.
    resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
    assertNull(resultEntry,
        "The synchronization message was replayed while the server " +
             "receive status was disabled");
    // Enable the directory server receive status.
    setReceiveStatus(synchroServerEntry.getDN().toString(), true);
    // Create and publish another update message to add an entry.
    addMsg = new AddMsg(gen.NewChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID,
        baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    broker.publish(addMsg);
    // Check that the entry has been created in the directory server.
    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, true);
    assertNotNull(resultEntry,
        "The synchronization message was not replayed after the server " +
             "receive status was enabled");
    // Delete the entries to clean the database.
    DeleteMsg delMsg =
      new DeleteMsg(personWithUUIDEntry.getDN().toString(),
          gen.NewChangeNumber(), user1entryUUID);
    broker.publish(delMsg);
    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false);
    // Check that the delete operation has been applied.
    assertNull(resultEntry,
        "The DELETE synchronization message was not replayed");
    broker.stop();
  }
  /**
   * Tests the naming conflict resolution code.
   * In this test, the local server act both as an LDAP server and
   * a changelog server that are inter-connected.
@@ -548,9 +628,9 @@
    /*
     * When replaying add operations it is possible that the parent entry has
     * been renamed before and that another entry have taken the former dn of
     * the parent entry. In such case the synchronization replay code should
     * the parent entry. In such case the synchronization replay code should
     * detect that the parent has been renamed and should add the entry below
     * the new dn of the parent (thus changing the original dn with which the
     * the new dn of the parent (thus changing the original dn with which the
     * entry had been created)
     *
     * Steps
@@ -559,7 +639,7 @@
     * - MODDN parent entry 1 to baseDn2 in the LDAP server
     * - add new parent entry 2 with baseDn1
     * - publish msg
     * - check that the Dn has been changed to baseDn2 in the msg received
     * - check that the Dn has been changed to baseDn2 in the msg received
     */
    // - create parent entry 1 with baseDn1
@@ -595,9 +675,9 @@
    // - MODDN parent entry 1 to baseDn2 in the LDAP server
    ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null,
        .nextMessageID(), null,
        DN.decode("ou=baseDn1,"+baseDn),
        RDN.decode("ou=baseDn2"), true,
        RDN.decode("ou=baseDn2"), true,
        baseDn);
    modDNOp.run();
    entryList.add(DN.decode("ou=baseDn2,"+baseDn));
@@ -1164,4 +1244,43 @@
         DirectoryServer.getDefaultAttributeType("replayed-updates");
    return entry.getAttributeValue(attrType, IntegerSyntax.DECODER).longValue();
  }
  /**
   * Enable or disable the receive status of a synchronization provider.
   *
   * @param syncConfigDN The DN of the synchronization provider configuration
   * entry.
   * @param enable Specifies whether the receive status should be enabled
   * or disabled.
   */
  private static void setReceiveStatus(String syncConfigDN, boolean enable)
  {
    ArrayList<ASN1OctetString> valueList = new ArrayList<ASN1OctetString>(1);
    if (enable)
    {
      valueList.add(new ASN1OctetString("TRUE"));
    }
    else
    {
      valueList.add(new ASN1OctetString("FALSE"));
    }
    LDAPAttribute a = new LDAPAttribute("ds-cfg-receive-status", valueList);
    LDAPModification m = new LDAPModification(ModificationType.REPLACE, a);
    ArrayList<LDAPModification> modList = new ArrayList<LDAPModification>(1);
    modList.add(m);
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    ASN1OctetString rawEntryDN =
         new ASN1OctetString(syncConfigDN);
    ModifyOperation internalModify = conn.processModify(rawEntryDN, modList);
    ResultCode resultCode = internalModify.getResultCode();
    if (resultCode != ResultCode.SUCCESS)
    {
      throw new RuntimeException("Cannot set receive status");
    }
  }
}