| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * An identifier for the Replication Service. |
| | | * All Replication Domain using this identifier will be connected |
| | | * The baseDN for the Replication Service. |
| | | * All Replication Domain using this baseDN will be connected |
| | | * through the Replication Service. |
| | | */ |
| | | private final String serviceID; |
| | | private final String baseDN; |
| | | |
| | | /** |
| | | * The identifier of this Replication Domain inside the |
| | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | // Is assured mode enabled or not for this domain ? |
| | | /** Whether assured mode is enabled for this domain. */ |
| | | private boolean assured = false; |
| | | // Assured sub mode (used when assured is true) |
| | | /** Assured sub mode (used when assured is true). */ |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // Safe Data level (used when assuredMode is SAFE_DATA) |
| | | private byte assuredSdLevel = (byte)1; |
| | | // The timeout in ms that should be used, when waiting for assured acks |
| | | /** Safe Data level (used when assuredMode is SAFE_DATA). */ |
| | | private byte assuredSdLevel = 1; |
| | | /** The timeout in ms that should be used, when waiting for assured acks. */ |
| | | private long assuredTimeout = 2000; |
| | | |
| | | // Group id |
| | | private byte groupId = (byte)1; |
| | | // Referrals urls to be published to other servers of the topology |
| | | // TODO: fill that with all currently opened urls if no urls configured |
| | | /** Group id. */ |
| | | private byte groupId = 1; |
| | | /** |
| | | * Referrals urls to be published to other servers of the topology. |
| | | * <p> |
| | | * TODO: fill that with all currently opened urls if no urls configured |
| | | */ |
| | | private final List<String> refUrls = new ArrayList<String>(); |
| | | |
| | | /** |
| | |
| | | private AtomicInteger numRcvdUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numSentUpdates = new AtomicInteger(0); |
| | | |
| | | /* Assured replication monitoring counters */ |
| | | /** Assured replication monitoring counters. */ |
| | | |
| | | // Number of updates sent in Assured Mode, Safe Read |
| | | /** Number of updates sent in Assured Mode, Safe Read. */ |
| | | private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Read, that have been |
| | | // successfully acknowledged |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Read, that have been |
| | | * successfully acknowledged. |
| | | */ |
| | | private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | // successfully acknowledged (either because of timeout, wrong status or error |
| | | // at replay) |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | * successfully acknowledged (either because of timeout, wrong status or error |
| | | * at replay). |
| | | */ |
| | | private AtomicInteger assuredSrNotAcknowledgedUpdates = |
| | | new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | // successfully acknowledged because of timeout |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | * successfully acknowledged because of timeout. |
| | | */ |
| | | private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | // successfully acknowledged because of wrong status |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | * successfully acknowledged because of wrong status. |
| | | */ |
| | | private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | // successfully acknowledged because of replay error |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Read, that have not been |
| | | * successfully acknowledged because of replay error. |
| | | */ |
| | | private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0); |
| | | // Multiple values allowed: number of updates sent in Assured Mode, Safe Read, |
| | | // that have not been successfully acknowledged (either because of timeout, |
| | | // wrong status or error at replay) for a particular server (DS or RS). String |
| | | // format: <server id>:<number of failed updates> |
| | | /** |
| | | * Multiple values allowed: number of updates sent in Assured Mode, Safe Read, |
| | | * that have not been successfully acknowledged (either because of timeout, |
| | | * wrong status or error at replay) for a particular server (DS or RS). |
| | | * <p> |
| | | * String format: <server id>:<number of failed updates> |
| | | */ |
| | | private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | // Number of updates received in Assured Mode, Safe Read request |
| | | /** Number of updates received in Assured Mode, Safe Read request. */ |
| | | private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | | // acked without errors |
| | | /** |
| | | * Number of updates received in Assured Mode, Safe Read request that we have |
| | | * acked without errors. |
| | | */ |
| | | private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0); |
| | | // Number of updates received in Assured Mode, Safe Read request that we have |
| | | // acked with errors |
| | | /** |
| | | * Number of updates received in Assured Mode, Safe Read request that we have |
| | | * acked with errors. |
| | | */ |
| | | private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data |
| | | /** Number of updates sent in Assured Mode, Safe Data. */ |
| | | private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data, that have been |
| | | // successfully acknowledged |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Data, that have been |
| | | * successfully acknowledged. |
| | | */ |
| | | private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0); |
| | | // Number of updates sent in Assured Mode, Safe Data, that have not been |
| | | // successfully acknowledged because of timeout |
| | | /** |
| | | * Number of updates sent in Assured Mode, Safe Data, that have not been |
| | | * successfully acknowledged because of timeout. |
| | | */ |
| | | private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0); |
| | | // Multiple values allowed: number of updates sent in Assured Mode, Safe Data, |
| | | // that have not been successfully acknowledged because of timeout for a |
| | | // particular RS. String format: <server id>:<number of failed updates> |
| | | /** |
| | | * Multiple values allowed: number of updates sent in Assured Mode, Safe Data, |
| | | * that have not been successfully acknowledged because of timeout for a |
| | | * particular RS. |
| | | * <p> |
| | | * String format: <server id>:<number of failed updates> |
| | | */ |
| | | private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | |
| | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * |
| | | * @param serviceID The identifier of the Replication Domain to which |
| | | * @param baseDN The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | |
| | | * is participating to a given Replication Domain. |
| | | * @param initWindow Window used during initialization. |
| | | */ |
| | | public ReplicationDomain(String serviceID, int serverID,int initWindow) |
| | | public ReplicationDomain(String baseDN, int serverID,int initWindow) |
| | | { |
| | | this.serviceID = serviceID; |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = initWindow; |
| | | this.state = new ServerState(); |
| | | this.generator = new ChangeNumberGenerator(serverID, state); |
| | | |
| | | domains.put(serviceID, this); |
| | | } |
| | | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * |
| | | * @param serviceID The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating to a given Replication Domain. |
| | | */ |
| | | public ReplicationDomain(String serviceID, int serverID) |
| | | { |
| | | this.serviceID = serviceID; |
| | | this.serverID = serverID; |
| | | this.state = new ServerState(); |
| | | this.generator = new ChangeNumberGenerator(serverID, state); |
| | | |
| | | domains.put(serviceID, this); |
| | | domains.put(baseDN, this); |
| | | } |
| | | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * (for unit test purpose only) |
| | | * |
| | | * @param serviceID The identifier of the Replication Domain to which |
| | | * @param baseDN The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | |
| | | * is participating to a given Replication Domain. |
| | | * @param serverState The serverState to use |
| | | */ |
| | | public ReplicationDomain(String serviceID, int serverID, |
| | | public ReplicationDomain(String baseDN, int serverID, |
| | | ServerState serverState) |
| | | { |
| | | this.serviceID = serviceID; |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.state = serverState; |
| | | this.generator = new ChangeNumberGenerator(serverID, state); |
| | | |
| | | domains.put(serviceID, this); |
| | | domains.put(baseDN, this); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | serviceID, Integer.toString(serverID)); |
| | | baseDN, Integer.toString(serverID)); |
| | | logError(msg); |
| | | } else |
| | | { |
| | |
| | | private void receiveChangeStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + serviceID + |
| | | TRACER.debugInfo("Replication domain " + baseDN + |
| | | " received change status message:\n" + csMsg); |
| | | |
| | | ServerStatus reqStatus = csMsg.getRequestedStatus(); |
| | |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | serviceID, Integer.toString(serverID)); |
| | | baseDN, Integer.toString(serverID)); |
| | | logError(msg); |
| | | return; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the identifier of this domain. |
| | | * Gets the baseDN of this domain. |
| | | * |
| | | * @return The identifier for this domain. |
| | | * @return The baseDN for this domain. |
| | | */ |
| | | public String getServiceID() |
| | | public String getBaseDNString() |
| | | { |
| | | return serviceID; |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (numProcessedUpdates != null) |
| | | return numProcessedUpdates.get(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (numRcvdUpdates != null) |
| | | return numRcvdUpdates.get(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (numSentUpdates != null) |
| | | return numSentUpdates.get(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | return null; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMsg)) |
| | | if (debugEnabled() && !(msg instanceof HeartbeatMsg)) |
| | | { |
| | | TRACER.debugVerbose("Message received <" + msg + ">"); |
| | | } |
| | | |
| | | if (msg instanceof AckMsg) |
| | | { |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] processErrorMsg:" + this.serverID + |
| | | " serviceID: " + this.serviceID + |
| | | " baseDN: " + this.baseDN + |
| | | " Error Msg received: " + errorMsg); |
| | | |
| | | if (errorMsg.getCreationTime() > ieContext.startTime) |
| | |
| | | |
| | | numRcvdUpdates.incrementAndGet(); |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | if ( update.isAssured() && (update.getAssuredMode() == |
| | | AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) ) |
| | | if (update.isAssured() |
| | | && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE |
| | | && rsGroupId == groupId) |
| | | { |
| | | assuredSrReceivedUpdates.incrementAndGet(); |
| | | } |
| | |
| | | requested servers. Log problem |
| | | */ |
| | | Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get( |
| | | serviceID, Integer.toString(serverID), |
| | | baseDN, Integer.toString(serverID), |
| | | update.toString(), ack.errorsToString()); |
| | | logError(errorMsg); |
| | | |
| | |
| | | */ |
| | | private class ExportThread extends DirectoryThread |
| | | { |
| | | // Id of server that will be initialized |
| | | private final int serverToInitialize; |
| | | /** Id of server that will be initialized. */ |
| | | private final int serverIdToInitialize; |
| | | private final int initWindow; |
| | | |
| | | |
| | |
| | | /** |
| | | * Constructor for the ExportThread. |
| | | * |
| | | * @param serverToInitialize |
| | | * @param serverIdToInitialize |
| | | * serverId of server that will receive entries |
| | | * @param initWindow |
| | | * The value of the initialization window for flow control between |
| | | * the importer and the exporter. |
| | | */ |
| | | public ExportThread(int serverToInitialize, int initWindow) |
| | | public ExportThread(int serverIdToInitialize, int initWindow) |
| | | { |
| | | super("Export thread from serverId=" + serverID + " to serverId=" |
| | | + serverToInitialize); |
| | | this.serverToInitialize = serverToInitialize; |
| | | + serverIdToInitialize); |
| | | this.serverIdToInitialize = serverIdToInitialize; |
| | | this.initWindow = initWindow; |
| | | } |
| | | |
| | |
| | | TRACER.debugInfo("[IE] starting " + this.getName()); |
| | | try |
| | | { |
| | | initializeRemote(serverToInitialize, serverToInitialize, null, |
| | | initializeRemote(serverIdToInitialize, serverIdToInitialize, null, |
| | | initWindow); |
| | | } catch (DirectoryException de) |
| | | { |
| | |
| | | */ |
| | | protected class IEContext |
| | | { |
| | | // The private task that initiated the operation. |
| | | Task initializeTask; |
| | | // The destination in the case of an export |
| | | int exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | // The source in the case of an import |
| | | int importSource = RoutableMsg.UNKNOWN_SERVER; |
| | | /** The private task that initiated the operation. */ |
| | | private Task initializeTask; |
| | | /** The destination in the case of an export. */ |
| | | private int exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | /** The source in the case of an import. */ |
| | | private int importSource = RoutableMsg.UNKNOWN_SERVER; |
| | | |
| | | // The total entry count expected to be processed |
| | | long entryCount = 0; |
| | | // The count for the entry not yet processed |
| | | long entryLeftCount = 0; |
| | | /** The total entry count expected to be processed. */ |
| | | private long entryCount = 0; |
| | | /** The count for the entry not yet processed. */ |
| | | private long entryLeftCount = 0; |
| | | |
| | | // Exception raised during the initialization. |
| | | DirectoryException exception = null; |
| | | /** Exception raised during the initialization. */ |
| | | private DirectoryException exception = null; |
| | | |
| | | // Whether the context is related to an import or an export. |
| | | boolean importInProgress; |
| | | /** Whether the context is related to an import or an export. */ |
| | | private boolean importInProgress; |
| | | |
| | | // Current counter of messages exchanged during the initialization |
| | | int msgCnt = 0; |
| | | /** Current counter of messages exchanged during the initialization. */ |
| | | private int msgCnt = 0; |
| | | |
| | | // Number of connections lost when we start the initialization. |
| | | // Will help counting connections lost during initialization, |
| | | int initNumLostConnections = 0; |
| | | /** |
| | | * Number of connections lost when we start the initialization. Will help |
| | | * counting connections lost during initialization, |
| | | */ |
| | | private int initNumLostConnections = 0; |
| | | |
| | | // Request message sent when this server has the initializeFromRemote task. |
| | | InitializeRequestMsg initReqMsgSent = null; |
| | | /** |
| | | * Request message sent when this server has the initializeFromRemote task. |
| | | */ |
| | | private InitializeRequestMsg initReqMsgSent = null; |
| | | |
| | | // Start time of the initialization process. ErrorMsg timestamped |
| | | // before thi startTime will be ignored. |
| | | long startTime; |
| | | /** |
| | | * Start time of the initialization process. ErrorMsg timestamped before |
| | | * this startTime will be ignored. |
| | | */ |
| | | private long startTime; |
| | | |
| | | // List fo replicas (DS) connected to the topology when |
| | | // initialization started. |
| | | Set<Integer> startList = new HashSet<Integer>(0); |
| | | /** |
| | | * List for replicas (DS) connected to the topology when initialization |
| | | * started. |
| | | */ |
| | | private Set<Integer> startList = new HashSet<Integer>(0); |
| | | |
| | | // List fo replicas (DS) with a failure (disconnected from the topology) |
| | | // since the initialization started. |
| | | Set<Integer> failureList = new HashSet<Integer>(0); |
| | | /** |
| | | * List for replicas (DS) with a failure (disconnected from the topology) |
| | | * since the initialization started. |
| | | */ |
| | | private Set<Integer> failureList = new HashSet<Integer>(0); |
| | | |
| | | // Flow control during initialization |
| | | // - for each remote server, counter of messages received |
| | | /** |
| | | * Flow control during initialization: for each remote server, counter of |
| | | * messages received. |
| | | */ |
| | | private final HashMap<Integer, Integer> ackVals = |
| | | new HashMap<Integer, Integer>(); |
| | | // - serverId of the slowest server (the one with the smallest non null |
| | | // counter) |
| | | /** |
| | | * ServerId of the slowest server (the one with the smallest non null |
| | | * counter). |
| | | */ |
| | | private int slowestServerId = -1; |
| | | |
| | | short exporterProtocolVersion = -1; |
| | | private short exporterProtocolVersion = -1; |
| | | |
| | | // Window used during this initialization |
| | | int initWindow; |
| | | /** Window used during this initialization. */ |
| | | private int initWindow; |
| | | |
| | | // Number of attempt already done for this initialization |
| | | short attemptCnt; |
| | | /** Number of attempt already done for this initialization. */ |
| | | private short attemptCnt; |
| | | |
| | | /** |
| | | * Creates a new IEContext. |
| | |
| | | this.importInProgress = importInProgress; |
| | | this.startTime = System.currentTimeMillis(); |
| | | this.attemptCnt = 0; |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( |
| | | countEntries(), serviceID, serverID); |
| | | countEntries(), baseDN, serverID); |
| | | logError(msg); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | |
| | | else |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | countEntries(), serviceID, serverID, serverToInitialize); |
| | | countEntries(), baseDN, serverID, serverToInitialize); |
| | | logError(msg); |
| | | |
| | | ieContext.startList.add(serverToInitialize); |
| | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | if (dsi.getDsId() == serverToInitialize) |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | if (dsi.getDsId() == serverToInitialize && |
| | | dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | ieContext.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | } |
| | |
| | | // loop for the case where the exporter is the initiator |
| | | int attempt = 0; |
| | | boolean done = false; |
| | | while ((!done) && (++attempt<2)) // attempt loop |
| | | while (!done && ++attempt < 2) // attempt loop |
| | | { |
| | | try |
| | | { |
| | |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( |
| | | serviceID, serverID, serverToInitialize, serverRunningTheTask, |
| | | baseDN, serverID, serverToInitialize, serverRunningTheTask, |
| | | ieContext.entryCount, initWindow); |
| | | |
| | | broker.publish(initTargetMsg); |
| | |
| | | TRACER.debugInfo( |
| | | "[IE] Exporter wait for reconnection by the listener thread"); |
| | | int att=0; |
| | | while ((!broker.shuttingDown()) && |
| | | (!broker.isConnected())&& (++att<100)) |
| | | while (!broker.shuttingDown() && !broker.isConnected() |
| | | && ++att < 100) |
| | | try { Thread.sleep(100); } |
| | | catch(Exception e){ /* do nothing */ } |
| | | } |
| | | |
| | | if ((initTask != null) && broker.isConnected() && |
| | | (serverToInitialize != RoutableMsg.ALL_SERVERS)) |
| | | if (initTask != null && broker.isConnected() |
| | | && serverToInitialize != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | /* |
| | | NewAttempt case : In the case where |
| | |
| | | |
| | | // Servers that left in the list are those for which we could not test |
| | | // that they have been successfully initialized. |
| | | if (!ieContext.failureList.isEmpty()) |
| | | if (!ieContext.failureList.isEmpty() && exportRootException == null) |
| | | { |
| | | if (exportRootException == null) |
| | | exportRootException = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get( |
| | | Long.toString(getGenerationID()), |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL |
| | | .get(serviceID, serverID, cause); |
| | | .get(baseDN, serverID, cause); |
| | | logError(msg); |
| | | } |
| | | else |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | serviceID, serverID, serverToInitialize, cause); |
| | | baseDN, serverID, serverToInitialize, cause); |
| | | logError(msg); |
| | | } |
| | | |
| | | |
| | | if (exportRootException != null) |
| | | { |
| | | throw(exportRootException); |
| | | throw exportRootException; |
| | | } |
| | | |
| | | } |
| | | |
| | | private String getReplicationMonitorInstanceName() |
| | |
| | | return broker.getReplicationMonitor().getMonitorInstanceName(); |
| | | } |
| | | |
| | | /* |
| | | * For all remote servers in tht start list, |
| | | * - wait it has finished the import and present the expected generationID |
| | | * - build the failureList |
| | | /** |
| | | * For all remote servers in the start list: |
| | | * - wait it has finished the import and present the expected generationID, |
| | | * - build the failureList. |
| | | */ |
| | | private void waitForRemoteStartOfInit() |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | while ((!done) && (waitResultAttempt<1200) // 2mn |
| | | && (!broker.shuttingDown())); |
| | | while (!done && waitResultAttempt < 1200 && !broker.shuttingDown()); |
| | | |
| | | ieContext.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | |
| | | "[IE] wait for start ends with " + ieContext.failureList); |
| | | } |
| | | |
| | | /* |
| | | * For all remote servers in the start list, |
| | | * - wait it has finished the import and present the expected generationID |
| | | * - build the failureList |
| | | /** |
| | | * For all remote servers in the start list: |
| | | * - wait it has finished the import and present the expected generationID, |
| | | * - build the failureList. |
| | | */ |
| | | private void waitForRemoteEndOfInit() |
| | | { |
| | | Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0); |
| | | |
| | | for (Integer sid : ieContext.startList) |
| | | replicasWeAreWaitingFor.add(sid); |
| | | Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>( |
| | | ieContext.startList); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | } // 1sec |
| | | |
| | | } |
| | | while ((!done) && (!broker.shuttingDown())); // infinite wait |
| | | while (!done && !broker.shuttingDown()); // infinite wait |
| | | |
| | | ieContext.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | |
| | | Message errMsg = |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | | ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | this.serviceID, |
| | | this.baseDN, |
| | | Integer.toString(this.serverID), |
| | | Integer.toString(ieContext.importSource))); |
| | | if (ieContext.getException()==null) |
| | |
| | | we just abandon the export by throwing an exception. |
| | | */ |
| | | if (ieContext.getException() != null) |
| | | throw(new IOException(ieContext.getException().getMessage())); |
| | | throw new IOException(ieContext.getException().getMessage()); |
| | | |
| | | int slowestServerId = ieContext.getSlowestServer(); |
| | | if (isRemoteDSConnected(slowestServerId)==null) |
| | |
| | | |
| | | if (!broker.isConnected()) |
| | | { |
| | | errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID()); |
| | | errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString()); |
| | | } |
| | | |
| | | /* |
| | |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.attemptCnt = 0; |
| | | ieContext.initReqMsgSent = new InitializeRequestMsg( |
| | | serviceID, serverID, source, this.initWindow); |
| | | baseDN, serverID, source, this.initWindow); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | |
| | | // No need to call here updateTaskCompletionState - will be done |
| | | // by the caller |
| | | releaseIEContext(); |
| | | DirectoryException de = new DirectoryException( |
| | | ResultCode.OTHER, |
| | | errMsg); |
| | | throw (de); |
| | | throw new DirectoryException(ResultCode.OTHER, errMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // Log starting |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | serviceID, initTargetMsgReceived.getSenderID(), serverID); |
| | | baseDN, initTargetMsgReceived.getSenderID(), serverID); |
| | | logError(msg); |
| | | |
| | | // Go into full update status |
| | |
| | | */ |
| | | broker.reStart(false); |
| | | |
| | | if (ieContext.getException() != null) |
| | | { |
| | | if (broker.isConnected() && (initFromTask != null) |
| | | && (++ieContext.attemptCnt<2)) |
| | | if (ieContext.getException() != null |
| | | && broker.isConnected() |
| | | && initFromTask != null |
| | | && ++ieContext.attemptCnt < 2) |
| | | { |
| | | /* |
| | | Worth a new attempt |
| | |
| | | { |
| | | /* |
| | | An error occurs when sending a new request for a new import. |
| | | This error is not stored, prefering to keep the initial one. |
| | | This error is not stored, preferring to keep the initial one. |
| | | */ |
| | | logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get( |
| | | e.getLocalizedMessage(), |
| | | ieContext.getException().getLocalizedMessage())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // =================== |
| | | // No new attempt case |
| | |
| | | |
| | | try |
| | | { |
| | | if (broker.isConnected() && (ieContext.getException() != null)) |
| | | if (broker.isConnected() && ieContext.getException() != null) |
| | | { |
| | | // Let's notify the exporter |
| | | ErrorMsg errorMsg = new ErrorMsg(requesterServerId, |
| | |
| | | finally |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | serviceID, initTargetMsgReceived.getSenderID(), serverID, |
| | | baseDN, initTargetMsgReceived.getSenderID(), serverID, |
| | | (ieContext.getException() != null ? ieContext |
| | | .getException().getLocalizedMessage() : "")); |
| | | logError(msg); |
| | |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(serviceID, |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDN, |
| | | Integer.toString(serverID), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + serviceID + |
| | | " new status is: " + status); |
| | | TRACER.debugInfo("Replication domain " + baseDN + " new status is: " |
| | | + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | |
| | | */ |
| | | public boolean ieRunning() |
| | | { |
| | | return (ieContext != null); |
| | | return ieContext != null; |
| | | } |
| | | |
| | | /** |
| | |
| | | for (RSInfo rsInfo : getRsList()) |
| | | { |
| | | // the 'empty' RSes (generationId==-1) are considered as good citizens |
| | | if ((rsInfo.getGenerationId() != -1) && |
| | | (rsInfo.getGenerationId() != generationID)) |
| | | if (rsInfo.getGenerationId() != -1 && |
| | | rsInfo.getGenerationId() != generationID) |
| | | { |
| | | try |
| | | { |
| | |
| | | if (!allSet) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID); |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(baseDN); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | |
| | | |
| | | // wait for the domain to reconnect. |
| | | int count = 0; |
| | | while (!isConnected() && (count < 10)) |
| | | while (!isConnected() && count < 10) |
| | | { |
| | | try |
| | | { |
| | |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Server id " + serverID + " and domain " + serviceID |
| | | TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | |
| | | ResetGenerationIdMsg genIdMessage; |
| | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID, |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(baseDN, |
| | | Integer.toString(serverID), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException( |
| | |
| | | { |
| | | if (broker != null) |
| | | return broker.getMaxRcvWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (broker != null) |
| | | return broker.getCurrentRcvWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (broker != null) |
| | | return broker.getMaxSendWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (broker != null) |
| | | return broker.getCurrentSendWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (broker != null) |
| | | return broker.getNumLostConnections(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (broker != null) |
| | | return broker.getReplicationServer(); |
| | | else |
| | | return ReplicationBroker.NO_CONNECTED_SERVER; |
| | | } |
| | | |
| | |
| | | { |
| | | if (broker == null) |
| | | { |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | // create the broker object used to publish and receive changes |
| | | broker = new ReplicationBroker( |
| | | this, state, serviceID, |
| | | this, state, baseDN, |
| | | serverID, window, |
| | | getGenerationID(), |
| | | heartbeatInterval, |
| | |
| | | { |
| | | synchronized (sessionLock) |
| | | { |
| | | // |
| | | // Create the listener thread |
| | | listenerThread = new ListenerThread(this); |
| | | listenerThread.start(); |
| | |
| | | public void stopDomain() |
| | | { |
| | | disableService(); |
| | | domains.remove(serviceID); |
| | | domains.remove(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | this.groupId = groupId; |
| | | |
| | | if (broker != null) |
| | | { |
| | | if (broker.changeConfig( |
| | | replicationServers, windowSize, heartbeatInterval, groupId)) |
| | | if (broker != null |
| | | && broker.changeConfig(replicationServers, windowSize, |
| | | heartbeatInterval, groupId)) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | |
| | | public void changeConfig(Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | if (setEclIncludes(serverID, includeAttributes, |
| | | includeAttributesForDeletes)) |
| | | { |
| | | if (broker != null) |
| | | if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes) |
| | | && broker != null) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(serverID), msgAssuredMode.toString(), serviceID, |
| | | Integer.toString(serverID), msgAssuredMode.toString(), baseDN, |
| | | msg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | |
| | | * geographical location). If we are connected to a RS which is not in our |
| | | * locality, no need to ask for an ack. |
| | | */ |
| | | if (assured && (rsGroupId == groupId)) |
| | | if (assured && rsGroupId == groupId) |
| | | { |
| | | msg.setAssured(true); |
| | | msg.setAssuredMode(assuredMode); |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | msg.setSafeDataLevel(assuredSdLevel); |
| | | |
| | | /* |
| | | Add the assured message to the list of update that are |
| | | waiting for acks |
| | | */ |
| | | // Add the assured message to the list of update that are waiting for acks |
| | | waitingAckMsgs.put(msg.getChangeNumber(), msg); |
| | | } |
| | | } |
| | |
| | | |
| | | // If assured mode configured, wait for acknowledgement for the just sent |
| | | // message |
| | | if (assured && (rsGroupId == groupId)) |
| | | if (assured && rsGroupId == groupId) |
| | | { |
| | | // Increment assured replication monitoring counters |
| | | switch (assuredMode) |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("waitForAck method interrupted for replication " + |
| | | "serviceID: " + serviceID); |
| | | "baseDN: " + baseDN); |
| | | } |
| | | break; |
| | | } |
| | |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message cn: " + cn + |
| | | " and replication servceID: " + serviceID + " after " + |
| | | " and replication servceID: " + baseDN + " after " + |
| | | assuredTimeout + " ms."); |
| | | } else |
| | | { |
| | |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString( |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(baseDN, Long.toString( |
| | | assuredTimeout), update.toString()); |
| | | logError(errorMsg); |
| | | } |
| | |
| | | { |
| | | if (ieContext != null) |
| | | return ieContext.entryLeftCount; |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | |
| | | { |
| | | if (ieContext != null) |
| | | return ieContext.entryCount; |
| | | else |
| | | return 0; |
| | | } |
| | | |