| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.task.Task; |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.AssuredMode.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | /** |
| | |
| | | * implementation using methods {@link #initializeRemote(int)} |
| | | * or {@link #initializeFromRemote(int)}. |
| | | * <p> |
| | | * At shutdown time, the {@link #stopDomain()} method should be called to |
| | | * At shutdown time, the {@link #disableService()} method should be called to |
| | | * cleanly stop the replication service. |
| | | */ |
| | | public abstract class ReplicationDomain |
| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** The configuration of the replication domain. */ |
| | | protected volatile ReplicationDomainCfg config; |
| | | /** |
| | | * The baseDN for the Replication Service. |
| | | * All Replication Domain using this baseDN will be connected |
| | | * through the Replication Service. |
| | | * The assured configuration of the replication domain. It is a duplicate of |
| | | * {@link #config} because of its update model. |
| | | * |
| | | * @see #readAssuredConfig(ReplicationDomainCfg, boolean) |
| | | */ |
| | | private final DN baseDN; |
| | | |
| | | /** |
| | | * The identifier of this Replication Domain inside the |
| | | * Replication Service. |
| | | * Each Domain must use a unique ServerID. |
| | | */ |
| | | private final int serverID; |
| | | private volatile ReplicationDomainCfg assuredConfig; |
| | | |
| | | /** |
| | | * The ReplicationBroker that is used by this ReplicationDomain to |
| | | * connect to the ReplicationService. |
| | | */ |
| | | protected ReplicationBroker broker = null; |
| | | protected ReplicationBroker broker; |
| | | |
| | | /** |
| | | * This Map is used to store all outgoing assured messages in order |
| | |
| | | private volatile DirectoryThread listenerThread = null; |
| | | |
| | | /** |
| | | * A Map used to store all the ReplicationDomains created on this server. |
| | | */ |
| | | private static Map<DN, ReplicationDomain> domains = |
| | | new HashMap<DN, ReplicationDomain>(); |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | /** Whether assured mode is enabled for this domain. */ |
| | | private boolean assured = false; |
| | | /** Assured sub mode (used when assured is true). */ |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | /** Safe Data level (used when assuredMode is SAFE_DATA). */ |
| | | private byte assuredSdLevel = 1; |
| | | /** The timeout in ms that should be used, when waiting for assured acks. */ |
| | | private long assuredTimeout = 2000; |
| | | |
| | | /** Group id. */ |
| | | private byte groupId = 1; |
| | | /** |
| | | * Referrals urls to be published to other servers of the topology. |
| | | * <p> |
| | | * TODO: fill that with all currently opened urls if no urls configured |
| | | */ |
| | | private final List<String> refUrls = new ArrayList<String>(); |
| | | |
| | | /** |
| | | * A set of counters used for Monitoring. |
| | | */ |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(0); |
| | |
| | | private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | |
| | | /** |
| | | * Window size used during initialization .. between |
| | | * - the initializer/exporter DS that listens/waits acknowledges and that |
| | | * slows down data msg publishing based on the slowest server |
| | | * - and each initialized/importer DS that publishes acknowledges each |
| | | * WINDOW/2 data msg received. |
| | | */ |
| | | protected final int initWindow; |
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | | /** |
| | |
| | | private final Object sessionLock = new Object(); |
| | | |
| | | /** |
| | | * The generationId for this replication domain. It is made of a hash of the |
| | | * 1000 first entries for this domain. |
| | | */ |
| | | protected volatile long generationId; |
| | | |
| | | /** |
| | | * Returns the {@link CSNGenerator} that will be used to |
| | | * generate {@link CSN} for this domain. |
| | | * |
| | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * |
| | | * @param baseDN The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating to a given Replication Domain. |
| | | * @param initWindow Window used during initialization. |
| | | * @param config |
| | | * The configuration object for this ReplicationDomain |
| | | * @param generationId |
| | | * the generation of this ReplicationDomain |
| | | */ |
| | | public ReplicationDomain(DN baseDN, int serverID, int initWindow) |
| | | public ReplicationDomain(ReplicationDomainCfg config, long generationId) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = initWindow; |
| | | this.config = config; |
| | | this.assuredConfig = config; |
| | | this.generationId = generationId; |
| | | this.state = new ServerState(); |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | | domains.put(baseDN, this); |
| | | this.generator = new CSNGenerator(getServerId(), state); |
| | | } |
| | | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * (for unit test purpose only) |
| | | * Creates a ReplicationDomain with the provided parameters. (for unit test |
| | | * purpose only) |
| | | * |
| | | * @param baseDN The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating to a given Replication Domain. |
| | | * @param serverState The serverState to use |
| | | * @param config |
| | | * The configuration object for this ReplicationDomain |
| | | * @param generationId |
| | | * the generation of this ReplicationDomain |
| | | * @param serverState |
| | | * The serverState to use |
| | | */ |
| | | public ReplicationDomain(DN baseDN, int serverID, ServerState serverState) |
| | | public ReplicationDomain(ReplicationDomainCfg config, long generationId, |
| | | ServerState serverState) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = 100; |
| | | this.config = config; |
| | | this.assuredConfig = config; |
| | | this.generationId = generationId; |
| | | this.state = serverState; |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | | domains.put(baseDN, this); |
| | | this.generator = new CSNGenerator(getServerId(), state); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | getBaseDNString(), Integer.toString(serverID))); |
| | | getBaseDNString(), Integer.toString(getServerId()))); |
| | | } |
| | | else |
| | | { |
| | |
| | | private void receiveChangeStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDN + |
| | | TRACER.debugInfo("Replication domain " + getBaseDN() + |
| | | " received change status message:\n" + csMsg); |
| | | |
| | | ServerStatus reqStatus = csMsg.getRequestedStatus(); |
| | |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | getBaseDNString(), Integer.toString(serverID))); |
| | | getBaseDNString(), Integer.toString(getServerId()))); |
| | | return; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * Returns the current config of this ReplicationDomain. |
| | | * |
| | | * @return the config |
| | | */ |
| | | protected ReplicationDomainCfg getConfig() |
| | | { |
| | | return config; |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. All Replication Domain using |
| | | * this baseDN will be connected through the Replication Service. |
| | | * |
| | | * @return The base DN of this ReplicationDomain |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | return config.getBaseDN(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public String getBaseDNString() |
| | | { |
| | | return baseDN.toNormalizedString(); |
| | | return getBaseDN().toNormalizedString(); |
| | | } |
| | | |
| | | /** |
| | | * Get the server ID. |
| | | * Get the server ID. The identifier of this Replication Domain inside the |
| | | * Replication Service. Each Domain must use a unique ServerID. |
| | | * |
| | | * @return The server ID. |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverID; |
| | | return config.getServerId(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the window size used during initialization, as read from the |
| | | * current configuration. The window is used between: |
| | | * <ul> |
| | | * <li>the initializer/exporter DS, which listens/waits for acknowledgements |
| | | * and slows down data message publishing based on the slowest server;</li> |
| | | * <li>each initialized/importer DS, which publishes an acknowledgement every |
| | | * WINDOW/2 data messages received.</li> |
| | | * </ul> |
| | | * |
| | | * @return the initialization window size |
| | | */ |
| | | protected int getInitWindow() |
| | | { |
| | | return config.getInitializationWindowSize(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isAssured() |
| | | { |
| | | return assured; |
| | | return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType()) |
| | | || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType()); |
| | | } |
| | | |
| | | /** |
| | | * Gives the mode for the assured replication of the domain. |
| | | * Gives the mode for the assured replication of the domain (only used when |
| | | * assured is true). |
| | | * |
| | | * @return The mode for the assured replication of the domain. |
| | | */ |
| | | public AssuredMode getAssuredMode() |
| | | { |
| | | return assuredMode; |
| | | switch (assuredConfig.getAssuredType()) |
| | | { |
| | | case SAFE_DATA: |
| | | case NOT_ASSURED: // The assured mode will be ignored in that case anyway |
| | | return AssuredMode.SAFE_DATA_MODE; |
| | | case SAFE_READ: |
| | | return AssuredMode.SAFE_READ_MODE; |
| | | } |
| | | return null; // should never happen |
| | | } |
| | | |
| | | /** |
| | | * Gives the assured level of the replication of the domain. |
| | | * Gives the assured Safe Data level of the replication of the domain. (used |
| | | * when assuredMode is SAFE_DATA). |
| | | * |
| | | * @return The assured level of the replication of the domain. |
| | | */ |
| | | public byte getAssuredSdLevel() |
| | | { |
| | | return assuredSdLevel; |
| | | return (byte) assuredConfig.getAssuredSdLevel(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getAssuredTimeout() |
| | | { |
| | | return assuredTimeout; |
| | | return assuredConfig.getAssuredTimeout(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | return (byte) config.getGroupId(); |
| | | } |
| | | |
| | | /** |
| | | * Gets the referrals URLs this domain publishes. |
| | | * Gets the referrals URLs this domain publishes. Referrals urls to be |
| | | * published to other servers of the topology. |
| | | * <p> |
| | | * TODO: fill that with all currently opened urls if no urls configured |
| | | * |
| | | * @return The referrals URLs this domain publishes. |
| | | */ |
| | | public List<String> getRefUrls() |
| | | public Set<String> getRefUrls() |
| | | { |
| | | return refUrls; |
| | | return config.getReferralsUrl(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Adds the provided URLs to the list of Referrals that should be returned |
| | | * when an operation needs to be redirected to this server. |
| | | * <p> |
| | | * NOTE(review): despite its name, this method appends to the current list; |
| | | * URLs registered by a previous call are kept. Confirm this is intended. |
| | | * |
| | | * @param referralsUrl The referrals to add. |
| | | */ |
| | | public void setURLs(Set<String> referralsUrl) |
| | | { |
| | | this.refUrls.addAll(referralsUrl); |
| | | } |
| | | |
| | | /** |
| | | * Sets the timeout, in milliseconds, that should be used when waiting for |
| | | * assured acks. |
| | | * |
| | | * @param assuredTimeout the timeout of the assured replication, in ms. |
| | | */ |
| | | public void setAssuredTimeout(long assuredTimeout) |
| | | { |
| | | this.assuredTimeout = assuredTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Sets the group id of this replication domain. |
| | | * |
| | | * @param groupId The new group id. |
| | | */ |
| | | public void setGroupId(byte groupId) |
| | | { |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | | * Sets the Safe Data level of assured replication (used when the assured |
| | | * mode is Safe Data). |
| | | * |
| | | * @param assuredSdLevel The Safe Data level of assured replication. |
| | | */ |
| | | public void setAssuredSdLevel(byte assuredSdLevel) |
| | | { |
| | | this.assuredSdLevel = assuredSdLevel; |
| | | } |
| | | |
| | | /** |
| | | * Sets the assured replication sub mode (used when assured replication is |
| | | * enabled). |
| | | * |
| | | * @param dataMode The assured replication mode to use. |
| | | */ |
| | | public void setAssuredMode(AssuredMode dataMode) |
| | | { |
| | | this.assuredMode = dataMode; |
| | | } |
| | | |
| | | /** |
| | | * Enables or disables assured replication for this domain. |
| | | * |
| | | * @param assured A boolean indicating if assured replication should be used. |
| | | */ |
| | | public void setAssured(boolean assured) |
| | | { |
| | | this.assured = assured; |
| | | } |
| | | |
| | | /** |
| | | * Receives an update message from the replicationServer. |
| | | * The other types of messages are processed in an opaque way for the caller. |
| | | * Also responsible for updating the list of pending changes |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] processErrorMsg:" + this.serverID + |
| | | " baseDN: " + this.baseDN + |
| | | "[IE] processErrorMsg:" + getServerId() + |
| | | " baseDN: " + getBaseDN() + |
| | | " Error Msg received: " + errorMsg); |
| | | |
| | | if (errorMsg.getCreationTime() > ieContext.startTime) |
| | |
| | | } |
| | | |
| | | numRcvdUpdates.incrementAndGet(); |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | if (update.isAssured() |
| | | && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE |
| | | && rsGroupId == groupId) |
| | | && broker.getRsGroupId() == getGroupId() |
| | | && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | assuredSrReceivedUpdates.incrementAndGet(); |
| | | } |
| | |
| | | requested servers. Log problem |
| | | */ |
| | | logError(NOTE_DS_RECEIVED_ACK_ERROR.get( |
| | | getBaseDNString(), Integer.toString(serverID), |
| | | getBaseDNString(), Integer.toString(getServerId()), |
| | | update.toString(), ack.errorsToString())); |
| | | |
| | | List<Integer> failedServers = ack.getFailedServers(); |
| | |
| | | */ |
| | | public ExportThread(int serverIdToInitialize, int initWindow) |
| | | { |
| | | super("Export thread from serverId=" + serverID + " to serverId=" |
| | | super("Export thread from serverId=" + getServerId() + " to serverId=" |
| | | + serverIdToInitialize); |
| | | this.serverIdToInitialize = serverIdToInitialize; |
| | | this.initWindow = initWindow; |
| | |
| | | public void initializeRemote(int target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeRemote(target, this.serverID, initTask, this.initWindow); |
| | | initializeRemote(target, getServerId(), initTask, getInitWindow()); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( |
| | | countEntries(), getBaseDNString(), serverID)); |
| | | countEntries(), getBaseDNString(), getServerId())); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | countEntries(), getBaseDNString(), serverID, serverToInitialize)); |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(), |
| | | getBaseDNString(), getServerId(), serverToInitialize)); |
| | | |
| | | ieContext.startList.add(serverToInitialize); |
| | | |
| | |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( |
| | | getBaseDN(), serverID, serverToInitialize, |
| | | getBaseDN(), getServerId(), serverToInitialize, |
| | | serverRunningTheTask, ieContext.entryCount, initWindow); |
| | | |
| | | broker.publish(initTargetMsg); |
| | |
| | | exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination()); |
| | | broker.publish(doneMsg); |
| | | broker.publish( |
| | | new DoneMsg(getServerId(), initTargetMsg.getDestination())); |
| | | } |
| | | catch(DirectoryException exportException) |
| | | { |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get( |
| | | getBaseDNString(), serverID, cause)); |
| | | getBaseDNString(), getServerId(), cause)); |
| | | } |
| | | else |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | getBaseDNString(), serverID, serverToInitialize, cause)); |
| | | getBaseDNString(), getServerId(), serverToInitialize, cause)); |
| | | } |
| | | |
| | | |
| | |
| | | // send the ack of flow control mgmt |
| | | if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0) |
| | | { |
| | | InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | this.serverID, |
| | | entryMsg.getSenderID(), |
| | | ieContext.msgCnt); |
| | | final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | getServerId(), entryMsg.getSenderID(), ieContext.msgCnt); |
| | | broker.publish(amsg, false); |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | | ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | getBaseDNString(), |
| | | Integer.toString(this.serverID), |
| | | Integer.toString(getServerId()), |
| | | Integer.toString(ieContext.importSource))); |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, errMsg)); |
| | |
| | | |
| | | // build the message |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | serverID,ieContext.getExportTarget(), lDIFEntry, pos, length, |
| | | getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieContext.msgCnt); |
| | | |
| | | // Waiting the slowest loop |
| | |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.attemptCnt = 0; |
| | | ieContext.initReqMsgSent = new InitializeRequestMsg( |
| | | getBaseDN(), serverID, source, this.initWindow); |
| | | getBaseDN(), getServerId(), source, getInitWindow()); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | |
| | | try |
| | | { |
| | | // Log starting |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID)); |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(), |
| | | initTargetMsgReceived.getSenderID(), getServerId())); |
| | | |
| | | // Go into full update status |
| | | setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT); |
| | | |
| | | // Acquire an import context if not already done (and initialize). |
| | | if (initTargetMsgReceived.getInitiatorID() != this.serverID) |
| | | if (initTargetMsgReceived.getInitiatorID() != getServerId()) |
| | | { |
| | | /* |
| | | The initTargetMsgReceived is for an import initiated by the remote |
| | |
| | | finally |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID, |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), |
| | | getServerId(), |
| | | (ieContext.getException() == null ? "" |
| | | : ieContext.getException().getLocalizedMessage())); |
| | | logError(msg); |
| | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(), |
| | | Integer.toString(serverID), status.toString(), event.toString())); |
| | | String.valueOf(getServerId()), status.toString(), event.toString())); |
| | | return; |
| | | } |
| | | |
| | |
| | | resetMonitoringCounters(); |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication domain " + baseDN + " new status is: " |
| | | + status); |
| | | TRACER.debugInfo("Replication domain " + getBaseDN() |
| | | + " new status is: " + status); |
| | | } |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | | checkGenerationID(-1); |
| | | |
| | | // Reconnect to the Replication Server so that it adopt our |
| | | // GenerationID. |
| | | disableService(); |
| | | enableService(); |
| | | // Reconnect to the Replication Server so that it adopts our GenerationID. |
| | | restartService(); |
| | | |
| | | // wait for the domain to reconnect. |
| | | int count = 0; |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | TRACER.debugInfo("Server id " + getServerId() + " and domain " |
| | | + getBaseDN() + " resetGenerationId " + generationIdNewValue); |
| | | } |
| | | |
| | | ResetGenerationIdMsg genIdMessage = |
| | |
| | | if (!isConnected()) |
| | | { |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(), |
| | | Integer.toString(serverID), |
| | | Integer.toString(getServerId()), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Definitively stops the Replication Service: disables the service, then |
| | | * removes this domain from the map of ReplicationDomains created on this |
| | | * server (keyed by base DN). |
| | | */ |
| | | public void stopDomain() |
| | | { |
| | | disableService(); |
| | | domains.remove(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Change some ReplicationDomain parameters. |
| | | * |
| | | * @param config |
| | | * The new configuration that this domain should now use. |
| | | */ |
| | | public void changeConfig(ReplicationDomainCfg config) |
| | | protected void changeConfig(ReplicationDomainCfg config) |
| | | { |
| | | this.groupId = (byte) config.getGroupId(); |
| | | |
| | | if (broker != null && broker.changeConfig(config)) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | restartService(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Applies a configuration change to the attributes which should be |
| | | * included in the ECL. |
| | |
| | | public void changeConfig(Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes) |
| | | && broker != null) |
| | | final boolean attrsModified = setEclIncludes( |
| | | getServerId(), includeAttributes, includeAttributesForDeletes); |
| | | if (attrsModified && broker != null) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | restartService(); |
| | | } |
| | | } |
| | | |
| | | /** Restarts the replication service: disables it, then enables it again. */ |
| | | private void restartService() |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | } |
| | | |
| | | /** |
| | | * This method should trigger an export of the replicated data. |
| | |
| | | Send an ack if it was requested and the group id is the same of the RS |
| | | one. Only Safe Read mode makes sense in DS for returning an ack. |
| | | */ |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (msg.isAssured() |
| | | && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | | { |
| | | AssuredMode msgAssuredMode = msg.getAssuredMode(); |
| | | if (msgAssuredMode == AssuredMode.SAFE_READ_MODE) |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | if (rsGroupId == groupId) |
| | | if (broker.getRsGroupId() == getGroupId()) |
| | | { |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | idList.add(getServerId()); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | broker.publish(ackMsg); |
| | |
| | | } |
| | | } |
| | | } |
| | | else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID), |
| | | msgAssuredMode.toString(), getBaseDNString(), msg.toString())); |
| | | logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()), |
| | | msg.getAssuredMode().toString(), getBaseDNString(), |
| | | msg.toString())); |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | | } |
| | |
| | | */ |
| | | protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | /* |
| | | * If assured configured, set message accordingly to request an ack in the |
| | | * right assured mode. |
| | | * No ack requested for a RS with a different group id. Assured |
| | | * replication supported for the same locality, i.e: a topology working in |
| | | * the same |
| | | * geographical location). If we are connected to a RS which is not in our |
| | | * locality, no need to ask for an ack. |
| | | * No ack is requested for a RS with a different group id. |
| | | * Assured replication is supported within the same locality |
| | | * (i.e. a topology working in the same geographical location). |
| | | * If we are connected to a RS which is not in our locality, |
| | | * there is no need to ask for an ack. |
| | | */ |
| | | if (assured && rsGroupId == groupId) |
| | | if (needsAck()) |
| | | { |
| | | msg.setAssured(true); |
| | | msg.setAssuredMode(assuredMode); |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | msg.setAssuredMode(getAssuredMode()); |
| | | if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | msg.setSafeDataLevel(assuredSdLevel); |
| | | msg.setSafeDataLevel(getAssuredSdLevel()); |
| | | } |
| | | |
| | | // Add the assured message to the list of update that are waiting for acks |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether an acknowledgement must be requested for sent updates. |
| | | * |
| | | * @return true when assured replication is enabled for this domain and the |
| | | * group id of the RS reported by the broker equals the group id of |
| | | * this domain, false otherwise |
| | | */ |
| | | private boolean needsAck() |
| | | { |
| | | if (!isAssured()) |
| | | { |
| | | // Not assured: the RS group id does not matter (and is not read). |
| | | return false; |
| | | } |
| | | return broker.getRsGroupId() == getGroupId(); |
| | | } |
| | | |
| | | /** |
| | | * Wait for the processing of an assured message after it has been sent, if |
| | | * assured replication is configured, otherwise, do nothing. |
| | |
| | | protected void waitForAckIfAssuredEnabled(UpdateMsg msg) |
| | | throws TimeoutException |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | |
| | | // If assured mode configured, wait for acknowledgment for the just sent |
| | | // message |
| | | if (assured && rsGroupId == groupId) |
| | | if (needsAck()) |
| | | { |
| | | // Increment assured replication monitoring counters |
| | | switch (assuredMode) |
| | | switch (getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrSentUpdates.incrementAndGet(); |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("waitForAck method interrupted for replication " + |
| | | "baseDN: " + baseDN); |
| | | "baseDN: " + getBaseDN()); |
| | | } |
| | | break; |
| | | } |
| | | // Timeout ? |
| | | if ( (System.currentTimeMillis() - startTime) >= assuredTimeout ) |
| | | if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout()) |
| | | { |
| | | /* |
| | | Timeout occurred, be sure that ack is not being received and if so, |
| | |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message csn: " + csn |
| | | + " and replication domain: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | + " and replication domain: " + getBaseDN() + " after " |
| | | + getAssuredTimeout() + " ms."); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | // This exception may only be raised if assured replication is enabled |
| | | logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(), |
| | | Long.toString(assuredTimeout), update.toString())); |
| | | Long.toString(getAssuredTimeout()), update.toString())); |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * @return The GenerationID. |
| | | */ |
| | | public abstract long getGenerationID(); |
| | | public long getGenerationID() |
| | | { |
| | | return generationId; |
| | | } |
| | | |
| | | /** |
| | | * Subclasses should use this method to add additional monitoring |
| | | * information in the ReplicationDomain. |
| | | * Sets the generationId for this replication domain. |
| | | * |
| | | * @param generationId |
| | | * the generationId to set |
| | | */ |
| | | public void setGenerationID(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Subclasses should use this method to add additional monitoring information |
| | | * in the ReplicationDomain. |
| | | * |
| | | * @return Additional monitoring attributes that will be added in the |
| | | * ReplicationDomain monitoring entry. |
| | |
| | | */ |
| | | public CSN getLastLocalChange() |
| | | { |
| | | return state.getCSN(serverID); |
| | | return state.getCSN(getServerId()); |
| | | } |
| | | |
| | | /** |
| | | * Gets and stores the assured replication configuration parameters. Returns a |
| | | * boolean indicating if the passed configuration has changed compared to |
| | | * previous values and the changes require a reconnection. |
| | | * |
| | | * @param config |
| | | * The configuration object |
| | | * @param allowReconnection |
| | | * Tells if one must reconnect if significant changes occurred |
| | | */ |
| | | protected void readAssuredConfig(ReplicationDomainCfg config, |
| | | boolean allowReconnection) |
| | | { |
| | | // Disconnect if required: changing configuration values before |
| | | // disconnection would make assured replication used immediately and |
| | | // disconnection could cause some timeouts error. |
| | | if (needReconnection(config) && allowReconnection) |
| | | { |
| | | disableService(); |
| | | |
| | | assuredConfig = config; |
| | | |
| | | enableService(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether switching from the currently stored assured configuration |
| | | * to the provided one requires a reconnection. This is the case when assured |
| | | * replication is turned on or off, when the assured mode changes between |
| | | * Safe Data and Safe Read, or when the Safe Data level changes while Safe |
| | | * Data mode is in use. |
| | | * |
| | | * @param cfg |
| | | * The candidate configuration |
| | | * @return true if applying the candidate configuration requires reconnecting |
| | | */ |
| | | private boolean needReconnection(ReplicationDomainCfg cfg) |
| | | { |
| | | final AssuredMode currentMode = getAssuredMode(); |
| | | |
| | | // First check whether the assured type itself is changing. |
| | | boolean typeChanged = false; |
| | | switch (cfg.getAssuredType()) |
| | | { |
| | | case NOT_ASSURED: |
| | | typeChanged = isAssured(); |
| | | break; |
| | | case SAFE_DATA: |
| | | typeChanged = !isAssured() || currentMode == SAFE_READ_MODE; |
| | | break; |
| | | case SAFE_READ: |
| | | typeChanged = !isAssured() || currentMode == SAFE_DATA_MODE; |
| | | break; |
| | | } |
| | | if (typeChanged) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | // Same assured type: only a different Safe Data level matters. |
| | | if (!isAssured() || currentMode != SAFE_DATA_MODE) |
| | | { |
| | | return false; |
| | | } |
| | | return cfg.getAssuredSdLevel() != getAssuredSdLevel(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID; |
| | | return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId(); |
| | | } |
| | | } |