From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508 These changes implement a window mechanism in the sycnhronization protocol.
---
opends/resource/schema/02-config.ldif | 9
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java | 494 +++++++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java | 67 +
opends/src/server/org/opends/server/changelog/Changelog.java | 92 +
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java | 153 ++-
opends/src/server/org/opends/server/synchronization/ServerStartMessage.java | 32
opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 38
opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java | 33
opends/src/server/org/opends/server/synchronization/WindowMessage.java | 141 +++
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java | 331 ++++----
opends/src/server/org/opends/server/changelog/DbHandler.java | 12
opends/src/server/org/opends/server/changelog/ServerReader.java | 7
opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java | 2
opends/src/server/org/opends/server/changelog/ChangelogDB.java | 176 ---
opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java | 86 +-
opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java | 62 +
opends/src/server/org/opends/server/changelog/ChangelogCache.java | 29
opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java | 245 ++++++
opends/src/server/org/opends/server/changelog/ServerHandler.java | 122 ++
opends/src/server/org/opends/server/changelog/SocketSession.java | 4
opends/src/server/org/opends/server/synchronization/ChangelogBroker.java | 101 ++
22 files changed, 1,723 insertions(+), 517 deletions(-)
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 03a17ef..8c8f7ce 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -547,6 +547,10 @@
NAME 'ds-cfg-bind-with-dn-requires-password'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.288
+ NAME 'ds-cfg-window-size'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.164
NAME 'ds-cfg-max-receive-queue'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
@@ -1239,7 +1243,8 @@
STRUCTURAL MUST ( ds-cfg-changelog-server $ ds-cfg-directory-server-id
$ ds-cfg-synchronization-dn )
MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $
- ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay )
+ ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $
+ ds-cfg-window-size )
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.59
NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator
@@ -1295,7 +1300,7 @@
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME
'ds-cfg-synchronization-changelog-server-config' SUP top
STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
- MAY ( ds-cfg-changelog-server $ cn ) X-ORIGIN 'OpenDS Directory Server' )
+ MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opends/src/server/org/opends/server/changelog/Changelog.java b/opends/src/server/org/opends/server/changelog/Changelog.java
index 6660b1b..497e2f4 100644
--- a/opends/src/server/org/opends/server/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/changelog/Changelog.java
@@ -71,14 +71,14 @@
*/
public class Changelog implements Runnable, ConfigurableComponent
{
- static private short serverId;
- static private String serverURL;
+ private short serverId;
+ private String serverURL;
- private static ServerSocket listenSocket;
- private static Thread myListenThread;
- private static Thread myConnectThread;
+ private ServerSocket listenSocket;
+ private Thread myListenThread;
+ private Thread myConnectThread;
- private static boolean runListen = true;
+ private boolean runListen = true;
/* The list of changelog servers configured by the administrator */
private List<String> changelogServers;
@@ -86,19 +86,22 @@
/* This table is used to store the list of dn for which we are currently
* handling servers.
*/
- private static HashMap<DN, ChangelogCache> baseDNs =
+ private HashMap<DN, ChangelogCache> baseDNs =
new HashMap<DN, ChangelogCache>();
private String localURL = "null";
- private static boolean shutdown = false;
+ private boolean shutdown = false;
private short changelogServerId;
private DN configDn;
private List<ConfigAttribute> configAttributes =
new ArrayList<ConfigAttribute>();
+ private ChangelogDbEnv dbEnv;
+ private int rcvWindow;
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
+ static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
static final IntegerConfigAttribute changelogPortStub =
new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -114,6 +117,10 @@
"changelog server information", true,
true, false);
+ static final IntegerConfigAttribute windowStub =
+ new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
+ false, false, false, true, 0, false, 0);
+
/**
* Check if a ConfigEntry is valid.
* @param config The config entry that needs to be checked.
@@ -229,6 +236,16 @@
}
configAttributes.add(changelogServer);
+ IntegerConfigAttribute windowAttr =
+ (IntegerConfigAttribute) config.getConfigAttribute(windowStub);
+ if (windowAttr == null)
+ rcvWindow = 100; // Attribute is not present : use the default value
+ else
+ {
+ rcvWindow = windowAttr.activeIntValue();
+ configAttributes.add(windowAttr);
+ }
+
initialize(changelogServerId, changelogPort);
configDn = config.getDN();
@@ -305,9 +322,10 @@
try
{
newSocket = listenSocket.accept();
+ newSocket.setReceiveBufferSize(1000000);
ServerHandler handler = new ServerHandler(
new SocketSession(newSocket));
- handler.start(null);
+ handler.start(null, serverId, serverURL, rcvWindow, this);
} catch (IOException e)
{
// ignore
@@ -378,11 +396,12 @@
InetSocketAddress ServerAddr = new InetSocketAddress(
InetAddress.getByName(hostname), Integer.parseInt(port));
Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
socket.connect(ServerAddr, 500);
ServerHandler handler = new ServerHandler(
new SocketSession(socket));
- handler.start(baseDn);
+ handler.start(baseDn, serverId, serverURL, rcvWindow, this);
}
catch (IOException e)
{
@@ -406,8 +425,9 @@
* Initialize the changelog database.
* TODO : the changelog db path should be configurable
*/
- ChangelogDB.initialize(DirectoryServer.getServerRoot() + File.separator
- + "changelogDb");
+ dbEnv = new ChangelogDbEnv(
+ DirectoryServer.getServerRoot() + File.separator + "changelogDb",
+ this);
/*
* create changelog cache
@@ -421,7 +441,9 @@
String localAdddress = InetAddress.getLocalHost().getHostAddress();
serverURL = localhostname + ":" + String.valueOf(changelogPort);
localURL = localAdddress + ":" + String.valueOf(changelogPort);
- listenSocket = new ServerSocket(changelogPort);
+ listenSocket = new ServerSocket();
+ listenSocket.setReceiveBufferSize(1000000);
+ listenSocket.bind(new InetSocketAddress(changelogPort));
/*
* create working threads
@@ -460,32 +482,12 @@
}
/**
- * Retrieves the unique identifier for this changelog.
- *
- * @return The unique identifier for this changelog.
- */
- public static short getServerId()
- {
- return serverId;
- }
-
- /**
- * Retrieves the host and port for this changelog, separated by a colon.
- *
- * @return The host and port for this changelog, separated by a colon.
- */
- public static String getServerURL()
- {
- return serverURL;
- }
-
- /**
* Get the ChangelogCache associated to the base DN given in parameter.
*
* @param baseDn The base Dn for which the ChangelogCache must be returned.
* @return The ChangelogCache associated to the base DN given in parameter.
*/
- public static ChangelogCache getChangelogCache(DN baseDn)
+ public ChangelogCache getChangelogCache(DN baseDn)
{
ChangelogCache changelogCache;
@@ -493,7 +495,7 @@
{
changelogCache = baseDNs.get(baseDn);
if (changelogCache == null)
- changelogCache = new ChangelogCache(baseDn);
+ changelogCache = new ChangelogCache(baseDn, this);
baseDNs.put(baseDn, changelogCache);
}
@@ -503,7 +505,7 @@
/**
* Shutdown the Changelog service and all its connections.
*/
- public static void shutdown()
+ public void shutdown()
{
shutdown = true;
@@ -525,6 +527,22 @@
changelogCache.shutdown();
}
- ChangelogDB.shutdownDbEnvironment();
+ dbEnv.shutdown();
+ }
+
+
+ /**
+ * Creates a new DB handler for this Changelog and the serverId and
+ * DN given in parameter.
+ *
+ * @param id The serverId for which the dbHandler must be created.
+ * @param baseDn The DN for which the dbHandler muste be created.
+ * @return The new DB handler for this Changelog and the serverId and
+ * DN given in parameter.
+ * @throws DatabaseException in case of underlying database problem.
+ */
+ public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
+ {
+ return new DbHandler(id, baseDn, this, dbEnv);
}
}
diff --git a/opends/src/server/org/opends/server/changelog/ChangelogCache.java b/opends/src/server/org/opends/server/changelog/ChangelogCache.java
index c92a90b..398bcf3 100644
--- a/opends/src/server/org/opends/server/changelog/ChangelogCache.java
+++ b/opends/src/server/org/opends/server/changelog/ChangelogCache.java
@@ -100,15 +100,18 @@
*/
private Map<Short, DbHandler> sourceDbHandlers =
new ConcurrentHashMap<Short, DbHandler>();
+ private Changelog changelog;
/**
* Creates a new ChangelogCache associated to the DN baseDn.
*
* @param baseDn The baseDn associated to the ChangelogCache.
+ * @param changelog the Changelog that created this changelog cache.
*/
- public ChangelogCache(DN baseDn)
+ public ChangelogCache(DN baseDn, Changelog changelog)
{
this.baseDn = baseDn;
+ this.changelog = changelog;
}
/**
@@ -161,7 +164,7 @@
{
try
{
- dbHandler = new DbHandler(id, baseDn);
+ dbHandler = changelog.newDbHandler(id, baseDn);
} catch (DatabaseException e)
{
/*
@@ -169,14 +172,13 @@
* from at least one LDAP server.
* This changelog therefore can't do it's job properly anymore
* and needs to close all its connections and shutdown itself.
- * TODO : log error
*/
int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
String message = getMessage(msgID) + stackTraceToSingleLineString(e);
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
return;
}
sourceDbHandlers.put(id, dbHandler);
@@ -411,14 +413,15 @@
/**
* creates a new ChangelogDB with specified identifier.
* @param id the identifier of the new ChangelogDB.
- * @param baseDn the baseDn of the new ChangelogDB.
+ * @param db the new db.
+ *
* @throws DatabaseException If a database error happened.
*/
- public void newDb(short id, DN baseDn) throws DatabaseException
+ public void newDb(short id, DbHandler db) throws DatabaseException
{
synchronized (sourceDbHandlers)
{
- sourceDbHandlers.put(id , new DbHandler(id, baseDn));
+ sourceDbHandlers.put(id , db);
}
}
@@ -441,6 +444,16 @@
*/
public void ack(AckMessage message, short fromServerId)
{
+ /*
+ * there are 2 possible cases here :
+ * - the message that was acked comes from a server to which
+ * we are directly connected.
+ * In this case, we can find the handler from the connectedServers map
+ * - the message that was acked comes from a server to which we are not
+ * connected.
+ * In this case we need to find the changelog server that forwarded
+ * the change and send back the ack to this server.
+ */
ServerHandler handler = connectedServers.get(
message.getChangeNumber().getServerId());
if (handler != null)
@@ -551,6 +564,4 @@
{
return "ChangelogCache " + baseDn;
}
-
-
}
diff --git a/opends/src/server/org/opends/server/changelog/ChangelogDB.java b/opends/src/server/org/opends/server/changelog/ChangelogDB.java
index 45c6544..c476e40 100644
--- a/opends/src/server/org/opends/server/changelog/ChangelogDB.java
+++ b/opends/src/server/org/opends/server/changelog/ChangelogDB.java
@@ -32,10 +32,8 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
-import java.io.File;
import java.io.UnsupportedEncodingException;
-import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -45,10 +43,7 @@
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -60,143 +55,34 @@
*/
public class ChangelogDB
{
- private static Environment dbEnvironment = null;
private Database db = null;
- private static Database stateDb = null;
- private String stringId = null;
+ private ChangelogDbEnv dbenv = null;
+ private Changelog changelog;
+ private Short serverId;
+ private DN baseDn;
/**
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
* @param serverId Identifier of the LDAP server.
* @param baseDn baseDn of the LDAP server.
+ * @param changelog the Changelog that needs to be shutdown
+ * @param dbenv the Db encironemnet to use to create the db
* @throws DatabaseException if a database problem happened
*/
- public ChangelogDB(Short serverId, DN baseDn)
+ public ChangelogDB(Short serverId, DN baseDn, Changelog changelog,
+ ChangelogDbEnv dbenv)
throws DatabaseException
{
- try {
- stringId = serverId.toString() + " " + baseDn.toNormalizedString();
- byte[] byteId = stringId.getBytes("UTF-8");
+ this.serverId = serverId;
+ this.baseDn = baseDn;
+ this.dbenv = dbenv;
+ this.changelog = changelog;
+ db = dbenv.getOrAddDb(serverId, baseDn);
- // Open the database. Create it if it does not already exist.
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(true);
-
- db = dbEnvironment.openDatabase(null, stringId, dbConfig);
-
- DatabaseEntry key = new DatabaseEntry();
- key.setData(byteId);
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
- if (status == OperationStatus.NOTFOUND)
- {
- Transaction txn = dbEnvironment.beginTransaction(null, null);
- try {
- data.setData(byteId);
- stateDb.put(txn, key, data);
- txn.commitWriteNoSync();
- } catch (DatabaseException dbe)
- {
- // Abort the txn and propagate the Exception to the caller
- txn.abort();
- throw dbe;
- }
- }
- }
- catch (UnsupportedEncodingException e)
- {
- // never happens
- }
}
/**
- * Initialize this class.
- * Creates Db environment that will be used to create databases.
- * It also reads the currently known databases from the "changelogstate"
- * database.
- * @param path Path where the backing files must be created.
- * @throws DatabaseException If a DatabaseException occured that prevented
- * the initialization to happen.
- * @throws ChangelogDBException If a changelog internal error caused
- * a failure of the changelog processing.
- */
- public static void initialize(String path) throws DatabaseException,
- ChangelogDBException
- {
- EnvironmentConfig envConfig = new EnvironmentConfig();
-
- /* Create the DB Environment that will be used for all
- * the Changelog activities related to the db
- */
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.cleaner.expunge", "true");
- // TODO : the DB cache size should be configurable
- // For now set 5M is OK for being efficient in 64M total for the JVM
- envConfig.setConfigParam("je.maxMemory", "5000000");
- dbEnvironment = new Environment(new File(path), envConfig);
-
- /*
- * One database is created to store the update from each LDAP
- * server in the topology.
- * The database "changelogstate" is used to store the list of all
- * the servers that have been seen in the past.
- */
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(true);
-
- stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
- Cursor cursor = stateDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- try
- {
- OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
- while (status == OperationStatus.SUCCESS)
- {
- try
- {
- String stringData = new String(data.getData(), "UTF-8");
- String[] str = stringData.split(" ", 2);
- short serverId = new Short(str[0]);
- DN baseDn = null;
- try
- {
- baseDn = DN.decode(str[1]);
- } catch (DirectoryException e)
- {
- int msgID = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER;
- String message = getMessage(msgID, str[1]);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- Changelog.getChangelogCache(baseDn).newDb(serverId, baseDn);
- } catch (NumberFormatException e)
- {
- // should never happen
- throw new ChangelogDBException(0,
- "changelog state database has a wrong format");
- } catch (UnsupportedEncodingException e)
- {
- // should never happens
- throw new ChangelogDBException(0, "need UTF-8 support");
- }
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- }
- cursor.close();
-
- } catch (DatabaseException dbe) {
- cursor.close();
- throw dbe;
- }
- }
-
-
- /**
* add a list of changes to the underlying db.
*
* @param changes The list of changes to add to the underlying db.
@@ -207,7 +93,7 @@
try
{
- txn = dbEnvironment.beginTransaction(null, null);
+ txn = dbenv.beginTransaction();
for (UpdateMessage change : changes)
{
@@ -224,7 +110,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
}
}
@@ -238,7 +124,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
if (txn != null)
{
try
@@ -264,7 +150,7 @@
} catch (DatabaseException e)
{
int msgID = MSGID_EXCEPTION_CLOSING_DATABASE;
- String message = getMessage(msgID, stringId) +
+ String message = getMessage(msgID, this.toString()) +
stackTraceToSingleLineString(e);
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
@@ -355,7 +241,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
return null;
}
}
@@ -396,28 +282,18 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
return null;
}
}
/**
- * Shutdown the Db environment.
+ * {@inheritDoc}
*/
- public static void shutdownDbEnvironment()
+ @Override
+ public String toString()
{
- try
- {
- stateDb.close();
- dbEnvironment.close();
- } catch (DatabaseException e)
- {
- int msgID = MSGID_ERROR_CLOSING_CHANGELOG_ENV;
- String message = getMessage(msgID) + stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
+ return serverId.toString() + baseDn.toString();
}
/**
@@ -455,7 +331,7 @@
private ChangelogCursor() throws DatabaseException
{
- txn = dbEnvironment.beginTransaction(null, null);
+ txn = dbenv.beginTransaction();
cursor = db.openCursor(txn, null);
}
@@ -477,7 +353,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
}
if (txn != null)
{
@@ -491,7 +367,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
- Changelog.shutdown();
+ changelog.shutdown();
}
}
}
diff --git a/opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java b/opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java
new file mode 100644
index 0000000..f9fdfa4
--- /dev/null
+++ b/opends/src/server/org/opends/server/changelog/ChangelogDbEnv.java
@@ -0,0 +1,245 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.changelog;
+
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.synchronization.SynchMessages.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+/**
+ * This class is used to represent a Db environement that can be used
+ * to create ChangelogDB.
+ */
+public class ChangelogDbEnv
+{
+ private Environment dbEnvironment = null;
+ private Database stateDb = null;
+ private Changelog changelog = null;
+
+ /**
+ * Initialize this class.
+ * Creates Db environment that will be used to create databases.
+ * It also reads the currently known databases from the "changelogstate"
+ * database.
+ * @param path Path where the backing files must be created.
+ * @param changelog the Changelog that creates this ChangelogDbEnv.
+ * @throws DatabaseException If a DatabaseException occured that prevented
+ * the initialization to happen.
+ * @throws ChangelogDBException If a changelog internal error caused
+ * a failure of the changelog processing.
+ */
+ public ChangelogDbEnv(String path, Changelog changelog)
+ throws DatabaseException, ChangelogDBException
+ {
+ this.changelog = changelog;
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+
+ /* Create the DB Environment that will be used for all
+ * the Changelog activities related to the db
+ */
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setConfigParam("je.cleaner.expunge", "true");
+ // TODO : the DB cache size should be configurable
+ // For now set 5M is OK for being efficient in 64M total for the JVM
+ envConfig.setConfigParam("je.maxMemory", "5000000");
+ dbEnvironment = new Environment(new File(path), envConfig);
+
+ /*
+ * One database is created to store the update from each LDAP
+ * server in the topology.
+ * The database "changelogstate" is used to store the list of all
+ * the servers that have been seen in the past.
+ */
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(true);
+
+ stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
+ start();
+
+ }
+
+ /**
+ * Read the list of known servers from the database and start dbHandler
+ * for each of them.
+ *
+ * @throws DatabaseException in case of underlying DatabaseException
+ * @throws ChangelogDBException when the information from the database
+ * cannot be decoded correctly.
+ */
+ private void start() throws DatabaseException, ChangelogDBException
+ {
+ Cursor cursor = stateDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ try
+ {
+ OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+ while (status == OperationStatus.SUCCESS)
+ {
+ try
+ {
+ String stringData = new String(data.getData(), "UTF-8");
+ String[] str = stringData.split(" ", 2);
+ short serverId = new Short(str[0]);
+ DN baseDn = null;
+ try
+ {
+ baseDn = DN.decode(str[1]);
+ } catch (DirectoryException e)
+ {
+ int msgID = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER;
+ String message = getMessage(msgID, str[1]);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ DbHandler dbHandler =
+ new DbHandler(serverId, baseDn, changelog, this);
+ changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler);
+ } catch (NumberFormatException e)
+ {
+ // should never happen
+ throw new ChangelogDBException(0,
+ "changelog state database has a wrong format");
+ } catch (UnsupportedEncodingException e)
+ {
+ // should never happens
+ throw new ChangelogDBException(0, "need UTF-8 support");
+ }
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+ cursor.close();
+
+ } catch (DatabaseException dbe) {
+ cursor.close();
+ throw dbe;
+ }
+ }
+
+ /**
+ * Find or create the database used to store changes from the server
+ * with the given serverId and the given baseDn.
+ * @param serverId The server id that identifies the server.
+ * @param baseDn The baseDn that identifies the server.
+ * @return the Database.
+ * @throws DatabaseException in case of underlying Exception.
+ */
+ public Database getOrAddDb(Short serverId, DN baseDn)
+ throws DatabaseException
+ {
+ try
+ {
+ String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
+ byte[] byteId;
+
+ byteId = stringId.getBytes("UTF-8");
+
+ // Open the database. Create it if it does not already exist.
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(true);
+ Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
+
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(byteId);
+ stateDb.put(txn, key, data);
+ txn.commitWriteNoSync();
+ } catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ return db;
+ } catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ return null;
+ }
+ }
+
+ /**
+ * Creates a new transaction.
+ *
+ * @return the transaction.
+ * @throws DatabaseException in case of underlying database Exception.
+ */
+ public Transaction beginTransaction() throws DatabaseException
+ {
+ return dbEnvironment.beginTransaction(null, null);
+ }
+
+ /**
+ * Shutdown the Db environment.
+ */
+ public void shutdown()
+ {
+ try
+ {
+ stateDb.close();
+ dbEnvironment.close();
+ } catch (DatabaseException e)
+ {
+ int msgID = MSGID_ERROR_CLOSING_CHANGELOG_ENV;
+ String message = getMessage(msgID) + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/changelog/DbHandler.java b/opends/src/server/org/opends/server/changelog/DbHandler.java
index 45468b2..593b868 100644
--- a/opends/src/server/org/opends/server/changelog/DbHandler.java
+++ b/opends/src/server/org/opends/server/changelog/DbHandler.java
@@ -87,13 +87,17 @@
*
* @param id Identifier of the DB.
* @param baseDn of the DB.
+ * @param changelog the Changelog that creates this dbHandler.
+ * @param dbenv the Database Env to use to create the Changelog DB.
* @throws DatabaseException If a database problem happened
*/
- public DbHandler(short id, DN baseDn) throws DatabaseException
+ public DbHandler(short id, DN baseDn, Changelog changelog,
+ ChangelogDbEnv dbenv)
+ throws DatabaseException
{
this.serverId = id;
this.baseDn = baseDn;
- db = new ChangelogDB(id, baseDn);
+ db = new ChangelogDB(id, baseDn, changelog, dbenv);
firstChange = db.readFirstChange();
lastChange = db.readLastChange();
thread = new DirectoryThread(this, "changelog db " + id + " " + baseDn);
@@ -245,6 +249,10 @@
{}
}
}
+
+ while (msgQueue.size() != 0)
+ flush();
+
db.shutdown();
}
diff --git a/opends/src/server/org/opends/server/changelog/ServerHandler.java b/opends/src/server/org/opends/server/changelog/ServerHandler.java
index d4cc9d6..6934954 100644
--- a/opends/src/server/org/opends/server/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/changelog/ServerHandler.java
@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
@@ -58,6 +59,7 @@
import org.opends.server.synchronization.ServerState;
import org.opends.server.synchronization.SynchronizationMessage;
import org.opends.server.synchronization.UpdateMessage;
+import org.opends.server.synchronization.WindowMessage;
import org.opends.server.util.TimeThread;
/**
@@ -71,7 +73,7 @@
private MsgQueue msgQueue = new MsgQueue();
private MsgQueue lateQueue = new MsgQueue();
private Map<ChangeNumber, AckMessageList> waitingAcks =
- new HashMap<ChangeNumber, AckMessageList>();;
+ new HashMap<ChangeNumber, AckMessageList>();
private ChangelogCache changelogCache = null;
private String serverURL;
private int outCount = 0; // number of update sent to the server
@@ -93,6 +95,12 @@
private ServerWriter writer = null;
private DN baseDn = null;
private String serverAddressURL;
+ private int rcvWindow;
+ private int rcvWindowSizeHalf;
+ private int maxRcvWindow;
+ private ServerReader reader;
+ private Semaphore sendWindow;
+ private int sendWindowSize;
private static Map<ChangeNumber, ChangelogAckMessageList>
changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -116,20 +124,29 @@
*
* @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
* null if this is an incoming connection.
+ * @param changelogId The identifier of the changelog that creates this
+ * server handler.
+ * @param changelogURL The URL of the changelog that creates this
+ * server handler.
+ * @param windowSize the window size that this server handler must use.
+ * @param changelog the Changelog that created this server handler.
*/
- public void start(DN baseDn)
+ public void start(DN baseDn, short changelogId, String changelogURL,
+ int windowSize, Changelog changelog)
{
+ rcvWindowSizeHalf = windowSize/2;
+ maxRcvWindow = windowSize;
+ rcvWindow = windowSize;
try
{
if (baseDn != null)
{
this.baseDn = baseDn;
- changelogCache = Changelog.getChangelogCache(baseDn);
+ changelogCache = changelog.getChangelogCache(baseDn);
ServerState localServerState = changelogCache.getDbServerState();
ChangelogStartMessage msg =
- new ChangelogStartMessage(Changelog.getServerId(),
- Changelog.getServerURL(),
- baseDn, localServerState);
+ new ChangelogStartMessage(changelogId, changelogURL,
+ baseDn, windowSize, localServerState);
session.publish(msg);
}
@@ -175,16 +192,15 @@
restartSendDelay = 0;
serverIsLDAPserver = true;
- changelogCache = Changelog.getChangelogCache(this.baseDn);
+ changelogCache = changelog.getChangelogCache(this.baseDn);
ServerState localServerState = changelogCache.getDbServerState();
ChangelogStartMessage myStartMsg =
- new ChangelogStartMessage(Changelog.getServerId(),
- Changelog.getServerURL(),
- this.baseDn, localServerState);
+ new ChangelogStartMessage(changelogId, changelogURL,
+ this.baseDn, windowSize, localServerState);
session.publish(myStartMsg);
+ sendWindowSize = receivedMsg.getWindowSize();
}
- else if (msg.getClass() == Class.forName(
- "org.opends.server.synchronization.ChangelogStartMessage"))
+ else if (msg instanceof ChangelogStartMessage)
{
ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
serverId = receivedMsg.getServerId();
@@ -195,17 +211,17 @@
this.baseDn = receivedMsg.getBaseDn();
if (baseDn == null)
{
- changelogCache = Changelog.getChangelogCache(this.baseDn);
+ changelogCache = changelog.getChangelogCache(this.baseDn);
ServerState serverState = changelogCache.getDbServerState();
ChangelogStartMessage outMsg =
- new ChangelogStartMessage(Changelog.getServerId(),
- Changelog.getServerURL(),
- this.baseDn, serverState);
+ new ChangelogStartMessage(changelogId, changelogURL,
+ this.baseDn, windowSize, serverState);
session.publish(outMsg);
}
else
this.baseDn = baseDn;
this.serverState = receivedMsg.getServerState();
+ sendWindowSize = receivedMsg.getWindowSize();
}
else
{
@@ -213,7 +229,7 @@
return; // we did not recognize the message, ignore it
}
- changelogCache = Changelog.getChangelogCache(this.baseDn);
+ changelogCache = changelog.getChangelogCache(this.baseDn);
if (serverIsLDAPserver)
{
@@ -226,7 +242,7 @@
writer = new ServerWriter(session, serverId, this, changelogCache);
- ServerReader reader = new ServerReader(session, serverId, this,
+ reader = new ServerReader(session, serverId, this,
changelogCache);
reader.start();
@@ -251,7 +267,7 @@
// ignore
}
}
-
+ sendWindow = new Semaphore(sendWindowSize);
}
/**
@@ -576,6 +592,30 @@
*/
public UpdateMessage take()
{
+ boolean interrupted = true;
+ UpdateMessage msg = getnextMessage();
+ do {
+ try
+ {
+ sendWindow.acquire();
+ interrupted = false;
+ } catch (InterruptedException e)
+ {
+ // loop until not interrupted
+ }
+ } while (interrupted);
+ this.incrementOutCount();
+ return msg;
+ }
+
+ /**
+ * Get the next update that must be sent to the server
+ * from the message queue or from the database.
+ *
+ * @return The next update that must be sent to the server.
+ */
+ private UpdateMessage getnextMessage()
+ {
UpdateMessage msg;
do
{
@@ -668,7 +708,6 @@
msg1 = msgQueue.removeFirst();
} while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
this.updateServerState(msg);
- this.incrementOutCount();
return msg;
}
}
@@ -679,7 +718,6 @@
/* get the next change from the lateQueue */
msg = lateQueue.removeFirst();
this.updateServerState(msg);
- this.incrementOutCount();
return msg;
}
}
@@ -707,7 +745,6 @@
* by the other server.
* Otherwise just loop to select the next message.
*/
- this.incrementOutCount();
return msg;
}
}
@@ -927,7 +964,7 @@
@Override
public String getMonitorInstanceName()
{
- String str = changelogCache.getBaseDn().toString() +
+ String str = baseDn.toString() +
" " + serverURL + " " + String.valueOf(serverId);
if (serverIsLDAPserver)
@@ -985,7 +1022,7 @@
attributes.add(new Attribute("server-id",
String.valueOf(serverId)));
attributes.add(new Attribute("base-dn",
- changelogCache.getBaseDn().toString()));
+ baseDn.toString()));
attributes.add(new Attribute("waiting-changes",
String.valueOf(getRcvMsgQueueSize())));
attributes.add(new Attribute("update-waiting-acks",
@@ -999,6 +1036,14 @@
String.valueOf(getInAckCount())));
attributes.add(new Attribute("approximate-delay",
String.valueOf(getApproxDelay())));
+ attributes.add(new Attribute("max-send-window",
+ String.valueOf(sendWindowSize)));
+ attributes.add(new Attribute("current-send-window",
+ String.valueOf(sendWindow.availablePermits())));
+ attributes.add(new Attribute("max-rcv-window",
+ String.valueOf(maxRcvWindow)));
+ attributes.add(new Attribute("current-rcv-window",
+ String.valueOf(rcvWindow)));
long olderUpdateTime = getOlderUpdateTime();
if (olderUpdateTime != 0)
{
@@ -1058,4 +1103,33 @@
return localString;
}
+
+ /**
+ * Check the protocol window and send WindowMessage if necessary.
+ *
+ * @throws IOException when the session becomes unavailable.
+ */
+ public synchronized void checkWindow() throws IOException
+ {
+ rcvWindow--;
+ if (rcvWindow < rcvWindowSizeHalf)
+ {
+ WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+ session.publish(msg);
+ outAckCount++;
+ rcvWindow += rcvWindowSizeHalf;
+ }
+ }
+
+ /**
+ * Update the send window size based on the credit specified in the
+ * given window message.
+ *
+ * @param windowMsg The Window Message containing the information
+ * necessary for updating the window size.
+ */
+ public void updateWindow(WindowMessage windowMsg)
+ {
+ sendWindow.release(windowMsg.getNumAck());
+ }
}
diff --git a/opends/src/server/org/opends/server/changelog/ServerReader.java b/opends/src/server/org/opends/server/changelog/ServerReader.java
index ab8a68e..d0f1126 100644
--- a/opends/src/server/org/opends/server/changelog/ServerReader.java
+++ b/opends/src/server/org/opends/server/changelog/ServerReader.java
@@ -36,6 +36,7 @@
import org.opends.server.synchronization.AckMessage;
import org.opends.server.synchronization.SynchronizationMessage;
import org.opends.server.synchronization.UpdateMessage;
+import org.opends.server.synchronization.WindowMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -105,8 +106,14 @@
else if (msg instanceof UpdateMessage)
{
UpdateMessage update = (UpdateMessage) msg;
+ handler.checkWindow();
changelogCache.put(update, handler);
}
+ else if (msg instanceof WindowMessage)
+ {
+ WindowMessage windowMsg = (WindowMessage) msg;
+ handler.updateWindow(windowMsg);
+ }
}
} catch (IOException e)
{
diff --git a/opends/src/server/org/opends/server/changelog/SocketSession.java b/opends/src/server/org/opends/server/changelog/SocketSession.java
index 4bb87d6..13708ad 100644
--- a/opends/src/server/org/opends/server/changelog/SocketSession.java
+++ b/opends/src/server/org/opends/server/changelog/SocketSession.java
@@ -60,6 +60,10 @@
public SocketSession(Socket socket) throws IOException
{
this.socket = socket;
+ /*
+ * Use a window instead of the TCP flow control.
+ * Therefore set a very large value for send and receive buffer sizes.
+ */
input = socket.getInputStream();
output = socket.getOutputStream();
}
diff --git a/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index c9a0605..2778e34 100644
--- a/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -78,6 +79,12 @@
private int maxReceiveDelay;
private int maxSendQueue;
private int maxReceiveQueue;
+ private Semaphore sendWindow;
+ private int maxSendWindow;
+ private int rcvWindow;
+ private int halfRcvWindow;
+ private int maxRcvWindow;
+ private int timeout = 0;
/**
* Creates a new Changelog Broker for a particular SynchronizationDomain.
@@ -95,10 +102,11 @@
* @param maxSendQueue The maximum size of the send queue to use on
* the changelog server.
* @param maxSendDelay The maximum send delay to use on the changelog server.
+ * @param window The size of the send and receive window to use.
*/
public ChangelogBroker(ServerState state, DN baseDn, short serverID,
int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay )
+ int maxSendDelay, int window)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -109,6 +117,9 @@
this.state = state;
replayOperations =
new TreeSet<FakeOperation>(new FakeOperationComparator());
+ this.rcvWindow = window;
+ this.maxRcvWindow = window;
+ this.halfRcvWindow = window/2;
}
/**
@@ -165,6 +176,7 @@
InetSocketAddress ServerAddr = new InetSocketAddress(
InetAddress.getByName(hostname), Integer.parseInt(port));
Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
socket.connect(ServerAddr, 500);
session = new SocketSession(socket);
@@ -173,7 +185,7 @@
*/
ServerStartMessage msg = new ServerStartMessage( serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- state);
+ halfRcvWindow*2, state);
session.publish(msg);
@@ -182,7 +194,7 @@
*/
session.setSoTimeout(1000);
startMsg = (ChangelogStartMessage) session.receive();
- session.setSoTimeout(0);
+ session.setSoTimeout(timeout);
/*
* We must not publish changes to a changelog that has not
@@ -202,6 +214,8 @@
(ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
{
changelogServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ this.sendWindow = new Semaphore(maxSendWindow);
connected = true;
break;
}
@@ -254,6 +268,8 @@
else
{
changelogServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ this.sendWindow = new Semaphore(maxSendWindow);
connected = true;
for (FakeOperation replayOp : replayOperations)
{
@@ -306,6 +322,14 @@
* changes that this server has already processed, start again
* the loop looking for any changelog server.
*/
+ try
+ {
+ Thread.sleep(500);
+ } catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
checkState = false;
int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
String message = getMessage(msgID);
@@ -393,13 +417,18 @@
{
if (this.connected == false)
this.reStart(failingSession);
-
+ if (msg instanceof UpdateMessage)
+ sendWindow.acquire();
session.publish(msg);
done = true;
} catch (IOException e)
{
this.reStart(failingSession);
}
+ catch (InterruptedException e)
+ {
+ this.reStart(failingSession);
+ }
}
}
}
@@ -418,7 +447,25 @@
ProtocolSession failingSession = session;
try
{
- return session.receive();
+ SynchronizationMessage msg = session.receive();
+ if (msg instanceof WindowMessage)
+ {
+ WindowMessage windowMsg = (WindowMessage) msg;
+ sendWindow.release(windowMsg.getNumAck());
+ }
+ else
+ {
+ if (msg instanceof UpdateMessage)
+ {
+ rcvWindow--;
+ if (rcvWindow < halfRcvWindow)
+ {
+ session.publish(new WindowMessage(halfRcvWindow));
+ rcvWindow += halfRcvWindow;
+ }
+ }
+ return msg;
+ }
} catch (Exception e)
{
if (e instanceof SocketTimeoutException)
@@ -485,6 +532,7 @@
*/
public void setSoTimeout(int timeout) throws SocketException
{
+ this.timeout = timeout;
session.setSoTimeout(timeout);
}
@@ -532,4 +580,47 @@
{
// TODO to be implemented
}
+
+ /**
+ * Get the maximum receive window size.
+ *
+ * @return The maximum receive window size.
+ */
+ public int getMaxRcvWindow()
+ {
+ return maxRcvWindow;
+ }
+
+ /**
+ * Get the current receive window size.
+ *
+ * @return The current receive window size.
+ */
+ public int getCurrentRcvWindow()
+ {
+ return rcvWindow;
+ }
+
+ /**
+ * Get the maximum send window size.
+ *
+ * @return The maximum send window size.
+ */
+ public int getMaxSendWindow()
+ {
+ return maxSendWindow;
+ }
+
+ /**
+ * Get the current send window size.
+ *
+ * @return The current send window size.
+ */
+ public int getCurrentSendWindow()
+ {
+ if (connected)
+ return sendWindow.availablePermits();
+ else
+ return 0;
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java b/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
index 5cd6738..2767579 100644
--- a/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/ChangelogStartMessage.java
@@ -46,15 +46,19 @@
private String serverURL;
private ServerState serverState;
+ private int windowSize;
+
/**
* Create a ChangelogStartMessage.
*
* @param serverId changelog server id
* @param serverURL changelog server URL
* @param baseDn base DN for which the ChangelogStartMessage is created.
+ * @param windowSize The window size.
* @param serverState our ServerState for this baseDn.
*/
public ChangelogStartMessage(short serverId, String serverURL, DN baseDn,
+ int windowSize,
ServerState serverState)
{
this.serverId = serverId;
@@ -63,6 +67,7 @@
this.baseDn = baseDn.toNormalizedString();
else
this.baseDn = null;
+ this.windowSize = windowSize;
this.serverState = serverState;
}
@@ -76,7 +81,7 @@
public ChangelogStartMessage(byte[] in) throws DataFormatException
{
/* The ChangelogStartMessage is encoded in the form :
- * <baseDn><ServerId><ServerUrl><ServerState>
+ * <baseDn><ServerId><ServerUrl><windowsize><ServerState>
*/
try
{
@@ -108,6 +113,13 @@
pos += length +1;
/*
+ * read the window size
+ */
+ length = getNextLength(in, pos);
+ windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ /*
* read the ServerState
*/
serverState = new ServerState(in, pos, in.length-1);
@@ -179,16 +191,18 @@
public byte[] getBytes()
{
/* The ChangelogStartMessage is stored in the form :
- * <operation type><basedn><serverid><serverURL><serverState>
+ * <operation type><basedn><serverid><serverURL><windowsize><serverState>
*/
try {
byte[] byteDn = baseDn.getBytes("UTF-8");
byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
byte[] byteServerUrl = serverURL.getBytes("UTF-8");
byte[] byteServerState = serverState.getBytes();
+ byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteServerState.length + 1;
+ byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+ byteServerState.length + 1;
byte[] resultByteArray = new byte[length];
@@ -205,6 +219,9 @@
/* put the ServerURL */
pos = addByteArray(byteServerUrl, resultByteArray, pos);
+ /* put the window size */
+ pos = addByteArray(byteWindowSize, resultByteArray, pos);
+
/* put the ServerState */
pos = addByteArray(byteServerState, resultByteArray, pos);
@@ -215,4 +232,14 @@
return null;
}
}
+
+ /**
+ * get the window size for the server that created this message.
+ *
+ * @return The window size for the server that created this message.
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java b/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
index d354f23..7369a49 100644
--- a/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
+++ b/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
@@ -446,7 +446,7 @@
// shutdown the Changelog Service if necessary
if (changelog != null)
- Changelog.shutdown();
+ changelog.shutdown();
}
/**
diff --git a/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java b/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
index c9fa051..a93d6ee 100644
--- a/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/ServerStartMessage.java
@@ -52,6 +52,7 @@
private int maxSendQueue;
private int maxReceiveDelay;
private int maxSendDelay;
+ private int windowSize;
private ServerState serverState = null;
/**
@@ -64,11 +65,13 @@
* @param maxReceiveQueue The max receive Queue for this server.
* @param maxSendDelay The max Send Delay from this server.
* @param maxSendQueue The max send Queue from this server.
+ * @param windowSize The window size used by this server.
* @param serverState The state of this server.
*/
public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
int maxReceiveQueue, int maxSendDelay,
- int maxSendQueue, ServerState serverState)
+ int maxSendQueue, int windowSize,
+ ServerState serverState)
{
this.serverId = serverId;
this.baseDn = baseDn.toString();
@@ -77,6 +80,7 @@
this.maxSendDelay = maxSendDelay;
this.maxSendQueue = maxSendQueue;
this.serverState = serverState;
+ this.windowSize = windowSize;
try
{
@@ -100,7 +104,7 @@
{
/* The ServerStartMessage is encoded in the form :
* <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
- * <maxSendDelay><maxSendQueue><ServerState>
+ * <maxSendDelay><maxSendQueue><window><ServerState>
*/
try
{
@@ -161,6 +165,13 @@
pos += length +1;
/*
+ * read the windowSize
+ */
+ length = getNextLength(in, pos);
+ windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ /*
* read the ServerState
*/
serverState = new ServerState(in, pos, in.length-1);
@@ -269,7 +280,7 @@
/*
* ServerStartMessage contains.
* <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
- * <maxSendDelay><maxSendQueue><ServerState>
+ * <maxSendDelay><maxSendQueue><windowsize><ServerState>
*/
try {
byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -283,6 +294,8 @@
String.valueOf(maxSendDelay).getBytes("UTF-8");
byte[] byteMaxSendQueue =
String.valueOf(maxSendQueue).getBytes("UTF-8");
+ byte[] byteWindowSize =
+ String.valueOf(windowSize).getBytes("UTF-8");
byte[] byteServerState = serverState.getBytes();
int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
@@ -291,6 +304,7 @@
byteMaxRecvQueue.length + 1 +
byteMaxSendDelay.length + 1 +
byteMaxSendQueue.length + 1 +
+ byteWindowSize.length + 1 +
byteServerState.length + 1;
byte[] resultByteArray = new byte[length];
@@ -313,6 +327,8 @@
pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
+ pos = addByteArray(byteWindowSize, resultByteArray, pos);
+
pos = addByteArray(byteServerState, resultByteArray, pos);
return resultByteArray;
@@ -322,4 +338,14 @@
return null;
}
}
+
+ /**
+ * Get the window size for the ldap server that created the message.
+ *
+ * @return The window size for the ldap server that created the message.
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index 7b401b2..766359b 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -138,6 +138,7 @@
static String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay";
static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
static String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
+ static String WINDOW_SIZE = "ds-cfg-window-size";
private static final StringConfigAttribute changelogStub =
new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
@@ -275,6 +276,20 @@
configAttributes.add(maxSendDelayAttr);
}
+ Integer window;
+ IntegerConfigAttribute windowStub =
+ new IntegerConfigAttribute(WINDOW_SIZE, "window size",
+ false, false, false, true, 0, false, 0);
+ IntegerConfigAttribute windowAttr =
+ (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub);
+ if (windowAttr == null)
+ window = 100; // Attribute is not present : use the default value
+ else
+ {
+ window = windowAttr.activeIntValue();
+ configAttributes.add(windowAttr);
+ }
+
configDn = configEntry.getDN();
DirectoryServer.registerConfigurableComponent(this);
@@ -292,7 +307,7 @@
try
{
broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
- maxReceiveDelay, maxSendQueue, maxSendDelay);
+ maxReceiveDelay, maxSendQueue, maxSendDelay, window);
synchronized (broker)
{
broker.start(changelogServers);
@@ -882,10 +897,7 @@
*/
public int getPendingUpdatesCount()
{
- synchronized (pendingChanges)
- {
- return pendingChanges.size();
- }
+ return pendingChanges.size();
}
/**
@@ -1662,4 +1674,44 @@
}
return true;
}
+
+ /**
+ * Get the maximum receive window size.
+ *
+ * @return The maximum receive window size.
+ */
+ public int getMaxRcvWindow()
+ {
+ return broker.getMaxRcvWindow();
+ }
+
+ /**
+ * Get the current receive window size.
+ *
+ * @return The current receive window size.
+ */
+ public int getCurrentRcvWindow()
+ {
+ return broker.getCurrentRcvWindow();
+ }
+
+ /**
+ * Get the maximum send window size.
+ *
+ * @return The maximum send window size.
+ */
+ public int getMaxSendWindow()
+ {
+ return broker.getMaxSendWindow();
+ }
+
+ /**
+ * Get the current send window size.
+ *
+ * @return The current send window size.
+ */
+ public int getCurrentSendWindow()
+ {
+ return broker.getCurrentSendWindow();
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java b/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
index bc9e663..800bf39 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationMessage.java
@@ -46,6 +46,7 @@
static final byte MSG_TYPE_ACK = 5;
static final byte MSG_TYPE_SERVER_START = 6;
static final byte MSG_TYPE_CHANGELOG_START = 7;
+ static final byte MSG_TYPE_WINDOW = 8;
/**
* Do the processing necessary when the message is received.
@@ -106,6 +107,9 @@
case MSG_TYPE_CHANGELOG_START:
msg = new ChangelogStartMessage(buffer);
break;
+ case MSG_TYPE_WINDOW:
+ msg = new WindowMessage(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java b/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
index f25a05d..c59ecec 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationMonitor.java
@@ -98,64 +98,37 @@
attributes.add(attr);
/* get number of received updates */
- final String ATTR_UPDATE_RECVD = "received-updates";
- AttributeType type =
- DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_RECVD);
- LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(type,
- String.valueOf(domain.getNumRcvdUpdates())));
- attr = new Attribute(type, "received-updates", values);
- attributes.add(attr);
+ addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates());
/* get number of updates sent */
- final String ATTR_UPDATE_SENT = "sent-updates";
- type = DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_SENT);
- values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(type,
- String.valueOf(domain.getNumSentUpdates())));
- attr = new Attribute(type, "sent-updates", values);
- attributes.add(attr);
+ addMonitorData(attributes, "sent-updates", domain.getNumSentUpdates());
/* get number of changes in the pending list */
- final String ATTR_UPDATE_PENDING = "pending-updates";
- type = DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_PENDING);
- values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(type,
- String.valueOf(domain.getPendingUpdatesCount())));
- attr = new Attribute(type, "pending-updates", values);
- attributes.add(attr);
+ addMonitorData(attributes, "pending-updates",
+ domain.getPendingUpdatesCount());
/* get number of changes replayed */
- final String ATTR_REPLAYED_UPDATE = "replayed-updates";
- type = DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE);
- values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(type,
- String.valueOf(domain.getNumProcessedUpdates())));
- attr = new Attribute(type, ATTR_REPLAYED_UPDATE, values);
- attributes.add(attr);
+ addMonitorData(attributes, "replayed-updates",
+ domain.getNumProcessedUpdates());
/* get number of changes successfully */
- final String ATTR_REPLAYED_UPDATE_OK = "replayed-updates-ok";
- type = DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE_OK);
- values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(type,
- String.valueOf(domain.getNumReplayedPostOpCalled())));
- attr = new Attribute(type, ATTR_REPLAYED_UPDATE_OK, values);
- attributes.add(attr);
+ addMonitorData(attributes, "replayed-updates-ok",
+ domain.getNumReplayedPostOpCalled());
- /* get debugCount */
- final String DEBUG_COUNT = "debug-count";
- type = DirectoryServer.getDefaultAttributeType(DEBUG_COUNT);
- values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(type,
- String.valueOf(domain.getDebugCount())));
- attr = new Attribute(type, DEBUG_COUNT, values);
- attributes.add(attr);
+ /* get window information */
+ addMonitorData(attributes, "max-rcv-window", domain.getMaxRcvWindow());
+ addMonitorData(attributes, "current-rcv-window",
+ domain.getCurrentRcvWindow());
+ addMonitorData(attributes, "max-send-window",
+ domain.getMaxSendWindow());
+ addMonitorData(attributes, "current-send-window",
+ domain.getCurrentSendWindow());
/* get the Server State */
final String ATTR_SERVER_STATE = "server-state";
- type = DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
- values = new LinkedHashSet<AttributeValue>();
+ AttributeType type =
+ DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
+ LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
for (String str : domain.getServerState().toStringSet())
{
values.add(new AttributeValue(type,str));
@@ -168,6 +141,27 @@
}
/**
+ * Add an attribute with an integer value to the list of monitoring
+ * attributes.
+ *
+ * @param attributes the list of monitoring attributes
+ * @param name the name of the attribute to add.
+ * @param value The integer value of he attribute to add.
+ */
+ private void addMonitorData(ArrayList<Attribute> attributes,
+ String name, int value)
+ {
+ Attribute attr;
+ AttributeType type;
+ LinkedHashSet<AttributeValue> values;
+ type = DirectoryServer.getDefaultAttributeType(name);
+ values = new LinkedHashSet<AttributeValue>();
+ values.add(new AttributeValue(type, String.valueOf(value)));
+ attr = new Attribute(type, name, values);
+ attributes.add(attr);
+ }
+
+ /**
* Retrieves the length of time in milliseconds that should elapse between
* calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero
* return value indicates that the <CODE>updateMonitorData()</CODE> method
diff --git a/opends/src/server/org/opends/server/synchronization/WindowMessage.java b/opends/src/server/org/opends/server/synchronization/WindowMessage.java
new file mode 100644
index 0000000..868bd12
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/WindowMessage.java
@@ -0,0 +1,141 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This message is used by LDAP server when they first connect.
+ * to a changelog server to let them know who they are and what is their state
+ * (their RUV)
+ */
+public class WindowMessage extends SynchronizationMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 8442267608764026867L;
+ private final int numAck;
+
+
+ /**
+ * Create a new WindowMessage.
+ *
+ * @param numAck The number of acknowledged messages.
+ * The window will be increase by this number.
+ */
+ public WindowMessage(int numAck)
+ {
+ this.numAck = numAck;
+ }
+
+ /**
+ * Creates a new WindowMessage from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the
+ * WindowMessage.
+ * @throws DataFormatException If the byte array does not contain a valid
+ * encoded form of the WindowMessage.
+ */
+ public WindowMessage(byte[] in) throws DataFormatException
+ {
+ /* The WindowMessage is encoded in the form :
+ * <numAck>
+ */
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_WINDOW)
+ throw new DataFormatException("input is not a valid Window Message");
+ int pos = 1;
+
+ /*
+ * read the number of acks contained in this message.
+ * first calculate the length then construct the string
+ */
+ int length = getNextLength(in, pos);
+ String numAckStr = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+ numAck = Integer.parseInt(numAckStr);
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ /*
+ * WindowMessage contains.
+ * <numAck>
+ */
+ try {
+ byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
+
+ int length = 1 + byteNumAck.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_WINDOW;
+ int pos = 1;
+
+ pos = addByteArray(byteNumAck, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+
+
+ /**
+ * Get the number of message acknowledged by the Window Message.
+ *
+ * @return the number of message acknowledged by the Window Message.
+ */
+ public int getNumAck()
+ {
+ return numAck;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public UpdateMessage processReceive(SynchronizationDomain domain)
+ {
+ return null;
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 2494e27..c534159 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -70,6 +70,14 @@
"org.opends.server.BuildRoot";
/**
+ * The name of the system property that specifies the ldap port.
+ * Set this prtoperty when running the server if you want to use a given
+ * port number, otherwise a port is choosed randomly at test startup time.
+ */
+ public static final String PROPERTY_LDAP_PORT =
+ "org.opends.server.LdapPort";
+
+ /**
* The string representation of the DN that will be used as the base entry for
* the test backend. This must not be changed, as there are a number of test
* cases that depend on this specific value of "o=test".
@@ -205,8 +213,17 @@
ServerSocket serverJmxSocket = null;
ServerSocket serverLdapsSocket = null;
- serverLdapSocket = bindFreePort();
- serverLdapPort = serverLdapSocket.getLocalPort();
+ String ldapPort = System.getProperty(PROPERTY_LDAP_PORT);
+ if (ldapPort == null)
+ {
+ serverLdapSocket = bindFreePort();
+ serverLdapPort = serverLdapSocket.getLocalPort();
+ }
+ else
+ {
+ serverLdapPort = Integer.valueOf(ldapPort);
+ serverLdapSocket = bindPort(serverLdapPort);
+ }
serverJmxSocket = bindFreePort();
serverJmxPort = serverJmxSocket.getLocalPort();
@@ -263,6 +280,23 @@
}
/**
+ * Binds to the given socket port on the local host.
+ * @return the bounded Server socket.
+ *
+ * @throws IOException in case of underlying exception.
+ * @throws SocketException in case of underlying exception.
+ */
+ private static ServerSocket bindPort(int port)
+ throws IOException, SocketException
+ {
+ ServerSocket serverLdapSocket;
+ serverLdapSocket = new ServerSocket();
+ serverLdapSocket.setReuseAddress(true);
+ serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port));
+ return serverLdapSocket;
+ }
+
+ /**
* Find and binds to a free server socket port on the local host.
* @return the bounded Server socket.
*
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
new file mode 100644
index 0000000..e0a952e
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -0,0 +1,494 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization;
+
+import static org.opends.server.loggers.Error.logError;
+import static org.testng.Assert.*;
+
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.config.ConfigEntry;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.core.Operation;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPException;
+import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.OperationType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchScope;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the contructors, encoders and decoders of the synchronization AckMsg,
+ * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
+ */
+public class ProtocolWindowTest
+{
+ private static final int WINDOW_SIZE = 10;
+
+ private static final String SYNCHRONIZATION_STRESS_TEST =
+ "Synchronization Stress Test";
+
+ /**
+ * The internal connection used for operation
+ */
+ private InternalClientConnection connection;
+
+ /**
+ * Created entries that need to be deleted for cleanup
+ */
+ private ArrayList<Entry> entryList = new ArrayList<Entry>();
+
+ /**
+ * The Synchronization config manager entry
+ */
+ private String synchroStringDN;
+
+ /**
+ * The synchronization plugin entry
+ */
+ private String synchroPluginStringDN;
+
+ private Entry synchroPluginEntry;
+
+ /**
+ * The Server synchro entry
+ */
+ private String synchroServerStringDN;
+
+ private Entry synchroServerEntry;
+
+ /**
+ * The Change log entry
+ */
+ private String changeLogStringDN;
+
+ private Entry changeLogEntry;
+
+ /**
+ * A "person" entry
+ */
+ private Entry personEntry;
+
+ /**
+ * schema check flag
+ */
+ private boolean schemaCheck;
+
+ // WORKAROUND FOR BUG #639 - BEGIN -
+ /**
+ *
+ */
+ MultimasterSynchronization mms;
+
+ // WORKAROUND FOR BUG #639 - END -
+
+ /**
+ * Test the window mechanism by :
+ * - creating a Changelog service client using the ChangelogBroker class.
+ * - set a small window size.
+ * - perform more than the window size operations.
+ * - check that the Changelog has not sent more than window size operations.
+ * - receive all messages from the ChangelogBroker, check that
+ * the client receives the correct number of operations.
+ */
+ @Test(enabled=true, groups="slow")
+ public void saturateAndRestart() throws Exception
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1);
+
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+ cleanEntries();
+
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 13);
+
+ try {
+
+ /* Test that changelog monitor and synchro plugin monitor informations
+ * publish the correct window size.
+ * This allows both the check the monitoring code and to test that
+ * configuration is working.
+ */
+ Thread.sleep(1500);
+ assertTrue(checkWindows(WINDOW_SIZE));
+
+ // Create an Entry (add operation) that will be later used in the test.
+ Entry tmp = personEntry.duplicate();
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, tmp.getDN(),
+ tmp.getObjectClasses(), tmp.getUserAttributes(),
+ tmp.getOperationalAttributes());
+ addOp.run();
+ entryList.add(personEntry);
+ assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The Add Entry operation failed");
+
+ // Check if the client has received the msg
+ SynchronizationMessage msg = broker.receive();
+ assertTrue(msg instanceof AddMsg,
+ "The received synchronization message is not an ADD msg");
+ AddMsg addMsg = (AddMsg) msg;
+
+ Operation receivedOp = addMsg.createOperation(connection);
+ assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+ "The received synchronization message is not an ADD msg");
+
+ assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+ "The received ADD synchronization message is not for the excepted DN");
+
+ // send twice the window modify operations
+ int count = WINDOW_SIZE * 2;
+ processModify(count);
+
+ // let some time to the message to reach the changelog client
+ Thread.sleep(500);
+
+ // check that the changelog only sent WINDOW_SIZE messages
+ assertTrue(searchUpdateSent());
+
+ int rcvCount=0;
+ try
+ {
+ while (true)
+ {
+ broker.receive();
+ rcvCount++;
+ }
+ }
+ catch (SocketTimeoutException e)
+ {}
+ /*
+ * check that we received all updates
+ */
+ assertEquals(rcvCount, WINDOW_SIZE*2);
+ }
+ finally {
+ broker.stop();
+ DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+ }
+ }
+
+ /**
+ * Check that the window configuration has been successfull
+ * by reading the monitoring information and checking
+ * that we do have 2 entries with the configured max-rcv-window.
+ */
+ private boolean checkWindows(int windowSize) throws LDAPException
+ {
+ InternalSearchOperation op = connection.processSearch(
+ new ASN1OctetString("cn=monitor"),
+ SearchScope.WHOLE_SUBTREE,
+ LDAPFilter.decode("(max-rcv-window=" + windowSize + ")"));
+ assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+ return (op.getEntriesSent() == 3);
+ }
+
+ /**
+ * Search that the changelog has stopped sending changes after
+ * having reach the limit of the window size.
+ * Do this by checking the monitoring information.
+ */
+ private boolean searchUpdateSent() throws Exception
+ {
+ InternalSearchOperation op = connection.processSearch(
+ new ASN1OctetString("cn=monitor"),
+ SearchScope.WHOLE_SUBTREE,
+ LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"));
+ assertEquals(op.getResultCode(), ResultCode.SUCCESS);
+ return (op.getEntriesSent() == 1);
+ }
+
+ /**
+ * Set up the environment for performing the tests in this Class.
+ * synchronization
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ // This test suite depends on having the schema available.
+ TestCaseUtils.startServer();
+
+ // Disable schema check
+ schemaCheck = DirectoryServer.checkSchema();
+ DirectoryServer.setCheckSchema(false);
+
+ // Create an internal connection
+ connection = new InternalClientConnection();
+
+ // Create backend top level entries
+ String[] topEntries = new String[2];
+ topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: domain\n";
+ topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+ Entry entry;
+ for (int i = 0; i < topEntries.length; i++)
+ {
+ entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+ entry.getUserAttributes(), entry.getOperationalAttributes());
+ addOp.setInternalOperation(true);
+ addOp.run();
+ entryList.add(entry);
+ }
+
+ // top level synchro provider
+ synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+ // Multimaster Synchro plugin
+ synchroPluginStringDN = "cn=Multimaster Synchronization, "
+ + synchroStringDN;
+ String synchroPluginLdif = "dn: "
+ + synchroPluginStringDN
+ + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider\n"
+ + "ds-cfg-synchronization-provider-enabled: true\n"
+ + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
+ synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
+
+ // Change log
+ changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+ String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
+ + "ds-cfg-changelog-server-id: 1\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE;
+ changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+
+ // suffix synchronized
+ synchroServerStringDN = "cn=example, " + synchroPluginStringDN;
+ String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider-config\n"
+ + "cn: example\n"
+ + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
+ + "ds-cfg-changelog-server: localhost:8989\n"
+ + "ds-cfg-directory-server-id: 1\n"
+ + "ds-cfg-receive-status: true\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE;
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+
+ String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n";
+ personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+ configureSynchronization();
+ }
+
+ /**
+ * Clean up the environment. return null;
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @AfterClass
+ public void classCleanUp() throws Exception
+ {
+ DirectoryServer.setCheckSchema(schemaCheck);
+
+ // WORKAROUND FOR BUG #639 - BEGIN -
+ DirectoryServer.deregisterSynchronizationProvider(mms);
+ mms.finalizeSynchronizationProvider();
+ // WORKAROUND FOR BUG #639 - END -
+
+ cleanEntries();
+ }
+
+ /**
+ * suppress all the entries created by the tests in this class
+ */
+ private void cleanEntries()
+ {
+ DeleteOperation op;
+ // Delete entries
+ Entry entries[] = entryList.toArray(new Entry[0]);
+ for (int i = entries.length - 1; i != 0; i--)
+ {
+ try
+ {
+ op = new DeleteOperation(connection, InternalClientConnection
+ .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+ entries[i].getDN());
+ op.run();
+ } catch (Exception e)
+ {
+ }
+ }
+ }
+
+ /**
+ * @return
+ */
+ private List<Modification> generatemods(String attrName, String attrValue)
+ {
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
+ LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+ values.add(new AttributeValue(attrType, attrValue));
+ Attribute attr = new Attribute(attrType, attrName, values);
+ List<Modification> mods = new ArrayList<Modification>();
+ Modification mod = new Modification(ModificationType.REPLACE, attr);
+ mods.add(mod);
+ return mods;
+ }
+
+ /**
+ * Open a changelog session to the local Changelog server.
+ *
+ */
+ private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
+ throws Exception, SocketException
+ {
+ ServerState state = new ServerState(baseDn);
+ state.loadState();
+ ChangelogBroker broker =
+ new ChangelogBroker(state, baseDn, serverId, 0, 0, 0, 0, WINDOW_SIZE);
+ ArrayList<String> servers = new ArrayList<String>(1);
+ servers.add("localhost:8989");
+ broker.start(servers);
+ broker.setSoTimeout(5000);
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
+ {
+ while (true)
+ {
+ broker.receive();
+ }
+ }
+ catch (Exception e)
+ { }
+ return broker;
+ }
+
+ /**
+ * Configure the Synchronization for this test.
+ */
+ private void configureSynchronization() throws Exception
+ {
+ //
+ // Add the Multimaster synchronization plugin
+ DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
+ entryList.add(synchroPluginEntry);
+ assertNotNull(DirectoryServer.getConfigEntry(DN
+ .decode(synchroPluginStringDN)),
+ "Unable to add the Multimaster synchronization plugin");
+
+ // WORKAROUND FOR BUG #639 - BEGIN -
+ DN dn = DN.decode(synchroPluginStringDN);
+ ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
+ mms = new MultimasterSynchronization();
+ try
+ {
+ mms.initializeSynchronizationProvider(mmsConfigEntry);
+ }
+ catch (ConfigException e)
+ {
+ assertTrue(false,
+ "Unable to initialize the Multimaster synchronization plugin");
+ }
+ DirectoryServer.registerSynchronizationProvider(mms);
+ // WORKAROUND FOR BUG #639 - END -
+
+ //
+ // Add the changelog server
+ DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
+ "Unable to add the changeLog server");
+ entryList.add(changeLogEntry);
+
+ //
+ // We also have a replicated suffix (synchronization domain)
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the syncrhonized server");
+ entryList.add(synchroServerEntry);
+ }
+
+ private void processModify(int count)
+ {
+ while (count>0)
+ {
+ count--;
+ // must generate the mods for every operation because they are modified
+ // by processModify.
+ List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+
+ ModifyOperation modOp =
+ connection.processModify(personEntry.getDN(), mods);
+ assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
+ }
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
index d553104..f73ca4b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization;
+import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -53,6 +54,8 @@
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
@@ -139,96 +142,100 @@
@Test(enabled=true, groups="slow")
public void fromServertoBroker() throws Exception
{
-
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting Synchronization StressTest : fromServertoBroker" , 1);
+
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
final int TOTAL_MESSAGES = 1000;
cleanEntries();
- ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
DirectoryServer.registerMonitorProvider(this);
try {
- /*
- * loop receiving update until there is nothing left
- * to make sure that message from previous tests have been consumed.
- */
- try
- {
- while (true)
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
{
- broker.receive();
+ while (true)
+ {
+ broker.receive();
+ }
}
- }
- catch (Exception e)
- { }
- /*
- * Test that operations done on this server are sent to the
- * changelog server and forwarded to our changelog broker session.
- */
+ catch (Exception e)
+ { }
+ /*
+ * Test that operations done on this server are sent to the
+ * changelog server and forwarded to our changelog broker session.
+ */
- // Create an Entry (add operation) that will be later used in the test.
- Entry tmp = personEntry.duplicate();
- AddOperation addOp = new AddOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, tmp.getDN(),
- tmp.getObjectClasses(), tmp.getUserAttributes(),
- tmp.getOperationalAttributes());
- addOp.run();
- entryList.add(personEntry);
- assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
- "The Add Entry operation failed");
+ // Create an Entry (add operation) that will be later used in the test.
+ Entry tmp = personEntry.duplicate();
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, tmp.getDN(),
+ tmp.getObjectClasses(), tmp.getUserAttributes(),
+ tmp.getOperationalAttributes());
+ addOp.run();
+ entryList.add(personEntry);
+ assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The Add Entry operation failed");
- // Check if the client has received the msg
- SynchronizationMessage msg = broker.receive();
- assertTrue(msg instanceof AddMsg,
- "The received synchronization message is not an ADD msg");
- AddMsg addMsg = (AddMsg) msg;
+ // Check if the client has received the msg
+ SynchronizationMessage msg = broker.receive();
+ assertTrue(msg instanceof AddMsg,
+ "The received synchronization message is not an ADD msg");
+ AddMsg addMsg = (AddMsg) msg;
- Operation receivedOp = addMsg.createOperation(connection);
- assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
- "The received synchronization message is not an ADD msg");
+ Operation receivedOp = addMsg.createOperation(connection);
+ assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+ "The received synchronization message is not an ADD msg");
- assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
- "The received ADD synchronization message is not for the excepted DN");
+ assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+ "The received ADD synchronization message is not for the excepted DN");
- reader = new BrokerReader(broker);
- reader.start();
+ reader = new BrokerReader(broker);
+ reader.start();
- long startTime = TimeThread.getTime();
- int count = TOTAL_MESSAGES;
+ long startTime = TimeThread.getTime();
+ int count = TOTAL_MESSAGES;
- // Create a number of writer thread that will loop modifying the entry
- List<Thread> writerThreadList = new LinkedList<Thread>();
- for (int n = 0; n < 1; n++)
- {
- BrokerWriter writer = new BrokerWriter(count);
- writerThreadList.add(writer);
- }
- for (Thread thread : writerThreadList)
- {
- thread.start();
- }
- // wait for all the threads to finish.
- for (Thread thread : writerThreadList)
- {
- thread.join();
- }
+ // Create a number of writer thread that will loop modifying the entry
+ List<Thread> writerThreadList = new LinkedList<Thread>();
+ for (int n = 0; n < 1; n++)
+ {
+ BrokerWriter writer = new BrokerWriter(count);
+ writerThreadList.add(writer);
+ }
+ for (Thread thread : writerThreadList)
+ {
+ thread.start();
+ }
+ // wait for all the threads to finish.
+ for (Thread thread : writerThreadList)
+ {
+ thread.join();
+ }
- long afterSendTime = TimeThread.getTime();
+ long afterSendTime = TimeThread.getTime();
- int rcvCount = reader.getCount();
- long afterReceiveTime = TimeThread.getTime();
+ int rcvCount = reader.getCount();
+
+ long afterReceiveTime = TimeThread.getTime();
- if (rcvCount != TOTAL_MESSAGES)
- {
- fail("some messages were lost : expected : " +TOTAL_MESSAGES +
- " received : " + rcvCount);
- }
+ if (rcvCount != TOTAL_MESSAGES)
+ {
+ fail("some messages were lost : expected : " +TOTAL_MESSAGES +
+ " received : " + rcvCount);
+ }
}
finally {
- DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
- broker.stop();
+ DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+ broker.stop();
}
}
@@ -393,7 +400,7 @@
ServerState state = new ServerState(baseDn);
state.loadState();
ChangelogBroker broker = new ChangelogBroker(state, baseDn,
- serverId, 0, 0, 0, 0);
+ serverId, 0, 0, 0, 0, 100);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:8989");
broker.start(servers);
@@ -441,7 +448,7 @@
// We also have a replicated suffix (synchronization domain)
DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
- "Unable to add the syncrhonized server");
+ "Unable to add the synchronized server");
entryList.add(synchroServerEntry);
}
@@ -553,7 +560,9 @@
{
while (true)
{
- broker.receive();
+ SynchronizationMessage msg = broker.receive();
+ if (msg == null)
+ break;
count ++;
}
} catch (Exception e) {
@@ -577,7 +586,7 @@
return count;
try
{
- this.wait();
+ this.wait(60);
return count;
} catch (InterruptedException e)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
index 5df7ff9..8ccc759 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
@@ -438,6 +438,73 @@
// Check that retrieved CN is OK
msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes());
}
+
+ @DataProvider(name="serverStart")
+ public Object [][] createServerStartMessageTestData() throws Exception
+ {
+ DN baseDN = DN.decode("dc=example, dc=com");
+ ServerState state = new ServerState(baseDN);
+ return new Object [][] { {(short)1, baseDN, 100, state} };
+ }
+ /**
+ * Test that ServerStartMessage encoding and decoding works
+ * by checking that : msg == new ServerStartMessage(msg.getBytes()).
+ */
+ @Test(dataProvider="serverStart")
+ public void ServerStartMessageTest(short serverId, DN baseDN, int window,
+ ServerState state) throws Exception
+ {
+ state.update(new ChangeNumber((long)1, 1,(short)1));
+ ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
+ window, window, window, window, window, state);
+ ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
+ assertEquals(msg.getServerId(), newMsg.getServerId());
+ assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+ assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+ assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+ assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
+ newMsg.getServerState().getMaxChangeNumber((short)1));
+ }
+
+ @DataProvider(name="changelogStart")
+ public Object [][] createChangelogStartMessageTestData() throws Exception
+ {
+ DN baseDN = DN.decode("dc=example, dc=com");
+ ServerState state = new ServerState(baseDN);
+ return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} };
+ }
+
+ /**
+ * Test that changelogStartMessage encoding and decoding works
+ * by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
+ */
+ @Test(dataProvider="changelogStart")
+ public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
+ String url, ServerState state) throws Exception
+ {
+ state.update(new ChangeNumber((long)1, 1,(short)1));
+ ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
+ url, baseDN, window, state);
+ ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
+ assertEquals(msg.getServerId(), newMsg.getServerId());
+ assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+ assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+ assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+ assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
+ newMsg.getServerState().getMaxChangeNumber((short)1));
+ }
+
+ /**
+ * Test that WindowMessageTest encoding and decoding works
+ * by checking that : msg == new WindowMessageTest(msg.getBytes()).
+ */
+ @Test()
+ public void WindowMessageTest() throws Exception
+ {
+ WindowMessage msg = new WindowMessage(123);
+ WindowMessage newMsg = new WindowMessage(msg.getBytes());
+ assertEquals(msg.getNumAck(), newMsg.getNumAck());
+ }
/**
* Test PendingChange
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 9345b82..245cf7b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization;
+import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.*;
import java.net.SocketException;
@@ -325,6 +326,10 @@
@Test(enabled=true)
public void namingConflicts() throws Exception
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting synchronization test : namingConflicts" , 1);
+
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
/*
@@ -635,193 +640,198 @@
@Test(enabled=true, dataProvider="assured")
public void updateOperations(boolean assured) throws Exception
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting synchronization test : updateOperations " + assured , 1);
+
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
cleanEntries();
+
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 27);
+ try {
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
- ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
- ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
-
- /*
- * loop receiving update until there is nothing left
- * to make sure that message from previous tests have been consumed.
- */
- // broker.setSoTimeout(100);
- try
- {
- while (true)
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
{
- broker.receive();
+ while (true)
+ {
+ broker.receive();
+ }
}
- }
- catch (Exception e)
- {
- // broker.setSoTimeout(1000);
- }
- /*
- * Test that operations done on this server are sent to the
- * changelog server and forwarded to our changelog broker session.
- */
+ catch (Exception e)
+ {}
+ /*
+ * Test that operations done on this server are sent to the
+ * changelog server and forwarded to our changelog broker session.
+ */
- // Create an Entry (add operation)
- Entry tmp = personEntry.duplicate();
- AddOperation addOp = new AddOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, tmp.getDN(),
- tmp.getObjectClasses(), tmp.getUserAttributes(),
- tmp.getOperationalAttributes());
- addOp.run();
- entryList.add(personEntry);
- assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+ // Create an Entry (add operation)
+ Entry tmp = personEntry.duplicate();
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, tmp.getDN(),
+ tmp.getObjectClasses(), tmp.getUserAttributes(),
+ tmp.getOperationalAttributes());
+ addOp.run();
+ entryList.add(personEntry);
+ assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
"The Add Entry operation failed");
- // Check if the client has received the msg
- SynchronizationMessage msg = broker.receive();
- assertTrue(msg instanceof AddMsg,
+ // Check if the client has received the msg
+ SynchronizationMessage msg = broker.receive();
+ assertTrue(msg instanceof AddMsg,
"The received synchronization message is not an ADD msg");
- AddMsg addMsg = (AddMsg) msg;
+ AddMsg addMsg = (AddMsg) msg;
- Operation receivedOp = addMsg.createOperation(connection);
- assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+ Operation receivedOp = addMsg.createOperation(connection);
+ assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
"The received synchronization message is not an ADD msg");
- assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+ assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
"The received ADD synchronization message is not for the excepted DN");
- // Modify the entry
- List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+ // Modify the entry
+ List<Modification> mods = generatemods("telephonenumber", "01 02 45");
- ModifyOperation modOp = new ModifyOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), mods);
- modOp.setInternalOperation(true);
- modOp.run();
+ ModifyOperation modOp = new ModifyOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, personEntry.getDN(), mods);
+ modOp.setInternalOperation(true);
+ modOp.run();
- // See if the client has received the msg
- msg = broker.receive();
- assertTrue(msg instanceof ModifyMsg,
- "The received synchronization message is not a MODIFY msg");
- ModifyMsg modMsg = (ModifyMsg) msg;
+ // See if the client has received the msg
+ msg = broker.receive();
+ assertTrue(msg instanceof ModifyMsg,
+ "The received synchronization message is not a MODIFY msg");
+ ModifyMsg modMsg = (ModifyMsg) msg;
- receivedOp = modMsg.createOperation(connection);
- assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received MODIFY synchronization message is not for the excepted DN");
+ receivedOp = modMsg.createOperation(connection);
+ assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+ "The received MODIFY synchronization message is not for the excepted DN");
- // Modify the entry DN
- DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
- ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, personEntry.getDN(), RDN
- .decode("uid=new person"), true, DN
- .decode("ou=People,dc=example,dc=com"));
- modDNOp.run();
- assertNotNull(DirectoryServer.getEntry(newDN),
- "The MOD_DN operation didn't create the new person entry");
- assertNull(DirectoryServer.getEntry(personEntry.getDN()),
- "The MOD_DN operation didn't delete the old person entry");
- entryList.add(DirectoryServer.getEntry(newDN));
+ // Modify the entry DN
+ DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
+ ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, personEntry.getDN(), RDN
+ .decode("uid=new person"), true, DN
+ .decode("ou=People,dc=example,dc=com"));
+ modDNOp.run();
+ assertNotNull(DirectoryServer.getEntry(newDN),
+ "The MOD_DN operation didn't create the new person entry");
+ assertNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The MOD_DN operation didn't delete the old person entry");
+ entryList.add(DirectoryServer.getEntry(newDN));
- // See if the client has received the msg
- msg = broker.receive();
- assertTrue(msg instanceof ModifyDNMsg,
- "The received synchronization message is not a MODIFY DN msg");
- ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
- receivedOp = moddnMsg.createOperation(connection);
+ // See if the client has received the msg
+ msg = broker.receive();
+ assertTrue(msg instanceof ModifyDNMsg,
+ "The received synchronization message is not a MODIFY DN msg");
+ ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
+ receivedOp = moddnMsg.createOperation(connection);
- assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received MODIFY_DN message is not for the excepted DN");
+ assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+ "The received MODIFY_DN message is not for the excepted DN");
- // Delete the entry
- Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
- DeleteOperation delOp = new DeleteOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, DN
- .decode("uid= new person,ou=People,dc=example,dc=com"));
- delOp.run();
- assertNull(DirectoryServer.getEntry(newDN),
- "Unable to delete the new person Entry");
- entryList.remove(newPersonEntry);
+ // Delete the entry
+ Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
+ DeleteOperation delOp = new DeleteOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, DN
+ .decode("uid= new person,ou=People,dc=example,dc=com"));
+ delOp.run();
+ assertNull(DirectoryServer.getEntry(newDN),
+ "Unable to delete the new person Entry");
+ entryList.remove(newPersonEntry);
- // See if the client has received the msg
- msg = broker.receive();
- assertTrue(msg instanceof DeleteMsg,
- "The received synchronization message is not a MODIFY DN msg");
- DeleteMsg delMsg = (DeleteMsg) msg;
- receivedOp = delMsg.createOperation(connection);
- assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
- .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
- "The received DELETE message is not for the excepted DN");
+ // See if the client has received the msg
+ msg = broker.receive();
+ assertTrue(msg instanceof DeleteMsg,
+ "The received synchronization message is not a MODIFY DN msg");
+ DeleteMsg delMsg = (DeleteMsg) msg;
+ receivedOp = delMsg.createOperation(connection);
+ assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
+ .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
+ "The received DELETE message is not for the excepted DN");
- /*
- * Now check that when we send message to the Changelog server
- * and that they are received and correctly replayed by the server.
- *
- * Start by testing the Add message reception
- */
- addMsg = new AddMsg(gen.NewChangeNumber(),
- personWithUUIDEntry.getDN().toString(),
- user1entryUUID, baseUUID,
- personWithUUIDEntry.getObjectClassAttribute(),
- personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
- if (assured)
- addMsg.setAssured();
- broker.publish(addMsg);
+ /*
+ * Now check that when we send message to the Changelog server
+ * and that they are received and correctly replayed by the server.
+ *
+ * Start by testing the Add message reception
+ */
+ addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID, baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ if (assured)
+ addMsg.setAssured();
+ broker.publish(addMsg);
- /*
- * Check that the entry has been created in the local DS.
- */
- Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
- assertNotNull(resultEntry,
- "The send ADD synchronization message was not applied");
- entryList.add(resultEntry);
+ /*
+ * Check that the entry has been created in the local DS.
+ */
+ Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true);
+ assertNotNull(resultEntry,
+ "The send ADD synchronization message was not applied");
+ entryList.add(resultEntry);
- /*
- * Test the reception of Modify Msg
- */
- modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
- mods, user1entryUUID);
- if (assured)
- modMsg.setAssured();
- broker.publish(modMsg);
+ /*
+ * Test the reception of Modify Msg
+ */
+ modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
+ mods, user1entryUUID);
+ if (assured)
+ modMsg.setAssured();
+ broker.publish(modMsg);
- boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
- "telephonenumber", "01 02 45", 1000);
+ boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+ "telephonenumber", "01 02 45", 1000);
- if (found == false)
- fail("The modification has not been correctly replayed.");
+ if (found == false)
+ fail("The modification has not been correctly replayed.");
- /*
- * Test the Reception of Modify Dn Msg
- */
- moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
- gen.NewChangeNumber(),
- user1entryUUID, null,
- true, null, "uid= new person");
- if (assured)
- moddnMsg.setAssured();
- broker.publish(moddnMsg);
+ /*
+ * Test the Reception of Modify Dn Msg
+ */
+ moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
+ gen.NewChangeNumber(),
+ user1entryUUID, null,
+ true, null, "uid= new person");
+ if (assured)
+ moddnMsg.setAssured();
+ broker.publish(moddnMsg);
- resultEntry = getEntry(
- DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true);
+ resultEntry = getEntry(
+ DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true);
- assertNotNull(resultEntry,
- "The modify DN synchronization message was not applied");
+ assertNotNull(resultEntry,
+ "The modify DN synchronization message was not applied");
- /*
- * Test the Reception of Delete Msg
- */
- delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
- gen.NewChangeNumber(), user1entryUUID);
- if (assured)
- delMsg.setAssured();
- broker.publish(delMsg);
- resultEntry = getEntry(
+ /*
+ * Test the Reception of Delete Msg
+ */
+ delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
+ gen.NewChangeNumber(), user1entryUUID);
+ if (assured)
+ delMsg.setAssured();
+ broker.publish(delMsg);
+ resultEntry = getEntry(
DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, false);
- assertNull(resultEntry,
- "The DELETE synchronization message was not replayed");
-
- broker.stop();
+ assertNull(resultEntry,
+ "The DELETE synchronization message was not replayed");
+ }
+ finally
+ {
+ broker.stop();
+ }
}
/**
@@ -850,7 +860,7 @@
ServerState state = new ServerState(baseDn);
state.loadState();
ChangelogBroker broker = new ChangelogBroker(state, baseDn,
- serverId, 0, 0, 0, 0);
+ serverId, 0, 0, 0, 0, 100);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:8989");
broker.start(servers);
@@ -996,6 +1006,10 @@
@Test(enabled=true)
public void deleteNoSuchObject() throws Exception
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting synchronization test : deleteNoSuchObject" , 1);
+
DN dn = DN.decode("cn=No Such Object,ou=People,dc=example,dc=com");
Operation op =
new DeleteOperation(connection,
@@ -1014,12 +1028,17 @@
@Test(enabled=true)
public void infiniteReplayLoop() throws Exception
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting synchronization test : infiniteReplayLoop" , 1);
+
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
- ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+ Thread.sleep(2000);
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 11);
try
{
- ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
// Create a test entry.
String personLdif = "dn: uid=user.2,ou=People,dc=example,dc=com\n"
--
Gitblit v1.10.0