fix for #1733 & #845 - Initialization of replication
3 files added
41 files modified
| | |
| | | ds-cfg-allowed-task: org.opends.server.tasks.ImportTask |
| | | ds-cfg-allowed-task: org.opends.server.tasks.InitializeTargetTask |
| | | ds-cfg-allowed-task: org.opends.server.tasks.InitializeTask |
| | | ds-cfg-allowed-task: org.opends.server.tasks.SetGenerationIdTask |
| | | ds-cfg-allowed-task: org.opends.server.tasks.LeaveLockdownModeTask |
| | | ds-cfg-allowed-task: org.opends.server.tasks.RebuildTask |
| | | ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask |
| | |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.450 NAME 'ds-cfg-message-body' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.479 |
| | | NAME 'ds-sync-generation-id' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 USAGE directoryOperation SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.451 |
| | | NAME 'ds-task-import-clear-backend' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE |
| | |
| | | NAME 'ds-cfg-notification-sender-address' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.480 |
| | | NAME 'ds-task-reset-generation-id-domain-base-dn' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.466 |
| | | NAME 'ds-cfg-plugin-order-subordinate-modify-dn' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE |
| | |
| | | MUST ds-task-disconnect-connection-id |
| | | MAY ( ds-task-disconnect-message $ ds-task-disconnect-notify-client ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.124 |
| | | NAME 'ds-task-reset-generation-id' SUP ds-task |
| | | MUST ( ds-task-reset-generation-id-domain-base-dn ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.121 |
| | | NAME 'ds-cfg-regular-expression-identity-mapper' SUP ds-cfg-identity-mapper |
| | | STRUCTURAL MUST ( ds-cfg-match-attribute $ ds-cfg-match-pattern ) |
| | |
| | | MILD_ERR_UNKNOWN_TYPE_7=Unknown operation type : %s |
| | | MILD_ERR_OPERATION_NOT_FOUND_IN_PENDING_9=Internal Error : Operation %s \ |
| | | change number %s was not found in pending list |
| | | MILD_ERR_COULD_NOT_INITIALIZE_DB_10=Changelog failed to start because the \ |
| | | MILD_ERR_COULD_NOT_INITIALIZE_DB_10=The replication server failed to start because the \ |
| | | database %s could not be opened |
| | | MILD_ERR_COULD_NOT_READ_DB_11=Changelog failed to start because the database \ |
| | | %s could not be read |
| | | MILD_ERR_COULD_NOT_READ_DB_11=The replication server failed to start because the database \ |
| | | %s could not be read : %s |
| | | MILD_ERR_EXCEPTION_REPLAYING_OPERATION_12=An Exception was caught while \ |
| | | replaying operation %s : %s |
| | | MILD_ERR_NEED_CHANGELOG_PORT_13=The replication server port must be defined |
| | |
| | | NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \ |
| | | listening on %s |
| | | NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \ |
| | | changes that this server has already processed |
| | | changes that this server has already processed on suffix %s |
| | | NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \ |
| | | server should be configured |
| | | SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught exception during initial \ |
| | | communication with replication server: %s |
| | | NOTICE_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \ |
| | | communication on domain %s with replication server %s : %s |
| | | MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \ |
| | | database for base DN %s |
| | | NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \ |
| | | replication server that has seen all the local changes. Going to replay \ |
| | | replication server that has seen all the local changes on suffix %s. Going to replay \ |
| | | changes |
| | | NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \ |
| | | server on suffix %s, retrying... |
| | |
| | | NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \ |
| | | has been dropped by the Replication Server |
| | | SEVERE_ERR_CHANGELOG_ERROR_SENDING_INFO_64=An unexpected error occurred \ |
| | | while sending a Server Info message to %s. This connection is going to be \ |
| | | while sending a Server Info message to %s. This connection is going to be \ |
| | | closed and reopened |
| | | SEVERE_ERR_CHANGELOG_ERROR_SENDING_ERROR_65=An unexpected error occurred \ |
| | | while sending an Error Message to %s. This connection is going to be closed \ |
| | |
| | | ChangeNumber %s error %s %s |
| | | MILD_ERR_UNKNOWN_ATTRIBUTE_IN_HISTORICAL_68=The entry %s has historical \ |
| | | information for attribute %s which is not defined in the schema. This \ |
| | | information will be ignored |
| | | information will be ignored |
| | | NOTICE_UNRESOLVED_CONFLICT_69=An unresolved conflict was detected for DN %s |
| | | SEVERE_ERR_COULD_NOT_CLOSE_THE_SOCKET_70=The Replication Server socket could not \ |
| | | be closed : %s |
| | | SEVERE_ERR_COULD_NOT_STOP_LISTEN_THREAD_71=The thread listening on the \ |
| | | replication server port could not be stopped : %s |
| | | DEBUG_REPLICATION_PORT_IOEXCEPTION_72=An IOException was caught while \ |
| | | listening on the replication port |
| | | listening on the replication port |
| | | SEVERE_ERR_SEARCHING_GENERATION_ID_73=An unexpected error %s occured when \ |
| | | searching for generation id for domain : %s |
| | | SEVERE_ERR_SEARCHING_DOMAIN_BACKEND_74=An unexpected error occured when \ |
| | | searching for the backend of the domain : %s |
| | | SEVERE_ERR_LOADING_GENERATION_ID_75=An unexpected error occured when \ |
| | | searching in %s for the generation ID : %s |
| | | SEVERE_ERR_UPDATING_GENERATION_ID_76=An unexpected error %s occured \ |
| | | when updating generation ID for the domain : %s |
| | | NOTICE_BAD_GENERATION_ID_77=On suffix %s. server %s presented generation ID=%s \ |
| | | when expected generation ID=%s. Consequently, replication is degraded for that server |
| | | NOTICE_RESET_GENERATION_ID_78=The generation ID has been reset for domain %s.\ |
| | | Replication is now degraded for this domain |
| | | MILD_ERR_ERROR_MSG_RECEIVED_79=The following error has been received : <%s> |
| | | MILD_ERR_IGNORING_UPDATE_FROM_80=Update <%s> received from server <%s> is \ |
| | | ignored due to a bad generation ID of this server |
| | | MILD_ERR_IGNORING_UPDATE_TO_81=Update <%s> will not be sent to server %s that has \ |
| | | not the right generation ID |
| | | SEVERE_ERR_INIT_IMPORT_NOT_SUPPORTED_82= Initialization cannot be done because \ |
| | | import is not supported by the backend %s |
| | | SEVERE_ERR_INIT_EXPORT_NOT_SUPPORTED_83= Initialization cannot be done because \ |
| | | export is not supported by the backend %s |
| | | SEVERE_ERR_INIT_CANNOT_LOCK_BACKEND_84= Initialization cannot be done because \ |
| | | the following error occured while locking the backend %s : %s |
| | | NOTICE_EXCEPTION_RESTARTING_SESSION_85=Caught Exception during reinitialization of \ |
| | | communication on domain %s : %s |
| | | SEVERE_ERR_EXCEPTION_LISTENING_86=Replication server caught exception while \ |
| | | listening for client connections %s |
| | | SEVERE_ERR_ERROR_CLEARING_DB_87=While clearing the database %s , the following \ |
| | | error happened: %s |
| | | NOTICE_ERR_ROUTING_TO_SERVER_88=Protocol error : a replication server is not expected \ |
| | | to be the destination of a message of type %s |
| | |
| | | // The attribute type that will be used to save the synchronization state. |
| | | private AttributeType synchronizationStateType; |
| | | |
| | | // The attribute type that will be used to save the synchronization |
| | | // generationId. |
| | | private AttributeType synchronizationGenerationIdType; |
| | | |
| | | // The value containing DN of the user we'll say created the configuration. |
| | | private AttributeValue creatorsName; |
| | | |
| | |
| | | nameFormsType = DirectoryServer.getAttributeType(ATTR_NAME_FORMS_LC, true); |
| | | synchronizationStateType = |
| | | DirectoryServer.getAttributeType(ATTR_SYNCHRONIZATION_STATE_LC, true); |
| | | synchronizationGenerationIdType = |
| | | DirectoryServer.getAttributeType(ATTR_SYNCHRONIZATION_GENERATIONID_LC, |
| | | true); |
| | | |
| | | |
| | | // Initialize the lastmod attributes. |
| | |
| | | attrList.add(attr); |
| | | operationalAttrs.put(synchronizationStateType, attrList); |
| | | |
| | | // Add the synchronization GenerationId attribute. |
| | | valueSet = DirectoryServer.getSchema().getSynchronizationGenerationId(); |
| | | attr = new Attribute(synchronizationGenerationIdType, |
| | | ATTR_SYNCHRONIZATION_GENERATIONID_LC, valueSet); |
| | | attrList = new ArrayList<Attribute>(1); |
| | | attrList.add(attr); |
| | | operationalAttrs.put(synchronizationGenerationIdType, attrList); |
| | | |
| | | // Add all the user-defined attributes. |
| | | for (Attribute a : userDefinedAttributes) |
| | | { |
| | |
| | | public static final String ATTR_MATCHING_RULE_USE_LC = "matchingruleuse"; |
| | | |
| | | /** |
| | | * The name of the attribute that holds the sycnhronization state, |
| | | * The name of the attribute that holds the synchronization state, |
| | | * formatted in lowercase. |
| | | */ |
| | | public static final String ATTR_SYNCHRONIZATION_STATE_LC = "ds-sync-state"; |
| | | |
| | | /** |
| | | * The name of the attribute that holds the replication generationId, |
| | | * formatted in lowercase. |
| | | */ |
| | | public static final String ATTR_SYNCHRONIZATION_GENERATIONID_LC = |
| | | "ds-sync-generation-id"; |
| | | |
| | | /** |
| | | * The default maximum request size that should be used if none is specified |
| | | * in the configuration. |
| | | */ |
| | |
| | | NAME_PREFIX_TASK + "rebuild-max-threads"; |
| | | |
| | | /** |
| | | * The name of the objectclass that will be used for a Directory Server |
| | | * reset generationId task definition. |
| | | */ |
| | | public static final String OC_RESET_GENERATION_ID_TASK = |
| | | NAME_PREFIX_TASK + "reset-generation-id"; |
| | | |
| | | |
| | | /** |
| | | * The name of the attribute containing the baseDn related to the replication |
| | | * domain to which the task applies. |
| | | */ |
| | | public static final String ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN = |
| | | OC_RESET_GENERATION_ID_TASK + "-domain-base-dn"; |
| | | |
| | | /** |
| | | * The name of the attribute in an import task definition that specifies |
| | | * whether the backend should be cleared before the import. |
| | | */ |
| | |
| | | new MatchingRuleUseSyntax()); |
| | | } |
| | | |
| | | AttributeType synchronizationGenerationIdType = |
| | | schema.getAttributeType(ATTR_SYNCHRONIZATION_GENERATIONID_LC); |
| | | if (synchronizationGenerationIdType == null) |
| | | { |
| | | synchronizationGenerationIdType = DirectoryServer.getDefaultAttributeType |
| | | (ATTR_SYNCHRONIZATION_GENERATIONID_LC, new MatchingRuleUseSyntax()); |
| | | } |
| | | |
| | | List<Attribute> synchronizationState = |
| | | entry.getAttribute(synchronizationStateType); |
| | | if (synchronizationState != null && !(synchronizationState.isEmpty())) |
| | |
| | | |
| | | /** |
| | | * ServerState class. |
| | | * This object is used to store the last update seem on this server |
| | | * This object is used to store the last update seen on this server |
| | | * from each server. |
| | | * It is exchanged with the replication servers at connection establishment |
| | | * time. |
| | |
| | | */ |
| | | public ArrayList<ASN1OctetString> toASN1ArrayList() |
| | | { |
| | | ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(); |
| | | ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(0); |
| | | |
| | | synchronized (this) |
| | | { |
| | |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | |
| | | import java.io.IOException; |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Heartbeat monitor is starting, expected interval is %d", |
| | | heartbeatInterval); |
| | | TRACER.debugInfo("Heartbeat monitor is starting, expected interval is " + |
| | | heartbeatInterval + |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | try |
| | | { |
| | |
| | | long lastReceiveTime = session.getLastReceiveTime(); |
| | | if (now > lastReceiveTime + 2 * heartbeatInterval) |
| | | { |
| | | TRACER.debugInfo("Heartbeat monitor is closing the broker session " + |
| | | "because it could not detect a heartbeat."); |
| | | |
| | | // Heartbeat is well overdue so the server is assumed to be dead. |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Heartbeat monitor is exiting."); |
| | | TRACER.debugInfo("Heartbeat monitor is exiting." + |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); |
| | | |
| | | if (values.size() == 0) |
| | | return ResultCode.SUCCESS; |
| | | |
| | | LDAPAttribute attr = |
| | | new LDAPAttribute(REPLICATION_STATE, values); |
| | | LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); |
| | |
| | | } |
| | | return op.getResultCode(); |
| | | } |
| | | |
| | | /** |
| | | * Empty the ServerState. |
| | | * After this call the Server State will be in the same state |
| | | * as if it was just created. |
| | | */ |
| | | public void clearInMemory() |
| | | { |
| | | super.clear(); |
| | | this.savedStatus = false; |
| | | } |
| | | |
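| | | /** |
| | | * Empty the ServerState and save the emptied state. |
| | | * After this call the Server State will be in the same state |
| | | * as if it was just created; unlike clearInMemory(), this method |
| | | * also calls save() once the in-memory state has been cleared. |
| | | */ |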
| | | public void clear() |
| | | { |
| | | clearInMemory(); |
| | | save(); |
| | | } |
| | | } |
| | |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | |
| | | |
| | | /** |
| | | * This class creates an output stream that can be used to export entries |
| | | * to a synchronization domain. |
| | |
| | | public class ReplLDIFOutputStream |
| | | extends OutputStream |
| | | { |
| | | // The synchronization domain on which the export is done |
| | | ReplicationDomain domain; |
| | | |
| | | // The number of entries to be exported |
| | | long numEntries; |
| | | |
| | | // The current number of entries exported |
| | | long numExportedEntries; |
| | | |
| | | String entryBuffer = ""; |
| | | |
| | | /** |
| | |
| | | * domain. |
| | | * |
| | | * @param domain The replication domain |
| | | * @param numEntries The maximum number of entries to process. |
| | | */ |
| | | public ReplLDIFOutputStream(ReplicationDomain domain) |
| | | public ReplLDIFOutputStream(ReplicationDomain domain, long numEntries) |
| | | { |
| | | this.domain = domain; |
| | | this.numEntries = numEntries; |
| | | } |
| | | |
| | | /** |
| | |
| | | endOfEntryIndex = ebytes.indexOf("\n\n"); |
| | | if ( endOfEntryIndex >= 0 ) |
| | | { |
| | | |
| | | endOfEntryIndex += 2; |
| | | entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex); |
| | | |
| | | // Send the entry |
| | | domain.sendEntryLines(entryBuffer); |
| | | if ((numEntries>0) && (numExportedEntries > numEntries)) |
| | | { |
| | | // This outputstream has reached the total number |
| | | // of entries to export. |
| | | return; |
| | | } |
| | | domain.exportLDIFEntry(entryBuffer); |
| | | numExportedEntries++; |
| | | |
| | | startOfEntryIndex = startOfEntryIndex + endOfEntryIndex; |
| | | entryBuffer = ""; |
| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | private int maxRcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private long generationId = -1; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | | /** |
| | |
| | | * @param window The size of the send and receive window to use. |
| | | * @param heartbeatInterval The interval between heartbeats requested of the |
| | | * replicationServer, or zero if no heartbeats are requested. |
| | | * |
| | | * @param generationId The generationId for the server associated to the |
| | | * provided serverID and for the domain associated to the provided baseDN. |
| | | * @param replSessionSecurity The session security configuration. |
| | | */ |
| | | public ReplicationBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity) |
| | | long generationId, ReplSessionSecurity replSessionSecurity) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | this.halfRcvWindow = window/2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | this.generationId = generationId; |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | } |
| | | |
| | |
| | | */ |
| | | private void connect() |
| | | { |
| | | ReplServerStartMessage startMsg; |
| | | ReplServerStartMessage replServerStartMsg; |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | if (heartbeatMonitor != null) |
| | |
| | | heartbeatMonitor = null; |
| | | } |
| | | |
| | | // checkState is true for the first loop on all replication servers |
| | | // looking for one already up-to-date. |
| | | // If we found some responding replication servers but none up-to-date |
| | | // then we set check-state to false and do a second loop where the first |
| | | // found will be the one elected and then we will update this replication |
| | | // server. |
| | | boolean checkState = true; |
| | | boolean receivedResponse = true; |
| | | |
| | | // TODO: We are doing 2 loops here, opening, closing and reopening sessions to |
| | | // the same servers, which risks 'same server id' errors. |
| | | // Would be better to do only one loop, keeping the best candidate while |
| | | // traversing the list of replication servers to connect to. |
| | | if (servers.size()==1) |
| | | { |
| | | checkState = false; |
| | | } |
| | | |
| | | synchronized (connectPhaseLock) |
| | | { |
| | | while ((!connected) && (!shutdown) && (receivedResponse)) |
| | |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, heartbeatInterval, state, |
| | | protocolVersion, isSslEncryption); |
| | | protocolVersion, generationId, isSslEncryption); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | replServerStartMsg = (ReplServerStartMessage) session.receive(); |
| | | receivedResponse = true; |
| | | |
| | | /* |
| | |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | startMsg.getVersion()); |
| | | replServerStartMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | if (!isSslEncryption) |
| | |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | replServerStartMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (replServerMaxChangeNumber == null) |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = |
| | |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | startHeartBeat(); |
| | | break; |
| | |
| | | * of our changes, we are going to try another server |
| | | * but before log a notice message |
| | | */ |
| | | Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server); |
| | | Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server, |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } |
| | | else |
| | |
| | | else |
| | | { |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()); |
| | | Message message = NOTE_EXCEPTION_STARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } // for servers |
| | | |
| | | // We have traversed all the replication servers |
| | | |
| | | if ((!connected) && (checkState == true) && receivedResponse) |
| | | { |
| | |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any replicationServer. |
| | | */ |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(); |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | checkState = false; |
| | | } |
| | | } |
| | | |
| | | // We have traversed all the replication servers as many times as needed |
| | | // to find one if one is up and running. |
| | | |
| | | if (connected) |
| | | { |
| | | // This server has connected correctly. |
| | |
| | | /** |
| | | * restart the ReplicationBroker. |
| | | */ |
| | | private void reStart() |
| | | public void reStart() |
| | | { |
| | | reStart(null); |
| | | reStart(this.session); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param failingSession the socket which failed |
| | | */ |
| | | private void reStart(ProtocolSession failingSession) |
| | | public void reStart(ProtocolSession failingSession) |
| | | { |
| | | try |
| | | { |
| | |
| | | } catch (Exception e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage())); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | // choice than to return without sending the ReplicationMessage |
| | | // and relying on the resend procedure of the connect phase to |
| | | // fix the problem when we finally connect. |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() Publishing a " + |
| | | " message is not possible due to existing connection error."); |
| | | } |
| | | |
| | | return; |
| | | } |
| | | |
| | |
| | | } catch (InterruptedException e1) |
| | | { |
| | | // ignore |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "IO exception raised : " + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // just loop. |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "Interrupted exception raised." + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | if (!connected) |
| | | { |
| | | reStart(); |
| | | reStart(null); |
| | | } |
| | | |
| | | ProtocolSession failingSession = session; |
| | |
| | | Message message = |
| | | NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); |
| | | logError(message); |
| | | |
| | | debugInfo("ReplicationBroker.receive() " + baseDn + |
| | | " Exception raised." + e + e.getLocalizedMessage()); |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("ReplicationBroker Stop Closing session"); |
| | | debugInfo("ReplicationBroker is stopping. and will" + |
| | | "close the connection"); |
| | | } |
| | | |
| | | if (session != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the value of the generationId for that broker. Normally the |
| | | * generationId is set through the constructor, but there are cases |
| | | * where the value of the generationId must be changed while the broker |
| | | * already exists, for example after an on-line import. |
| | | * |
| | | * @param generationId The value of the generationId. |
| | | * |
| | | */ |
| | | public void setGenerationId(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the replicationServer to which this broker is currently |
| | | * connected. |
| | | * |
| | |
| | | return !connectionError; |
| | | } |
| | | |
| | | private boolean debugEnabled() { return true; } |
| | | private static final void debugInfo(String s) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); |
| | | TRACER.debugInfo(s); |
| | | } |
| | | |
| | | /** |
| | | * Determine whether the connection to the replication server is encrypted. |
| | | * @return true if the connection is encrypted, false otherwise. |
| | |
| | | package org.opends.server.replication.plugin; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.HashSet; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.CheckedOutputStream; |
| | | import java.util.zip.DataFormatException; |
| | | import java.util.zip.Adler32; |
| | | import java.io.OutputStream; |
| | | |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*; |
| | | import org.opends.server.admin.std.server.MultimasterDomainCfg; |
| | | import org.opends.server.admin.std.server.BackendCfg; |
| | | import org.opends.server.api.AlertGenerator; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPAttribute; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.protocols.ldap.LDAPModification; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.Control; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.DirectoryException; |
| | |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.RDN; |
| | | import org.opends.server.types.RawModification; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchResultEntry; |
| | |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private long generationId = -1; |
| | | private long rejectedGenerationId = -1; |
| | | private boolean requestedResetSinceLastStart = false; |
| | | |
| | | /** |
| | | * This object is used to store the list of update currently being |
| | |
| | | private int window = 100; |
| | | |
| | | /** |
| | | * The isoalation policy that this domain is going to use. |
| | | * The isolation policy that this domain is going to use. |
| | | * This field describes the behavior of the domain when an update is |
| | | * attempted and the domain could not connect to any Replication Server. |
| | | * Possible values are accept-updates or deny-updates, but other values |
| | |
| | | |
| | | // The total entry count expected to be processed |
| | | long entryCount = 0; |
| | | // The count for the entry left to be processed |
| | | // The count for the entry not yet processed |
| | | long entryLeftCount = 0; |
| | | |
| | | boolean checksumOutput = false; |
| | | |
| | | // The exception raised when any |
| | | DirectoryException exception = null; |
| | | long checksumOutputValue = (long)0; |
| | | |
| | | /** |
| | | * Initializes the counters of the task with the provider value. |
| | | * Initializes the import/export counters with the provided value. |
| | | * @param count The value with which to initialize the counters. |
| | | */ |
| | | public void initTaskCounters(long count) |
| | | public void initImportExportCounters(long count) |
| | | { |
| | | entryCount = count; |
| | | entryLeftCount = count; |
| | |
| | | * Update the counters of the task for each entry processed during |
| | | * an import or export. |
| | | */ |
| | | public void updateTaskCounters() |
| | | public void updateCounters() |
| | | { |
| | | entryLeftCount--; |
| | | |
| | |
| | | configDn = configuration.dn(); |
| | | |
| | | /* |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | | * because we don't want to store extra information in the schema |
| | | * ldif files. |
| | | * This has no negative impact because the changes on schema should |
| | | * not produce conflicts. |
| | | */ |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | | * because we don't want to store extra information in the schema |
| | | * ldif files. |
| | | * This has no negative impact because the changes on schema should |
| | | * not produce conflicts. |
| | | */ |
| | | if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0) |
| | | { |
| | | solveConflictFlag = false; |
| | |
| | | monitor = new ReplicationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | backend = retrievesBackend(baseDN); |
| | | if (backend == null) |
| | | { |
| | | throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( |
| | | baseDN.toNormalizedString())); |
| | | } |
| | | |
| | | try |
| | | { |
| | | generationId = loadGenerationId(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | } |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval, new ReplSessionSecurity(configuration)); |
| | | heartbeatInterval, generationId, |
| | | new ReplSessionSecurity(configuration)); |
| | | |
| | | broker.start(replicationServers); |
| | | |
| | | // Retrieves the related backend and its config entry |
| | | try |
| | | { |
| | | retrievesBackendInfos(baseDN); |
| | | } catch (DirectoryException e) |
| | | { |
| | | // The backend associated to this suffix is not able to |
| | | // perform export and import. |
| | | // The replication can continue but this replicationDomain |
| | | // won't be able to use total update. |
| | | } |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | |
| | | * If not set the ResultCode and the response message, |
| | | * interrupt the operation, and return false |
| | | * |
| | | * @param op The Operation that needs to be checked. |
| | | * @param Operation The Operation that needs to be checked. |
| | | * |
| | | * @return true when it OK to process the Operation, false otherwise. |
| | | * When false is returned the resultCode and the response message |
| | |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | log(Message.raw("Broker received message :" + msg)); |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMessage)) |
| | | TRACER.debugInfo("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMessage) msg; |
| | | initMsg = (InitializeRequestMessage)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | importBackend(importMsg); |
| | | initialize(importMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | | mb.append("Backend ID: "); |
| | | mb.append(backend.getBackendID()); |
| | | log(mb.toMessage()); |
| | | |
| | | // Return an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | TRACER.debugInfo(Message.toString(mb.toMessage())); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | else |
| | | { |
| | | /* We can receive an error message from the replication server |
| | | * in the following cases : |
| | | * - we connected with an incorrect generation id |
| | | */ |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | |
| | | if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId()) |
| | | { |
| | | TRACER.debugInfo("requestedResetSinceLastStart=" + |
| | | requestedResetSinceLastStart + |
| | | "rejectedGenerationId=" + rejectedGenerationId); |
| | | |
| | | if (requestedResetSinceLastStart && (rejectedGenerationId>0)) |
| | | { |
| | | // When the last generation presented was refused and we are |
| | | // the 'reseter' server then restart automatically to become |
| | | // the 'master' |
| | | state.clear(); |
| | | rejectedGenerationId = -1; |
| | | requestedResetSinceLastStart = false; |
| | | broker.stop(); |
| | | broker.start(replicationServers); |
| | | } |
| | | } |
| | | if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId()) |
| | | { |
| | | rejectedGenerationId = generationId; |
| | | } |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | null); |
| | | } |
| | | catch(DirectoryException de) |
| | |
| | | public void disable() |
| | | { |
| | | state.save(); |
| | | state.clear(); |
| | | state.clearInMemory(); |
| | | disabled = true; |
| | | // stop the listener threads |
| | | for (ListenerThread thread : synchroThreads) |
| | |
| | | * The domain will connect back to a replication Server and |
| | | * will recreate threads to listen for messages from the Synchronization |
| | | * server. |
| | | * The generationId will be retrieved or computed if necessary. |
| | | * The ServerState will also be read again from the local database. |
| | | */ |
| | | public void enable() |
| | | { |
| | | state.clear(); /* NOTE(review): clear() is immediately followed by clearInMemory(); this looks like the before/after pair of a single change - confirm which call should remain. */ |
| | | state.clearInMemory(); |
| | | state.loadState(); /* read the ServerState again after it has been cleared */ |
| | | disabled = false; |
| | | /* Reload the generationId through loadGenerationId(). On failure the error is logged and the method returns without starting the broker or creating the listeners, although disabled has already been reset to false. */ |
| | | |
| | | try |
| | | { |
| | | generationId = loadGenerationId(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | /* TODO should mark that replicationServer service is |
| | | * not available, log an error and retry upon timeout |
| | | * should we stop the modifications ? |
| | | */ |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | return; |
| | | } |
| | | |
| | | // After an on-line import, the value of the generationId is new |
| | | // and it is necessary for the broker to send this new value as part |
| | | // of the serverStart message. |
| | | broker.setGenerationId(generationId); |
| | | /* Start the broker again with the configured replication servers. */ |
| | | broker.start(replicationServers); |
| | | /* Recreate the listeners; presumably these are the threads that receive messages from the broker - confirm in createListeners(). */ |
| | | createListeners(); |
| | | } |
| | | |
| | | /** |
| | | * Compute the data generationId associated with the current data present |
| | | * in the backend for this domain. |
| | | * <p> |
| | | * The value is obtained by running exportBackend() with the import/export |
| | | * context switched to checksum mode and reading the resulting |
| | | * checksumOutputValue. The entry count given to the context is capped |
| | | * at 1000. |
| | | * <p> |
| | | * The import/export context is released and checksum mode is switched off |
| | | * whether or not the export succeeds. |
| | | * |
| | | * @return The computed generationId. |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | public long computeGenerationId() throws DirectoryException |
| | | { |
| | | long bec = backend.getEntryCount(); |
| | | if (bec<0) |
| | | { |
| | | // A negative count was reported: resolve the backend of this base DN |
| | | // again and ask it for its entry count. |
| | | backend = this.retrievesBackend(baseDN); |
| | | bec = backend.getEntryCount(); |
| | | } |
| | | |
| | | this.acquireIEContext(); |
| | | try |
| | | { |
| | | ieContext.checksumOutput = true; |
| | | // Cap the number of entries accounted for by the context at 1000. |
| | | ieContext.entryCount = (bec<1000?bec:1000); |
| | | ieContext.entryLeftCount = ieContext.entryCount; |
| | | exportBackend(); |
| | | long genId = ieContext.checksumOutputValue; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Computed generationId: #entries=" + bec + |
| | | " generationId=" + genId); |
| | | return genId; |
| | | } |
| | | finally |
| | | { |
| | | // Leave checksum mode and give the context back even when the export |
| | | // throws; otherwise the context stays acquired with checksumOutput set. |
| | | // NOTE(review): presumably a context left acquired blocks later |
| | | // imports/exports - confirm in acquireIEContext(). |
| | | if (ieContext != null) |
| | | ieContext.checksumOutput = false; |
| | | this.releaseIEContext(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gets the generationId currently held by this domain. |
| | | * |
| | | * @return The current value of the generationId field. |
| | | */ |
| | | public long getGenerationId() |
| | | { |
| | | final long currentGenerationId = this.generationId; |
| | | return currentGenerationId; |
| | | } |
| | | |
| | | /** |
| | | * The name of the attribute used to store the generationId in the backend. |
| | | */ |
| | | protected static final String REPLICATION_GENERATION_ID = |
| | | "ds-sync-generation-id"; |
| | | |
| | | /** |
| | | * Writes the given generationId into the REPLICATION_GENERATION_ID |
| | | * attribute of the root entry of the domain, replacing any previous value. |
| | | * The write is done with an internal modify operation; a failure is logged |
| | | * and reported through the returned result code. |
| | | * |
| | | * @param generationId The value of the generationId. |
| | | * @return The ResultCode of the modify operation, ResultCode.SUCCESS when |
| | | * the value was stored. |
| | | */ |
| | | public ResultCode saveGenerationId(long generationId) |
| | | { |
| | | // Single value: the decimal string form of the generationId. |
| | | ArrayList<ASN1OctetString> genIdValues = new ArrayList<ASN1OctetString>(); |
| | | genIdValues.add(new ASN1OctetString(Long.toString(generationId))); |
| | | LDAPAttribute genIdAttr = |
| | | new LDAPAttribute(REPLICATION_GENERATION_ID, genIdValues); |
| | | |
| | | // One REPLACE modification on that attribute. |
| | | ArrayList<RawModification> modifications = new ArrayList<RawModification>(1); |
| | | modifications.add(new LDAPModification(ModificationType.REPLACE, genIdAttr)); |
| | | |
| | | // The target is the root entry of the domain. |
| | | ASN1OctetString rootEntryDn = new ASN1OctetString(baseDN.toString()); |
| | | ModifyOperationBasis modifyOp = |
| | | new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), |
| | | new ArrayList<Control>(0), rootEntryDn, |
| | | modifications); |
| | | |
| | | // Flag the operation as internal, as a synchronization operation and as |
| | | // not to be synchronized, then execute it. |
| | | modifyOp.setInternalOperation(true); |
| | | modifyOp.setSynchronizationOperation(true); |
| | | modifyOp.setDontSynchronize(true); |
| | | modifyOp.run(); |
| | | |
| | | ResultCode outcome = modifyOp.getResultCode(); |
| | | if (outcome == ResultCode.SUCCESS) |
| | | { |
| | | return outcome; |
| | | } |
| | | |
| | | // The modify failed: log the result code and error text, then hand the |
| | | // result code back to the caller. |
| | | logError(ERR_UPDATING_GENERATION_ID.get( |
| | | outcome.getResultCodeName() + " " + |
| | | modifyOp.getErrorMessage(), |
| | | baseDN.toString())); |
| | | return outcome; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Load the GenerationId from the root entry of the domain |
| | | * from the REPLICATION_GENERATION_ID attribute in database |
| | | * to memory, or compute it if not found. |
| | | * <p> |
| | | * When no single-valued attribute can be read from the root entry, the |
| | | * generationId is computed with computeGenerationId() and stored with |
| | | * saveGenerationId(). When a single value is present but cannot be decoded |
| | | * as a long, the error is logged and -1 is returned. |
| | | * |
| | | * @return The generationId read from the root entry, or the newly |
| | | * computed one. |
| | | * @throws DirectoryException When the generationId has to be computed and |
| | | * that computation fails. |
| | | */ |
| | | public long loadGenerationId() |
| | | throws DirectoryException |
| | | { |
| | | long generationId=-1; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Attempt to read generation ID from DB " + baseDN.toString()); |
| | | |
| | | ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString()); |
| | | boolean found = false; |
| | | LDAPFilter filter; |
| | | try |
| | | { |
| | | filter = LDAPFilter.decode("objectclass=*"); |
| | | } |
| | | catch (LDAPException e) |
| | | { |
| | | // can not happen |
| | | return -1; |
| | | } |
| | | |
| | | /* |
| | | * Read the root entry of the domain, asking only for the attribute |
| | | * that holds the generationId. |
| | | */ |
| | | LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); |
| | | attributes.add(REPLICATION_GENERATION_ID); |
| | | InternalSearchOperation search = conn.processSearch(asn1BaseDn, |
| | | SearchScope.BASE_OBJECT, |
| | | DereferencePolicy.DEREF_ALWAYS, 0, 0, false, |
| | | filter,attributes); |
| | | if (((search.getResultCode() != ResultCode.SUCCESS)) && |
| | | ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) |
| | | { |
| | | // Any failure other than a missing root entry is logged; the |
| | | // generationId is then computed below. |
| | | Message message = ERR_SEARCHING_GENERATION_ID.get( |
| | | search.getResultCode().getResultCodeName() + " " + |
| | | search.getErrorMessage(), |
| | | baseDN.toString()); |
| | | logError(message); |
| | | } |
| | | |
| | | if (search.getResultCode() == ResultCode.SUCCESS) |
| | | { |
| | | LinkedList<SearchResultEntry> result = search.getSearchEntries(); |
| | | // LinkedList.getFirst() throws NoSuchElementException on an empty |
| | | // list, so test for emptiness instead of testing the entry for null. |
| | | SearchResultEntry resultEntry = |
| | | ((result == null) || result.isEmpty()) ? null : result.getFirst(); |
| | | if (resultEntry != null) |
| | | { |
| | | AttributeType synchronizationGenIDType = |
| | | DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); |
| | | List<Attribute> attrs = |
| | | resultEntry.getAttribute(synchronizationGenIDType); |
| | | // Guard against an empty list as well as a missing attribute. |
| | | if ((attrs != null) && !attrs.isEmpty()) |
| | | { |
| | | Attribute attr = attrs.get(0); |
| | | LinkedHashSet<AttributeValue> values = attr.getValues(); |
| | | if (values.size()!=1) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toString(), "#Values != 1"); |
| | | logError(message); |
| | | } |
| | | else |
| | | { |
| | | // A single value exists: it counts as found even when decoding |
| | | // fails, in which case generationId keeps its initial -1. |
| | | found=true; |
| | | try |
| | | { |
| | | generationId = Long.decode(values.iterator().next(). |
| | | getStringValue()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toString(), e.getLocalizedMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (!found) |
| | | { |
| | | // Nothing usable is stored: compute the value from the backend data |
| | | // and store it in the root entry. |
| | | generationId = computeGenerationId(); |
| | | saveGenerationId(generationId); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Generation ID created for domain base DN=" + |
| | | baseDN.toString() + |
| | | " generationId=" + generationId); |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Generation ID successfully read from domain base DN=" + baseDN + |
| | | " generationId=" + generationId); |
| | | } |
| | | return generationId; |
| | | } |
| | | |
| | | /** |
| | | * Requests a reset of the generationId of this domain in the whole |
| | | * topology. |
| | | * A ResetGenerationId message is published through the broker so that the |
| | | * Replication Servers reset their change dbs, and this domain records that |
| | | * it issued the request. |
| | | */ |
| | | public void resetGenerationId() |
| | | { |
| | | // Record the request before the message goes out. |
| | | requestedResetSinceLastStart = true; |
| | | broker.publish(new ResetGenerationId()); |
| | | } |
| | | |
| | | /** |
| | | * Do whatever is needed when a backup is started. |
| | | * We need to make sure that the serverState is correctly saved. |
| | | */ |
| | |
| | | { |
| | | msg = broker.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Import: EntryBytes received " + msg); |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | log(Message.raw("receiveEntryBytes: received " + msg)); |
| | | |
| | | if (msg instanceof EntryMessage) |
| | | { |
| | | // FIXME |
| | | EntryMessage entryMsg = (EntryMessage)msg; |
| | | byte[] entryBytes = entryMsg.getEntryBytes().clone(); |
| | | ieContext.updateTaskCounters(); |
| | | ieContext.updateCounters(); |
| | | return entryBytes; |
| | | } |
| | | else if (msg instanceof DoneMessage) |
| | |
| | | // The error is stored and the import is ended |
| | | // by returning null |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | | ieContext.exception = new DirectoryException( |
| | | ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | | return null; |
| | | } |
| | | else |
| | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // TODO: i18n |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | | Message.raw("received an unexpected message type"), e); |
| | | return null; |
| | | Message.raw("received an unexpected message type" + |
| | | e.getLocalizedMessage())); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Log debug message. |
| | | * @param message The message to log. |
| | | * Export the entries from the backend. |
| | | * The ieContext must have been set before calling. |
| | | * |
| | | * @throws DirectoryException when an error occurred |
| | | */ |
| | | private void log(Message message) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("DebugInfo" + message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Export the entries. |
| | | * @throws DirectoryException when an error occurred |
| | | */ |
| | | protected void exportBackend() throws DirectoryException |
| | | protected void exportBackend() |
| | | throws DirectoryException |
| | | { |
| | | // FIXME Temporary workaround - will probably be fixed when implementing |
| | | // dynamic config |
| | | retrievesBackendInfos(this.baseDN); |
| | | backend = retrievesBackend(this.baseDN); |
| | | |
| | | // Acquire a shared lock for the backend. |
| | | try |
| | |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | ReplLDIFOutputStream os = new ReplLDIFOutputStream(this); |
| | | OutputStream os; |
| | | ReplLDIFOutputStream ros; |
| | | |
| | | if (ieContext.checksumOutput) |
| | | { |
| | | ros = new ReplLDIFOutputStream(this, ieContext.entryCount); |
| | | os = new CheckedOutputStream(ros, new Adler32()); |
| | | try |
| | | { |
| | | os.write((Long.toString(ieContext.entryCount)).getBytes()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Should never happen |
| | | } |
| | | } |
| | | else |
| | | { |
| | | ros = new ReplLDIFOutputStream(this, (short)-1); |
| | | os = ros; |
| | | } |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(os); |
| | | |
| | | // baseDN branch is the only one included in the export |
| | | List<DN> includeBranches = new ArrayList<DN>(1); |
| | | includeBranches.add(this.baseDN); |
| | | exportConfig.setIncludeBranches(includeBranches); |
| | | |
| | | // For the checksum computing mode, only consider the 'stable' attributes |
| | | if (ieContext.checksumOutput) |
| | | { |
| | | String includeAttributeStrings[] = |
| | | {"objectclass", "sn", "cn", "entryuuid"}; |
| | | HashSet<AttributeType> includeAttributes; |
| | | includeAttributes = new HashSet<AttributeType>(); |
| | | for (String attrName : includeAttributeStrings) |
| | | { |
| | | AttributeType attrType = DirectoryServer.getAttributeType(attrName); |
| | | if (attrType == null) |
| | | { |
| | | attrType = DirectoryServer.getDefaultAttributeType(attrName); |
| | | } |
| | | includeAttributes.add(attrType); |
| | | } |
| | | exportConfig.setIncludeAttributes(includeAttributes); |
| | | } |
| | | |
| | | // Launch the export. |
| | | try |
| | | { |
| | |
| | | } |
| | | finally |
| | | { |
| | | // Clean up after the export by closing the export config. |
| | | exportConfig.close(); |
| | | |
| | | if ((ieContext != null) && (ieContext.checksumOutput)) |
| | | { |
| | | ieContext.checksumOutputValue = |
| | | ((CheckedOutputStream)os).getChecksum().getValue(); |
| | | } |
| | | else |
| | | { |
| | | // Clean up after the export by closing the export config. |
| | | // Will also flush the export and export the remaining entries. |
| | | // This is a real export where writer has been initialized. |
| | | exportConfig.close(); |
| | | } |
| | | |
| | | // Release the shared lock on the backend. |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the backend object related to the domain and the backend's |
| | | * config entry. They will be used for import and export. |
| | | * TODO This should be in a shared package rather than here. |
| | | * Retrieves the backend related to the domain. |
| | | * |
| | | * @return The backend of that domain. |
| | | * @param baseDN The baseDN to retrieve the backend |
| | | * @throws DirectoryException when an error occired |
| | | */ |
| | | protected void retrievesBackendInfos(DN baseDN) throws DirectoryException |
| | | protected Backend retrievesBackend(DN baseDN) |
| | | { |
| | | // Retrieves the backend related to this domain |
| | | Backend domainBackend = DirectoryServer.getBackend(baseDN); |
| | | if (domainBackend == null) |
| | | { |
| | | Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, ""); |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | // Retrieves its configuration |
| | | BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend); |
| | | if (backendCfg == null) |
| | | { |
| | | Message message = |
| | | ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get(); |
| | | logError(message); |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | this.backend = domainBackend; |
| | | if (! domainBackend.supportsLDIFImport()) |
| | | { |
| | | Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get( |
| | | String.valueOf(baseDN)); |
| | | logError(message); |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | return DirectoryServer.getBackend(baseDN); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Sends lDIFEntry entry lines to the export target currently set. |
| | | * Exports an entry in LDIF format. |
| | | * |
| | | * @param lDIFEntry The lines for the LDIF entry. |
| | | * @param lDIFEntry The entry to be exported. |
| | | * |
| | | * @throws IOException when an error occurred. |
| | | */ |
| | | public void sendEntryLines(String lDIFEntry) throws IOException |
| | | public void exportLDIFEntry(String lDIFEntry) throws IOException |
| | | { |
| | | // If an error was raised - like receiving an ErrorMessage |
| | | // we just let down the export. |
| | |
| | | throw ioe; |
| | | } |
| | | |
| | | // new entry then send the current one |
| | | EntryMessage entryMessage = new EntryMessage( |
| | | if (ieContext.checksumOutput == false) |
| | | { |
| | | // Actually send the entry |
| | | EntryMessage entryMessage = new EntryMessage( |
| | | serverId, ieContext.exportTarget, lDIFEntry.getBytes()); |
| | | broker.publish(entryMessage); |
| | | |
| | | ieContext.updateTaskCounters(); |
| | | broker.publish(entryMessage); |
| | | } |
| | | ieContext.updateCounters(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * and should be updated of its progress. |
| | | * @throws DirectoryException when an error occurs |
| | | */ |
| | | public void initialize(short source, Task initTask) |
| | | public void initializeFromRemote(short source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | // TRACER.debugInfo("Entering initializeFromRemote"); |
| | | |
| | | acquireIEContext(); |
| | | ieContext.initializeTask = initTask; |
| | | |
| | |
| | | /** |
| | | * Verifies that the given string represents a valid source |
| | | * from which this server can be initialized. |
| | | * @param sourceString The string representaing the source |
| | | * @param sourceString The string representing the source |
| | | * @return The source as a short value |
| | | * @throws DirectoryException if the string is not valid |
| | | */ |
| | | public short decodeSource(String sourceString) |
| | | throws DirectoryException |
| | | { |
| | | TRACER.debugInfo("Entering decodeSource"); |
| | | short source = 0; |
| | | Throwable cause = null; |
| | | try |
| | |
| | | // TODO Verifies serverID is in the domain |
| | | // We should check here that this is a server implied |
| | | // in the current domain. |
| | | |
| | | log(Message.raw("Source decoded for import:" + source)); |
| | | return source; |
| | | } |
| | | } |
| | |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_INVALID_IMPORT_SOURCE.get(); |
| | | if (cause != null) |
| | | { |
| | | throw new DirectoryException( |
| | | resultCode, message, cause); |
| | | } |
| | | else |
| | | { |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param target The target that should be initialized |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeTarget(short target, Task initTask) |
| | | public void initializeRemote(short target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeTarget(target, serverId, initTask); |
| | | initializeRemote(target, serverId, initTask); |
| | | } |
| | | |
| | | /** |
| | | * Process the initialization of some other server or servers in the topology |
| | | * specified by the target argument when this initialization has been |
| | | * initiated by another server than this one. |
| | | * specified by the target argument when this initialization specifying the |
| | | * server that requests the initialization. |
| | | * |
| | | * @param target The target that should be initialized. |
| | | * @param requestorID The server that initiated the export. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeTarget(short target, short requestorID, Task initTask) |
| | | public void initializeRemote(short target, short requestorID, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | // FIXME Temporary workaround - will probably be fixed when implementing |
| | | // dynamic config |
| | | retrievesBackendInfos(this.baseDN); |
| | | |
| | | acquireIEContext(); |
| | | |
| | | ieContext.exportTarget = target; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.initTaskCounters(backend.getEntryCount()); |
| | | } |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMessage initializeMessage = new InitializeTargetMessage( |
| | | baseDN, serverId, ieContext.exportTarget, requestorID, |
| | | backend.getEntryCount()); |
| | | |
| | | log(Message.raw("SD : publishes " + initializeMessage + |
| | | " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount)); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | try |
| | | { |
| | | // FIXME Temporary workaround - will probably be fixed when implementing |
| | | // dynamic config |
| | | backend = retrievesBackend(this.baseDN); |
| | | |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | | Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( |
| | | backend.getBackendID().toString()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | acquireIEContext(); |
| | | |
| | | ieContext.exportTarget = target; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.initImportExportCounters(backend.getEntryCount()); |
| | | } |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMessage initializeMessage = new InitializeTargetMessage( |
| | | baseDN, serverId, ieContext.exportTarget, requestorID, |
| | | backend.getEntryCount()); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | exportBackend(); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMessage doneMsg = new DoneMessage(serverId, |
| | | initializeMessage.getDestination()); |
| | | initializeMessage.getDestination()); |
| | | broker.publish(doneMsg); |
| | | |
| | | releaseIEContext(); |
| | |
| | | catch(DirectoryException de) |
| | | { |
| | | // Notify the peer of the failure |
| | | ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject()); |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(target, |
| | | de.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | |
| | | releaseIEContext(); |
| | |
| | | StringBuilder failureReason = new StringBuilder(); |
| | | if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) |
| | | { |
| | | Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get( |
| | | backend.getBackendID(), String.valueOf(failureReason)); |
| | | Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get( |
| | | backend.getBackendID(), |
| | | String.valueOf(failureReason)); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | |
| | | * @param initializeMessage The message that initiated the import. |
| | | * @exception DirectoryException Thrown when an error occurs. |
| | | */ |
| | | protected void importBackend(InitializeTargetMessage initializeMessage) |
| | | protected void initialize(InitializeTargetMessage initializeMessage) |
| | | throws DirectoryException |
| | | { |
| | | LDIFImportConfig importConfig = null; |
| | | DirectoryException de = null; |
| | | |
| | | if (!backend.supportsLDIFImport()) |
| | | { |
| | | Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get( |
| | | backend.getBackendID().toString()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | try |
| | | { |
| | | log(Message.raw("startImport")); |
| | | |
| | | if (initializeMessage.getRequestorID() == serverId) |
| | | { |
| | | // The import responds to a request we did so the IEContext |
| | |
| | | |
| | | ieContext.importSource = initializeMessage.getsenderID(); |
| | | ieContext.entryLeftCount = initializeMessage.getEntryCount(); |
| | | ieContext.initTaskCounters(initializeMessage.getEntryCount()); |
| | | ieContext.initImportExportCounters(initializeMessage.getEntryCount()); |
| | | |
| | | preBackendImport(this.backend); |
| | | |
| | |
| | | // Process import |
| | | this.backend.importLDIF(importConfig); |
| | | |
| | | TRACER.debugInfo("The import has ended successfully."); |
| | | stateSavingDisabled = false; |
| | | |
| | | // Re-exchange state with SS |
| | | broker.stop(); |
| | | broker.start(replicationServers); |
| | | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | DirectoryException de = |
| | | new DirectoryException( |
| | | ResultCode.OTHER, Message.raw(e.getLocalizedMessage())); |
| | | ieContext.exception = de; |
| | | throw (de); |
| | | de = new DirectoryException(ResultCode.OTHER, |
| | | Message.raw(e.getLocalizedMessage())); |
| | | } |
| | | finally |
| | | { |
| | |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | setState(ieContext.updateTaskCompletionState(),ieContext.exception); |
| | | } |
| | | |
| | | releaseIEContext(); |
| | | |
| | | log(Message.raw("End importBackend")); |
| | | // Retrieves the generation ID associated with the data imported |
| | | try |
| | | { |
| | | generationId = loadGenerationId(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), |
| | | e.getLocalizedMessage())); |
| | | } |
| | | rejectedGenerationId = -1; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "After import, the replication plugin restarts connections" + |
| | | " to all RSs to provide new generation ID=" + generationId); |
| | | broker.setGenerationId(generationId); |
| | | |
| | | // Re-exchange generationID and state with RS |
| | | broker.reStart(); |
| | | } |
| | | // Success |
| | | // Sends up the root error. |
| | | if (de != null) |
| | | throw de; |
| | | } |
| | | |
| | | /** |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | // FIXME setBackendEnabled should be part taskUtils ? |
| | | TaskUtils.enableBackend(backend.getBackendID()); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the broker of this domain reports a connection to a |
| | | * ReplicationServer. |
| | | * |
| | | * @return true if the broker is connected, false if not. |
| | | */ |
| | | public boolean isConnected() |
| | | { |
| | | final boolean brokerConnected = broker.isConnected(); |
| | | return brokerConnected; |
| | | } |
| | | |
| | | /** |
| | | * Determine whether the connection to the replication server is encrypted. |
| | | * @return true if the connection is encrypted, false otherwise. |
| | | */ |
| | |
| | | attributes.add(attr); |
| | | |
| | | attributes.add(new Attribute("ssl-encryption", |
| | | String.valueOf(domain.isSessionEncrypted()))); |
| | | String.valueOf(domain.isSessionEncrypted()))); |
| | | |
| | | attributes.add(new Attribute("generation-id", |
| | | String.valueOf(domain.getGenerationId()))); |
| | | |
| | | return attributes; |
| | | |
| | |
| | | package org.opends.server.replication.protocol; |
| | | import org.opends.messages.Message; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | |
| | | /** |
| | | * This message is part of the replication protocol. |
| | | * This message is sent by a server or a replication server when an error |
| | |
| | | { |
| | | private static final long serialVersionUID = 2726389860247088266L; |
| | | |
| | | // The tracer object for the debug logger |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // Specifies the messageID built from the error that was detected |
| | | private int msgID; |
| | | |
| | |
| | | private Message details = null; |
| | | |
| | | /** |
| | | * Create a InitializeMessage. |
| | | * Creates an ErrorMessage providing the destination server. |
| | | * |
| | | * @param sender The server ID of the server that send this message. |
| | | * @param destination The destination server or servers of this message. |
| | | * @param details The details of the error. |
| | | * @param details The message containing the details of the error. |
| | | */ |
| | | public ErrorMessage(short sender, short destination, |
| | | Message details) |
| | |
| | | super(sender, destination); |
| | | this.msgID = details.getDescriptor().getId(); |
| | | this.details = details; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" Creating error message" + this.toString()); |
| | | } |
| | | |
| | | /** |
| | | * Create a InitializeMessage. |
| | | * Creates an ErrorMessage. |
| | | * |
| | | * @param destination replication server id |
| | | * @param details details of the error |
| | |
| | | super((short)-2, destination); |
| | | this.msgID = details.getDescriptor().getId(); |
| | | this.details = details; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this.toString()); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new InitializeMessage by decoding the provided byte array. |
| | | * @param in A byte array containing the encoded information for the Message |
| | | * Creates a new ErrorMessage by decoding the provided byte array. |
| | | * |
| | | * @param in A byte array containing the encoded information for the Message |
| | | * @throws DataFormatException If the in does not contain a properly |
| | | * encoded InitializeMessage. |
| | | * encoded message. |
| | | */ |
| | | public ErrorMessage(byte[] in) throws DataFormatException |
| | | { |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Returns a string representation of the message, listing the sender, |
| | |  * the destination, the message ID and the details of the error. |
| | |  * |
| | |  * @return the string representation of this message. |
| | |  */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   return "ErrorMessage=["+ |
| | |     " sender=" + this.senderID + |
| | |     " destination=" + this.destination + |
| | |     " msgID=" + this.msgID + |
| | |     " details=" + this.details + "]"; |
| | | } |
| | | } |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Get a string representation of this object, listing the base DN, the |
| | |  * sender ID and the destination of the message. |
| | |  * |
| | |  * @return A string representation of this object. |
| | |  */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   return "InitializeRequestMessage: baseDn="+baseDn+" senderId="+senderID + |
| | |     " destination=" + destination; |
| | | } |
| | | } |
| | |
| | | public class ReplServerInfoMessage extends ReplicationMessage |
| | | { |
| | | private List<String> connectedServers = null; |
| | | private long generationId; |
| | | |
| | | /** |
| | | * Creates a new changelogInfo message from its encoded form. |
| | |
| | | throw new DataFormatException( |
| | | "Input is not a valid changelogInfo Message."); |
| | | |
| | | connectedServers = new ArrayList<String>(); |
| | | int pos = 1; |
| | | |
| | | /* read the generationId */ |
| | | int length = getNextLength(in, pos); |
| | | generationId = Long.valueOf(new String(in, pos, length, |
| | | "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* read the connected servers */ |
| | | connectedServers = new ArrayList<String>(); |
| | | while (pos < in.length) |
| | | { |
| | | /* |
| | | * Read the next server ID |
| | | * first calculate the length then construct the string |
| | | */ |
| | | int length = getNextLength(in, pos); |
| | | length = getNextLength(in, pos); |
| | | connectedServers.add(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | } |
| | |
| | | * connected servers. |
| | | * |
| | | * @param connectedServers The list of currently connected servers ID. |
| | | * @param generationId The generationId currently associated with this |
| | | * domain. |
| | | */ |
| | | public ReplServerInfoMessage(List<String> connectedServers) |
| | | public ReplServerInfoMessage(List<String> connectedServers, |
| | | long generationId) |
| | | { |
| | | this.connectedServers = connectedServers; |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /* Put the message type */ |
| | | oStream.write(MSG_TYPE_REPL_SERVER_INFO); |
| | | |
| | | // Put the generationId |
| | | oStream.write(String.valueOf(generationId).getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | |
| | | // Put the servers |
| | | if (connectedServers.size() >= 1) |
| | | { |
| | | for (String server : connectedServers) |
| | |
| | | oStream.write(0); |
| | | } |
| | | } |
| | | |
| | | return oStream.toByteArray(); |
| | | } |
| | | catch (IOException e) |
| | |
| | | { |
| | | return connectedServers; |
| | | } |
| | | |
| | | /** |
| | |  * Returns the generationId carried by this message, as provided to the |
| | |  * constructor or read from the encoded form. |
| | |  * |
| | |  * @return The generationId. |
| | |  */ |
| | | public long getGenerationId() |
| | | { |
| | |   return this.generationId; |
| | | } |
| | | |
| | | /** |
| | |  * {@inheritDoc} |
| | |  */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |   // Accumulate in a single buffer rather than re-allocating a String |
| | |   // for every connected server. |
| | |   StringBuilder buffer = new StringBuilder("ReplServerInfoMessage: genId="); |
| | |   buffer.append(getGenerationId()); |
| | |   buffer.append(" Connected peers:"); |
| | |   for (String server : connectedServers) |
| | |   { |
| | |     // Every server ID is followed by a '/' separator, the last one included. |
| | |     buffer.append(server).append('/'); |
| | |   } |
| | |   return buffer.toString(); |
| | | } |
| | | |
| | | } |
| | |
| | | * @param windowSize The window size. |
| | | * @param serverState our ServerState for this baseDn. |
| | | * @param protocolVersion The replication protocol version of the creator. |
| | | * @param generationId The generationId for this server. |
| | | * @param sslEncryption Whether to continue using SSL to encrypt messages |
| | | * after the start messages have been exchanged. |
| | | */ |
| | |
| | | int windowSize, |
| | | ServerState serverState, |
| | | short protocolVersion, |
| | | long generationId, |
| | | boolean sslEncryption) |
| | | { |
| | | super(protocolVersion); |
| | | super(protocolVersion, generationId); |
| | | this.serverId = serverId; |
| | | this.serverURL = serverURL; |
| | | if (baseDn != null) |
| | |
| | | static final byte MSG_TYPE_ERROR = 14; |
| | | static final byte MSG_TYPE_WINDOW_PROBE = 15; |
| | | static final byte MSG_TYPE_REPL_SERVER_INFO = 16; |
| | | static final byte MSG_TYPE_RESET_GENERATION_ID = 17; |
| | | |
| | | // Adding a new type of message here probably requires to |
| | | // change accordingly generateMsg method below |
| | | |
| | |
| | | * MSG_TYPE_ERROR |
| | | * MSG_TYPE_WINDOW_PROBE |
| | | * MSG_TYPE_REPL_SERVER_INFO |
| | | * MSG_TYPE_RESET_GENERATION_ID |
| | | * |
| | | * @return the byte[] representation of this message. |
| | | * @throws UnsupportedEncodingException When the encoding of the message |
| | |
| | | case MSG_TYPE_ERROR: |
| | | msg = new ErrorMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_RESET_GENERATION_ID: |
| | | msg = new ResetGenerationId(buffer); |
| | | break; |
| | | case MSG_TYPE_WINDOW_PROBE: |
| | | msg = new WindowProbe(buffer); |
| | | break; |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.Serializable; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | |
| | | /** |
| | |  * This message is used by an LDAP server to communicate to the topology |
| | |  * that the generation must be reset for the domain. |
| | |  * The message carries no payload: its encoded form is the single message |
| | |  * type byte. |
| | |  */ |
| | | public class ResetGenerationId extends ReplicationMessage implements |
| | |     Serializable |
| | | { |
| | |   private static final long serialVersionUID = 7657049716115572226L; |
| | | |
| | | |
| | |   /** |
| | |    * Creates a new message. |
| | |    */ |
| | |   public ResetGenerationId() |
| | |   { |
| | |   } |
| | | |
| | |   /** |
| | |    * Creates a new ResetGenerationId message from its encoded form. |
| | |    * |
| | |    * @param in The byte array containing the encoded form of the |
| | |    *           ResetGenerationId message. |
| | |    * @throws DataFormatException If the byte array is null, empty, or does |
| | |    *                             not start with the ResetGenerationId |
| | |    *                             message type. |
| | |    */ |
| | |   public ResetGenerationId(byte[] in) throws DataFormatException |
| | |   { |
| | |     // Reject null and empty input explicitly so that the caller always |
| | |     // gets the declared DataFormatException rather than a runtime exception. |
| | |     if ((in == null) || (in.length < 1) || |
| | |         (in[0] != MSG_TYPE_RESET_GENERATION_ID)) |
| | |       throw new |
| | |       DataFormatException("input is not a valid GenerationId Message"); |
| | |   } |
| | | |
| | |   /** |
| | |    * {@inheritDoc} |
| | |    */ |
| | |   @Override |
| | |   public byte[] getBytes() |
| | |   { |
| | |     /* the encoded form only holds the type of the operation */ |
| | |     return new byte[] { MSG_TYPE_RESET_GENERATION_ID }; |
| | |   } |
| | | } |
| | |
| | | private boolean sslEncryption; |
| | | |
| | | /** |
| | | * Create a new ServerStartMessage. |
| | | * Creates a new ServerStartMessage. This message is to be sent by an LDAP |
| | | * Server after being connected to a replication server for a given |
| | | * replication domain. |
| | | * |
| | | * @param serverId The serverId of the server for which the ServerStartMessage |
| | | * is created. |
| | |
| | | * @param heartbeatInterval The requested heartbeat interval. |
| | | * @param serverState The state of this server. |
| | | * @param protocolVersion The replication protocol version of the creator. |
| | | * @param generationId The generationId for this server. |
| | | * @param sslEncryption Whether to continue using SSL to encrypt messages |
| | | * after the start messages have been exchanged. |
| | | */ |
| | |
| | | long heartbeatInterval, |
| | | ServerState serverState, |
| | | short protocolVersion, |
| | | long generationId, |
| | | boolean sslEncryption) |
| | | { |
| | | super(protocolVersion); |
| | | super(protocolVersion, generationId); |
| | | |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn.toString(); |
| | |
| | | { |
| | | super(MSG_TYPE_SERVER_START, in); |
| | | |
| | | /* The ServerStartMessage is encoded in the form : |
| | | * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState> |
| | | */ |
| | | try |
| | | { |
| | | /* first bytes are the header */ |
| | |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | /* |
| | | * ServerStartMessage contains. |
| | | * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | | byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8"); |
| | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | /** |
| | | * This class implements a protocol session using a basic socket and relying on |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugVerbose("Closing SocketSession."); |
| | | TRACER.debugInfo("Closing SocketSession." |
| | | + stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | socket.close(); |
| | | } |
| | |
| | | { |
| | | byte[] buffer = msg.getBytes(); |
| | | String str = String.format("%08x", buffer.length); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("SocketSession publish <" + str + ">"); |
| | | } |
| | | |
| | | byte[] sendLengthBuf = str.getBytes(); |
| | | |
| | | output.write(sendLengthBuf); |
| | |
| | | public abstract class StartMessage extends ReplicationMessage |
| | | { |
| | | private short protocolVersion; |
| | | private long generationId; |
| | | |
| | | /** |
| | | * The length of the header of this message. |
| | |
| | | * Create a new StartMessage. |
| | | * |
| | | * @param protocolVersion The Replication Protocol version of the server |
| | | * for which the StartMessage is created. |
| | | * for which the StartMessage is created. |
| | | * @param generationId The generationId for this server. |
| | | * |
| | | */ |
| | | public StartMessage(short protocolVersion) |
| | | public StartMessage(short protocolVersion, long generationId) |
| | | { |
| | | this.protocolVersion = protocolVersion; |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | |
| | | throws UnsupportedEncodingException |
| | | { |
| | | byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8"); |
| | | byte[] byteGenerationID = |
| | | String.valueOf(generationId).getBytes("UTF-8"); |
| | | |
| | | /* The message header is stored in the form : |
| | | * <message type><protocol version><generation id> |
| | | */ |
| | | int length = 1 + versionByte.length + 1 + |
| | | byteGenerationID.length + 1 + |
| | | additionalLength; |
| | | |
| | | byte[] encodedMsg = new byte[length]; |
| | |
| | | int pos = 1; |
| | | |
| | | /* put the protocol version */ |
| | | headerLength = addByteArray(versionByte, encodedMsg, pos); |
| | | pos = addByteArray(versionByte, encodedMsg, pos); |
| | | |
| | | /* put the generationId */ |
| | | headerLength = addByteArray(byteGenerationID, encodedMsg, pos); |
| | | |
| | | return encodedMsg; |
| | | } |
| | |
| | | protocolVersion = Short.valueOf( |
| | | new String(encodedMsg, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* read the generationId */ |
| | | length = getNextLength(encodedMsg, pos); |
| | | generationId = Long.valueOf(new String(encodedMsg, pos, length, |
| | | "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | |
| | | return pos; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | { |
| | | return protocolVersion; |
| | | } |
| | | |
| | | /** |
| | |  * Returns the generationId carried by this start message, as provided |
| | |  * to the constructor or decoded from the message header. |
| | |  * |
| | |  * @return The generationId. |
| | |  */ |
| | | public long getGenerationId() |
| | | { |
| | |   return this.generationId; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | public class DbHandler implements Runnable |
| | | { |
| | | // This queue hold all the updates not yet saved to stable storage |
| | | // it is only used as a temporary placeholder so that the write |
| | | // The msgQueue holds all the updates not yet saved to stable storage. |
| | | // This list is only used as a temporary placeholder so that the write |
| | | // in the stable storage can be grouped for efficiency reason. |
| | | // it is never read back by replicationServer threads that are responsible |
| | | // Adding an update synchronously add the update to this list. |
| | | // A dedicated thread loops on flush() and trim(). |
| | | // flush() : get a number of changes from the in memory list by block |
| | | // and write them to the db. |
| | | // trim() : deletes from the DB a number of changes that are older than a |
| | | // certain date. |
| | | // |
| | | // Changes are not read back by replicationServer threads that are responsible |
| | | // for pushing the changes to other replication server or to LDAP server |
| | | // |
| | | private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>(); |
| | | private ReplicationDB db; |
| | | private ChangeNumber firstChange = null; |
| | |
| | | private boolean done = false; |
| | | private DirectoryThread thread = null; |
| | | private Object flushLock = new Object(); |
| | | private ReplicationDbEnv dbenv; |
| | | |
| | | |
| | | // The High and low water mark for the max size of the msgQueue. |
| | | // the threads calling add() method will be blocked if the size of |
| | |
| | | final static int MSG_QUEUE_LOWMARK = 4000; |
| | | |
| | | /** |
| | | * the trim age in milliseconds. |
| | | * |
| | | * The trim age in milliseconds. Change records in the change DB that |
| | | * are older than this age are removed. |
| | | * |
| | | */ |
| | | private long trimage; |
| | | |
| | | /** |
| | | * Creates a New dbHandler associated to a given LDAP server. |
| | | * Creates a new dbHandler associated to a given LDAP server. |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDn the baseDn for which this DB was created. |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * @param generationId The generationId of the data contained in the LDAP |
| | | * server for this domain. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) |
| | | ReplicationDbEnv dbenv, long generationId) |
| | | throws DatabaseException |
| | | { |
| | | this.dbenv = dbenv; |
| | | this.serverId = id; |
| | | this.baseDn = baseDn; |
| | | this.trimage = replicationServer.getTrimage(); |
| | |
| | | * |
| | | * @param number the number of changes to be removed. |
| | | */ |
| | | private void clear(int number) |
| | | private void clearQueue(int number) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Flush old change information from this replicationServer database. |
| | | * Trim old changes from this replicationServer database. |
| | | * @throws DatabaseException In case of database problem. |
| | | */ |
| | | private void trim() throws DatabaseException, Exception |
| | |
| | | db.addEntries(changes); |
| | | |
| | | // remove the changes from the list of changes to be saved. |
| | | clear(changes.size()); |
| | | clearQueue(changes.size()); |
| | | } |
| | | } while (size >=500); |
| | | } |
| | |
| | | trimage = delay; |
| | | } |
| | | |
| | | /** |
| | |  * Clear the changes from this DB (from both memory cache and DB storage), |
| | |  * then reload the first and last change from the emptied DB. |
| | |  * |
| | |  * @throws DatabaseException When an exception occurs while removing the |
| | |  * changes from the DB. |
| | |  * @throws Exception When an exception occurs while accessing a resource |
| | |  * from the DB. |
| | |  */ |
| | | public void clear() throws DatabaseException, Exception |
| | | { |
| | |   synchronized(flushLock) |
| | |   { |
| | |     // The queue is guarded by its own monitor in clearQueue(): take it |
| | |     // here as well so that the in memory changes are not dropped while |
| | |     // another thread is working on the queue. |
| | |     // NOTE(review): lock order is flushLock then msgQueue - confirm that |
| | |     // no other code path takes them in the opposite order. |
| | |     synchronized (msgQueue) |
| | |     { |
| | |       msgQueue.clear(); |
| | |     } |
| | |   } |
| | |   db.clear(); |
| | |   firstChange = db.readFirstChange(); |
| | |   lastChange = db.readLastChange(); |
| | | } |
| | | } |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.ReplServerInfoMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.types.DN; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | /* GenerationId management */ |
| | | private long generationId = -1; |
| | | private boolean generationIdSavedStatus = false; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Creates a new ReplicationCache associated to the DN baseDn. |
| | | * |
| | |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Created Cache for " + baseDn + " " + |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | |
| | | /** |
| | | * Add an update that has been received to the list of |
| | |
| | | * other replication server before pushing it to the LDAP servers |
| | | */ |
| | | |
| | | short id = update.getChangeNumber().getServerId(); |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.incrementInCount(); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | // look for the dbHandler that is responsible for the master server which |
| | | // look for the dbHandler that is responsible for the LDAP server which |
| | | // generated the change. |
| | | DbHandler dbHandler = null; |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | short id = update.getChangeNumber().getServerId(); |
| | | dbHandler = sourceDbHandlers.get(id); |
| | | if (dbHandler == null) |
| | | { |
| | | try |
| | | { |
| | | dbHandler = replicationServer.newDbHandler(id, baseDn); |
| | | } catch (DatabaseException e) |
| | | dbHandler = replicationServer.newDbHandler(id, |
| | | baseDn, generationId); |
| | | generationIdSavedStatus = true; |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | |
| | | } |
| | | connectedServers.put(handler.getServerId(), handler); |
| | | |
| | | // It can be that the server that connects here is the |
| | | // first server connected for a domain. |
| | | // In that case, we will establish the appropriate connections |
| | | // to the other repl servers for this domain and receive |
| | | // their ReplServerInfo messages. |
| | | // FIXME: Is it necessary to end this above processing BEFORE listening |
| | | // to incoming messages for that domain ? But the replica |
| | | // would raise Read Timeout for replica that connects. |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | |
| | | */ |
| | | public void stopServer(ServerHandler handler) |
| | | { |
| | |   if (debugEnabled()) |
| | |     TRACER.debugInfo( |
| | |         "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |         " for " + baseDn + " " + |
| | |         " stopServer " + handler.getMonitorInstanceName()); |
| | | |
| | |   handler.stopHandler(); |
| | | |
| | |   // Forget the handler in the map matching its kind. |
| | |   if (handler.isReplicationServer()) |
| | |   { |
| | |     replicationServers.remove(handler.getServerId()); |
| | |   } |
| | |   else |
| | |   { |
| | |     connectedServers.remove(handler.getServerId()); |
| | |   } |
| | | |
| | |   // Reset the generationId first when it is no longer needed, so that the |
| | |   // info message built below is sent only once and carries the current |
| | |   // generationId. |
| | |   mayResetGenerationId(); |
| | | |
| | |   // Update the remote replication servers with our list |
| | |   // of connected LDAP servers |
| | |   sendReplServerInfo(); |
| | | } |
| | | |
| | | /** |
| | |  * Resets the generationId for this domain if there is no LDAP |
| | |  * server currently connected and if the generationId has never |
| | |  * been saved. The next LDAP server to connect will then become the |
| | |  * new reference. |
| | |  * Replication servers whose generationId differs from the one of this |
| | |  * domain are ignored when looking for connected LDAP servers. |
| | |  */ |
| | | protected void mayResetGenerationId() |
| | | { |
| | |   if (debugEnabled()) |
| | |     TRACER.debugInfo( |
| | |         "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |         " for " + baseDn + " " + |
| | |         " mayResetGenerationId generationIdSavedStatus=" + |
| | |         generationIdSavedStatus); |
| | | |
| | |   // LDAP servers directly connected to this replication server are |
| | |   // enough to keep the current generationId. |
| | |   boolean lDAPServersConnectedInTheTopology = !connectedServers.isEmpty(); |
| | |   if (lDAPServersConnectedInTheTopology) |
| | |   { |
| | |     if (debugEnabled()) |
| | |       TRACER.debugInfo( |
| | |           "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |           " for " + baseDn + " " + |
| | |           " has servers connected to it - will not reset generationId"); |
| | |   } |
| | |   else |
| | |   { |
| | |     // Otherwise look at the LDAP servers known by the remote replication |
| | |     // servers that share our generationId. |
| | |     for (ServerHandler rsh : replicationServers.values()) |
| | |     { |
| | |       if (generationId != rsh.getGenerationId()) |
| | |       { |
| | |         if (debugEnabled()) |
| | |           TRACER.debugInfo( |
| | |               "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |               " for " + baseDn + " " + |
| | |               " mayResetGenerationId skip RS" + |
| | |               rsh.getMonitorInstanceName() + |
| | |               " that has different genId"); |
| | |         continue; |
| | |       } |
| | | |
| | |       if (!rsh.getRemoteLDAPServers().isEmpty()) |
| | |       { |
| | |         lDAPServersConnectedInTheTopology = true; |
| | | |
| | |         if (debugEnabled()) |
| | |           TRACER.debugInfo( |
| | |               "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |               " for " + baseDn + " " + |
| | |               " mayResetGenerationId RS" + rsh.getMonitorInstanceName() + |
| | |               " has servers connected to it - will not reset generationId"); |
| | |       } |
| | |     } |
| | |   } |
| | | |
| | |   // Nobody relies on the generationId any more and it was never saved: |
| | |   // forget it. |
| | |   if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus)) |
| | |   { |
| | |     setGenerationId(-1, false); |
| | |   } |
| | | } |
| | | |
| | |
| | | // Update this server with the list of LDAP servers |
| | | // already connected |
| | | handler.sendInfo( |
| | | new ReplServerInfoMessage(getConnectedLDAPservers())); |
| | | new ReplServerInfoMessage(getConnectedLDAPservers(),generationId)); |
| | | |
| | | return true; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * creates a new ReplicationDB with specified identifier. |
| | | * @param id the identifier of the new ReplicationDB. |
| | | * @param db the new db. |
| | | * Sets the provided DbHandler associated to the provided serverId. |
| | | * |
| | | * @param serverId the serverId for the server to which is |
| | | * associated the Dbhandler. |
| | | * @param dbHandler the dbHandler associated to the serverId. |
| | | * |
| | | * @throws DatabaseException If a database error happened. |
| | | */ |
| | | public void newDb(short id, DbHandler db) throws DatabaseException |
| | | public void setDbHandler(short serverId, DbHandler dbHandler) |
| | | throws DatabaseException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | sourceDbHandlers.put(id , db); |
| | | sourceDbHandlers.put(serverId , dbHandler); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Process an InitializeRequestMessage. |
| | | * Processes a message coming from one server in the topology |
| | | * and potentially forwards it to one or all other servers. |
| | | * |
| | | * @param msg The message received and to be processed. |
| | | * @param senderHandler The server handler of the server that emitted |
| | |
| | | */ |
| | | public void process(RoutableMessage msg, ServerHandler senderHandler) |
| | | { |
| | | // A replication server is not expected to be the destination |
| | | // of a routable message except for an error message. |
| | | if (msg.getDestination() == this.replicationServer.getServerId()) |
| | | { |
| | | if (msg instanceof ErrorMessage) |
| | | { |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | } |
| | | else |
| | | { |
| | | logError(NOTE_ERR_ROUTING_TO_SERVER.get( |
| | | msg.getClass().getCanonicalName())); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | List<ServerHandler> servers = getDestinationServers(msg, senderHandler); |
| | | |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); |
| | | mb.append("serverID:" + msg.getDestination()); |
| | | mb.append(" unreachable server ID=" + msg.getDestination()); |
| | | mb.append(" unroutable message =" + msg); |
| | | ErrorMessage errMsg = new ErrorMessage( |
| | | msg.getsenderID(), mb.toMessage()); |
| | | this.replicationServer.getServerId(), |
| | | msg.getsenderID(), |
| | | mb.toMessage()); |
| | | |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | /* |
| | | * An error happened trying the send back an error to this server. |
| | | * Log an error and close the connection to the sender server. |
| | | * An error happened trying to send an error msg to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | MessageBuilder mb2 = new MessageBuilder(); |
| | | mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); |
| | |
| | | private void sendReplServerInfo() |
| | | { |
| | | ReplServerInfoMessage info = |
| | | new ReplServerInfoMessage(getConnectedLDAPservers()); |
| | | new ReplServerInfoMessage(getConnectedLDAPservers(), generationId); |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Returns the generationId currently held in memory for this domain. |
| | |  * |
| | |  * @return The generationId |
| | |  */ |
| | | public long getGenerationId() |
| | | { |
| | |   return this.generationId; |
| | | } |
| | | |
| | | /** |
| | |  * Returns the saved status currently recorded for the generationId of |
| | |  * this domain. |
| | |  * |
| | |  * @return The generationId saved status. |
| | |  */ |
| | | public boolean getGenerationIdSavedStatus() |
| | | { |
| | |   return this.generationIdSavedStatus; |
| | | } |
| | | |
| | | /** |
| | | * Sets the replication server informations for the provided |
| | | * handler from the provided ReplServerInfoMessage. |
| | | * |
| | |
| | | { |
| | | handler.setReplServerInfo(infoMsg); |
| | | } |
| | | |
| | | /** |
| | |  * Replaces the in memory generationId of this domain with the provided |
| | |  * value. Nothing is changed when the provided value is the one already |
| | |  * in use. Otherwise, when the value being replaced is greater than zero, |
| | |  * resetGenerationId() is first called on every handler held in |
| | |  * connectedServers. |
| | |  * |
| | |  * @param generationId The new value of generationId. |
| | |  * @param savedStatus  The saved status of the generationId. |
| | |  */ |
| | | synchronized public void setGenerationId(long generationId, |
| | |     boolean savedStatus) |
| | | { |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In " + this.replicationServer.getMonitorInstanceName() + |
| | |         " baseDN=" + baseDn + |
| | |         " RCache.set GenerationId=" + generationId); |
| | |   } |
| | | |
| | |   final boolean changed = (this.generationId != generationId); |
| | |   if (changed) |
| | |   { |
| | |     // Only a previously set (positive) generationId needs to be |
| | |     // invalidated on the connected handlers. |
| | |     if (this.generationId > 0) |
| | |     { |
| | |       for (ServerHandler connected : connectedServers.values()) |
| | |       { |
| | |         connected.resetGenerationId(); |
| | |       } |
| | |     } |
| | | |
| | |     this.generationId = generationId; |
| | |     this.generationIdSavedStatus = savedStatus; |
| | |   } |
| | | } |
| | | |
| | |   /** |
| | |    * Resets the generationId of this domain. |
| | |    * <p> |
| | |    * The steps are, in order: every handler in connectedServers is asked to |
| | |    * reset its generationId; when the request came from an LDAP server, a |
| | |    * ResetGenerationId message is sent to every handler in |
| | |    * replicationServers; every source DbHandler is cleared and forgotten; |
| | |    * the stored generationId is cleared through the replication server; and |
| | |    * finally the in-memory generationId is set to -1 and its saved status |
| | |    * to false. |
| | |    * <p> |
| | |    * Failures in the individual steps are logged and do not stop the |
| | |    * remaining steps. |
| | |    * |
| | |    * @param senderHandler The handler associated to the server |
| | |    *        that requested to reset the generationId. |
| | |    */ |
| | |   public void resetGenerationId(ServerHandler senderHandler) |
| | |   { |
| | |     if (debugEnabled()) |
| | |       TRACER.debugInfo( |
| | |           "In " + this.replicationServer.getMonitorInstanceName() + |
| | |           " baseDN=" + baseDn + |
| | |           " RCache.resetGenerationId"); |
| | | |
| | |     // Notifies the others LDAP servers that from now on |
| | |     // they have the bad generationId |
| | |     for (ServerHandler handler : connectedServers.values()) |
| | |     { |
| | |       handler.resetGenerationId(); |
| | |     } |
| | | |
| | |     // Propagates the reset message to the others replication servers |
| | |     // dealing with the same domain. Only done when the request comes |
| | |     // from an LDAP server. |
| | |     if (senderHandler.isLDAPserver()) |
| | |     { |
| | |       for (ServerHandler handler : replicationServers.values()) |
| | |       { |
| | |         try |
| | |         { |
| | |           handler.sendGenerationId(new ResetGenerationId()); |
| | |         } |
| | |         catch (IOException e) |
| | |         { |
| | |           logError(ERR_CHANGELOG_ERROR_SENDING_INFO. |
| | |               get(handler.getMonitorInstanceName())); |
| | |         } |
| | |       } |
| | |     } |
| | | |
| | |     // Reset the localchange and state db for the current domain |
| | |     synchronized (sourceDbHandlers) |
| | |     { |
| | |       for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | |       { |
| | |         try |
| | |         { |
| | |           dbHandler.clear(); |
| | |         } |
| | |         catch (Exception e) |
| | |         { |
| | |           // TODO: i18n |
| | |           logError(Message.raw( |
| | |               "Exception caught while clearing dbHandler:" + |
| | |               e.getLocalizedMessage())); |
| | |         } |
| | |       } |
| | |       sourceDbHandlers.clear(); |
| | | |
| | |       if (debugEnabled()) |
| | |         TRACER.debugInfo( |
| | |             "In " + this.replicationServer.getMonitorInstanceName() + |
| | |             " baseDN=" + baseDn + |
| | |             " The source db handler has been cleared"); |
| | |     } |
| | | |
| | |     try |
| | |     { |
| | |       replicationServer.clearGenerationId(baseDn); |
| | |     } |
| | |     catch (Exception e) |
| | |     { |
| | |       // TODO: i18n |
| | |       logError(Message.raw( |
| | |           "Exception caught while clearing generationId:" + |
| | |           e.getLocalizedMessage())); |
| | |     } |
| | | |
| | |     // Reset the in memory domain generationId. The stored value has just |
| | |     // been cleared, so the saved status must not keep reporting a saved id. |
| | |     generationId = -1; |
| | |     generationIdSavedStatus = false; |
| | |   } |
| | | |
| | |   /** |
| | |    * Tells whether the server with the provided id is degraded because |
| | |    * its generationId differs from the generationId of this domain. |
| | |    * <p> |
| | |    * The server is looked up in replicationServers first, then in |
| | |    * connectedServers. A server found in neither is reported as not |
| | |    * degraded. |
| | |    * |
| | |    * @param serverId The serverId for which we want to know the state. |
| | |    * @return Whether it is degraded or not. |
| | |    */ |
| | |   public boolean isDegradedDueToGenerationId(short serverId) |
| | |   { |
| | |     if (debugEnabled()) |
| | |     { |
| | |       TRACER.debugInfo( |
| | |           "In " + this.replicationServer.getMonitorInstanceName() + |
| | |           " baseDN=" + baseDn + |
| | |           " isDegraded serverId=" + serverId + |
| | |           " given local generation Id=" + this.generationId); |
| | |     } |
| | | |
| | |     ServerHandler peer = replicationServers.get(serverId); |
| | |     if (peer == null) |
| | |     { |
| | |       peer = connectedServers.get(serverId); |
| | |     } |
| | |     if (peer == null) |
| | |     { |
| | |       // Unknown server: nothing to compare against. |
| | |       return false; |
| | |     } |
| | | |
| | |     if (debugEnabled()) |
| | |     { |
| | |       TRACER.debugInfo( |
| | |           "In " + this.replicationServer.getMonitorInstanceName() + |
| | |           " baseDN=" + baseDn + |
| | |           " Compute degradation of serverId=" + serverId + |
| | |           " LS server generation Id=" + peer.getGenerationId()); |
| | |     } |
| | | |
| | |     return peer.getGenerationId() != this.generationId; |
| | |   } |
| | | |
| | |   /** |
| | |    * Returns the replication server this object is associated with. |
| | |    * |
| | |    * @return The replication server. |
| | |    */ |
| | |   public ReplicationServer getReplicationServer() |
| | |   { |
| | |     return this.replicationServer; |
| | |   } |
| | | } |
| | |
| | | /** |
| | | * Creates a new database or open existing database that will be used |
| | | * to store and retrieve changes from an LDAP server. |
| | | * @param serverId Identifier of the LDAP server. |
| | | * @param baseDn baseDn of the LDAP server. |
| | | * @param replicationServer the ReplicationServer that needs to be shutdown |
| | | * @param dbenv the Db encironemnet to use to create the db |
| | | * @throws DatabaseException if a database problem happened |
| | | * @param serverId The identifier of the LDAP server. |
| | | * @param baseDn The baseDn of the replication domain. |
| | | * @param replicationServer The ReplicationServer that needs to be shutdown. |
| | | * @param dbenv The Db environment to use to create the db. |
| | | * @throws DatabaseException If a database problem happened. |
| | | */ |
| | | public ReplicationDB(Short serverId, DN baseDn, |
| | | ReplicationServer replicationServer, |
| | |
| | | this.baseDn = baseDn; |
| | | this.dbenv = dbenv; |
| | | this.replicationServer = replicationServer; |
| | | db = dbenv.getOrAddDb(serverId, baseDn); |
| | | |
| | | // Get or create the associated Replicationcache and Db. |
| | | db = dbenv.getOrAddDb(serverId, baseDn, |
| | | replicationServer.getReplicationCache(baseDn, true).getGenerationId()); |
| | | } |
| | | |
| | | /** |
| | |
| | | cursor.delete(); |
| | | } |
| | | } |
| | | |
| | |   /** |
| | |    * Clears this change DB: first the database named by toString() is |
| | |    * cleared through the Db environment, then the serverId record of this |
| | |    * baseDn is cleared through the same environment. |
| | |    * |
| | |    * @throws Exception Throws an exception it occurs. |
| | |    * @throws DatabaseException Throws a DatabaseException when it occurs. |
| | |    */ |
| | |   public void clear() throws Exception, DatabaseException |
| | |   { |
| | |     // Step 1: drop the stored changes. |
| | |     final String databaseName = toString(); |
| | |     dbenv.clearDb(databaseName); |
| | | |
| | |     // Step 2: drop the reference to this serverID. |
| | |     dbenv.clearServerId(baseDn, serverId); |
| | |   } |
| | | } |
| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | |
| | | private Environment dbEnvironment = null; |
| | | private Database stateDb = null; |
| | | private ReplicationServer replicationServer = null; |
| | | private static final String GENERATION_ID_TAG = "GENID"; |
| | | private static final String FIELD_SEPARATOR = " "; |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Initialize this class. |
| | |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Get the domain base DN/ generationIDs records |
| | | */ |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | String[] str = stringData.split(" ", 2); |
| | | short serverId = new Short(str[0]); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Read tag baseDn generationId=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId=-1; |
| | | |
| | | DN baseDn; |
| | | |
| | | try |
| | | { |
| | | // <generationId> |
| | | generationId = new Long(str[1]); |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format: " + |
| | | e.getLocalizedMessage() |
| | | + "<" + str[1] + ">")); |
| | | } |
| | | |
| | | // <baseDn> |
| | | baseDn = null; |
| | | try |
| | | { |
| | | baseDn = DN.decode(str[2]); |
| | | } catch (DirectoryException e) |
| | | { |
| | | Message message = |
| | | ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); |
| | | logError(message); |
| | | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Has read baseDn=" + baseDn |
| | | + " generationId=" + generationId); |
| | | |
| | | replicationServer.getReplicationCache(baseDn, true). |
| | | setGenerationId(generationId, true); |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw("need UTF-8 support")); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | /* |
| | | * Get the server Id / domain base DN records |
| | | */ |
| | | status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | String stringData = null; |
| | | try |
| | | { |
| | | stringData = new String(data.getData(), "UTF-8"); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "need UTF-8 support")); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Read serverId BaseDN=" + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | | if (!str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | short serverId = -1; |
| | | try |
| | | { |
| | | // <serverId> |
| | | serverId = new Short(str[0]); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format: " + |
| | | e.getLocalizedMessage() |
| | | + "<" + str[0] + ">")); |
| | | } |
| | | // <baseDn> |
| | | DN baseDn = null; |
| | | try |
| | | { |
| | |
| | | } catch (DirectoryException e) |
| | | { |
| | | Message message = |
| | | ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); |
| | | ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]); |
| | | logError(message); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Has read: baseDn=" + baseDn |
| | | + " serverId=" + serverId); |
| | | |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, replicationServer, this); |
| | | replicationServer.getReplicationCache(baseDn).newDb(serverId, |
| | | dbHandler); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "replicationServer state database has a wrong format")); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | // TODO: i18n |
| | | throw new ReplicationDBException(Message.raw( |
| | | "need UTF-8 support")); |
| | | new DbHandler(serverId, baseDn, replicationServer, this, 1); |
| | | |
| | | replicationServer.getReplicationCache(baseDn, true). |
| | | setDbHandler(serverId, dbHandler); |
| | | } |
| | | |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | cursor.close(); |
| | | |
| | | } catch (DatabaseException dbe) { |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | cursor.close(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Find or create the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the server. |
| | | * @return the Database. |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(Short serverId, DN baseDn) |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | /** |
| | | * Finds or creates the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the domain. |
| | | * @param generationId The generationId associated to this domain. |
| | | * @return the Database. |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(Short serverId, DN baseDn, Long generationId) |
| | | throws DatabaseException |
| | | { |
| | | String stringId = serverId.toString() + " " + baseDn.toNormalizedString(); |
| | | byte[] byteId; |
| | | |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | |
| | | // Open the database. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " + |
| | | serverId + " " + baseDn + " " + generationId); |
| | | try |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | String stringId = serverId.toString() + FIELD_SEPARATOR |
| | | + baseDn.toNormalizedString(); |
| | | |
| | | // Opens the database for the changes received from this server |
| | | // on this domain. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | // Creates the record serverId/domain base Dn in the stateDb |
| | | // if it does not already exist. |
| | | byte[] byteId; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("getOrAddDb() Created in the state Db record " + |
| | | " serverId/Domain=<"+stringId+">"); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | // Creates the record domain base Dn/ generationId in the stateDb |
| | | // if it does not already exist. |
| | | stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | baseDn.toNormalizedString(); |
| | | String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | generationId.toString() + FIELD_SEPARATOR + |
| | | baseDn.toNormalizedString(); |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | byte[] dataByteId; |
| | | dataByteId = dataStringId.getBytes("UTF-8"); |
| | | key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | data = new DatabaseEntry(); |
| | | status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(dataByteId); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Created in the state Db record Tag/Domain/GenId key=" + |
| | | stringId + " value=" + dataStringId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | return db; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new transaction. |
| | | * |
| | | * @return the transaction. |
| | | * @throws DatabaseException in case of underlying database Exception. |
| | | */ |
| | | public Transaction beginTransaction() throws DatabaseException |
| | | { |
| | | return dbEnvironment.beginTransaction(null, null); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided generationId associated to the provided baseDn |
| | | * from the state Db. |
| | | * |
| | | * @param baseDn The baseDn for which the generationID must be cleared. |
| | | * |
| | | */ |
| | | public void clearGenerationId(DN baseDn) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId " + baseDn); |
| | | try |
| | | { |
| | | // Deletes the record domain base Dn/ generationId in the stateDb |
| | | String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR + |
| | | baseDn.toNormalizedString(); |
| | | byte[] byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if ((status == OperationStatus.SUCCESS) || |
| | | (status == OperationStatus.KEYEXIST)) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | stateDb.delete(txn, key); |
| | | txn.commitWriteNoSync(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId (" + |
| | | baseDn +") succeeded."); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // TODO : should have a better error logging |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearGenerationId ("+ baseDn + " failed" + status.toString()); |
| | | } |
| | | } |
| | | return db; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // can't happen |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new transaction. |
| | | * |
| | | * @return the transaction. |
| | | * @throws DatabaseException in case of underlying database Exception. |
| | | */ |
| | | public Transaction beginTransaction() throws DatabaseException |
| | | { |
| | | return dbEnvironment.beginTransaction(null, null); |
| | | } |
| | | /** |
| | | * Clears the provided serverId associated to the provided baseDn |
| | | * from the state Db. |
| | | * |
| | | * @param baseDn The baseDn for which the generationID must be cleared. |
| | | * @param serverId The serverId to remove from the Db. |
| | | * |
| | | */ |
| | | public void clearServerId(DN baseDn, Short serverId) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId); |
| | | try |
| | | { |
| | | String stringId = serverId.toString() + FIELD_SEPARATOR |
| | | + baseDn.toNormalizedString(); |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | // Deletes the record serverId/domain base Dn in the stateDb |
| | | byte[] byteId; |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.delete(txn, key); |
| | | txn.commitWriteNoSync(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | " In " + this.replicationServer.getMonitorInstanceName() + |
| | | " clearServerId() succeeded " + baseDn + " " + |
| | | serverId); |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | } |
| | | catch (DatabaseException dbe) |
| | | { |
| | | // can't happen |
| | | } |
| | | } |
| | | } |
| | | |
| | |   /** |
| | |    * Clears the database. |
| | |    * <p> |
| | |    * The database is truncated inside a transaction that is committed on |
| | |    * success and aborted on failure, so the transaction is never left |
| | |    * open. A DatabaseException is logged and not propagated to the caller. |
| | |    * |
| | |    * @param databaseName The name of the database to clear. |
| | |    */ |
| | |   public final void clearDb(String databaseName) |
| | |   { |
| | |     Transaction txn = null; |
| | |     try |
| | |     { |
| | |       if (debugEnabled()) |
| | |         TRACER.debugInfo( |
| | |             "In " + this.replicationServer.getMonitorInstanceName() + |
| | |             "clearDb" + databaseName); |
| | | |
| | |       txn = dbEnvironment.beginTransaction(null, null); |
| | |       dbEnvironment.truncateDatabase(txn, databaseName, false); |
| | |       txn.commitWriteNoSync(); |
| | |       // Committed: nothing left to abort in the finally block. |
| | |       txn = null; |
| | |     } |
| | |     catch (DatabaseException dbe) |
| | |     { |
| | |       MessageBuilder mb = new MessageBuilder(); |
| | |       mb.append(ERR_ERROR_CLEARING_DB.get(databaseName, |
| | |           dbe.getLocalizedMessage())); |
| | |       logError(mb.toMessage()); |
| | |     } |
| | |     finally |
| | |     { |
| | |       if (txn != null) |
| | |       { |
| | |         // The truncate or the commit failed: release the transaction. |
| | |         try |
| | |         { |
| | |           txn.abort(); |
| | |         } |
| | |         catch (DatabaseException abortFailure) |
| | |         { |
| | |           MessageBuilder mb = new MessageBuilder(); |
| | |           mb.append(ERR_ERROR_CLEARING_DB.get(databaseName, |
| | |               abortFailure.getLocalizedMessage())); |
| | |           logError(mb.toMessage()); |
| | |         } |
| | |       } |
| | |     } |
| | |   } |
| | | } |
| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.Message; |
| | | |
| | | import org.opends.messages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import org.opends.messages.MessageBuilder; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ResultCode; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | |
| | | private int queueSize; |
| | | private String dbDirname = null; |
| | | private long trimAge; // the time (in sec) after which the changes must |
| | | // be deleted from the persistent storage. |
| | | private int replicationPort; |
| | | // de deleted from the persistent storage. |
| | | private boolean stopListen = false; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Creates a new Replication server using the provided configuration entry. |
| | | * |
| | | * @param configuration The configuration of this replication server. |
| | |
| | | // The socket has probably been closed as part of the |
| | | // shutdown or changing the port number process. |
| | | // just log debug information and loop. |
| | | Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get(); |
| | | Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | String hostname = serverURL.substring(0, separator); |
| | | boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " + this.getMonitorInstanceName() + |
| | | " connects to " + serverURL); |
| | | |
| | | try |
| | | { |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | |
| | | listenSocket.bind(new InetSocketAddress(changelogPort)); |
| | | |
| | | /* |
| | | * create working threads |
| | | * creates working threads |
| | | * We must first connect, then start to listen. |
| | | */ |
| | | listenThread = |
| | | new ReplicationServerListenThread("Replication Server Listener", this); |
| | | listenThread.start(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates connect threads"); |
| | | connectThread = |
| | | new ReplicationServerConnectThread("Replication Server Connect", this); |
| | | connectThread.start(); |
| | | |
| | | // FIXME : Is it better to have the time to receive the ReplServerInfo |
| | | // from all the other replication servers since this info is necessary |
| | | // to route an early received total update request. |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates listen threads"); |
| | | |
| | | listenThread = |
| | | new ReplicationServerListenThread("Replication Server Listener", this); |
| | | listenThread.start(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " successfully initialized"); |
| | | |
| | | } catch (DatabaseException e) |
| | | { |
| | | Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname); |
| | | Message message = ERR_COULD_NOT_INITIALIZE_DB.get( |
| | | getFileForPath(dbDirname).getAbsolutePath()); |
| | | logError(message); |
| | | } catch (ReplicationDBException e) |
| | | { |
| | | Message message = ERR_COULD_NOT_READ_DB.get(dbDirname); |
| | | Message message = ERR_COULD_NOT_READ_DB.get(dbDirname, |
| | | e.getLocalizedMessage()); |
| | | logError(message); |
| | | } catch (UnknownHostException e) |
| | | { |
| | |
| | | * Get the ReplicationCache associated to the base DN given in parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ReplicationCache must be returned. |
| | | * @param create Specifies whether to create the ReplicationCache if it does |
| | | * not already exist. |
| | | * @return The ReplicationCache associated to the base DN given in parameter. |
| | | */ |
| | | public ReplicationCache getReplicationCache(DN baseDn) |
| | | public ReplicationCache getReplicationCache(DN baseDn, boolean create) |
| | | { |
| | | ReplicationCache replicationCache; |
| | | |
| | | synchronized (baseDNs) |
| | | { |
| | | replicationCache = baseDNs.get(baseDn); |
| | | if (replicationCache == null) |
| | | if ((replicationCache == null) && (create)) |
| | | { |
| | | replicationCache = new ReplicationCache(baseDn, this); |
| | | baseDNs.put(baseDn, replicationCache); |
| | | baseDNs.put(baseDn, replicationCache); |
| | | } |
| | | } |
| | | |
| | | return replicationCache; |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | if (shutdown) |
| | | return; |
| | | |
| | | shutdown = true; |
| | | |
| | | // shutdown the connect thread |
| | |
| | | // replication Server service is closing anyway. |
| | | } |
| | | |
| | | // shutdown the listen thread |
| | | if (listenThread != null) |
| | | { |
| | | listenThread.interrupt(); |
| | | } |
| | | |
| | | // shutdown all the ChangelogCaches |
| | | for (ReplicationCache replicationCache : baseDNs.values()) |
| | | { |
| | |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler must be created. |
| | | * @param generationId The generationId for this server and this domain. |
| | | * @return The new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |
| | | DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException |
| | | public DbHandler newDbHandler(short id, DN baseDn, long generationId) |
| | | throws DatabaseException |
| | | { |
| | | return new DbHandler(id, baseDn, this, dbEnv); |
| | | return new DbHandler(id, baseDn, this, dbEnv, generationId); |
| | | } |
| | | |
| | |   /** |
| | |    * Clears the generationId for the domain related to the provided baseDn |
| | |    * by delegating to the Db environment. |
| | |    * <p> |
| | |    * Any exception raised by the Db environment is caught here and only |
| | |    * traced when debugging is enabled; it is not propagated. |
| | |    * |
| | |    * @param baseDn The baseDn for which to delete the generationId. |
| | |    * @throws DatabaseException When it occurs. |
| | |    */ |
| | |   public void clearGenerationId(DN baseDn) |
| | |   throws DatabaseException |
| | |   { |
| | |     try |
| | |     { |
| | |       dbEnv.clearGenerationId(baseDn); |
| | |     } |
| | |     catch(Exception e) |
| | |     { |
| | |       // Guarded like every other trace call in this file, so the stack |
| | |       // trace is only formatted when debugging is enabled. |
| | |       if (debugEnabled()) |
| | |         TRACER.debugInfo( |
| | |             "In RS <" + getMonitorInstanceName() + |
| | |             " Exception in clearGenerationId" + |
| | |             stackTraceToSingleLineString(e) + e.getLocalizedMessage()); |
| | |     } |
| | |   } |
| | | |
| | | /** |
| | |
| | | Attribute bases = new Attribute(baseType, "base-dn", baseValues); |
| | | attributes.add(bases); |
| | | |
| | | // Publish to monitor the generation ID by domain |
| | | AttributeType generationIdType= |
| | | DirectoryServer.getAttributeType("base-dn-generation-id", true); |
| | | LinkedHashSet<AttributeValue> generationIdValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | for (DN base : baseDNs.keySet()) |
| | | { |
| | | long generationId=-1; |
| | | ReplicationCache cache = getReplicationCache(base, false); |
| | | if (cache != null) |
| | | generationId = cache.getGenerationId(); |
| | | generationIdValues.add(new AttributeValue(generationIdType, |
| | | base.toString() + " " + generationId)); |
| | | } |
| | | Attribute generationIds = new Attribute(generationIdType, "generation-id", |
| | | generationIdValues); |
| | | attributes.add(generationIds); |
| | | |
| | | return attributes; |
| | | } |
| | | |
| | | /** |
| | | * Returns the generationId currently known for the replication domain |
| | | * that matches the provided base DN. |
| | | * |
| | | * @param baseDN The base DN identifying the domain. |
| | | * @return The generationId of that domain, or -1 when no replication |
| | | * cache is returned for the base DN. |
| | | */ |
| | | public long getGenerationId(DN baseDN) |
| | | { |
| | | final ReplicationCache domainCache = getReplicationCache(baseDN, false); |
| | | return (domainCache == null) ? -1 : domainCache.getGenerationId(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the identifier of this replication server. |
| | | * |
| | | * @return The serverId of this replication server. |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return this.serverId; |
| | | } |
| | | } |
| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import org.opends.messages.*; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | |
| | | private short replicationServerId; |
| | | |
| | | private short protocolVersion; |
| | | private long generationId=-1; |
| | | |
| | | |
| | | /** |
| | |
| | | * Then create the reader and writer thread. |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * null if this is an incoming connection (listen). |
| | | * @param replicationServerId The identifier of the replicationServer that |
| | | * creates this server handler. |
| | | * @param replicationServerURL The URL of the replicationServer that creates |
| | |
| | | int windowSize, boolean sslEncryption, |
| | | ReplicationServer replicationServer) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + |
| | | " starts a new LS or RS " + |
| | | ((baseDn == null)?"incoming connection":"outgoing connection")); |
| | | |
| | | this.replicationServerId = replicationServerId; |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | long localGenerationId=-1; |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | | { |
| | | // This is an outgoing connection. Publish our start message. |
| | | this.baseDn = baseDn; |
| | | replicationCache = replicationServer.getReplicationCache(baseDn); |
| | | |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(baseDn, true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState, |
| | | protocolVersion, sslEncryption); |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption); |
| | | |
| | | session.publish(msg); |
| | | } |
| | | |
| | |
| | | ReplicationMessage msg = session.receive(); |
| | | if (msg instanceof ServerStartMessage) |
| | | { |
| | | // The remote server is an LDAP Server |
| | | // The remote server is an LDAP Server. |
| | | ServerStartMessage receivedMsg = (ServerStartMessage) msg; |
| | | |
| | | generationId = receivedMsg.getGenerationId(); |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | serverId = receivedMsg.getServerId(); |
| | |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | // This an incoming connection. Publish our start message |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | // Get or Create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | // This an incoming connection. Publish our start message |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState, |
| | | protocolVersion, sslEncryption); |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | |
| | | /* Until here session is encrypted then it depends on the negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = replicationCache.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | ", SH received START from LS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | | " localGenerationId=" + localGenerationId + |
| | | " state=" + ss + |
| | | " and sent ReplServerStart with state=" + lss); |
| | | } |
| | | |
| | | /* |
| | | * If we have already a generationID set for the domain |
| | | * then |
| | | * if the connecting replica has not the same |
| | | * then it is degraded locally and notified by an error message |
| | | * else |
| | | * we set the generationID from the one received |
| | | * (not yet saved on disk; will be set with the first change received) |
| | | */ |
| | | if (localGenerationId>0) |
| | | { |
| | | if (generationId != localGenerationId) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | receivedMsg.getBaseDn().toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | replicationCache.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | else if (msg instanceof ReplServerStartMessage) |
| | | { |
| | |
| | | ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | generationId = receivedMsg.getGenerationId(); |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | int separator = serverURL.lastIndexOf(':'); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | ServerState serverState = replicationCache.getDbServerState(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | |
| | | new ReplServerStartMessage(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, serverState, |
| | | protocolVersion, sslEncryption); |
| | | protocolVersion, |
| | | localGenerationId, |
| | | sslEncryption); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | |
| | | } |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | |
| | | /* Until here session is encrypted then it depends on the negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = replicationCache.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | ", SH received START from RS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | | " localGenerationId=" + localGenerationId + |
| | | " state=" + ss + |
| | | " and sent ReplServerStart with state=" + lss); |
| | | } |
| | | |
| | | // if the remote RS and the local RS have the same genID |
| | | // then it's ok and nothing else to do |
| | | if (generationId == localGenerationId) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (localGenerationId>0) |
| | | { |
| | | // if the local RS is initialized |
| | | if (generationId>0) |
| | | { |
| | | // if the remote RS is initialized |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationCache.getGenerationIdSavedStatus()) |
| | | { |
| | | // if the present RS has received changes regarding its |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | else |
| | | { |
| | | // The present RS has never received changes regarding its |
| | | // gen ID. |
| | | // |
| | | // Example case: |
| | | // - we are in RS1 |
| | | // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) |
| | | // - RS1 has genId1 from LS1 /genId1 comes from data in suffix |
| | | // - we are in RS1 and we receive a START msg from RS2 |
| | | // - Each RS keeps its genID / is degraded and when LS2 will |
| | | // be populated from LS1 everything will become ok. |
| | | // |
| | | // Issue: |
| | | // FIXME : Would it be a good idea in some cases to just |
| | | // set the gen ID received from the peer RS |
| | | // specially if the peer has a non nul state and |
| | | // we have a nul state ? |
| | | // replicationCache.setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // The remote has no genId. We don't change anything for the |
| | | // current RS. |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | replicationCache.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | |
| | | boolean started; |
| | | if (serverIsLDAPserver) |
| | |
| | | |
| | | if (started) |
| | | { |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | reader = new ServerReader(session, serverId, this, |
| | | replicationCache); |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | reader = new ServerReader(session, serverId, this, replicationCache); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | |
| | | // the connection is not valid, close it. |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS failed to start locally " + |
| | | " the connection from serverID="+serverId); |
| | | } |
| | | session.close(); |
| | | } catch (IOException e1) |
| | | { |
| | |
| | | { |
| | | // some problem happened, reject the connection |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString())); |
| | | mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get( |
| | | this.getMonitorInstanceName())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | try |
| | |
| | | // ignore |
| | | } |
| | | } |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void add(UpdateMessage update, ServerHandler sourceHandler) |
| | | { |
| | | /* |
| | | * Ignore updates from a server that is degraded due to |
| | | * its inconsistent generationId |
| | | */ |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != generationId)) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_TO.get( |
| | | update.getDn(), |
| | | this.getMonitorInstanceName())); |
| | | |
| | | return; |
| | | } |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | |
| | | if (serverIsLDAPserver) |
| | | return "Remote LDAP Server " + str; |
| | | else |
| | | return "Remote Replication Server " + str; |
| | | return "Remote Repl Server " + str; |
| | | } |
| | | |
| | | /** |
| | |
| | | attributes.add(attr); |
| | | |
| | | attributes.add(new Attribute("ssl-encryption", |
| | | String.valueOf(session.isEncrypted()))); |
| | | String.valueOf(session.isEncrypted()))); |
| | | |
| | | attributes.add(new Attribute("generation-id", |
| | | String.valueOf(generationId))); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | public void process(RoutableMessage msg) |
| | | { |
| | | // Trace the received message when debug logging is enabled. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("SH(" + replicationServerId + ") receives " + |
| | | msg + " from " + serverId); |
| | | |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " processes received msg=" + msg); |
| | | // Hand the message over to the replication cache together with this |
| | | // handler. |
| | | replicationCache.process(msg, this); |
| | | } |
| | | |
| | |
| | | public void sendInfo(ReplServerInfoMessage info) |
| | | throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | // Resolve both instance names first, local server then this handler. |
| | | final String localName = |
| | | replicationCache.getReplicationServer().getMonitorInstanceName(); |
| | | final String remoteName = this.getMonitorInstanceName(); |
| | | TRACER.debugInfo("In " + localName + |
| | | " SH for remote server " + remoteName + |
| | | " sends message=" + info); |
| | | } |
| | | |
| | | // Publish the info message on this handler's session. |
| | | session.publish(info); |
| | | } |
| | | |
| | |
| | | */ |
| | | public void setReplServerInfo(ReplServerInfoMessage infoMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | final String localName = |
| | | replicationCache.getReplicationServer().getMonitorInstanceName(); |
| | | final String remoteName = this.getMonitorInstanceName(); |
| | | TRACER.debugInfo("In " + localName + |
| | | " SH for remote server " + remoteName + |
| | | " sets replServerInfo " + "<" + infoMsg + ">"); |
| | | } |
| | | |
| | | // Record what the message reports: first the connected servers, then |
| | | // the generationId. |
| | | this.remoteLDAPservers = infoMsg.getConnectedServers(); |
| | | this.generationId = infoMsg.getGenerationId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void send(RoutableMessage msg) throws IOException |
| | | { |
| | | // Trace the outgoing message when debug logging is enabled. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " SH for remote server " + this.getMonitorInstanceName() + |
| | | " sends message=" + msg); |
| | | // Publish the message on this handler's session. |
| | | session.publish(msg); |
| | | } |
| | | |
| | |
| | | checkWindow(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gives access to the generationId recorded by this handler. |
| | | * |
| | | * @return The generationId currently held by this handler. |
| | | */ |
| | | public long getGenerationId() |
| | | { |
| | | return this.generationId; |
| | | } |
| | | |
| | | /** |
| | | * Notifies the peer server of this handler that the generationId of the |
| | | * domain has been reset, by publishing an error message on the session. |
| | | * A failure to send the notification is traced for debugging and is |
| | | * otherwise ignored. |
| | | */ |
| | | public void resetGenerationId() |
| | | { |
| | | // Notify the peer that it is now invalid regarding the generationId |
| | | // We are now waiting a startServer message from this server with |
| | | // a valid generationId. |
| | | try |
| | | { |
| | | Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString()); |
| | | // Same argument order as the other ErrorMessage instances built by |
| | | // this handler: the local replication server id first, then the id |
| | | // of the remote server the message is published to. |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Best effort: the notification could not be sent. Keep going, but |
| | | // leave a trace so that the failure can be diagnosed. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.getMonitorInstanceName() + |
| | | " failed to send the reset generationId notification " + |
| | | stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Publishes a generationId reset message on the session of this handler. |
| | | * The peer is expected to be a replication server. |
| | | * |
| | | * @param msg The ResetGenerationId message to be sent. |
| | | * @throws IOException When it occurs while sending the message. |
| | | */ |
| | | public void sendGenerationId(ResetGenerationId msg) |
| | | throws IOException |
| | | { |
| | | this.session.publish(msg); |
| | | } |
| | | } |
| | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | |
| | | import java.io.IOException; |
| | |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.replication.protocol.InitializeRequestMessage; |
| | | import org.opends.server.replication.protocol.InitializeTargetMessage; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | |
| | | * Constructor for the LDAP server reader part of the replicationServer. |
| | | * |
| | | * @param session The ProtocolSession from which to read the data. |
| | | * @param serverId The server ID of the server from which we read changes. |
| | | * @param serverId The server ID of the server from which we read messages. |
| | | * @param handler The server handler for this server reader. |
| | | * @param replicationCache The ReplicationCache for this server reader. |
| | | */ |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | TRACER.debugInfo("Replication server reader starting " + serverId); |
| | | } |
| | | else |
| | | { |
| | | TRACER.debugInfo("LDAP server reader starting " + serverId); |
| | | } |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" RS ":" LS")+ |
| | | " reader starting for serverId=" + serverId); |
| | | } |
| | | /* |
| | | * wait on input stream |
| | |
| | | { |
| | | ReplicationMessage msg = session.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | "> from RS server with serverId=" + serverId + |
| | | " receives " + msg); |
| | | } |
| | | else |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | "> from LDAP server with serverId=" + serverId + |
| | | " receives " + msg); |
| | | } |
| | | } |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationCache.put(update, handler); |
| | | // Ignore update received from a replica with |
| | | // a bad generation ID |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_FROM.get( |
| | | msg.toString(), |
| | | handler.getMonitorInstanceName())); |
| | | } |
| | | else |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationCache.put(update, handler); |
| | | } |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | | { |
| | |
| | | ErrorMessage errorMsg = (ErrorMessage) msg; |
| | | handler.process(errorMsg); |
| | | } |
| | | else if (msg instanceof ResetGenerationId) |
| | | { |
| | | ResetGenerationId genIdMsg = (ResetGenerationId) msg; |
| | | replicationCache.resetGenerationId(this.handler); |
| | | } |
| | | else if (msg instanceof WindowProbe) |
| | | { |
| | | WindowProbe windowProbeMsg = (WindowProbe) msg; |
| | |
| | | { |
| | | ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg; |
| | | handler.setReplServerInfo(infoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | getServerId() + |
| | | " Receiving replServerInfo from " + handler.getServerId() + |
| | | " baseDn=" + replicationCache.getBaseDn() + |
| | | " genId=" + infoMsg.getGenerationId()); |
| | | } |
| | | |
| | | if (replicationCache.getGenerationId()<0) |
| | | { |
| | | // Here is the case where a ReplicationServer receives from |
| | | // another ReplicationServer the generationId for a domain |
| | | // for which the generation ID has never been set. |
| | | replicationCache.setGenerationId(infoMsg.getGenerationId(), false); |
| | | } |
| | | else |
| | | { |
| | | if (infoMsg.getGenerationId()<0) |
| | | { |
| | | // Here is the case where another ReplicationServer |
| | | // signals that it has no generationId set for the domain. |
| | | // If we have generationId set locally and no server currently |
| | | // connected for that domain in the topology then we may also |
| | | // reset the generationId locally. |
| | | replicationCache.mayResetGenerationId(); |
| | | } |
| | | |
| | | if (replicationCache.getGenerationId() != infoMsg.getGenerationId()) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | replicationCache.getBaseDn().toNormalizedString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(infoMsg.getGenerationId()), |
| | | Long.toString(replicationCache.getGenerationId())); |
| | | |
| | | ErrorMessage errorMsg = new ErrorMessage( |
| | | replicationCache.getReplicationServer().getServerId(), |
| | | handler.getServerId(), |
| | | message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | } |
| | | else if (msg == null) |
| | | { |
| | |
| | | * Log a message and exit from this loop |
| | | * So that this handler is stopped. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader IO EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e) + e.getLocalizedMessage() + |
| | | e.getCause()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); |
| | | logError(message); |
| | | } catch (ClassNotFoundException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CNF EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the conenction. |
| | | * close the connection. |
| | | */ |
| | | Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); |
| | | logError(message); |
| | | } catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the conenction. |
| | | * close the connection. |
| | | */ |
| | | Message message = NOTE_READER_EXCEPTION.get(handler.toString()); |
| | | logError(message); |
| | |
| | | * happen. |
| | | * Attempt to close the socket and stop the server handler. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CLOSE serverID=" + serverId |
| | | + stackTraceToSingleLineString(new Exception())); |
| | | try |
| | | { |
| | | session.close(); |
| | |
| | | replicationCache.stopServer(handler); |
| | | } |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | TRACER.debugInfo("Replication server reader stopping " + serverId); |
| | | } |
| | | else |
| | | { |
| | | TRACER.debugInfo("LDAP server reader stopping " + serverId); |
| | | } |
| | | } |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?"RS":"LDAP") + |
| | | " server reader stopped for serverID=" + serverId |
| | | + stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | } |
| | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import java.io.IOException; |
| | |
| | | TRACER.debugInfo("LDAP server writer starting " + serverId); |
| | | } |
| | | } |
| | | try { |
| | | try |
| | | { |
| | | while (true) |
| | | { |
| | | UpdateMessage update = replicationCache.take(this.handler); |
| | | if (update == null) |
| | | return; /* this connection is closing */ |
| | | |
| | | // Ignore update to be sent to a replica with a bad generation ID |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | if (referenceGenerationId != handler.getGenerationId()) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_TO.get( |
| | | update.getDn(), |
| | | this.handler.getMonitorInstanceName())); |
| | | continue; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | ", writer to " + this.handler.getMonitorInstanceName() + |
| | | " publishes" + update.toString() + |
| | | " refgenId=" + referenceGenerationId + |
| | | " server=" + handler.getServerId() + |
| | | " generationId=" + handler.getGenerationId()); |
| | | } |
| | | session.publish(update); |
| | | } |
| | | } |
| | |
| | | * An unexpected error happened. |
| | | * Log an error and close the connection. |
| | | */ |
| | | Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString()); |
| | | Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() + |
| | | " " + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally { |
| | |
| | | } |
| | | try |
| | | { |
| | | domain.initializeTarget(target, this); |
| | | domain.initializeRemote(target, this); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | try |
| | | { |
| | | // launch the import |
| | | domain.initialize(source, this); |
| | | domain.initializeFromRemote(source, this); |
| | | |
| | | synchronized(initState) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.tasks; |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | | import static org.opends.server.core.DirectoryServer.getAttributeType; |
| | | |
| | | import java.util.List; |
| | | |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.TaskMessages; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.plugin.ReplicationDomain; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | /** |
| | | * This class provides an implementation of a Directory Server task that |
| | | * resets the generation ID of a replication domain. The domain is |
| | | * identified by the base DN stored in the task entry. |
| | | */ |
| | | public class SetGenerationIdTask extends Task |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // NOTE(review): the four fields below are never read or written in this |
| | | // class; they look like leftovers from another task -- confirm before |
| | | // removing them. |
| | | boolean isCompressed = false; |
| | | boolean isEncrypted = false; |
| | | boolean skipSchemaValidation = false; |
| | | TaskState initState; |
| | | |
| | | // The base DN of the replication domain, as read from the task entry. |
| | | String domainString = null; |
| | | |
| | | // The replication domain resolved from the base DN by initializeTask(). |
| | | ReplicationDomain domain = null; |
| | | |
| | | /** |
| | | * Writes the provided text to the debug log when debugging is enabled. |
| | | * |
| | | * @param s The text to trace. |
| | | */ |
| | | private static final void debugInfo(String s) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(s); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override public void initializeTask() throws DirectoryException |
| | | { |
| | | // Nothing to prepare for a task that has already finished. |
| | | if (TaskState.isDone(getTaskState())) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // FIXME -- Do we need any special authorization here? |
| | | Entry taskEntry = getTaskEntry(); |
| | | |
| | | // Read the base DN of the replication domain from the task entry. |
| | | AttributeType typeDomainBase = |
| | | getAttributeType(ATTR_TASK_SET_GENERATION_ID_DOMAIN_DN, true); |
| | | List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase); |
| | | domainString = TaskUtils.getSingleValueString(attrList); |
| | | |
| | | DN domainDN = DN.nullDN(); |
| | | try |
| | | { |
| | | domainDN = DN.decode(domainString); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Report an undecodable base DN as an invalid DN syntax error. |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get()); |
| | | mb.append(e.getMessage()); |
| | | throw new DirectoryException(ResultCode.INVALID_DN_SYNTAX, |
| | | mb.toMessage()); |
| | | } |
| | | |
| | | // Resolve the replication domain that runTask() will operate on. |
| | | domain = ReplicationDomain.retrievesReplicationDomain(domainDN); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override protected TaskState runTask() |
| | | { |
| | | // The message is built by concatenation, so it must not carry a format |
| | | // specifier. |
| | | debugInfo("setGenerationIdTask is starting on domain " + |
| | | domain.getBaseDN()); |
| | | |
| | | domain.resetGenerationId(); |
| | | |
| | | debugInfo("setGenerationIdTask is ending SUCCESSFULLY"); |
| | | return TaskState.COMPLETED_SUCCESSFULLY; |
| | | } |
| | | } |
| | |
| | | // file. |
| | | private long youngestModificationTime; |
| | | |
| | | // The Synchronization State. |
| | | // The synchronization State. |
| | | private LinkedHashSet<AttributeValue> synchronizationState = null; |
| | | |
| | | // The synchronization generationId. |
| | | private LinkedHashSet<AttributeValue> synchronizationGenerationId |
| | | = null; |
| | | |
| | | |
| | | |
| | | /** |
| | |
| | | dupSchema.synchronizationState = |
| | | new LinkedHashSet<AttributeValue>(synchronizationState); |
| | | } |
| | | if (synchronizationGenerationId != null) |
| | | { |
| | | dupSchema.synchronizationGenerationId = new |
| | | LinkedHashSet<AttributeValue>(synchronizationGenerationId); |
| | | } |
| | | |
| | | return dupSchema; |
| | | } |
| | |
| | | * |
| | | * @param values Synchronization generationId for this schema. |
| | | */ |
| | | public void setSynchronizationGenerationId( |
| | | LinkedHashSet<AttributeValue> values) |
| | | { |
| | | // The provided set is stored as is; no copy is made. |
| | | this.synchronizationGenerationId = values; |
| | | } |
| | | |
| | | /** |
| | | * Returns the set of values holding the Synchronization generationId of |
| | | * this schema. |
| | | * |
| | | * @return The Synchronization generationId values currently stored for |
| | | * this schema. |
| | | */ |
| | | public LinkedHashSet<AttributeValue> |
| | | getSynchronizationGenerationId() |
| | | { |
| | | return this.synchronizationGenerationId; |
| | | } |
| | | |
| | | /** |
| | | * Sets the Synchronization state for this schema. |
| | | * |
| | | * @param values Synchronization state for this schema. |
| | | */ |
| | | public void setSynchronizationState( |
| | | LinkedHashSet<AttributeValue> values) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES; |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertNotNull; |
| | | import static org.testng.Assert.assertTrue; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import java.io.File; |
| | | import java.net.ServerSocket; |
| | | import java.net.SocketException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.UUID; |
| | | import java.net.SocketTimeoutException; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.AddOperationBasis; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.ReplicationDomain; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.InitializeTargetMessage; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.SocketSession; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.schema.DirectoryStringSyntax; |
| | | import org.opends.server.tasks.LdifFileWriter; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | /** |
| | | * Tests contained here: |
| | | * |
| | | * - testSingleRS : test generation ID setting with different servers and one |
| | | * Replication server. |
| | | * |
| | | * - testMultiRS : tests generation ID propagation with more than one |
| | | * Replication server. |
| | | * |
| | | */ |
| | | |
| | | public class GenerationIdTest extends ReplicationTestCase |
| | | { |
| | | // The tracer object for the debug logger |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // Base DN of the replicated suffix used by the tests, and the cn given to |
| | | // the domain configuration entry created for it. |
| | | private static final String baseDnStr = "dc=example,dc=com"; |
| | | private static final String baseSnStr = "genidcom"; |
| | | |
| | | // Values written to ds-cfg-window-size and |
| | | // ds-cfg-changelog-max-queue-size in the configuration entries. |
| | | private static final int WINDOW_SIZE = 10; |
| | | private static final int CHANGELOG_QUEUE_SIZE = 100; |
| | | // Server IDs: server1ID is the ds-cfg-directory-server-id of the domain |
| | | // configured in this server; server2ID is the target of taskInitRemoteS2. |
| | | private static final short server1ID = 1; |
| | | private static final short server2ID = 2; |
| | | private static final short server3ID = 3; |
| | | // Replication server IDs; also used as indexes into replServerPort. |
| | | private static final short changelog1ID = 11; |
| | | private static final short changelog2ID = 12; |
| | | private static final short changelog3ID = 13; |
| | | |
| | | // Decoded form of baseDnStr, set by setUp(). |
| | | private DN baseDn; |
| | | private ReplicationBroker broker2 = null; |
| | | private ReplicationBroker broker3 = null; |
| | | // Replication servers held by the test, one per changelog ID; when set, |
| | | // createReplicationServer() returns them instead of creating a new one. |
| | | private ReplicationServer replServer1 = null; |
| | | private ReplicationServer replServer2 = null; |
| | | private ReplicationServer replServer3 = null; |
| | | private boolean emptyOldChanges = true; |
| | | // Replication domain retrieved for baseDn by connectToReplServer(). |
| | | ReplicationDomain replDomain = null; |
| | | // Task entry built by setUp() to initialize the replica server2ID. |
| | | private Entry taskInitRemoteS2; |
| | | SocketSession ssSession = null; |
| | | boolean ssShutdownRequested = false; |
| | | // LDIF test entries built by newLDIFEntries() in setUp(). |
| | | protected String[] updatedEntries; |
| | | |
| | | // Ports allocated to the replication servers, indexed by changelog ID; |
| | | // 0 means that no port has been allocated yet (see getChangelogPort()). |
| | | private static int[] replServerPort = new int[20]; |
| | | |
| | | /** |
| | | * A temporary LDIF file containing some test entries. |
| | | */ |
| | | private File ldifFile; |
| | | |
| | | /** |
| | | * A temporary file to contain rejected entries. |
| | | */ |
| | | private File rejectFile; |
| | | |
| | | /** |
| | | * Text appended to the description line of the makeldif template below; |
| | | * it is empty here. |
| | | */ |
| | | private static String diff = ""; |
| | | |
| | | /** |
| | | * A makeldif template used to create some test entries. |
| | | */ |
| | | private static String[] template = new String[] { |
| | | "define suffix=" + baseDnStr, |
| | | "define maildomain=example.com", |
| | | "define numusers=11", |
| | | "", |
| | | "branch: [suffix]", |
| | | "", |
| | | "branch: ou=People,[suffix]", |
| | | "subordinateTemplate: person:[numusers]", |
| | | "", |
| | | "template: person", |
| | | "rdnAttr: uid", |
| | | "objectClass: top", |
| | | "objectClass: person", |
| | | "objectClass: organizationalPerson", |
| | | "objectClass: inetOrgPerson", |
| | | "givenName: <first>", |
| | | "sn: <last>", |
| | | "cn: {givenName} {sn}", |
| | | "initials: {givenName:1}<random:chars:" + |
| | | "ABCDEFGHIJKLMNOPQRSTUVWXYZ:1>{sn:1}", |
| | | "employeeNumber: <sequential:0>", |
| | | "uid: user.{employeeNumber}", |
| | | "mail: {uid}@[maildomain]", |
| | | "userPassword: password", |
| | | "telephoneNumber: <random:telephone>", |
| | | "homePhone: <random:telephone>", |
| | | "pager: <random:telephone>", |
| | | "mobile: <random:telephone>", |
| | | "street: <random:numeric:5> <file:streets> Street", |
| | | "l: <file:cities>", |
| | | "st: <file:states>", |
| | | "postalCode: <random:numeric:5>", |
| | | "postalAddress: {cn}${street}${l}, {st} {postalCode}", |
| | | "description: This is the description for {cn} " + diff, |
| | | ""}; |
| | | |
| | | |
| | | private void debugInfo(String s) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("** TEST **" + s); |
| | | } |
| | | } |
| | | protected void debugInfo(String message, Exception e) |
| | | { |
| | | debugInfo(message + stackTraceToSingleLineString(e)); |
| | | } |
| | | |
| | | /** |
| | | * Set up the environment for performing the tests in this Class. |
| | | * <p> |
| | | * Starts the server, decodes the base DN, builds the LDIF test entries, |
| | | * configures replication and prepares the task entry used to initialize |
| | | * the remote replica server2ID. |
| | | * |
| | | * @throws Exception |
| | | * If the environment could not be set up. |
| | | */ |
| | | @BeforeClass |
| | | public void setUp() throws Exception |
| | | { |
| | | //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled()); |
| | | |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | baseDn = DN.decode(baseDnStr); |
| | | |
| | | // newLDIFEntries() embeds baseDn in the entries, so it has to run after |
| | | // the base DN has been decoded. |
| | | updatedEntries = newLDIFEntries(); |
| | | |
| | | // Create an internal connection in order to provide operations |
| | | // to DS to populate the db - |
| | | connection = InternalClientConnection.getRootConnection(); |
| | | |
| | | // Synchro provider |
| | | String synchroStringDN = "cn=Synchronization Providers,cn=config"; |
| | | |
| | | // Synchro multi-master |
| | | synchroPluginStringDN = "cn=Multimaster Synchronization, " |
| | | + synchroStringDN; |
| | | |
| | | // Synchro suffix: left unset here, connectToReplServer() creates it. |
| | | synchroServerEntry = null; |
| | | |
| | | // Add config entries to the current DS server based on : |
| | | // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN |
| | | // Add synchroServerEntry |
| | | // Add replServerEntry |
| | | configureReplication(); |
| | | |
| | | // Task entry asking for the initialization of replica server2ID. |
| | | taskInitRemoteS2 = TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=" + UUID.randomUUID() + |
| | | ",cn=Scheduled Tasks,cn=Tasks", |
| | | "objectclass: top", |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-replica-server-id: " + server2ID); |
| | | |
| | | // Change log |
| | | String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; |
| | | String changeLogLdif = "dn: " + changeLogStringDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n" |
| | | + "ds-cfg-changelog-server-id: 1\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" |
| | | + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; |
| | | // The parsed entry is discarded right away and replServerEntry is left |
| | | // null. NOTE(review): presumably so that no replication server entry is |
| | | // managed through the configuration for this test -- confirm. |
| | | replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); |
| | | replServerEntry = null; |
| | | |
| | | } |
| | | |
| | | // Tests that entries have been written in the db |
| | | private int testEntriesInDb() |
| | | { |
| | | debugInfo("TestEntriesInDb"); |
| | | short found = 0; |
| | | |
| | | for (String entry : updatedEntries) |
| | | { |
| | | |
| | | int dns = entry.indexOf("dn: "); |
| | | int dne = entry.indexOf("dc=com"); |
| | | String dn = entry.substring(dns+4,dne+6); |
| | | |
| | | debugInfo("Search Entry: " + dn); |
| | | |
| | | DN entryDN = null; |
| | | try |
| | | { |
| | | entryDN = DN.decode(dn); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugInfo("TestEntriesInDb/" + e); |
| | | } |
| | | |
| | | try |
| | | { |
| | | Entry resultEntry = getEntry(entryDN, 1000, true); |
| | | if (resultEntry==null) |
| | | { |
| | | debugInfo("Entry not found <" + dn + ">"); |
| | | } |
| | | else |
| | | { |
| | | debugInfo("Entry found <" + dn + ">"); |
| | | found++; |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugInfo("TestEntriesInDb/", e); |
| | | } |
| | | } |
| | | return found; |
| | | } |
| | | |
| | | /** |
| | | * Adds a task entry to the running server through an internal root |
| | | * connection and checks the outcome of the ADD operation. |
| | | * <p> |
| | | * When the ADD is expected to succeed, this method then waits for the task |
| | | * to reach the RUNNING state. Otherwise it checks that the error message |
| | | * of the operation starts with the expected message. In both cases the |
| | | * task DN is recorded so that the entry is removed at the end of the test. |
| | | * |
| | | * @param taskEntry The task to add. |
| | | * @param expectedResult The expected result code for the ADD. |
| | | * @param errorMessage The message the ADD error message is expected to |
| | | * start with when the expected result code is not |
| | | * SUCCESS; it is not read otherwise. |
| | | */ |
| | | private void addTask(Entry taskEntry, ResultCode expectedResult, |
| | | Message errorMessage) |
| | | { |
| | | try |
| | | { |
| | | debugInfo("AddTask/" + taskEntry); |
| | | |
| | | // Change config of DS to launch the total update task |
| | | InternalClientConnection connection = |
| | | InternalClientConnection.getRootConnection(); |
| | | |
| | | // Add the task. |
| | | AddOperation addOperation = |
| | | connection.processAdd(taskEntry.getDN(), |
| | | taskEntry.getObjectClasses(), |
| | | taskEntry.getUserAttributes(), |
| | | taskEntry.getOperationalAttributes()); |
| | | |
| | | assertEquals(addOperation.getResultCode(), expectedResult, |
| | | "Result of ADD operation of the task is: " |
| | | + addOperation.getResultCode() |
| | | + " Expected:" |
| | | + expectedResult + " Details:" + addOperation.getErrorMessage() |
| | | + addOperation.getAdditionalLogMessage()); |
| | | |
| | | if (expectedResult != ResultCode.SUCCESS) |
| | | { |
| | | // A failure was expected: check that it is the right one. |
| | | assertTrue(addOperation.getErrorMessage().toString(). |
| | | startsWith(errorMessage.toString()), |
| | | "Error MsgID of the task <" |
| | | + addOperation.getErrorMessage() |
| | | + "> equals <" |
| | | + errorMessage + ">"); |
| | | debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId() |
| | | + addOperation.getErrorMessage() + ">"); |
| | | |
| | | } |
| | | else |
| | | { |
| | | waitTaskState(taskEntry, TaskState.RUNNING, null); |
| | | } |
| | | |
| | | // Entry will be removed at the end of the test |
| | | entryList.addLast(taskEntry.getDN()); |
| | | |
| | | debugInfo("AddedTask/" + taskEntry.getDN()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Keep the original exception as the cause so that its stack trace |
| | | // shows up in the test report. |
| | | fail("Exception when adding task:"+ e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Polls a task entry every 500 ms until its state is either the expected |
| | | * one or STOPPED_BY_ERROR, then checks the final state. |
| | | * <p> |
| | | * When the final state is neither COMPLETED_SUCCESSFULLY nor RUNNING, the |
| | | * task entry must hold at least one log message and, if an expected |
| | | * message is provided, the first log message must contain it. |
| | | * <p> |
| | | * NOTE(review): the polling loop has no upper bound; it only ends when one |
| | | * of the two states above is reached or when a failure is raised. |
| | | * |
| | | * @param taskEntry The task entry to watch. |
| | | * @param expectedTaskState The state the task is expected to reach. |
| | | * @param expectedMessage The text expected in the first log message of |
| | | * a task that did not succeed, or null to skip |
| | | * that check. |
| | | */ |
| | | private void waitTaskState(Entry taskEntry, TaskState expectedTaskState, |
| | | Message expectedMessage) |
| | | { |
| | | TaskState taskState = null; |
| | | try |
| | | { |
| | | |
| | | SearchFilter filter = |
| | | SearchFilter.createFilterFromString("(objectclass=*)"); |
| | | Entry resultEntry = null; |
| | | do |
| | | { |
| | | InternalSearchOperation searchOperation = |
| | | connection.processSearch(taskEntry.getDN(), |
| | | SearchScope.BASE_OBJECT, |
| | | filter); |
| | | try |
| | | { |
| | | resultEntry = searchOperation.getSearchEntries().getFirst(); |
| | | } catch (Exception e) |
| | | { |
| | | // fail() raises an error, which also ends the polling loop. |
| | | fail("Task entry was not returned from the search."); |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Check that the task state is as expected. |
| | | AttributeType taskStateType = |
| | | DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); |
| | | String stateString = |
| | | resultEntry.getAttributeValue(taskStateType, |
| | | DirectoryStringSyntax.DECODER); |
| | | taskState = TaskState.fromString(stateString); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Render the stack trace as text; concatenating the array returned |
| | | // by getStackTrace() would only print an object reference. |
| | | fail("Exception"+ e.getMessage() + stackTraceToSingleLineString(e)); |
| | | } |
| | | Thread.sleep(500); |
| | | } |
| | | while ((taskState != expectedTaskState) && |
| | | (taskState != TaskState.STOPPED_BY_ERROR)); |
| | | |
| | | // Check that the task contains some log messages. |
| | | AttributeType logMessagesType = DirectoryServer.getAttributeType( |
| | | ATTR_TASK_LOG_MESSAGES.toLowerCase()); |
| | | ArrayList<String> logMessages = new ArrayList<String>(); |
| | | resultEntry.getAttributeValues(logMessagesType, |
| | | DirectoryStringSyntax.DECODER, |
| | | logMessages); |
| | | |
| | | if ((taskState != TaskState.COMPLETED_SUCCESSFULLY) |
| | | && (taskState != TaskState.RUNNING)) |
| | | { |
| | | if (logMessages.size() == 0) |
| | | { |
| | | fail("No log messages were written to the task entry on a failed task"); |
| | | } |
| | | else |
| | | { |
| | | if (expectedMessage != null) |
| | | { |
| | | debugInfo(logMessages.get(0)); |
| | | debugInfo(expectedMessage.toString()); |
| | | // The expected text may sit anywhere in the log message, |
| | | // including at its very beginning. |
| | | assertTrue(logMessages.get(0).contains( |
| | | expectedMessage.toString())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | assertEquals(taskState, expectedTaskState, "Task State:" + taskState + |
| | | " Expected task state:" + expectedTaskState); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Adds the provided LDIF entries to the database, one internal ADD |
| | | * operation per entry. An ADD that does not succeed is only logged. Every |
| | | * entry DN is recorded so that the entry is removed at the end of the |
| | | * test. |
| | | * |
| | | * @param ldifEntries The entries to add, each one in LDIF form. |
| | | */ |
| | | private void addTestEntriesToDB(String[] ldifEntries) |
| | | { |
| | | try |
| | | { |
| | | InternalClientConnection rootConnection = |
| | | InternalClientConnection.getRootConnection(); |
| | | |
| | | for (int i = 0; i < ldifEntries.length; i++) |
| | | { |
| | | Entry parsed = TestCaseUtils.entryFromLdifString(ldifEntries[i]); |
| | | AddOperationBasis add = new AddOperationBasis( |
| | | rootConnection, |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), |
| | | null, |
| | | parsed.getDN(), |
| | | parsed.getObjectClasses(), |
| | | parsed.getUserAttributes(), |
| | | parsed.getOperationalAttributes()); |
| | | add.setInternalOperation(true); |
| | | add.run(); |
| | | |
| | | ResultCode outcome = add.getResultCode(); |
| | | if (outcome != ResultCode.SUCCESS) |
| | | { |
| | | debugInfo("addEntry: Failed" + outcome); |
| | | } |
| | | |
| | | // Recorded whatever the outcome: removed at the end of the test. |
| | | entryList.addLast(parsed.getDN()); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Builds the LDIF form of the entries used by the tests: the base entry, |
| | | * its ou=People child and two person entries below it. Each entry carries |
| | | * a fixed entryUUID. |
| | | * |
| | | * @return The entries, one LDIF string per entry. |
| | | */ |
| | | private String[] newLDIFEntries() |
| | | { |
| | | return new String[] |
| | | { |
| | | // Base entry of the suffix. |
| | | "dn: " + baseDn + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111111\n" |
| | | + "\n", |
| | | // Container of the person entries. |
| | | "dn: ou=People," + baseDn + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: organizationalUnit\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111112\n" |
| | | + "\n", |
| | | // First person entry. |
| | | "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n" |
| | | + "objectclass: top\n" |
| | | + "objectclass: person\n" |
| | | + "objectclass: organizationalPerson\n" |
| | | + "objectclass: inetOrgPerson\n" |
| | | + "cn: Fiona Jensen\n" |
| | | + "sn: Jensen\n" |
| | | + "uid: fiona\n" |
| | | + "telephonenumber: +1 408 555 1212\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111113\n" |
| | | + "\n", |
| | | // Second person entry. |
| | | "dn: cn=Robert Langman,ou=people," + baseDn + "\n" |
| | | + "objectclass: top\n" |
| | | + "objectclass: person\n" |
| | | + "objectclass: organizationalPerson\n" |
| | | + "objectclass: inetOrgPerson\n" |
| | | + "cn: Robert Langman\n" |
| | | + "sn: Langman\n" |
| | | + "uid: robert\n" |
| | | + "telephonenumber: +1 408 555 1213\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111114\n" |
| | | + "\n" |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Reads messages from a broker and counts the entry messages received. |
| | | * <p> |
| | | * Reading stops on a null message, a DoneMessage or an ErrorMessage. An |
| | | * InitializeTargetMessage resets the count to zero, an EntryMessage |
| | | * increments it and any other message is logged and dropped. |
| | | * <p> |
| | | * NOTE(review): an exception raised while receiving is only logged and the |
| | | * loop then tries again, so a broker that keeps failing would keep this |
| | | * method looping -- confirm that this is acceptable. |
| | | * |
| | | * @param broker The broker to read the messages from. |
| | | * @param serverID The server ID shown in the log messages. |
| | | * @param updatedEntries The entries expected to be received; when not |
| | | * null, the number of received entries must equal |
| | | * the length of this array. |
| | | * @return The number of entries received. |
| | | */ |
| | | private int receiveImport(ReplicationBroker broker, short serverID, |
| | | String[] updatedEntries) |
| | | { |
| | | // Expect the broker to receive the entries |
| | | ReplicationMessage msg; |
| | | short entriesReceived = 0; |
| | | while (true) |
| | | { |
| | | try |
| | | { |
| | | debugInfo("Broker " + serverID + " Wait for entry or done msg"); |
| | | msg = broker.receive(); |
| | | |
| | | // A null message ends the reception. |
| | | if (msg == null) |
| | | break; |
| | | |
| | | if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Start of an import: count from zero again. |
| | | debugInfo("Broker " + serverID + " receives InitializeTargetMessage "); |
| | | entriesReceived = 0; |
| | | } |
| | | else if (msg instanceof EntryMessage) |
| | | { |
| | | EntryMessage em = (EntryMessage)msg; |
| | | debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes())); |
| | | entriesReceived++; |
| | | } |
| | | else if (msg instanceof DoneMessage) |
| | | { |
| | | debugInfo("Broker " + serverID + " receives done "); |
| | | break; |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | { |
| | | ErrorMessage em = (ErrorMessage)msg; |
| | | debugInfo("Broker " + serverID + " receives ERROR " |
| | | + em.toString()); |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | debugInfo("Broker " + serverID + " receives and trashes " + msg); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Logged only: the loop goes on with the next receive. |
| | | debugInfo("receiveUpdatedEntries" + stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | |
| | | // Compare with the expected number of entries only when one is given. |
| | | if (updatedEntries != null) |
| | | { |
| | | assertTrue(entriesReceived == updatedEntries.length, |
| | | " Received entries("+entriesReceived + |
| | | ") == Expected entries("+updatedEntries.length+")"); |
| | | } |
| | | |
| | | return entriesReceived; |
| | | } |
| | | |
| | | /** |
| | | * Returns the replication server identified by the given ID, creating it |
| | | * when the test does not hold one yet. |
| | | * <p> |
| | | * If the field holding the server for this ID (replServer1, replServer2 |
| | | * or replServer3) is already set, that instance is returned. Otherwise a |
| | | * new replication server is created; this method does not store it in |
| | | * those fields. |
| | | * |
| | | * @param changelogId The serverID of the replicationServer to create. |
| | | * @param all Specifies whether to connect the created replication |
| | | * server to the other replication servers in the test. |
| | | * @param suffix Text inserted in the directory name |
| | | * ("genid" + ID + suffix + "Db") handed to the |
| | | * configuration of the created server. |
| | | * @return The replication server, either already held or newly created. |
| | | */ |
| | | private ReplicationServer createReplicationServer(short changelogId, |
| | | boolean all, String suffix) |
| | | { |
| | | try |
| | | { |
| | | // Reuse the server already held by the test for this ID, if any. |
| | | ReplicationServer known = null; |
| | | switch (changelogId) |
| | | { |
| | | case changelog1ID: |
| | | known = replServer1; |
| | | break; |
| | | case changelog2ID: |
| | | known = replServer2; |
| | | break; |
| | | case changelog3ID: |
| | | known = replServer3; |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | if (known != null) |
| | | { |
| | | return known; |
| | | } |
| | | |
| | | // Addresses of the replication servers to connect to, if requested. |
| | | SortedSet<String> peers = new TreeSet<String>(); |
| | | if (all) |
| | | { |
| | | short[] peerIds = { changelog1ID, changelog2ID, changelog3ID }; |
| | | for (short peerId : peerIds) |
| | | { |
| | | peers.add("localhost:" + getChangelogPort(peerId)); |
| | | } |
| | | } |
| | | |
| | | int port = getChangelogPort(changelogId); |
| | | String directory = "genid" + changelogId + suffix + "Db"; |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(port, directory, 0, changelogId, 0, 100, |
| | | peers); |
| | | ReplicationServer created = new ReplicationServer(conf); |
| | | Thread.sleep(1000); |
| | | |
| | | return created; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | fail("createChangelog" + stackTraceToSingleLineString(e)); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Create a synchronized suffix in the current server providing the |
| | | * replication Server ID. |
| | | * <p> |
| | | * The domain configuration entry is added only when synchroServerEntry is |
| | | * not set yet. It replicates baseDnStr with the server ID server1ID and |
| | | * points at the replication server listening on localhost, on the port |
| | | * allocated to the given ID. Once the entry is added, the matching |
| | | * replication domain is retrieved into replDomain. |
| | | * |
| | | * @param changelogID The ID of the replication server to connect to. |
| | | */ |
| | | private void connectToReplServer(short changelogID) |
| | | { |
| | | // Connect DS to the replicationServer |
| | | try |
| | | { |
| | | // suffix synchronized |
| | | String synchroServerStringDN = synchroPluginStringDN; |
| | | String synchroServerLdif = |
| | | "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-provider-config\n" |
| | | + "cn: " + baseSnStr + "\n" |
| | | + "ds-cfg-synchronization-dn: " + baseDnStr + "\n" |
| | | + "ds-cfg-changelog-server: localhost:" |
| | | + getChangelogPort(changelogID)+"\n" |
| | | + "ds-cfg-directory-server-id: " + server1ID + "\n" |
| | | + "ds-cfg-receive-status: true\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | |
| | | // Nothing is added when a domain entry is already in place. |
| | | if (synchroServerEntry == null) |
| | | { |
| | | synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the synchronized server"); |
| | | // Recorded in the list of configuration entries handled by the test. |
| | | configEntryList.add(synchroServerEntry.getDN()); |
| | | |
| | | replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | |
| | | } |
| | | if (replDomain != null) |
| | | { |
| | | debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning()); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugInfo("connectToReplServer", e); |
| | | fail("connectToReplServer", e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Disconnect DS from the replicationServer by deleting the configuration |
| | | * entry of the synchronized domain. Nothing is done when no such entry is |
| | | * currently set. |
| | | * |
| | | * @param changelogID The ID of the replication server; it is not used by |
| | | * this method. |
| | | */ |
| | | private void disconnectFromReplServer(short changelogID) |
| | | { |
| | | try |
| | | { |
| | | if (synchroServerEntry == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // DN of the configuration entry of the synchronized suffix. |
| | | String domainDnText = "cn=" + baseSnStr + ", cn=domains," + |
| | | synchroPluginStringDN; |
| | | DN domainDn = DN.decode(domainDnText); |
| | | |
| | | DirectoryServer.getConfigHandler().deleteEntry(domainDn,null); |
| | | boolean deleted = |
| | | DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null; |
| | | assertTrue(deleted, |
| | | "Unable to delete the synchronized domain"); |
| | | synchroServerEntry = null; |
| | | |
| | | // The entry is gone: forget it in the list of configuration entries. |
| | | int position = configEntryList.indexOf(domainDn); |
| | | configEntryList.remove(position); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("disconnectFromReplServer", e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns the port of the replication server with the given ID. On the |
| | | * first request for an ID, a free port is looked up by binding a socket |
| | | * that is closed right away, and the port is remembered for later calls. |
| | | * |
| | | * @param changelogID The ID of the replication server. |
| | | * @return The port recorded for this replication server. |
| | | */ |
| | | private int getChangelogPort(short changelogID) |
| | | { |
| | | int known = replServerPort[changelogID]; |
| | | if (known != 0) |
| | | { |
| | | return known; |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Find a free port for the replicationServer |
| | | ServerSocket probe = TestCaseUtils.bindFreePort(); |
| | | replServerPort[changelogID] = probe.getLocalPort(); |
| | | probe.close(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("Cannot retrieve a free port for replication server." |
| | | + e.getMessage()); |
| | | } |
| | | return replServerPort[changelogID]; |
| | | } |
| | | |
| | | /** |
| | | * Name of the attribute that readGenId() reads from the base entry to |
| | | * obtain the generation ID. |
| | | */ |
| | | protected static final String REPLICATION_GENERATION_ID = |
| | | "ds-sync-generation-id"; |
| | | |
| | | private long readGenId() |
| | | { |
| | | long genId=-1; |
| | | try |
| | | { |
| | | Entry resultEntry = getEntry(baseDn, 1000, true); |
| | | if (resultEntry==null) |
| | | { |
| | | debugInfo("Entry not found <" + baseDn + ">"); |
| | | } |
| | | else |
| | | { |
| | | debugInfo("Entry found <" + baseDn + ">"); |
| | | |
| | | AttributeType synchronizationGenIDType = |
| | | DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); |
| | | List<Attribute> attrs = |
| | | resultEntry.getAttribute(synchronizationGenIDType); |
| | | if (attrs != null) |
| | | { |
| | | Attribute attr = attrs.get(0); |
| | | LinkedHashSet<AttributeValue> values = attr.getValues(); |
| | | if (values.size() == 1) |
| | | { |
| | | genId = Long.decode(values.iterator().next().getStringValue()); |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("Exception raised in readGenId", e); |
| | | } |
| | | return genId; |
| | | } |
| | | |
| | | /** |
| | | * Builds the entry of an import task for the userRoot backend. |
| | | * <p> |
| | | * A temporary LDIF file is generated from the makeldif template and a |
| | | * temporary file is created for the rejected entries; both are referenced |
| | | * by the task entry. The description attribute is excluded from the |
| | | * import. |
| | | * |
| | | * @return The import task entry. |
| | | */ |
| | | private Entry getTaskImport() |
| | | { |
| | | Entry task = null; |
| | | |
| | | try |
| | | { |
| | | // Create a temporary test LDIF file. |
| | | ldifFile = File.createTempFile("import-test", ".ldif"); |
| | | String resourcePath = DirectoryServer.getServerRoot() + File.separator + |
| | | "config" + File.separator + "MakeLDIF"; |
| | | LdifFileWriter.makeLdif(ldifFile.getPath(), resourcePath, template); |
| | | // Create a temporary rejects file. |
| | | rejectFile = File.createTempFile("import-test-rejects", ".ldif"); |
| | | |
| | | task = TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=" + UUID.randomUUID() + |
| | | ",cn=Scheduled Tasks,cn=Tasks", |
| | | "objectclass: top", |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-import", |
| | | "ds-task-class-name: org.opends.server.tasks.ImportTask", |
| | | "ds-task-import-backend-id: userRoot", |
| | | "ds-task-import-ldif-file: " + ldifFile.getPath(), |
| | | "ds-task-import-reject-file: " + rejectFile.getPath(), |
| | | "ds-task-import-overwrite-rejects: TRUE", |
| | | "ds-task-import-exclude-attribute: description" |
| | | ); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Report the failure where it happens instead of handing a null task |
| | | // back to the caller. |
| | | fail("getTaskImport", e); |
| | | } |
| | | return task; |
| | | } |
| | | |
| | | /** |
| | | * Builds the LDIF form of a person entry located under ou=People of the |
| | | * test suffix. Only the DN depends on the argument; every attribute value |
| | | * is fixed. |
| | | * |
| | | * @param uid The UUID appended to "user" to form the uid value of the |
| | | * entry DN. |
| | | * @return The LDIF text of the entry. |
| | | */ |
| | | private String createEntry(UUID uid) |
| | | { |
| | | String user2dn = "uid=user"+uid+",ou=People," + baseDnStr; |
| | | // The concatenation already yields a String; no copy is needed. |
| | | return "dn: "+ user2dn + "\n" |
| | | + "objectClass: top\n" + "objectClass: person\n" |
| | | + "objectClass: organizationalPerson\n" |
| | | + "objectClass: inetOrgPerson\n" + "uid: user.1\n" |
| | | + "homePhone: 951-245-7634\n" |
| | | + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" |
| | | + "mobile: 027-085-0537\n" |
| | | + "postalAddress: Aaccf Amar$17984 Thirteenth Street" |
| | | + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" |
| | | + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n" |
| | | + "street: 17984 Thirteenth Street\n" |
| | | + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n" |
| | | + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n" |
| | | + "userPassword: password\n" + "initials: AA\n"; |
| | | } |
| | | |
| | | static protected ReplicationMessage createAddMsg() |
| | | { |
| | | Entry personWithUUIDEntry = null; |
| | | String user1entryUUID; |
| | | String baseUUID = null; |
| | | String user1dn; |
| | | |
| | | /* |
| | | * Create a Change number generator to generate new changenumbers |
| | | * when we need to send operation messages to the replicationServer. |
| | | */ |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0); |
| | | |
| | | user1entryUUID = "33333333-3333-3333-3333-333333333333"; |
| | | user1dn = "uid=user1,ou=People," + baseDnStr; |
| | | String entryWithUUIDldif = "dn: "+ user1dn + "\n" |
| | | + "objectClass: top\n" + "objectClass: person\n" |
| | | + "objectClass: organizationalPerson\n" |
| | | + "objectClass: inetOrgPerson\n" + "uid: user.1\n" |
| | | + "homePhone: 951-245-7634\n" |
| | | + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" |
| | | + "mobile: 027-085-0537\n" |
| | | + "postalAddress: Aaccf Amar$17984 Thirteenth Street" |
| | | + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" |
| | | + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" |
| | | + "street: 17984 Thirteenth Street\n" |
| | | + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" |
| | | + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" |
| | | + "userPassword: password\n" + "initials: AA\n" |
| | | + "entryUUID: " + user1entryUUID + "\n"; |
| | | |
| | | try |
| | | { |
| | | personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail(e.getMessage()); |
| | | } |
| | | |
| | | // Create and publish an update message to add an entry. |
| | | AddMsg addMsg = new AddMsg(gen.newChangeNumber(), |
| | | personWithUUIDEntry.getDN().toString(), |
| | | user1entryUUID, |
| | | baseUUID, |
| | | personWithUUIDEntry.getObjectClassAttribute(), |
| | | personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); |
| | | |
| | | return addMsg; |
| | | } |
| | | |
| | | /** |
| | | * SingleRS tests basic features of generationID |
| | | * with one single Replication Server. |
| | | * The scenario covers, in order: an empty backend, a non empty backend, |
| | | * brokers connecting with a bad and a good generationId, a restart of |
| | | * the replication server, a reset of the generationId through the |
| | | * reset task, and an on-line import followed by a remote initialization. |
| | | * |
| | | * @throws Exception If the backend cannot be cleared before the test |
| | | * starts. Problems raised later are caught and |
| | | * reported through fail(). |
| | | */ |
| | | @Test(enabled=true) |
| | | public void testSingleRS() throws Exception |
| | | { |
| | | String testCase = "testSingleRS"; |
| | | debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled()); |
| | | |
| | | debugInfo(testCase + " Clearing DS1 backend"); |
| | | ReplicationDomain.clearJEBackend(false, |
| | | "userRoot", |
| | | baseDn.toNormalizedString()); |
| | | |
| | | try |
| | | { |
| | | long rgenId; |
| | | long genId; |
| | | |
| | | replServer1 = createReplicationServer(changelog1ID, false, testCase); |
| | | assertEquals(replServer1.getGenerationId(baseDn), -1); |
| | | |
| | | /* |
| | | * Test : empty replicated backend |
| | | * Check : nothing is broken - no generationId generated |
| | | */ |
| | | |
| | | // Connect DS to RS with no data |
| | | // Read generationId - should be not retrievable since no entry |
| | | debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")"); |
| | | connectToReplServer(changelog1ID); |
| | | Thread.sleep(1000); |
| | | |
| | | debugInfo(testCase + " Expect genId attribute to be not retrievable"); |
| | | genId = readGenId(); |
| | | assertEquals(genId,-1); |
| | | |
| | | debugInfo(testCase + " Expect genId to be set in memory on the replication " + |
| | | " server side even if not wrote on disk/db since no change occured."); |
| | | rgenId = replServer1.getGenerationId(baseDn); |
| | | assertEquals(rgenId, 3211313L); |
| | | |
| | | debugInfo(testCase + " Disconnecting DS1 from replServer1(" + changelog1ID + ")"); |
| | | disconnectFromReplServer(changelog1ID); |
| | | |
| | | /* |
| | | * Test : non empty replicated backend |
| | | * Check : generationId correctly generated |
| | | */ |
| | | |
| | | // Now disconnect - create entries and reconnect |
| | | // Test that generation has been added to the data. |
| | | debugInfo(testCase + " add test entries to DS"); |
| | | this.addTestEntriesToDB(updatedEntries); |
| | | connectToReplServer(changelog1ID); |
| | | |
| | | // Test that the generationId is written in the DB in the |
| | | // root entry on the replica side |
| | | genId = readGenId(); |
| | | assertTrue(genId != -1); |
| | | assertTrue(genId != 3211313L); |
| | | |
| | | // Test that the generationId is set on the replication server side |
| | | rgenId = replServer1.getGenerationId(baseDn); |
| | | assertEquals(genId, rgenId); |
| | | |
| | | /* |
| | | * Test : Connection from 2nd broker with a different generationId |
| | | * Check: We should receive an error message |
| | | */ |
| | | |
| | | try |
| | | { |
| | | broker2 = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog1ID), |
| | | 1000, !emptyOldChanges, genId+1); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | try |
| | | { |
| | | ReplicationMessage msg = broker2.receive(); |
| | | if (!(msg instanceof ErrorMessage)) |
| | | { |
| | | fail("Broker connection is expected to receive an ErrorMessage." |
| | | + msg); |
| | | } |
| | | ErrorMessage emsg = (ErrorMessage)msg; |
| | | debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails()); |
| | | } |
| | | catch(SocketTimeoutException se) |
| | | { |
| | | fail("Broker is expected to receive an ErrorMessage."); |
| | | } |
| | | |
| | | /* |
| | | * Test : Connect with same generationId |
| | | * Check : Must be accepted. |
| | | */ |
| | | try |
| | | { |
| | | broker3 = openReplicationSession(baseDn, |
| | | server3ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | /* |
| | | * Test : generationID persistence in Replication server |
| | | * Shutdown/Restart Replication Server and redo connections |
| | | * with valid and invalid generationId |
| | | * Check : same expected connections results |
| | | */ |
| | | |
| | | // The changes from broker2 should be ignored |
| | | broker2.publish(createAddMsg()); |
| | | |
| | | try |
| | | { |
| | | broker3.receive(); |
| | | fail("No update message is supposed to be received here."); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | // This is the expected result |
| | | } |
| | | |
| | | // Now create a change that must be replicated |
| | | String ent1[] = { createEntry(UUID.randomUUID()) }; |
| | | this.addTestEntriesToDB(ent1); |
| | | |
| | | try |
| | | { |
| | | ReplicationMessage msg = broker3.receive(); |
| | | debugInfo("Broker 3 received expected update msg" + msg); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | fail("Update message is supposed to be received."); |
| | | } |
| | | |
| | | long genIdBeforeShut = replServer1.getGenerationId(baseDn); |
| | | |
| | | debugInfo("Shutdown replServer1"); |
| | | broker2.stop(); |
| | | broker2 = null; |
| | | broker3.stop(); |
| | | broker3 = null; |
| | | replServer1.shutdown(); |
| | | replServer1 = null; |
| | | |
| | | debugInfo("Create again replServer1"); |
| | | replServer1 = createReplicationServer(changelog1ID, false, testCase); |
| | | // Check that the server exists before it is dereferenced below. |
| | | assertTrue(replServer1!=null, "Replication server creation failed."); |
| | | debugInfo("Delay to allow DS to reconnect to replServer1"); |
| | | Thread.sleep(200); |
| | | |
| | | long genIdAfterRestart = replServer1.getGenerationId(baseDn); |
| | | debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart); |
| | | assertTrue(genIdBeforeShut == genIdAfterRestart, |
| | | "generationId is expected to have the same value after replServer1 restart"); |
| | | |
| | | try |
| | | { |
| | | debugInfo("Create again broker2"); |
| | | broker2 = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); |
| | | assertTrue(broker2.isConnected(), "Broker2 failed to connect to replication server"); |
| | | |
| | | debugInfo("Create again broker3"); |
| | | broker3 = openReplicationSession(baseDn, |
| | | server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); |
| | | assertTrue(broker3.isConnected(), "Broker3 failed to connect to replication server"); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | /* |
| | | * |
| | | * FIXME Should clearJEBackend() regenerate generationId and do a start |
| | | * against ReplicationServer ? |
| | | */ |
| | | |
| | | /* |
| | | * Test: Reset the replication server in order to allow new data set. |
| | | */ |
| | | |
| | | Entry taskReset = TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() + |
| | | ",cn=Scheduled Tasks,cn=Tasks", |
| | | "objectclass: top", |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-reset-generation-id", |
| | | "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", |
| | | "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr); |
| | | |
| | | debugInfo("Reset generationId"); |
| | | addTask(taskReset, ResultCode.SUCCESS, null); |
| | | waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); |
| | | Thread.sleep(200); |
| | | |
| | | // TODO: Test that replication server db has been cleared |
| | | |
| | | assertEquals(replServer1.getGenerationId(baseDn), |
| | | -1, "Expected genId to be reset in replServer1"); |
| | | |
| | | ReplicationMessage rcvmsg = broker2.receive(); |
| | | if (!(rcvmsg instanceof ErrorMessage)) |
| | | { |
| | | fail("Broker2 is expected to receive an ErrorMessage " + |
| | | " to signal degradation due to reset" + rcvmsg); |
| | | } |
| | | ErrorMessage emsg = (ErrorMessage)rcvmsg; |
| | | debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails()); |
| | | |
| | | rcvmsg = broker3.receive(); |
| | | if (!(rcvmsg instanceof ErrorMessage)) |
| | | { |
| | | fail("Broker3 is expected to receive an ErrorMessage " + |
| | | " to signal degradation due to reset" + rcvmsg); |
| | | } |
| | | emsg = (ErrorMessage)rcvmsg; |
| | | debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails()); |
| | | |
| | | rgenId = replServer1.getGenerationId(baseDn); |
| | | assertTrue(rgenId==-1,"Expecting that genId has been reset in replServer1: rgenId="+rgenId); |
| | | |
| | | assertTrue(replServer1.getReplicationCache(baseDn, false). |
| | | isDegradedDueToGenerationId(server1ID), |
| | | "Expecting that DS is degraded since domain genId has been reset"); |
| | | |
| | | assertTrue(replServer1.getReplicationCache(baseDn, false). |
| | | isDegradedDueToGenerationId(server2ID), |
| | | "Expecting that broker2 is degraded since domain genId has been reset"); |
| | | assertTrue(replServer1.getReplicationCache(baseDn, false). |
| | | isDegradedDueToGenerationId(server3ID), |
| | | "Expecting that broker3 is degraded since domain genId has been reset"); |
| | | |
| | | |
| | | // Now create a change that normally would be replicated |
| | | // but will not be replicated here since DS and brokers are degraded |
| | | String[] ent2 = { createEntry(UUID.randomUUID()) }; |
| | | this.addTestEntriesToDB(ent2); |
| | | |
| | | try |
| | | { |
| | | ReplicationMessage msg = broker2.receive(); |
| | | fail("No update message is supposed to be received by degraded broker2" + msg); |
| | | } catch(SocketTimeoutException e) { /* expected */ } |
| | | |
| | | try |
| | | { |
| | | ReplicationMessage msg = broker3.receive(); |
| | | fail("No update message is supposed to be received by degraded broker3"+ msg); |
| | | } catch(SocketTimeoutException e) { /* expected */ } |
| | | |
| | | debugInfo("broker2 is publishing a change, " + |
| | | "replServer1 expected to ignore this change."); |
| | | broker2.publish(createAddMsg()); |
| | | try |
| | | { |
| | | ReplicationMessage msg = broker3.receive(); |
| | | fail("No update message is supposed to be received by degraded broker3"+ msg); |
| | | } catch(SocketTimeoutException e) { /* expected */ } |
| | | |
| | | |
| | | debugInfo("Launch an on-line import on DS."); |
| | | genId=-1; |
| | | Entry importTask = getTaskImport(); |
| | | addTask(importTask, ResultCode.SUCCESS, null); |
| | | waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null); |
| | | Thread.sleep(500); |
| | | |
| | | debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import."); |
| | | genId = readGenId(); |
| | | assertTrue(genId != -1, "DS is expected to have a new genID computed " + |
| | | " after on-line import but genId=" + genId); |
| | | |
| | | rgenId = replServer1.getGenerationId(baseDn); |
| | | assertEquals(genId, rgenId, "DS and replServer are expected to have same genId."); |
| | | |
| | | // In S1 launch the total update to initialize S2 |
| | | addTask(taskInitRemoteS2, ResultCode.SUCCESS, null); |
| | | |
| | | // S2 should be re-initialized and have a new valid genId |
| | | int receivedEntriesNb = this.receiveImport(broker2, server2ID, null); |
| | | debugInfo("broker2 has been initialized from DS with #entries=" + receivedEntriesNb); |
| | | |
| | | debugInfo("Adding reset task to DS."); |
| | | taskReset = TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=resetgenid"+ UUID.randomUUID() + |
| | | ",cn=Scheduled Tasks,cn=Tasks", |
| | | "objectclass: top", |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-reset-generation-id", |
| | | "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", |
| | | "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr); |
| | | |
| | | addTask(taskReset, ResultCode.SUCCESS, null); |
| | | waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); |
| | | Thread.sleep(200); |
| | | |
| | | debugInfo("Verifying that replServer1 has been reset."); |
| | | assertEquals(replServer1.getGenerationId(baseDn), -1); |
| | | |
| | | debugInfo("Disconnect DS from replServer1 (required in order to DEL entries)."); |
| | | disconnectFromReplServer(changelog1ID); |
| | | |
| | | postTest(); |
| | | |
| | | debugInfo(testCase + " Clearing DS backend"); |
| | | ReplicationDomain.clearJEBackend(false, |
| | | replDomain.getBackend().getBackendID(), |
| | | baseDn.toNormalizedString()); |
| | | |
| | | // At this moment, root entry of the domain has been removed so |
| | | // genId is no more in the database ... but it has still the old |
| | | // value in memory. |
| | | // The returned entry count is not checked here. |
| | | testEntriesInDb(); |
| | | replDomain.loadGenerationId(); |
| | | |
| | | debugInfo("Successfully ending " + testCase); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail(testCase + " Exception:"+ e.getMessage() + " " + |
| | | stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * MultiRS tests basic features of generationID |
| | | * with more than one Replication Server. |
| | | * The following test focus on: |
| | | * - genId checking across multiple starting RS (replication servers) |
| | | * - genId setting propagation from one RS to the others |
| | | * - genId reset propagation from one RS to the others |
| | | * |
| | | * @throws Exception If an unexpected problem occurs during the test; |
| | | * no exception is caught by this method except the |
| | | * SocketException raised while opening a broker. |
| | | */ |
| | | @Test(enabled=true) |
| | | public void testMultiRS() throws Exception |
| | | { |
| | | String testCase = "testMultiRS"; |
| | | long genId; |
| | | |
| | | debugInfo("Starting " + testCase); |
| | | |
| | | ReplicationDomain.clearJEBackend(false, |
| | | "userRoot", |
| | | baseDn.toNormalizedString()); |
| | | |
| | | debugInfo ("Creating 3 RS"); |
| | | replServer1 = createReplicationServer(changelog1ID, true, testCase); |
| | | replServer2 = createReplicationServer(changelog2ID, true, testCase); |
| | | replServer3 = createReplicationServer(changelog3ID, true, testCase); |
| | | Thread.sleep(500); |
| | | |
| | | debugInfo("Connecting DS to replServer1"); |
| | | connectToReplServer(changelog1ID); |
| | | Thread.sleep(1500); |
| | | |
| | | debugInfo("Expect genId are set in all replServers."); |
| | | assertEquals(replServer1.getGenerationId(baseDn), 3211313L, " in replServer1"); |
| | | assertEquals(replServer2.getGenerationId(baseDn), 3211313L, " in replServer2"); |
| | | assertEquals(replServer3.getGenerationId(baseDn), 3211313L, " in replServer3"); |
| | | |
| | | debugInfo("Disconnect DS from replServer1."); |
| | | disconnectFromReplServer(changelog1ID); |
| | | Thread.sleep(1000); |
| | | |
| | | debugInfo("Expect genId to be unset(-1) in all servers since no server is " + |
| | | " connected and no change ever occured"); |
| | | assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1"); |
| | | assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2"); |
| | | assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3"); |
| | | |
| | | debugInfo("Add entries to DS"); |
| | | this.addTestEntriesToDB(updatedEntries); |
| | | |
| | | debugInfo("Connecting DS to replServer2"); |
| | | connectToReplServer(changelog2ID); |
| | | Thread.sleep(1000); |
| | | |
| | | debugInfo("Expect genIds to be set in all servers based on the added entries."); |
| | | genId = readGenId(); |
| | | assertTrue(genId != -1); |
| | | assertEquals(replServer1.getGenerationId(baseDn), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDn), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDn), genId); |
| | | |
| | | debugInfo("Connecting broker2 to replServer3 with a good genId"); |
| | | try |
| | | { |
| | | broker2 = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog3ID), |
| | | 1000, !emptyOldChanges, genId); |
| | | Thread.sleep(1000); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | debugInfo("Expecting that broker2 is not degraded since it has a correct genId"); |
| | | assertTrue(!replServer1.getReplicationCache(baseDn, false). |
| | | isDegradedDueToGenerationId(server2ID)); |
| | | |
| | | // NOTE(review): the DS was last connected through replServer2 |
| | | // (changelog2ID) above; confirm changelog1ID is the intended argument. |
| | | debugInfo("Disconnecting DS from replServer1"); |
| | | disconnectFromReplServer(changelog1ID); |
| | | |
| | | debugInfo("Expect all genIds to keep their value since broker2 is still connected."); |
| | | assertEquals(replServer1.getGenerationId(baseDn), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDn), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDn), genId); |
| | | |
| | | // The session opened here is broker3 (server3ID), as checked below. |
| | | debugInfo("Connecting broker3 to replServer1 with a bad genId"); |
| | | try |
| | | { |
| | | long badgenId=1; |
| | | broker3 = openReplicationSession(baseDn, |
| | | server3ID, 100, getChangelogPort(changelog1ID), |
| | | 1000, !emptyOldChanges, badgenId); |
| | | Thread.sleep(1000); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | debugInfo("Expecting that broker3 is degraded since it has a bad genId"); |
| | | assertTrue(replServer1.getReplicationCache(baseDn, false). |
| | | isDegradedDueToGenerationId(server3ID)); |
| | | |
| | | int found = testEntriesInDb(); |
| | | assertEquals(found, updatedEntries.length, |
| | | " Entries present in DB :" + found + |
| | | " Expected entries :" + updatedEntries.length); |
| | | |
| | | debugInfo("Connecting DS to replServer1."); |
| | | connectToReplServer(changelog1ID); |
| | | Thread.sleep(1000); |
| | | |
| | | |
| | | debugInfo("Adding reset task to DS."); |
| | | Entry taskReset = TestCaseUtils.makeEntry( |
| | | "dn: ds-task-id=resetgenid"+ UUID.randomUUID() + |
| | | ",cn=Scheduled Tasks,cn=Tasks", |
| | | "objectclass: top", |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-reset-generation-id", |
| | | "ds-task-class-name: org.opends.server.tasks.SetGenerationIdTask", |
| | | "ds-task-reset-generation-id-domain-base-dn: " + baseDnStr); |
| | | addTask(taskReset, ResultCode.SUCCESS, null); |
| | | waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); |
| | | Thread.sleep(500); |
| | | |
| | | debugInfo("Verifying that all replservers genIds have been reset."); |
| | | // NOTE(review): the value read from the DS here is never checked; |
| | | // only the replication servers are verified below. |
| | | genId = readGenId(); |
| | | assertEquals(replServer1.getGenerationId(baseDn), -1); |
| | | assertEquals(replServer2.getGenerationId(baseDn), -1); |
| | | assertEquals(replServer3.getGenerationId(baseDn), -1); |
| | | |
| | | debugInfo("Disconnect DS from replServer1 (required in order to DEL entries)."); |
| | | disconnectFromReplServer(changelog1ID); |
| | | |
| | | debugInfo("Cleaning entries"); |
| | | postTest(); |
| | | |
| | | debugInfo("Successfully ending " + testCase); |
| | | } |
| | | |
| | | /** |
| | | * Stops the brokers and shuts down the replication servers created by a |
| | | * test, then removes the test entries from the local DB and clears the |
| | | * backend. Fields are reset to null so that the next test starts clean. |
| | | * Problems met while clearing the backend are logged, not propagated. |
| | | */ |
| | | protected void postTest() |
| | | { |
| | | debugInfo("Post test cleaning."); |
| | | |
| | | // Clean brokers |
| | | if (broker2 != null) |
| | | { |
| | | broker2.stop(); |
| | | } |
| | | broker2 = null; |
| | | if (broker3 != null) |
| | | { |
| | | broker3.stop(); |
| | | } |
| | | broker3 = null; |
| | | |
| | | // Shut down each of the three replication servers that was started. |
| | | if (replServer1 != null) |
| | | { |
| | | replServer1.shutdown(); |
| | | } |
| | | if (replServer2 != null) |
| | | { |
| | | replServer2.shutdown(); |
| | | } |
| | | if (replServer3 != null) |
| | | { |
| | | replServer3.shutdown(); |
| | | } |
| | | replServer1 = null; |
| | | replServer2 = null; |
| | | replServer3 = null; |
| | | |
| | | super.cleanRealEntries(); |
| | | |
| | | try |
| | | { |
| | | ReplicationDomain.clearJEBackend(false, |
| | | replDomain.getBackend().getBackendID(), |
| | | baseDn.toNormalizedString()); |
| | | |
| | | // At this moment, root entry of the domain has been removed so |
| | | // genId is no more in the database ... but it has still the old |
| | | // value in memory. |
| | | testEntriesInDb(); |
| | | replDomain.loadGenerationId(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Cleanup stays best effort, but the problem is traced so that a |
| | | // failing cleanup can be diagnosed. |
| | | debugInfo("Post test cleaning failed: " + |
| | | stackTraceToSingleLineString(e)); |
| | | } |
| | | } |
| | | } |
| | |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.UUID; |
| | | import java.net.SocketTimeoutException; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.TaskState; |
| | |
| | | boolean ssShutdownRequested = false; |
| | | protected String[] updatedEntries; |
| | | boolean externalDS = false; |
| | | private static final short server1ID = 11; |
| | | private static final short server2ID = 21; |
| | | private static final short server3ID = 31; |
| | | private static final short changelog1ID = 1; |
| | | private static final short changelog2ID = 2; |
| | | private static final short changelog3ID = 3; |
| | | private static final short server1ID = 1; |
| | | private static final short server2ID = 2; |
| | | private static final short server3ID = 3; |
| | | private static final short changelog1ID = 8; |
| | | private static final short changelog2ID = 9; |
| | | private static final short changelog3ID = 10; |
| | | |
| | | private static int[] replServerPort = new int[4]; |
| | | private static int[] replServerPort = new int[20]; |
| | | |
| | | private DN baseDn; |
| | | ReplicationBroker server2 = null; |
| | |
| | | ReplicationServer changelog2 = null; |
| | | ReplicationServer changelog3 = null; |
| | | boolean emptyOldChanges = true; |
| | | ReplicationDomain sd = null; |
| | | ReplicationDomain replDomain = null; |
| | | |
| | | private void log(String s) |
| | | { |
| | |
| | | |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | baseDn = DN.decode("dc=example,dc=com"); |
| | | |
| | | updatedEntries = newLDIFEntries(); |
| | |
| | | servers.add("localhost:" + getChangelogPort(changelog2ID)); |
| | | servers.add("localhost:" + getChangelogPort(changelog3ID)); |
| | | |
| | | int chPort = getChangelogPort(changelogId); |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100, |
| | | new ReplServerFakeConfiguration( |
| | | getChangelogPort(changelogId), |
| | | "rsdbdirname" + getChangelogPort(changelogId), |
| | | 0, |
| | | changelogId, |
| | | 0, |
| | | 100, |
| | | servers); |
| | | ReplicationServer replicationServer = new ReplicationServer(conf); |
| | | Thread.sleep(1000); |
| | |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the synchronized server"); |
| | | |
| | | sd = ReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | |
| | | // Clear the backend |
| | | ReplicationDomain.clearJEBackend(false, |
| | | sd.getBackend().getBackendID(), |
| | | replDomain.getBackend().getBackendID(), |
| | | baseDn.toNormalizedString()); |
| | | |
| | | } |
| | | if (sd != null) |
| | | if (replDomain != null) |
| | | { |
| | | log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning()); |
| | | assertTrue(!replDomain.ieRunning(), |
| | | "ReplicationDomain: Import/Export is not expected to be running"); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | |
| | | // Test import result in S1 |
| | | testEntriesInDb(); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending " + testCase); |
| | | } |
| | |
| | | |
| | | receiveUpdatedEntries(server2, server2ID, updatedEntries); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending "+testCase); |
| | | } |
| | |
| | | // Tests that entries have been received by S2 |
| | | receiveUpdatedEntries(server2, server2ID, updatedEntries); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending " + testCase); |
| | | |
| | |
| | | receiveUpdatedEntries(server2, server2ID, updatedEntries); |
| | | receiveUpdatedEntries(server3, server3ID, updatedEntries); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending " + testCase); |
| | | |
| | |
| | | // Test that entries have been imported in S1 |
| | | testEntriesInDb(); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending " + testCase); |
| | | } |
| | |
| | | // Scope containing a serverID absent from the domain |
| | | // createTask(taskInitTargetS2); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending " + testCase); |
| | | } |
| | |
| | | // Scope containing a serverID absent from the domain |
| | | // createTask(taskInitTargetS2); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending " + testCase); |
| | | } |
| | |
| | | |
| | | // Check that the list of connected LDAP servers is correct |
| | | // in each replication servers |
| | | List<String> l1 = changelog1.getReplicationCache(baseDn). |
| | | List<String> l1 = changelog1.getReplicationCache(baseDn, false). |
| | | getConnectedLDAPservers(); |
| | | assertEquals(l1.size(), 1); |
| | | assertEquals(l1.get(0), String.valueOf(server1ID)); |
| | | |
| | | List<String> l2; |
| | | l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers(); |
| | | l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers(); |
| | | assertEquals(l2.size(), 2); |
| | | assertEquals(l2.get(0), String.valueOf(server3ID)); |
| | | assertEquals(l2.get(1), String.valueOf(server2ID)); |
| | | |
| | | List<String> l3; |
| | | l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers(); |
| | | l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers(); |
| | | assertEquals(l3.size(), 0); |
| | | |
| | | // Test updates |
| | | broker3.stop(); |
| | | Thread.sleep(1000); |
| | | l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers(); |
| | | l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers(); |
| | | assertEquals(l2.size(), 1); |
| | | assertEquals(l2.get(0), String.valueOf(server2ID)); |
| | | |
| | |
| | | server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); |
| | | broker2.stop(); |
| | | Thread.sleep(1000); |
| | | l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers(); |
| | | l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers(); |
| | | assertEquals(l2.size(), 1); |
| | | assertEquals(l2.get(0), String.valueOf(server3ID)); |
| | | |
| | |
| | | broker2.stop(); |
| | | broker3.stop(); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | changelog3.shutdown(); |
| | | changelog3 = null; |
| | |
| | | // Tests that entries have been received by S2 |
| | | receiveUpdatedEntries(server2, server2ID, updatedEntries); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | changelog2.shutdown(); |
| | | changelog2 = null; |
| | |
| | | changelog2 = createChangelogServer(changelog2ID); |
| | | Thread.sleep(1000); |
| | | |
| | | changelog3 = createChangelogServer(changelog3ID); |
| | | Thread.sleep(1000); |
| | | |
| | | // Connect DS to the replicationServer 1 |
| | | connectServer1ToChangelog(changelog1ID); |
| | | |
| | | // Put entries in DB |
| | | log(testCase + " Will add entries"); |
| | | addTestEntriesToDB(); |
| | | |
| | | // Connect a broker acting as server 2 to Repl Server 2 |
| | | if (server2 == null) |
| | | { |
| | | log(testCase + " Will connect server 2 to " + changelog2ID); |
| | | server2 = openReplicationSession(DN.decode("dc=example,dc=com"), |
| | | server2ID, 100, getChangelogPort(changelog2ID), |
| | | 1000, emptyOldChanges); |
| | | 1000, emptyOldChanges, changelog1.getGenerationId(baseDn)); |
| | | } |
| | | |
| | | // Connect a broker acting as server 3 to Repl Server 3 |
| | | log(testCase + " Will create replServer " + changelog3ID); |
| | | changelog3 = createChangelogServer(changelog3ID); |
| | | Thread.sleep(500); |
| | | if (server3 == null) |
| | | { |
| | | log(testCase + " Will connect server 3 to " + changelog3ID); |
| | | server3 = openReplicationSession(DN.decode("dc=example,dc=com"), |
| | | server3ID, 100, getChangelogPort(changelog3ID), |
| | | 1000, emptyOldChanges); |
| | | 1000, emptyOldChanges, changelog1.getGenerationId(baseDn)); |
| | | } |
| | | |
| | | Thread.sleep(3000); |
| | | Thread.sleep(500); |
| | | |
| | | // S2 sends init request |
| | | // S3 sends init request |
| | | log(testCase + " server 3 Will send reqinit to " + server1ID); |
| | | InitializeRequestMessage initMsg = |
| | | new InitializeRequestMessage(baseDn, server2ID, server1ID); |
| | | server2.publish(initMsg); |
| | | new InitializeRequestMessage(baseDn, server3ID, server1ID); |
| | | server3.publish(initMsg); |
| | | |
| | | // S2 should receive target, entries & done |
| | | receiveUpdatedEntries(server2, server2ID, updatedEntries); |
| | | // S3 should receive target, entries & done |
| | | log(testCase + " Will verify server 3 has received expected entries"); |
| | | receiveUpdatedEntries(server3, server3ID, updatedEntries); |
| | | |
| | | cleanEntries(); |
| | | while(true) |
| | | { |
| | | try |
| | | { |
| | | ReplicationMessage msg = server3.receive(); |
| | | fail("Receive unexpected message " + msg); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | // Test is a success |
| | | break; |
| | | } |
| | | } |
| | | |
| | | afterTest(); |
| | | |
| | | changelog3.shutdown(); |
| | | changelog3 = null; |
| | | |
| | | changelog2.shutdown(); |
| | | changelog2 = null; |
| | |
| | | |
| | | addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get()); |
| | | |
| | | if (sd != null) |
| | | if (replDomain != null) |
| | | { |
| | | log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning()); |
| | | assertTrue(!replDomain.ieRunning(), |
| | | "ReplicationDomain: Import/Export is not expected to be running"); |
| | | } |
| | | |
| | | log("Successfully ending "+testCase); |
| | |
| | | waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, |
| | | ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); |
| | | |
| | | if (sd != null) |
| | | if (replDomain != null) |
| | | { |
| | | log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning()); |
| | | assertTrue(!replDomain.ieRunning(), |
| | | "ReplicationDomain: Import/Export is not expected to be running"); |
| | | } |
| | | |
| | | log("Successfully ending "+testCase); |
| | |
| | | waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, |
| | | null); |
| | | |
| | | cleanEntries(); |
| | | afterTest(); |
| | | |
| | | log("Successfully ending "+testCase); |
| | | |
| | |
| | | /** |
| | | * Disconnect broker and remove entries from the local DB |
| | | * <p> |
| | | * Steps, in order: when a replication domain is set, assert that no |
| | | * import/export is still running on it; stop the server2 and server3 |
| | | * brokers if they are set and reset the fields to null; finally call |
| | | * cleanRealEntries() on the parent class. |
| | | */ |
| | | protected void cleanEntries() |
| | | protected void afterTest() |
| | | { |
| | | |
| | | // A test must never leave an import/export running behind it. |
| | | if (sd != null) |
| | | if (replDomain != null) |
| | | { |
| | | log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning()); |
| | | assertTrue(!replDomain.ieRunning(), |
| | | "ReplicationDomain: Import/Export is not expected to be running"); |
| | | } |
| | | |
| | | // Clean brokers |
| | | if (server2 != null) |
| | | { |
| | | server2.stop(); |
| | | |
| | | TestCaseUtils.sleep(100); // give some time to the broker to disconnect |
| | | // from the replicationServer. |
| | | server2 = null; |
| | | } |
| | | if (server3 != null) |
| | | { |
| | | server3.stop(); |
| | | TestCaseUtils.sleep(100); // give some time to the broker to disconnect |
| | | // from the replicationServer. |
| | | server3 = null; |
| | | } |
| | | // Presumably deletes the entries created by the test (helper defined in |
| | | // the parent class, not visible here) - TODO confirm. |
| | | super.cleanRealEntries(); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.ReplicationDomain; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | |
| | | ProtocolVersion.setCurrentVersion((short)2); |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | new ServerState(), |
| | | baseDn, |
| | | new ServerState(), |
| | | baseDn, |
| | | (short) 13, 0, 0, 0, 0, 1000, 0, |
| | | ReplicationTestCase.getGenerationId(baseDn), |
| | | getReplSessionSecurity()); |
| | | |
| | | |
| | |
| | | // Check broker negotiated version |
| | | pversion = broker.getProtocolVersion(); |
| | | assertEquals(pversion, 0); |
| | | } |
| | | |
| | | broker.stop(); |
| | | |
| | | logError(Message.raw( |
| | | Category.SYNC, Severity.INFORMATION, |
| | | "Ending Replication ProtocolWindowTest : protocolVersion")); |
| | | } |
| | | } |
| | |
| | | |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; |
| | | import org.opends.server.config.ConfigException; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertNotNull; |
| | | import static org.testng.Assert.assertTrue; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import java.net.SocketException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | |
| | | import java.util.NoSuchElementException; |
| | | import java.util.concurrent.locks.Lock; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperationBasis; |
| | | import org.opends.server.core.DirectoryServer; |
| | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.PersistentServerState; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.ReplicationDomain; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.schema.DirectoryStringSyntax; |
| | | import org.opends.server.schema.IntegerSyntax; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Open a replicationServer session to the local ReplicationServer. |
| | | * Retrieves the domain associated to the baseDn, and the value of the generationId |
| | | * of this domain. If the domain does not exist, returns the default hard-coded\ |
| | | * value of the generationId corresponding to 'no entry'. |
| | | * |
| | | * @param baseDn The baseDn for which we want the generationId |
| | | * @return The value of the generationId. |
| | | */ |
| | | static protected long getGenerationId(DN baseDn) |
| | | { |
| | | // This is the value of the generationId computed by the server when the |
| | | // suffix is empty. |
| | | long genId = 3276850; |
| | | try |
| | | { |
| | | ReplicationDomain replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | genId = replDomain.getGenerationId(); |
| | | } |
| | | catch(Exception e) {} |
| | | return genId; |
| | | } |
| | | |
| | | /** |
| | | * Open a replicationServer session to the local ReplicationServer. |
| | | * The generation is read from the replicationDomain object. If it |
| | | * does not exist, take the 'empty backend' generationID. |
| | | * <p> |
| | | * Convenience overload: the generationId is obtained with |
| | | * getGenerationId(baseDn) and the call is delegated to the overload taking |
| | | * an explicit generationId; every other argument is passed through |
| | | * unchanged. |
| | | * |
| | | * @param baseDn The baseDn of the session, also used to look up the |
| | | *               generationId. |
| | | * @return The broker returned by the delegate overload. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, boolean emptyOldChanges) |
| | | throws Exception |
| | | throws Exception, SocketException |
| | | { |
| | | return openReplicationSession(baseDn, serverId, window_size, |
| | | port, timeout, emptyOldChanges, getGenerationId(baseDn)); |
| | | } |
| | | |
| | | /** |
| | | * Open a replicationServer session to the local ReplicationServer |
| | | * providing the generationId. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, boolean emptyOldChanges, |
| | | long generationId) |
| | | throws Exception, SocketException |
| | | { |
| | | ServerState state; |
| | | if (emptyOldChanges) |
| | |
| | | state = new ServerState(); |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, |
| | | getReplSessionSecurity()); |
| | | state, baseDn, serverId, 0, 0, 0, 0, |
| | | window_size, 0, generationId, getReplSessionSecurity()); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | ReplicationMessage rMsg = broker.receive(); |
| | | if (rMsg instanceof ErrorMessage) |
| | | { |
| | | ErrorMessage eMsg = (ErrorMessage)rMsg; |
| | | logError(new MessageBuilder( |
| | | "ReplicationTestCase/openReplicationSession ").append( |
| | | " received ErrorMessage when emptying old changes ").append( |
| | | eMsg.getDetails()).toMessage()); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Open a replicationServer session to the local ReplicationServer |
| | | * with a default value generationId. |
| | | * |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, ServerState state) |
| | | throws Exception, SocketException |
| | | { |
| | | return openReplicationSession(baseDn, serverId, window_size, |
| | | port, timeout, state, getGenerationId(baseDn)); |
| | | } |
| | | |
| | | /** |
| | | * Open a new session to the ReplicationServer |
| | | * starting with a given ServerState. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, ServerState state) |
| | | throws Exception |
| | | int port, int timeout, ServerState state, long generationId) |
| | | throws Exception, SocketException |
| | | { |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, generationId, |
| | | getReplSessionSecurity()); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, int maxSendQueue, int maxRcvQueue, |
| | | boolean emptyOldChanges) |
| | | throws Exception |
| | | throws Exception, SocketException |
| | | { |
| | | return openReplicationSession(baseDn, serverId, window_size, |
| | | port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges, |
| | | getGenerationId(baseDn)); |
| | | } |
| | | |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, int maxSendQueue, int maxRcvQueue, |
| | | boolean emptyOldChanges, long generationId) |
| | | throws Exception, SocketException |
| | | { |
| | | ServerState state; |
| | | if (emptyOldChanges) |
| | |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | state, baseDn, serverId, maxRcvQueue, 0, |
| | | maxSendQueue, 0, window_size, 0, |
| | | maxSendQueue, 0, window_size, 0, generationId, |
| | | getReplSessionSecurity()); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | ReplicationMessage rMsg = broker.receive(); |
| | | if (rMsg instanceof ErrorMessage) |
| | | { |
| | | ErrorMessage eMsg = (ErrorMessage)rMsg; |
| | | logError(new MessageBuilder( |
| | | "ReplicationTestCase/openReplicationSession ").append( |
| | | " received ErrorMessage when emptying old changes ").append( |
| | | eMsg.getDetails()).toMessage()); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | while (true) |
| | | { |
| | | DN dn = configEntryList.removeLast(); |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | "cleaning config entry " + dn)); |
| | | |
| | | op = new DeleteOperationBasis(connection, InternalClientConnection |
| | | .nextOperationID(), InternalClientConnection.nextMessageID(), null, |
| | | dn); |
| | | op.run(); |
| | | if ((op.getResultCode() != ResultCode.SUCCESS) && |
| | | (op.getResultCode() != ResultCode.NO_SUCH_OBJECT)) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | "ReplicationTestCase/Cleaning config entries" + |
| | | "DEL " + dn + |
| | | " failed " + op.getResultCode().getResultCodeName())); |
| | | } |
| | | } |
| | | } |
| | | catch (NoSuchElementException e) { |
| | |
| | | while (true) |
| | | { |
| | | DN dn = entryList.removeLast(); |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | "cleaning entry " + dn)); |
| | | |
| | | op = new DeleteOperationBasis(connection, InternalClientConnection |
| | | .nextOperationID(), InternalClientConnection.nextMessageID(), null, |
| | | dn); |
| | | op = new DeleteOperationBasis(connection, |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), |
| | | null, |
| | | dn); |
| | | |
| | | op.run(); |
| | | |
| | | if ((op.getResultCode() != ResultCode.SUCCESS) && |
| | | (op.getResultCode() != ResultCode.NO_SUCH_OBJECT)) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | "ReplicationTestCase/Cleaning entries" + |
| | | "DEL " + dn + |
| | | " failed " + op.getResultCode().getResultCodeName())); |
| | | } |
| | | } |
| | | } |
| | | catch (NoSuchElementException e) { |
| | |
| | | |
| | | ResultCode code = modOp.getResultCode(); |
| | | assertTrue(code.equals(ResultCode.SUCCESS), |
| | | "The original operation failed"); |
| | | "The original operation failed: " + code.getResultCodeName()); |
| | | |
| | | // See if the client has received the msg |
| | | ReplicationMessage msg = broker.receive(); |
| | |
| | | public void replaySchemaChange() throws Exception |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | "Starting replication test : pushSchemaChange ")); |
| | | "Starting replication test : replaySchemaChange ")); |
| | | |
| | | final DN baseDn = DN.decode("cn=schema"); |
| | | |
| | |
| | | |
| | | // chek that the operation was successful. |
| | | // check that the operation was successful. |
| | | assertEquals(ResultCode.SUCCESS, op.getResultCode()); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS, |
| | | op.getAdditionalLogMessage().toString()); |
| | | } |
| | | finally |
| | | { |
| | |
| | | ChangeNumber cn1 = gen1.newChangeNumber(); |
| | | ChangeNumber cn2 = gen2.newChangeNumber(); |
| | | |
| | | state.update(cn1); |
| | | state.update(cn2); |
| | | assertEquals(state.update(cn1), true); |
| | | assertEquals(state.update(cn2), true); |
| | | |
| | | state.save(); |
| | | |
| | |
| | | "cn1 has not been saved or loaded correctly for " + dn); |
| | | assertEquals(cn2Saved, cn2, |
| | | "cn2 has not been saved or loaded correctly for " + dn); |
| | | |
| | | state.clear(); |
| | | stateSaved = new PersistentServerState(baseDn); |
| | | cn1Saved = stateSaved.getMaxChangeNumber((short) 1); |
| | | assertEquals(cn1Saved, null, |
| | | "cn1 has not been saved after clear for " + dn); |
| | | |
| | | } |
| | | } |
| | |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ServerStartMessage msg = new ServerStartMessage(serverId, baseDN, |
| | | window, window, window, window, window, window, state, (short)1, true); |
| | | window, window, window, window, window, window, state, (short)1, |
| | | (long)1, true); |
| | | ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | assertEquals(msg.getVersion(), newMsg.getVersion()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | } |
| | | |
| | | @DataProvider(name="changelogStart") |
| | |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ReplServerStartMessage msg = new ReplServerStartMessage(serverId, |
| | | url, baseDN, window, state, (short)1, true); |
| | | url, baseDN, window, state, (short)1, (long)1, true); |
| | | ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getServerURL(), newMsg.getServerURL()); |
| | |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | assertEquals(msg.getVersion(), newMsg.getVersion()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption()); |
| | | } |
| | | |
| | |
| | | List<String> connectedServers = new ArrayList<String>(0); |
| | | connectedServers.add("s1"); |
| | | connectedServers.add("s2"); |
| | | ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers); |
| | | ReplServerInfoMessage msg = |
| | | new ReplServerInfoMessage(connectedServers, 13); |
| | | ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes()); |
| | | assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | } |
| | | |
| | | /** |
| | |
| | | ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer); |
| | | |
| | | DbHandler handler = |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv); |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1); |
| | | |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0); |
| | | ChangeNumber changeNumber1 = gen.newChangeNumber(); |
| | |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | } |
| | | |
| | | /** |
| | | * Test the feature of clearing a dbHandler used by a replication server. |
| | | * The clear feature is used when a replication server receives a request |
| | | * to reset the generationId of a given domain. |
| | | * <p> |
| | | * Three changes are added to the handler, their presence is checked, the |
| | | * handler is cleared and both the first and last change are then expected |
| | | * to be null. The handler, the db environment, the replication server and |
| | | * the working directory are released in a finally block so that a failed |
| | | * assertion does not leak them into the following tests. |
| | | * |
| | | * @throws Exception If the test set-up or tear-down fails unexpectedly. |
| | | */ |
| | | @Test() |
| | | void testDbHandlerClear() throws Exception |
| | | { |
| | |   TestCaseUtils.startServer(); |
| | | |
| | |   // find a free port for the replicationServer |
| | |   ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | |   int changelogPort = socket.getLocalPort(); |
| | |   socket.close(); |
| | | |
| | |   // configure a ReplicationServer. |
| | |   ReplServerFakeConfiguration conf = |
| | |     new ReplServerFakeConfiguration(changelogPort, null, 0, |
| | |                                     2, 0, 100, null); |
| | |   ReplicationServer replicationServer = new ReplicationServer(conf); |
| | | |
| | |   File testRoot = null; |
| | |   ReplicationDbEnv dbEnv = null; |
| | |   DbHandler handler = null; |
| | |   try |
| | |   { |
| | |     // create or clean a directory for the dbHandler |
| | |     String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | |     String path = buildRoot + File.separator + "build" + File.separator + |
| | |       "unit-tests" + File.separator + "dbHandler"; |
| | |     testRoot = new File(path); |
| | |     if (testRoot.exists()) |
| | |     { |
| | |       TestCaseUtils.deleteDirectory(testRoot); |
| | |     } |
| | |     testRoot.mkdirs(); |
| | | |
| | |     dbEnv = new ReplicationDbEnv(path, replicationServer); |
| | | |
| | |     handler = |
| | |       new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1); |
| | | |
| | |     // Creates changes added to the dbHandler |
| | |     ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0); |
| | |     ChangeNumber changeNumber1 = gen.newChangeNumber(); |
| | |     ChangeNumber changeNumber2 = gen.newChangeNumber(); |
| | |     ChangeNumber changeNumber3 = gen.newChangeNumber(); |
| | | |
| | |     DeleteMsg update1 = new DeleteMsg("o=test", changeNumber1, "uid"); |
| | |     DeleteMsg update2 = new DeleteMsg("o=test", changeNumber2, "uid"); |
| | |     DeleteMsg update3 = new DeleteMsg("o=test", changeNumber3, "uid"); |
| | | |
| | |     // Add the changes |
| | |     handler.add(update1); |
| | |     handler.add(update2); |
| | |     handler.add(update3); |
| | | |
| | |     // Check they are here (TestNG order: actual first, expected second) |
| | |     assertEquals(handler.getFirstChange(), changeNumber1); |
| | |     assertEquals(handler.getLastChange(), changeNumber3); |
| | | |
| | |     // Clear ... |
| | |     handler.clear(); |
| | | |
| | |     // Check the db is cleared. |
| | |     assertEquals(handler.getFirstChange(), null); |
| | |     assertEquals(handler.getLastChange(), null); |
| | |   } |
| | |   finally |
| | |   { |
| | |     // Release resources in the same order as the nominal path, even when |
| | |     // an assertion above has failed. |
| | |     if (handler != null) |
| | |     { |
| | |       handler.shutdown(); |
| | |     } |
| | |     if (dbEnv != null) |
| | |     { |
| | |       dbEnv.shutdown(); |
| | |     } |
| | |     replicationServer.shutdown(); |
| | | |
| | |     if (testRoot != null && testRoot.exists()) |
| | |     { |
| | |       TestCaseUtils.deleteDirectory(testRoot); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | |
| | | } |
| | |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | |
| | | import java.net.InetAddress; |
| | |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | |
| | | |
| | | public class ReplicationServerTest extends ReplicationTestCase |
| | | { |
| | | // The tracer object for the debug logger |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | /** |
| | | * The replicationServer that will be used in this test. |
| | | */ |
| | |
| | | replicationServer = new ReplicationServer(conf); |
| | | } |
| | | |
| | | /** |
| | | * Traces the given text through the debug logger, prefixed with a test |
| | | * marker. Nothing is emitted when debug logging is disabled. |
| | | * |
| | | * @param s The text to trace. |
| | | */ |
| | | private void debugInfo(String s) |
| | | { |
| | |   if (!debugEnabled()) |
| | |   { |
| | |     return; |
| | |   } |
| | |   TRACER.debugInfo("** TEST **" + s); |
| | | } |
| | | |
| | | /** |
| | | * Basic test of the replicationServer code : |
| | | * Connect 2 clients to the replicationServer and exchange messages |
| | |
| | | @Test() |
| | | public void changelogBasic() throws Exception |
| | | { |
| | | debugInfo("Starting changelogBasic"); |
| | | ReplicationBroker server1 = null; |
| | | ReplicationBroker server2 = null; |
| | | |
| | |
| | | fail("ReplicationServer basic : incorrect message type received."); |
| | | |
| | | /* |
| | | * Send and receive a Delete Msg from server 1 to server 2 |
| | | * Send and receive a Delete Msg from server 2 to server 1 |
| | | */ |
| | | msg = |
| | | new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2, |
| | |
| | | if (server2 != null) |
| | | server2.stop(); |
| | | } |
| | | debugInfo("Ending changelogBasic"); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) |
| | | public void newClient() throws Exception |
| | | { |
| | | debugInfo("Starting newClient"); |
| | | ReplicationBroker broker = null; |
| | | |
| | | try { |
| | |
| | | |
| | | ReplicationMessage msg2 = broker.receive(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | | fail("ReplicationServer basic transmission failed"); |
| | | fail("ReplicationServer basic transmission failed:" + msg2); |
| | | else |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | |
| | | if (broker != null) |
| | | broker.stop(); |
| | | } |
| | | debugInfo("Ending newClient"); |
| | | } |
| | | |
| | | |
| | |
| | | try { |
| | | broker = |
| | | openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3, |
| | | 100, replicationServerPort, 1000, state); |
| | | 100, replicationServerPort, 5000, state); |
| | | |
| | | ReplicationMessage msg2 = broker.receive(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | | fail("ReplicationServer basic transmission failed"); |
| | | { |
| | | fail("ReplicationServer basic transmission failed:" + msg2); |
| | | } |
| | | else |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) |
| | | public void newClientWithFirstChanges() throws Exception |
| | | { |
| | | debugInfo("Starting newClientWithFirstChanges"); |
| | | /* |
| | | * Create a ServerState updated with the first changes from both servers |
| | | * done in test changelogBasic. |
| | |
| | | state.update(firstChangeNumberServer2); |
| | | |
| | | newClientWithChanges(state, secondChangeNumberServer1); |
| | | debugInfo("Ending newClientWithFirstChanges"); |
| | | } |
| | | |
| | | /** |
| | |
| | | ServerStartMessage msg = |
| | | new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"), |
| | | 0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(), |
| | | ProtocolVersion.currentVersion(), sslEncryption); |
| | | ProtocolVersion.currentVersion(), 0, sslEncryption); |
| | | session.publish(msg); |
| | | |
| | | // Read the Replication Server state from the ReplServerStartMessage that |
| | |
| | | |
| | | // send a ServerStartMessage containing the ServerState that was just |
| | | // received. |
| | | DN baseDn = DN.decode("dc=example,dc=com"); |
| | | msg = new ServerStartMessage( |
| | | (short) 1724, DN.decode("dc=example,dc=com"), |
| | | (short) 1724, baseDn, |
| | | 0, 0, 0, 0, WINDOW, (long) 5000, replServerState, |
| | | ProtocolVersion.currentVersion(), sslEncryption); |
| | | ProtocolVersion.currentVersion(), |
| | | ReplicationTestCase.getGenerationId(baseDn), |
| | | sslEncryption); |
| | | session.publish(msg); |
| | | |
| | | // Read the ReplServerStartMessage that come back. |