From c8816d7020a4bf6dc3cd5b116b40a94bf7acabc3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 29 Jan 2007 16:14:16 +0000
Subject: [PATCH] Fix #794 unit test should cover changelog to changelog communications

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                             |   70 ++++++++---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java                              |   16 ++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java |  231 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 295 insertions(+), 22 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 0857df9..4761d06 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -26,13 +26,23 @@
  */
 package org.opends.server.synchronization.changelog;
 
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.util.StaticUtils.getFileForPath;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
-import com.sleepycat.je.DatabaseException;
-
 import org.opends.server.api.ConfigurableComponent;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.config.ConfigAttribute;
@@ -48,17 +58,7 @@
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
 
-import static org.opends.server.loggers.Error.logError;
-import static org.opends.server.messages.MessageHandler.getMessage;
-import static org.opends.server.synchronization.common.LogMessages.*;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.io.File;
-import java.io.IOException;
+import com.sleepycat.je.DatabaseException;
 
 /**
  * Changelog Listener.
@@ -99,12 +99,14 @@
   private ChangelogDbEnv dbEnv;
   private int rcvWindow;
   private int queueSize;
+  private String dbDirname = null;
 
   static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
   static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
   static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
   static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
+  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
 
   static final IntegerConfigAttribute changelogPortStub =
     new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -128,6 +130,11 @@
     new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
                                false, false, false, true, 0, false, 0);
 
+  static final StringConfigAttribute dbDirnameStub =
+    new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
+        "changelog storage directory path", false,
+        false, true);
+
   /**
    * Check if a ConfigEntry is valid.
    * @param config The config entry that needs to be checked.
@@ -263,6 +270,35 @@
       configAttributes.add(queueSizeAttr);
     }
 
+    /*
+     * read the storage directory path attribute
+     */
+    StringConfigAttribute dbDirnameAttr =
+      (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub);
+    if (dbDirnameAttr == null)
+    {
+      dbDirname = "changelogDb";
+    }
+    else
+    {
+      dbDirname = dbDirnameAttr.activeValue();
+      configAttributes.add(changelogServer);
+    }
+    // Exists or Create
+    File f = getFileForPath(dbDirname);
+    try
+    {
+      if (!f.exists())
+      {
+        f.mkdir();
+      }
+    }
+    catch (Exception e)
+    {
+      throw new ConfigException(MSGID_FILE_CHECK_CREATE_FAILED,
+          e.getMessage() + " " + getFileForPath(dbDirname));
+    }
+
     initialize(changelogServerId, changelogPort);
 
     configDn = config.getDN();
@@ -442,10 +478,8 @@
     {
       /*
        * Initialize the changelog database.
-       * TODO : the changelog db path should be configurable
        */
-      dbEnv = new ChangelogDbEnv(
-          DirectoryServer.getServerRoot() + File.separator + "changelogDb",
+      dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
           this);
 
       /*
@@ -475,13 +509,13 @@
     } catch (DatabaseException e)
     {
       int msgID = MSGID_COULD_NOT_INITIALIZE_DB;
-      String message = getMessage(msgID, "changelogDb");
+      String message = getMessage(msgID, dbDirname);
       logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
                message, msgID);
     } catch (ChangelogDBException e)
     {
       int msgID = MSGID_COULD_NOT_READ_DB;
-      String message = getMessage(msgID, "changelogDb");
+      String message = getMessage(msgID, dbDirname);
       message += getMessage(e.getMessageID());
       logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
                message, msgID);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index 329dfbc..b592b88 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.synchronization.common;
 
@@ -265,6 +265,15 @@
     CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 35;
 
   /**
+   * Failure when test existence or try to create directory
+   * for the changelog database.  This message takes one
+   * string argument containing details of the exception
+   * and path of the directory.
+   */
+  public static final int MSGID_FILE_CHECK_CREATE_FAILED =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;
+
+  /**
    * Register the messages from this class in the core server.
    *
    */
@@ -354,6 +363,9 @@
         MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE,
         "An Exception was caught while receiving synchronization message : %s");
     MessageHandler.registerMessage(MSGID_LOOP_REPLAYING_OPERATION,
-         "A loop was detected while replaying operation: %s");
+        "A loop was detected while replaying operation: %s");
+    MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
+        "An Exception was caught while testing existence or trying " +
+        " to create the directory for the changelog database : %s");
   }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index cd1f2c6..9639cea 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,25 +26,38 @@
  */
 package org.opends.server.synchronization.changelog;
 
+import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.opends.server.TestCaseUtils;
 import org.opends.server.config.ConfigEntry;
-import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyDNOperation;
 import org.opends.server.synchronization.SynchronizationTestCase;
 import org.opends.server.synchronization.common.ChangeNumber;
 import org.opends.server.synchronization.common.ChangeNumberGenerator;
 import org.opends.server.synchronization.common.ServerState;
 import org.opends.server.synchronization.plugin.ChangelogBroker;
+import org.opends.server.synchronization.protocol.AddMsg;
 import org.opends.server.synchronization.protocol.DeleteMsg;
+import org.opends.server.synchronization.protocol.ModifyDNMsg;
+import org.opends.server.synchronization.protocol.ModifyDnContext;
+import org.opends.server.synchronization.protocol.ModifyMsg;
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.types.Attribute;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.RDN;
 import org.opends.server.util.TimeThread;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.testng.Assert.*;
 
 /**
  * Tests for the changelog service code.
@@ -447,4 +460,218 @@
       }
     }
   }
+  
+  /**
+   * Chaining tests of the changelog code with 2 changelog servers involved
+   * 2 tests are done here (itest=0 or itest=1)
+   * 
+   * Test 1
+   * - Create changelog server 1
+   * - Create changelog server 2 connected with changelog server 1
+   * - Create and connect client 1 to changelog server 1
+   * - Create and connect client 2 to changelog server 2
+   * - Make client1 publish changes
+   * - Check that client 2 receives the changes published by client 1
+   * 
+   * Test 2
+   * - Create changelog server 1
+   * - Create and connect client1 to changelog server 1
+   * - Make client1 publish changes
+   * - Create changelog server 2 connected with changelog server 1
+   * - Create and connect client 2 to changelog server 2
+   * - Check that client 2 receives the changes published by client 1
+   * 
+   */
+  @Test(enabled=true)
+  public void changelogChaining() throws Exception
+  {
+    for (int itest = 0; itest <2; itest++)
+    {
+      ChangelogBroker broker2 = null;
+      boolean emptyOldChanges = true;
+
+      // - Create 2 connected changelog servers
+      Changelog[] changelogs = new Changelog[2];
+      int[] changelogPorts = new int[2];
+      int[] changelogIds = new int[2];
+      short[] brokerIds = new short[2];
+      ServerSocket socket = null;
+
+      // Find 2 free ports
+      for (int i = 0; i <= 1; i++)
+      {
+        // find  a free port
+        socket = TestCaseUtils.bindFreePort();
+        changelogPorts[i] = socket.getLocalPort();
+        changelogIds[i] = i + 10;
+        brokerIds[i] = (short) (100+i);
+        if ((itest==0) || (i ==0))
+          socket.close();
+      }
+
+      for (int i = 0; i <= ((itest == 0) ? 1 : 0); i++)
+      {
+        changelogs[i] = null;
+
+        // for itest=0, create the 2 connected changelog servers
+        // for itest=1, create the 1rst changelog server, the second
+        // one will be created later
+
+        String changelogLdif = "dn: cn=Changelog Server\n"
+          + "objectClass: top\n"
+          + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+          + "cn: Changelog Server\n" 
+          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n" 
+          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
+          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
+          + "ds-cfg-window-size: 100" + "\n"
+          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
+        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+        changelogs[i] = new Changelog(changelogConfig);
+      }
+
+      ChangelogBroker broker1 = null;
+
+      try
+      {
+        // For itest=0, create and connect client1 to changelog1 
+        //              and client2 to changelog2
+        // For itest=1, only create and connect client1 to changelog1 
+        //              client2 will be created later
+        broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
+            (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
+
+        if (itest == 0)
+        {
+          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+              (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
+        }
+
+        // - Test messages between clients by publishing now
+
+        // - Delete
+        long time = TimeThread.getTime();
+        int ts = 1;
+        ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+
+        DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
+        broker1.publish(delMsg);
+
+        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
+        String baseUUID = "22222222-2222-2222-2222-222222222222";
+
+        // - Add
+        String lentry = new String("dn: dc=example,dc=com\n"
+            + "objectClass: top\n" + "objectClass: domain\n"
+            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
+        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
+            user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
+            .getAttributes(), new ArrayList<Attribute>());
+        broker1.publish(addMsg);
+
+        // - Modify
+        Attribute attr1 = new Attribute("description", "new value");
+        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
+        List<Modification> mods = new ArrayList<Modification>();
+        mods.add(mod1);
+        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        ModifyMsg modMsg = new ModifyMsg(cn, DN
+            .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
+        broker1.publish(modMsg);
+
+        // - ModifyDN
+        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+        ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
+            .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
+            null);
+        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn, "uniqueid",
+        "newparentId"));
+        ModifyDNMsg modDNMsg = new ModifyDNMsg(op);
+        broker1.publish(modDNMsg);
+
+        if (itest > 0)
+        {
+          socket.close();
+          String changelogLdif = "dn: cn=Changelog Server\n"
+            + "objectClass: top\n"
+            + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+            + "cn: Changelog Server\n" 
+            + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
+            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n" 
+            + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
+          Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+          ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+          changelogs[1] = new Changelog(changelogConfig);
+
+          // Connect broker 2 to changelog2
+          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+              (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
+        }
+
+        // - Check msg receives by broker, through changeLog2
+        while (ts > 1)
+        {
+          SynchronizationMessage msg2;
+          try
+          {
+            msg2 = broker2.receive();
+            if (msg2 == null)
+              break;
+          }
+          catch (Exception e)
+          {
+            fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
+            break;
+          }
+
+          if (msg2 instanceof DeleteMsg)
+          {
+            DeleteMsg delMsg2 = (DeleteMsg) msg2;
+            if (delMsg2.toString().equals(delMsg.toString()))
+              ts--;
+          }
+          else if (msg2 instanceof AddMsg)
+          {
+            AddMsg addMsg2 = (AddMsg) msg2;
+            if (addMsg2.toString().equals(addMsg.toString()))
+              ts--;
+          }
+          else if (msg2 instanceof ModifyMsg)
+          {
+            ModifyMsg modMsg2 = (ModifyMsg) msg2;
+            if (modMsg.equals(modMsg2))
+              ts--;
+          }
+          else if (msg2 instanceof ModifyDNMsg)
+          {
+            ModifyDNMsg modDNMsg2 = (ModifyDNMsg) msg2;
+            if (modDNMsg.equals(modDNMsg2))
+              ts--;
+          }
+          else
+          {
+            fail("Changelog transmission failed: no expected message class.");
+            break;
+          }
+        }
+        // Check that everything expected has been received
+        assertTrue(ts == 1, "Broker2 did not receive the complete set of" 
+            + " expected messages: #msg received " + ts);
+      }
+      finally
+      {
+        if (changelogs[0] != null)
+          changelogs[0].shutdown();
+        if (changelogs[1] != null)
+          changelogs[1].shutdown();
+        if (broker1 != null)
+          broker1.stop();
+        if (broker2 != null)
+          broker2.stop();
+      }
+    }
+  }
 }

--
Gitblit v1.10.0