| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | |
| | | * The startup phase of the ReplicationDomain subclass, |
| | | * should read the list of replication servers from the configuration, |
| | | * instantiate a {@link ServerState} then start the publish service |
| | | * by calling |
| | | * {@link #startPublishService(Set, int, long, long)}. |
| | | * by calling {@link #startPublishService(ReplicationDomainCfg)}. |
| | | * At this point it can start calling the {@link #publish(UpdateMsg)} |
| | | * method if needed. |
| | | * <p> |
| | |
| | | * - and each initialized/importer DS that publishes acknowledges each |
| | | * WINDOW/2 data msg received. |
| | | */ |
| | | protected int initWindow = 100; |
| | | protected final int initWindow; |
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | |
| | | |
| | | private final Map<Integer, Set<String>> eclIncludesForDeletesByServer = |
| | | new HashMap<Integer, Set<String>>(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections |
| | | .emptySet(); |
| | | private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet(); |
| | | |
| | | /** |
| | | * An object used to protect the initialization of the underlying broker |
| | |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = 100; |
| | | this.state = serverState; |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] starting " + this.getName()); |
| | | TRACER.debugInfo("[IE] starting " + getName()); |
| | | try |
| | | { |
| | | initializeRemote(serverIdToInitialize, serverIdToInitialize, null, |
| | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] ending " + this.getName()); |
| | | TRACER.debugInfo("[IE] ending " + getName()); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public int decodeTarget(String targetString) throws DirectoryException |
| | | { |
| | | if (targetString.equalsIgnoreCase("all")) |
| | | if ("all".equalsIgnoreCase(targetString)) |
| | | { |
| | | return RoutableMsg.ALL_SERVERS; |
| | | } |
| | |
| | | "[IE] wait for start dsid " + dsi.getDsId() |
| | | + " " + dsi.getStatus() |
| | | + " " + dsi.getGenerationId() |
| | | + " " + this.getGenerationID()); |
| | | + " " + getGenerationID()); |
| | | if (ieContext.startList.contains(dsi.getDsId())) |
| | | { |
| | | if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS) |
| | |
| | | } |
| | | else |
| | | { |
| | | if (dsInfo.getGenerationId() == this.getGenerationID()) |
| | | if (dsInfo.getGenerationId() == getGenerationID()) |
| | | { // and with the expected generationId |
| | | // We're done with this server |
| | | it.remove(); |
| | |
| | | { |
| | | // Rejects 2 simultaneous exports |
| | | Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | ieContext = new IEContext(importInProgress); |
| | |
| | | */ |
| | | private void processErrorMsg(ErrorMsg errorMsg) |
| | | { |
| | | if (ieContext != null) |
| | | //Exporting must not be stopped on the first error, if we run initialize-all |
| | | if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | /* |
| | | Exporting must not be stopped on the first error, if we |
| | | run initialize-all. |
| | | */ |
| | | if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | // The ErrorMsg is received while we have started an initialization |
| | | if (ieContext.getException() == null) |
| | | { |
| | | // The ErrorMsg is received while we have started an initialization |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails())); |
| | | ieContext.setException( |
| | | new DirectoryException(ResultCode.OTHER, errorMsg.getDetails())); |
| | | } |
| | | |
| | | /* |
| | | * This can happen : |
| | | * - on the first InitReqMsg sent when source is not known for example |
| | | * - on the next attempt when source crashed and did not reconnect |
| | | * even after the nextInitAttemptDelay |
| | | * During the import, the ErrorMsg will be received by receiveEntryBytes |
| | | */ |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(ieContext.getException()); |
| | | /* |
| | | * This can happen : |
| | | * - on the first InitReqMsg sent when source is not known for example |
| | | * - on the next attempt when source crashed and did not reconnect |
| | | * even after the nextInitAttemptDelay |
| | | * During the import, the ErrorMsg will be received by receiveEntryBytes |
| | | */ |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask) ieContext.initializeTask) |
| | | .updateTaskCompletionState(ieContext.getException()); |
| | | |
| | | releaseIEContext(); |
| | | } |
| | | releaseIEContext(); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | /* |
| | | This is the normal termination of the import |
| | | No error is stored and the import is ended |
| | | by returning null |
| | | No error is stored and the import is ended by returning null |
| | | */ |
| | | return null; |
| | | } |
| | |
| | | { |
| | | /* |
| | | This is an error termination during the import |
| | | The error is stored and the import is ended |
| | | by returning null |
| | | The error is stored and the import is ended by returning null |
| | | */ |
| | | if (ieContext.getException() == null) |
| | | { |
| | |
| | | { |
| | | // Other messages received during an import are trashed except |
| | | // the topologyMsg. |
| | | if ((msg instanceof TopologyMsg) && |
| | | (isRemoteDSConnected(ieContext.importSource)==null)) |
| | | if (msg instanceof TopologyMsg |
| | | && isRemoteDSConnected(ieContext.importSource) == null) |
| | | { |
| | | Message errMsg = |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | |
| | | catch(Exception e) { /* do nothing */ } |
| | | |
| | | // process any connection error |
| | | if ((broker.hasConnectionError())|| |
| | | (broker.getNumLostConnections()!= ieContext.initNumLostConnections)) |
| | | if (broker.hasConnectionError() |
| | | || broker.getNumLostConnections() != ieContext.initNumLostConnections) |
| | | { |
| | | // publish failed - store the error in the ieContext ... |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | |
| | | * @throws DirectoryException When the generation ID of the Replication |
| | | * Servers is not the expected value. |
| | | */ |
| | | private void checkGenerationID(long generationID) |
| | | throws DirectoryException |
| | | private void checkGenerationID(long generationID) throws DirectoryException |
| | | { |
| | | boolean allSet = true; |
| | | |
| | |
| | | public void resetReplicationLog() throws DirectoryException |
| | | { |
| | | // Reset the Generation ID to -1 to clean the ReplicationServers. |
| | | resetGenerationId((long)-1); |
| | | resetGenerationId(-1L); |
| | | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | | checkGenerationID(-1); |
| | |
| | | * @throws DirectoryException When an error occurs |
| | | */ |
| | | public void resetGenerationId(Long generationIdNewValue) |
| | | throws DirectoryException |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | |
| | | ResetGenerationIdMsg genIdMessage; |
| | | |
| | | if (generationIdNewValue == null) |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(this.getGenerationID()); |
| | | } |
| | | else |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(generationIdNewValue); |
| | | } |
| | | ResetGenerationIdMsg genIdMessage = |
| | | new ResetGenerationIdMsg(getGenId(generationIdNewValue)); |
| | | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(), |
| | | Integer.toString(serverID), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | broker.publish(genIdMessage); |
| | | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | | if (generationIdNewValue == null) |
| | | checkGenerationID(getGenId(generationIdNewValue)); |
| | | } |
| | | |
| | | private long getGenId(Long generationIdNewValue) |
| | | { |
| | | if (generationIdNewValue != null) |
| | | { |
| | | checkGenerationID(this.getGenerationID()); |
| | | return generationIdNewValue; |
| | | } |
| | | else |
| | | { |
| | | checkGenerationID(generationIdNewValue); |
| | | } |
| | | return getGenerationID(); |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | |
| | | /** |
| | | * Start the publish mechanism of the Replication Service. |
| | | * After this method has been called, the publish service can be used |
| | | * by calling the {@link #publish(UpdateMsg)} method. |
| | | * Start the publish mechanism of the Replication Service. After this method |
| | | * has been called, the publish service can be used by calling the |
| | | * {@link #publish(UpdateMsg)} method. |
| | | * |
| | | * @param replicationServers The replication servers that should be used. |
| | | * @param window The window size of this replication domain. |
| | | * @param heartbeatInterval The heartbeatInterval that should be used |
| | | * to check the availability of the replication |
| | | * servers. |
| | | * @param changetimeHeartbeatInterval The interval used to send change |
| | | * time heartbeat to the replication server. |
| | | * |
| | | * @throws ConfigException If the DirectoryServer configuration was |
| | | * incorrect. |
| | | * @param config |
| | | * The configuration that should be used. |
| | | * @throws ConfigException |
| | | * If the DirectoryServer configuration was incorrect. |
| | | */ |
| | | public void startPublishService(Set<String> replicationServers, int window, |
| | | long heartbeatInterval, long changetimeHeartbeatInterval) |
| | | throws ConfigException |
| | | public void startPublishService(ReplicationDomainCfg config) |
| | | throws ConfigException |
| | | { |
| | | synchronized (sessionLock) |
| | | { |
| | |
| | | { |
| | | // create the broker object used to publish and receive changes |
| | | broker = new ReplicationBroker( |
| | | this, state, baseDN, |
| | | serverID, window, |
| | | getGenerationID(), |
| | | heartbeatInterval, |
| | | new ReplSessionSecurity(), |
| | | getGroupId(), |
| | | changetimeHeartbeatInterval); |
| | | |
| | | broker.start(replicationServers); |
| | | this, state, config, getGenerationID(), new ReplSessionSecurity()); |
| | | broker.start(); |
| | | } |
| | | } |
| | | } |
| | |
| | | * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}. |
| | | * <p> |
| | | * This method must be called once and must be called after the |
| | | * {@link #startPublishService(Collection, int, long, long)}. |
| | | * {@link #startPublishService(ReplicationDomainCfg)}. |
| | | */ |
| | | public void startListenService() |
| | | { |
| | |
| | | * <p> |
| | | * The Replication Service will restart from the point indicated by the |
| | | * {@link ServerState} that was given as a parameter to the |
| | | * {@link #startPublishService(Collection, int, long, long)} |
| | | * at startup time. |
| | | * {@link #startPublishService(ReplicationDomainCfg)} at startup time. |
| | | * <p> |
| | | * If some data have changed in the repository during the period of time when |
| | | * the Replication Service was disabled, this {@link ServerState} should |
| | | * therefore be updated by the Replication Domain subclass before calling |
| | | * this method. This method is not MT safe. |
| | | * therefore be updated by the Replication Domain subclass before calling this |
| | | * method. This method is not MT safe. |
| | | */ |
| | | public void enableService() |
| | | { |
| | |
| | | /** |
| | | * Change some ReplicationDomain parameters. |
| | | * |
| | | * @param replicationServers The new set of Replication Servers that this |
| | | * domain should now use. |
| | | * @param windowSize The window size that this domain should use. |
| | | * @param heartbeatInterval The heartbeatInterval that this domain should |
| | | * use. |
| | | * @param groupId The new group id to use |
| | | * @param config |
| | | * The new configuration that this domain should now use. |
| | | */ |
| | | public void changeConfig(Set<String> replicationServers, int windowSize, |
| | | long heartbeatInterval, byte groupId) |
| | | public void changeConfig(ReplicationDomainCfg config) |
| | | { |
| | | this.groupId = groupId; |
| | | this.groupId = (byte) config.getGroupId(); |
| | | |
| | | if (broker != null |
| | | && broker.changeConfig(replicationServers, windowSize, |
| | | heartbeatInterval, groupId)) |
| | | if (broker != null && broker.changeConfig(config)) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | |
| | | one. Only Safe Read mode makes sense in DS for returning an ack. |
| | | */ |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | if (msg.isAssured()) |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (msg.isAssured() |
| | | && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | | { |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (broker.getProtocolVersion() >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | | AssuredMode msgAssuredMode = msg.getAssuredMode(); |
| | | if (msgAssuredMode == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | AssuredMode msgAssuredMode = msg.getAssuredMode(); |
| | | if (msgAssuredMode == AssuredMode.SAFE_READ_MODE) |
| | | if (rsGroupId == groupId) |
| | | { |
| | | if (rsGroupId == groupId) |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | // Mark the error in the ack |
| | | // -> replay error occurred |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | broker.publish(ackMsg); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | assuredSrReceivedUpdatesNotAcked.incrementAndGet(); |
| | | } else |
| | | { |
| | | assuredSrReceivedUpdatesAcked.incrementAndGet(); |
| | | } |
| | | // Mark the error in the ack |
| | | // -> replay error occurred |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(serverID), msgAssuredMode.toString(), |
| | | getBaseDNString(), msg.toString()); |
| | | logError(errorMsg); |
| | | broker.publish(ackMsg); |
| | | if (replayErrorMsg != null) |
| | | { |
| | | assuredSrReceivedUpdatesNotAcked.incrementAndGet(); |
| | | } |
| | | else |
| | | { |
| | | assuredSrReceivedUpdatesAcked.incrementAndGet(); |
| | | } |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | | } |
| | | else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = |
| | | ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID), |
| | | msgAssuredMode.toString(), getBaseDNString(), msg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | | } |
| | | |
| | | incProcessedUpdates(); |
| | |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | |
| | | // If assured mode configured, wait for acknowledgement for the just sent |
| | | // If assured mode configured, wait for acknowledgment for the just sent |
| | | // message |
| | | if (assured && rsGroupId == groupId) |
| | | { |
| | |
| | | remove the update from the wait list, log the timeout error and |
| | | also update assured monitoring counters |
| | | */ |
| | | UpdateMsg update = waitingAckMsgs.remove(csn); |
| | | |
| | | if (update != null) |
| | | { |
| | | // No luck, this is a real timeout |
| | | // Increment assured replication monitoring counters |
| | | switch (msg.getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer( |
| | | assuredSrServerNotAcknowledgedUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | case SAFE_DATA_MODE: |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message csn: " |
| | | + csn + " and replication servceID: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | } else |
| | | final UpdateMsg update = waitingAckMsgs.remove(csn); |
| | | if (update == null) |
| | | { |
| | | // Ack received just before timeout limit: we can exit |
| | | break; |
| | | } |
| | | |
| | | // No luck, this is a real timeout |
| | | // Increment assured replication monitoring counters |
| | | switch (msg.getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | case SAFE_DATA_MODE: |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | // Increment number of errors for our RS |
| | | updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, |
| | | broker.getRsServerId()); |
| | | break; |
| | | default: |
| | | // Should not happen |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message csn: " + csn |
| | | + " and replication domain: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | } |
| | | } |
| | | } |
| | |
| | | update = new UpdateMsg(generator.newCSN(), msg); |
| | | /* |
| | | If assured replication is configured, this will prepare blocking |
| | | mechanism. If assured replication is disabled, this returns |
| | | immediately |
| | | mechanism. If assured replication is disabled, this returns immediately |
| | | */ |
| | | prepareWaitForAckIfAssuredEnabled(update); |
| | | |
| | |
| | | waitForAckIfAssuredEnabled(update); |
| | | } catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | // This exception may only be raised if assured replication is enabled |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(), |
| | | Long.toString(assuredTimeout), update.toString()); |
| | | logError(errorMsg); |