mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
02.22.2007 0ed3ce1253d7276824493d90a8312a2c117ea1c7
issue 504,505,507 : re-synchronization after restore or import

The synchronization protocol already has some re-synchronization capabilities
that allow the synchronization server to detect where to start from in the changelog
when a new LDAP server opens a session.

These changes implement the ability to do this when a restore or
an import is done while the LDAP server is online and already connected to a
synchronization server.

To achieve this, the synchronization code, on notification from the core server
that a restore or an import is starting, closes the session to the synchronization
server and restarts it when the restore or import is finished.

The synchronization protocol then does its job of negotiating where to start
from in the changelog and resynchronizing the LDAP server.

The core server does not yet provide the notification to the changelog server,
but here is the synchronization part of the code along with some tests for it.
1 files added
8 files modified
611 ■■■■ changed files
opends/src/server/org/opends/server/synchronization/common/ServerState.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java 105 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ReSyncTest.java 335 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 48 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java 48 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/common/ServerState.java
@@ -57,6 +57,19 @@
    list = new HashMap<Short, ChangeNumber>();
  }
  /**
   * Empty the ServerState.
   * After this call the Server State will be in the same state
   * as if it was just created.
   */
  public void clear()
  {
    synchronized (this)
    {
      list.clear();
    }
  }
  /**
   * Creates a new ServerState object from its encoded form.
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -163,6 +163,7 @@
     * Open Socket to the Changelog
     * Send the Start message
     */
    shutdown = false;
    this.servers = servers;
    if (servers.size() < 1)
    {
@@ -173,6 +174,7 @@
               message, msgID);
    }
    this.rcvWindow = this.maxRcvWindow;
    this.connect();
  }
@@ -537,7 +539,9 @@
   */
  public void stop()
  {
    changelogServer = "stopped";
    shutdown = true;
    connected = false;
    try
    {
      session.close();
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -445,8 +445,10 @@
  /**
   * Finds the Synchronization domain for a given DN.
   *
   * @param dn The DN for which the domain must be returned.
   * @return The Synchronization domain for this DN.
   * @param dn   The DN for which the domain must be returned.
   * @param op   An optional operation for which the check is done.
   *             Can be null if the request has no associated operation.
   * @return     The Synchronization domain for this DN.
   */
  private static SynchronizationDomain findDomain(DN dn, Operation op)
  {
@@ -454,7 +456,7 @@
     * Don't run the special synchronization code on Operation that are
     * specifically marked as don't synchronize.
     */
    if (op.dontSynchronize())
    if ((op != null) && op.dontSynchronize())
      return null;
    SynchronizationDomain domain = null;
@@ -489,6 +491,50 @@
    return;
  }
  /**
   * Handle a Notification of restore start from the core server.
   *
   * @param dn The baseDn of the restore.
   */
  public static void notificationRestoreStart(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.disable();
  }
  /**
   * Handle a Notification of restore end from the core server.
   *
   * @param dn The baseDn of the restore.
   */
  public static void notificationRestoreEnd(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.enable();
  }
  /**
   * Handle a Notification of backup start from the core server.
   *
   * @param dn The baseDn of the backup.
   */
  public static void notificationBackupStart(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.backupStart();
  }
  /**
   * Handle a Notification of backup end from the core server.
   *
   * @param dn The baseDn of the backup.
   */
  public static void notificationBackupEnd(DN dn)
  {
    SynchronizationDomain domain = findDomain(dn, null);
    domain.backupEnd();
  }
}
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -117,7 +117,7 @@
  /**
   * Load the ServerState from the backing entry in database to memory.
   */
  private void loadState()
  public void loadState()
  {
    /*
     * Read the serverState from the database,
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -158,6 +158,8 @@
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
  static final String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -1093,18 +1095,10 @@
  @Override
  public void run()
  {
    /* synchroThreads
     * create the threads that will wait for incoming changes
     * TODO : should use a pool of threads shared between all the servers
     * TODO : need to make number of thread configurable
     * TODO : need to handle operation dependencies
    /*
     * create the threads that will wait for incoming changes.
     */
    for (int i=0; i<listenerThreadNumber; i++)
    {
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
      synchroThreads.add(myThread);
    }
    createListeners();
    while (shutdown  == false)
    {
@@ -1113,8 +1107,11 @@
        synchronized (this)
        {
          this.wait(1000);
          // save the RUV
          state.save();
          if (!disabled )
          {
            // save the RUV
            state.save();
          }
        }
      } catch (InterruptedException e)
      { }
@@ -1123,6 +1120,22 @@
  }
  /**
   * create the threads that will wait for incoming changes.
   * TODO : should use a pool of threads shared between all the servers
   * TODO : need to make number of thread configurable
   */
  private void createListeners()
  {
    synchroThreads.clear();
    for (int i=0; i<listenerThreadNumber; i++)
    {
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
      synchroThreads.add(myThread);
    }
  }
  /**
   * Shutdown this SynchronizationDomain.
   */
  public void shutdown()
@@ -1854,4 +1867,70 @@
  {
    return solveConflictFlag;
  }
  /**
   * Disable the Synchronization on this domain.
   * The session to the Synchronization server will be stopped.
   * The domain will not be destroyed but call to the pre-operation
   * methods will result in failure.
   * The listener threads will be destroyed.
   * The monitor informations will still be accessible.
   */
  public void disable()
  {
    state.save();
    state.clear();
    disabled = true;
    //  stop the listener threads
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
    }
    broker.stop(); // this will cut the session and wake-up the listeners
  }
  /**
   * Enable back the domain after a previous disable.
   * The domain will connect back to a Synchronization Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * server.
   * The ServerState will also be read again from the local database.
   */
  public void enable()
  {
    state.clear();
    state.loadState();
    disabled = false;
    try
    {
      broker.start(changelogServers);
    } catch (Exception e)
    {
      /* TODO should mark that changelog service is
       * not available, log an error and retry upon timeout
       * should we stop the modifications ?
       */
      e.printStackTrace();
      return;
    }
    createListeners();
  }
  /**
   * Do whatever is needed when a backup is started.
   * We need to make sure that the serverState is correclty save.
   */
  public void backupStart()
  {
    state.save();
  }
  /**
   * Do whatever is needed when a backup is finished.
   */
  public void backupEnd()
  {
    // Nothing is needed at the moment
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ReSyncTest.java
New file
@@ -0,0 +1,335 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.io.File;
import java.net.ServerSocket;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test re-synchronization after after backup/restore and LDIF import.
 */
public class ReSyncTest extends SynchronizationTestCase
{
 /**
  * Set up the environment for performing the tests in this Class.
  *
  * @throws Exception
  *           If the environment could not be set up.
  */
 @BeforeClass
  public void setup() throws Exception
  {
   /*
    * - Start a server and a changelog server, configure synchronization
    * - Do some changes.
    */
    TestCaseUtils.startServer();
    // find  a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    socket.close();
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
    //  Create backend top level entries
    addEntry("dn: dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: domain\n");
    // top level synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Multimaster Synchro plugin
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
        + synchroStringDN;
    String synchroPluginLdif = "dn: "
        + synchroPluginStringDN
        + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider\n"
        + "ds-cfg-synchronization-provider-enabled: true\n"
        + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
    synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n"
        + "ds-cfg-changelog-port:" + changelogPort + "\n"
        + "ds-cfg-changelog-server-id: 1\n";
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    // suffix synchronized
    String synchroServerLdif = "dn: cn=example, " + synchroPluginStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider-config\n"
        + "cn: example\n"
        + "ds-cfg-synchronization-dn: dc=example,dc=com\n"
        + "ds-cfg-changelog-server: localhost:"+ changelogPort + "\n"
        + "ds-cfg-directory-server-id: 1\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    configureSynchronization();
    // Create a dummy entry
    addEntry("dn: dc=dummy, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
  }
  /**
   * Utility function. Can be used to create and add and entry
   * in the local DS from its ldif description.
   *
   * @param entryString  The entry in ldif from.
   * @return             The ResultCode of the operation.
   * @throws Exception   If something went wrong.
   */
  private ResultCode addEntry(String entryString) throws Exception
  {
    Entry entry;
    AddOperation addOp;
    entry = TestCaseUtils.entryFromLdifString(entryString);
    addOp = new AddOperation(connection,
       InternalClientConnection.nextOperationID(), InternalClientConnection
       .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
       entry.getUserAttributes(), entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    entryList.add(entry.getDN());
    return addOp.getResultCode();
  }
  /**
   * Test re-synchronization after after backup/restore
   */
  @Test()
  public void testResyncAfterRestore() throws Exception
  {
    /*
     * - Backup the server
     * - ADD an entry
     * - Restore the backup taken previously
     * - Check that entry has been added again in the LDAP server.
     */
    // Delete the entry we are going to use to make sure that
    // we do test something.
    connection.processDelete(DN.decode("dc=foo, dc=example,dc=com"));
    MultimasterSynchronization.notificationBackupStart(
        DN.decode("dc=example,dc=com")); // TEMPORARY UNTIL core server
                                        // sends a notification
    task("dn: ds-task-id=" + UUID.randomUUID()
        +  ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-backup\n"
        + "ds-task-class-name: org.opends.server.tasks.BackupTask\n"
        + "ds-backup-directory-path: bak\n"
        + "ds-task-backup-all: TRUE\n");
    addEntry("dn: dc=foo, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
    MultimasterSynchronization.notificationRestoreStart(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       //  sends a notification
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-restore\n"
        + "ds-task-class-name: org.opends.server.tasks.RestoreTask\n"
        + "ds-backup-directory-path: bak" + File.separator
        + "userRoot\n");
    MultimasterSynchronization.notificationRestoreEnd(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       // sends a notification
   if (getEntry(DN.decode("dc=foo, dc=example,dc=com"), 30000, true) == null)
     fail("The Directory has not been resynchronized after the restore.");
   connection.processDelete(DN.decode("dc=foo, dc=example,dc=com"));
  }
  /**
   * Test re-synchronization after after backup/restore
   */
  @Test()
  public void testResyncAfterImport() throws Exception
  {
    /*
     * - Do an export to a LDIF file
     * - Add an entry
     * - Import LDIF file generated above.
     * - Check that entry has been added again in the LDAP server.
     */
    // delete the entry we are going to use to make sure that
    // we do test something.
    connection.processDelete(DN.decode("dc=foo, dc=example,dc=com"));
    MultimasterSynchronization.notificationBackupStart(
        DN.decode("dc=example,dc=com")); // TEMPORARY UNTIL core server
                                        // sends a notification
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = buildRoot + File.separator + "ReSynchTest";
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-export\n"
        + "ds-task-class-name: org.opends.server.tasks.ExportTask\n"
        + "ds-task-export-backend-id: userRoot\n"
        + "ds-task-export-ldif-file: " + path + "\n");
    addEntry("dn: dc=foo, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
    MultimasterSynchronization.notificationRestoreStart(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       //  sends a notification
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-import\n"
        + "ds-task-class-name: org.opends.server.tasks.ImportTask\n"
        + "ds-task-import-backend-id: userRoot\n"
        + "ds-task-import-ldif-file: " + path + "\n"
        + "ds-task-import-reject-file: " + path + "reject\n");
    MultimasterSynchronization.notificationRestoreEnd(
        DN.decode("dc=example,dc=com"));// TEMPORARY UNTIL core server
                                       //  sends a notification
   if (getEntry(DN.decode("dc=foo, dc=example,dc=com"), 30000, true) == null)
     fail("The Directory has not been resynchronized after the restore.");
  }
  /**
   * Utility method to create, run a task and check its result.
   */
  private void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      try
      {
        resultEntry = searchOperation.getSearchEntries().getFirst();
      } catch (Exception e)
      {
        continue;
      }
      completionTime =
           resultEntry.getAttributeValue(completionTimeType,
                                         DirectoryStringSyntax.DECODER);
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }
    // Check that the task state is as expected.
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java
@@ -77,10 +77,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -394,4 +394,52 @@
    return found;
  }
  /**
   * Retrieves an entry from the local Directory Server.
   * @throws Exception When the entry cannot be locked.
   */
  protected Entry getEntry(DN dn, int timeout, boolean exist) throws Exception
  {
    int count = timeout/200;
    if (count<1)
      count=1;
    Thread.sleep(50);
    boolean found = DirectoryServer.entryExists(dn);
    while ((count> 0) && (found != exist))
    {
      Thread.sleep(200);
      found = DirectoryServer.entryExists(dn);
      count--;
    }
    Lock lock = null;
    for (int i=0; i < 3; i++)
    {
      lock = LockManager.lockRead(dn);
      if (lock != null)
      {
        break;
      }
    }
    if (lock == null)
    {
      throw new Exception("could not lock entry " + dn);
    }
    try
    {
      Entry entry = DirectoryServer.getEntry(dn);
      if (entry == null)
        return null;
      else
        return entry.duplicate();
    }
    finally
    {
      LockManager.unlock(dn, lock);
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -1079,54 +1079,6 @@
  }
  /**
   * Retrieves an entry from the local Directory Server.
   * @throws Exception When the entry cannot be locked.
   */
  private Entry getEntry(DN dn, int timeout, boolean exist)
               throws Exception
  {
    int count = timeout/200;
    if (count<1)
      count=1;
    boolean found = DirectoryServer.entryExists(dn);
    while ((count> 0) && (found != exist))
    {
      Thread.sleep(200);
      found = DirectoryServer.entryExists(dn);
      count--;
    }
    Lock lock = null;
    for (int i=0; i < 3; i++)
    {
      lock = LockManager.lockRead(dn);
      if (lock != null)
      {
        break;
      }
    }
    if (lock == null)
    {
      throw new Exception("could not lock entry " + dn);
    }
    try
    {
      Entry entry = DirectoryServer.getEntry(dn);
      if (entry == null)
        return null;
      else
        return entry.duplicate();
    }
    finally
    {
      LockManager.unlock(dn, lock);
    }
  }
  /**
   * Test case for
   * [Issue 635] NullPointerException when trying to access non existing entry.
   */