mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
29.14.2007 c8816d7020a4bf6dc3cd5b116b40a94bf7acabc3
Fix #794 unit test should cover changelog to changelog communications

Until now, the unit tests did not cover the code that handles changelog
server to changelog server communication.

In order to have more than one changelog service running in the same instance
and instance layout, the serverID has been added to the changelogstate dbName.

The changelog db path is now configurable.

The 2 following tests have been implemented:

+ /**
+ * Chaining tests of the changelog code with 2 changelog servers involved
+ * 2 tests are done here (itest=0 or itest=1)
+ *
+ * Test 1
+ * - Create changelog server 1
+ * - Create changelog server 2 connected with changelog server 1
+ * - Create and connect client 1 to changelog server 1
+ * - Create and connect client 2 to changelog server 2
+ * - Make client1 publish changes
+ * - Check that client 2 receives the changes published by client 1
+ *
+ * Test 2
+ * - Create changelog server 1
+ * - Create and connect client1 to changelog server 1
+ * - Make client1 publish changes
+ * - Create changelog server 2 connected with changelog server 1
+ * - Create and connect client 2 to changelog server 2
+ * - Check that client 2 receives the changes published by client 1
+ *
+ */
3 files modified
317 ■■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java 70 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java 16 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 231 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -26,13 +26,23 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import com.sleepycat.je.DatabaseException;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigAttribute;
@@ -48,17 +58,7 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.io.File;
import java.io.IOException;
import com.sleepycat.je.DatabaseException;
/**
 * Changelog Listener.
@@ -99,12 +99,14 @@
  // Environment of the changelog database, opened on the dbDirname directory.
  private ChangelogDbEnv dbEnv;
  // NOTE(review): presumably the receive window size read from
  // WINDOW_SIZE_ATTR -- confirm against the configuration-reading code.
  private int rcvWindow;
  // NOTE(review): presumably the maximum queue size read from
  // QUEUE_SIZE_ATTR -- confirm against the configuration-reading code.
  private int queueSize;
  // Directory holding the changelog database: the value of the
  // CHANGELOG_DIR_PATH_ATTR attribute when configured, "changelogDb" otherwise.
  private String dbDirname = null;
  // Names of the configuration attributes of the changelog server entry.
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
  static final IntegerConfigAttribute changelogPortStub =
    new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -128,6 +130,11 @@
    new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
                               false, false, false, true, 0, false, 0);
  /**
   * Stub used to look up the changelog storage directory path attribute
   * (CHANGELOG_DIR_PATH_ATTR) in a configuration entry.
   * NOTE(review): the three boolean arguments are presumably
   * isRequired / isMultiValued / requiresAdminAction -- confirm against
   * the StringConfigAttribute constructor.
   */
  static final StringConfigAttribute dbDirnameStub =
    new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
        "changelog storage directory path", false,
        false, true);
  /**
   * Check if a ConfigEntry is valid.
   * @param config The config entry that needs to be checked.
@@ -263,6 +270,35 @@
      configAttributes.add(queueSizeAttr);
    }
    /*
     * read the storage directory path attribute
     */
    StringConfigAttribute dbDirnameAttr =
      (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub);
    if (dbDirnameAttr == null)
    {
      dbDirname = "changelogDb";
    }
    else
    {
      dbDirname = dbDirnameAttr.activeValue();
      configAttributes.add(changelogServer);
    }
    // Exists or Create
    File f = getFileForPath(dbDirname);
    try
    {
      if (!f.exists())
      {
        f.mkdir();
      }
    }
    catch (Exception e)
    {
      throw new ConfigException(MSGID_FILE_CHECK_CREATE_FAILED,
          e.getMessage() + " " + getFileForPath(dbDirname));
    }
    initialize(changelogServerId, changelogPort);
    configDn = config.getDN();
@@ -442,10 +478,8 @@
    {
      /*
       * Initialize the changelog database.
       * TODO : the changelog db path should be configurable
       */
      dbEnv = new ChangelogDbEnv(
          DirectoryServer.getServerRoot() + File.separator + "changelogDb",
      dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
          this);
      /*
@@ -475,13 +509,13 @@
    } catch (DatabaseException e)
    {
      int msgID = MSGID_COULD_NOT_INITIALIZE_DB;
      String message = getMessage(msgID, "changelogDb");
      String message = getMessage(msgID, dbDirname);
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    } catch (ChangelogDBException e)
    {
      int msgID = MSGID_COULD_NOT_READ_DB;
      String message = getMessage(msgID, "changelogDb");
      String message = getMessage(msgID, dbDirname);
      message += getMessage(e.getMessageID());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.common;
@@ -265,6 +265,15 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 35;
  /**
   * The message ID for the message that will be used when checking for the
   * existence of the changelog database directory, or trying to create it,
   * fails.  This message takes one string argument containing the details
   * of the exception and the path of the directory.
   */
  public static final int MSGID_FILE_CHECK_CREATE_FAILED =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -354,6 +363,9 @@
        MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE,
        "An Exception was caught while receiving synchronization message : %s");
    MessageHandler.registerMessage(MSGID_LOOP_REPLAYING_OPERATION,
         "A loop was detected while replaying operation: %s");
        "A loop was detected while replaying operation: %s");
    MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
        "An Exception was caught while testing existence or trying " +
        " to create the directory for the changelog database : %s");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,25 +26,38 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.protocol.AddMsg;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Tests for the changelog service code.
@@ -447,4 +460,218 @@
      }
    }
  }
  /**
   * Chaining tests of the changelog code with 2 changelog servers involved
   * 2 tests are done here (itest=0 or itest=1)
   *
   * Test 1
   * - Create changelog server 1
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 1 to changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Make client1 publish changes
   * - Check that client 2 receives the changes published by client 1
   *
   * Test 2
   * - Create changelog server 1
   * - Create and connect client1 to changelog server 1
   * - Make client1 publish changes
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Check that client 2 receives the changes published by client 1
   *
   */
  @Test(enabled=true)
  public void changelogChaining() throws Exception
  {
    for (int itest = 0; itest <2; itest++)
    {
      ChangelogBroker broker2 = null;
      boolean emptyOldChanges = true;
      // - Create 2 connected changelog servers
      Changelog[] changelogs = new Changelog[2];
      int[] changelogPorts = new int[2];
      int[] changelogIds = new int[2];
      short[] brokerIds = new short[2];
      ServerSocket socket = null;
      // Find 2 free ports
      for (int i = 0; i <= 1; i++)
      {
        // find  a free port
        socket = TestCaseUtils.bindFreePort();
        changelogPorts[i] = socket.getLocalPort();
        changelogIds[i] = i + 10;
        brokerIds[i] = (short) (100+i);
        if ((itest==0) || (i ==0))
          socket.close();
      }
      for (int i = 0; i <= ((itest == 0) ? 1 : 0); i++)
      {
        changelogs[i] = null;
        // for itest=0, create the 2 connected changelog servers
        // for itest=1, create the 1rst changelog server, the second
        // one will be created later
        String changelogLdif = "dn: cn=Changelog Server\n"
          + "objectClass: top\n"
          + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
        changelogs[i] = new Changelog(changelogConfig);
      }
      ChangelogBroker broker1 = null;
      try
      {
        // For itest=0, create and connect client1 to changelog1
        //              and client2 to changelog2
        // For itest=1, only create and connect client1 to changelog1
        //              client2 will be created later
        broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
            (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
        if (itest == 0)
        {
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
        }
        // - Test messages between clients by publishing now
        // - Delete
        long time = TimeThread.getTime();
        int ts = 1;
        ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
        broker1.publish(delMsg);
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
        String baseUUID = "22222222-2222-2222-2222-222222222222";
        // - Add
        String lentry = new String("dn: dc=example,dc=com\n"
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
            .getAttributes(), new ArrayList<Attribute>());
        broker1.publish(addMsg);
        // - Modify
        Attribute attr1 = new Attribute("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        ModifyMsg modMsg = new ModifyMsg(cn, DN
            .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
        broker1.publish(modMsg);
        // - ModifyDN
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
            .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
            null);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn, "uniqueid",
        "newparentId"));
        ModifyDNMsg modDNMsg = new ModifyDNMsg(op);
        broker1.publish(modDNMsg);
        if (itest > 0)
        {
          socket.close();
          String changelogLdif = "dn: cn=Changelog Server\n"
            + "objectClass: top\n"
            + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
            + "cn: Changelog Server\n"
            + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
            + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
          Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
          ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
          changelogs[1] = new Changelog(changelogConfig);
          // Connect broker 2 to changelog2
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
        }
        // - Check msg receives by broker, through changeLog2
        while (ts > 1)
        {
          SynchronizationMessage msg2;
          try
          {
            msg2 = broker2.receive();
            if (msg2 == null)
              break;
          }
          catch (Exception e)
          {
            fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
            break;
          }
          if (msg2 instanceof DeleteMsg)
          {
            DeleteMsg delMsg2 = (DeleteMsg) msg2;
            if (delMsg2.toString().equals(delMsg.toString()))
              ts--;
          }
          else if (msg2 instanceof AddMsg)
          {
            AddMsg addMsg2 = (AddMsg) msg2;
            if (addMsg2.toString().equals(addMsg.toString()))
              ts--;
          }
          else if (msg2 instanceof ModifyMsg)
          {
            ModifyMsg modMsg2 = (ModifyMsg) msg2;
            if (modMsg.equals(modMsg2))
              ts--;
          }
          else if (msg2 instanceof ModifyDNMsg)
          {
            ModifyDNMsg modDNMsg2 = (ModifyDNMsg) msg2;
            if (modDNMsg.equals(modDNMsg2))
              ts--;
          }
          else
          {
            fail("Changelog transmission failed: no expected message class.");
            break;
          }
        }
        // Check that everything expected has been received
        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
            + " expected messages: #msg received " + ts);
      }
      finally
      {
        if (changelogs[0] != null)
          changelogs[0].shutdown();
        if (changelogs[1] != null)
          changelogs[1].shutdown();
        if (broker1 != null)
          broker1.stop();
        if (broker2 != null)
          broker2.stop();
      }
    }
  }
}