From 3eb214ba900a7f3e4550f6a047db26e7f40b82a8 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Tue, 17 Apr 2007 11:42:38 +0000
Subject: [PATCH] Issue 605 Total Update over protocol
---
opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java | 122 +
opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java | 97 +
opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java | 188 ++
opends/src/server/org/opends/server/tasks/InitializeTask.java | 267 +++
opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java | 9
opends/src/server/org/opends/server/synchronization/common/LogMessages.java | 65
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java | 1488 +++++++++++++++++
opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java | 142 +
opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java | 32
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java | 366 ++-
opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java | 212 ++
opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java | 27
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java | 214 ++
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java | 2
opends/resource/schema/02-config.ldif | 33
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java | 1194 +++++++++++++
opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java | 122 +
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java | 112 +
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 2
opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java | 158 +
opends/src/server/org/opends/server/messages/TaskMessages.java | 22
opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java | 103 +
opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java | 20
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java | 2
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java | 14
opends/src/server/org/opends/server/config/ConfigConstants.java | 56
opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 63
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java | 50
29 files changed, 5,000 insertions(+), 185 deletions(-)
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 9dba19d..517ccbb 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -1045,6 +1045,10 @@
NAME 'ds-cfg-heartbeat-interval'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306
+ NAME 'ds-cfg-changelog-db-directory'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307
NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' )
@@ -1125,6 +1129,22 @@
NAME 'ds-cfg-virtual-attribute-conflict-behavior'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332
+ NAME 'ds-task-initialize-domain-dn'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333
+ NAME 'ds-task-initialize-replica-server-id'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334
+ NAME 'ds-task-unprocessed-entry-count'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335
+ NAME 'ds-task-processed-entry-count'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.336
NAME 'ds-cfg-dictionary-file' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' )
@@ -1481,7 +1501,8 @@
'ds-cfg-synchronization-changelog-server-config' SUP top
STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
- ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
+ ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory )
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
X-ORIGIN 'OpenDS Directory Server' )
@@ -1594,6 +1615,16 @@
ds-cfg-virtual-attribute-conflict-behavior )
MAY ( ds-cfg-virtual-attribute-base-dn $ ds-cfg-virtual-attribute-group-dn $
ds-cfg-virtual-attribute-filter ) X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.92
+ NAME 'ds-task-initialize-from-remote-replica' SUP ds-task
+ MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
+ MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
+ X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.93
+ NAME 'ds-task-initialize-remote-replica' SUP ds-task
+ MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
+ MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.95
NAME 'ds-cfg-dictionary-password-validator' SUP ds-cfg-password-validator
STRUCTURAL MUST ( ds-cfg-dictionary-file $ ds-cfg-case-sensitive-validation $
diff --git a/opends/src/server/org/opends/server/config/ConfigConstants.java b/opends/src/server/org/opends/server/config/ConfigConstants.java
index 1ad5911..343d7a3 100644
--- a/opends/src/server/org/opends/server/config/ConfigConstants.java
+++ b/opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,6 +3665,62 @@
NAME_PREFIX_TASK + "import-is-encrypted";
+ /**
+ * The name of the objectclass that will be used for a Directory Server
+ * initialize task definition.
+ */
+ public static final String OC_INITIALIZE_TASK =
+ NAME_PREFIX_TASK + "initialize-from-remote-replica";
+
+ /**
+ * The name of the attribute in an initialize task definition that specifies
+ * the base dn related to the synchonization domain to initialize.
+ */
+ public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN =
+ NAME_PREFIX_TASK + "initialize-domain-dn";
+
+ /**
+ * The name of the attribute in an initialize target task definition that
+ * specifies the source in terms of source server from which to initialize.
+ */
+ public static final String ATTR_TASK_INITIALIZE_SOURCE =
+ NAME_PREFIX_TASK + "initialize-replica-server-id";
+
+ /**
+ * The name of the objectclass that will be used for a Directory Server
+ * initialize target task definition.
+ */
+ public static final String OC_INITIALIZE_TARGET_TASK =
+ NAME_PREFIX_TASK + "initialize-remote-replica";
+
+ /**
+ * The name of the attribute in an initialize target task definition that
+ * specifies the base dn related to the synchonization domain to initialize.
+ */
+ public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN =
+ NAME_PREFIX_TASK + "initialize-domain-dn";
+
+ /**
+ * The name of the attribute in an initialize target task definition that
+ * specifies the scope in terms of servers to initialize.
+ */
+ public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE =
+ NAME_PREFIX_TASK + "initialize-replica-server-id";
+
+ /**
+ * The name of the attribute in an initialize target task definition that
+ * specifies the scope in terms of servers to initialize.
+ */
+ public static final String ATTR_TASK_INITIALIZE_LEFT =
+ NAME_PREFIX_TASK + "unprocessed-entry-count";
+
+ /**
+ * The name of the attribute in an initialize target task definition that
+ * specifies the scope in terms of servers to initialize.
+ */
+ public static final String ATTR_TASK_INITIALIZE_DONE =
+ NAME_PREFIX_TASK + "processed-entry-count";
+
/**
* The name of the objectclass that will be used for a Directory Server
diff --git a/opends/src/server/org/opends/server/messages/TaskMessages.java b/opends/src/server/org/opends/server/messages/TaskMessages.java
index cab8aa7..d1577d6 100644
--- a/opends/src/server/org/opends/server/messages/TaskMessages.java
+++ b/opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,8 +211,6 @@
public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER =
CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17;
-
-
/**
* The message ID for the message that will be used an attempt is made to
* invoke the index rebuild task by a user that does not have the required
@@ -221,6 +219,20 @@
public static final int MSGID_TASK_INDEXREBUILD_INSUFFICIENT_PRIVILEGES =
CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18;
+ /**
+ * The message ID for the message that will be used when an invalid domain
+ * base DN is provided as argument to the initialize target task.
+ */
+ public static final int MSGID_TASK_INITIALIZE_TARGET_INVALID_DN =
+ CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19;
+
+ /**
+ * The message ID for the message that will be used when an invalid domain
+ * base DN is provided as argument to the initialize task.
+ */
+ public static final int MSGID_TASK_INITIALIZE_INVALID_DN =
+ CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 20;
+
/**
@@ -292,6 +304,12 @@
registerMessage(MSGID_TASK_INDEXREBUILD_INSUFFICIENT_PRIVILEGES,
"You do not have sufficient privileges to initiate an " +
"index rebuild.");
+
+ registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN,
+ "Invalid DN provided with the Initialize Target task.");
+
+ registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN,
+ "Invalid DN provided with the Initialize task.");
}
}
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 64bf887..531569c 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@
static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
- static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
+ static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index d7e0e2a..51f099e 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@
*/
package org.opends.server.synchronization.changelog;
+import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.loggers.Error.logError;
-
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import com.sleepycat.je.DatabaseException;
-import org.opends.server.types.DN;
-import org.opends.server.types.ErrorLogCategory;
-import org.opends.server.types.ErrorLogSeverity;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-
import java.io.IOException;
+import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
+import com.sleepycat.je.DatabaseException;
+
/**
* This class define an in-memory cache that will be used to store
* the messages that have been received from an LDAP server or
@@ -425,148 +430,263 @@
}
}
-
/**
- * Send back an ack to the server that sent the change.
+ * Retrieves the destination handlers for a routable message.
*
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
+ * @param msg The message to route.
+ * @param senderHandler The handler of the server that published this message.
+ * @return The list of destination handlers.
*/
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+ protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
+ ServerHandler senderHandler)
{
- short serverId = changeNumber.getServerId();
- sendAck(changeNumber, isLDAPserver, serverId);
- }
- /**
- *
- * Send back an ack to a server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
- * @param serverId The identifier of the server from which we
- * received the change..
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
- short serverId)
- {
- ServerHandler handler;
- if (isLDAPserver)
- handler = connectedServers.get(serverId);
- else
- handler = changelogServers.get(serverId);
+ List<ServerHandler> servers =
+ new ArrayList<ServerHandler>();
- // TODO : check for null handler and log error
- try
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "getDestinationServers"
+ + " msgDest:" + msg.getDestination() , 1);
+
+ if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
{
- handler.sendAck(changeNumber);
- } catch (IOException e)
- {
- /*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
- */
- int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
- String message = getMessage(msgID, this.toString())
- + stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- handler.shutdown();
+ // TODO Import from the "closest server" to be implemented
}
- }
-
- /**
- * Shutdown this ChangelogCache.
- */
- public void shutdown()
- {
- // Close session with other changelogs
- for (ServerHandler serverHandler : changelogServers.values())
+ else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
{
- serverHandler.shutdown();
- }
-
- // Close session with other LDAP servers
- for (ServerHandler serverHandler : connectedServers.values())
- {
- serverHandler.shutdown();
- }
-
- // Shutdown the dbHandlers
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
+ if (!senderHandler.isChangelogServer())
{
- dbHandler.shutdown();
+ // Send to all changelogServers
+ for (ServerHandler destinationHandler : changelogServers.values())
+ {
+ servers.add(destinationHandler);
+ }
}
- sourceDbHandlers.clear();
+
+ // Send to all connected LDAP servers
+ for (ServerHandler destinationHandler : connectedServers.values())
+ {
+ // Don't loop on the sender
+ if (destinationHandler == senderHandler)
+ continue;
+ servers.add(destinationHandler);
+ }
}
+ else
+ {
+ // Destination is one server
+ ServerHandler destinationHandler =
+ connectedServers.get(msg.getDestination());
+ if (destinationHandler != null)
+ {
+ servers.add(destinationHandler);
+ }
+ else
+ {
+ // the targeted server is NOT connected
+ if (senderHandler.isLDAPserver())
+ {
+ // let's forward to the other changelogs
+ servers.addAll(changelogServers.values());
+ }
+ }
+ }
+
+ return servers;
}
/**
- * Returns the ServerState describing the last change from this replica.
+ * Process an InitializeRequestMessage.
*
- * @return The ServerState describing the last change from this replica.
+ * @param msg The message received and to be processed.
+ * @param senderHandler The server handler of the server that emitted
+ * the message.
*/
- public ServerState getDbServerState()
+ public void process(RoutableMessage msg, ServerHandler senderHandler)
{
- ServerState serverState = new ServerState();
- for (DbHandler db : sourceDbHandlers.values())
- {
- serverState.update(db.getLastChange());
- }
- return serverState;
- }
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- return "ChangelogCache " + baseDn;
- }
+ List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
- /**
- * Check if some server Handler should be removed from flow control state.
- * @throws IOException If an error happened.
- */
- public void checkAllSaturation() throws IOException
- {
- for (ServerHandler handler : changelogServers.values())
+ if (servers.isEmpty())
{
- handler.checkWindow();
+ if (!(msg instanceof InitializeRequestMessage))
+ {
+ // TODO A more elaborated policy is probably needed
+ }
+ else
+ {
+ ErrorMessage errMsg = new ErrorMessage(
+ msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+ "serverID:" + msg.getDestination());
+
+ try
+ {
+ senderHandler.send(errMsg);
+ }
+ catch(IOException ioe)
+ {
+ // TODO Handle error properly (sender timeout in addition)
+ }
+ }
+ return;
}
- for (ServerHandler handler : connectedServers.values())
+ for (ServerHandler targetHandler : servers)
{
- handler.checkWindow();
+ try
+ {
+ targetHandler.send(msg);
+ }
+ catch(IOException ioe)
+ {
+ // TODO Handle error properly (sender timeout in addition)
+ }
}
+
}
- /**
- * Check if a server that was in flow control can now restart
- * sending updates.
- * @param sourceHandler The server that must be checked.
- * @return true if the server can restart sending changes.
- * false if the server can't restart sending changes.
- */
- public boolean restartAfterSaturation(ServerHandler sourceHandler)
- {
- for (ServerHandler handler : changelogServers.values())
+ /**
+ * Send back an ack to the server that sent the change.
+ *
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
+ */
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
{
- if (!handler.restartAfterSaturation(sourceHandler))
+ short serverId = changeNumber.getServerId();
+ sendAck(changeNumber, isLDAPserver, serverId);
+ }
+
+ /**
+ *
+ * Send back an ack to a server that sent the change.
+ *
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
+ * @param serverId The identifier of the server from which we
+ * received the change..
+ */
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+ short serverId)
+ {
+ ServerHandler handler;
+ if (isLDAPserver)
+ handler = connectedServers.get(serverId);
+ else
+ handler = changelogServers.get(serverId);
+
+ // TODO : check for null handler and log error
+ try
+ {
+ handler.sendAck(changeNumber);
+ } catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ handler.shutdown();
+ }
+ }
+
+ /**
+ * Shutdown this ChangelogCache.
+ */
+ public void shutdown()
+ {
+ // Close session with other changelogs
+ for (ServerHandler serverHandler : changelogServers.values())
+ {
+ serverHandler.shutdown();
+ }
+
+ // Close session with other LDAP servers
+ for (ServerHandler serverHandler : connectedServers.values())
+ {
+ serverHandler.shutdown();
+ }
+
+ // Shutdown the dbHandlers
+ synchronized (sourceDbHandlers)
+ {
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ dbHandler.shutdown();
+ }
+ sourceDbHandlers.clear();
+ }
+ }
+
+ /**
+ * Returns the ServerState describing the last change from this replica.
+ *
+ * @return The ServerState describing the last change from this replica.
+ */
+ public ServerState getDbServerState()
+ {
+ ServerState serverState = new ServerState();
+ for (DbHandler db : sourceDbHandlers.values())
+ {
+ serverState.update(db.getLastChange());
+ }
+ return serverState;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ return "ChangelogCache " + baseDn;
+ }
+
+ /**
+ * Check if some server Handler should be removed from flow control state.
+ * @throws IOException If an error happened.
+ */
+ public void checkAllSaturation() throws IOException
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ handler.checkWindow();
+ }
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.checkWindow();
+ }
+ }
+
+ /**
+ * Check if a server that was in flow control can now restart
+ * sending updates.
+ * @param sourceHandler The server that must be checked.
+ * @return true if the server can restart sending changes.
+ * false if the server can't restart sending changes.
+ */
+ public boolean restartAfterSaturation(ServerHandler sourceHandler)
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
return false;
- }
+ }
- for (ServerHandler handler : connectedServers.values())
- {
- if (!handler.restartAfterSaturation(sourceHandler))
+ for (ServerHandler handler : connectedServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
return false;
+ }
+ return true;
}
- return true;
- }
}
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 3cee58d..a4e1ae8 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -44,6 +46,18 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ChangelogStartMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.ServerStartMessage;
+import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -51,17 +65,6 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.ProtocolSession;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
-import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +111,7 @@
// flow controled and should
// be stopped from sending messsages.
private int saturationCount = 0;
+ private short changelogId;
/**
* The time in milliseconds between heartbeats from the synchronization
@@ -155,6 +159,7 @@
public void start(DN baseDn, short changelogId, String changelogURL,
int windowSize, Changelog changelog)
{
+ this.changelogId = changelogId;
rcvWindowSizeHalf = windowSize/2;
maxRcvWindow = windowSize;
rcvWindow = windowSize;
@@ -1263,4 +1268,40 @@
{
return heartbeatInterval;
}
+
+ /**
+ * Processes a routable message.
+ *
+ * @param msg The message to be processed.
+ */
+ public void process(RoutableMessage msg)
+ {
+ if (debugEnabled())
+ debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
+
+ changelogCache.process(msg, this);
+ }
+
+ /**
+ * Send an InitializeRequestMessage to the server connected through this
+ * handler.
+ *
+ * @param msg The message to be processed
+ * @throws IOException when raised by the underlying session
+ */
+ public void send(RoutableMessage msg) throws IOException
+ {
+ if (debugEnabled())
+ debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
+
+ session.publish(msg);
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java b/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
index 6f2e451..2dc7a75 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,6 +34,11 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.DoneMessage;
+import org.opends.server.synchronization.protocol.EntryMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
@@ -116,6 +121,33 @@
WindowMessage windowMsg = (WindowMessage) msg;
handler.updateWindow(windowMsg);
}
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ InitializeRequestMessage initializeMsg =
+ (InitializeRequestMessage) msg;
+ handler.process(initializeMsg);
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
+ handler.process(initializeMsg);
+ }
+ else if (msg instanceof EntryMessage)
+ {
+ EntryMessage entryMsg = (EntryMessage) msg;
+ handler.process(entryMsg);
+ }
+ else if (msg instanceof DoneMessage)
+ {
+ DoneMessage doneMsg = (DoneMessage) msg;
+ handler.process(doneMsg);
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ ErrorMessage errorMsg = (ErrorMessage) msg;
+ handler.process(errorMsg);
+ }
+
}
} catch (IOException e)
{
diff --git a/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index bab9527..d2fc088 100644
--- a/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,13 +317,61 @@
CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
/**
- * The message id for thedescription of the attribute used to configure
+ * The message id for the description of the attribute used to configure
* the purge delay of the Changelog Servers.
*/
public static final int MSGID_PURGE_DELAY_ATTR =
CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
+ /**
+ * The message id for the error raised when export/import
+ * is rejected due to an export/import already in progress.
+ */
+ public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44;
+ /**
+ * The message id for the error raised when import
+ * is rejected due to an invalid source of data imported.
+ */
+ public static final int MSGID_INVALID_IMPORT_SOURCE =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45;
+
+ /**
+ * The message id for the error raised when export
+ * is rejected due to an invalid target to export datas.
+ */
+ public static final int MSGID_INVALID_EXPORT_TARGET =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46;
+
+ /**
+ * The message id for the error raised when import/export message
+ * cannot be routed to an up-and-running target in the domain.
+ */
+ public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47;
+
+ /**
+ * The message ID for the message that will be used when no domain
+ * can be found matching the provided domain base DN.
+ */
+ public static final int MSGID_NO_MATCHING_DOMAIN =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48;
+
+ /**
+ * The message ID for the message that will be used when no domain
+ * can be found matching the provided domain base DN.
+ */
+ public static final int MSGID_MULTIPLE_MATCHING_DOMAIN
+ = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49;
+
+
+ /**
+ * The message ID for the message that will be used when the domain
+ * belongs to a provider class that does not allow the export.
+ */
+ public static final int MSGID_INVALID_PROVIDER =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50;
/**
* Register the messages from this class in the core server.
@@ -449,5 +497,20 @@
" restored because changelog servers would not be able to refresh" +
" LDAP servers with older versions of the data. A zero value" +
" can be used to specify an infinite delay (or never purge).");
+ MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
+ "The current request is rejected due to an import or an export" +
+ " already in progress for the same data.");
+ MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE,
+ "Invalid source for the import.");
+ MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET,
+ "Invalid target for the export.");
+ MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+ "No reachable peer in the domain.");
+ MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN,
+ "No domain matches the base DN provided.");
+ MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN,
+ "Multiple domains match the base DN provided.");
+ MessageHandler.registerMessage(MSGID_INVALID_PROVIDER,
+ "The provider class does not allow the operation requested.");
}
}
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index 8fb1917..a985e6b 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,6 +27,8 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -345,6 +347,10 @@
{
if (session != null)
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Broker : connect closing session" , 1);
+
session.close();
session = null;
}
@@ -498,6 +504,7 @@
try
{
SynchronizationMessage msg = session.receive();
+
if (msg instanceof WindowMessage)
{
WindowMessage windowMsg = (WindowMessage) msg;
@@ -544,6 +551,11 @@
connected = false;
try
{
+ if (debugEnabled())
+ {
+ debugInfo("ChangelogBroker Stop Closing session");
+ }
+
session.close();
} catch (IOException e)
{}
@@ -682,4 +694,12 @@
{
return numLostConnections;
}
+
+ private void log(String message)
+ {
+ int msgID = MSGID_UNKNOWN_TYPE;
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
}
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java b/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
index b0bc862..670db40 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,12 +27,14 @@
package org.opends.server.synchronization.plugin;
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.synchronization.protocol.ProtocolSession;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+
import java.io.IOException;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+
/**
* This class implements a thread to monitor heartbeat messages from the
* synchronization server. Each broker runs one of these threads.
@@ -103,6 +105,9 @@
long lastReceiveTime = session.getLastReceiveTime();
if (now > lastReceiveTime + 2 * heartbeatInterval)
{
+ debugInfo("Heartbeat monitor is closing the broker session " +
+ "because it could not detect a heartbeat.");
+
// Heartbeat is well overdue so the server is assumed to be dead.
if (debugEnabled())
{
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java b/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
index e71fd3b..edbb868 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@
* Can be null is the request has no associated operation.
* @return The Synchronization domain for this DN.
*/
- private static SynchronizationDomain findDomain(DN dn, Operation op)
+ public static SynchronizationDomain findDomain(DN dn, Operation op)
{
/*
* Don't run the special synchronization code on Operation that are
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
index c0d02fd..1753d0e 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,7 +233,8 @@
{
int msgID = MSGID_ERROR_UPDATING_RUV;
String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
- op.toString(), op.getErrorMessage(), baseDn.toString());
+ op.toString(), op.getErrorMessage(), baseDn.toString(),
+ Thread.currentThread().getStackTrace());
logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
}
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java b/opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java
new file mode 100644
index 0000000..973437c
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java
@@ -0,0 +1,122 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.plugin;
+
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * This class creates an input stream that can be used to read entries generated
+ * by SynchroLDIF as if they were being read from another source like a file.
+ */
+public class SynchroLDIFInputStream
+extends InputStream
+{
+ // Indicates whether this input stream has been closed.
+ private boolean closed;
+
+ // The synchronization domain associated to this import.
+ SynchronizationDomain domain;
+
+ /**
+ * Creates a new SynchroLDIFInputStream that will import entries
+ * for a synchronzation domain.
+ *
+ * @param domain The synchronization domain
+ */
+ public SynchroLDIFInputStream(SynchronizationDomain domain)
+ {
+ this.domain = domain;
+ closed = false;
+ }
+
+ /**
+ * Closes this input stream so that no more data may be read from it.
+ */
+ public void close()
+ {
+ closed = true;
+ }
+
+ /**
+ * Reads data from this input stream.
+ *
+ * @param b The array into which the data should be read.
+ * @param off The position in the array at which point the data read may be
+ * placed.
+ * @param len The maximum number of bytes that may be read into the
+ * provided array.
+ *
+ * @return The number of bytes read from the input stream into the provided
+ * array, or -1 if the end of the stream has been reached.
+ *
+ * @throws IOException If a problem has occurred while generating data for
+ * use by this input stream.
+ */
+ public int read(byte[] b, int off, int len)
+ throws IOException
+ {
+ if (closed)
+ return -1;
+
+ byte[] bytes = domain.receiveEntryBytes();
+
+ if (bytes==null)
+ {
+ closed = true;
+ return -1;
+ }
+
+ int l = bytes.length;
+ for (int i =0; i<l; i++)
+ {
+ b[off+i] = bytes[i];
+ }
+ return l;
+ }
+
+ /**
+ * Reads a single byte of data from this input stream.
+ *
+ * @return The byte read from the input stream, or -1 if the end of the
+ * stream has been reached.
+ *
+ * @throws IOException If a problem has occurred while generating data for
+ * use by this input stream.
+ */
+ public int read()
+ throws IOException
+ {
+ // This method is not supposed to be called to make an LDIF import
+ // for synchronization.
+ throw new IOException("Not implemented");
+ }
+}
+
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java b/opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java
new file mode 100644
index 0000000..75c40a0
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java
@@ -0,0 +1,97 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.plugin;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+/**
+ * This class creates an output stream that can be used to export entries
+ * to a synchonization domain.
+ */
+public class SynchroLDIFOutputStream
+ extends OutputStream
+{
+ SynchronizationDomain domain;
+ String entryBuffer = "";
+
+ /**
+ * Creates a new SynchroLDIFOutputStream related to a synchronization
+ * domain.
+ *
+ * @param domain The synchronization domain
+ */
+ public SynchroLDIFOutputStream(SynchronizationDomain domain)
+ {
+ this.domain = domain;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(int i) throws IOException
+ {
+ throw new IOException("Invalid call");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(byte b[], int off, int len) throws IOException
+ {
+ int endOfEntryIndex;
+ int startOfEntryIndex = off;
+ int bytesToRead = len;
+
+ while (true)
+ {
+ // if we have the bytes for an entry, let's make an entry and send it
+ String ebytes = new String(b,startOfEntryIndex,bytesToRead);
+ endOfEntryIndex = ebytes.indexOf("\n\n");
+ if ( endOfEntryIndex >= 0 )
+ {
+ endOfEntryIndex += 2;
+ entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
+
+ // Send the entry
+ domain.sendEntryLines(entryBuffer);
+
+ startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
+ entryBuffer = "";
+ bytesToRead -= endOfEntryIndex;
+ if (bytesToRead==0)
+ break;
+ }
+ else
+ {
+ entryBuffer = new String(b, startOfEntryIndex, len);
+ break;
+ }
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 7b2e078..728fecf 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,43 +26,54 @@
*/
package org.opends.server.synchronization.plugin;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.util.ServerConstants.
- TIME_UNIT_MILLISECONDS_ABBR;
-import static org.opends.server.util.ServerConstants.
- TIME_UNIT_MILLISECONDS_FULL;
-import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
-import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
+import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import static org.opends.server.messages.ConfigMessages.*;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.synchronization.plugin.Historical.*;
+import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
-import static org.opends.server.loggers.Error.*;
-import static org.opends.server.messages.MessageHandler.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.createEntry;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
+import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
+import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.backends.jeb.BackendImpl;
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
+import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
@@ -73,23 +84,35 @@
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
+import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
+import org.opends.server.synchronization.protocol.DoneMessage;
+import org.opends.server.synchronization.protocol.EntryMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
+import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.tasks.InitializeTargetTask;
+import org.opends.server.tasks.InitializeTask;
+import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -137,8 +160,96 @@
* server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
+ short serverId;
- private short serverId;
+ /**
+ * This class contain the context related to an import or export
+ * launched on the domain.
+ */
+ private class IEContext
+ {
+ // The task that initiated the operation.
+ Task initializeTask;
+ // The input stream for the import
+ SynchroLDIFInputStream ldifImportInputStream = null;
+ // The target in the case of an export
+ short exportTarget = RoutableMessage.UNKNOWN_SERVER;
+ // The source in the case of an import
+ short importSource = RoutableMessage.UNKNOWN_SERVER;
+
+ // The total entry count expected to be processed
+ long entryCount = 0;
+ // The count for the entry left to be processed
+ long entryLeftCount = 0;
+
+ // The exception raised when any
+ DirectoryException exception = null;
+
+ /**
+ * Initializes the counters of the task with the provider value.
+ * @param count The value with which to initialize the counters.
+ */
+ public void initTaskCounters(long count)
+ {
+ entryCount = count;
+ entryLeftCount = count;
+
+ if (initializeTask != null)
+ {
+ if (initializeTask instanceof InitializeTask)
+ {
+ ((InitializeTask)initializeTask).setTotal(entryCount);
+ ((InitializeTask)initializeTask).setLeft(entryCount);
+ }
+ else if (initializeTask instanceof InitializeTargetTask)
+ {
+ ((InitializeTargetTask)initializeTask).setTotal(entryCount);
+ ((InitializeTargetTask)initializeTask).setLeft(entryCount);
+ }
+ }
+ }
+
+ /**
+ * Update the counters of the task for each entry processed during
+ * an import or export.
+ */
+ public void updateTaskCounters()
+ {
+ entryLeftCount--;
+
+ if (initializeTask != null)
+ {
+ if (initializeTask instanceof InitializeTask)
+ {
+ ((InitializeTask)initializeTask).setLeft(entryLeftCount);
+ }
+ else if (initializeTask instanceof InitializeTargetTask)
+ {
+ ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
+ }
+ }
+ }
+
+ /**
+ * Update the state of the task.
+ */
+ protected TaskState updateTaskCompletionState()
+ {
+ if (exception == null)
+ return TaskState.COMPLETED_SUCCESSFULLY;
+ else
+ return TaskState.STOPPED_BY_ERROR;
+ }
+ }
+
+ // The context related to an import or export being processed
+ // Null when none is being processed.
+ private IEContext ieContext = null;
+
+ // The backend informations necessary to make an import or export.
+ private Backend backend;
+ private ConfigEntry backendConfigEntry;
+ private List<DN> branches = new ArrayList<DN>(0);
private int listenerThreadNumber = 10;
private boolean receiveStatus = true;
@@ -160,6 +271,7 @@
private boolean solveConflictFlag = true;
private boolean disabled = false;
+ private boolean stateSavingDisabled = false;
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -205,8 +317,6 @@
timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
}
-
-
/**
* Creates a new SynchronizationDomain using configuration from configEntry.
*
@@ -217,6 +327,7 @@
public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
{
super("Synchronization flush");
+
/*
* read the centralized changelog server configuration
* this is a multivalued attribute
@@ -397,6 +508,10 @@
if (!receiveStatus)
broker.suspendReceive();
}
+
+ // Retrieves the related backend and its config entry
+ retrievesBackendInfos(baseDN);
+
} catch (Exception e)
{
/* TODO should mark that changelog service is
@@ -803,9 +918,9 @@
}
/**
- * Receive an update message from the changelog.
+ * Receives an update message from the changelog.
* also responsible for updating the list of pending changes
- * @return the received message
+ * @return the received message - null if none
*/
public UpdateMessage receive()
{
@@ -823,7 +938,7 @@
// The server is in the shutdown process
return null;
}
-
+ log("Broker received message :" + msg);
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -834,6 +949,56 @@
update = (UpdateMessage) msg;
receiveUpdate(update);
}
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ // Another server requests us to provide entries
+ // for a total update
+ InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
+ try
+ {
+ initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ null);
+ }
+ catch(DirectoryException de)
+ {
+ // Returns an error message to notify the sender
+ int msgID = de.getErrorMessageID();
+ ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+
+ try
+ {
+ importBackend(initMsg);
+ }
+ catch(DirectoryException de)
+ {
+ // Return an error message to notify the sender
+ int msgID = de.getErrorMessageID();
+ ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ if (ieContext != null)
+ {
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // changelog did not find any import source.
+ abandonImportExport((ErrorMessage)msg);
+ }
+ }
} catch (SocketTimeoutException e)
{
// just retry
@@ -876,7 +1041,9 @@
public void receiveAck(AckMessage ack)
{
UpdateMessage update;
- ChangeNumber changeNumber = ack.getChangeNumber();
+ ChangeNumber changeNumber;
+
+ changeNumber = ack.getChangeNumber();
synchronized (pendingChanges)
{
@@ -1105,7 +1272,7 @@
synchronized (this)
{
this.wait(1000);
- if (!disabled )
+ if (!disabled && !stateSavingDisabled )
{
// save the RUV
state.save();
@@ -1151,6 +1318,8 @@
this.notify();
}
+ DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
+
// stop the ChangelogBroker
broker.stop();
}
@@ -1857,6 +2026,7 @@
return broker.getNumLostConnections();
}
+
/**
* Check if the domain solve conflicts.
*
@@ -1933,6 +2103,988 @@
// Nothing is needed at the moment
}
+ /*
+ * Total Update >>
+ */
+
+ /**
+ * Receives bytes related to an entry in the context of an import to
+ * initialize the domain (called by SynchronizationDomainLDIFInputStream).
+ *
+ * @return The bytes. Null when the Done or Err message has been received
+ */
+ public byte[] receiveEntryBytes()
+ {
+ SynchronizationMessage msg;
+ while (true)
+ {
+ try
+ {
+ msg = broker.receive();
+
+ if (msg == null)
+ {
+ // The server is in the shutdown process
+ return null;
+ }
+ log("receiveEntryBytes: received " + msg);
+ if (msg instanceof EntryMessage)
+ {
+ // FIXME
+ EntryMessage entryMsg = (EntryMessage)msg;
+ byte[] entryBytes = entryMsg.getEntryBytes().clone();
+ ieContext.updateTaskCounters();
+ return entryBytes;
+ }
+ else if (msg instanceof DoneMessage)
+ {
+ // This is the normal termination of the import
+ // No error is stored and the import is ended
+ // by returning null
+ return null;
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ // This is an error termination during the import
+ // The error is stored and the import is ended
+ // by returning null
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ ieContext.exception = new DirectoryException(ResultCode.OTHER,
+ errorMsg.getDetails() , errorMsg.getMsgID());
+ return null;
+ }
+ else
+ {
+ // Other messages received during an import are trashed
+ }
+ }
+ catch(Exception e)
+ {
+ ieContext.exception = new DirectoryException(ResultCode.OTHER,
+ "received an unexpected message type" , 1, e);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Processes an error message received while an import/export is
+ * on going.
+ * @param errorMsg The error message received.
+ */
+ protected void abandonImportExport(ErrorMessage errorMsg)
+ {
+ // FIXME TBD Treat the case where the error happens while entries
+ // are being exported
+
+ if (ieContext != null)
+ {
+ ieContext.exception = new DirectoryException(ResultCode.OTHER,
+ errorMsg.getDetails() , errorMsg.getMsgID());
+
+ if (ieContext.initializeTask instanceof InitializeTask)
+ {
+ // Update the task that initiated the import
+ ((InitializeTask)ieContext.initializeTask).
+ setState(ieContext.updateTaskCompletionState(),ieContext.exception);
+
+ ieContext = null;
+ }
+ }
+ }
+
+ /**
+ * Clears all the entries from the JE backend determined by the
+ * be id passed into the method.
+ *
+ * @param createBaseEntry Indicate whether to automatically create the base
+ * entry and add it to the backend.
+ * @param beID The be id to clear.
+ * @param dn The suffix of the backend to create if the the createBaseEntry
+ * boolean is true.
+ * @throws Exception If an unexpected problem occurs.
+ */
+ public static void clearJEBackend(boolean createBaseEntry, String beID,
+ String dn) throws Exception
+ {
+ BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
+ DN[] baseDNs = backend.getBaseDNs();
+
+ // FIXME Should getConfigEntry be part of TaskUtils ?
+ ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
+
+ // FIXME Should setBackendEnabled be part of TaskUtils ?
+ TaskUtils.setBackendEnabled(configEntry, false);
+
+ try
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+
+ if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
+ {
+ throw new RuntimeException(failureReason.toString());
+ }
+
+ try
+ {
+ backend.clearBackend(configEntry, baseDNs);
+ }
+ finally
+ {
+ LockFileManager.releaseLock(lockFile, failureReason);
+ }
+ }
+ finally
+ {
+ TaskUtils.setBackendEnabled(configEntry, true);
+ }
+
+ if (createBaseEntry)
+ {
+ DN baseDN = DN.decode(dn);
+ Entry e = createEntry(baseDN);
+ backend = (BackendImpl)DirectoryServer.getBackend(beID);
+ backend.addEntry(e, null);
+ }
+ }
+
+ /**
+ * Log debug message.
+ * @param message The message to log.
+ */
+ private void log(String message)
+ {
+ if (debugEnabled())
+ {
+ debugInfo("DebugInfo" + message);
+ int msgID = MSGID_UNKNOWN_TYPE;
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "SynchronizationDomain/ " + message, msgID);
+ }
+ }
+
+ /**
+ * Export the entries.
+ * @throws DirectoryException when an error occured
+ */
+ protected void exportBackend() throws DirectoryException
+ {
+ // FIXME Temporary workaround - will probably be fixed when implementing
+ // dynamic config
+ retrievesBackendInfos(this.baseDN);
+
+ // Acquire a shared lock for the backend.
+ try
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+ if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message + " " + stackTraceToSingleLineString(e), msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
+
+ LDIFExportConfig exportConfig = new LDIFExportConfig(os);
+
+ // Launch the export.
+ try
+ {
+ DN[] baseDNs = {this.baseDN};
+ backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
+ }
+ catch (DirectoryException de)
+ {
+ int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
+ String message = getMessage(msgID, de.getErrorMessage());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
+ msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
+ msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ finally
+ {
+ // Clean up after the export by closing the export config.
+ exportConfig.close();
+
+ // Release the shared lock on the backend.
+ try
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+ if (! LockFileManager.releaseLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ stackTraceToSingleLineString(e));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+ }
+
+ /**
+ * Retrieves the backend object related to the domain and the backend's
+ * config entry. They will be used for import and export.
+ * TODO This should be in a shared package rather than here.
+ *
+ * @param baseDN The baseDN to retrieve the backend
+ * @throws DirectoryException when an error occired
+ */
+ protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
+ {
+ ArrayList<Backend> backendList = new ArrayList<Backend>();
+ ArrayList<ConfigEntry> entryList = new ArrayList<ConfigEntry>();
+ ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
+
+ Backend backend = null;
+ ConfigEntry backendConfigEntry = null;
+ List<DN> branches = new ArrayList<DN>(0);
+
+ // Retrieves the backend related to this domain
+ Backend domainBackend = DirectoryServer.getBackend(baseDN);
+ if (domainBackend == null)
+ {
+ int msgID = MSGID_CANNOT_DECODE_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ // Retrieves its config entry and its DNs
+ int code = getBackends(backendList, entryList, dnList);
+ if (code != 0)
+ {
+ int msgID = MSGID_CANNOT_DECODE_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ int numBackends = backendList.size();
+ for (int i=0; i < numBackends; i++)
+ {
+ Backend b = backendList.get(i);
+
+ if (domainBackend.getBackendID() != b.getBackendID())
+ {
+ continue;
+ }
+
+ if (backend == null)
+ {
+ backend = domainBackend;
+ backendConfigEntry = entryList.get(i).duplicate();
+ branches = dnList.get(i);
+ }
+ else
+ {
+ int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
+ String message = getMessage(msgID, domainBackend.getBackendID());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+
+ if (backend == null)
+ {
+ int msgID = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
+ String message = getMessage(msgID, domainBackend.getBackendID());
+ logError(ErrorLogCategory.BACKEND,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ else if (! backend.supportsLDIFExport())
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_IMPORT;
+ String message = getMessage(msgID, 0); // FIXME
+ logError(ErrorLogCategory.BACKEND,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+
+ this.backend = backend;
+ this.backendConfigEntry = backendConfigEntry;
+ this.branches = branches;
+ }
+
+
+ /**
+ * Sends lDIFEntry entry lines to the export target currently set.
+ *
+ * @param lDIFEntry The lines for the LDIF entry.
+ * @throws IOException when an error occured.
+ */
+ public void sendEntryLines(String lDIFEntry) throws IOException
+ {
+ // If an error was raised - like receiving an ErrorMessage
+ // we just let down the export.
+ if (ieContext.exception != null)
+ {
+ IOException ioe = new IOException(ieContext.exception.getMessage());
+ ieContext = null;
+ throw ioe;
+ }
+
+ // new entry then send the current one
+ EntryMessage entryMessage = new EntryMessage(
+ serverId, ieContext.exportTarget, lDIFEntry.getBytes());
+ broker.publish(entryMessage);
+
+ ieContext.updateTaskCounters();
+ }
+
+ /**
+ * Retrieves information about the backends defined in the Directory Server
+ * configuration.
+ *
+ * @param backendList A list into which instantiated (but not initialized)
+ * backend instances will be placed.
+ * @param entryList A list into which the config entries associated with
+ * the backends will be placed.
+ * @param dnList A list into which the set of base DNs for each backend
+ * will be placed.
+ */
+ private static int getBackends(ArrayList<Backend> backendList,
+ ArrayList<ConfigEntry> entryList,
+ ArrayList<List<DN>> dnList)
+ throws DirectoryException
+ {
+ // Get the base entry for all backend configuration.
+ DN backendBaseDN = null;
+ try
+ {
+ backendBaseDN = DN.decode(DN_BACKEND_BASE);
+ }
+ catch (DirectoryException de)
+ {
+ int msgID = MSGID_CANNOT_DECODE_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_DECODE_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE,
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ ConfigEntry baseEntry = null;
+ try
+ {
+ baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
+ }
+ catch (ConfigException ce)
+ {
+ int msgID = MSGID_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
+ String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
+ String message = getMessage(msgID, DN_BACKEND_BASE,
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+
+ // Iterate through the immediate children, attempting to parse them as
+ // backends.
+ for (ConfigEntry configEntry : baseEntry.getChildren().values())
+ {
+ // Get the backend ID attribute from the entry. If there isn't one, then
+ // skip the entry.
+ String backendID = null;
+ try
+ {
+ int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
+ StringConfigAttribute idStub =
+ new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
+ true, false, true);
+ StringConfigAttribute idAttr =
+ (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
+ if (idAttr == null)
+ {
+ continue;
+ }
+ else
+ {
+ backendID = idAttr.activeValue();
+ }
+ }
+ catch (ConfigException ce)
+ {
+ int msgID = MSGID_CANNOT_DETERMINE_BACKEND_ID;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ ce.getMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_DETERMINE_BACKEND_ID;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+
+ // Get the backend class name attribute from the entry. If there isn't
+ // one, then just skip the entry.
+ String backendClassName = null;
+ try
+ {
+ int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
+ StringConfigAttribute classStub =
+ new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
+ true, false, false);
+ StringConfigAttribute classAttr =
+ (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
+ if (classAttr == null)
+ {
+ continue;
+ }
+ else
+ {
+ backendClassName = classAttr.activeValue();
+ }
+ }
+ catch (ConfigException ce)
+ {
+ int msgID = MSGID_CANNOT_DETERMINE_BACKEND_CLASS;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ ce.getMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_DETERMINE_BACKEND_CLASS;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ Class backendClass = null;
+ try
+ {
+ backendClass = Class.forName(backendClassName);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_LOAD_BACKEND_CLASS;
+ String message = getMessage(msgID, backendClassName,
+ String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ Backend backend = null;
+ try
+ {
+ backend = (Backend) backendClass.newInstance();
+ backend.setBackendID(backendID);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_INSTANTIATE_BACKEND_CLASS;
+ String message = getMessage(msgID, backendClassName,
+ String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null); }
+
+
+ // Get the base DN attribute from the entry. If there isn't one, then
+ // just skip this entry.
+ List<DN> baseDNs = null;
+ try
+ {
+ int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
+ DNConfigAttribute baseDNStub =
+ new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
+ true, true, true);
+ DNConfigAttribute baseDNAttr =
+ (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
+ if (baseDNAttr == null)
+ {
+ msgID = MSGID_NO_BASES_FOR_BACKEND;
+ String message = getMessage(msgID,
+ String.valueOf(configEntry.getDN()));
+ throw new DirectoryException(
+ DirectoryServer.getServerErrorResultCode(), message,msgID, null);
+ }
+ else
+ {
+ baseDNs = baseDNAttr.activeValues();
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_CANNOT_DETERMINE_BASES_FOR_BACKEND;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null); }
+
+
+ backendList.add(backend);
+ entryList.add(configEntry);
+ dnList.add(baseDNs);
+ }
+ return 0;
+ }
+
+ /**
+ * Initializes this domain from another source server.
+ *
+ * @param source The source from which to initialize
+ * @param initTask The task that launched the initialization
+ * and should be updated of its progress.
+ * @throws DirectoryException when an error occurs
+ */
+ public void initialize(short source, Task initTask)
+ throws DirectoryException
+ {
+ acquireIEContext();
+ ieContext.initializeTask = initTask;
+
+ InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
+ baseDN, serverId, source);
+
+ // Publish Init request msg
+ broker.publish(initializeMsg);
+
+ // .. we expect to receive entries or err after that
+ }
+
+ /**
+ * Verifies that the given string represents a valid source
+ * from which this server can be initialized.
+ * @param sourceString The string representaing the source
+ * @return The source as a short value
+ * @throws DirectoryException if the string is not valid
+ */
+ public short decodeSource(String sourceString)
+ throws DirectoryException
+ {
+ short source = 0;
+ Throwable cause = null;
+ try
+ {
+ source = Integer.decode(sourceString).shortValue();
+ if (source >= -1)
+ {
+ // TODO Verifies serverID is in the domain
+ // We shold check here that this is a server implied
+ // in the current domain.
+
+ log("Source decoded for import:" + source);
+ return source;
+ }
+ }
+ catch(Exception e)
+ {
+ cause = e;
+ }
+
+ ResultCode resultCode = ResultCode.OTHER;
+ int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
+ String message = getMessage(errorMessageID);
+
+ if (cause != null)
+ throw new DirectoryException(
+ resultCode, message, errorMessageID, cause);
+ else
+ throw new DirectoryException(
+ resultCode, message, errorMessageID);
+ }
+
+ /**
+ * Verifies that the given string represents a valid source
+ * from which this server can be initialized.
+ * @param targetString The string representing the source
+ * @return The source as a short value
+ * @throws DirectoryException if the string is not valid
+ */
+ public short decodeTarget(String targetString)
+ throws DirectoryException
+ {
+ short target = 0;
+ Throwable cause;
+ if (targetString.equalsIgnoreCase("all"))
+ {
+ return RoutableMessage.ALL_SERVERS;
+ }
+
+ // So should be a serverID
+ try
+ {
+ target = Integer.decode(targetString).shortValue();
+ if (target >= 0)
+ {
+ // FIXME Could we check now that it is a know server in the domain ?
+ }
+ return target;
+ }
+ catch(Exception e)
+ {
+ cause = e;
+ }
+ ResultCode resultCode = ResultCode.OTHER;
+ int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
+ String message = getMessage(errorMessageID);
+
+ if (cause != null)
+ throw new DirectoryException(
+ resultCode, message, errorMessageID, cause);
+ else
+ throw new DirectoryException(
+ resultCode, message, errorMessageID);
+
+ }
+
+ private synchronized void acquireIEContext()
+ throws DirectoryException
+ {
+ if (ieContext != null)
+ {
+ // Rejects 2 simultaneous exports
+ int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
+ String message = getMessage(msgID);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+
+ ieContext = new IEContext();
+ }
+
+ private synchronized void releaseIEContext()
+ {
+ ieContext = null;
+ }
+
+ /**
+ * Process the initialization of some other server or servers in the topology
+ * specified by the target argument.
+ * @param target The target that should be initialized
+ * @param initTask The task that triggers this initialization and that should
+ * be updated with its progress.
+ * @exception DirectoryException When an error occurs.
+ */
+ public void initializeTarget(short target, Task initTask)
+ throws DirectoryException
+ {
+ initializeTarget(target, serverId, initTask);
+ }
+
+ /**
+ * Process the initialization of some other server or servers in the topology
+ * specified by the target argument when this initialization has been
+ * initiated by another server than this one.
+ * @param target The target that should be initialized.
+ * @param requestorID The server that initiated the export.
+ * @param initTask The task that triggers this initialization and that should
+ * be updated with its progress.
+ * @exception DirectoryException When an error occurs.
+ */
+ public void initializeTarget(short target, short requestorID, Task initTask)
+ throws DirectoryException
+ {
+ acquireIEContext();
+
+ ieContext.exportTarget = target;
+ ieContext.initializeTask = initTask;
+ ieContext.initTaskCounters(backend.getEntryCount());
+
+ // Send start message
+ InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
+ baseDN, serverId, ieContext.exportTarget, requestorID,
+ ieContext.entryLeftCount);
+
+ log("SD : publishes " + initializeMessage +
+ " for #entries=" + ieContext.entryCount);
+
+ broker.publish(initializeMessage);
+
+ // make an export and send entries
+ exportBackend();
+
+ // Successfull termnation
+ DoneMessage doneMsg = new DoneMessage(serverId,
+ initializeMessage.getDestination());
+ broker.publish(doneMsg);
+
+ if (ieContext != null)
+ {
+ ieContext.updateTaskCompletionState();
+ ieContext = null;
+ }
+ }
+
+ /**
+ * Process backend before import.
+ * @param backend The backend.
+ * @param backendConfigEntry The config entry of the backend.
+ * @throws Exception
+ */
+ private void preBackendImport(Backend backend,
+ ConfigEntry backendConfigEntry)
+ throws Exception
+ {
+ // Stop saving state
+ stateSavingDisabled = true;
+
+ // Clear the backend
+ clearJEBackend(false,backend.getBackendID(),null);
+
+ // FIXME setBackendEnabled should be part of TaskUtils ?
+ TaskUtils.setBackendEnabled(backendConfigEntry, false);
+
+ // Acquire an exclusive lock for the backend.
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+ if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ throw new DirectoryException(ResultCode.OTHER, message, msgID);
+ }
+ }
+
+ /**
+ * Initializes the domain's backend with received entries.
+ * @param initializeMessage The message that initiated the import.
+ * @exception DirectoryException Thrown when an error occurs.
+ */
+ protected void importBackend(InitializeTargetMessage initializeMessage)
+ throws DirectoryException
+ {
+ LDIFImportConfig importConfig = null;
+ try
+ {
+ log("startImport");
+
+ if (initializeMessage.getRequestorID() == serverId)
+ {
+ // The import responds to a request we did so the IEContext
+ // is already acquired
+ }
+ else
+ {
+ acquireIEContext();
+ }
+
+ ieContext.importSource = initializeMessage.getsenderID();
+ ieContext.entryLeftCount = initializeMessage.getEntryCount();
+ ieContext.initTaskCounters(initializeMessage.getEntryCount());
+
+ preBackendImport(this.backend, this.backendConfigEntry);
+
+ DN[] baseDNs = {baseDN};
+ ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
+ importConfig =
+ new LDIFImportConfig(ieContext.ldifImportInputStream);
+ importConfig.setIncludeBranches(this.branches);
+
+ // TODO How to deal with rejected entries during the import
+ // importConfig.writeRejectedEntries("rejectedImport",
+ // ExistingFileBehavior.OVERWRITE);
+
+ // Process import
+ this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
+
+ stateSavingDisabled = false;
+
+ // Re-exchange state with SS
+ broker.stop();
+ broker.start(changelogServers);
+
+ }
+ catch(Exception e)
+ {
+ throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
+ 2);// FIXME
+ }
+ finally
+ {
+ // Cleanup
+ importConfig.close();
+
+ // Re-enable backend
+ closeBackendImport(this.backend, this.backendConfigEntry);
+
+ // Update the task that initiated the import
+ if ((ieContext != null ) && (ieContext.initializeTask != null))
+ {
+ ((InitializeTask)ieContext.initializeTask).
+ setState(ieContext.updateTaskCompletionState(),ieContext.exception);
+ }
+
+ releaseIEContext();
+
+ log("End importBackend");
+ }
+ // Success
+ }
+
+ /**
+ * Make post import operations.
+ * @param backend The backend implied in the import.
+ * @param backendConfigEntry The config entry of the backend.
+ * @exception DirectoryException Thrown when an error occurs.
+ */
+ protected void closeBackendImport(Backend backend,
+ ConfigEntry backendConfigEntry)
+ throws DirectoryException
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+
+ // Release lock
+ if (!LockFileManager.releaseLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ new DirectoryException(ResultCode.OTHER, message, msgID);
+ }
+
+ // FIXME setBackendEnabled should be part taskUtils ?
+ TaskUtils.setBackendEnabled(backendConfigEntry, true);
+ }
+
+ /**
+ * Retrieves a synchronization domain based on the baseDN.
+ *
+ * @param baseDN The baseDN of the domain to retrieve
+ * @return The domain retrieved
+ * @throws DirectoryException When an error occured.
+ */
+ public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
+ throws DirectoryException
+ {
+ SynchronizationDomain synchronizationDomain = null;
+
+ // Retrieves the domain
+ DirectoryServer.getSynchronizationProviders();
+ for (SynchronizationProvider provider :
+ DirectoryServer.getSynchronizationProviders())
+ {
+ if (!( provider instanceof MultimasterSynchronization))
+ {
+ int msgID = LogMessages.MSGID_INVALID_PROVIDER;
+ String message = getMessage(msgID);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+
+ // From the domainDN retrieves the synchronization domain
+ SynchronizationDomain sdomain =
+ MultimasterSynchronization.findDomain(baseDN, null);
+ if (sdomain == null)
+ {
+ int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
+ String message = getMessage(msgID) + " " + baseDN;
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+
+ if (synchronizationDomain != null)
+ {
+ // Should never happen
+ int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
+ String message = getMessage(msgID);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+ synchronizationDomain = sdomain;
+ }
+ return synchronizationDomain;
+ }
+
+ /**
+ * Returns the backend associated to this domain.
+ * @return The associated backend.
+ */
+ public Backend getBackend()
+ {
+ return backend;
+ }
+
+ /**
+ * Returns a boolean indiciating if an import or export is currently
+ * processed.
+ * @return The status
+ */
+ public boolean ieRunning()
+ {
+ return (ieContext != null);
+ }
+ /*
+ * <<Total Update
+ */
+
+
/**
* Push the modifications contain the in given parameter has
* a modification that would happen on a local server.
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java
new file mode 100644
index 0000000..23bc42f
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java
@@ -0,0 +1,122 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the synchronization protocol.
+ * This message is sent by a server to one or several other servers after the
+ * last entry sent in the context of a total update and signals to the server
+ * that receives it that the export is now finished.
+ */
+public class DoneMessage extends RoutableMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 5216659571724730361L;
+
+ /**
+ * Creates a message.
+ *
+ * @param sender The sender server of this message.
+ * @param destination The server or servers targetted by this message.
+ */
+ public DoneMessage(short sender, short destination)
+ {
+ super(sender, destination);
+ }
+
+ /**
+ * Creates a new message by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the message,
+ * @throws DataFormatException If the in does not contain a properly,
+ * encoded message.
+ */
+ public DoneMessage(byte[] in) throws DataFormatException
+ {
+ super();
+ try
+ {
+ // First byte is the type
+ if (in[0] != MSG_TYPE_DONE)
+ throw new DataFormatException("input is not a valid DoneMessage");
+ int pos = 1;
+
+ // sender
+ int length = getNextLength(in, pos);
+ String senderString = new String(in, pos, length, "UTF-8");
+ this.senderID = Short.valueOf(senderString);
+ pos += length +1;
+
+ // destination
+ length = getNextLength(in, pos);
+ String destinationString = new String(in, pos, length, "UTF-8");
+ this.destination = Short.valueOf(destinationString);
+ pos += length +1;
+
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try
+ {
+ byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+ int length = 1 + senderBytes.length + 1
+ + destinationBytes.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_DONE;
+ int pos = 1;
+
+ /* put the sender */
+ pos = addByteArray(senderBytes, resultByteArray, pos);
+
+ /* put the destination */
+ pos = addByteArray(destinationBytes, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java
new file mode 100644
index 0000000..cea9198
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java
@@ -0,0 +1,142 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the synchronization protocol.
+ * This message is sent by a server to one or several other servers and
+ * contain one entry to be sent over the protocol in the context of
+ * an import/export over the protocol.
+ */
+public class EntryMessage extends RoutableMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 6116955858351992926L;
+
+ // The byte array containing the bytes of the entry transported
+ private byte[] entryByteArray;
+
+ /**
+ * Creates a new EntryMessage.
+ *
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ * @param entryBytes The bytes of the entry.
+ */
+ public EntryMessage(short sender, short destination, byte[] entryBytes)
+ {
+ super(sender, destination);
+ this.entryByteArray = entryBytes.clone();
+ }
+
+ /**
+ * Creates a new EntryMessage from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the message.
+ * @throws DataFormatException If the byte array does not contain a valid
+ * encoded form of the ServerStartMessage.
+ */
+ public EntryMessage(byte[] in) throws DataFormatException
+ {
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_ENTRY)
+ throw new DataFormatException("input is not a valid ServerStart msg");
+ int pos = 1;
+
+ // sender
+ int length = getNextLength(in, pos);
+ String senderIDString = new String(in, pos, length, "UTF-8");
+ this.senderID = Short.valueOf(senderIDString);
+ pos += length +1;
+
+ // destination
+ length = getNextLength(in, pos);
+ String destinationString = new String(in, pos, length, "UTF-8");
+ this.destination = Short.valueOf(destinationString);
+ pos += length +1;
+
+ // entry
+ length = getNextLength(in, pos);
+ this.entryByteArray = new byte[length];
+ for (int i=0; i<length; i++)
+ {
+ entryByteArray[i] = in[pos+i];
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Returns the entry bytes.
+ * @return The entry bytes.
+ */
+ public byte[] getEntryBytes()
+ {
+ return entryByteArray;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try {
+ byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+ byte[] entryBytes = entryByteArray;
+
+ int length = 1 + senderBytes.length +
+ 1 + destinationBytes.length +
+ 1 + entryBytes.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_ENTRY;
+ int pos = 1;
+
+ pos = addByteArray(senderBytes, resultByteArray, pos);
+ pos = addByteArray(destinationBytes, resultByteArray, pos);
+ pos = addByteArray(entryBytes, resultByteArray, pos);
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java
new file mode 100644
index 0000000..597041e
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java
@@ -0,0 +1,188 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the synchronization protocol.
+ * This message is sent by a server or a changelog server when an error
+ * is detected in the context of a total update.
+ */
+public class ErrorMessage extends RoutableMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 2726389860247088266L;
+
+ // Specifies the messageID built form the error that was detected
+ private int msgID;
+ // Specifies the complementary details about the error that was detected
+ private String details = null;
+
+ /**
+ * Create a InitializeMessage.
+ * @param sender The server ID of the server that send this message.
+ * @param destination The destination server or servers of this message.
+ * @param msgID The error message ID.
+ * @param details The details of the error.
+ */
+ public ErrorMessage(short sender, short destination, int msgID,
+ String details)
+ {
+ super(sender, destination);
+ this.msgID = msgID;
+ this.details = details;
+ }
+
+ /**
+ * Create a InitializeMessage.
+ *
+ * @param destination changelog server id
+ * @param msgID error message ID
+ * @param details details of the error
+ */
+ public ErrorMessage(short destination, int msgID, String details)
+ {
+ super((short)-2, destination);
+ this.msgID = msgID;
+ this.details = details;
+ }
+
+ /**
+ * Creates a new InitializeMessage by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the Message
+ * @throws DataFormatException If the in does not contain a properly
+ * encoded InitializeMessage.
+ */
+ public ErrorMessage(byte[] in) throws DataFormatException
+ {
+ super();
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_ERROR)
+ throw new DataFormatException("input is not a valid InitializeMessage");
+ int pos = 1;
+
+ // sender
+ int length = getNextLength(in, pos);
+ String senderString = new String(in, pos, length, "UTF-8");
+ senderID = Short.valueOf(senderString);
+ pos += length +1;
+
+ // destination
+ length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ destination = Short.valueOf(serverIdString);
+ pos += length +1;
+
+ // MsgID
+ length = getNextLength(in, pos);
+ String msgIdString = new String(in, pos, length, "UTF-8");
+ msgID = Integer.valueOf(msgIdString);
+ pos += length +1;
+
+ // Details
+ length = getNextLength(in, pos);
+ details = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Get the base DN from this InitializeMessage.
+ *
+ * @return the base DN from this InitializeMessage.
+ */
+ public String getDetails()
+ {
+ return details;
+ }
+
+ /**
+ * Get the base DN from this InitializeMessage.
+ *
+ * @return the base DN from this InitializeMessage.
+ */
+ public int getMsgID()
+ {
+ return msgID;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ /* The InitializeMessage is stored in the form :
+ * <operation type><basedn><serverid>
+ */
+ try {
+ byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
+ byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
+ byte[] byteDetails = details.getBytes("UTF-8");
+
+ int length = 1 + byteSender.length + 1
+ + byteDestination.length + 1
+ + byteErrMsgId.length + 1
+ + byteDetails.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ // put the type of the operation
+ resultByteArray[0] = MSG_TYPE_ERROR;
+ int pos = 1;
+
+ // sender
+ pos = addByteArray(byteSender, resultByteArray, pos);
+
+ // destination
+ pos = addByteArray(byteDestination, resultByteArray, pos);
+
+ // MsgId
+ pos = addByteArray(byteErrMsgId, resultByteArray, pos);
+
+ // details
+ pos = addByteArray(byteDetails, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java
new file mode 100644
index 0000000..42a262f
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java
@@ -0,0 +1,158 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+
+/**
+ * This message is part of the synchronization protocol.
+ * This message is sent by a server to another server in order to
+ * request this other server to do an export to the server sender
+ * of this message.
+ */
+public class InitializeRequestMessage extends RoutableMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 8303271162942249215L;
+
+ private String baseDn = null;
+
+ /**
+ * Creates a InitializeRequestMessage message.
+ *
+ * @param baseDn The base DN of the synchronization domain.
+ * @param destination destination of this message
+ * @param senderID serverID of the server that will send this message
+ */
+ public InitializeRequestMessage(DN baseDn, short senderID, short destination)
+ {
+ super(senderID, destination);
+ this.baseDn = baseDn.toNormalizedString();
+ }
+
+ /**
+ * Creates a new InitializeRequestMessage by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the Message
+ * @throws DataFormatException If the in does not contain a properly
+ * encoded InitializeMessage.
+ */
+ public InitializeRequestMessage(byte[] in) throws DataFormatException
+ {
+ super();
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_INITIALIZE_REQUEST)
+ throw new DataFormatException(
+ "input is not a valid InitializeRequestMessage");
+ int pos = 1;
+
+ // baseDn
+ int length = getNextLength(in, pos);
+ baseDn = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ // sender
+ length = getNextLength(in, pos);
+ String sourceServerIdString = new String(in, pos, length, "UTF-8");
+ senderID = Short.valueOf(sourceServerIdString);
+ pos += length +1;
+
+ // destination
+ length = getNextLength(in, pos);
+ String destinationServerIdString = new String(in, pos, length, "UTF-8");
+ destination = Short.valueOf(destinationServerIdString);
+ pos += length +1;
+
+
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Get the base DN from this InitializeRequestMessage.
+ *
+ * @return the base DN from this InitializeRequestMessage.
+ */
+ public DN getBaseDn()
+ {
+ if (baseDn == null)
+ return null;
+ try
+ {
+ return DN.decode(baseDn);
+ } catch (DirectoryException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try {
+ byte[] baseDNBytes = baseDn.getBytes("UTF-8");
+ byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] destinationBytes = String.valueOf(destination).
+ getBytes("UTF-8");
+
+ int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
+ + destinationBytes.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ // type of the operation
+ resultByteArray[0] = MSG_TYPE_INITIALIZE_REQUEST;
+ int pos = 1;
+
+ // baseDN
+ pos = addByteArray(baseDNBytes, resultByteArray, pos);
+
+ // sender
+ pos = addByteArray(senderBytes, resultByteArray, pos);
+
+ // destination
+ pos = addByteArray(destinationBytes, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java
new file mode 100644
index 0000000..b86523b
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java
@@ -0,0 +1,212 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+
+/**
+ * This message is part of the synchronization protocol.
+ * This message is sent by a server to one or several servers as the
+ * first message of an export, before sending the entries.
+ */
+public class InitializeTargetMessage extends RoutableMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = -2122460559739139735L;
+
+ private String baseDN = null;
+
+ // Specifies the number of entries expected to be exported.
+ private long entryCount;
+
+ // Specifies the serverID of the server that requested this export
+ // to happen. It allows a server that previously sent an
+ // InitializeRequestMessage to know that the current message
+ // is related to its own request.
+ private short requestorID;
+
+ /**
+ * Creates a InitializeDestinationMessage.
+ *
+ * @param baseDN The base DN for which the InitializeMessage is created.
+ * @param senderID The serverID of the server that sends this message.
+ * @param destination The destination of this message.
+ * @param requestorID The server that initiates this export.
+ * @param entryCount The count of entries that will be sent.
+ */
+ public InitializeTargetMessage(DN baseDN, short senderID,
+ short destination, short requestorID, long entryCount)
+ {
+ super(senderID, destination);
+ this.requestorID = requestorID;
+ this.baseDN = baseDN.toNormalizedString();
+ this.entryCount = entryCount;
+ }
+
+ /**
+ * Creates an InitializeTargetMessage by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the Message
+ * @throws DataFormatException If the in does not contain a properly
+ * encoded InitializeMessage.
+ */
+ public InitializeTargetMessage(byte[] in) throws DataFormatException
+ {
+ super();
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_INITIALIZE_TARGET)
+ throw new DataFormatException(
+ "input is not a valid InitializeDestinationMessage");
+ int pos = 1;
+
+ // destination
+ int length = getNextLength(in, pos);
+ String destinationString = new String(in, pos, length, "UTF-8");
+ this.destination = Short.valueOf(destinationString);
+ pos += length +1;
+
+ // baseDn
+ length = getNextLength(in, pos);
+ baseDN = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ // sender
+ length = getNextLength(in, pos);
+ String senderString = new String(in, pos, length, "UTF-8");
+ senderID = Short.valueOf(senderString);
+ pos += length +1;
+
+ // requestor
+ length = getNextLength(in, pos);
+ String requestorString = new String(in, pos, length, "UTF-8");
+ requestorID = Short.valueOf(requestorString);
+ pos += length +1;
+
+ // entryCount
+ length = getNextLength(in, pos);
+ String entryCountString = new String(in, pos, length, "UTF-8");
+ entryCount = Long.valueOf(entryCountString);
+ pos += length +1;
+
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Get the number of entries expected to be sent during the export.
+ * @return the entry count
+ */
+ public long getEntryCount()
+ {
+ return this.entryCount;
+ }
+
+ /**
+ * Get the serverID of the server that initiated the export.
+ * @return the serverID
+ */
+ public long getRequestorID()
+ {
+ return this.requestorID;
+ }
+
+ /**
+ * Get the base DN of the domain.
+ *
+ * @return the base DN
+ */
+ public DN getBaseDN()
+ {
+ if (baseDN == null)
+ return null;
+ try
+ {
+ return DN.decode(baseDN);
+ } catch (DirectoryException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try
+ {
+ byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
+ byte[] byteDn = baseDN.getBytes("UTF-8");
+ byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
+ byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
+
+ int length = 1 + byteDestination.length + 1
+ + byteDn.length + 1
+ + byteSender.length + 1
+ + byteRequestor.length + 1
+ + byteEntryCount.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_INITIALIZE_TARGET;
+ int pos = 1;
+
+ /* put the destination */
+ pos = addByteArray(byteDestination, resultByteArray, pos);
+
+ /* put the baseDN and a terminating 0 */
+ pos = addByteArray(byteDn, resultByteArray, pos);
+
+ /* put the sender */
+ pos = addByteArray(byteSender, resultByteArray, pos);
+
+ /* put the requestorID */
+ pos = addByteArray(byteRequestor, resultByteArray, pos);
+
+ /* put the entryCount */
+ pos = addByteArray(byteEntryCount, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java
new file mode 100644
index 0000000..625f29a
--- /dev/null
+++ b/opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java
@@ -0,0 +1,103 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.protocol;
+
+import java.io.Serializable;
+
+/**
+ * This is an abstract class of messages of the synchronization protocol
+ * for message that needs to contain information about the server that
+ * send them and the destination servers to whitch they should be sent.
+ */
+public abstract class RoutableMessage extends SynchronizationMessage implements
+ Serializable
+{
+
+ /**
+ * Special values for the server ids fields contained in the routable
+ * messages.
+ **/
+
+ /**
+ * Specifies that no server is identified.
+ */
+ public static final short UNKNOWN_SERVER = -1;
+ /**
+ * Specifies all servers in the synchronization domain.
+ */
+ public static final short ALL_SERVERS = -2;
+ /**
+ * Inside a topology of servers in the same domain, it specifies
+ * the server that is the "closest" to the sender.
+ */
+ public static final short THE_CLOSEST_SERVER = -3;
+
+ /**
+ * The destination server or servers of this message.
+ */
+ protected short destination = UNKNOWN_SERVER;
+ /**
+ * The serverID of the server that sends this message.
+ */
+ protected short senderID = UNKNOWN_SERVER;
+
+ /**
+ * Creates a routable message.
+ * @param senderID changelog server id
+ * @param destination changelog server id
+ */
+ public RoutableMessage(short senderID, short destination)
+ {
+ this.senderID = senderID;
+ this.destination = destination;
+ }
+
+ /**
+ * Creates a routable message.
+ */
+ public RoutableMessage()
+ {
+ }
+
+ /**
+ * Get the destination.
+ * @return the destination
+ */
+ public short getDestination()
+ {
+ return this.destination;
+ }
+
+ /**
+ * Get the server ID of the server that sent this message.
+ * @return the server id
+ */
+ public short getsenderID()
+ {
+ return this.senderID;
+ }
+}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java b/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
index b57cc5d..d1ceb76 100644
--- a/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,11 +112,16 @@
/* Read the first 8 bytes containing the packet length */
int length = 0;
+ /* Let's start the stop-watch before waiting on read */
+ /* for the heartbeat check to be operationnal */
+ lastReceiveTime = System.currentTimeMillis();
+
while (length<8)
{
int read = input.read(rcvLengthBuf, length, 8-length);
if (read == -1)
{
+ lastReceiveTime=0;
throw new IOException("no more data");
}
else
@@ -135,8 +140,9 @@
{
length += input.read(buffer, length, totalLength - length);
}
-
- lastReceiveTime = System.currentTimeMillis();
+ /* We do not want the heartbeat to close the session when */
+ /* we are processing a message even a time consuming one. */
+ lastReceiveTime=0;
return SynchronizationMessage.generateMsg(buffer);
}
catch (OutOfMemoryError e)
@@ -159,6 +165,10 @@
*/
public long getLastReceiveTime()
{
+ if (lastReceiveTime==0)
+ {
+ return System.currentTimeMillis();
+ }
return lastReceiveTime;
}
diff --git a/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java b/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
index aab19be..59777e5 100644
--- a/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
+++ b/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,6 +47,13 @@
static final byte MSG_TYPE_CHANGELOG_START = 7;
static final byte MSG_TYPE_WINDOW = 8;
static final byte MSG_TYPE_HEARTBEAT = 9;
+ static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
+ static final byte MSG_TYPE_INITIALIZE_TARGET = 11;
+ static final byte MSG_TYPE_ENTRY = 12;
+ static final byte MSG_TYPE_DONE = 13;
+ static final byte MSG_TYPE_ERROR = 14;
+ // Adding a new type of message here probably requires to
+ // change accordingly generateMsg method below
/**
* Return the byte[] representation of this message.
@@ -60,6 +67,11 @@
* MSG_TYPE_CHANGELOG_START
* MSG_TYPE_WINDOW
* MSG_TYPE_HEARTBEAT
+ * MSG_TYPE_INITIALIZE
+ * MSG_TYPE_INITIALIZE_TARGET
+ * MSG_TYPE_ENTRY
+ * MSG_TYPE_DONE
+ * MSG_TYPE_ERROR
*
* @return the byte[] representation of this message.
*/
@@ -107,6 +119,21 @@
case MSG_TYPE_HEARTBEAT:
msg = new HeartbeatMessage(buffer);
break;
+ case MSG_TYPE_INITIALIZE_REQUEST:
+ msg = new InitializeRequestMessage(buffer);
+ break;
+ case MSG_TYPE_INITIALIZE_TARGET:
+ msg = new InitializeTargetMessage(buffer);
+ break;
+ case MSG_TYPE_ENTRY:
+ msg = new EntryMessage(buffer);
+ break;
+ case MSG_TYPE_DONE:
+ msg = new DoneMessage(buffer);
+ break;
+ case MSG_TYPE_ERROR:
+ msg = new ErrorMessage(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
new file mode 100644
index 0000000..090fe76
--- /dev/null
+++ b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -0,0 +1,214 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.tasks;
+
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.core.DirectoryServer.getAttributeType;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import static org.opends.server.messages.CoreMessages.*;
+import static org.opends.server.messages.MessageHandler.getMessage;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.messages.TaskMessages;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.synchronization.plugin.SynchronizationDomain;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.ResultCode;
+
+/**
+ * This class provides an implementation of a Directory Server task that can
+ * be used to import data from an LDIF file into a backend.
+ */
+public class InitializeTargetTask extends Task
+{
+ // Config properties
+ boolean append = false;
+ boolean isCompressed = false;
+ boolean isEncrypted = false;
+ boolean skipSchemaValidation = false;
+ String domainString = null;
+ SynchronizationDomain domain = null;
+ short target;
+ long total;
+ long left;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void initializeTask() throws DirectoryException
+ {
+ // FIXME -- Do we need any special authorization here?
+ Entry taskEntry = getTaskEntry();
+
+ AttributeType typeDomainBase;
+ AttributeType typeScope;
+
+ typeDomainBase =
+ getAttributeType(ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN, true);
+ typeScope =
+ getAttributeType(ATTR_TASK_INITIALIZE_TARGET_SCOPE, true);
+
+ List<Attribute> attrList;
+ attrList = taskEntry.getAttribute(typeDomainBase);
+ domainString = TaskUtils.getSingleValueString(attrList);
+
+ DN domainDN = DN.nullDN();
+ try
+ {
+ domainDN = DN.decode(domainString);
+ }
+ catch(Exception e)
+ {
+ int msgID = TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN;
+ String message = getMessage(msgID) + e.getMessage();
+ throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX,
+ message, msgID);
+ }
+ domain=SynchronizationDomain.retrievesSynchronizationDomain(domainDN);
+
+ attrList = taskEntry.getAttribute(typeScope);
+ String targetString = TaskUtils.getSingleValueString(attrList);
+ target = domain.decodeTarget(targetString);
+
+ createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
+ createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected TaskState runTask()
+ {
+ if (debugEnabled())
+ {
+ debugInfo("DebugInfo" + "InitializeTarget Task/runTask ");
+ }
+ try
+ {
+ domain.initializeTarget(target, this);
+ }
+ catch(DirectoryException de)
+ {
+ logError(ErrorLogCategory.TASK,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "Initialize Task stopped by error", 1);
+
+ return TaskState.STOPPED_BY_ERROR;
+ }
+ return TaskState.COMPLETED_SUCCESSFULLY;
+ }
+
+ /**
+ * Create attribute to store entry counters.
+ * @param name The name of the attribute.
+ * @param value The value to store for that attribute.
+ */
+ protected void createCounterAttribute(String name, long value)
+ {
+ AttributeType type;
+ LinkedHashSet<AttributeValue> values =
+ new LinkedHashSet<AttributeValue>();
+
+ Entry taskEntry = getTaskEntry();
+ try
+ {
+ type = getAttributeType(name, true);
+ values.add(new AttributeValue(type,
+ new ASN1OctetString(String.valueOf(value))));
+ ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
+ attrList.add(new Attribute(type, name,values));
+ taskEntry.putAttribute(type, attrList);
+ }
+ finally
+ {
+ // taskScheduler.unlockEntry(taskEntryDN, lock);
+ }
+ }
+
+ /**
+ * Set the total number of entries expected to be exported.
+ * @param total The total number of entries.
+ */
+ public void setTotal(long total)
+ {
+ this.total = total;
+ try
+ {
+ updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total);
+ updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
+ }
+ catch(Exception e) {}
+ }
+
+ /**
+ * Set the total number of entries still to be exported.
+ * @param left The total number of entries to be exported.
+ */
+ public void setLeft(long left)
+ {
+ this.left = left;
+ try
+ {
+ updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
+ updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
+ }
+ catch(Exception e) {}
+ }
+
+ /**
+ * Update an attribute for this task.
+ * @param name The name of the attribute.
+ * @param value The value.
+ * @throws DirectoryException When an error occurs.
+ */
+ protected void updateAttribute(String name, long value)
+ throws DirectoryException
+ {
+ Entry taskEntry = getTaskEntry();
+
+ ArrayList<Modification> modifications = new ArrayList<Modification>();
+ modifications.add(new Modification(ModificationType.REPLACE,
+ new Attribute(name, String.valueOf(value))));
+
+ taskEntry.applyModifications(modifications);
+ }
+}
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTask.java b/opends/src/server/org/opends/server/tasks/InitializeTask.java
new file mode 100644
index 0000000..f96235b
--- /dev/null
+++ b/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -0,0 +1,267 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.tasks;
+
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.core.DirectoryServer.getAttributeType;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import static org.opends.server.messages.CoreMessages.*;
+import static org.opends.server.messages.MessageHandler.getMessage;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.messages.TaskMessages;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.synchronization.plugin.SynchronizationDomain;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.ResultCode;
+
+/**
+ * This class provides an implementation of a Directory Server task that can
+ * be used to import data over the synchronization protocol from another
+ * server hosting the same synchronization domain.
+ */
+public class InitializeTask extends Task
+{
+ boolean isCompressed = false;
+ boolean isEncrypted = false;
+ boolean skipSchemaValidation = false;
+ String domainString = null;
+ short source;
+ SynchronizationDomain domain = null;
+ TaskState initState;
+
+ // The total number of entries expected to be processed when this import
+ // will end successfully
+ long total = 0;
+
+ // The number of entries still to be processed for this import to be
+ // completed
+ long left = 0;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public void initializeTask() throws DirectoryException
+ {
+
+ // FIXME -- Do we need any special authorization here?
+ Entry taskEntry = getTaskEntry();
+
+ AttributeType typeDomainBase;
+ AttributeType typeSourceScope;
+
+ typeDomainBase =
+ getAttributeType(ATTR_TASK_INITIALIZE_DOMAIN_DN, true);
+ typeSourceScope =
+ getAttributeType(ATTR_TASK_INITIALIZE_SOURCE, true);
+
+ List<Attribute> attrList;
+ attrList = taskEntry.getAttribute(typeDomainBase);
+ domainString = TaskUtils.getSingleValueString(attrList);
+ DN domainDN = DN.nullDN();
+ try
+ {
+ domainDN = DN.decode(domainString);
+ }
+ catch(Exception e)
+ {
+ int msgID = TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN;
+ String message = getMessage(msgID) + e.getMessage();
+ throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX,
+ message, msgID);
+ }
+
+ domain=SynchronizationDomain.retrievesSynchronizationDomain(domainDN);
+
+
+ attrList = taskEntry.getAttribute(typeSourceScope);
+ String sourceString = TaskUtils.getSingleValueString(attrList);
+ source = domain.decodeSource(sourceString);
+
+ createAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
+ createAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ protected TaskState runTask()
+ {
+ if (debugEnabled())
+ {
+ debugInfo("InitializeTask is starting domain: %s source:%d",
+ domain.getBaseDN(), source);
+ }
+ initState = getTaskState(); // RUNNING
+ try
+ {
+ // launch the import
+ domain.initialize(source, this);
+
+ synchronized(initState)
+ {
+ // Waiting for the end of the job
+ while (initState == TaskState.RUNNING)
+ {
+ initState.wait(1000);
+ updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
+ updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
+ }
+ }
+ updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
+ updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
+ }
+ catch(InterruptedException ie) {}
+ catch(DirectoryException de)
+ {
+ int msgID = de.getErrorMessageID();
+ String message = getMessage(msgID, de.getErrorMessage());
+ logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ initState = TaskState.STOPPED_BY_ERROR;
+ }
+
+ if (debugEnabled())
+ {
+ debugInfo("InitializeTask is ending with state:%d", initState);
+ }
+ return initState;
+ }
+
+ /**
+ * Set the state for the current task.
+ *
+ * @param newState The new state value to set
+ * @param de When the new state is different from COMPLETED_SUCCESSFULLY
+ * this is the exception that contains the cause of the failure.
+ */
+ public void setState(TaskState newState, DirectoryException de)
+ {
+ try
+ {
+ if (de != null)
+ {
+ int msgID = de.getErrorMessageID();
+ String message = getMessage(msgID, de.getErrorMessage());
+ logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ if (debugEnabled())
+ {
+ logError(ErrorLogCategory.TASK,
+ ErrorLogSeverity.SEVERE_ERROR,
+ "setState: "+newState, 1);
+ debugInfo("InitializeTask/setState: ", newState);
+ }
+ initState = newState;
+ synchronized (initState)
+ {
+ initState.notify();
+ }
+ }
+ catch(Exception e)
+ {}
+ }
+
+ /**
+ * Create a new attribute the task entry.
+ * @param name The name of the attribute
+ * @param value The value to store
+ */
+ protected void createAttribute(String name, long value)
+ {
+ AttributeType type;
+ LinkedHashSet<AttributeValue> values =
+ new LinkedHashSet<AttributeValue>();
+
+ Entry taskEntry = getTaskEntry();
+ try
+ {
+ type = getAttributeType(name, true);
+ values.add(new AttributeValue(type,
+ new ASN1OctetString(String.valueOf(value))));
+ ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
+ attrList.add(new Attribute(type, name,values));
+ taskEntry.putAttribute(type, attrList);
+ }
+ finally
+ {
+ // taskScheduler.unlockEntry(taskEntryDN, lock);
+ }
+ }
+
+ /**
+ * Update an attribute for this task.
+ * @param name The name of the attribute.
+ * @param value The value.
+ * @throws DirectoryException When an error occurs.
+ */
+ protected void updateAttribute(String name, long value)
+ throws DirectoryException
+ {
+ Entry taskEntry = getTaskEntry();
+
+ ArrayList<Modification> modifications = new ArrayList<Modification>();
+ modifications.add(new Modification(ModificationType.REPLACE,
+ new Attribute(name, String.valueOf(value))));
+
+ taskEntry.applyModifications(modifications);
+ }
+
+ /**
+ * Set the total number of entries expected to be imported.
+ * @param total The total number of entries.
+ */
+ public void setTotal(long total)
+ {
+ this.total = total;
+ }
+
+ /**
+ * Set the total number of entries still to be imported.
+ * @param left The total number of entries to be imported.
+ */
+ public void setLeft(long left)
+ {
+ this.left = left;
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java
new file mode 100644
index 0000000..a6d43a0
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java
@@ -0,0 +1,1488 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_INITIALIZE_DONE;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_INITIALIZE_LEFT;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.UUID;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.config.ConfigEntry;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.messages.TaskMessages;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.schema.DirectoryStringSyntax;
+import org.opends.server.synchronization.changelog.Changelog;
+import org.opends.server.synchronization.common.LogMessages;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.plugin.ChangelogBroker;
+import org.opends.server.synchronization.plugin.SynchronizationDomain;
+import org.opends.server.synchronization.protocol.ChangelogStartMessage;
+import org.opends.server.synchronization.protocol.DoneMessage;
+import org.opends.server.synchronization.protocol.EntryMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.InitializeTargetMessage;
+import org.opends.server.synchronization.protocol.RoutableMessage;
+import org.opends.server.synchronization.protocol.ServerStartMessage;
+import org.opends.server.synchronization.protocol.SocketSession;
+import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchScope;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+/**
+ * Tests contained here:
+ *
+ * Initialize Test Cases <=> Pull entries
+ * ---------------------
+ * InitializeImport : Tests the import in the target DS.
+ * Creates a task on current DS and makes a broker simulates DS2 sending entries.
+ * InitializeExport : Tests the export from the source DS
+ * A broker simulates DS2 pulling entries from current DS.
+ *
+ * Initialize Target Test Cases <=> Push entries
+ * ----------------------------
+ * InitializeTargetExport : Tests the export from the source DS
+ * Creates a task on current DS and makes broker simulates DS2 receiving entries
+ * InitializeTargetImport : Test the import in the target DS
+ * A broker simulates DS2 receiving entries from current DS.
+ *
+ * InitializeTargetConfigErrors : Tests configuration errors of the
+ * InitializeTarget task
+ */
+
+public class InitOnLineTest extends SynchronizationTestCase
+{
+ private static final int WINDOW_SIZE = 10;
+ private static final int CHANGELOG_QUEUE_SIZE = 100;
+
+ private static final String SYNCHRONIZATION_STRESS_TEST =
+ "Synchronization Stress Test";
+
+ /**
+ * A "person" entry
+ */
+ protected Entry personEntry;
+ protected Entry taskInitFromS2;
+ protected Entry taskInitTargetS2;
+ protected Entry taskInitTargetAll;
+
+ SocketSession ssSession = null;
+ boolean ssShutdownRequested = false;
+ protected String[] updatedEntries;
+ boolean externalDS = false;
+ short server1ID = 1;
+ short server2ID = 2;
+ short server3ID = 3;
+ short changelog1ID = 12;
+ short changelog2ID = 13;
+ int changelogPort = 8989;
+
+ private DN baseDn;
+ ChangelogBroker server2 = null;
+ Changelog changelog1 = null;
+ Changelog changelog2 = null;
+ boolean emptyOldChanges = true;
+ SynchronizationDomain sd = null;
+
+ private void log(String s)
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "InitOnLineTests/" + s, 1);
+ if (debugEnabled())
+ {
+ debugInfo(s);
+ }
+ }
+ protected void log(String message, Exception e)
+ {
+ log(message + stackTraceToSingleLineString(e));
+ }
+
+ /**
+ * Set up the environment for performing the tests in this Class.
+ * synchronization
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ log("Setup: debugEnabled:" + debugEnabled());
+
+ // This test suite depends on having the schema available.
+ TestCaseUtils.startServer();
+
+ // Disable schema check
+ schemaCheck = DirectoryServer.checkSchema();
+ DirectoryServer.setCheckSchema(false);
+
+ baseDn = DN.decode("dc=example,dc=com");
+
+ updatedEntries = newLDIFEntries();
+
+ // Create an internal connection in order to provide operations
+ // to DS to populate the db -
+ connection = InternalClientConnection.getRootConnection();
+
+ // Synchro provider
+ String synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+ // Synchro multi-master
+ synchroPluginStringDN = "cn=Multimaster Synchronization, "
+ + synchroStringDN;
+ String synchroPluginLdif = "dn: "
+ + synchroPluginStringDN
+ + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider\n"
+ + "ds-cfg-synchronization-provider-enabled: true\n"
+ + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
+ synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
+
+ // Synchro suffix
+ synchroServerEntry = null;
+
+ // Add config entries to the current DS server based on :
+ // Add the synchronization plugin: synchroPluginEntry & synchroPluginStringDN
+ // Add synchroServerEntry
+ // Add changeLogEntry
+ configureSynchronization();
+
+ taskInitFromS2 = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-from-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: dc=example,dc=com",
+ "ds-task-initialize-replica-server-id: " + server2ID);
+
+ taskInitTargetS2 = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
+ "ds-task-initialize-domain-dn: dc=example,dc=com",
+ "ds-task-initialize-replica-server-id: " + server2ID);
+
+ taskInitTargetAll = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
+ "ds-task-initialize-domain-dn: dc=example,dc=com",
+ "ds-task-initialize-replica-server-id: all");
+
+ // Change log
+ String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+ String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
+ + "ds-cfg-changelog-server-id: 1\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
+ + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
+ changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+ changeLogEntry = null;
+
+ }
+
+ // Tests that entries have been written in the db
+ private void testEntriesInDb()
+ {
+ log("TestEntriesInDb");
+ short found = 0;
+
+ for (String entry : updatedEntries)
+ {
+
+ int dns = entry.indexOf("dn: ");
+ int dne = entry.indexOf("dc=com");
+ String dn = entry.substring(dns+4,dne+6);
+
+ log("Search Entry: " + dn);
+
+ DN entryDN = null;
+ try
+ {
+ entryDN = DN.decode(dn);
+ }
+ catch(Exception e)
+ {
+ log("TestEntriesInDb/" + e);
+ }
+
+ try
+ {
+ Entry resultEntry = getEntry(entryDN, 1000, true);
+ if (resultEntry==null)
+ {
+ log("Entry not found <" + dn + ">");
+ }
+ else
+ {
+ log("Entry found <" + dn + ">");
+ found++;
+ }
+ }
+ catch(Exception e)
+ {
+ log("TestEntriesInDb/", e);
+ }
+ }
+
+ assertEquals(found, updatedEntries.length,
+ " Entries present in DB :" + found +
+ " Expected entries :" + updatedEntries.length);
+ }
+
+ private void addTask(Entry taskEntry, ResultCode expectedResult,
+ int errorMessageID)
+ {
+ try
+ {
+ log("AddTask/" + taskEntry);
+
+ // Change config of DS to launch the total update task
+ InternalClientConnection connection =
+ InternalClientConnection.getRootConnection();
+
+ // Add the task.
+
+ AddOperation addOperation =
+ connection.processAdd(taskEntry.getDN(),
+ taskEntry.getObjectClasses(),
+ taskEntry.getUserAttributes(),
+ taskEntry.getOperationalAttributes());
+
+ assertEquals(addOperation.getResultCode(), expectedResult,
+ "Result of ADD operation of the task is: "
+ + addOperation.getResultCode()
+ + " Expected:"
+ + expectedResult + " Details:" + addOperation.getErrorMessage()
+ + addOperation.getAdditionalLogMessage());
+
+ if (expectedResult != ResultCode.SUCCESS)
+ {
+ assertTrue(addOperation.getErrorMessage().toString().
+ startsWith(getMessage(errorMessageID).toString()),
+ "Error MsgID of the task <"
+ + addOperation.getErrorMessage()
+ + "> equals <"
+ + getMessage(errorMessageID) + ">");
+ log("Create config task: <"+ errorMessageID + addOperation.getErrorMessage() + ">");
+
+ }
+ else
+ {
+ waitTaskState(taskEntry, TaskState.RUNNING, -1);
+ }
+
+ // Entry will be removed at the end of the test
+ entryList.addLast(taskEntry.getDN());
+
+ log("AddedTask/" + taskEntry.getDN());
+ }
+ catch(Exception e)
+ {
+ fail("Exception when adding task:"+ e.getMessage());
+ }
+ }
+
+ /**
+ * Wait a task to be completed and check the expected state and expected
+ * stats.
+ * @param taskEntry The task to process.
+ * @param expectedState The expected state fot this task.
+ * @param expectedLeft The expected number of entries still to be processed.
+ * @param expectedDone The expected numner of entries to be processed.
+ */
+ private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
+ long expectedLeft, long expectedDone)
+ {
+ try
+ {
+ // FIXME - Factorize with TasksTestCase
+ // Wait until the task completes.
+ int timeout = 2000;
+
+ AttributeType completionTimeType = DirectoryServer.getAttributeType(
+ ATTR_TASK_COMPLETION_TIME.toLowerCase());
+ SearchFilter filter =
+ SearchFilter.createFilterFromString("(objectclass=*)");
+ Entry resultEntry = null;
+ String completionTime = null;
+ long startMillisecs = System.currentTimeMillis();
+ do
+ {
+ InternalSearchOperation searchOperation =
+ connection.processSearch(taskEntry.getDN(),
+ SearchScope.BASE_OBJECT,
+ filter);
+ try
+ {
+ resultEntry = searchOperation.getSearchEntries().getFirst();
+ } catch (Exception e)
+ {
+ // FIXME How is this possible? Must be issue 858.
+ fail("Task entry was not returned from the search.");
+ continue;
+ }
+ completionTime =
+ resultEntry.getAttributeValue(completionTimeType,
+ DirectoryStringSyntax.DECODER);
+
+ if (completionTime == null)
+ {
+ if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
+ {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ } while (completionTime == null);
+
+ if (completionTime == null)
+ {
+ fail("The task had not completed after " + timeout + " seconds.");
+ }
+
+ // Check that the task state is as expected.
+ AttributeType taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
+ String stateString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+ TaskState taskState = TaskState.fromString(stateString);
+ assertEquals(taskState, expectedState,
+ "The task completed in an unexpected state");
+
+ // Check that the task contains some log messages.
+ AttributeType logMessagesType = DirectoryServer.getAttributeType(
+ ATTR_TASK_LOG_MESSAGES.toLowerCase());
+ ArrayList<String> logMessages = new ArrayList<String>();
+ resultEntry.getAttributeValues(logMessagesType,
+ DirectoryStringSyntax.DECODER,
+ logMessages);
+ if (taskState != TaskState.COMPLETED_SUCCESSFULLY &&
+ logMessages.size() == 0)
+ {
+ fail("No log messages were written to the task entry on a failed task");
+ }
+
+ try
+ {
+ // Check that the task state is as expected.
+ taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
+ stateString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+
+ assertEquals(Long.decode(stateString).longValue(),expectedLeft,
+ "The number of entries to process is not correct.");
+
+ // Check that the task state is as expected.
+ taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
+ stateString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+
+ assertEquals(Long.decode(stateString).longValue(),expectedDone,
+ "The number of entries processed is not correct.");
+
+ }
+ catch(Exception e)
+ {
+ fail("Exception"+ e.getMessage()+e.getStackTrace());
+ }
+
+ }
+ catch(Exception e)
+ {
+ fail("Exception"+ e.getMessage()+e.getStackTrace());
+ }
+ }
+
+ private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
+ int expectedMessage)
+ {
+ TaskState taskState = null;
+ try
+ {
+
+ SearchFilter filter =
+ SearchFilter.createFilterFromString("(objectclass=*)");
+ Entry resultEntry = null;
+ do
+ {
+ InternalSearchOperation searchOperation =
+ connection.processSearch(taskEntry.getDN(),
+ SearchScope.BASE_OBJECT,
+ filter);
+ try
+ {
+ resultEntry = searchOperation.getSearchEntries().getFirst();
+ } catch (Exception e)
+ {
+ // FIXME How is this possible? Must be issue 858.
+ fail("Task entry was not returned from the search.");
+ continue;
+ }
+
+ try
+ {
+ // Check that the task state is as expected.
+ AttributeType taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
+ String stateString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+ taskState = TaskState.fromString(stateString);
+ }
+ catch(Exception e)
+ {
+ fail("Exception"+ e.getMessage()+e.getStackTrace());
+ }
+
+ try
+ {
+ // Check that the left counter.
+ AttributeType taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
+ String leftString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+
+ // Check that the total counter.
+ taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
+ String totalString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+ }
+ catch(Exception e)
+ {
+ fail("Exception"+ e.getMessage()+e.getStackTrace());
+ }
+
+ Thread.sleep(2000);
+ }
+ while ((taskState != expectedTaskState) &&
+ (taskState != TaskState.STOPPED_BY_ERROR));
+
+ // Check that the task contains some log messages.
+ AttributeType logMessagesType = DirectoryServer.getAttributeType(
+ ATTR_TASK_LOG_MESSAGES.toLowerCase());
+ ArrayList<String> logMessages = new ArrayList<String>();
+ resultEntry.getAttributeValues(logMessagesType,
+ DirectoryStringSyntax.DECODER,
+ logMessages);
+
+ if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
+ && (taskState != TaskState.RUNNING))
+ {
+ if (logMessages.size() == 0)
+ {
+ fail("No log messages were written to the task entry on a failed task");
+ }
+ else
+ {
+ if (expectedMessage > 0)
+ {
+ log(logMessages.get(0));
+ log(getMessage(expectedMessage));
+ assertTrue(logMessages.get(0).indexOf(
+ getMessage(expectedMessage))>0);
+ }
+ }
+ }
+
+ assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
+ " Expected task state:" + expectedTaskState);
+ }
+ catch(Exception e)
+ {
+ fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ * Add to the current DB the entries necessary to the test
+ */
+ private void addTestEntriesToDB()
+ {
+ try
+ {
+ for (String ldifEntry : updatedEntries)
+ {
+ Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+ entry.getUserAttributes(), entry.getOperationalAttributes());
+ addOp.setInternalOperation(true);
+ addOp.run();
+ if (addOp.getResultCode() != ResultCode.SUCCESS)
+ {
+ log("addEntry: Failed" + addOp.getResultCode());
+ }
+
+ // They will be removed at the end of the test
+ entryList.addLast(entry.getDN());
+ }
+ }
+ catch(Exception e)
+ {
+ fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /*
+ * Creates entries necessary to the test.
+ */
+ private String[] newLDIFEntries()
+ {
+ String[] entries =
+ {
+ "dn: dc=example,dc=com\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
+ + "\n",
+ "dn: ou=People,dc=example,dc=com\n"
+ + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
+ + "\n",
+ "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
+ + "objectclass: top\n"
+ + "objectclass: person\n"
+ + "objectclass: organizationalPerson\n"
+ + "cn: Fiona Jensen\n"
+ + "sn: Jensen\n"
+ + "uid: fiona\n"
+ + "telephonenumber: +1 408 555 1212\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
+ + "\n",
+ "dn: cn=Robert Langman,ou=people,dc=example,dc=com\n"
+ + "objectclass: top\n"
+ + "objectclass: person\n"
+ + "objectclass: organizationalPerson\n"
+ + "cn: Robert Langman\n"
+ + "sn: Langman\n"
+ + "uid: robert\n"
+ + "telephonenumber: +1 408 555 1213\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
+ + "\n"
+ };
+
+ return entries;
+ }
+
+ /**
+ * Broker will send the entries to a server.
+ * @param broker The broker that will send the entries.
+ * @param senderID The serverID of this broker.
+ * @param destinationServerID The target server.
+ * @param requestorID The initiator server.
+ */
+ private void makeBrokerPublishEntries(ChangelogBroker broker,
+ short senderID, short destinationServerID, short requestorID)
+ {
+ // Send entries
+ try
+ {
+ RoutableMessage initTargetMessage = new InitializeTargetMessage(
+ baseDn, server2ID, destinationServerID, requestorID, updatedEntries.length);
+ broker.publish(initTargetMessage);
+
+ for (String entry : updatedEntries)
+ {
+ log("Broker will pusblish 1 entry: bytes:"+ entry.length());
+
+ EntryMessage entryMsg = new EntryMessage(senderID, destinationServerID,
+ entry.getBytes());
+ broker.publish(entryMsg);
+ }
+
+ DoneMessage doneMsg = new DoneMessage(senderID, destinationServerID);
+ broker.publish(doneMsg);
+
+ log("Broker " + senderID + " published entries");
+
+ }
+ catch(Exception e)
+ {
+ fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
+ + stackTraceToSingleLineString(e));
+ }
+ }
+
+ void receiveUpdatedEntries(ChangelogBroker broker, short serverID,
+ String[] updatedEntries)
+ {
+ // Expect the broker to receive the entries
+ SynchronizationMessage msg;
+ short entriesReceived = 0;
+ while (true)
+ {
+ try
+ {
+ log("Broker " + serverID + " Wait for entry or done msg");
+ msg = broker.receive();
+
+ if (msg == null)
+ break;
+
+ if (msg instanceof InitializeTargetMessage)
+ {
+ log("Broker " + serverID + " receives InitializeTargetMessage ");
+ entriesReceived = 0;
+ }
+ else if (msg instanceof EntryMessage)
+ {
+ EntryMessage em = (EntryMessage)msg;
+ log("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
+ entriesReceived++;
+ }
+ else if (msg instanceof DoneMessage)
+ {
+ log("Broker " + serverID + " receives done ");
+ break;
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ ErrorMessage em = (ErrorMessage)msg;
+ log("Broker " + serverID + " receives ERROR "
+ + getMessage(em.getMsgID())
+ + " " + em.getDetails());
+ break;
+ }
+ else
+ {
+ log("Broker " + serverID + " receives and trashes " + msg);
+ }
+ }
+ catch(Exception e)
+ {
+ log("receiveUpdatedEntries" + stackTraceToSingleLineString(e));
+ }
+ }
+
+ assertTrue(entriesReceived == updatedEntries.length,
+ " Received entries("+entriesReceived +
+ ") == Expected entries("+updatedEntries.length+")");
+ }
+
+ /**
+ * Creates a new changelog server.
+ * @param changelogId The serverID of the changelog to create.
+ * @return The new changelog server.
+ */
+ private Changelog createChangelogServer(short changelogId)
+ {
+ try
+ {
+ if ((changelogId==changelog1ID)&&(changelog1!=null))
+ return changelog1;
+
+ if ((changelogId==changelog2ID)&&(changelog2!=null))
+ return changelog2;
+
+ {
+ int chPort = getChangelogPort(changelogId);
+
+ // Create a changelog server
+ String changelogLdif = "dn: cn=Changelog Server\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n"
+ + "ds-cfg-changelog-port: " + chPort + "\n"
+ + "ds-cfg-changelog-server-id: " + changelogId + "\n"
+// + "ds-cfg-heartbeat-interval: 0 ms\n"
+ + "ds-cfg-window-size: 100" + "\n";
+
+ if (changelogId==changelog2ID)
+ {
+ changelogLdif += new String(
+ "ds-cfg-changelog-server: localhost:"
+ + getChangelogPort(changelog1ID)+"\n");
+ }
+ Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+ ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+ Changelog changelog = new Changelog(changelogConfig);
+
+ Thread.sleep(1000);
+
+ return changelog;
+ }
+ }
+ catch (Exception e)
+ {
+ fail("createChangelog" + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Create a synchronized suffix in the current server providing the
+ * changelog serverID.
+ * @param changelogID
+ */
+ private void connectServer1ToChangelog(short changelogID)
+ {
+ // Connect DS to the changelog
+ try
+ {
+ // suffix synchronized
+ String synchroServerStringDN = synchroPluginStringDN;
+ String synchroServerLdif = "dn: cn=example," + synchroServerStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider-config\n"
+ + "cn: example\n"
+ + "ds-cfg-synchronization-dn: dc=example,dc=com\n"
+ + "ds-cfg-changelog-server: localhost:"
+ + getChangelogPort(changelogID)+"\n"
+ + "ds-cfg-directory-server-id: " + server1ID + "\n"
+ + "ds-cfg-receive-status: true\n"
+// + "ds-cfg-heartbeat-interval: 0 ms\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE;
+
+ if (synchroServerEntry == null)
+ {
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the synchronized server");
+ entryList.add(synchroServerEntry.getDN());
+
+ sd = SynchronizationDomain.retrievesSynchronizationDomain(baseDn);
+
+ // Clear the backend
+ SynchronizationDomain.clearJEBackend(false,
+ sd.getBackend().getBackendID(),
+ baseDn.toNormalizedString());
+
+ }
+ if (sd != null)
+ {
+ log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
+ }
+ }
+ catch(Exception e)
+ {
+ log("connectServer1ToChangelog", e);
+ fail("connectServer1ToChangelog", e);
+ }
+ }
+
+ private int getChangelogPort(short changelogID)
+ {
+ return (changelogPort+changelogID);
+ }
+
+ /**
+ * Tests the import side of the Initialize task
+ */
+ @Test(enabled=false)
+ public void InitializeImport() throws Exception
+ {
+ String testCase = "InitializeImport";
+
+ log("Starting "+testCase);
+
+ try
+ {
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Connect DS to the changelog
+ connectServer1ToChangelog(changelog1ID);
+
+ if (server2 == null)
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
+
+ Thread.sleep(2000);
+
+ // In S1 launch the total update
+ addTask(taskInitFromS2, ResultCode.SUCCESS, 0);
+
+ // S2 should receive init msg
+ SynchronizationMessage msg;
+ msg = server2.receive();
+ if (!(msg instanceof InitializeRequestMessage))
+ {
+ fail(testCase + " Message received by S2 is of unexpected class" + msg);
+ }
+ InitializeRequestMessage initMsg = (InitializeRequestMessage)msg;
+
+ // S2 publishes entries to S1
+ makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
+ initMsg.getsenderID());
+
+ // Wait for task (import) completion in S1
+ waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY,
+ 0, updatedEntries.length);
+
+ // Test import result in S1
+ testEntriesInDb();
+
+ cleanEntries();
+
+ log("Successfully ending " + testCase);
+ }
+ catch(Exception e)
+ {
+ fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ * Tests the export side of the Initialize task
+ */
+ @Test(enabled=false)
+ public void InitializeExport() throws Exception
+ {
+ String testCase = "Synchronization/InitializeExport";
+
+ log("Starting "+testCase);
+
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Connect DS to the changelog
+ connectServer1ToChangelog(changelog1ID);
+
+ addTestEntriesToDB();
+
+ if (server2 == null)
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
+
+ Thread.sleep(3000);
+
+ InitializeRequestMessage initMsg = new InitializeRequestMessage(baseDn,
+ server2ID, server1ID);
+ server2.publish(initMsg);
+
+ receiveUpdatedEntries(server2, server2ID, updatedEntries);
+
+ cleanEntries();
+
+ log("Successfully ending "+testCase);
+}
+
+ /**
+ * Tests the import side of the InitializeTarget task
+ */
+ @Test(enabled=false)
+ public void InitializeTargetExport() throws Exception
+ {
+ String testCase = "Synchronization/InitializeTargetExport";
+
+ log("Starting " + testCase);
+
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ // Add in S1 the entries to be exported
+ addTestEntriesToDB();
+
+ // S1 is the server we are running in, S2 is simulated by a broker
+ if (server2 == null)
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
+
+ Thread.sleep(1000);
+
+ // Launch in S1 the task that will initialize S2
+ addTask(taskInitTargetS2, ResultCode.SUCCESS, 0);
+
+ // Wait for task completion
+ waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, -1);
+
+ // Tests that entries have been received by S2
+ receiveUpdatedEntries(server2, server2ID, updatedEntries);
+
+ cleanEntries();
+
+ log("Successfully ending " + testCase);
+
+ }
+
+ /**
+ * Tests the import side of the InitializeTarget task
+ */
+ @Test(enabled=false)
+ public void InitializeTargetExportAll() throws Exception
+ {
+ String testCase = "Synchronization/InitializeTargetExportAll";
+
+ log("Starting " + testCase);
+
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ // Add in S1 the entries to be exported
+ addTestEntriesToDB();
+
+ // S1 is the server we are running in, S2 and S3 are simulated by brokers
+ if (server2==null)
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
+
+ ChangelogBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
+
+ Thread.sleep(1000);
+
+ // Launch in S1 the task that will initialize S2
+ addTask(taskInitTargetAll, ResultCode.SUCCESS, 0);
+
+ // Wait for task completion
+ waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, -1);
+
+ // Tests that entries have been received by S2
+ receiveUpdatedEntries(server2, server2ID, updatedEntries);
+ receiveUpdatedEntries(server3, server3ID, updatedEntries);
+
+ cleanEntries();
+
+ log("Successfully ending " + testCase);
+
+ }
+
+ /**
+ * Tests the import side of the InitializeTarget task
+ */
+ @Test(enabled=false)
+ public void InitializeTargetImport() throws Exception
+ {
+ String testCase = "InitializeTargetImport";
+
+ try
+ {
+ log("Starting " + testCase + " debugEnabled:" + debugEnabled());
+
+ // Start SS
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // S1 is the server we are running in, S2 is simulated by a broker
+ if (server2==null)
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ // S2 publishes entries to S1
+ makeBrokerPublishEntries(server2, server2ID, server1ID, server2ID);
+
+ Thread.sleep(10000); // FIXME - how to know the import is done
+
+ // Test that entries have been imported in S1
+ testEntriesInDb();
+
+ cleanEntries();
+
+ log("Successfully ending " + testCase);
+ }
+ catch(Exception e)
+ {
+ fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ * Tests the import side of the InitializeTarget task
+ */
+ @Test(enabled=false)
+ public void InitializeTargetConfigErrors() throws Exception
+ {
+ String testCase = "InitializeTargetConfigErrors";
+
+ try
+ {
+ log("Starting " + testCase);
+
+ // Invalid domain base dn
+ Entry taskInitTarget = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
+ "ds-task-initialize-domain-dn: foo",
+ "ds-task-initialize-remote-replica-server-id: " + server2ID);
+ addTask(taskInitTarget, ResultCode.INVALID_DN_SYNTAX,
+ TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
+
+ // Domain base dn not related to any domain
+ taskInitTarget = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
+ "ds-task-initialize-domain-dn: dc=foo",
+ "ds-task-initialize-remote-replica-server-id: " + server2ID);
+ addTask(taskInitTarget, ResultCode.OTHER,
+ LogMessages.MSGID_NO_MATCHING_DOMAIN);
+
+ // Invalid scope
+ // createTask(taskInitTargetS2);
+
+ // Scope containing a serverID absent from the domain
+ // createTask(taskInitTargetS2);
+
+ cleanEntries();
+
+ log("Successfully ending " + testCase);
+ }
+ catch(Exception e)
+ {
+ fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ * Tests the import side of the InitializeTarget task
+ */
+ @Test(enabled=false)
+ public void InitializeConfigErrors() throws Exception
+ {
+ String testCase = "InitializeConfigErrors";
+
+ try
+ {
+ log("Starting " + testCase);
+
+ // Start SS
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ // Invalid domain base dn
+ Entry taskInit = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: foo",
+ "ds-task-initialize-source: " + server2ID);
+ addTask(taskInit, ResultCode.INVALID_DN_SYNTAX,
+ TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
+
+ // Domain base dn not related to any domain
+ taskInit = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: dc=foo",
+ "ds-task-initialize-source: " + server2ID);
+ addTask(taskInit, ResultCode.OTHER, LogMessages.MSGID_NO_MATCHING_DOMAIN);
+
+ // Invalid Source
+ taskInit = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: " + baseDn,
+ "ds-task-initialize-source: -3");
+ addTask(taskInit, ResultCode.OTHER,
+ LogMessages.MSGID_INVALID_IMPORT_SOURCE);
+
+ // Scope containing a serverID absent from the domain
+ // createTask(taskInitTargetS2);
+
+ cleanEntries();
+
+ log("Successfully ending " + testCase);
+ }
+ catch(Exception e)
+ {
+ fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+ }
+ }
+
+ @Test(enabled=false)
+ public void InitializeTargetBroken() throws Exception
+ {
+ String testCase = "InitializeTargetBroken";
+ fail(testCase + " NYI");
+ }
+
+ @Test(enabled=false)
+ public void InitializeBroken() throws Exception
+ {
+ String testCase = "InitializeBroken";
+ fail(testCase + " NYI");
+ }
+
+ @Test(enabled=false)
+ public void InitializeTargetExportMultiSS() throws Exception
+ {
+ String testCase = "Synchronization/InitializeTargetExportMultiSS";
+
+ log("Starting " + testCase);
+
+ // Create 2 changelogs
+ changelog1 = createChangelogServer(changelog1ID);
+
+ changelog2 = createChangelogServer(changelog2ID);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ // Add in S1 the entries to be exported
+ addTestEntriesToDB();
+
+ // S1 is the server we are running in, S2 is simulated by a broker
+ // connected to changelog2
+ if (server2 == null)
+ {
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+ }
+
+ Thread.sleep(1000);
+
+ // Launch in S1 the task that will initialize S2
+ addTask(taskInitTargetS2, ResultCode.SUCCESS, 0);
+
+ // Wait for task completion
+ waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, -1);
+
+ // Tests that entries have been received by S2
+ receiveUpdatedEntries(server2, server2ID, updatedEntries);
+
+ cleanEntries();
+
+ changelog2.shutdown();
+ changelog2 = null;
+
+ log("Successfully ending " + testCase);
+
+ }
+
+ @Test(enabled=false)
+ public void InitializeExportMultiSS() throws Exception
+ {
+ String testCase = "Synchronization/InitializeExportMultiSS";
+ log("Starting "+testCase);
+
+ // Create 2 changelogs
+ changelog1 = createChangelogServer(changelog1ID);
+ Thread.sleep(3000);
+
+ changelog2 = createChangelogServer(changelog2ID);
+ Thread.sleep(3000);
+
+ // Connect DS to the changelog 1
+ connectServer1ToChangelog(changelog1ID);
+
+ // Put entries in DB
+ addTestEntriesToDB();
+
+ // Connect a broker acting as server 2 to changelog2
+ if (server2 == null)
+ {
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog2ID),
+ 1000, emptyOldChanges);
+ }
+
+ Thread.sleep(3000);
+
+ // S2 sends init request
+ InitializeRequestMessage initMsg =
+ new InitializeRequestMessage(baseDn, server2ID, server1ID);
+ server2.publish(initMsg);
+
+ // S2 should receive target, entries & done
+ receiveUpdatedEntries(server2, server2ID, updatedEntries);
+
+ cleanEntries();
+
+ changelog2.shutdown();
+ changelog2 = null;
+
+ log("Successfully ending "+testCase);
+ }
+
+ @Test(enabled=false)
+ public void InitializeNoSource() throws Exception
+ {
+ String testCase = "InitializeNoSource";
+ log("Starting "+testCase);
+
+ // Start SS
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ Entry taskInit = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-from-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: "+baseDn,
+ "ds-task-initialize-replica-server-id: " + 20);
+
+ addTask(taskInit, ResultCode.SUCCESS, 0);
+
+ waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
+ LogMessages.MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN);
+
+ if (sd != null)
+ {
+ log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
+ }
+
+ log("Successfully ending "+testCase);
+
+ }
+
+ @Test(enabled=false)
+ public void InitializeTargetNoTarget() throws Exception
+ {
+ String testCase = "InitializeTargetNoTarget" + baseDn;
+ log("Starting "+testCase);
+
+ // Start SS
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ // Put entries in DB
+ addTestEntriesToDB();
+
+ Entry taskInit = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-target",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
+ "ds-task-initialize-target-domain-dn: "+baseDn,
+ "ds-task-initialize-target-scope: " + 10);
+
+ addTask(taskInit, ResultCode.SUCCESS, 0);
+
+ waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
+ LogMessages.MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN);
+
+ if (sd != null)
+ {
+ log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
+ }
+
+ log("Successfully ending "+testCase);
+ }
+
+ @Test(enabled=false)
+ public void InitializeStopped() throws Exception
+ {
+ String testCase = "InitializeStopped";
+ fail(testCase + " NYI");
+ }
+ @Test(enabled=false)
+ public void InitializeTargetStopped() throws Exception
+ {
+ String testCase = "InitializeTargetStopped";
+ fail(testCase + " NYI");
+ }
+ @Test(enabled=false)
+ public void InitializeCompressed() throws Exception
+ {
+ String testCase = "InitializeStopped";
+ fail(testCase + " NYI");
+ }
+ @Test(enabled=false)
+ public void InitializeTargetEncrypted() throws Exception
+ {
+ String testCase = "InitializeTargetCompressed";
+ fail(testCase + " NYI");
+ }
+
+ @Test(enabled=false)
+ public void InitializeSimultaneous() throws Exception
+ {
+ String testCase = "InitializeSimultaneous";
+
+ // Start SS
+ changelog1 = createChangelogServer(changelog1ID);
+
+ // Connect a broker acting as server 2 to changelog2
+ if (server2 == null)
+ {
+ server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog1ID),
+ 1000, emptyOldChanges);
+ }
+
+ // Creates config to synchronize suffix
+ connectServer1ToChangelog(changelog1ID);
+
+ Entry taskInit = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-from-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: "+baseDn,
+ "ds-task-initialize-replica-server-id: " + server2ID);
+
+ addTask(taskInit, ResultCode.SUCCESS, 0);
+
+ Thread.sleep(3000);
+
+ Entry taskInit2 = TestCaseUtils.makeEntry(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks",
+ "objectclass: top",
+ "objectclass: ds-task",
+ "objectclass: ds-task-initialize-from-remote-replica",
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask",
+ "ds-task-initialize-domain-dn: "+baseDn,
+ "ds-task-initialize-replica-server-id: " + server2ID);
+
+ // Second task is expected to be rejected
+ addTask(taskInit2, ResultCode.SUCCESS, 0);
+
+ waitTaskState(taskInit2, TaskState.STOPPED_BY_ERROR,
+ LogMessages.MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED);
+
+ // First task is stilll running
+ waitTaskState(taskInit, TaskState.RUNNING, -1);
+
+ // External request is supposed to be rejected
+
+ // Now tests error in the middle of an import
+ // S2 sends init request
+ ErrorMessage msg =
+ new ErrorMessage(server1ID, 1, "");
+ server2.publish(msg);
+
+ waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
+ 1);
+
+ cleanEntries();
+
+ log("Successfully ending "+testCase);
+
+ }
+
+ /**
+ * Disconnect broker and remove entries from the local DB
+ * @throws Exception
+ */
+ protected void cleanEntries()
+ {
+
+ if (sd != null)
+ {
+ log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning());
+ }
+
+ // Clean brokers
+ if (server2 != null)
+ {
+ server2.stop();
+
+ TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+ // fromthe changelog server.
+ server2 = null;
+ }
+ super.cleanEntries();
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index fd4e449..7374e38 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,6 +26,7 @@
*/
package org.opends.server.synchronization;
+import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -54,6 +55,8 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ByteStringFactory;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -158,7 +161,11 @@
}
}
catch (Exception e)
- { }
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1);
+ }
}
return broker;
}
@@ -221,7 +228,8 @@
}
}
catch (Exception e)
- { }
+ {
+ }
}
return broker;
}
@@ -231,6 +239,10 @@
*/
protected void cleanEntries()
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "SynchronizationTestCase/Cleaning entries" , 1);
+
DeleteOperation op;
// Delete entries
try
@@ -238,6 +250,10 @@
while (true)
{
DN dn = entryList.removeLast();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "cleaning entry " + dn, 1);
+
op = new DeleteOperation(connection, InternalClientConnection
.nextOperationID(), InternalClientConnection.nextMessageID(), null,
dn);
@@ -264,8 +280,13 @@
// WORKAROUND FOR BUG #639 - BEGIN -
if (mms != null)
{
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "SynchronizationTestCase/FinalizeSynchronization Provider" , 1);
+
DirectoryServer.deregisterSynchronizationProvider(mms);
mms.finalizeSynchronizationProvider();
+ mms = null;
}
// WORKAROUND FOR BUG #639 - END -
@@ -303,17 +324,22 @@
//
// Add the changelog server
- DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
+ if (changeLogEntry!=null)
+ {
+ DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
"Unable to add the changeLog server");
- entryList.add(changeLogEntry.getDN());
-
- //
- // We also have a replicated suffix (synchronization domain)
- DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
- "Unable to add the synchronized server");
- entryList.add(synchroServerEntry.getDN());
+ entryList.add(changeLogEntry.getDN());
+ }
+
+ if (synchroServerEntry!=null)
+ {
+ // We also have a replicated suffix (synchronization domain)
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the synchronized suffix");
+ entryList.add(synchroServerEntry.getDN());
+ }
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 0b1bc2e..dcf23e0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@
+ "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
+ "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
+ "ds-cfg-window-size: 100" + "\n"
- + "ds-cfg-changelog-db-dirname: changelogDb"+i;
+ + "ds-cfg-changelog-db-directory: changelogDb"+i;
Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
changelogs[i] = new Changelog(changelogConfig);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
index 20e8a20..0dccba8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,16 +26,18 @@
*/
package org.opends.server.synchronization.protocol;
+import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.UUID;
import java.util.zip.DataFormatException;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import static org.testng.Assert.*;
-
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -56,8 +58,8 @@
import org.opends.server.types.ObjectClass;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
-
-import static org.opends.server.synchronization.protocol.OperationContext.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
/**
* Test the contructors, encoders and decoders of the synchronization
@@ -520,6 +522,104 @@
}
/**
+ * Test that EntryMessage encoding and decoding works
+ * by checking that : msg == new EntryMessageTest(msg.getBytes()).
+ */
+ @Test()
+ public void EntryMessageTest() throws Exception
+ {
+ String taskInitFromS2 = new String(
+ "dn: ds-task-id=" + UUID.randomUUID() +
+ ",cn=Scheduled Tasks,cn=Tasks\n" +
+ "objectclass: top\n" +
+ "objectclass: ds-task\n" +
+ "objectclass: ds-task-initialize\n" +
+ "ds-task-class-name: org.opends.server.tasks.InitializeTask" +
+ "ds-task-initialize-domain-dn: dc=example,dc=com" +
+ "ds-task-initialize-source: 1");
+ short sender = 1;
+ short target = 2;
+ byte[] entry = taskInitFromS2.getBytes();
+ EntryMessage msg = new EntryMessage(sender, target, entry);
+ EntryMessage newMsg = new EntryMessage(msg.getBytes());
+ assertEquals(msg.getsenderID(), newMsg.getsenderID());
+ assertEquals(msg.getDestination(), newMsg.getDestination());
+ assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
+ }
+
+ /**
+ * Test that InitializeRequestMessage encoding and decoding works
+ */
+ @Test()
+ public void InitializeRequestMessageTest() throws Exception
+ {
+ short sender = 1;
+ short target = 2;
+ InitializeRequestMessage msg = new InitializeRequestMessage(
+ DN.decode("dc=example"), sender, target);
+ InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes());
+ assertEquals(msg.getsenderID(), newMsg.getsenderID());
+ assertEquals(msg.getDestination(), newMsg.getDestination());
+ assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
+ }
+
+ /**
+ * Test that InitializeTargetMessage encoding and decoding works
+ */
+ @Test()
+ public void InitializeTargetMessageTest() throws Exception
+ {
+ short senderID = 1;
+ short targetID = 2;
+ short requestorID = 3;
+ long entryCount = 4;
+ DN baseDN = DN.decode("dc=example");
+
+ InitializeTargetMessage msg = new InitializeTargetMessage(
+ baseDN, senderID, targetID, requestorID, entryCount);
+ InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes());
+ assertEquals(msg.getsenderID(), newMsg.getsenderID());
+ assertEquals(msg.getDestination(), newMsg.getDestination());
+ assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
+ assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
+ assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
+
+ assertEquals(senderID, newMsg.getsenderID());
+ assertEquals(targetID, newMsg.getDestination());
+ assertEquals(requestorID, newMsg.getRequestorID());
+ assertEquals(entryCount, newMsg.getEntryCount());
+ assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
+
+ }
+
+ /**
+ * Test that DoneMessage encoding and decoding works
+ */
+ @Test()
+ public void DoneMessage() throws Exception
+ {
+ DoneMessage msg = new DoneMessage((short)1, (short)2);
+ DoneMessage newMsg = new DoneMessage(msg.getBytes());
+ assertEquals(msg.getsenderID(), newMsg.getsenderID());
+ assertEquals(msg.getDestination(), newMsg.getDestination());
+ }
+
+ /**
+ * Test that ErrorMessage encoding and decoding works
+ */
+ @Test()
+ public void ErrorMessage() throws Exception
+ {
+ ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
+ ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
+ assertEquals(msg.getsenderID(), newMsg.getsenderID());
+ assertEquals(msg.getDestination(), newMsg.getDestination());
+ assertEquals(msg.getMsgID(), newMsg.getMsgID());
+ assertEquals(msg.getDetails(), newMsg.getDetails());
+ }
+
+
+ /**
* Test PendingChange
*/
private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)
--
Gitblit v1.10.0