From c8816d7020a4bf6dc3cd5b116b40a94bf7acabc3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 29 Jan 2007 16:14:16 +0000
Subject: [PATCH] Fix #794 unit test should cover changelog to changelog communications
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 70 ++++++++---
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java | 16 ++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java | 231 ++++++++++++++++++++++++++++++++++++++
3 files changed, 295 insertions(+), 22 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 0857df9..4761d06 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -26,13 +26,23 @@
*/
package org.opends.server.synchronization.changelog;
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.util.StaticUtils.getFileForPath;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
-import com.sleepycat.je.DatabaseException;
-
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigAttribute;
@@ -48,17 +58,7 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
-import static org.opends.server.loggers.Error.logError;
-import static org.opends.server.messages.MessageHandler.getMessage;
-import static org.opends.server.synchronization.common.LogMessages.*;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.io.File;
-import java.io.IOException;
+import com.sleepycat.je.DatabaseException;
/**
* Changelog Listener.
@@ -99,12 +99,14 @@
private ChangelogDbEnv dbEnv;
private int rcvWindow;
private int queueSize;
+ private String dbDirname = null;
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
+ static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
static final IntegerConfigAttribute changelogPortStub =
new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -128,6 +130,11 @@
new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
false, false, false, true, 0, false, 0);
+ static final StringConfigAttribute dbDirnameStub =
+ new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
+ "changelog storage directory path", false,
+ false, true);
+
/**
* Check if a ConfigEntry is valid.
* @param config The config entry that needs to be checked.
@@ -263,6 +270,35 @@
configAttributes.add(queueSizeAttr);
}
+ /*
+ * read the storage directory path attribute
+ */
+ StringConfigAttribute dbDirnameAttr =
+ (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub);
+ if (dbDirnameAttr == null)
+ {
+ dbDirname = "changelogDb";
+ }
+ else
+ {
+ dbDirname = dbDirnameAttr.activeValue();
+ configAttributes.add(changelogServer);
+ }
+ // Exists or Create
+ File f = getFileForPath(dbDirname);
+ try
+ {
+ if (!f.exists())
+ {
+ f.mkdir();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new ConfigException(MSGID_FILE_CHECK_CREATE_FAILED,
+ e.getMessage() + " " + getFileForPath(dbDirname));
+ }
+
initialize(changelogServerId, changelogPort);
configDn = config.getDN();
@@ -442,10 +478,8 @@
{
/*
* Initialize the changelog database.
- * TODO : the changelog db path should be configurable
*/
- dbEnv = new ChangelogDbEnv(
- DirectoryServer.getServerRoot() + File.separator + "changelogDb",
+ dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
this);
/*
@@ -475,13 +509,13 @@
} catch (DatabaseException e)
{
int msgID = MSGID_COULD_NOT_INITIALIZE_DB;
- String message = getMessage(msgID, "changelogDb");
+ String message = getMessage(msgID, dbDirname);
logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
} catch (ChangelogDBException e)
{
int msgID = MSGID_COULD_NOT_READ_DB;
- String message = getMessage(msgID, "changelogDb");
+ String message = getMessage(msgID, dbDirname);
message += getMessage(e.getMessageID());
logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index 329dfbc..b592b88 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.synchronization.common;
@@ -265,6 +265,15 @@
CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 35;
/**
+ * Failure when test existence or try to create directory
+ * for the changelog database. This message takes one
+ * string argument containing details of the exception
+ * and path of the directory.
+ */
+ public static final int MSGID_FILE_CHECK_CREATE_FAILED =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;
+
+ /**
* Register the messages from this class in the core server.
*
*/
@@ -354,6 +363,9 @@
MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE,
"An Exception was caught while receiving synchronization message : %s");
MessageHandler.registerMessage(MSGID_LOOP_REPLAYING_OPERATION,
- "A loop was detected while replaying operation: %s");
+ "A loop was detected while replaying operation: %s");
+ MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
+ "An Exception was caught while testing existence or trying " +
+ " to create the directory for the changelog database : %s");
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index cd1f2c6..9639cea 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,25 +26,38 @@
*/
package org.opends.server.synchronization.changelog;
+import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
-import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyDNOperation;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
+import org.opends.server.synchronization.protocol.AddMsg;
import org.opends.server.synchronization.protocol.DeleteMsg;
+import org.opends.server.synchronization.protocol.ModifyDNMsg;
+import org.opends.server.synchronization.protocol.ModifyDnContext;
+import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
/**
* Tests for the changelog service code.
@@ -447,4 +460,218 @@
}
}
}
+
+ /**
+ * Chaining tests of the changelog code with 2 changelog servers involved
+ * 2 tests are done here (itest=0 or itest=1)
+ *
+ * Test 1
+ * - Create changelog server 1
+ * - Create changelog server 2 connected with changelog server 1
+ * - Create and connect client 1 to changelog server 1
+ * - Create and connect client 2 to changelog server 2
+ * - Make client1 publish changes
+ * - Check that client 2 receives the changes published by client 1
+ *
+ * Test 2
+ * - Create changelog server 1
+ * - Create and connect client1 to changelog server 1
+ * - Make client1 publish changes
+ * - Create changelog server 2 connected with changelog server 1
+ * - Create and connect client 2 to changelog server 2
+ * - Check that client 2 receives the changes published by client 1
+ *
+ */
+ @Test(enabled=true)
+ public void changelogChaining() throws Exception
+ {
+ for (int itest = 0; itest <2; itest++)
+ {
+ ChangelogBroker broker2 = null;
+ boolean emptyOldChanges = true;
+
+ // - Create 2 connected changelog servers
+ Changelog[] changelogs = new Changelog[2];
+ int[] changelogPorts = new int[2];
+ int[] changelogIds = new int[2];
+ short[] brokerIds = new short[2];
+ ServerSocket socket = null;
+
+ // Find 2 free ports
+ for (int i = 0; i <= 1; i++)
+ {
+ // find a free port
+ socket = TestCaseUtils.bindFreePort();
+ changelogPorts[i] = socket.getLocalPort();
+ changelogIds[i] = i + 10;
+ brokerIds[i] = (short) (100+i);
+ if ((itest==0) || (i ==0))
+ socket.close();
+ }
+
+ for (int i = 0; i <= ((itest == 0) ? 1 : 0); i++)
+ {
+ changelogs[i] = null;
+
+ // for itest=0, create the 2 connected changelog servers
+ // for itest=1, create the 1rst changelog server, the second
+ // one will be created later
+
+ String changelogLdif = "dn: cn=Changelog Server\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n"
+ + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
+ + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
+ + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
+ + "ds-cfg-window-size: 100" + "\n"
+ + "ds-cfg-changelog-db-dirname: changelogDb"+i;
+ Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+ ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+ changelogs[i] = new Changelog(changelogConfig);
+ }
+
+ ChangelogBroker broker1 = null;
+
+ try
+ {
+ // For itest=0, create and connect client1 to changelog1
+ // and client2 to changelog2
+ // For itest=1, only create and connect client1 to changelog1
+ // client2 will be created later
+ broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
+
+ if (itest == 0)
+ {
+ broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
+ }
+
+ // - Test messages between clients by publishing now
+
+ // - Delete
+ long time = TimeThread.getTime();
+ int ts = 1;
+ ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+
+ DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
+ broker1.publish(delMsg);
+
+ String user1entryUUID = "33333333-3333-3333-3333-333333333333";
+ String baseUUID = "22222222-2222-2222-2222-222222222222";
+
+ // - Add
+ String lentry = new String("dn: dc=example,dc=com\n"
+ + "objectClass: top\n" + "objectClass: domain\n"
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
+ Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+ cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
+ user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
+ .getAttributes(), new ArrayList<Attribute>());
+ broker1.publish(addMsg);
+
+ // - Modify
+ Attribute attr1 = new Attribute("description", "new value");
+ Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(mod1);
+ cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ ModifyMsg modMsg = new ModifyMsg(cn, DN
+ .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
+ broker1.publish(modMsg);
+
+ // - ModifyDN
+ cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
+ .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
+ null);
+ op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn, "uniqueid",
+ "newparentId"));
+ ModifyDNMsg modDNMsg = new ModifyDNMsg(op);
+ broker1.publish(modDNMsg);
+
+ if (itest > 0)
+ {
+ socket.close();
+ String changelogLdif = "dn: cn=Changelog Server\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n"
+ + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
+ + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
+ + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
+ Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+ ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+ changelogs[1] = new Changelog(changelogConfig);
+
+ // Connect broker 2 to changelog2
+ broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
+ }
+
+ // - Check msg receives by broker, through changeLog2
+ while (ts > 1)
+ {
+ SynchronizationMessage msg2;
+ try
+ {
+ msg2 = broker2.receive();
+ if (msg2 == null)
+ break;
+ }
+ catch (Exception e)
+ {
+ fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
+ break;
+ }
+
+ if (msg2 instanceof DeleteMsg)
+ {
+ DeleteMsg delMsg2 = (DeleteMsg) msg2;
+ if (delMsg2.toString().equals(delMsg.toString()))
+ ts--;
+ }
+ else if (msg2 instanceof AddMsg)
+ {
+ AddMsg addMsg2 = (AddMsg) msg2;
+ if (addMsg2.toString().equals(addMsg.toString()))
+ ts--;
+ }
+ else if (msg2 instanceof ModifyMsg)
+ {
+ ModifyMsg modMsg2 = (ModifyMsg) msg2;
+ if (modMsg.equals(modMsg2))
+ ts--;
+ }
+ else if (msg2 instanceof ModifyDNMsg)
+ {
+ ModifyDNMsg modDNMsg2 = (ModifyDNMsg) msg2;
+ if (modDNMsg.equals(modDNMsg2))
+ ts--;
+ }
+ else
+ {
+ fail("Changelog transmission failed: no expected message class.");
+ break;
+ }
+ }
+ // Check that everything expected has been received
+ assertTrue(ts == 1, "Broker2 did not receive the complete set of"
+ + " expected messages: #msg received " + ts);
+ }
+ finally
+ {
+ if (changelogs[0] != null)
+ changelogs[0].shutdown();
+ if (changelogs[1] != null)
+ changelogs[1].shutdown();
+ if (broker1 != null)
+ broker1.stop();
+ if (broker2 != null)
+ broker2.stop();
+ }
+ }
+ }
}
--
Gitblit v1.10.0