| | |
| | | public DSInfo isRemoteDSConnected(int serverId) |
| | | { |
| | | for (DSInfo remoteDS : getReplicasList()) |
| | | { |
| | | if (remoteDS.getDsId() == serverId) |
| | | { |
| | | return remoteDS; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | |
| | | int getNumProcessedUpdates() |
| | | { |
| | | if (numProcessedUpdates != null) |
| | | { |
| | | return numProcessedUpdates.get(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int getNumRcvdUpdates() |
| | | { |
| | | if (numRcvdUpdates != null) |
| | | { |
| | | return numRcvdUpdates.get(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int getNumSentUpdates() |
| | | { |
| | | if (numSentUpdates != null) |
| | | { |
| | | return numSentUpdates.get(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | case SAFE_READ_MODE: |
| | | assuredSrNotAcknowledgedUpdates.incrementAndGet(); |
| | | if (hasTimeout) |
| | | { |
| | | assuredSrTimeoutUpdates.incrementAndGet(); |
| | | } |
| | | if (hasReplayErrors) |
| | | { |
| | | assuredSrReplayErrorUpdates.incrementAndGet(); |
| | | } |
| | | if (hasWrongStatus) |
| | | { |
| | | assuredSrWrongStatusUpdates.incrementAndGet(); |
| | | } |
| | | if (failedServers != null) // This should always be the case ! |
| | | { |
| | | for(Integer sid : failedServers) |
| | |
| | | case SAFE_DATA_MODE: |
| | | // The only possible cause of ack error in safe data mode is timeout |
| | | if (hasTimeout) // So should always be the case |
| | | { |
| | | assuredSdTimeoutUpdates.incrementAndGet(); |
| | | } |
| | | if (failedServers != null) // This should always be the case ! |
| | | { |
| | | for(Integer sid : failedServers) |
| | |
| | | * Flow control during initialization: for each remote server, counter of |
| | | * messages received. |
| | | */ |
| | | private final HashMap<Integer, Integer> ackVals = |
| | | private final Map<Integer, Integer> ackVals = |
| | | new HashMap<Integer, Integer>(); |
| | | /** |
| | | * ServerId of the slowest server (the one with the smallest non null |
| | |
| | | } |
| | | |
| | | /** |
| | | * Only sets the exception that occurred during the import/export if none |
| | | * was already set on this object. |
| | | * |
| | | * @param exception the exception that occurred during the import/export. |
| | | */ |
| | | public void setExceptionIfNoneSet(DirectoryException exception) |
| | | { |
| | | if (exception == null) |
| | | { |
| | | this.exception = exception; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Set the id of the EntryMsg acknowledged from a receiver (importer)server. |
| | | * (updated via the listener thread) |
| | | * @param serverId serverId of the acknowledger/receiver/importer server. |
| | |
| | | // Recompute the server with the minAck returned,means the slowest server. |
| | | slowestServerId = serverId; |
| | | for (Integer sid : ieContext.ackVals.keySet()) |
| | | { |
| | | if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) |
| | | { |
| | | slowestServerId = sid; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | return this.slowestServerId; |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Verifies that the given string represents a valid source |
| | | * from which this server can be initialized. |
| | |
| | | logError(msg); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | ieContext.startList.add(dsi.getDsId()); |
| | | } |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | ieContext.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | } |
| | | } |
| | | else |
| | |
| | | { |
| | | if (dsi.getDsId() == serverToInitialize && |
| | | dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | ieContext.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | ieContext.exportTarget = serverToInitialize; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | } |
| | | ieContext.initializeCounters(this.countEntries()); |
| | | ieContext.msgCnt = 0; |
| | | ieContext.initNumLostConnections = broker.getNumLostConnections(); |
| | |
| | | catch(DirectoryException exportException) |
| | | { |
| | | // Give priority to the first exception raised - stored in the context |
| | | if (ieContext.exception != null) |
| | | exportRootException = ieContext.exception; |
| | | else |
| | | exportRootException = exportException; |
| | | final DirectoryException ieEx = ieContext.exception; |
| | | exportRootException = ieEx != null ? ieEx : exportException; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | |
| | | int att=0; |
| | | while (!broker.shuttingDown() && !broker.isConnected() |
| | | && ++att < 100) |
| | | { |
| | | try { Thread.sleep(100); } |
| | | catch(Exception e){ /* do nothing */ } |
| | | } |
| | | } |
| | | |
| | | if (initTask != null && broker.isConnected() |
| | |
| | | and the others |
| | | */ |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | replicasWeAreWaitingFor.add(dsi.getDsId()); |
| | | } |
| | | |
| | | boolean done; |
| | | do |
| | |
| | | |
| | | // loop and wait |
| | | if (!done) |
| | | { |
| | | try { Thread.sleep(1000); } |
| | | catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | } // 1sec |
| | | } |
| | | |
| | | } |
| | | while (!done && !broker.shuttingDown()); // infinite wait |
| | |
| | | if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | // The ErrorMsg is received while we have started an initialization |
| | | if (ieContext.getException() == null) |
| | | { |
| | | ieContext.setException( |
| | | new DirectoryException(ResultCode.OTHER, errorMsg.getDetails())); |
| | | } |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, errorMsg.getDetails())); |
| | | |
| | | /* |
| | | * This can happen : |
| | |
| | | msg = broker.receive(false, false, true); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] In " |
| | | + broker.getReplicationMonitorInstanceName() |
| | | + ", receiveEntryBytes " + msg); |
| | | } |
| | | |
| | | if (msg == null) |
| | | { |
| | |
| | | else |
| | | { |
| | | // Handle connection issues |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_IMPORT.get( |
| | | broker.getReplicationServer()))); |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT |
| | | .get(broker.getReplicationServer()))); |
| | | return null; |
| | | } |
| | | } |
| | |
| | | // check the msgCnt of the msg received to check ordering |
| | | if (++ieContext.msgCnt != entryMsg.getMsgId()) |
| | | { |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get( |
| | | String.valueOf(ieContext.msgCnt), |
| | | String.valueOf(entryMsg.getMsgId())))); |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get( |
| | | String.valueOf(ieContext.msgCnt), |
| | | String.valueOf(entryMsg.getMsgId())))); |
| | | return null; |
| | | } |
| | | |
| | |
| | | ieContext.msgCnt); |
| | | broker.publish(amsg, false); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] In " |
| | | + broker.getReplicationMonitorInstanceName() |
| | | + ", publish InitializeRcvAckMsg" + amsg); |
| | | } |
| | | } |
| | | } |
| | | return entryBytes; |
| | |
| | | getBaseDNString(), |
| | | Integer.toString(this.serverID), |
| | | Integer.toString(ieContext.importSource))); |
| | | if (ieContext.getException()==null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errMsg)); |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, errMsg)); |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); |
| | | } |
| | | } |
| | |
| | | we just abandon the export by throwing an exception. |
| | | */ |
| | | if (ieContext.getException() != null) |
| | | { |
| | | throw new IOException(ieContext.getException().getMessage()); |
| | | } |
| | | |
| | | int slowestServerId = ieContext.getSlowestServer(); |
| | | if (isRemoteDSConnected(slowestServerId)==null) |
| | |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( |
| | | Integer.toString(broker.getRsServerId()))); |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(de); |
| | | ieContext.setExceptionIfNoneSet(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( |
| | | Integer.toString(broker.getRsServerId()))); |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(de); |
| | | ieContext.setExceptionIfNoneSet(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | |
| | | } |
| | | catch (DirectoryException de) |
| | | { |
| | | // store the error in the ieContext ... |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(de); |
| | | ieContext.setExceptionIfNoneSet(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | |
| | | Message errMsg = null; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this); |
| | | } |
| | | |
| | | if (!broker.isConnected()) |
| | | { |
| | |
| | | InitializeTask initFromTask = null; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Entering initialize - domain=" + this); |
| | | } |
| | | |
| | | int source = initTargetMsgReceived.getSenderID(); |
| | | |
| | |
| | | Store the exception raised. It will be considered if no other exception |
| | | has been previously stored in the context |
| | | */ |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(e); |
| | | ieContext.setExceptionIfNoneSet(e); |
| | | } |
| | | finally |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends import with exception=" + ieContext.getException() |
| | | + " connected=" + broker.isConnected()); |
| | | } |
| | | |
| | | /* |
| | | It is necessary to restart (reconnect to RS) for different reasons |
| | |
| | | // No new attempt case |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends initialization with exception=" + ieContext.getException() |
| | | + " connected=" + broker.isConnected() |
| | | + " task=" + initFromTask |
| | | + " attempt=" + ieContext.attemptCnt); |
| | | } |
| | | |
| | | try |
| | | { |
| | |
| | | lastStatusChangeDate = new Date(); |
| | | // Reset monitoring counters if reconnection |
| | | if (newStatus == ServerStatus.NOT_CONNECTED_STATUS) |
| | | { |
| | | resetMonitoringCounters(); |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication domain " + baseDN + " new status is: " |
| | | + status); |
| | | } |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | } |
| | | |
| | | ResetGenerationIdMsg genIdMessage = |
| | | new ResetGenerationIdMsg(getGenId(generationIdNewValue)); |
| | |
| | | int getMaxRcvWindow() |
| | | { |
| | | if (broker != null) |
| | | { |
| | | return broker.getMaxRcvWindow(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int getCurrentRcvWindow() |
| | | { |
| | | if (broker != null) |
| | | { |
| | | return broker.getCurrentRcvWindow(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int getMaxSendWindow() |
| | | { |
| | | if (broker != null) |
| | | { |
| | | return broker.getMaxSendWindow(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int getCurrentSendWindow() |
| | | { |
| | | if (broker != null) |
| | | { |
| | | return broker.getCurrentSendWindow(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | int getNumLostConnections() |
| | | { |
| | | if (broker != null) |
| | | { |
| | | return broker.getNumLostConnections(); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | public String getReplicationServer() |
| | | { |
| | | if (broker != null) |
| | | { |
| | | return broker.getReplicationServer(); |
| | | } |
| | | return ReplicationBroker.NO_CONNECTED_SERVER; |
| | | } |
| | | |
| | |
| | | msg.setAssured(true); |
| | | msg.setAssuredMode(assuredMode); |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | msg.setSafeDataLevel(assuredSdLevel); |
| | | } |
| | | |
| | | // Add the assured message to the list of update that are waiting for acks |
| | | waitingAckMsgs.put(msg.getCSN(), msg); |
| | |
| | | long getLeftEntryCount() |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | return ieContext.entryLeftCount; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | long getTotalEntryCount() |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | return ieContext.entryCount; |
| | | } |
| | | return 0; |
| | | } |
| | | |