| | |
| | | * Full Initialization of a replica can be triggered by LDAP clients |
| | | * by creating InitializeTasks or InitializeTargetTask. |
| | | * Full initialization can also by triggered from the ReplicationDomain |
| | | * implementation using methods {@link #initializeRemote(short)} |
| | | * or {@link #initializeFromRemote(short)}. |
| | | * implementation using methods {@link #initializeRemote(int)} |
| | | * or {@link #initializeFromRemote(int)}. |
| | | * <p> |
| | | * At shutdown time, the {@link #stopDomain()} method should be called to |
| | | * cleanly stop the replication service. |
| | |
| | | * Replication Service. |
| | | * Each Domain must use a unique ServerID. |
| | | */ |
| | | private final short serverID; |
| | | private final int serverID; |
| | | |
| | | /** |
| | | * The ReplicationBroker that is used by this ReplicationDomain to |
| | |
| | | // that have not been successfully acknowledged (either because of timeout, |
| | | // wrong status or error at replay) for a particular server (DS or RS). String |
| | | // format: <server id>:<number of failed updates> |
| | | private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | new HashMap<Short,Integer>(); |
| | | private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | // Number of updates received in Assured Mode, Safe Read request |
| | | private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | |
| | | // Multiple values allowed: number of updates sent in Assured Mode, Safe Data, |
| | | // that have not been successfully acknowledged because of timeout for a |
| | | // particular RS. String format: <server id>:<number of failed updates> |
| | | private Map<Short,Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Short,Integer>(); |
| | | private Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | |
| | | * A Map containing of the ServerStates of all the replicas in the topology |
| | | * as seen by the ReplicationServer the last time it was polled. |
| | | */ |
| | | private Map<Short, ServerState> replicaStates = |
| | | new HashMap<Short, ServerState>(); |
| | | private HashMap<Integer, ServerState> replicaStates = |
| | | new HashMap<Integer, ServerState>(); |
| | | |
| | | Set<String> cfgEclIncludes = new HashSet<String>(); |
| | | Set<String> eClIncludes = new HashSet<String>(); |
| | |
| | | * This identifier should be different for each server that |
| | | * is participating to a given Replication Domain. |
| | | */ |
| | | public ReplicationDomain(String serviceID, short serverID) |
| | | public ReplicationDomain(String serviceID, int serverID) |
| | | { |
| | | this.serviceID = serviceID; |
| | | this.serverID = serverID; |
| | |
| | | * is participating to a given Replication Domain. |
| | | * @param serverState The serverState to use |
| | | */ |
| | | public ReplicationDomain(String serviceID, short serverID, |
| | | public ReplicationDomain(String serviceID, int serverID, |
| | | ServerState serverState) |
| | | { |
| | | this.serviceID = serviceID; |
| | |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | serviceID, Short.toString(serverID)); |
| | | serviceID, Integer.toString(serverID)); |
| | | logError(msg); |
| | | } else |
| | | { |
| | |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | serviceID, Short.toString(serverID)); |
| | | serviceID, Integer.toString(serverID)); |
| | | logError(msg); |
| | | return; |
| | | } |
| | |
| | | * Get the server ID. |
| | | * @return The server ID. |
| | | */ |
| | | public short getServerId() |
| | | public int getServerId() |
| | | { |
| | | return serverID; |
| | | } |
| | |
| | | * Gets the States of all the Replicas currently in the |
| | | * Topology. |
| | | * When this method is called, a Monitoring message will be sent |
| | | * to the Replication to which this domain is currently connected |
| | | * to the Replication Server to which this domain is currently connected |
| | | * so that it computes a table containing information about |
| | | * all Directory Servers in the topology. |
| | | * This Computation involves communications will all the servers |
| | |
| | | * |
| | | * @return The States of all Replicas in the topology (except us) |
| | | */ |
| | | public Map<Short, ServerState> getReplicaStates() |
| | | public Map<Integer, ServerState> getReplicaStates() |
| | | { |
| | | // publish Monitor Request Message to the Replication Server |
| | | broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId())); |
| | |
| | | * @return The server ID of the Replication Server to which the domain |
| | | * is currently connected. |
| | | */ |
| | | public short getRsServerId() |
| | | public int getRsServerId() |
| | | { |
| | | return broker.getRsServerId(); |
| | | } |
| | |
| | | { |
| | | // This is the response to a MonitorRequest that was sent earlier |
| | | // build the replicaStates Map. |
| | | replicaStates = new HashMap<Short, ServerState>(); |
| | | replicaStates = new HashMap<Integer, ServerState>(); |
| | | MonitorMsg monitorMsg = (MonitorMsg) msg; |
| | | Iterator<Short> it = monitorMsg.ldapIterator(); |
| | | Iterator<Integer> it = monitorMsg.ldapIterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | short serverId = it.next(); |
| | | int serverId = it.next(); |
| | | replicaStates.put( |
| | | serverId, monitorMsg.getLDAPServerState(serverId)); |
| | | } |
| | |
| | | * passed server, or creates an initial value of 1 error for it if the server |
| | | * is not yet present in the map. |
| | | * @param errorList |
| | | * @param serverId |
| | | * @param sid |
| | | */ |
| | | private void updateAssuredErrorsByServer(Map<Short,Integer> errorsByServer, |
| | | Short serverId) |
| | | private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer, |
| | | Integer sid) |
| | | { |
| | | synchronized (errorsByServer) |
| | | { |
| | | Integer serverErrCount = errorsByServer.get(serverId); |
| | | Integer serverErrCount = errorsByServer.get(sid); |
| | | if (serverErrCount == null) |
| | | { |
| | | // Server not present in list, create an entry with an |
| | | // initial number of errors set to 1 |
| | | errorsByServer.put(serverId, 1); |
| | | errorsByServer.put(sid, 1); |
| | | } else |
| | | { |
| | | // Server already present in list, just increment number of |
| | | // errors for the server |
| | | int val = serverErrCount.intValue(); |
| | | val++; |
| | | errorsByServer.put(serverId, val); |
| | | errorsByServer.put(sid, val); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | // Some problems detected: message not correclty reached every requested |
| | | // servers. Log problem |
| | | Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(serviceID, |
| | | Short.toString(serverID), update.toString(), ack.errorsToString()); |
| | | Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get( |
| | | serviceID, Integer.toString(serverID), |
| | | update.toString(), ack.errorsToString()); |
| | | logError(errorMsg); |
| | | |
| | | List<Short> failedServers = ack.getFailedServers(); |
| | | List<Integer> failedServers = ack.getFailedServers(); |
| | | |
| | | // Increment assured replication monitoring counters |
| | | switch (updateAssuredMode) |
| | |
| | | assuredSrWrongStatusUpdates.incrementAndGet(); |
| | | if (failedServers != null) // This should always be the case ! |
| | | { |
| | | for(Short sid : failedServers) |
| | | for(Integer sid : failedServers) |
| | | { |
| | | updateAssuredErrorsByServer( |
| | | assuredSrServerNotAcknowledgedUpdates, sid); |
| | |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | if (failedServers != null) // This should always be the case ! |
| | | { |
| | | for(Short sid : failedServers) |
| | | for(Integer sid : failedServers) |
| | | { |
| | | updateAssuredErrorsByServer( |
| | | assuredSdServerTimeoutUpdates, sid); |
| | |
| | | private class ExportThread extends DirectoryThread |
| | | { |
| | | // Id of server that will receive updates |
| | | private short target; |
| | | private int target; |
| | | |
| | | /** |
| | | * Constructor for the ExportThread. |
| | | * |
| | | * @param target Id of server that will receive updates |
| | | * @param i Id of server that will receive updates |
| | | */ |
| | | public ExportThread(short target) |
| | | public ExportThread(int i) |
| | | { |
| | | super("Export thread " + serverID); |
| | | this.target = target; |
| | | this.target = i; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | protected class IEContext |
| | | { |
| | | // Theprivate task that initiated the operation. |
| | | // The private task that initiated the operation. |
| | | Task initializeTask; |
| | | // The destination in the case of an export |
| | | short exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | int exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | // The source in the case of an import |
| | | short importSource = RoutableMsg.UNKNOWN_SERVER; |
| | | int importSource = RoutableMsg.UNKNOWN_SERVER; |
| | | |
| | | // The total entry count expected to be processed |
| | | long entryCount = 0; |
| | |
| | | * Gets the server id of the exporting server. |
| | | * @return the server id of the exporting server. |
| | | */ |
| | | public short getExportTarget() |
| | | public int getExportTarget() |
| | | { |
| | | return exportTarget; |
| | | } |
| | |
| | | * Gets the server id of the importing server. |
| | | * @return the server id of the importing server. |
| | | */ |
| | | public short getImportSource() |
| | | public int getImportSource() |
| | | { |
| | | return importSource; |
| | | } |
| | |
| | | * @return The source as a short value |
| | | * @throws DirectoryException if the string is not valid |
| | | */ |
| | | public short decodeTarget(String targetString) |
| | | public int decodeTarget(String targetString) |
| | | throws DirectoryException |
| | | { |
| | | short target = 0; |
| | | int target = 0; |
| | | Throwable cause; |
| | | if (targetString.equalsIgnoreCase("all")) |
| | | { |
| | |
| | | // So should be a serverID |
| | | try |
| | | { |
| | | target = Integer.decode(targetString).shortValue(); |
| | | target = Integer.decode(targetString); |
| | | if (target >= 0) |
| | | { |
| | | // FIXME Could we check now that it is a know server in the domain ? |
| | |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeRemote(short target, Task initTask) |
| | | public void initializeRemote(int target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeRemote(target, serverID, initTask); |
| | |
| | | * server that requests the initialization. |
| | | * |
| | | * @param target The target that should be initialized. |
| | | * @param requestorID The server that initiated the export. |
| | | * @param target2 The server that initiated the export. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | protected void initializeRemote(short target, short requestorID, |
| | | protected void initializeRemote(int target, int target2, |
| | | Task initTask) throws DirectoryException |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | Short.toString(serverID), |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Short.toString(requestorID)); |
| | | Integer.toString(target2)); |
| | | logError(msg); |
| | | |
| | | boolean contextAcquired=false; |
| | |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initializeMessage = new InitializeTargetMsg( |
| | | serviceID, serverID, target, requestorID, entryCount); |
| | | serviceID, serverID, target, target2, entryCount); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | Short.toString(serverID), |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Short.toString(requestorID)); |
| | | Integer.toString(target2)); |
| | | logError(msg); |
| | | } |
| | | |
| | |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeFromRemote(short source) |
| | | public void initializeFromRemote(int source) |
| | | throws DirectoryException |
| | | { |
| | | initializeFromRemote(source, null); |
| | |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeRemote(short target) throws DirectoryException |
| | | public void initializeRemote(int target) throws DirectoryException |
| | | { |
| | | initializeRemote(target, null); |
| | | } |
| | |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeFromRemote(short source, Task initTask) |
| | | public void initializeFromRemote(int source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | DirectoryException de = null; |
| | | |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | Short.toString(serverID), |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | Short.toString(serverID), |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(serviceID, |
| | | Short.toString(serverID), status.toString(), event.toString()); |
| | | Integer.toString(serverID), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | |
| | | * @throws DirectoryException When the generation ID of the Replication |
| | | * Servers is not the expected value. |
| | | */ |
| | | private void checkGenerationID(long generationID) throws DirectoryException |
| | | private void checkGenerationID(long generationID) |
| | | throws DirectoryException |
| | | { |
| | | boolean flag = false; |
| | | boolean allset = true; |
| | | |
| | | for (int i = 0; i< 10; i++) |
| | | { |
| | | allset = true; |
| | | for (RSInfo rsInfo : getRsList()) |
| | | { |
| | | if (rsInfo.getGenerationId() == generationID) |
| | | { |
| | | flag = true; |
| | | break; |
| | | } |
| | | else |
| | | if (rsInfo.getGenerationId() != generationID) |
| | | { |
| | | try |
| | | { |
| | |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | allset = false; |
| | | break; |
| | | } |
| | | } |
| | | if (flag) |
| | | if (allset) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | |
| | | if (!flag) |
| | | if (!allset) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID); |
| | |
| | | * @return The number of updates sent in assured safe read mode that have not |
| | | * been acknowledged per server. |
| | | */ |
| | | public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates() |
| | | public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates() |
| | | { |
| | | // Clone a snapshot with synchronized section to have a consistent view in |
| | | // monitoring |
| | | Map<Short, Integer> snapshot = new HashMap<Short, Integer>(); |
| | | Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>(); |
| | | synchronized(assuredSrServerNotAcknowledgedUpdates) |
| | | { |
| | | Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet(); |
| | | for (Short serverId : keySet) |
| | | Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet(); |
| | | for (Integer serverId : keySet) |
| | | { |
| | | Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId); |
| | | snapshot.put(serverId, i); |
| | |
| | | * @return The number of updates sent in assured safe data mode that have not |
| | | * been acknowledged due to timeout error per server. |
| | | */ |
| | | public Map<Short, Integer> getAssuredSdServerTimeoutUpdates() |
| | | public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates() |
| | | { |
| | | // Clone a snapshot with synchronized section to have a consistent view in |
| | | // monitoring |
| | | Map<Short, Integer> snapshot = new HashMap<Short, Integer>(); |
| | | Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>(); |
| | | synchronized(assuredSdServerTimeoutUpdates) |
| | | { |
| | | Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet(); |
| | | for (Short serverId : keySet) |
| | | Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet(); |
| | | for (Integer serverId : keySet) |
| | | { |
| | | Integer i = assuredSdServerTimeoutUpdates.get(serverId); |
| | | snapshot.put(serverId, i); |
| | |
| | | assuredSrTimeoutUpdates = new AtomicInteger(0); |
| | | assuredSrWrongStatusUpdates = new AtomicInteger(0); |
| | | assuredSrReplayErrorUpdates = new AtomicInteger(0); |
| | | assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>(); |
| | | assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>(); |
| | | assuredSrReceivedUpdates = new AtomicInteger(0); |
| | | assuredSrReceivedUpdatesAcked = new AtomicInteger(0); |
| | | assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); |
| | | assuredSdSentUpdates = new AtomicInteger(0); |
| | | assuredSdAcknowledgedUpdates = new AtomicInteger(0); |
| | | assuredSdTimeoutUpdates = new AtomicInteger(0); |
| | | assuredSdServerTimeoutUpdates = new HashMap<Short,Integer>(); |
| | | assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>(); |
| | | } |
| | | |
| | | /* |
| | |
| | | // -> replay error occured |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occured in our server |
| | | List<Short> idList = new ArrayList<Short>(); |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get( |
| | | Short.toString(serverID), msgAssuredMode.toString(), serviceID, |
| | | Integer.toString(serverID), msgAssuredMode.toString(), serviceID, |
| | | msg.toString()); |
| | | logError(errorMsg); |
| | | } else |