opends/resource/schema/02-config.ldif
@@ -1045,6 +1045,10 @@ NAME 'ds-cfg-heartbeat-interval' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306 NAME 'ds-cfg-changelog-db-directory' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307 NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' ) @@ -1125,6 +1129,22 @@ NAME 'ds-cfg-virtual-attribute-conflict-behavior' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332 NAME 'ds-task-initialize-domain-dn' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333 NAME 'ds-task-initialize-replica-server-id' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334 NAME 'ds-task-unprocessed-entry-count' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335 NAME 'ds-task-processed-entry-count' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.336 NAME 'ds-cfg-dictionary-file' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) @@ -1481,7 +1501,8 @@ 'ds-cfg-synchronization-changelog-server-config' SUP top STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port ) MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $ ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' ) ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory' SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn ) X-ORIGIN 'OpenDS Directory Server' ) @@ -1594,6 +1615,16 @@ ds-cfg-virtual-attribute-conflict-behavior ) MAY ( ds-cfg-virtual-attribute-base-dn $ ds-cfg-virtual-attribute-group-dn $ ds-cfg-virtual-attribute-filter ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.92 NAME 'ds-task-initialize-from-remote-replica' SUP ds-task MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id ) MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.93 NAME 'ds-task-initialize-remote-replica' SUP ds-task MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id ) MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.95 NAME 'ds-cfg-dictionary-password-validator' SUP ds-cfg-password-validator STRUCTURAL MUST ( ds-cfg-dictionary-file $ ds-cfg-case-sensitive-validation $ opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,6 +3665,62 @@ NAME_PREFIX_TASK + "import-is-encrypted"; /** * The name of the objectclass that will be used for a Directory Server * initialize task definition. */ public static final String OC_INITIALIZE_TASK = NAME_PREFIX_TASK + "initialize-from-remote-replica"; /** * The name of the attribute in an initialize task definition that specifies * the base dn related to the synchonization domain to initialize. */ public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN = NAME_PREFIX_TASK + "initialize-domain-dn"; /** * The name of the attribute in an initialize target task definition that * specifies the source in terms of source server from which to initialize. */ public static final String ATTR_TASK_INITIALIZE_SOURCE = NAME_PREFIX_TASK + "initialize-replica-server-id"; /** * The name of the objectclass that will be used for a Directory Server * initialize target task definition. */ public static final String OC_INITIALIZE_TARGET_TASK = NAME_PREFIX_TASK + "initialize-remote-replica"; /** * The name of the attribute in an initialize target task definition that * specifies the base dn related to the synchonization domain to initialize. */ public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN = NAME_PREFIX_TASK + "initialize-domain-dn"; /** * The name of the attribute in an initialize target task definition that * specifies the scope in terms of servers to initialize. */ public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE = NAME_PREFIX_TASK + "initialize-replica-server-id"; /** * The name of the attribute in an initialize target task definition that * specifies the scope in terms of servers to initialize. */ public static final String ATTR_TASK_INITIALIZE_LEFT = NAME_PREFIX_TASK + "unprocessed-entry-count"; /** * The name of the attribute in an initialize target task definition that * specifies the scope in terms of servers to initialize. */ public static final String ATTR_TASK_INITIALIZE_DONE = NAME_PREFIX_TASK + "processed-entry-count"; /** * The name of the objectclass that will be used for a Directory Server opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,8 +211,6 @@ public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17; /** * The message ID for the message that will be used an attempt is made to * invoke the index rebuild task by a user that does not have the required @@ -221,6 +219,20 @@ public static final int MSGID_TASK_INDEXREBUILD_INSUFFICIENT_PRIVILEGES = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18; /** * The message ID for the message that will be used when an invalid domain * base DN is provided as argument to the initialize target task. */ public static final int MSGID_TASK_INITIALIZE_TARGET_INVALID_DN = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19; /** * The message ID for the message that will be used when an invalid domain * base DN is provided as argument to the initialize task. */ public static final int MSGID_TASK_INITIALIZE_INVALID_DN = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 20; /** @@ -292,6 +304,12 @@ registerMessage(MSGID_TASK_INDEXREBUILD_INSUFFICIENT_PRIVILEGES, "You do not have sufficient privileges to initiate an " + "index rebuild."); registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN, "Invalid DN provided with the Initialize Target task."); registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN, "Invalid DN provided with the Initialize task."); } } opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@ static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size"; static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname"; static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory"; static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay"; opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,27 +26,32 @@ */ package org.opends.server.synchronization.changelog; import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.loggers.Error.logError; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import com.sleepycat.je.DatabaseException; import org.opends.server.types.DN; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.types.DN; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import com.sleepycat.je.DatabaseException; /** * This class define an in-memory cache that will be used to store * the messages that have been received from an LDAP server or @@ -425,148 +430,263 @@ } } /** * Send back an ack to the server that sent the change. * Retrieves the destination handlers for a routable message. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. * @param msg The message to route. * @param senderHandler The handler of the server that published this message. * @return The list of destination handlers. */ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) protected List<ServerHandler> getDestinationServers(RoutableMessage msg, ServerHandler senderHandler) { short serverId = changeNumber.getServerId(); sendAck(changeNumber, isLDAPserver, serverId); } /** * * Send back an ack to a server that sent the change. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. * @param serverId The identifier of the server from which we * received the change.. */ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, short serverId) { ServerHandler handler; if (isLDAPserver) handler = connectedServers.get(serverId); else handler = changelogServers.get(serverId); List<ServerHandler> servers = new ArrayList<ServerHandler>(); // TODO : check for null handler and log error try logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "getDestinationServers" + " msgDest:" + msg.getDestination() , 1); if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER) { handler.sendAck(changeNumber); } catch (IOException e) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); handler.shutdown(); // TODO Import from the "closest server" to be implemented } } /** * Shutdown this ChangelogCache. */ public void shutdown() { // Close session with other changelogs for (ServerHandler serverHandler : changelogServers.values()) else if (msg.getDestination() == RoutableMessage.ALL_SERVERS) { serverHandler.shutdown(); } // Close session with other LDAP servers for (ServerHandler serverHandler : connectedServers.values()) { serverHandler.shutdown(); } // Shutdown the dbHandlers synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) if (!senderHandler.isChangelogServer()) { dbHandler.shutdown(); // Send to all changelogServers for (ServerHandler destinationHandler : changelogServers.values()) { servers.add(destinationHandler); } } sourceDbHandlers.clear(); // Send to all connected LDAP servers for (ServerHandler destinationHandler : connectedServers.values()) { // Don't loop on the sender if (destinationHandler == senderHandler) continue; servers.add(destinationHandler); } } else { // Destination is one server ServerHandler destinationHandler = connectedServers.get(msg.getDestination()); if (destinationHandler != null) { servers.add(destinationHandler); } else { // the targeted server is NOT connected if (senderHandler.isLDAPserver()) { // let's forward to the other changelogs servers.addAll(changelogServers.values()); } } } return servers; } /** * Returns the ServerState describing the last change from this replica. * Process an InitializeRequestMessage. * * @return The ServerState describing the last change from this replica. * @param msg The message received and to be processed. * @param senderHandler The server handler of the server that emitted * the message. */ public ServerState getDbServerState() public void process(RoutableMessage msg, ServerHandler senderHandler) { ServerState serverState = new ServerState(); for (DbHandler db : sourceDbHandlers.values()) { serverState.update(db.getLastChange()); } return serverState; } /** * {@inheritDoc} */ @Override public String toString() { return "ChangelogCache " + baseDn; } List<ServerHandler> servers = getDestinationServers(msg, senderHandler); /** * Check if some server Handler should be removed from flow control state. * @throws IOException If an error happened. */ public void checkAllSaturation() throws IOException { for (ServerHandler handler : changelogServers.values()) if (servers.isEmpty()) { handler.checkWindow(); if (!(msg instanceof InitializeRequestMessage)) { // TODO A more elaborated policy is probably needed } else { ErrorMessage errMsg = new ErrorMessage( msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, "serverID:" + msg.getDestination()); try { senderHandler.send(errMsg); } catch(IOException ioe) { // TODO Handle error properly (sender timeout in addition) } } return; } for (ServerHandler handler : connectedServers.values()) for (ServerHandler targetHandler : servers) { handler.checkWindow(); try { targetHandler.send(msg); } catch(IOException ioe) { // TODO Handle error properly (sender timeout in addition) } } } /** * Check if a server that was in flow control can now restart * sending updates. * @param sourceHandler The server that must be checked. * @return true if the server can restart sending changes. * false if the server can't restart sending changes. */ public boolean restartAfterSaturation(ServerHandler sourceHandler) { for (ServerHandler handler : changelogServers.values()) /** * Send back an ack to the server that sent the change. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. */ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) { if (!handler.restartAfterSaturation(sourceHandler)) short serverId = changeNumber.getServerId(); sendAck(changeNumber, isLDAPserver, serverId); } /** * * Send back an ack to a server that sent the change. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. * @param serverId The identifier of the server from which we * received the change.. */ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, short serverId) { ServerHandler handler; if (isLDAPserver) handler = connectedServers.get(serverId); else handler = changelogServers.get(serverId); // TODO : check for null handler and log error try { handler.sendAck(changeNumber); } catch (IOException e) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); handler.shutdown(); } } /** * Shutdown this ChangelogCache. */ public void shutdown() { // Close session with other changelogs for (ServerHandler serverHandler : changelogServers.values()) { serverHandler.shutdown(); } // Close session with other LDAP servers for (ServerHandler serverHandler : connectedServers.values()) { serverHandler.shutdown(); } // Shutdown the dbHandlers synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) { dbHandler.shutdown(); } sourceDbHandlers.clear(); } } /** * Returns the ServerState describing the last change from this replica. * * @return The ServerState describing the last change from this replica. */ public ServerState getDbServerState() { ServerState serverState = new ServerState(); for (DbHandler db : sourceDbHandlers.values()) { serverState.update(db.getLastChange()); } return serverState; } /** * {@inheritDoc} */ @Override public String toString() { return "ChangelogCache " + baseDn; } /** * Check if some server Handler should be removed from flow control state. * @throws IOException If an error happened. */ public void checkAllSaturation() throws IOException { for (ServerHandler handler : changelogServers.values()) { handler.checkWindow(); } for (ServerHandler handler : connectedServers.values()) { handler.checkWindow(); } } /** * Check if a server that was in flow control can now restart * sending updates. * @param sourceHandler The server that must be checked. * @return true if the server can restart sending changes. * false if the server can't restart sending changes. */ public boolean restartAfterSaturation(ServerHandler sourceHandler) { for (ServerHandler handler : changelogServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) return false; } } for (ServerHandler handler : connectedServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) for (ServerHandler handler : connectedServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) return false; } return true; } return true; } } opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,6 +27,8 @@ package org.opends.server.synchronization.changelog; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -44,6 +46,18 @@ import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.ChangelogStartMessage; import org.opends.server.synchronization.protocol.HeartbeatThread; import org.opends.server.synchronization.protocol.ProtocolSession; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.ServerStartMessage; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.synchronization.protocol.WindowMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; @@ -51,17 +65,6 @@ import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.InitializationException; import org.opends.server.core.DirectoryServer; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.ChangelogStartMessage; import org.opends.server.synchronization.protocol.ProtocolSession; import org.opends.server.synchronization.protocol.ServerStartMessage; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.synchronization.protocol.WindowMessage; import org.opends.server.synchronization.protocol.HeartbeatThread; import org.opends.server.util.TimeThread; /** @@ -108,6 +111,7 @@ // flow controled and should // be stopped from sending messsages. private int saturationCount = 0; private short changelogId; /** * The time in milliseconds between heartbeats from the synchronization @@ -155,6 +159,7 @@ public void start(DN baseDn, short changelogId, String changelogURL, int windowSize, Changelog changelog) { this.changelogId = changelogId; rcvWindowSizeHalf = windowSize/2; maxRcvWindow = windowSize; rcvWindow = windowSize; @@ -1263,4 +1268,40 @@ { return heartbeatInterval; } /** * Processes a routable message. * * @param msg The message to be processed. */ public void process(RoutableMessage msg) { if (debugEnabled()) debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1); changelogCache.process(msg, this); } /** * Send an InitializeRequestMessage to the server connected through this * handler. * * @param msg The message to be processed * @throws IOException when raised by the underlying session */ public void send(RoutableMessage msg) throws IOException { if (debugEnabled()) debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1); session.publish(msg); } } opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,6 +34,11 @@ import org.opends.server.api.DirectoryThread; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.DoneMessage; import org.opends.server.synchronization.protocol.EntryMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.InitializeTargetMessage; import org.opends.server.synchronization.protocol.ProtocolSession; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; @@ -116,6 +121,33 @@ WindowMessage windowMsg = (WindowMessage) msg; handler.updateWindow(windowMsg); } else if (msg instanceof InitializeRequestMessage) { InitializeRequestMessage initializeMsg = (InitializeRequestMessage) msg; handler.process(initializeMsg); } else if (msg instanceof InitializeTargetMessage) { InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg; handler.process(initializeMsg); } else if (msg instanceof EntryMessage) { EntryMessage entryMsg = (EntryMessage) msg; handler.process(entryMsg); } else if (msg instanceof DoneMessage) { DoneMessage doneMsg = (DoneMessage) msg; handler.process(doneMsg); } else if (msg instanceof ErrorMessage) { ErrorMessage errorMsg = (ErrorMessage) msg; handler.process(errorMsg); } } } catch (IOException e) { opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,13 +317,61 @@ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42; /** * The message id for thedescription of the attribute used to configure * The message id for the description of the attribute used to configure * the purge delay of the Changelog Servers. */ public static final int MSGID_PURGE_DELAY_ATTR = CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43; /** * The message id for the error raised when export/import * is rejected due to an export/import already in progress. */ public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44; /** * The message id for the error raised when import * is rejected due to an invalid source of data imported. */ public static final int MSGID_INVALID_IMPORT_SOURCE = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45; /** * The message id for the error raised when export * is rejected due to an invalid target to export datas. */ public static final int MSGID_INVALID_EXPORT_TARGET = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46; /** * The message id for the error raised when import/export message * cannot be routed to an up-and-running target in the domain. */ public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47; /** * The message ID for the message that will be used when no domain * can be found matching the provided domain base DN. */ public static final int MSGID_NO_MATCHING_DOMAIN = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48; /** * The message ID for the message that will be used when no domain * can be found matching the provided domain base DN. */ public static final int MSGID_MULTIPLE_MATCHING_DOMAIN = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49; /** * The message ID for the message that will be used when the domain * belongs to a provider class that does not allow the export. */ public static final int MSGID_INVALID_PROVIDER = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50; /** * Register the messages from this class in the core server. @@ -449,5 +497,20 @@ " restored because changelog servers would not be able to refresh" + " LDAP servers with older versions of the data. A zero value" + " can be used to specify an infinite delay (or never purge)."); MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED, "The current request is rejected due to an import or an export" + " already in progress for the same data."); MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE, "Invalid source for the import."); MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET, "Invalid target for the export."); MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, "No reachable peer in the domain."); MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN, "No domain matches the base DN provided."); MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN, "Multiple domains match the base DN provided."); MessageHandler.registerMessage(MSGID_INVALID_PROVIDER, "The provider class does not allow the operation requested."); } } opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,6 +27,8 @@ package org.opends.server.synchronization.plugin; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -345,6 +347,10 @@ { if (session != null) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "Broker : connect closing session" , 1); session.close(); session = null; } @@ -498,6 +504,7 @@ try { SynchronizationMessage msg = session.receive(); if (msg instanceof WindowMessage) { WindowMessage windowMsg = (WindowMessage) msg; @@ -544,6 +551,11 @@ connected = false; try { if (debugEnabled()) { debugInfo("ChangelogBroker Stop Closing session"); } session.close(); } catch (IOException e) {} @@ -682,4 +694,12 @@ { return numLostConnections; } private void log(String message) { int msgID = MSGID_UNKNOWN_TYPE; logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } } opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,12 +27,14 @@ package org.opends.server.synchronization.plugin; import org.opends.server.api.DirectoryThread; import org.opends.server.synchronization.protocol.ProtocolSession; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import java.io.IOException; import org.opends.server.api.DirectoryThread; import org.opends.server.synchronization.protocol.ProtocolSession; /** * This class implements a thread to monitor heartbeat messages from the * synchronization server. Each broker runs one of these threads. @@ -103,6 +105,9 @@ long lastReceiveTime = session.getLastReceiveTime(); if (now > lastReceiveTime + 2 * heartbeatInterval) { debugInfo("Heartbeat monitor is closing the broker session " + "because it could not detect a heartbeat."); // Heartbeat is well overdue so the server is assumed to be dead. if (debugEnabled()) { opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@ * Can be null is the request has no associated operation. * @return The Synchronization domain for this DN. */ private static SynchronizationDomain findDomain(DN dn, Operation op) public static SynchronizationDomain findDomain(DN dn, Operation op) { /* * Don't run the special synchronization code on Operation that are opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,7 +233,8 @@ { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); op.toString(), op.getErrorMessage(), baseDn.toString(), Thread.currentThread().getStackTrace()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFInputStream.java
New file @@ -0,0 +1,122 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.plugin; import java.io.IOException; import java.io.InputStream; /** * This class creates an input stream that can be used to read entries generated * by SynchroLDIF as if they were being read from another source like a file. */ public class SynchroLDIFInputStream extends InputStream { // Indicates whether this input stream has been closed. private boolean closed; // The synchronization domain associated to this import. SynchronizationDomain domain; /** * Creates a new SynchroLDIFInputStream that will import entries * for a synchronzation domain. * * @param domain The synchronization domain */ public SynchroLDIFInputStream(SynchronizationDomain domain) { this.domain = domain; closed = false; } /** * Closes this input stream so that no more data may be read from it. */ public void close() { closed = true; } /** * Reads data from this input stream. * * @param b The array into which the data should be read. * @param off The position in the array at which point the data read may be * placed. * @param len The maximum number of bytes that may be read into the * provided array. * * @return The number of bytes read from the input stream into the provided * array, or -1 if the end of the stream has been reached. * * @throws IOException If a problem has occurred while generating data for * use by this input stream. */ public int read(byte[] b, int off, int len) throws IOException { if (closed) return -1; byte[] bytes = domain.receiveEntryBytes(); if (bytes==null) { closed = true; return -1; } int l = bytes.length; for (int i =0; i<l; i++) { b[off+i] = bytes[i]; } return l; } /** * Reads a single byte of data from this input stream. * * @return The byte read from the input stream, or -1 if the end of the * stream has been reached. * * @throws IOException If a problem has occurred while generating data for * use by this input stream. */ public int read() throws IOException { // This method is not supposed to be called to make an LDIF import // for synchronization. throw new IOException("Not implemented"); } } opends/src/server/org/opends/server/synchronization/plugin/SynchroLDIFOutputStream.java
New file @@ -0,0 +1,97 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.plugin; import java.io.IOException; import java.io.OutputStream; /** * This class creates an output stream that can be used to export entries * to a synchonization domain. */ public class SynchroLDIFOutputStream extends OutputStream { SynchronizationDomain domain; String entryBuffer = ""; /** * Creates a new SynchroLDIFOutputStream related to a synchronization * domain. * * @param domain The synchronization domain */ public SynchroLDIFOutputStream(SynchronizationDomain domain) { this.domain = domain; } /** * {@inheritDoc} */ public void write(int i) throws IOException { throw new IOException("Invalid call"); } /** * {@inheritDoc} */ public void write(byte b[], int off, int len) throws IOException { int endOfEntryIndex; int startOfEntryIndex = off; int bytesToRead = len; while (true) { // if we have the bytes for an entry, let's make an entry and send it String ebytes = new String(b,startOfEntryIndex,bytesToRead); endOfEntryIndex = ebytes.indexOf("\n\n"); if ( endOfEntryIndex >= 0 ) { endOfEntryIndex += 2; entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex); // Send the entry domain.sendEntryLines(entryBuffer); startOfEntryIndex = startOfEntryIndex + endOfEntryIndex; entryBuffer = ""; bytesToRead -= endOfEntryIndex; if (bytesToRead==0) break; } else { entryBuffer = new String(b, startOfEntryIndex, len); break; } } } } opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,43 +26,54 @@ */ package org.opends.server.synchronization.plugin; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.util.ServerConstants. TIME_UNIT_MILLISECONDS_ABBR; import static org.opends.server.util.ServerConstants. TIME_UNIT_MILLISECONDS_FULL; import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR; import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL; import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN; import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS; import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID; import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.ConfigMessages.*; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ToolMessages.*; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.synchronization.plugin.Historical.*; import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME; import static org.opends.server.synchronization.protocol.OperationContext.*; import static org.opends.server.loggers.Error.*; import static org.opends.server.messages.MessageHandler.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.createEntry; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.LinkedHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; import org.opends.server.api.Backend; import org.opends.server.api.ConfigurableComponent; import org.opends.server.api.DirectoryThread; import org.opends.server.api.SynchronizationProvider; import org.opends.server.backends.jeb.BackendImpl; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.server.config.BooleanConfigAttribute; import org.opends.server.config.ConfigAttribute; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.config.DNConfigAttribute; import org.opends.server.config.IntegerConfigAttribute; import org.opends.server.config.StringConfigAttribute; import org.opends.server.config.IntegerWithUnitConfigAttribute; import org.opends.server.config.StringConfigAttribute; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.core.LockFileManager; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.core.Operation; @@ -73,23 +84,35 @@ import org.opends.server.protocols.ldap.LDAPException; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ChangeNumberGenerator; import org.opends.server.synchronization.common.LogMessages; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.AddContext; import org.opends.server.synchronization.protocol.DeleteContext; import org.opends.server.synchronization.protocol.DoneMessage; import org.opends.server.synchronization.protocol.EntryMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.InitializeTargetMessage; import org.opends.server.synchronization.protocol.ModifyContext; import org.opends.server.synchronization.protocol.ModifyDNMsg; import org.opends.server.synchronization.protocol.ModifyDnContext; import org.opends.server.synchronization.protocol.OperationContext; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.tasks.InitializeTargetTask; import org.opends.server.tasks.InitializeTask; import org.opends.server.tasks.TaskUtils; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DirectoryException; import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.LDIFExportConfig; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.Modification; import org.opends.server.types.RDN; import org.opends.server.types.ResultCode; @@ -137,8 +160,96 @@ * server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; short serverId; private short serverId; /** * This class contain the context related to an import or export * launched on the domain. */ private class IEContext { // The task that initiated the operation. Task initializeTask; // The input stream for the import SynchroLDIFInputStream ldifImportInputStream = null; // The target in the case of an export short exportTarget = RoutableMessage.UNKNOWN_SERVER; // The source in the case of an import short importSource = RoutableMessage.UNKNOWN_SERVER; // The total entry count expected to be processed long entryCount = 0; // The count for the entry left to be processed long entryLeftCount = 0; // The exception raised when any DirectoryException exception = null; /** * Initializes the counters of the task with the provider value. * @param count The value with which to initialize the counters. */ public void initTaskCounters(long count) { entryCount = count; entryLeftCount = count; if (initializeTask != null) { if (initializeTask instanceof InitializeTask) { ((InitializeTask)initializeTask).setTotal(entryCount); ((InitializeTask)initializeTask).setLeft(entryCount); } else if (initializeTask instanceof InitializeTargetTask) { ((InitializeTargetTask)initializeTask).setTotal(entryCount); ((InitializeTargetTask)initializeTask).setLeft(entryCount); } } } /** * Update the counters of the task for each entry processed during * an import or export. */ public void updateTaskCounters() { entryLeftCount--; if (initializeTask != null) { if (initializeTask instanceof InitializeTask) { ((InitializeTask)initializeTask).setLeft(entryLeftCount); } else if (initializeTask instanceof InitializeTargetTask) { ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); } } } /** * Update the state of the task. */ protected TaskState updateTaskCompletionState() { if (exception == null) return TaskState.COMPLETED_SUCCESSFULLY; else return TaskState.STOPPED_BY_ERROR; } } // The context related to an import or export being processed // Null when none is being processed. private IEContext ieContext = null; // The backend informations necessary to make an import or export. private Backend backend; private ConfigEntry backendConfigEntry; private List<DN> branches = new ArrayList<DN>(0); private int listenerThreadNumber = 10; private boolean receiveStatus = true; @@ -160,6 +271,7 @@ private boolean solveConflictFlag = true; private boolean disabled = false; private boolean stateSavingDisabled = false; static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn"; @@ -205,8 +317,6 @@ timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D); } /** * Creates a new SynchronizationDomain using configuration from configEntry. * @@ -217,6 +327,7 @@ public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException { super("Synchronization flush"); /* * read the centralized changelog server configuration * this is a multivalued attribute @@ -397,6 +508,10 @@ if (!receiveStatus) broker.suspendReceive(); } // Retrieves the related backend and its config entry retrievesBackendInfos(baseDN); } catch (Exception e) { /* TODO should mark that changelog service is @@ -803,9 +918,9 @@ } /** * Receive an update message from the changelog. * Receives an update message from the changelog. * also responsible for updating the list of pending changes * @return the received message * @return the received message - null if none */ public UpdateMessage receive() { @@ -823,7 +938,7 @@ // The server is in the shutdown process return null; } log("Broker received message :" + msg); if (msg instanceof AckMessage) { AckMessage ack = (AckMessage) msg; @@ -834,6 +949,56 @@ update = (UpdateMessage) msg; receiveUpdate(update); } else if (msg instanceof InitializeRequestMessage) { // Another server requests us to provide entries // for a total update InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; try { initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), null); } catch(DirectoryException de) { // Returns an error message to notify the sender int msgID = de.getErrorMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); broker.publish(errorMsg); } } else if (msg instanceof InitializeTargetMessage) { // Another server is exporting its entries to us InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; try { importBackend(initMsg); } catch(DirectoryException de) { // Return an error message to notify the sender int msgID = de.getErrorMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); log(getMessage(msgID, backend.getBackendID()) + de.getMessage()); broker.publish(errorMsg); } } else if (msg instanceof ErrorMessage) { if (ieContext != null) { // This is an error termination for the 2 following cases : // - either during an export // - or before an import really started // For example, when we publish a request and the // changelog did not find any import source. abandonImportExport((ErrorMessage)msg); } } } catch (SocketTimeoutException e) { // just retry @@ -876,7 +1041,9 @@ public void receiveAck(AckMessage ack) { UpdateMessage update; ChangeNumber changeNumber = ack.getChangeNumber(); ChangeNumber changeNumber; changeNumber = ack.getChangeNumber(); synchronized (pendingChanges) { @@ -1105,7 +1272,7 @@ synchronized (this) { this.wait(1000); if (!disabled ) if (!disabled && !stateSavingDisabled ) { // save the RUV state.save(); @@ -1151,6 +1318,8 @@ this.notify(); } DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName()); // stop the ChangelogBroker broker.stop(); } @@ -1857,6 +2026,7 @@ return broker.getNumLostConnections(); } /** * Check if the domain solve conflicts. * @@ -1933,6 +2103,988 @@ // Nothing is needed at the moment } /* * Total Update >> */ /** * Receives bytes related to an entry in the context of an import to * initialize the domain (called by SynchronizationDomainLDIFInputStream). * * @return The bytes. Null when the Done or Err message has been received */ public byte[] receiveEntryBytes() { SynchronizationMessage msg; while (true) { try { msg = broker.receive(); if (msg == null) { // The server is in the shutdown process return null; } log("receiveEntryBytes: received " + msg); if (msg instanceof EntryMessage) { // FIXME EntryMessage entryMsg = (EntryMessage)msg; byte[] entryBytes = entryMsg.getEntryBytes().clone(); ieContext.updateTaskCounters(); return entryBytes; } else if (msg instanceof DoneMessage) { // This is the normal termination of the import // No error is stored and the import is ended // by returning null return null; } else if (msg instanceof ErrorMessage) { // This is an error termination during the import // The error is stored and the import is ended // by returning null ErrorMessage errorMsg = (ErrorMessage)msg; ieContext.exception = new DirectoryException(ResultCode.OTHER, errorMsg.getDetails() , errorMsg.getMsgID()); return null; } else { // Other messages received during an import are trashed } } catch(Exception e) { ieContext.exception = new DirectoryException(ResultCode.OTHER, "received an unexpected message type" , 1, e); } return null; } } /** * Processes an error message received while an import/export is * on going. * @param errorMsg The error message received. */ protected void abandonImportExport(ErrorMessage errorMsg) { // FIXME TBD Treat the case where the error happens while entries // are being exported if (ieContext != null) { ieContext.exception = new DirectoryException(ResultCode.OTHER, errorMsg.getDetails() , errorMsg.getMsgID()); if (ieContext.initializeTask instanceof InitializeTask) { // Update the task that initiated the import ((InitializeTask)ieContext.initializeTask). setState(ieContext.updateTaskCompletionState(),ieContext.exception); ieContext = null; } } } /** * Clears all the entries from the JE backend determined by the * be id passed into the method. * * @param createBaseEntry Indicate whether to automatically create the base * entry and add it to the backend. * @param beID The be id to clear. * @param dn The suffix of the backend to create if the the createBaseEntry * boolean is true. * @throws Exception If an unexpected problem occurs. */ public static void clearJEBackend(boolean createBaseEntry, String beID, String dn) throws Exception { BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID); DN[] baseDNs = backend.getBaseDNs(); // FIXME Should getConfigEntry be part of TaskUtils ? ConfigEntry configEntry = TaskUtils.getConfigEntry(backend); // FIXME Should setBackendEnabled be part of TaskUtils ? TaskUtils.setBackendEnabled(configEntry, false); try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason)) { throw new RuntimeException(failureReason.toString()); } try { backend.clearBackend(configEntry, baseDNs); } finally { LockFileManager.releaseLock(lockFile, failureReason); } } finally { TaskUtils.setBackendEnabled(configEntry, true); } if (createBaseEntry) { DN baseDN = DN.decode(dn); Entry e = createEntry(baseDN); backend = (BackendImpl)DirectoryServer.getBackend(beID); backend.addEntry(e, null); } } /** * Log debug message. * @param message The message to log. */ private void log(String message) { if (debugEnabled()) { debugInfo("DebugInfo" + message); int msgID = MSGID_UNKNOWN_TYPE; logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationDomain/ " + message, msgID); } } /** * Export the entries. * @throws DirectoryException when an error occured */ protected void exportBackend() throws DirectoryException { // FIXME Temporary workaround - will probably be fixed when implementing // dynamic config retrievesBackendInfos(this.baseDN); // Acquire a shared lock for the backend. try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! LockFileManager.acquireSharedLock(lockFile, failureReason)) { int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } catch (Exception e) { int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message + " " + stackTraceToSingleLineString(e), msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this); LDIFExportConfig exportConfig = new LDIFExportConfig(os); // Launch the export. try { DN[] baseDNs = {this.baseDN}; backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig); } catch (DirectoryException de) { int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT; String message = getMessage(msgID, de.getErrorMessage()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT; String message = getMessage(msgID, stackTraceToSingleLineString(e)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } finally { // Clean up after the export by closing the export config. exportConfig.close(); // Release the shared lock on the backend. try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! LockFileManager.releaseLock(lockFile, failureReason)) { int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } catch (Exception e) { int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), stackTraceToSingleLineString(e)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } } /** * Retrieves the backend object related to the domain and the backend's * config entry. They will be used for import and export. * TODO This should be in a shared package rather than here. * * @param baseDN The baseDN to retrieve the backend * @throws DirectoryException when an error occired */ protected void retrievesBackendInfos(DN baseDN) throws DirectoryException { ArrayList<Backend> backendList = new ArrayList<Backend>(); ArrayList<ConfigEntry> entryList = new ArrayList<ConfigEntry>(); ArrayList<List<DN>> dnList = new ArrayList<List<DN>>(); Backend backend = null; ConfigEntry backendConfigEntry = null; List<DN> branches = new ArrayList<DN>(0); // Retrieves the backend related to this domain Backend domainBackend = DirectoryServer.getBackend(baseDN); if (domainBackend == null) { int msgID = MSGID_CANNOT_DECODE_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Retrieves its config entry and its DNs int code = getBackends(backendList, entryList, dnList); if (code != 0) { int msgID = MSGID_CANNOT_DECODE_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } int numBackends = backendList.size(); for (int i=0; i < numBackends; i++) { Backend b = backendList.get(i); if (domainBackend.getBackendID() != b.getBackendID()) { continue; } if (backend == null) { backend = domainBackend; backendConfigEntry = entryList.get(i).duplicate(); branches = dnList.get(i); } else { int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID; String message = getMessage(msgID, domainBackend.getBackendID()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } if (backend == null) { int msgID = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID; String message = getMessage(msgID, domainBackend.getBackendID()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } else if (! backend.supportsLDIFExport()) { int msgID = MSGID_LDIFIMPORT_CANNOT_IMPORT; String message = getMessage(msgID, 0); // FIXME logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } this.backend = backend; this.backendConfigEntry = backendConfigEntry; this.branches = branches; } /** * Sends lDIFEntry entry lines to the export target currently set. * * @param lDIFEntry The lines for the LDIF entry. * @throws IOException when an error occured. */ public void sendEntryLines(String lDIFEntry) throws IOException { // If an error was raised - like receiving an ErrorMessage // we just let down the export. if (ieContext.exception != null) { IOException ioe = new IOException(ieContext.exception.getMessage()); ieContext = null; throw ioe; } // new entry then send the current one EntryMessage entryMessage = new EntryMessage( serverId, ieContext.exportTarget, lDIFEntry.getBytes()); broker.publish(entryMessage); ieContext.updateTaskCounters(); } /** * Retrieves information about the backends defined in the Directory Server * configuration. * * @param backendList A list into which instantiated (but not initialized) * backend instances will be placed. * @param entryList A list into which the config entries associated with * the backends will be placed. * @param dnList A list into which the set of base DNs for each backend * will be placed. */ private static int getBackends(ArrayList<Backend> backendList, ArrayList<ConfigEntry> entryList, ArrayList<List<DN>> dnList) throws DirectoryException { // Get the base entry for all backend configuration. DN backendBaseDN = null; try { backendBaseDN = DN.decode(DN_BACKEND_BASE); } catch (DirectoryException de) { int msgID = MSGID_CANNOT_DECODE_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_CANNOT_DECODE_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE, stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } ConfigEntry baseEntry = null; try { baseEntry = DirectoryServer.getConfigEntry(backendBaseDN); } catch (ConfigException ce) { int msgID = MSGID_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY; String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY; String message = getMessage(msgID, DN_BACKEND_BASE, stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Iterate through the immediate children, attempting to parse them as // backends. for (ConfigEntry configEntry : baseEntry.getChildren().values()) { // Get the backend ID attribute from the entry. If there isn't one, then // skip the entry. String backendID = null; try { int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID; StringConfigAttribute idStub = new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID), true, false, true); StringConfigAttribute idAttr = (StringConfigAttribute) configEntry.getConfigAttribute(idStub); if (idAttr == null) { continue; } else { backendID = idAttr.activeValue(); } } catch (ConfigException ce) { int msgID = MSGID_CANNOT_DETERMINE_BACKEND_ID; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), ce.getMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_CANNOT_DETERMINE_BACKEND_ID; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Get the backend class name attribute from the entry. If there isn't // one, then just skip the entry. String backendClassName = null; try { int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS; StringConfigAttribute classStub = new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID), true, false, false); StringConfigAttribute classAttr = (StringConfigAttribute) configEntry.getConfigAttribute(classStub); if (classAttr == null) { continue; } else { backendClassName = classAttr.activeValue(); } } catch (ConfigException ce) { int msgID = MSGID_CANNOT_DETERMINE_BACKEND_CLASS; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), ce.getMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_CANNOT_DETERMINE_BACKEND_CLASS; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } Class backendClass = null; try { backendClass = Class.forName(backendClassName); } catch (Exception e) { int msgID = MSGID_CANNOT_LOAD_BACKEND_CLASS; String message = getMessage(msgID, backendClassName, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } Backend backend = null; try { backend = (Backend) backendClass.newInstance(); backend.setBackendID(backendID); } catch (Exception e) { int msgID = MSGID_CANNOT_INSTANTIATE_BACKEND_CLASS; String message = getMessage(msgID, backendClassName, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Get the base DN attribute from the entry. If there isn't one, then // just skip this entry. List<DN> baseDNs = null; try { int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS; DNConfigAttribute baseDNStub = new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID), true, true, true); DNConfigAttribute baseDNAttr = (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub); if (baseDNAttr == null) { msgID = MSGID_NO_BASES_FOR_BACKEND; String message = getMessage(msgID, String.valueOf(configEntry.getDN())); throw new DirectoryException( DirectoryServer.getServerErrorResultCode(), message,msgID, null); } else { baseDNs = baseDNAttr.activeValues(); } } catch (Exception e) { int msgID = MSGID_CANNOT_DETERMINE_BASES_FOR_BACKEND; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } backendList.add(backend); entryList.add(configEntry); dnList.add(baseDNs); } return 0; } /** * Initializes this domain from another source server. * * @param source The source from which to initialize * @param initTask The task that launched the initialization * and should be updated of its progress. * @throws DirectoryException when an error occurs */ public void initialize(short source, Task initTask) throws DirectoryException { acquireIEContext(); ieContext.initializeTask = initTask; InitializeRequestMessage initializeMsg = new InitializeRequestMessage( baseDN, serverId, source); // Publish Init request msg broker.publish(initializeMsg); // .. we expect to receive entries or err after that } /** * Verifies that the given string represents a valid source * from which this server can be initialized. * @param sourceString The string representaing the source * @return The source as a short value * @throws DirectoryException if the string is not valid */ public short decodeSource(String sourceString) throws DirectoryException { short source = 0; Throwable cause = null; try { source = Integer.decode(sourceString).shortValue(); if (source >= -1) { // TODO Verifies serverID is in the domain // We shold check here that this is a server implied // in the current domain. log("Source decoded for import:" + source); return source; } } catch(Exception e) { cause = e; } ResultCode resultCode = ResultCode.OTHER; int errorMessageID = MSGID_INVALID_IMPORT_SOURCE; String message = getMessage(errorMessageID); if (cause != null) throw new DirectoryException( resultCode, message, errorMessageID, cause); else throw new DirectoryException( resultCode, message, errorMessageID); } /** * Verifies that the given string represents a valid source * from which this server can be initialized. * @param targetString The string representing the source * @return The source as a short value * @throws DirectoryException if the string is not valid */ public short decodeTarget(String targetString) throws DirectoryException { short target = 0; Throwable cause; if (targetString.equalsIgnoreCase("all")) { return RoutableMessage.ALL_SERVERS; } // So should be a serverID try { target = Integer.decode(targetString).shortValue(); if (target >= 0) { // FIXME Could we check now that it is a know server in the domain ? } return target; } catch(Exception e) { cause = e; } ResultCode resultCode = ResultCode.OTHER; int errorMessageID = MSGID_INVALID_EXPORT_TARGET; String message = getMessage(errorMessageID); if (cause != null) throw new DirectoryException( resultCode, message, errorMessageID, cause); else throw new DirectoryException( resultCode, message, errorMessageID); } private synchronized void acquireIEContext() throws DirectoryException { if (ieContext != null) { // Rejects 2 simultaneous exports int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED; String message = getMessage(msgID); throw new DirectoryException(ResultCode.OTHER, message, msgID); } ieContext = new IEContext(); } private synchronized void releaseIEContext() { ieContext = null; } /** * Process the initialization of some other server or servers in the topology * specified by the target argument. * @param target The target that should be initialized * @param initTask The task that triggers this initialization and that should * be updated with its progress. * @exception DirectoryException When an error occurs. */ public void initializeTarget(short target, Task initTask) throws DirectoryException { initializeTarget(target, serverId, initTask); } /** * Process the initialization of some other server or servers in the topology * specified by the target argument when this initialization has been * initiated by another server than this one. * @param target The target that should be initialized. * @param requestorID The server that initiated the export. * @param initTask The task that triggers this initialization and that should * be updated with its progress. * @exception DirectoryException When an error occurs. */ public void initializeTarget(short target, short requestorID, Task initTask) throws DirectoryException { acquireIEContext(); ieContext.exportTarget = target; ieContext.initializeTask = initTask; ieContext.initTaskCounters(backend.getEntryCount()); // Send start message InitializeTargetMessage initializeMessage = new InitializeTargetMessage( baseDN, serverId, ieContext.exportTarget, requestorID, ieContext.entryLeftCount); log("SD : publishes " + initializeMessage + " for #entries=" + ieContext.entryCount); broker.publish(initializeMessage); // make an export and send entries exportBackend(); // Successfull termnation DoneMessage doneMsg = new DoneMessage(serverId, initializeMessage.getDestination()); broker.publish(doneMsg); if (ieContext != null) { ieContext.updateTaskCompletionState(); ieContext = null; } } /** * Process backend before import. * @param backend The backend. * @param backendConfigEntry The config entry of the backend. * @throws Exception */ private void preBackendImport(Backend backend, ConfigEntry backendConfigEntry) throws Exception { // Stop saving state stateSavingDisabled = true; // Clear the backend clearJEBackend(false,backend.getBackendID(),null); // FIXME setBackendEnabled should be part of TaskUtils ? TaskUtils.setBackendEnabled(backendConfigEntry, false); // Acquire an exclusive lock for the backend. String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) { int msgID = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException(ResultCode.OTHER, message, msgID); } } /** * Initializes the domain's backend with received entries. * @param initializeMessage The message that initiated the import. * @exception DirectoryException Thrown when an error occurs. */ protected void importBackend(InitializeTargetMessage initializeMessage) throws DirectoryException { LDIFImportConfig importConfig = null; try { log("startImport"); if (initializeMessage.getRequestorID() == serverId) { // The import responds to a request we did so the IEContext // is already acquired } else { acquireIEContext(); } ieContext.importSource = initializeMessage.getsenderID(); ieContext.entryLeftCount = initializeMessage.getEntryCount(); ieContext.initTaskCounters(initializeMessage.getEntryCount()); preBackendImport(this.backend, this.backendConfigEntry); DN[] baseDNs = {baseDN}; ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this); importConfig = new LDIFImportConfig(ieContext.ldifImportInputStream); importConfig.setIncludeBranches(this.branches); // TODO How to deal with rejected entries during the import // importConfig.writeRejectedEntries("rejectedImport", // ExistingFileBehavior.OVERWRITE); // Process import this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig); stateSavingDisabled = false; // Re-exchange state with SS broker.stop(); broker.start(changelogServers); } catch(Exception e) { throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(), 2);// FIXME } finally { // Cleanup importConfig.close(); // Re-enable backend closeBackendImport(this.backend, this.backendConfigEntry); // Update the task that initiated the import if ((ieContext != null ) && (ieContext.initializeTask != null)) { ((InitializeTask)ieContext.initializeTask). setState(ieContext.updateTaskCompletionState(),ieContext.exception); } releaseIEContext(); log("End importBackend"); } // Success } /** * Make post import operations. * @param backend The backend implied in the import. * @param backendConfigEntry The config entry of the backend. * @exception DirectoryException Thrown when an error occurs. */ protected void closeBackendImport(Backend backend, ConfigEntry backendConfigEntry) throws DirectoryException { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); // Release lock if (!LockFileManager.releaseLock(lockFile, failureReason)) { int msgID = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); new DirectoryException(ResultCode.OTHER, message, msgID); } // FIXME setBackendEnabled should be part taskUtils ? TaskUtils.setBackendEnabled(backendConfigEntry, true); } /** * Retrieves a synchronization domain based on the baseDN. * * @param baseDN The baseDN of the domain to retrieve * @return The domain retrieved * @throws DirectoryException When an error occured. */ public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN) throws DirectoryException { SynchronizationDomain synchronizationDomain = null; // Retrieves the domain DirectoryServer.getSynchronizationProviders(); for (SynchronizationProvider provider : DirectoryServer.getSynchronizationProviders()) { if (!( provider instanceof MultimasterSynchronization)) { int msgID = LogMessages.MSGID_INVALID_PROVIDER; String message = getMessage(msgID); throw new DirectoryException(ResultCode.OTHER, message, msgID); } // From the domainDN retrieves the synchronization domain SynchronizationDomain sdomain = MultimasterSynchronization.findDomain(baseDN, null); if (sdomain == null) { int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN; String message = getMessage(msgID) + " " + baseDN; throw new DirectoryException(ResultCode.OTHER, message, msgID); } if (synchronizationDomain != null) { // Should never happen int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN; String message = getMessage(msgID); throw new DirectoryException(ResultCode.OTHER, message, msgID); } synchronizationDomain = sdomain; } return synchronizationDomain; } /** * Returns the backend associated to this domain. * @return The associated backend. */ public Backend getBackend() { return backend; } /** * Returns a boolean indiciating if an import or export is currently * processed. * @return The status */ public boolean ieRunning() { return (ieContext != null); } /* * <<Total Update */ /** * Push the modifications contain the in given parameter has * a modification that would happen on a local server. opends/src/server/org/opends/server/synchronization/protocol/DoneMessage.java
New file @@ -0,0 +1,122 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; /** * This message is part of the synchronization protocol. * This message is sent by a server to one or several other servers after the * last entry sent in the context of a total update and signals to the server * that receives it that the export is now finished. */ public class DoneMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = 5216659571724730361L; /** * Creates a message. * * @param sender The sender server of this message. * @param destination The server or servers targetted by this message. */ public DoneMessage(short sender, short destination) { super(sender, destination); } /** * Creates a new message by decoding the provided byte array. * @param in A byte array containing the encoded information for the message, * @throws DataFormatException If the in does not contain a properly, * encoded message. */ public DoneMessage(byte[] in) throws DataFormatException { super(); try { // First byte is the type if (in[0] != MSG_TYPE_DONE) throw new DataFormatException("input is not a valid DoneMessage"); int pos = 1; // sender int length = getNextLength(in, pos); String senderString = new String(in, pos, length, "UTF-8"); this.senderID = Short.valueOf(senderString); pos += length +1; // destination length = getNextLength(in, pos); String destinationString = new String(in, pos, length, "UTF-8"); this.destination = Short.valueOf(destinationString); pos += length +1; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8"); byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8"); int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1; byte[] resultByteArray = new byte[length]; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_DONE; int pos = 1; /* put the sender */ pos = addByteArray(senderBytes, resultByteArray, pos); /* put the destination */ pos = addByteArray(destinationBytes, resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } } opends/src/server/org/opends/server/synchronization/protocol/EntryMessage.java
New file @@ -0,0 +1,142 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; /** * This message is part of the synchronization protocol. * This message is sent by a server to one or several other servers and * contain one entry to be sent over the protocol in the context of * an import/export over the protocol. */ public class EntryMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = 6116955858351992926L; // The byte array containing the bytes of the entry transported private byte[] entryByteArray; /** * Creates a new EntryMessage. * * @param sender The sender of this message. * @param destination The destination of this message. * @param entryBytes The bytes of the entry. */ public EntryMessage(short sender, short destination, byte[] entryBytes) { super(sender, destination); this.entryByteArray = entryBytes.clone(); } /** * Creates a new EntryMessage from its encoded form. * * @param in The byte array containing the encoded form of the message. * @throws DataFormatException If the byte array does not contain a valid * encoded form of the ServerStartMessage. */ public EntryMessage(byte[] in) throws DataFormatException { try { /* first byte is the type */ if (in[0] != MSG_TYPE_ENTRY) throw new DataFormatException("input is not a valid ServerStart msg"); int pos = 1; // sender int length = getNextLength(in, pos); String senderIDString = new String(in, pos, length, "UTF-8"); this.senderID = Short.valueOf(senderIDString); pos += length +1; // destination length = getNextLength(in, pos); String destinationString = new String(in, pos, length, "UTF-8"); this.destination = Short.valueOf(destinationString); pos += length +1; // entry length = getNextLength(in, pos); this.entryByteArray = new byte[length]; for (int i=0; i<length; i++) { entryByteArray[i] = in[pos+i]; } } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Returns the entry bytes. * @return The entry bytes. */ public byte[] getEntryBytes() { return entryByteArray; } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8"); byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8"); byte[] entryBytes = entryByteArray; int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1 + entryBytes.length + 1; byte[] resultByteArray = new byte[length]; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_ENTRY; int pos = 1; pos = addByteArray(senderBytes, resultByteArray, pos); pos = addByteArray(destinationBytes, resultByteArray, pos); pos = addByteArray(entryBytes, resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } } opends/src/server/org/opends/server/synchronization/protocol/ErrorMessage.java
New file @@ -0,0 +1,188 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; /** * This message is part of the synchronization protocol. * This message is sent by a server or a changelog server when an error * is detected in the context of a total update. */ public class ErrorMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = 2726389860247088266L; // Specifies the messageID built form the error that was detected private int msgID; // Specifies the complementary details about the error that was detected private String details = null; /** * Create a InitializeMessage. * @param sender The server ID of the server that send this message. * @param destination The destination server or servers of this message. * @param msgID The error message ID. * @param details The details of the error. */ public ErrorMessage(short sender, short destination, int msgID, String details) { super(sender, destination); this.msgID = msgID; this.details = details; } /** * Create a InitializeMessage. * * @param destination changelog server id * @param msgID error message ID * @param details details of the error */ public ErrorMessage(short destination, int msgID, String details) { super((short)-2, destination); this.msgID = msgID; this.details = details; } /** * Creates a new InitializeMessage by decoding the provided byte array. * @param in A byte array containing the encoded information for the Message * @throws DataFormatException If the in does not contain a properly * encoded InitializeMessage. */ public ErrorMessage(byte[] in) throws DataFormatException { super(); try { /* first byte is the type */ if (in[0] != MSG_TYPE_ERROR) throw new DataFormatException("input is not a valid InitializeMessage"); int pos = 1; // sender int length = getNextLength(in, pos); String senderString = new String(in, pos, length, "UTF-8"); senderID = Short.valueOf(senderString); pos += length +1; // destination length = getNextLength(in, pos); String serverIdString = new String(in, pos, length, "UTF-8"); destination = Short.valueOf(serverIdString); pos += length +1; // MsgID length = getNextLength(in, pos); String msgIdString = new String(in, pos, length, "UTF-8"); msgID = Integer.valueOf(msgIdString); pos += length +1; // Details length = getNextLength(in, pos); details = new String(in, pos, length, "UTF-8"); pos += length +1; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Get the base DN from this InitializeMessage. * * @return the base DN from this InitializeMessage. */ public String getDetails() { return details; } /** * Get the base DN from this InitializeMessage. * * @return the base DN from this InitializeMessage. */ public int getMsgID() { return msgID; } /** * {@inheritDoc} */ @Override public byte[] getBytes() { /* The InitializeMessage is stored in the form : * <operation type><basedn><serverid> */ try { byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8"); byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8"); byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8"); byte[] byteDetails = details.getBytes("UTF-8"); int length = 1 + byteSender.length + 1 + byteDestination.length + 1 + byteErrMsgId.length + 1 + byteDetails.length + 1; byte[] resultByteArray = new byte[length]; // put the type of the operation resultByteArray[0] = MSG_TYPE_ERROR; int pos = 1; // sender pos = addByteArray(byteSender, resultByteArray, pos); // destination pos = addByteArray(byteDestination, resultByteArray, pos); // MsgId pos = addByteArray(byteErrMsgId, resultByteArray, pos); // details pos = addByteArray(byteDetails, resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } } opends/src/server/org/opends/server/synchronization/protocol/InitializeRequestMessage.java
New file @@ -0,0 +1,158 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; /** * This message is part of the synchronization protocol. * This message is sent by a server to another server in order to * request this other server to do an export to the server sender * of this message. */ public class InitializeRequestMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = 8303271162942249215L; private String baseDn = null; /** * Creates a InitializeRequestMessage message. * * @param baseDn The base DN of the synchronization domain. * @param destination destination of this message * @param senderID serverID of the server that will send this message */ public InitializeRequestMessage(DN baseDn, short senderID, short destination) { super(senderID, destination); this.baseDn = baseDn.toNormalizedString(); } /** * Creates a new InitializeRequestMessage by decoding the provided byte array. * @param in A byte array containing the encoded information for the Message * @throws DataFormatException If the in does not contain a properly * encoded InitializeMessage. */ public InitializeRequestMessage(byte[] in) throws DataFormatException { super(); try { /* first byte is the type */ if (in[0] != MSG_TYPE_INITIALIZE_REQUEST) throw new DataFormatException( "input is not a valid InitializeRequestMessage"); int pos = 1; // baseDn int length = getNextLength(in, pos); baseDn = new String(in, pos, length, "UTF-8"); pos += length +1; // sender length = getNextLength(in, pos); String sourceServerIdString = new String(in, pos, length, "UTF-8"); senderID = Short.valueOf(sourceServerIdString); pos += length +1; // destination length = getNextLength(in, pos); String destinationServerIdString = new String(in, pos, length, "UTF-8"); destination = Short.valueOf(destinationServerIdString); pos += length +1; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Get the base DN from this InitializeRequestMessage. * * @return the base DN from this InitializeRequestMessage. */ public DN getBaseDn() { if (baseDn == null) return null; try { return DN.decode(baseDn); } catch (DirectoryException e) { return null; } } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { byte[] baseDNBytes = baseDn.getBytes("UTF-8"); byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8"); byte[] destinationBytes = String.valueOf(destination). getBytes("UTF-8"); int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1 + destinationBytes.length + 1; byte[] resultByteArray = new byte[length]; // type of the operation resultByteArray[0] = MSG_TYPE_INITIALIZE_REQUEST; int pos = 1; // baseDN pos = addByteArray(baseDNBytes, resultByteArray, pos); // sender pos = addByteArray(senderBytes, resultByteArray, pos); // destination pos = addByteArray(destinationBytes, resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } } opends/src/server/org/opends/server/synchronization/protocol/InitializeTargetMessage.java
New file @@ -0,0 +1,212 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; /** * This message is part of the synchronization protocol. * This message is sent by a server to one or several servers as the * first message of an export, before sending the entries. */ public class InitializeTargetMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = -2122460559739139735L; private String baseDN = null; // Specifies the number of entries expected to be exported. private long entryCount; // Specifies the serverID of the server that requested this export // to happen. It allows a server that previously sent an // InitializeRequestMessage to know that the current message // is related to its own request. private short requestorID; /** * Creates a InitializeDestinationMessage. * * @param baseDN The base DN for which the InitializeMessage is created. * @param senderID The serverID of the server that sends this message. * @param destination The destination of this message. * @param requestorID The server that initiates this export. * @param entryCount The count of entries that will be sent. */ public InitializeTargetMessage(DN baseDN, short senderID, short destination, short requestorID, long entryCount) { super(senderID, destination); this.requestorID = requestorID; this.baseDN = baseDN.toNormalizedString(); this.entryCount = entryCount; } /** * Creates an InitializeTargetMessage by decoding the provided byte array. * @param in A byte array containing the encoded information for the Message * @throws DataFormatException If the in does not contain a properly * encoded InitializeMessage. */ public InitializeTargetMessage(byte[] in) throws DataFormatException { super(); try { /* first byte is the type */ if (in[0] != MSG_TYPE_INITIALIZE_TARGET) throw new DataFormatException( "input is not a valid InitializeDestinationMessage"); int pos = 1; // destination int length = getNextLength(in, pos); String destinationString = new String(in, pos, length, "UTF-8"); this.destination = Short.valueOf(destinationString); pos += length +1; // baseDn length = getNextLength(in, pos); baseDN = new String(in, pos, length, "UTF-8"); pos += length +1; // sender length = getNextLength(in, pos); String senderString = new String(in, pos, length, "UTF-8"); senderID = Short.valueOf(senderString); pos += length +1; // requestor length = getNextLength(in, pos); String requestorString = new String(in, pos, length, "UTF-8"); requestorID = Short.valueOf(requestorString); pos += length +1; // entryCount length = getNextLength(in, pos); String entryCountString = new String(in, pos, length, "UTF-8"); entryCount = Long.valueOf(entryCountString); pos += length +1; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Get the number of entries expected to be sent during the export. * @return the entry count */ public long getEntryCount() { return this.entryCount; } /** * Get the serverID of the server that initiated the export. * @return the serverID */ public long getRequestorID() { return this.requestorID; } /** * Get the base DN of the domain. * * @return the base DN */ public DN getBaseDN() { if (baseDN == null) return null; try { return DN.decode(baseDN); } catch (DirectoryException e) { return null; } } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8"); byte[] byteDn = baseDN.getBytes("UTF-8"); byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8"); byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8"); byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8"); int length = 1 + byteDestination.length + 1 + byteDn.length + 1 + byteSender.length + 1 + byteRequestor.length + 1 + byteEntryCount.length + 1; byte[] resultByteArray = new byte[length]; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_INITIALIZE_TARGET; int pos = 1; /* put the destination */ pos = addByteArray(byteDestination, resultByteArray, pos); /* put the baseDN and a terminating 0 */ pos = addByteArray(byteDn, resultByteArray, pos); /* put the sender */ pos = addByteArray(byteSender, resultByteArray, pos); /* put the requestorID */ pos = addByteArray(byteRequestor, resultByteArray, pos); /* put the entryCount */ pos = addByteArray(byteEntryCount, resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } } opends/src/server/org/opends/server/synchronization/protocol/RoutableMessage.java
New file @@ -0,0 +1,103 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization.protocol; import java.io.Serializable; /** * This is an abstract class of messages of the synchronization protocol * for message that needs to contain information about the server that * send them and the destination servers to whitch they should be sent. */ public abstract class RoutableMessage extends SynchronizationMessage implements Serializable { /** * Special values for the server ids fields contained in the routable * messages. **/ /** * Specifies that no server is identified. */ public static final short UNKNOWN_SERVER = -1; /** * Specifies all servers in the synchronization domain. */ public static final short ALL_SERVERS = -2; /** * Inside a topology of servers in the same domain, it specifies * the server that is the "closest" to the sender. */ public static final short THE_CLOSEST_SERVER = -3; /** * The destination server or servers of this message. */ protected short destination = UNKNOWN_SERVER; /** * The serverID of the server that sends this message. */ protected short senderID = UNKNOWN_SERVER; /** * Creates a routable message. * @param senderID changelog server id * @param destination changelog server id */ public RoutableMessage(short senderID, short destination) { this.senderID = senderID; this.destination = destination; } /** * Creates a routable message. */ public RoutableMessage() { } /** * Get the destination. * @return the destination */ public short getDestination() { return this.destination; } /** * Get the server ID of the server that sent this message. * @return the server id */ public short getsenderID() { return this.senderID; } } opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,11 +112,16 @@ /* Read the first 8 bytes containing the packet length */ int length = 0; /* Let's start the stop-watch before waiting on read */ /* for the heartbeat check to be operationnal */ lastReceiveTime = System.currentTimeMillis(); while (length<8) { int read = input.read(rcvLengthBuf, length, 8-length); if (read == -1) { lastReceiveTime=0; throw new IOException("no more data"); } else @@ -135,8 +140,9 @@ { length += input.read(buffer, length, totalLength - length); } lastReceiveTime = System.currentTimeMillis(); /* We do not want the heartbeat to close the session when */ /* we are processing a message even a time consuming one. */ lastReceiveTime=0; return SynchronizationMessage.generateMsg(buffer); } catch (OutOfMemoryError e) @@ -159,6 +165,10 @@ */ public long getLastReceiveTime() { if (lastReceiveTime==0) { return System.currentTimeMillis(); } return lastReceiveTime; } opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,6 +47,13 @@ static final byte MSG_TYPE_CHANGELOG_START = 7; static final byte MSG_TYPE_WINDOW = 8; static final byte MSG_TYPE_HEARTBEAT = 9; static final byte MSG_TYPE_INITIALIZE_REQUEST = 10; static final byte MSG_TYPE_INITIALIZE_TARGET = 11; static final byte MSG_TYPE_ENTRY = 12; static final byte MSG_TYPE_DONE = 13; static final byte MSG_TYPE_ERROR = 14; // Adding a new type of message here probably requires to // change accordingly generateMsg method below /** * Return the byte[] representation of this message. @@ -60,6 +67,11 @@ * MSG_TYPE_CHANGELOG_START * MSG_TYPE_WINDOW * MSG_TYPE_HEARTBEAT * MSG_TYPE_INITIALIZE * MSG_TYPE_INITIALIZE_TARGET * MSG_TYPE_ENTRY * MSG_TYPE_DONE * MSG_TYPE_ERROR * * @return the byte[] representation of this message. */ @@ -107,6 +119,21 @@ case MSG_TYPE_HEARTBEAT: msg = new HeartbeatMessage(buffer); break; case MSG_TYPE_INITIALIZE_REQUEST: msg = new InitializeRequestMessage(buffer); break; case MSG_TYPE_INITIALIZE_TARGET: msg = new InitializeTargetMessage(buffer); break; case MSG_TYPE_ENTRY: msg = new EntryMessage(buffer); break; case MSG_TYPE_DONE: msg = new DoneMessage(buffer); break; case MSG_TYPE_ERROR: msg = new ErrorMessage(buffer); break; default: throw new DataFormatException("received message with unknown type"); } opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
New file @@ -0,0 +1,214 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.tasks; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.core.DirectoryServer.getAttributeType; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.CoreMessages.*; import static org.opends.server.messages.MessageHandler.getMessage; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.server.messages.TaskMessages; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.synchronization.plugin.SynchronizationDomain; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; /** * This class provides an implementation of a Directory Server task that can * be used to import data from an LDIF file into a backend. */ public class InitializeTargetTask extends Task { // Config properties boolean append = false; boolean isCompressed = false; boolean isEncrypted = false; boolean skipSchemaValidation = false; String domainString = null; SynchronizationDomain domain = null; short target; long total; long left; /** * {@inheritDoc} */ @Override public void initializeTask() throws DirectoryException { // FIXME -- Do we need any special authorization here? Entry taskEntry = getTaskEntry(); AttributeType typeDomainBase; AttributeType typeScope; typeDomainBase = getAttributeType(ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN, true); typeScope = getAttributeType(ATTR_TASK_INITIALIZE_TARGET_SCOPE, true); List<Attribute> attrList; attrList = taskEntry.getAttribute(typeDomainBase); domainString = TaskUtils.getSingleValueString(attrList); DN domainDN = DN.nullDN(); try { domainDN = DN.decode(domainString); } catch(Exception e) { int msgID = TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN; String message = getMessage(msgID) + e.getMessage(); throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX, message, msgID); } domain=SynchronizationDomain.retrievesSynchronizationDomain(domainDN); attrList = taskEntry.getAttribute(typeScope); String targetString = TaskUtils.getSingleValueString(attrList); target = domain.decodeTarget(targetString); createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0); createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0); } /** * {@inheritDoc} */ protected TaskState runTask() { if (debugEnabled()) { debugInfo("DebugInfo" + "InitializeTarget Task/runTask "); } try { domain.initializeTarget(target, this); } catch(DirectoryException de) { logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR, "Initialize Task stopped by error", 1); return TaskState.STOPPED_BY_ERROR; } return TaskState.COMPLETED_SUCCESSFULLY; } /** * Create attribute to store entry counters. * @param name The name of the attribute. * @param value The value to store for that attribute. */ protected void createCounterAttribute(String name, long value) { AttributeType type; LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); Entry taskEntry = getTaskEntry(); try { type = getAttributeType(name, true); values.add(new AttributeValue(type, new ASN1OctetString(String.valueOf(value)))); ArrayList<Attribute> attrList = new ArrayList<Attribute>(1); attrList.add(new Attribute(type, name,values)); taskEntry.putAttribute(type, attrList); } finally { // taskScheduler.unlockEntry(taskEntryDN, lock); } } /** * Set the total number of entries expected to be exported. * @param total The total number of entries. */ public void setTotal(long total) { this.total = total; try { updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total); updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0); } catch(Exception e) {} } /** * Set the total number of entries still to be exported. * @param left The total number of entries to be exported. */ public void setLeft(long left) { this.left = left; try { updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left); updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left); } catch(Exception e) {} } /** * Update an attribute for this task. * @param name The name of the attribute. * @param value The value. * @throws DirectoryException When an error occurs. */ protected void updateAttribute(String name, long value) throws DirectoryException { Entry taskEntry = getTaskEntry(); ArrayList<Modification> modifications = new ArrayList<Modification>(); modifications.add(new Modification(ModificationType.REPLACE, new Attribute(name, String.valueOf(value)))); taskEntry.applyModifications(modifications); } } opends/src/server/org/opends/server/tasks/InitializeTask.java
New file @@ -0,0 +1,267 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.tasks; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.core.DirectoryServer.getAttributeType; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.CoreMessages.*; import static org.opends.server.messages.MessageHandler.getMessage; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.server.messages.TaskMessages; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.synchronization.plugin.SynchronizationDomain; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; /** * This class provides an implementation of a Directory Server task that can * be used to import data over the synchronization protocol from another * server hosting the same synchronization domain. */ public class InitializeTask extends Task { boolean isCompressed = false; boolean isEncrypted = false; boolean skipSchemaValidation = false; String domainString = null; short source; SynchronizationDomain domain = null; TaskState initState; // The total number of entries expected to be processed when this import // will end successfully long total = 0; // The number of entries still to be processed for this import to be // completed long left = 0; /** * {@inheritDoc} */ @Override public void initializeTask() throws DirectoryException { // FIXME -- Do we need any special authorization here? Entry taskEntry = getTaskEntry(); AttributeType typeDomainBase; AttributeType typeSourceScope; typeDomainBase = getAttributeType(ATTR_TASK_INITIALIZE_DOMAIN_DN, true); typeSourceScope = getAttributeType(ATTR_TASK_INITIALIZE_SOURCE, true); List<Attribute> attrList; attrList = taskEntry.getAttribute(typeDomainBase); domainString = TaskUtils.getSingleValueString(attrList); DN domainDN = DN.nullDN(); try { domainDN = DN.decode(domainString); } catch(Exception e) { int msgID = TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN; String message = getMessage(msgID) + e.getMessage(); throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX, message, msgID); } domain=SynchronizationDomain.retrievesSynchronizationDomain(domainDN); attrList = taskEntry.getAttribute(typeSourceScope); String sourceString = TaskUtils.getSingleValueString(attrList); source = domain.decodeSource(sourceString); createAttribute(ATTR_TASK_INITIALIZE_LEFT, 0); createAttribute(ATTR_TASK_INITIALIZE_DONE, 0); } /** * {@inheritDoc} */ protected TaskState runTask() { if (debugEnabled()) { debugInfo("InitializeTask is starting domain: %s source:%d", domain.getBaseDN(), source); } initState = getTaskState(); // RUNNING try { // launch the import domain.initialize(source, this); synchronized(initState) { // Waiting for the end of the job while (initState == TaskState.RUNNING) { initState.wait(1000); updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left); updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left); } } updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left); updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left); } catch(InterruptedException ie) {} catch(DirectoryException de) { int msgID = de.getErrorMessageID(); String message = getMessage(msgID, de.getErrorMessage()); logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR, message, msgID); initState = TaskState.STOPPED_BY_ERROR; } if (debugEnabled()) { debugInfo("InitializeTask is ending with state:%d", initState); } return initState; } /** * Set the state for the current task. * * @param newState The new state value to set * @param de When the new state is different from COMPLETED_SUCCESSFULLY * this is the exception that contains the cause of the failure. */ public void setState(TaskState newState, DirectoryException de) { try { if (de != null) { int msgID = de.getErrorMessageID(); String message = getMessage(msgID, de.getErrorMessage()); logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } if (debugEnabled()) { logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR, "setState: "+newState, 1); debugInfo("InitializeTask/setState: ", newState); } initState = newState; synchronized (initState) { initState.notify(); } } catch(Exception e) {} } /** * Create a new attribute the task entry. * @param name The name of the attribute * @param value The value to store */ protected void createAttribute(String name, long value) { AttributeType type; LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); Entry taskEntry = getTaskEntry(); try { type = getAttributeType(name, true); values.add(new AttributeValue(type, new ASN1OctetString(String.valueOf(value)))); ArrayList<Attribute> attrList = new ArrayList<Attribute>(1); attrList.add(new Attribute(type, name,values)); taskEntry.putAttribute(type, attrList); } finally { // taskScheduler.unlockEntry(taskEntryDN, lock); } } /** * Update an attribute for this task. * @param name The name of the attribute. * @param value The value. * @throws DirectoryException When an error occurs. */ protected void updateAttribute(String name, long value) throws DirectoryException { Entry taskEntry = getTaskEntry(); ArrayList<Modification> modifications = new ArrayList<Modification>(); modifications.add(new Modification(ModificationType.REPLACE, new Attribute(name, String.valueOf(value)))); taskEntry.applyModifications(modifications); } /** * Set the total number of entries expected to be imported. * @param total The total number of entries. */ public void setTotal(long total) { this.total = total; } /** * Set the total number of entries still to be imported. * @param left The total number of entries to be imported. */ public void setLeft(long left) { this.left = left; } } opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/InitOnLineTest.java
New file @@ -0,0 +1,1488 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.synchronization; import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; import static org.opends.server.config.ConfigConstants.ATTR_TASK_INITIALIZE_DONE; import static org.opends.server.config.ConfigConstants.ATTR_TASK_INITIALIZE_LEFT; import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES; import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.MessageHandler.getMessage; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.UUID; import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.config.ConfigEntry; import org.opends.server.core.AddOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.messages.TaskMessages; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.synchronization.changelog.Changelog; import org.opends.server.synchronization.common.LogMessages; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.plugin.SynchronizationDomain; import org.opends.server.synchronization.protocol.ChangelogStartMessage; import org.opends.server.synchronization.protocol.DoneMessage; import org.opends.server.synchronization.protocol.EntryMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.InitializeTargetMessage; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.ServerStartMessage; import org.opends.server.synchronization.protocol.SocketSession; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchScope; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; /** * Tests contained here: * * Initialize Test Cases <=> Pull entries * --------------------- * InitializeImport : Tests the import in the target DS. * Creates a task on current DS and makes a broker simulates DS2 sending entries. * InitializeExport : Tests the export from the source DS * A broker simulates DS2 pulling entries from current DS. * * Initialize Target Test Cases <=> Push entries * ---------------------------- * InitializeTargetExport : Tests the export from the source DS * Creates a task on current DS and makes broker simulates DS2 receiving entries * InitializeTargetImport : Test the import in the target DS * A broker simulates DS2 receiving entries from current DS. * * InitializeTargetConfigErrors : Tests configuration errors of the * InitializeTarget task */ public class InitOnLineTest extends SynchronizationTestCase { private static final int WINDOW_SIZE = 10; private static final int CHANGELOG_QUEUE_SIZE = 100; private static final String SYNCHRONIZATION_STRESS_TEST = "Synchronization Stress Test"; /** * A "person" entry */ protected Entry personEntry; protected Entry taskInitFromS2; protected Entry taskInitTargetS2; protected Entry taskInitTargetAll; SocketSession ssSession = null; boolean ssShutdownRequested = false; protected String[] updatedEntries; boolean externalDS = false; short server1ID = 1; short server2ID = 2; short server3ID = 3; short changelog1ID = 12; short changelog2ID = 13; int changelogPort = 8989; private DN baseDn; ChangelogBroker server2 = null; Changelog changelog1 = null; Changelog changelog2 = null; boolean emptyOldChanges = true; SynchronizationDomain sd = null; private void log(String s) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "InitOnLineTests/" + s, 1); if (debugEnabled()) { debugInfo(s); } } protected void log(String message, Exception e) { log(message + stackTraceToSingleLineString(e)); } /** * Set up the environment for performing the tests in this Class. * synchronization * * @throws Exception * If the environment could not be set up. */ @BeforeClass public void setUp() throws Exception { log("Setup: debugEnabled:" + debugEnabled()); // This test suite depends on having the schema available. TestCaseUtils.startServer(); // Disable schema check schemaCheck = DirectoryServer.checkSchema(); DirectoryServer.setCheckSchema(false); baseDn = DN.decode("dc=example,dc=com"); updatedEntries = newLDIFEntries(); // Create an internal connection in order to provide operations // to DS to populate the db - connection = InternalClientConnection.getRootConnection(); // Synchro provider String synchroStringDN = "cn=Synchronization Providers,cn=config"; // Synchro multi-master synchroPluginStringDN = "cn=Multimaster Synchronization, " + synchroStringDN; String synchroPluginLdif = "dn: " + synchroPluginStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-provider\n" + "ds-cfg-synchronization-provider-enabled: true\n" + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n"; synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif); // Synchro suffix synchroServerEntry = null; // Add config entries to the current DS server based on : // Add the synchronization plugin: synchroPluginEntry & synchroPluginStringDN // Add synchroServerEntry // Add changeLogEntry configureSynchronization(); taskInitFromS2 = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: dc=example,dc=com", "ds-task-initialize-replica-server-id: " + server2ID); taskInitTargetS2 = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-domain-dn: dc=example,dc=com", "ds-task-initialize-replica-server-id: " + server2ID); taskInitTargetAll = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-domain-dn: dc=example,dc=com", "ds-task-initialize-replica-server-id: all"); // Change log String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; String changeLogLdif = "dn: " + changeLogStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-changelog-server-config\n" + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n" + "ds-cfg-changelog-server-id: 1\n" + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); changeLogEntry = null; } // Tests that entries have been written in the db private void testEntriesInDb() { log("TestEntriesInDb"); short found = 0; for (String entry : updatedEntries) { int dns = entry.indexOf("dn: "); int dne = entry.indexOf("dc=com"); String dn = entry.substring(dns+4,dne+6); log("Search Entry: " + dn); DN entryDN = null; try { entryDN = DN.decode(dn); } catch(Exception e) { log("TestEntriesInDb/" + e); } try { Entry resultEntry = getEntry(entryDN, 1000, true); if (resultEntry==null) { log("Entry not found <" + dn + ">"); } else { log("Entry found <" + dn + ">"); found++; } } catch(Exception e) { log("TestEntriesInDb/", e); } } assertEquals(found, updatedEntries.length, " Entries present in DB :" + found + " Expected entries :" + updatedEntries.length); } private void addTask(Entry taskEntry, ResultCode expectedResult, int errorMessageID) { try { log("AddTask/" + taskEntry); // Change config of DS to launch the total update task InternalClientConnection connection = InternalClientConnection.getRootConnection(); // Add the task. AddOperation addOperation = connection.processAdd(taskEntry.getDN(), taskEntry.getObjectClasses(), taskEntry.getUserAttributes(), taskEntry.getOperationalAttributes()); assertEquals(addOperation.getResultCode(), expectedResult, "Result of ADD operation of the task is: " + addOperation.getResultCode() + " Expected:" + expectedResult + " Details:" + addOperation.getErrorMessage() + addOperation.getAdditionalLogMessage()); if (expectedResult != ResultCode.SUCCESS) { assertTrue(addOperation.getErrorMessage().toString(). startsWith(getMessage(errorMessageID).toString()), "Error MsgID of the task <" + addOperation.getErrorMessage() + "> equals <" + getMessage(errorMessageID) + ">"); log("Create config task: <"+ errorMessageID + addOperation.getErrorMessage() + ">"); } else { waitTaskState(taskEntry, TaskState.RUNNING, -1); } // Entry will be removed at the end of the test entryList.addLast(taskEntry.getDN()); log("AddedTask/" + taskEntry.getDN()); } catch(Exception e) { fail("Exception when adding task:"+ e.getMessage()); } } /** * Wait a task to be completed and check the expected state and expected * stats. * @param taskEntry The task to process. * @param expectedState The expected state fot this task. * @param expectedLeft The expected number of entries still to be processed. * @param expectedDone The expected numner of entries to be processed. */ private void waitTaskCompleted(Entry taskEntry, TaskState expectedState, long expectedLeft, long expectedDone) { try { // FIXME - Factorize with TasksTestCase // Wait until the task completes. int timeout = 2000; AttributeType completionTimeType = DirectoryServer.getAttributeType( ATTR_TASK_COMPLETION_TIME.toLowerCase()); SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); Entry resultEntry = null; String completionTime = null; long startMillisecs = System.currentTimeMillis(); do { InternalSearchOperation searchOperation = connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter); try { resultEntry = searchOperation.getSearchEntries().getFirst(); } catch (Exception e) { // FIXME How is this possible? Must be issue 858. fail("Task entry was not returned from the search."); continue; } completionTime = resultEntry.getAttributeValue(completionTimeType, DirectoryStringSyntax.DECODER); if (completionTime == null) { if (System.currentTimeMillis() - startMillisecs > 1000*timeout) { break; } Thread.sleep(10); } } while (completionTime == null); if (completionTime == null) { fail("The task had not completed after " + timeout + " seconds."); } // Check that the task state is as expected. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); String stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); TaskState taskState = TaskState.fromString(stateString); assertEquals(taskState, expectedState, "The task completed in an unexpected state"); // Check that the task contains some log messages. AttributeType logMessagesType = DirectoryServer.getAttributeType( ATTR_TASK_LOG_MESSAGES.toLowerCase()); ArrayList<String> logMessages = new ArrayList<String>(); resultEntry.getAttributeValues(logMessagesType, DirectoryStringSyntax.DECODER, logMessages); if (taskState != TaskState.COMPLETED_SUCCESSFULLY && logMessages.size() == 0) { fail("No log messages were written to the task entry on a failed task"); } try { // Check that the task state is as expected. taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true); stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); assertEquals(Long.decode(stateString).longValue(),expectedLeft, "The number of entries to process is not correct."); // Check that the task state is as expected. taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true); stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); assertEquals(Long.decode(stateString).longValue(),expectedDone, "The number of entries processed is not correct."); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } } private void waitTaskState(Entry taskEntry, TaskState expectedTaskState, int expectedMessage) { TaskState taskState = null; try { SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); Entry resultEntry = null; do { InternalSearchOperation searchOperation = connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter); try { resultEntry = searchOperation.getSearchEntries().getFirst(); } catch (Exception e) { // FIXME How is this possible? Must be issue 858. fail("Task entry was not returned from the search."); continue; } try { // Check that the task state is as expected. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); String stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); taskState = TaskState.fromString(stateString); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } try { // Check that the left counter. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true); String leftString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); // Check that the total counter. taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true); String totalString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } Thread.sleep(2000); } while ((taskState != expectedTaskState) && (taskState != TaskState.STOPPED_BY_ERROR)); // Check that the task contains some log messages. AttributeType logMessagesType = DirectoryServer.getAttributeType( ATTR_TASK_LOG_MESSAGES.toLowerCase()); ArrayList<String> logMessages = new ArrayList<String>(); resultEntry.getAttributeValues(logMessagesType, DirectoryStringSyntax.DECODER, logMessages); if ((taskState != TaskState.COMPLETED_SUCCESSFULLY) && (taskState != TaskState.RUNNING)) { if (logMessages.size() == 0) { fail("No log messages were written to the task entry on a failed task"); } else { if (expectedMessage > 0) { log(logMessages.get(0)); log(getMessage(expectedMessage)); assertTrue(logMessages.get(0).indexOf( getMessage(expectedMessage))>0); } } } assertEquals(taskState, expectedTaskState, "Task State:" + taskState + " Expected task state:" + expectedTaskState); } catch(Exception e) { fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Add to the current DB the entries necessary to the test */ private void addTestEntriesToDB() { try { for (String ldifEntry : updatedEntries) { Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry); AddOperation addOp = new AddOperation(connection, InternalClientConnection.nextOperationID(), InternalClientConnection .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(), entry.getOperationalAttributes()); addOp.setInternalOperation(true); addOp.run(); if (addOp.getResultCode() != ResultCode.SUCCESS) { log("addEntry: Failed" + addOp.getResultCode()); } // They will be removed at the end of the test entryList.addLast(entry.getDN()); } } catch(Exception e) { fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /* * Creates entries necessary to the test. */ private String[] newLDIFEntries() { String[] entries = { "dn: dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 21111111-1111-1111-1111-111111111111\n" + "\n", "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: organizationalUnit\n" + "entryUUID: 21111111-1111-1111-1111-111111111112\n" + "\n", "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "cn: Fiona Jensen\n" + "sn: Jensen\n" + "uid: fiona\n" + "telephonenumber: +1 408 555 1212\n" + "entryUUID: 21111111-1111-1111-1111-111111111113\n" + "\n", "dn: cn=Robert Langman,ou=people,dc=example,dc=com\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "cn: Robert Langman\n" + "sn: Langman\n" + "uid: robert\n" + "telephonenumber: +1 408 555 1213\n" + "entryUUID: 21111111-1111-1111-1111-111111111114\n" + "\n" }; return entries; } /** * Broker will send the entries to a server. * @param broker The broker that will send the entries. * @param senderID The serverID of this broker. * @param destinationServerID The target server. * @param requestorID The initiator server. */ private void makeBrokerPublishEntries(ChangelogBroker broker, short senderID, short destinationServerID, short requestorID) { // Send entries try { RoutableMessage initTargetMessage = new InitializeTargetMessage( baseDn, server2ID, destinationServerID, requestorID, updatedEntries.length); broker.publish(initTargetMessage); for (String entry : updatedEntries) { log("Broker will pusblish 1 entry: bytes:"+ entry.length()); EntryMessage entryMsg = new EntryMessage(senderID, destinationServerID, entry.getBytes()); broker.publish(entryMsg); } DoneMessage doneMsg = new DoneMessage(senderID, destinationServerID); broker.publish(doneMsg); log("Broker " + senderID + " published entries"); } catch(Exception e) { fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } void receiveUpdatedEntries(ChangelogBroker broker, short serverID, String[] updatedEntries) { // Expect the broker to receive the entries SynchronizationMessage msg; short entriesReceived = 0; while (true) { try { log("Broker " + serverID + " Wait for entry or done msg"); msg = broker.receive(); if (msg == null) break; if (msg instanceof InitializeTargetMessage) { log("Broker " + serverID + " receives InitializeTargetMessage "); entriesReceived = 0; } else if (msg instanceof EntryMessage) { EntryMessage em = (EntryMessage)msg; log("Broker " + serverID + " receives entry " + new String(em.getEntryBytes())); entriesReceived++; } else if (msg instanceof DoneMessage) { log("Broker " + serverID + " receives done "); break; } else if (msg instanceof ErrorMessage) { ErrorMessage em = (ErrorMessage)msg; log("Broker " + serverID + " receives ERROR " + getMessage(em.getMsgID()) + " " + em.getDetails()); break; } else { log("Broker " + serverID + " receives and trashes " + msg); } } catch(Exception e) { log("receiveUpdatedEntries" + stackTraceToSingleLineString(e)); } } assertTrue(entriesReceived == updatedEntries.length, " Received entries("+entriesReceived + ") == Expected entries("+updatedEntries.length+")"); } /** * Creates a new changelog server. * @param changelogId The serverID of the changelog to create. * @return The new changelog server. */ private Changelog createChangelogServer(short changelogId) { try { if ((changelogId==changelog1ID)&&(changelog1!=null)) return changelog1; if ((changelogId==changelog2ID)&&(changelog2!=null)) return changelog2; { int chPort = getChangelogPort(changelogId); // Create a changelog server String changelogLdif = "dn: cn=Changelog Server\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-changelog-server-config\n" + "cn: Changelog Server\n" + "ds-cfg-changelog-port: " + chPort + "\n" + "ds-cfg-changelog-server-id: " + changelogId + "\n" // + "ds-cfg-heartbeat-interval: 0 ms\n" + "ds-cfg-window-size: 100" + "\n"; if (changelogId==changelog2ID) { changelogLdif += new String( "ds-cfg-changelog-server: localhost:" + getChangelogPort(changelog1ID)+"\n"); } Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); ConfigEntry changelogConfig = new ConfigEntry(tmp, null); Changelog changelog = new Changelog(changelogConfig); Thread.sleep(1000); return changelog; } } catch (Exception e) { fail("createChangelog" + stackTraceToSingleLineString(e)); } return null; } /** * Create a synchronized suffix in the current server providing the * changelog serverID. * @param changelogID */ private void connectServer1ToChangelog(short changelogID) { // Connect DS to the changelog try { // suffix synchronized String synchroServerStringDN = synchroPluginStringDN; String synchroServerLdif = "dn: cn=example," + synchroServerStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-provider-config\n" + "cn: example\n" + "ds-cfg-synchronization-dn: dc=example,dc=com\n" + "ds-cfg-changelog-server: localhost:" + getChangelogPort(changelogID)+"\n" + "ds-cfg-directory-server-id: " + server1ID + "\n" + "ds-cfg-receive-status: true\n" // + "ds-cfg-heartbeat-interval: 0 ms\n" + "ds-cfg-window-size: " + WINDOW_SIZE; if (synchroServerEntry == null) { synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized server"); entryList.add(synchroServerEntry.getDN()); sd = SynchronizationDomain.retrievesSynchronizationDomain(baseDn); // Clear the backend SynchronizationDomain.clearJEBackend(false, sd.getBackend().getBackendID(), baseDn.toNormalizedString()); } if (sd != null) { log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning()); } } catch(Exception e) { log("connectServer1ToChangelog", e); fail("connectServer1ToChangelog", e); } } private int getChangelogPort(short changelogID) { return (changelogPort+changelogID); } /** * Tests the import side of the Initialize task */ @Test(enabled=false) public void InitializeImport() throws Exception { String testCase = "InitializeImport"; log("Starting "+testCase); try { changelog1 = createChangelogServer(changelog1ID); // Connect DS to the changelog connectServer1ToChangelog(changelog1ID); if (server2 == null) server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); Thread.sleep(2000); // In S1 launch the total update addTask(taskInitFromS2, ResultCode.SUCCESS, 0); // S2 should receive init msg SynchronizationMessage msg; msg = server2.receive(); if (!(msg instanceof InitializeRequestMessage)) { fail(testCase + " Message received by S2 is of unexpected class" + msg); } InitializeRequestMessage initMsg = (InitializeRequestMessage)msg; // S2 publishes entries to S1 makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(), initMsg.getsenderID()); // Wait for task (import) completion in S1 waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY, 0, updatedEntries.length); // Test import result in S1 testEntriesInDb(); cleanEntries(); log("Successfully ending " + testCase); } catch(Exception e) { fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Tests the export side of the Initialize task */ @Test(enabled=false) public void InitializeExport() throws Exception { String testCase = "Synchronization/InitializeExport"; log("Starting "+testCase); changelog1 = createChangelogServer(changelog1ID); // Connect DS to the changelog connectServer1ToChangelog(changelog1ID); addTestEntriesToDB(); if (server2 == null) server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); Thread.sleep(3000); InitializeRequestMessage initMsg = new InitializeRequestMessage(baseDn, server2ID, server1ID); server2.publish(initMsg); receiveUpdatedEntries(server2, server2ID, updatedEntries); cleanEntries(); log("Successfully ending "+testCase); } /** * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetExport() throws Exception { String testCase = "Synchronization/InitializeTargetExport"; log("Starting " + testCase); changelog1 = createChangelogServer(changelog1ID); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // Add in S1 the entries to be exported addTestEntriesToDB(); // S1 is the server we are running in, S2 is simulated by a broker if (server2 == null) server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); Thread.sleep(1000); // Launch in S1 the task that will initialize S2 addTask(taskInitTargetS2, ResultCode.SUCCESS, 0); // Wait for task completion waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, -1); // Tests that entries have been received by S2 receiveUpdatedEntries(server2, server2ID, updatedEntries); cleanEntries(); log("Successfully ending " + testCase); } /** * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetExportAll() throws Exception { String testCase = "Synchronization/InitializeTargetExportAll"; log("Starting " + testCase); changelog1 = createChangelogServer(changelog1ID); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // Add in S1 the entries to be exported addTestEntriesToDB(); // S1 is the server we are running in, S2 and S3 are simulated by brokers if (server2==null) server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); ChangelogBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"), server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); Thread.sleep(1000); // Launch in S1 the task that will initialize S2 addTask(taskInitTargetAll, ResultCode.SUCCESS, 0); // Wait for task completion waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, -1); // Tests that entries have been received by S2 receiveUpdatedEntries(server2, server2ID, updatedEntries); receiveUpdatedEntries(server3, server3ID, updatedEntries); cleanEntries(); log("Successfully ending " + testCase); } /** * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetImport() throws Exception { String testCase = "InitializeTargetImport"; try { log("Starting " + testCase + " debugEnabled:" + debugEnabled()); // Start SS changelog1 = createChangelogServer(changelog1ID); // S1 is the server we are running in, S2 is simulated by a broker if (server2==null) server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // S2 publishes entries to S1 makeBrokerPublishEntries(server2, server2ID, server1ID, server2ID); Thread.sleep(10000); // FIXME - how to know the import is done // Test that entries have been imported in S1 testEntriesInDb(); cleanEntries(); log("Successfully ending " + testCase); } catch(Exception e) { fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetConfigErrors() throws Exception { String testCase = "InitializeTargetConfigErrors"; try { log("Starting " + testCase); // Invalid domain base dn Entry taskInitTarget = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-domain-dn: foo", "ds-task-initialize-remote-replica-server-id: " + server2ID); addTask(taskInitTarget, ResultCode.INVALID_DN_SYNTAX, TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN); // Domain base dn not related to any domain taskInitTarget = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-domain-dn: dc=foo", "ds-task-initialize-remote-replica-server-id: " + server2ID); addTask(taskInitTarget, ResultCode.OTHER, LogMessages.MSGID_NO_MATCHING_DOMAIN); // Invalid scope // createTask(taskInitTargetS2); // Scope containing a serverID absent from the domain // createTask(taskInitTargetS2); cleanEntries(); log("Successfully ending " + testCase); } catch(Exception e) { fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeConfigErrors() throws Exception { String testCase = "InitializeConfigErrors"; try { log("Starting " + testCase); // Start SS changelog1 = createChangelogServer(changelog1ID); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // Invalid domain base dn Entry taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: foo", "ds-task-initialize-source: " + server2ID); addTask(taskInit, ResultCode.INVALID_DN_SYNTAX, TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN); // Domain base dn not related to any domain taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: dc=foo", "ds-task-initialize-source: " + server2ID); addTask(taskInit, ResultCode.OTHER, LogMessages.MSGID_NO_MATCHING_DOMAIN); // Invalid Source taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: " + baseDn, "ds-task-initialize-source: -3"); addTask(taskInit, ResultCode.OTHER, LogMessages.MSGID_INVALID_IMPORT_SOURCE); // Scope containing a serverID absent from the domain // createTask(taskInitTargetS2); cleanEntries(); log("Successfully ending " + testCase); } catch(Exception e) { fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } @Test(enabled=false) public void InitializeTargetBroken() throws Exception { String testCase = "InitializeTargetBroken"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeBroken() throws Exception { String testCase = "InitializeBroken"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeTargetExportMultiSS() throws Exception { String testCase = "Synchronization/InitializeTargetExportMultiSS"; log("Starting " + testCase); // Create 2 changelogs changelog1 = createChangelogServer(changelog1ID); changelog2 = createChangelogServer(changelog2ID); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // Add in S1 the entries to be exported addTestEntriesToDB(); // S1 is the server we are running in, S2 is simulated by a broker // connected to changelog2 if (server2 == null) { server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); } Thread.sleep(1000); // Launch in S1 the task that will initialize S2 addTask(taskInitTargetS2, ResultCode.SUCCESS, 0); // Wait for task completion waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, -1); // Tests that entries have been received by S2 receiveUpdatedEntries(server2, server2ID, updatedEntries); cleanEntries(); changelog2.shutdown(); changelog2 = null; log("Successfully ending " + testCase); } @Test(enabled=false) public void InitializeExportMultiSS() throws Exception { String testCase = "Synchronization/InitializeExportMultiSS"; log("Starting "+testCase); // Create 2 changelogs changelog1 = createChangelogServer(changelog1ID); Thread.sleep(3000); changelog2 = createChangelogServer(changelog2ID); Thread.sleep(3000); // Connect DS to the changelog 1 connectServer1ToChangelog(changelog1ID); // Put entries in DB addTestEntriesToDB(); // Connect a broker acting as server 2 to changelog2 if (server2 == null) { server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); } Thread.sleep(3000); // S2 sends init request InitializeRequestMessage initMsg = new InitializeRequestMessage(baseDn, server2ID, server1ID); server2.publish(initMsg); // S2 should receive target, entries & done receiveUpdatedEntries(server2, server2ID, updatedEntries); cleanEntries(); changelog2.shutdown(); changelog2 = null; log("Successfully ending "+testCase); } @Test(enabled=false) public void InitializeNoSource() throws Exception { String testCase = "InitializeNoSource"; log("Starting "+testCase); // Start SS changelog1 = createChangelogServer(changelog1ID); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); Entry taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: "+baseDn, "ds-task-initialize-replica-server-id: " + 20); addTask(taskInit, ResultCode.SUCCESS, 0); waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, LogMessages.MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN); if (sd != null) { log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning()); } log("Successfully ending "+testCase); } @Test(enabled=false) public void InitializeTargetNoTarget() throws Exception { String testCase = "InitializeTargetNoTarget" + baseDn; log("Starting "+testCase); // Start SS changelog1 = createChangelogServer(changelog1ID); // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // Put entries in DB addTestEntriesToDB(); Entry taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-target", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-target-domain-dn: "+baseDn, "ds-task-initialize-target-scope: " + 10); addTask(taskInit, ResultCode.SUCCESS, 0); waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, LogMessages.MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN); if (sd != null) { log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning()); } log("Successfully ending "+testCase); } @Test(enabled=false) public void InitializeStopped() throws Exception { String testCase = "InitializeStopped"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeTargetStopped() throws Exception { String testCase = "InitializeTargetStopped"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeCompressed() throws Exception { String testCase = "InitializeStopped"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeTargetEncrypted() throws Exception { String testCase = "InitializeTargetCompressed"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeSimultaneous() throws Exception { String testCase = "InitializeSimultaneous"; // Start SS changelog1 = createChangelogServer(changelog1ID); // Connect a broker acting as server 2 to changelog2 if (server2 == null) { server2 = openChangelogSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); } // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); Entry taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: "+baseDn, "ds-task-initialize-replica-server-id: " + server2ID); addTask(taskInit, ResultCode.SUCCESS, 0); Thread.sleep(3000); Entry taskInit2 = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: "+baseDn, "ds-task-initialize-replica-server-id: " + server2ID); // Second task is expected to be rejected addTask(taskInit2, ResultCode.SUCCESS, 0); waitTaskState(taskInit2, TaskState.STOPPED_BY_ERROR, LogMessages.MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED); // First task is stilll running waitTaskState(taskInit, TaskState.RUNNING, -1); // External request is supposed to be rejected // Now tests error in the middle of an import // S2 sends init request ErrorMessage msg = new ErrorMessage(server1ID, 1, ""); server2.publish(msg); waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, 1); cleanEntries(); log("Successfully ending "+testCase); } /** * Disconnect broker and remove entries from the local DB * @throws Exception */ protected void cleanEntries() { if (sd != null) { log("SynchronizationDomain: Import/Export is running ? " + sd.ieRunning()); } // Clean brokers if (server2 != null) { server2.stop(); TestCaseUtils.sleep(100); // give some time to the broker to disconnect // fromthe changelog server. server2 = null; } super.cleanEntries(); } } opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,6 +26,7 @@ */ package org.opends.server.synchronization; import static org.opends.server.loggers.Error.logError; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -54,6 +55,8 @@ import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ByteStringFactory; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.SearchScope; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.AttributeType; @@ -158,7 +161,11 @@ } } catch (Exception e) { } { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1); } } return broker; } @@ -221,7 +228,8 @@ } } catch (Exception e) { } { } } return broker; } @@ -231,6 +239,10 @@ */ protected void cleanEntries() { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationTestCase/Cleaning entries" , 1); DeleteOperation op; // Delete entries try @@ -238,6 +250,10 @@ while (true) { DN dn = entryList.removeLast(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "cleaning entry " + dn, 1); op = new DeleteOperation(connection, InternalClientConnection .nextOperationID(), InternalClientConnection.nextMessageID(), null, dn); @@ -264,8 +280,13 @@ // WORKAROUND FOR BUG #639 - BEGIN - if (mms != null) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationTestCase/FinalizeSynchronization Provider" , 1); DirectoryServer.deregisterSynchronizationProvider(mms); mms.finalizeSynchronizationProvider(); mms = null; } // WORKAROUND FOR BUG #639 - END - @@ -303,17 +324,22 @@ // // Add the changelog server DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), if (changeLogEntry!=null) { DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), "Unable to add the changeLog server"); entryList.add(changeLogEntry.getDN()); // // We also have a replicated suffix (synchronization domain) DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized server"); entryList.add(synchroServerEntry.getDN()); entryList.add(changeLogEntry.getDN()); } if (synchroServerEntry!=null) { // We also have a replicated suffix (synchronization domain) DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized suffix"); entryList.add(synchroServerEntry.getDN()); } } /** opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@ + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n" + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n" + "ds-cfg-window-size: 100" + "\n" + "ds-cfg-changelog-db-dirname: changelogDb"+i; + "ds-cfg-changelog-db-directory: changelogDb"+i; Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); ConfigEntry changelogConfig = new ConfigEntry(tmp, null); changelogs[i] = new Changelog(changelogConfig); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,16 +26,18 @@ */ package org.opends.server.synchronization.protocol; import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.UUID; import java.util.zip.DataFormatException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.testng.Assert.*; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; @@ -56,8 +58,8 @@ import org.opends.server.types.ObjectClass; import org.opends.server.types.RDN; import org.opends.server.util.TimeThread; import static org.opends.server.synchronization.protocol.OperationContext.*; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** * Test the contructors, encoders and decoders of the synchronization @@ -520,6 +522,104 @@ } /** * Test that EntryMessage encoding and decoding works * by checking that : msg == new EntryMessageTest(msg.getBytes()). */ @Test() public void EntryMessageTest() throws Exception { String taskInitFromS2 = new String( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks\n" + "objectclass: top\n" + "objectclass: ds-task\n" + "objectclass: ds-task-initialize\n" + "ds-task-class-name: org.opends.server.tasks.InitializeTask" + "ds-task-initialize-domain-dn: dc=example,dc=com" + "ds-task-initialize-source: 1"); short sender = 1; short target = 2; byte[] entry = taskInitFromS2.getBytes(); EntryMessage msg = new EntryMessage(sender, target, entry); EntryMessage newMsg = new EntryMessage(msg.getBytes()); assertEquals(msg.getsenderID(), newMsg.getsenderID()); assertEquals(msg.getDestination(), newMsg.getDestination()); assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes()); } /** * Test that InitializeRequestMessage encoding and decoding works */ @Test() public void InitializeRequestMessageTest() throws Exception { short sender = 1; short target = 2; InitializeRequestMessage msg = new InitializeRequestMessage( DN.decode("dc=example"), sender, target); InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes()); assertEquals(msg.getsenderID(), newMsg.getsenderID()); assertEquals(msg.getDestination(), newMsg.getDestination()); assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn())); } /** * Test that InitializeTargetMessage encoding and decoding works */ @Test() public void InitializeTargetMessageTest() throws Exception { short senderID = 1; short targetID = 2; short requestorID = 3; long entryCount = 4; DN baseDN = DN.decode("dc=example"); InitializeTargetMessage msg = new InitializeTargetMessage( baseDN, senderID, targetID, requestorID, entryCount); InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes()); assertEquals(msg.getsenderID(), newMsg.getsenderID()); assertEquals(msg.getDestination(), newMsg.getDestination()); assertEquals(msg.getRequestorID(), newMsg.getRequestorID()); assertEquals(msg.getEntryCount(), newMsg.getEntryCount()); assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ; assertEquals(senderID, newMsg.getsenderID()); assertEquals(targetID, newMsg.getDestination()); assertEquals(requestorID, newMsg.getRequestorID()); assertEquals(entryCount, newMsg.getEntryCount()); assertTrue(baseDN.equals(newMsg.getBaseDN())) ; } /** * Test that DoneMessage encoding and decoding works */ @Test() public void DoneMessage() throws Exception { DoneMessage msg = new DoneMessage((short)1, (short)2); DoneMessage newMsg = new DoneMessage(msg.getBytes()); assertEquals(msg.getsenderID(), newMsg.getsenderID()); assertEquals(msg.getDestination(), newMsg.getDestination()); } /** * Test that ErrorMessage encoding and decoding works */ @Test() public void ErrorMessage() throws Exception { ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details"); ErrorMessage newMsg = new ErrorMessage(msg.getBytes()); assertEquals(msg.getsenderID(), newMsg.getsenderID()); assertEquals(msg.getDestination(), newMsg.getDestination()); assertEquals(msg.getMsgID(), newMsg.getMsgID()); assertEquals(msg.getDetails(), newMsg.getDetails()); } /** * Test PendingChange */ private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)