opends/resource/schema/02-config.ldif
@@ -1045,10 +1045,6 @@ NAME 'ds-cfg-heartbeat-interval' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306 NAME 'ds-cfg-changelog-db-directory' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307 NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' ) @@ -1107,22 +1103,6 @@ NAME 'ds-cfg-case-sensitive-validation' SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332 NAME 'ds-task-initialize-domain-dn' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333 NAME 'ds-task-initialize-replica-server-id' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334 NAME 'ds-task-unprocessed-entry-count' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335 NAME 'ds-task-processed-entry-count' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled ) @@ -1445,8 +1425,7 @@ 'ds-cfg-synchronization-changelog-server-config' SUP top STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port ) MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $ ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory ) X-ORIGIN 'OpenDS Directory Server' ) ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory' SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn ) 
X-ORIGIN 'OpenDS Directory Server' ) @@ -1553,14 +1532,4 @@ SUP ds-cfg-password-validator STRUCTURAL MUST ( ds-cfg-maximum-consecutive-length $ ds-cfg-case-sensitive-validation ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.91 NAME 'ds-task-initialize-from-remote-replica' SUP ds-task MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id ) MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.92 NAME 'ds-task-initialize-remote-replica' SUP ds-task MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id ) MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count ) X-ORIGIN 'OpenDS Directory Server' ) opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,62 +3665,6 @@ NAME_PREFIX_TASK + "import-is-encrypted"; /** * The name of the objectclass that will be used for a Directory Server * initialize task definition. */ public static final String OC_INITIALIZE_TASK = NAME_PREFIX_TASK + "initialize-from-remote-replica"; /** * The name of the attribute in an initialize task definition that specifies * the base dn related to the synchonization domain to initialize. */ public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN = NAME_PREFIX_TASK + "initialize-domain-dn"; /** * The name of the attribute in an initialize target task definition that * specifies the source in terms of source server from which to initialize. */ public static final String ATTR_TASK_INITIALIZE_SOURCE = NAME_PREFIX_TASK + "initialize-replica-server-id"; /** * The name of the objectclass that will be used for a Directory Server * initialize target task definition. */ public static final String OC_INITIALIZE_TARGET_TASK = NAME_PREFIX_TASK + "initialize-remote-replica"; /** * The name of the attribute in an initialize target task definition that * specifies the base dn related to the synchonization domain to initialize. */ public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN = NAME_PREFIX_TASK + "initialize-domain-dn"; /** * The name of the attribute in an initialize target task definition that * specifies the scope in terms of servers to initialize. */ public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE = NAME_PREFIX_TASK + "initialize-replica-server-id"; /** * The name of the attribute in an initialize target task definition that * specifies the scope in terms of servers to initialize. */ public static final String ATTR_TASK_INITIALIZE_LEFT = NAME_PREFIX_TASK + "unprocessed-entry-count"; /** * The name of the attribute in an initialize target task definition that * specifies the scope in terms of servers to initialize. 
*/ public static final String ATTR_TASK_INITIALIZE_DONE = NAME_PREFIX_TASK + "processed-entry-count"; /** * The name of the objectclass that will be used for a Directory Server opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,19 +211,7 @@ public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17; /** * The message ID for the message that will be used when an invalid domain * base DN is provided as argument to the initialize target task. */ public static final int MSGID_TASK_INITIALIZE_TARGET_INVALID_DN = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18; /** * The message ID for the message that will be used when an invalid domain * base DN is provided as argument to the initialize task. */ public static final int MSGID_TASK_INITIALIZE_INVALID_DN = CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19; /** * Associates a set of generic messages with the message IDs defined in this @@ -291,11 +279,6 @@ registerMessage(MSGID_TASK_LDIFEXPORT_INSUFFICIENT_PRIVILEGES, "You do not have sufficient privileges to initiate an " + "LDIF export."); registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN, "Invalid DN provided with the Initialize Target task."); registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN, "Invalid DN provided with the Initialize task."); } } opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@ static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size"; static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory"; static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname"; static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay"; opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,31 +26,26 @@ */ package org.opends.server.synchronization.changelog; import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.loggers.Error.logError; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import com.sleepycat.je.DatabaseException; import org.opends.server.types.DN; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import com.sleepycat.je.DatabaseException; import java.io.IOException; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * This class define an in-memory cache that will be used to store @@ -430,263 +425,148 @@ } } /** * Retrieves the destination handlers for a routable message. * Send back an ack to the server that sent the change. * * @param msg The message to route. 
* @param senderHandler The handler of the server that published this message. * @return The list of destination handlers. * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. */ protected List<ServerHandler> getDestinationServers(RoutableMessage msg, ServerHandler senderHandler) public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) { short serverId = changeNumber.getServerId(); sendAck(changeNumber, isLDAPserver, serverId); } List<ServerHandler> servers = new ArrayList<ServerHandler>(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "getDestinationServers" + " msgDest:" + msg.getDestination() , 1); if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER) { // TODO Import from the "closest server" to be implemented } else if (msg.getDestination() == RoutableMessage.ALL_SERVERS) { if (!senderHandler.isChangelogServer()) { // Send to all changelogServers for (ServerHandler destinationHandler : changelogServers.values()) { servers.add(destinationHandler); } } // Send to all connected LDAP servers for (ServerHandler destinationHandler : connectedServers.values()) { // Don't loop on the sender if (destinationHandler == senderHandler) continue; servers.add(destinationHandler); } } /** * * Send back an ack to a server that sent the change. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. * @param serverId The identifier of the server from which we * received the change.. 
*/ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, short serverId) { ServerHandler handler; if (isLDAPserver) handler = connectedServers.get(serverId); else { // Destination is one server ServerHandler destinationHandler = connectedServers.get(msg.getDestination()); if (destinationHandler != null) { servers.add(destinationHandler); } else { // the targeted server is NOT connected if (senderHandler.isLDAPserver()) { // let's forward to the other changelogs servers.addAll(changelogServers.values()); } } } handler = changelogServers.get(serverId); return servers; // TODO : check for null handler and log error try { handler.sendAck(changeNumber); } catch (IOException e) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); handler.shutdown(); } } /** * Process an InitializeRequestMessage. * * @param msg The message received and to be processed. * @param senderHandler The server handler of the server that emitted * the message. * Shutdown this ChangelogCache. 
*/ public void process(RoutableMessage msg, ServerHandler senderHandler) public void shutdown() { List<ServerHandler> servers = getDestinationServers(msg, senderHandler); if (servers.isEmpty()) // Close session with other changelogs for (ServerHandler serverHandler : changelogServers.values()) { if (!(msg instanceof InitializeRequestMessage)) { // TODO A more elaborated policy is probably needed } else { ErrorMessage errMsg = new ErrorMessage( msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, "serverID:" + msg.getDestination()); try { senderHandler.send(errMsg); } catch(IOException ioe) { // TODO Handle error properly (sender timeout in addition) } } return; serverHandler.shutdown(); } for (ServerHandler targetHandler : servers) // Close session with other LDAP servers for (ServerHandler serverHandler : connectedServers.values()) { try { targetHandler.send(msg); } catch(IOException ioe) { // TODO Handle error properly (sender timeout in addition) } serverHandler.shutdown(); } // Shutdown the dbHandlers synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) { dbHandler.shutdown(); } sourceDbHandlers.clear(); } } /** * Send back an ack to the server that sent the change. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. */ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) /** * Returns the ServerState describing the last change from this replica. * * @return The ServerState describing the last change from this replica. 
*/ public ServerState getDbServerState() { ServerState serverState = new ServerState(); for (DbHandler db : sourceDbHandlers.values()) { short serverId = changeNumber.getServerId(); sendAck(changeNumber, isLDAPserver, serverId); serverState.update(db.getLastChange()); } return serverState; } /** * {@inheritDoc} */ @Override public String toString() { return "ChangelogCache " + baseDn; } /** * Check if some server Handler should be removed from flow control state. * @throws IOException If an error happened. */ public void checkAllSaturation() throws IOException { for (ServerHandler handler : changelogServers.values()) { handler.checkWindow(); } /** * * Send back an ack to a server that sent the change. * * @param changeNumber The ChangeNumber of the change that must be acked. * @param isLDAPserver This boolean indicates if the server that sent the * change was an LDAP server or a Changelog server. * @param serverId The identifier of the server from which we * received the change.. */ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, short serverId) for (ServerHandler handler : connectedServers.values()) { ServerHandler handler; if (isLDAPserver) handler = connectedServers.get(serverId); else handler = changelogServers.get(serverId); // TODO : check for null handler and log error try { handler.sendAck(changeNumber); } catch (IOException e) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); handler.shutdown(); } handler.checkWindow(); } } /** * Shutdown this ChangelogCache. */ public void shutdown() /** * Check if a server that was in flow control can now restart * sending updates. * @param sourceHandler The server that must be checked. 
* @return true if the server can restart sending changes. * false if the server can't restart sending changes. */ public boolean restartAfterSaturation(ServerHandler sourceHandler) { for (ServerHandler handler : changelogServers.values()) { // Close session with other changelogs for (ServerHandler serverHandler : changelogServers.values()) { serverHandler.shutdown(); } // Close session with other LDAP servers for (ServerHandler serverHandler : connectedServers.values()) { serverHandler.shutdown(); } // Shutdown the dbHandlers synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) { dbHandler.shutdown(); } sourceDbHandlers.clear(); } } /** * Returns the ServerState describing the last change from this replica. * * @return The ServerState describing the last change from this replica. */ public ServerState getDbServerState() { ServerState serverState = new ServerState(); for (DbHandler db : sourceDbHandlers.values()) { serverState.update(db.getLastChange()); } return serverState; } /** * {@inheritDoc} */ @Override public String toString() { return "ChangelogCache " + baseDn; } /** * Check if some server Handler should be removed from flow control state. * @throws IOException If an error happened. */ public void checkAllSaturation() throws IOException { for (ServerHandler handler : changelogServers.values()) { handler.checkWindow(); } for (ServerHandler handler : connectedServers.values()) { handler.checkWindow(); } } /** * Check if a server that was in flow control can now restart * sending updates. * @param sourceHandler The server that must be checked. * @return true if the server can restart sending changes. * false if the server can't restart sending changes. 
*/ public boolean restartAfterSaturation(ServerHandler sourceHandler) { for (ServerHandler handler : changelogServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) if (!handler.restartAfterSaturation(sourceHandler)) return false; } for (ServerHandler handler : connectedServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) return false; } return true; } for (ServerHandler handler : connectedServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) return false; } return true; } } opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,8 +27,6 @@ package org.opends.server.synchronization.changelog; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -46,18 +44,6 @@ import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.ChangelogStartMessage; import org.opends.server.synchronization.protocol.HeartbeatThread; import org.opends.server.synchronization.protocol.ProtocolSession; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.ServerStartMessage; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.synchronization.protocol.WindowMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; @@ -65,6 +51,17 @@ import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.InitializationException; import org.opends.server.core.DirectoryServer; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.protocol.AckMessage; import 
org.opends.server.synchronization.protocol.ChangelogStartMessage; import org.opends.server.synchronization.protocol.ProtocolSession; import org.opends.server.synchronization.protocol.ServerStartMessage; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.synchronization.protocol.WindowMessage; import org.opends.server.synchronization.protocol.HeartbeatThread; import org.opends.server.util.TimeThread; /** @@ -111,7 +108,6 @@ // flow controled and should // be stopped from sending messsages. private int saturationCount = 0; private short changelogId; /** * The time in milliseconds between heartbeats from the synchronization @@ -159,7 +155,6 @@ public void start(DN baseDn, short changelogId, String changelogURL, int windowSize, Changelog changelog) { this.changelogId = changelogId; rcvWindowSizeHalf = windowSize/2; maxRcvWindow = windowSize; rcvWindow = windowSize; @@ -1268,40 +1263,4 @@ { return heartbeatInterval; } /** * Processes a routable message. * * @param msg The message to be processed. */ public void process(RoutableMessage msg) { if (debugEnabled()) debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1); changelogCache.process(msg, this); } /** * Send an InitializeRequestMessage to the server connected through this * handler. 
* * @param msg The message to be processed * @throws IOException when raised by the underlying session */ public void send(RoutableMessage msg) throws IOException { if (debugEnabled()) debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1); session.publish(msg); } } opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,11 +34,6 @@ import org.opends.server.api.DirectoryThread; import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.DoneMessage; import org.opends.server.synchronization.protocol.EntryMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.InitializeTargetMessage; import org.opends.server.synchronization.protocol.ProtocolSession; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; @@ -121,33 +116,6 @@ WindowMessage windowMsg = (WindowMessage) msg; handler.updateWindow(windowMsg); } else if (msg instanceof InitializeRequestMessage) { InitializeRequestMessage initializeMsg = (InitializeRequestMessage) msg; handler.process(initializeMsg); } else if (msg instanceof InitializeTargetMessage) { InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg; handler.process(initializeMsg); } else if (msg instanceof EntryMessage) { EntryMessage entryMsg = (EntryMessage) msg; handler.process(entryMsg); } else if (msg instanceof DoneMessage) { DoneMessage doneMsg = (DoneMessage) msg; handler.process(doneMsg); } else if (msg instanceof ErrorMessage) { ErrorMessage errorMsg = (ErrorMessage) msg; handler.process(errorMsg); } } } catch (IOException e) { opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,61 +317,13 @@ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42; /** * The message id for the description of the attribute used to configure * The message id for thedescription of the attribute used to configure * the purge delay of the Changelog Servers. */ public static final int MSGID_PURGE_DELAY_ATTR = CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43; /** * The message id for the error raised when export/import * is rejected due to an export/import already in progress. */ public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44; /** * The message id for the error raised when import * is rejected due to an invalid source of data imported. */ public static final int MSGID_INVALID_IMPORT_SOURCE = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45; /** * The message id for the error raised when export * is rejected due to an invalid target to export datas. */ public static final int MSGID_INVALID_EXPORT_TARGET = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46; /** * The message id for the error raised when import/export message * cannot be routed to an up-and-running target in the domain. */ public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47; /** * The message ID for the message that will be used when no domain * can be found matching the provided domain base DN. */ public static final int MSGID_NO_MATCHING_DOMAIN = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48; /** * The message ID for the message that will be used when no domain * can be found matching the provided domain base DN. */ public static final int MSGID_MULTIPLE_MATCHING_DOMAIN = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49; /** * The message ID for the message that will be used when the domain * belongs to a provider class that does not allow the export. 
*/ public static final int MSGID_INVALID_PROVIDER = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50; /** * Register the messages from this class in the core server. @@ -497,20 +449,5 @@ " restored because changelog servers would not be able to refresh" + " LDAP servers with older versions of the data. A zero value" + " can be used to specify an infinite delay (or never purge)."); MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED, "The current request is rejected due to an import or an export" + " already in progress for the same data."); MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE, "Invalid source for the import."); MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET, "Invalid target for the export."); MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, "No reachable peer in the domain."); MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN, "No domain matches the base DN provided."); MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN, "Multiple domains match the base DN provided."); MessageHandler.registerMessage(MSGID_INVALID_PROVIDER, "The provider class does not allow the operation requested."); } } opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,8 +27,6 @@ package org.opends.server.synchronization.plugin; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -347,10 +345,6 @@ { if (session != null) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "Broker : connect closing session" , 1); session.close(); session = null; } @@ -504,7 +498,6 @@ try { SynchronizationMessage msg = session.receive(); if (msg instanceof WindowMessage) { WindowMessage windowMsg = (WindowMessage) msg; @@ -551,11 +544,6 @@ connected = false; try { if (debugEnabled()) { debugInfo("ChangelogBroker Stop Closing session"); } session.close(); } catch (IOException e) {} @@ -694,12 +682,4 @@ { return numLostConnections; } private void log(String message) { int msgID = MSGID_UNKNOWN_TYPE; logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } } opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,13 +27,11 @@ package org.opends.server.synchronization.plugin; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import java.io.IOException; import org.opends.server.api.DirectoryThread; import org.opends.server.synchronization.protocol.ProtocolSession; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import java.io.IOException; /** * This class implements a thread to monitor heartbeat messages from the @@ -105,9 +103,6 @@ long lastReceiveTime = session.getLastReceiveTime(); if (now > lastReceiveTime + 2 * heartbeatInterval) { debugInfo("Heartbeat monitor is closing the broker session " + "because it could not detect a heartbeat."); // Heartbeat is well overdue so the server is assumed to be dead. if (debugEnabled()) { opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@ * Can be null is the request has no associated operation. * @return The Synchronization domain for this DN. */ public static SynchronizationDomain findDomain(DN dn, Operation op) private static SynchronizationDomain findDomain(DN dn, Operation op) { /* * Don't run the special synchronization code on Operation that are opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,8 +233,7 @@ { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString(), Thread.currentThread().getStackTrace()); op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,59 +26,47 @@ */ package org.opends.server.synchronization.plugin; import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN; import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS; import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID; import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE; import static org.opends.server.loggers.Error.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.debugInfo; import static org.opends.server.messages.ConfigMessages.*; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ToolMessages.*; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME; import static org.opends.server.synchronization.protocol.OperationContext.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.createEntry; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.util.ServerConstants. TIME_UNIT_MILLISECONDS_ABBR; import static org.opends.server.util.ServerConstants. 
TIME_UNIT_MILLISECONDS_FULL; import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR; import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.synchronization.plugin.Historical.*; import static org.opends.server.synchronization.protocol.OperationContext.*; import static org.opends.server.loggers.Error.*; import static org.opends.server.messages.MessageHandler.*; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.LinkedHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; import org.opends.server.api.Backend; import org.opends.server.api.ConfigurableComponent; import org.opends.server.api.DirectoryThread; import org.opends.server.api.SynchronizationProvider; import org.opends.server.backends.jeb.BackendImpl; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.server.config.BooleanConfigAttribute; import org.opends.server.config.ConfigAttribute; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.config.DNConfigAttribute; import org.opends.server.config.IntegerConfigAttribute; import org.opends.server.config.IntegerWithUnitConfigAttribute; import org.opends.server.config.StringConfigAttribute; import org.opends.server.config.IntegerWithUnitConfigAttribute; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.core.LockFileManager; import org.opends.server.core.ModifyDNOperation; import 
org.opends.server.core.ModifyOperation; import org.opends.server.core.Operation; import org.opends.server.messages.MessageHandler; import org.opends.server.synchronization.common.LogMessages; import org.opends.server.protocols.asn1.ASN1Exception; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; @@ -89,30 +77,19 @@ import org.opends.server.synchronization.protocol.AckMessage; import org.opends.server.synchronization.protocol.AddContext; import org.opends.server.synchronization.protocol.DeleteContext; import org.opends.server.synchronization.protocol.DoneMessage; import org.opends.server.synchronization.protocol.EntryMessage; import org.opends.server.synchronization.protocol.ErrorMessage; import org.opends.server.synchronization.protocol.InitializeRequestMessage; import org.opends.server.synchronization.protocol.InitializeTargetMessage; import org.opends.server.synchronization.protocol.ModifyContext; import org.opends.server.synchronization.protocol.ModifyDNMsg; import org.opends.server.synchronization.protocol.ModifyDnContext; import org.opends.server.synchronization.protocol.OperationContext; import org.opends.server.synchronization.protocol.RoutableMessage; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.synchronization.protocol.UpdateMessage; import org.opends.server.tasks.InitializeTargetTask; import org.opends.server.tasks.InitializeTask; import org.opends.server.tasks.TaskUtils; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DirectoryException; import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.LDIFExportConfig; import 
org.opends.server.types.LDIFImportConfig; import org.opends.server.types.Modification; import org.opends.server.types.RDN; import org.opends.server.types.ResultCode; @@ -160,96 +137,8 @@ * server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; short serverId; /** * This class contain the context related to an import or export * launched on the domain. */ private class IEContext { // The task that initiated the operation. Task initializeTask; // The input stream for the import SynchroLDIFInputStream ldifImportInputStream = null; // The target in the case of an export short exportTarget = RoutableMessage.UNKNOWN_SERVER; // The source in the case of an import short importSource = RoutableMessage.UNKNOWN_SERVER; // The total entry count expected to be processed long entryCount = 0; // The count for the entry left to be processed long entryLeftCount = 0; // The exception raised when any DirectoryException exception = null; /** * Initializes the counters of the task with the provider value. * @param count The value with which to initialize the counters. */ public void initTaskCounters(long count) { entryCount = count; entryLeftCount = count; if (initializeTask != null) { if (initializeTask instanceof InitializeTask) { ((InitializeTask)initializeTask).setTotal(entryCount); ((InitializeTask)initializeTask).setLeft(entryCount); } else if (initializeTask instanceof InitializeTargetTask) { ((InitializeTargetTask)initializeTask).setTotal(entryCount); ((InitializeTargetTask)initializeTask).setLeft(entryCount); } } } /** * Update the counters of the task for each entry processed during * an import or export. 
*/ public void updateTaskCounters() { entryLeftCount--; if (initializeTask != null) { if (initializeTask instanceof InitializeTask) { ((InitializeTask)initializeTask).setLeft(entryLeftCount); } else if (initializeTask instanceof InitializeTargetTask) { ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); } } } /** * Update the state of the task. */ protected TaskState updateTaskCompletionState() { if (exception == null) return TaskState.COMPLETED_SUCCESSFULLY; else return TaskState.STOPPED_BY_ERROR; } } // The context related to an import or export being processed // Null when none is being processed. private IEContext ieContext = null; // The backend informations necessary to make an import or export. private Backend backend; private ConfigEntry backendConfigEntry; private List<DN> branches = new ArrayList<DN>(0); private short serverId; private int listenerThreadNumber = 10; private boolean receiveStatus = true; @@ -271,7 +160,6 @@ private boolean solveConflictFlag = true; private boolean disabled = false; private boolean stateSavingDisabled = false; static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn"; @@ -317,6 +205,8 @@ timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D); } /** * Creates a new SynchronizationDomain using configuration from configEntry. * @@ -327,7 +217,6 @@ public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException { super("Synchronization flush"); /* * read the centralized changelog server configuration * this is a multivalued attribute @@ -508,10 +397,6 @@ if (!receiveStatus) broker.suspendReceive(); } // Retrieves the related backend and its config entry retrievesBackendInfos(baseDN); } catch (Exception e) { /* TODO should mark that changelog service is @@ -918,9 +803,9 @@ } /** * Receives an update message from the changelog. * Receive an update message from the changelog. 
* also responsible for updating the list of pending changes * @return the received message - null if none * @return the received message */ public UpdateMessage receive() { @@ -938,7 +823,7 @@ // The server is in the shutdown process return null; } log("Broker received message :" + msg); if (msg instanceof AckMessage) { AckMessage ack = (AckMessage) msg; @@ -949,56 +834,6 @@ update = (UpdateMessage) msg; receiveUpdate(update); } else if (msg instanceof InitializeRequestMessage) { // Another server requests us to provide entries // for a total update InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; try { initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), null); } catch(DirectoryException de) { // Returns an error message to notify the sender int msgID = de.getErrorMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); broker.publish(errorMsg); } } else if (msg instanceof InitializeTargetMessage) { // Another server is exporting its entries to us InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; try { importBackend(initMsg); } catch(DirectoryException de) { // Return an error message to notify the sender int msgID = de.getErrorMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); log(getMessage(msgID, backend.getBackendID()) + de.getMessage()); broker.publish(errorMsg); } } else if (msg instanceof ErrorMessage) { if (ieContext != null) { // This is an error termination for the 2 following cases : // - either during an export // - or before an import really started // For example, when we publish a request and the // changelog did not find any import source. 
abandonImportExport((ErrorMessage)msg); } } } catch (SocketTimeoutException e) { // just retry @@ -1041,9 +876,7 @@ public void receiveAck(AckMessage ack) { UpdateMessage update; ChangeNumber changeNumber; changeNumber = ack.getChangeNumber(); ChangeNumber changeNumber = ack.getChangeNumber(); synchronized (pendingChanges) { @@ -1272,7 +1105,7 @@ synchronized (this) { this.wait(1000); if (!disabled && !stateSavingDisabled ) if (!disabled ) { // save the RUV state.save(); @@ -1318,8 +1151,6 @@ this.notify(); } DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName()); // stop the ChangelogBroker broker.stop(); } @@ -2026,7 +1857,6 @@ return broker.getNumLostConnections(); } /** * Check if the domain solve conflicts. * @@ -2103,988 +1933,6 @@ // Nothing is needed at the moment } /* * Total Update >> */ /** * Receives bytes related to an entry in the context of an import to * initialize the domain (called by SynchronizationDomainLDIFInputStream). * * @return The bytes. Null when the Done or Err message has been received */ public byte[] receiveEntryBytes() { SynchronizationMessage msg; while (true) { try { msg = broker.receive(); if (msg == null) { // The server is in the shutdown process return null; } log("receiveEntryBytes: received " + msg); if (msg instanceof EntryMessage) { // FIXME EntryMessage entryMsg = (EntryMessage)msg; byte[] entryBytes = entryMsg.getEntryBytes().clone(); ieContext.updateTaskCounters(); return entryBytes; } else if (msg instanceof DoneMessage) { // This is the normal termination of the import // No error is stored and the import is ended // by returning null return null; } else if (msg instanceof ErrorMessage) { // This is an error termination during the import // The error is stored and the import is ended // by returning null ErrorMessage errorMsg = (ErrorMessage)msg; ieContext.exception = new DirectoryException(ResultCode.OTHER, errorMsg.getDetails() , errorMsg.getMsgID()); return null; } else { // Other messages 
received during an import are trashed } } catch(Exception e) { ieContext.exception = new DirectoryException(ResultCode.OTHER, "received an unexpected message type" , 1, e); } return null; } } /** * Processes an error message received while an import/export is * on going. * @param errorMsg The error message received. */ protected void abandonImportExport(ErrorMessage errorMsg) { // FIXME TBD Treat the case where the error happens while entries // are being exported if (ieContext != null) { ieContext.exception = new DirectoryException(ResultCode.OTHER, errorMsg.getDetails() , errorMsg.getMsgID()); if (ieContext.initializeTask instanceof InitializeTask) { // Update the task that initiated the import ((InitializeTask)ieContext.initializeTask). setState(ieContext.updateTaskCompletionState(),ieContext.exception); ieContext = null; } } } /** * Clears all the entries from the JE backend determined by the * be id passed into the method. * * @param createBaseEntry Indicate whether to automatically create the base * entry and add it to the backend. * @param beID The be id to clear. * @param dn The suffix of the backend to create if the the createBaseEntry * boolean is true. * @throws Exception If an unexpected problem occurs. */ public static void clearJEBackend(boolean createBaseEntry, String beID, String dn) throws Exception { BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID); DN[] baseDNs = backend.getBaseDNs(); // FIXME Should getConfigEntry be part of TaskUtils ? ConfigEntry configEntry = TaskUtils.getConfigEntry(backend); // FIXME Should setBackendEnabled be part of TaskUtils ? 
TaskUtils.setBackendEnabled(configEntry, false); try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason)) { throw new RuntimeException(failureReason.toString()); } try { backend.clearBackend(configEntry, baseDNs); } finally { LockFileManager.releaseLock(lockFile, failureReason); } } finally { TaskUtils.setBackendEnabled(configEntry, true); } if (createBaseEntry) { DN baseDN = DN.decode(dn); Entry e = createEntry(baseDN); backend = (BackendImpl)DirectoryServer.getBackend(beID); backend.addEntry(e, null); } } /** * Log debug message. * @param message The message to log. */ private void log(String message) { if (debugEnabled()) { debugInfo("DebugInfo" + message); int msgID = MSGID_UNKNOWN_TYPE; logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationDomain/ " + message, msgID); } } /** * Export the entries. * @throws DirectoryException when an error occured */ protected void exportBackend() throws DirectoryException { // FIXME Temporary workaround - will probably be fixed when implementing // dynamic config retrievesBackendInfos(this.baseDN); // Acquire a shared lock for the backend. try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! 
LockFileManager.acquireSharedLock(lockFile, failureReason)) { int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } catch (Exception e) { int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message + " " + stackTraceToSingleLineString(e), msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this); LDIFExportConfig exportConfig = new LDIFExportConfig(os); // Launch the export. try { DN[] baseDNs = {this.baseDN}; backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig); } catch (DirectoryException de) { int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT; String message = getMessage(msgID, de.getErrorMessage()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT; String message = getMessage(msgID, stackTraceToSingleLineString(e)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } finally { // Clean up after the export by closing the export config. exportConfig.close(); // Release the shared lock on the backend. try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! 
LockFileManager.releaseLock(lockFile, failureReason)) { int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } catch (Exception e) { int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), stackTraceToSingleLineString(e)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } } /** * Retrieves the backend object related to the domain and the backend's * config entry. They will be used for import and export. * TODO This should be in a shared package rather than here. * * @param baseDN The baseDN to retrieve the backend * @throws DirectoryException when an error occired */ protected void retrievesBackendInfos(DN baseDN) throws DirectoryException { ArrayList<Backend> backendList = new ArrayList<Backend>(); ArrayList<ConfigEntry> entryList = new ArrayList<ConfigEntry>(); ArrayList<List<DN>> dnList = new ArrayList<List<DN>>(); Backend backend = null; ConfigEntry backendConfigEntry = null; List<DN> branches = new ArrayList<DN>(0); // Retrieves the backend related to this domain Backend domainBackend = DirectoryServer.getBackend(baseDN); if (domainBackend == null) { int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Retrieves its config entry and its DNs int code = getBackends(backendList, entryList, dnList); if (code != 0) { int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } int numBackends = 
backendList.size(); for (int i=0; i < numBackends; i++) { Backend b = backendList.get(i); if (domainBackend.getBackendID() != b.getBackendID()) { continue; } if (backend == null) { backend = domainBackend; backendConfigEntry = entryList.get(i).duplicate(); branches = dnList.get(i); } else { int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID; String message = getMessage(msgID, domainBackend.getBackendID()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } } if (backend == null) { int msgID = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID; String message = getMessage(msgID, domainBackend.getBackendID()); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } else if (! backend.supportsLDIFExport()) { int msgID = MSGID_LDIFIMPORT_CANNOT_IMPORT; String message = getMessage(msgID, 0); // FIXME logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } this.backend = backend; this.backendConfigEntry = backendConfigEntry; this.branches = branches; } /** * Sends lDIFEntry entry lines to the export target currently set. * * @param lDIFEntry The lines for the LDIF entry. * @throws IOException when an error occured. */ public void sendEntryLines(String lDIFEntry) throws IOException { // If an error was raised - like receiving an ErrorMessage // we just let down the export. 
if (ieContext.exception != null) { IOException ioe = new IOException(ieContext.exception.getMessage()); ieContext = null; throw ioe; } // new entry then send the current one EntryMessage entryMessage = new EntryMessage( serverId, ieContext.exportTarget, lDIFEntry.getBytes()); broker.publish(entryMessage); ieContext.updateTaskCounters(); } /** * Retrieves information about the backends defined in the Directory Server * configuration. * * @param backendList A list into which instantiated (but not initialized) * backend instances will be placed. * @param entryList A list into which the config entries associated with * the backends will be placed. * @param dnList A list into which the set of base DNs for each backend * will be placed. */ private static int getBackends(ArrayList<Backend> backendList, ArrayList<ConfigEntry> entryList, ArrayList<List<DN>> dnList) throws DirectoryException { // Get the base entry for all backend configuration. DN backendBaseDN = null; try { backendBaseDN = DN.decode(DN_BACKEND_BASE); } catch (DirectoryException de) { int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN; String message = getMessage(msgID, DN_BACKEND_BASE, stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } ConfigEntry baseEntry = null; try { baseEntry = DirectoryServer.getConfigEntry(backendBaseDN); } catch (ConfigException ce) { int msgID = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY; String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY; String message = getMessage(msgID, DN_BACKEND_BASE, 
stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Iterate through the immediate children, attempting to parse them as // backends. for (ConfigEntry configEntry : baseEntry.getChildren().values()) { // Get the backend ID attribute from the entry. If there isn't one, then // skip the entry. String backendID = null; try { int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID; StringConfigAttribute idStub = new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID), true, false, true); StringConfigAttribute idAttr = (StringConfigAttribute) configEntry.getConfigAttribute(idStub); if (idAttr == null) { continue; } else { backendID = idAttr.activeValue(); } } catch (ConfigException ce) { int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), ce.getMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Get the backend class name attribute from the entry. If there isn't // one, then just skip the entry. 
String backendClassName = null; try { int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS; StringConfigAttribute classStub = new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID), true, false, false); StringConfigAttribute classAttr = (StringConfigAttribute) configEntry.getConfigAttribute(classStub); if (classAttr == null) { continue; } else { backendClassName = classAttr.activeValue(); } } catch (ConfigException ce) { int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), ce.getMessage()); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } Class backendClass = null; try { backendClass = Class.forName(backendClassName); } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS; String message = getMessage(msgID, backendClassName, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } Backend backend = null; try { backend = (Backend) backendClass.newInstance(); backend.setBackendID(backendID); } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS; String message = getMessage(msgID, backendClassName, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } // Get the base DN attribute from the entry. If there isn't one, then // just skip this entry. 
List<DN> baseDNs = null; try { int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS; DNConfigAttribute baseDNStub = new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID), true, true, true); DNConfigAttribute baseDNAttr = (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub); if (baseDNAttr == null) { msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND; String message = getMessage(msgID, String.valueOf(configEntry.getDN())); throw new DirectoryException( DirectoryServer.getServerErrorResultCode(), message,msgID, null); } else { baseDNs = baseDNAttr.activeValues(); } } catch (Exception e) { int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND; String message = getMessage(msgID, String.valueOf(configEntry.getDN()), stackTraceToSingleLineString(e)); throw new DirectoryException( ResultCode.OTHER, message, msgID, null); } backendList.add(backend); entryList.add(configEntry); dnList.add(baseDNs); } return 0; } /** * Initializes this domain from another source server. * * @param source The source from which to initialize * @param initTask The task that launched the initialization * and should be updated of its progress. * @throws DirectoryException when an error occurs */ public void initialize(short source, Task initTask) throws DirectoryException { acquireIEContext(); ieContext.initializeTask = initTask; InitializeRequestMessage initializeMsg = new InitializeRequestMessage( baseDN, serverId, source); // Publish Init request msg broker.publish(initializeMsg); // .. we expect to receive entries or err after that } /** * Verifies that the given string represents a valid source * from which this server can be initialized. 
* @param sourceString The string representaing the source * @return The source as a short value * @throws DirectoryException if the string is not valid */ public short decodeSource(String sourceString) throws DirectoryException { short source = 0; Throwable cause = null; try { source = Integer.decode(sourceString).shortValue(); if (source >= -1) { // TODO Verifies serverID is in the domain // We shold check here that this is a server implied // in the current domain. log("Source decoded for import:" + source); return source; } } catch(Exception e) { cause = e; } ResultCode resultCode = ResultCode.OTHER; int errorMessageID = MSGID_INVALID_IMPORT_SOURCE; String message = getMessage(errorMessageID); if (cause != null) throw new DirectoryException( resultCode, message, errorMessageID, cause); else throw new DirectoryException( resultCode, message, errorMessageID); } /** * Verifies that the given string represents a valid source * from which this server can be initialized. * @param targetString The string representing the source * @return The source as a short value * @throws DirectoryException if the string is not valid */ public short decodeTarget(String targetString) throws DirectoryException { short target = 0; Throwable cause; if (targetString.equalsIgnoreCase("all")) { return RoutableMessage.ALL_SERVERS; } // So should be a serverID try { target = Integer.decode(targetString).shortValue(); if (target >= 0) { // FIXME Could we check now that it is a know server in the domain ? 
} return target; } catch(Exception e) { cause = e; } ResultCode resultCode = ResultCode.OTHER; int errorMessageID = MSGID_INVALID_EXPORT_TARGET; String message = getMessage(errorMessageID); if (cause != null) throw new DirectoryException( resultCode, message, errorMessageID, cause); else throw new DirectoryException( resultCode, message, errorMessageID); } private synchronized void acquireIEContext() throws DirectoryException { if (ieContext != null) { // Rejects 2 simultaneous exports int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED; String message = getMessage(msgID); throw new DirectoryException(ResultCode.OTHER, message, msgID); } ieContext = new IEContext(); } private synchronized void releaseIEContext() { ieContext = null; } /** * Process the initialization of some other server or servers in the topology * specified by the target argument. * @param target The target that should be initialized * @param initTask The task that triggers this initialization and that should * be updated with its progress. * @exception DirectoryException When an error occurs. */ public void initializeTarget(short target, Task initTask) throws DirectoryException { initializeTarget(target, serverId, initTask); } /** * Process the initialization of some other server or servers in the topology * specified by the target argument when this initialization has been * initiated by another server than this one. * @param target The target that should be initialized. * @param requestorID The server that initiated the export. * @param initTask The task that triggers this initialization and that should * be updated with its progress. * @exception DirectoryException When an error occurs. 
*/ public void initializeTarget(short target, short requestorID, Task initTask) throws DirectoryException { acquireIEContext(); ieContext.exportTarget = target; ieContext.initializeTask = initTask; ieContext.initTaskCounters(backend.getEntryCount()); // Send start message InitializeTargetMessage initializeMessage = new InitializeTargetMessage( baseDN, serverId, ieContext.exportTarget, requestorID, ieContext.entryLeftCount); log("SD : publishes " + initializeMessage + " for #entries=" + ieContext.entryCount); broker.publish(initializeMessage); // make an export and send entries exportBackend(); // Successfull termnation DoneMessage doneMsg = new DoneMessage(serverId, initializeMessage.getDestination()); broker.publish(doneMsg); if (ieContext != null) { ieContext.updateTaskCompletionState(); ieContext = null; } } /** * Process backend before import. * @param backend The backend. * @param backendConfigEntry The config entry of the backend. * @throws Exception */ private void preBackendImport(Backend backend, ConfigEntry backendConfigEntry) throws Exception { // Stop saving state stateSavingDisabled = true; // Clear the backend clearJEBackend(false,backend.getBackendID(),null); // FIXME setBackendEnabled should be part of TaskUtils ? TaskUtils.setBackendEnabled(backendConfigEntry, false); // Acquire an exclusive lock for the backend. String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) { int msgID = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND; String message = getMessage(msgID, backend.getBackendID(), String.valueOf(failureReason)); logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message, msgID); throw new DirectoryException(ResultCode.OTHER, message, msgID); } } /** * Initializes the domain's backend with received entries. * @param initializeMessage The message that initiated the import. 
* @exception DirectoryException Thrown when an error occurs. */ protected void importBackend(InitializeTargetMessage initializeMessage) throws DirectoryException { LDIFImportConfig importConfig = null; try { log("startImport"); if (initializeMessage.getRequestorID() == serverId) { // The import responds to a request we did so the IEContext // is already acquired } else { acquireIEContext(); } ieContext.importSource = initializeMessage.getsenderID(); ieContext.entryLeftCount = initializeMessage.getEntryCount(); ieContext.initTaskCounters(initializeMessage.getEntryCount()); preBackendImport(this.backend, this.backendConfigEntry); DN[] baseDNs = {baseDN}; ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this); importConfig = new LDIFImportConfig(ieContext.ldifImportInputStream); importConfig.setIncludeBranches(this.branches); // TODO How to deal with rejected entries during the import // importConfig.writeRejectedEntries("rejectedImport", // ExistingFileBehavior.OVERWRITE); // Process import this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig); stateSavingDisabled = false; // Re-exchange state with SS broker.stop(); broker.start(changelogServers); } catch(Exception e) { throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(), 2);// FIXME } finally { // Cleanup importConfig.close(); // Re-enable backend closeBackendImport(this.backend, this.backendConfigEntry); // Update the task that initiated the import if ((ieContext != null ) && (ieContext.initializeTask != null)) { ((InitializeTask)ieContext.initializeTask). setState(ieContext.updateTaskCompletionState(),ieContext.exception); } releaseIEContext(); log("End importBackend"); } // Success } /** * Make post import operations. * @param backend The backend implied in the import. * @param backendConfigEntry The config entry of the backend. * @exception DirectoryException Thrown when an error occurs. 
*/
  protected void closeBackendImport(Backend backend,
      ConfigEntry backendConfigEntry)
    throws DirectoryException
  {
    String lockFile = LockFileManager.getBackendLockFileName(backend);
    StringBuilder failureReason = new StringBuilder();

    // Release lock
    if (!LockFileManager.releaseLock(lockFile, failureReason))
    {
      int msgID = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
      String message = getMessage(msgID, backend.getBackendID(),
          String.valueOf(failureReason));
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
      // The exception used to be built and silently discarded; report the
      // failure to the caller as preBackendImport does for the lock step.
      throw new DirectoryException(ResultCode.OTHER, message, msgID);
    }

    // FIXME setBackendEnabled should be part of TaskUtils ?
    TaskUtils.setBackendEnabled(backendConfigEntry, true);
  }

  /**
   * Retrieves a synchronization domain based on the baseDN.
   * Every registered synchronization provider must be a
   * MultimasterSynchronization, and exactly one domain is expected to match.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @return The domain retrieved, or null when no synchronization provider
   *         is registered.
   * @throws DirectoryException When a provider has an unexpected type, when
   *         no domain matches the baseDN or when more than one matches.
   */
  public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
    throws DirectoryException
  {
    SynchronizationDomain synchronizationDomain = null;

    // Retrieves the domain
    for (SynchronizationProvider provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterSynchronization))
      {
        int msgID = LogMessages.MSGID_INVALID_PROVIDER;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }

      // From the domainDN retrieves the synchronization domain
      SynchronizationDomain sdomain =
        MultimasterSynchronization.findDomain(baseDN, null);
      if (sdomain == null)
      {
        int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
        String message = getMessage(msgID) + " " + baseDN;
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }

      if (synchronizationDomain != null)
      {
        // Should never happen
        int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
        String message = getMessage(msgID);
        throw new DirectoryException(ResultCode.OTHER,
            message, msgID);
      }
      synchronizationDomain = sdomain;
    }
    return synchronizationDomain;
  }

  /**
   * Returns the backend associated to this domain.
   * @return The associated backend.
   */
  public Backend getBackend()
  {
    return backend;
  }

  /**
   * Returns a boolean indicating if an import or export is currently
   * processed, i.e. if an import/export context is currently set.
   * @return The status
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
  }

  /*
   * <<Total Update
   */

  /**
   * Push the modifications contain the in given parameter has
   * a modification that would happen on a local server.
opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,16 +112,11 @@ /* Read the first 8 bytes containing the packet length */ int length = 0; /* Let's start the stop-watch before waiting on read */ /* for the heartbeat check to be operationnal */ lastReceiveTime = System.currentTimeMillis(); while (length<8) { int read = input.read(rcvLengthBuf, length, 8-length); if (read == -1) { lastReceiveTime=0; throw new IOException("no more data"); } else @@ -140,9 +135,8 @@ { length += input.read(buffer, length, totalLength - length); } /* We do not want the heartbeat to close the session when */ /* we are processing a message even a time consuming one. */ lastReceiveTime=0; lastReceiveTime = System.currentTimeMillis(); return SynchronizationMessage.generateMsg(buffer); } catch (OutOfMemoryError e) @@ -165,10 +159,6 @@ */ public long getLastReceiveTime() { if (lastReceiveTime==0) { return System.currentTimeMillis(); } return lastReceiveTime; } opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,13 +47,6 @@ static final byte MSG_TYPE_CHANGELOG_START = 7; static final byte MSG_TYPE_WINDOW = 8; static final byte MSG_TYPE_HEARTBEAT = 9; static final byte MSG_TYPE_INITIALIZE_REQUEST = 10; static final byte MSG_TYPE_INITIALIZE_TARGET = 11; static final byte MSG_TYPE_ENTRY = 12; static final byte MSG_TYPE_DONE = 13; static final byte MSG_TYPE_ERROR = 14; // Adding a new type of message here probably requires to // change accordingly generateMsg method below /** * Return the byte[] representation of this message. @@ -67,11 +60,6 @@ * MSG_TYPE_CHANGELOG_START * MSG_TYPE_WINDOW * MSG_TYPE_HEARTBEAT * MSG_TYPE_INITIALIZE * MSG_TYPE_INITIALIZE_TARGET * MSG_TYPE_ENTRY * MSG_TYPE_DONE * MSG_TYPE_ERROR * * @return the byte[] representation of this message. */ @@ -119,21 +107,6 @@ case MSG_TYPE_HEARTBEAT: msg = new HeartbeatMessage(buffer); break; case MSG_TYPE_INITIALIZE_REQUEST: msg = new InitializeRequestMessage(buffer); break; case MSG_TYPE_INITIALIZE_TARGET: msg = new InitializeTargetMessage(buffer); break; case MSG_TYPE_ENTRY: msg = new EntryMessage(buffer); break; case MSG_TYPE_DONE: msg = new DoneMessage(buffer); break; case MSG_TYPE_ERROR: msg = new ErrorMessage(buffer); break; default: throw new DataFormatException("received message with unknown type"); } opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,7 +26,6 @@ */ package org.opends.server.synchronization; import static org.opends.server.loggers.Error.logError; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -55,8 +54,6 @@ import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ByteStringFactory; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.SearchScope; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.AttributeType; @@ -161,11 +158,7 @@ } } catch (Exception e) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1); } { } } return broker; } @@ -228,8 +221,7 @@ } } catch (Exception e) { } { } } return broker; } @@ -239,10 +231,6 @@ */ protected void cleanEntries() { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationTestCase/Cleaning entries" , 1); DeleteOperation op; // Delete entries try @@ -250,10 +238,6 @@ while (true) { DN dn = entryList.removeLast(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "cleaning entry " + dn, 1); op = new DeleteOperation(connection, InternalClientConnection .nextOperationID(), InternalClientConnection.nextMessageID(), null, dn); @@ -280,13 +264,8 @@ // WORKAROUND FOR BUG #639 - BEGIN - if (mms != null) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "SynchronizationTestCase/FinalizeSynchronization Provider" , 1); DirectoryServer.deregisterSynchronizationProvider(mms); mms.finalizeSynchronizationProvider(); mms = null; } // WORKAROUND FOR BUG #639 - END - @@ -324,22 +303,17 @@ // // Add the changelog server if (changeLogEntry!=null) { DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), 
DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), "Unable to add the changeLog server"); entryList.add(changeLogEntry.getDN()); } if (synchroServerEntry!=null) { // We also have a replicated suffix (synchronization domain) DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized suffix"); entryList.add(synchroServerEntry.getDN()); } entryList.add(changeLogEntry.getDN()); // // We also have a replicated suffix (synchronization domain) DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized server"); entryList.add(synchroServerEntry.getDN()); } /** opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@ + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n" + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n" + "ds-cfg-window-size: 100" + "\n" + "ds-cfg-changelog-db-directory: changelogDb"+i; + "ds-cfg-changelog-db-dirname: changelogDb"+i; Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); ConfigEntry changelogConfig = new ConfigEntry(tmp, null); changelogs[i] = new Changelog(changelogConfig); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,18 +26,16 @@ */ package org.opends.server.synchronization.protocol; import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.UUID; import java.util.zip.DataFormatException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.testng.Assert.*; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; @@ -58,8 +56,8 @@ import org.opends.server.types.ObjectClass; import org.opends.server.types.RDN; import org.opends.server.util.TimeThread; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.opends.server.synchronization.protocol.OperationContext.*; /** * Test the contructors, encoders and decoders of the synchronization @@ -522,104 +520,6 @@ } /** * Test that EntryMessage encoding and decoding works * by checking that : msg == new EntryMessageTest(msg.getBytes()). 
*/
  @Test()
  public void EntryMessageTest() throws Exception
  {
    // Every LDIF line must end with a newline, otherwise the last three
    // attributes are glued together into a single malformed line.
    String taskInitFromS2 =
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks\n" +
        "objectclass: top\n" +
        "objectclass: ds-task\n" +
        "objectclass: ds-task-initialize\n" +
        "ds-task-class-name: org.opends.server.tasks.InitializeTask\n" +
        "ds-task-initialize-domain-dn: dc=example,dc=com\n" +
        "ds-task-initialize-source: 1";
    short sender = 1;
    short target = 2;
    byte[] entry = taskInitFromS2.getBytes();
    EntryMessage msg = new EntryMessage(sender, target, entry);
    EntryMessage newMsg = new EntryMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
  }

  /**
   * Test that InitializeRequestMessage encoding and decoding works:
   * the sender, the destination and the base DN must survive the
   * round trip through getBytes().
   */
  @Test()
  public void InitializeRequestMessageTest() throws Exception
  {
    short sender = 1;
    short target = 2;
    InitializeRequestMessage msg = new InitializeRequestMessage(
        DN.decode("dc=example"), sender, target);
    InitializeRequestMessage newMsg =
      new InitializeRequestMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
  }

  /**
   * Test that InitializeTargetMessage encoding and decoding works:
   * the decoded message is compared both with the encoded message and
   * with the values originally given to the constructor.
   */
  @Test()
  public void InitializeTargetMessageTest() throws Exception
  {
    short senderID = 1;
    short targetID = 2;
    short requestorID = 3;
    long entryCount = 4;
    DN baseDN = DN.decode("dc=example");

    InitializeTargetMessage msg = new InitializeTargetMessage(
        baseDN, senderID, targetID, requestorID, entryCount);
    InitializeTargetMessage newMsg =
      new InitializeTargetMessage(msg.getBytes());

    // Decoded message versus encoded message
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
    assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
    assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;

    // Decoded message versus constructor arguments
    assertEquals(senderID, newMsg.getsenderID());
    assertEquals(targetID, newMsg.getDestination());
    assertEquals(requestorID, newMsg.getRequestorID());
    assertEquals(entryCount, newMsg.getEntryCount());
    assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
  }

  /**
   * Test that DoneMessage encoding and decoding works
   */
  @Test()
  public void DoneMessage() throws Exception
  {
    DoneMessage msg = new DoneMessage((short)1, (short)2);
    DoneMessage newMsg = new DoneMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
  }

  /**
   * Test that ErrorMessage encoding and decoding works
   */
  @Test()
  public void ErrorMessage() throws Exception
  {
    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
    assertEquals(msg.getsenderID(), newMsg.getsenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
    assertEquals(msg.getDetails(), newMsg.getDetails());
  }

  /**
   * Test PendingChange
   */
  private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)