mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
02.22.2007 3fcf167e0af7a8f405442b3b4c8b07404ec6c0ff
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/ServerState.java
@@ -57,6 +57,19 @@
    list = new HashMap<Short, ChangeNumber>();
  }
  /**
   * Empty the ServerState.
   * After this call the Server State will be in the same state
   * as if it was just created.
   */
  public void clear()
  {
    synchronized (this)
    {
      list.clear();
    }
  }
  /**
   * Creates a new ServerState object from its encoded form.
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -163,6 +163,7 @@
     * Open Socket to the Changelog
     * Send the Start message
     */
    shutdown = false;
    this.servers = servers;
    if (servers.size() < 1)
    {
@@ -173,6 +174,7 @@
               message, msgID);
    }
    this.rcvWindow = this.maxRcvWindow;
    this.connect();
  }
@@ -537,7 +539,9 @@
   */
  public void stop()
  {
    changelogServer = "stopped";
    shutdown = true;
    connected = false;
    try
    {
      session.close();
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -445,8 +445,10 @@
  /**
   * Finds the Synchronization domain for a given DN.
   *
   * @param dn The DN for which the domain must be returned.
   * @return The Synchronization domain for this DN.
   * @param dn   The DN for which the domain must be returned.
   * @param op   An optional operation for which the check is done.
   *             Can be null is the request has no associated operation.
   * @return     The Synchronization domain for this DN.
   */
  private static SynchronizationDomain findDomain(DN dn, Operation op)
  {
@@ -454,7 +456,7 @@
     * Don't run the special synchronization code on Operation that are
     * specifically marked as don't synchronize.
     */
    if (op.dontSynchronize())
    if ((op != null) && op.dontSynchronize())
      return null;
    SynchronizationDomain domain = null;
@@ -489,6 +491,50 @@
    return;
  }
  /**
   * Handle a Notification of restore start from the core server.
   *
   * @param dn The baseDn of the restore.
   */
  public static void notificationRestoreStart(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.disable();
  }
  /**
   * Handle a Notification of restore end from the core server.
   *
   * @param dn The baseDn of the restore.
   */
  public static void notificationRestoreEnd(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.enable();
  }
  /**
   * Handle a Notification of backup start from the core server.
   *
   * @param dn The baseDn of the backup.
   */
  public static void notificationBackupStart(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.backupStart();
  }
  /**
   * Handle a Notification of backup end from the core server.
   *
   * @param dn The baseDn of the backup.
   */
  public static void notificationBackupEnd(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.backupEnd();
  }
}
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -117,7 +117,7 @@
  /**
   * Load the ServerState from the backing entry in database to memory.
   */
  private void loadState()
  public void loadState()
  {
    /*
     * Read the serverState from the database,
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -158,6 +158,8 @@
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
  static final String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -1093,18 +1095,10 @@
  @Override
  public void run()
  {
    /* synchroThreads
     * create the threads that will wait for incoming changes
     * TODO : should use a pool of threads shared between all the servers
     * TODO : need to make number of thread configurable
     * TODO : need to handle operation dependencies
    /*
     * create the threads that will wait for incoming changes.
     */
    for (int i=0; i<listenerThreadNumber; i++)
    {
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
      synchroThreads.add(myThread);
    }
    createListeners();
    while (shutdown  == false)
    {
@@ -1113,8 +1107,11 @@
        synchronized (this)
        {
          this.wait(1000);
          // save the RUV
          state.save();
          if (!disabled )
          {
            // save the RUV
            state.save();
          }
        }
      } catch (InterruptedException e)
      { }
@@ -1123,6 +1120,22 @@
  }
  /**
   * create the threads that will wait for incoming changes.
   * TODO : should use a pool of threads shared between all the servers
   * TODO : need to make number of thread configurable
   */
  private void createListeners()
  {
    synchroThreads.clear();
    for (int i=0; i<listenerThreadNumber; i++)
    {
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
      synchroThreads.add(myThread);
    }
  }
  /**
   * Shutdown this SynchronizationDomain.
   */
  public void shutdown()
@@ -1854,4 +1867,70 @@
  {
    return solveConflictFlag;
  }
  /**
   * Disable the Synchronization on this domain.
   * The session to the Synchronization server will be stopped.
   * The domain will not be destroyed but call to the pre-operation
   * methods will result in failure.
   * The listener threads will be destroyed.
   * The monitor informations will still be accessible.
   */
  public void disable()
  {
    state.save();
    state.clear();
    disabled = true;
    //  stop the listener threads
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
    }
    broker.stop(); // this will cut the session and wake-up the listeners
  }
  /**
   * Enable back the domain after a previous disable.
   * The domain will connect back to a Synchronization Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * server.
   * The ServerState will also be read again from the local database.
   */
  public void enable()
  {
    state.clear();
    state.loadState();
    disabled = false;
    try
    {
      broker.start(changelogServers);
    } catch (Exception e)
    {
      /* TODO should mark that changelog service is
       * not available, log an error and retry upon timeout
       * should we stop the modifications ?
       */
      e.printStackTrace();
      return;
    }
    createListeners();
  }
  /**
   * Do whatever is needed when a backup is started.
   * We need to make sure that the serverState is correclty save.
   */
  public void backupStart()
  {
    state.save();
  }
  /**
   * Do whatever is needed when a backup is finished.
   */
  public void backupEnd()
  {
    // Nothing is needed at the moment
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ReSyncTest.java
New file
@@ -0,0 +1,335 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.io.File;
import java.net.ServerSocket;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test re-synchronization after after backup/restore and LDIF import.
 */
public class ReSyncTest extends SynchronizationTestCase
{
 /**
  * Set up the environment for performing the tests in this Class.
  *
  * @throws Exception
  *           If the environment could not be set up.
  */
 @BeforeClass
  public void setup() throws Exception
  {
   /*
    * - Start a server and a changelog server, configure synchronization
    * - Do some changes.
    */
    TestCaseUtils.startServer();
    // find  a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    socket.close();
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
    //  Create backend top level entries
    addEntry("dn: dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: domain\n");
    // top level synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Multimaster Synchro plugin
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
        + synchroStringDN;
    String synchroPluginLdif = "dn: "
        + synchroPluginStringDN
        + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider\n"
        + "ds-cfg-synchronization-provider-enabled: true\n"
        + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
    synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n"
        + "ds-cfg-changelog-port:" + changelogPort + "\n"
        + "ds-cfg-changelog-server-id: 1\n";
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    // suffix synchronized
    String synchroServerLdif = "dn: cn=example, " + synchroPluginStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider-config\n"
        + "cn: example\n"
        + "ds-cfg-synchronization-dn: dc=example,dc=com\n"
        + "ds-cfg-changelog-server: localhost:"+ changelogPort + "\n"
        + "ds-cfg-directory-server-id: 1\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    configureSynchronization();
    // Create a dummy entry
    addEntry("dn: dc=dummy, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
  }
  /**
   * Utility function. Can be used to create and add and entry
   * in the local DS from its ldif description.
   *
   * @param entryString  The entry in ldif from.
   * @return             The ResultCode of the operation.
   * @throws Exception   If something went wrong.
   */
  private ResultCode addEntry(String entryString) throws Exception
  {
    Entry entry;
    AddOperation addOp;
    entry = TestCaseUtils.entryFromLdifString(entryString);
    addOp = new AddOperation(connection,
       InternalClientConnection.nextOperationID(), InternalClientConnection
       .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
       entry.getUserAttributes(), entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    entryList.add(entry.getDN());
    return addOp.getResultCode();
  }
  /**
   * Test re-synchronization after after backup/restore
   */
  @Test()
  public void testResyncAfterRestore() throws Exception
  {
    /*
     * - Backup the server
     * - ADD an entry
     * - Restore the backup taken previously
     * - Check that entry has been added again in the LDAP server.
     */
    // Delete the entry we are going to use to make sure that
    // we do test something.
    connection.processDelete(DN.decode("dc=foo, dc=example,dc=com"));
    MultimasterSynchronization.notificationBackupStart(
        DN.decode("dc=example,dc=com")); // TEMPORARY UNTIL core server
                                        // sends a notification
    task("dn: ds-task-id=" + UUID.randomUUID()
        +  ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-backup\n"
        + "ds-task-class-name: org.opends.server.tasks.BackupTask\n"
        + "ds-backup-directory-path: bak\n"
        + "ds-task-backup-all: TRUE\n");
    addEntry("dn: dc=foo, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
    MultimasterSynchronization.notificationRestoreStart(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       //  sends a notification
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-restore\n"
        + "ds-task-class-name: org.opends.server.tasks.RestoreTask\n"
        + "ds-backup-directory-path: bak" + File.separator
        + "userRoot\n");
    MultimasterSynchronization.notificationRestoreEnd(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       // sends a notification
   if (getEntry(DN.decode("dc=foo, dc=example,dc=com"), 30000, true) == null)
     fail("The Directory has not been resynchronized after the restore.");
   connection.processDelete(DN.decode("dc=foo, dc=example,dc=com"));
  }
  /**
   * Test re-synchronization after after backup/restore
   */
  @Test()
  public void testResyncAfterImport() throws Exception
  {
    /*
     * - Do an export to a LDIF file
     * - Add an entry
     * - Import LDIF file generated above.
     * - Check that entry has been added again in the LDAP server.
     */
    // delete the entry we are going to use to make sure that
    // we do test something.
    connection.processDelete(DN.decode("dc=foo, dc=example,dc=com"));
    MultimasterSynchronization.notificationBackupStart(
        DN.decode("dc=example,dc=com")); // TEMPORARY UNTIL core server
                                        // sends a notification
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = buildRoot + File.separator + "ReSynchTest";
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-export\n"
        + "ds-task-class-name: org.opends.server.tasks.ExportTask\n"
        + "ds-task-export-backend-id: userRoot\n"
        + "ds-task-export-ldif-file: " + path + "\n");
    addEntry("dn: dc=foo, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
    MultimasterSynchronization.notificationRestoreStart(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       //  sends a notification
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-import\n"
        + "ds-task-class-name: org.opends.server.tasks.ImportTask\n"
        + "ds-task-import-backend-id: userRoot\n"
        + "ds-task-import-ldif-file: " + path + "\n"
        + "ds-task-import-reject-file: " + path + "reject\n");
    MultimasterSynchronization.notificationRestoreEnd(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       //  sends a notification
   if (getEntry(DN.decode("dc=foo, dc=example,dc=com"), 30000, true) == null)
     fail("The Directory has not been resynchronized after the restore.");
  }
  /**
   * Utility method to create, run a task and check its result.
   */
  private void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      try
      {
        resultEntry = searchOperation.getSearchEntries().getFirst();
      } catch (Exception e)
      {
        continue;
      }
      completionTime =
           resultEntry.getAttributeValue(completionTimeType,
                                         DirectoryStringSyntax.DECODER);
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }
    // Check that the task state is as expected.
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java
@@ -77,10 +77,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -394,4 +394,52 @@
    return found;
  }
  /**
   * Retrieves an entry from the local Directory Server.
   * @throws Exception When the entry cannot be locked.
   */
  protected Entry getEntry(DN dn, int timeout, boolean exist) throws Exception
  {
    int count = timeout/200;
    if (count<1)
      count=1;
    Thread.sleep(50);
    boolean found = DirectoryServer.entryExists(dn);
    while ((count> 0) && (found != exist))
    {
      Thread.sleep(200);
      found = DirectoryServer.entryExists(dn);
      count--;
    }
    Lock lock = null;
    for (int i=0; i < 3; i++)
    {
      lock = LockManager.lockRead(dn);
      if (lock != null)
      {
        break;
      }
    }
    if (lock == null)
    {
      throw new Exception("could not lock entry " + dn);
    }
    try
    {
      Entry entry = DirectoryServer.getEntry(dn);
      if (entry == null)
        return null;
      else
        return entry.duplicate();
    }
    finally
    {
      LockManager.unlock(dn, lock);
    }
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -1079,54 +1079,6 @@
  }
  /**
   * Retrieves an entry from the local Directory Server.
   * @throws Exception When the entry cannot be locked.
   */
  private Entry getEntry(DN dn, int timeout, boolean exist)
               throws Exception
  {
    int count = timeout/200;
    if (count<1)
      count=1;
    boolean found = DirectoryServer.entryExists(dn);
    while ((count> 0) && (found != exist))
    {
      Thread.sleep(200);
      found = DirectoryServer.entryExists(dn);
      count--;
    }
    Lock lock = null;
    for (int i=0; i < 3; i++)
    {
      lock = LockManager.lockRead(dn);
      if (lock != null)
      {
        break;
      }
    }
    if (lock == null)
    {
      throw new Exception("could not lock entry " + dn);
    }
    try
    {
      Entry entry = DirectoryServer.getEntry(dn);
      if (entry == null)
        return null;
      else
        return entry.duplicate();
    }
    finally
    {
      LockManager.unlock(dn, lock);
    }
  }
  /**
   * Test case for
   * [Issue 635] NullPointerException when trying to access non existing entry.
   */