| | |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | * The context related to an import or export being processed |
| | | * Null when none is being processed. |
| | | */ |
| | | protected IEContext ieContext = null; |
| | | private final AtomicReference<IEContext> ieContext = |
| | | new AtomicReference<IEContext>(); |
| | | |
| | | /** |
| | | * The Thread waiting for incoming update messages for this domain and pushing |
| | | * them to the global incoming update message queue for later processing by |
| | | * replay threads. |
| | | */ |
| | | private volatile DirectoryThread listenerThread = null; |
| | | private volatile DirectoryThread listenerThread; |
| | | |
| | | /** |
| | | * A set of counters used for Monitoring. |
| | |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | if (ieContext != null) |
| | | IEContext ieCtx = ieContext.get(); |
| | | if (ieCtx != null) |
| | | { |
| | | /* |
| | | This is an error termination for the 2 following cases : |
| | |
| | | " baseDN: " + getBaseDN() + |
| | | " Error Msg received: " + errorMsg); |
| | | |
| | | if (errorMsg.getCreationTime() > ieContext.startTime) |
| | | if (errorMsg.getCreationTime() > ieCtx.startTime) |
| | | { |
| | | // consider only ErrorMsg that relate to the current import/export |
| | | processErrorMsg(errorMsg); |
| | | processErrorMsg(errorMsg, ieCtx); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | else if (msg instanceof InitializeRcvAckMsg) |
| | | { |
| | | if (ieContext != null) |
| | | IEContext ieCtx = ieContext.get(); |
| | | if (ieCtx != null) |
| | | { |
| | | InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg; |
| | | ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck()); |
| | | ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck()); |
| | | } |
| | | // Trash this msg When no input/export is running/should never happen |
| | | } |
| | |
| | | private long entryLeftCount = 0; |
| | | |
| | | /** Exception raised during the initialization. */ |
| | | private DirectoryException exception = null; |
| | | private DirectoryException exception; |
| | | |
| | | /** Whether the context is related to an import or an export. */ |
| | | private boolean importInProgress; |
| | |
| | | /** |
| | | * Request message sent when this server has the initializeFromRemote task. |
| | | */ |
| | | private InitializeRequestMsg initReqMsgSent = null; |
| | | private InitializeRequestMsg initReqMsgSent; |
| | | |
| | | /** |
| | | * Start time of the initialization process. ErrorMsg timestamped before |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a boolean indicating if a total update import is currently in |
| | | * Progress. |
| | | * |
| | | * @return A boolean indicating if a total update import is currently in |
| | | * Progress. |
| | | */ |
| | | public boolean importInProgress() |
| | | { |
| | | return importInProgress; |
| | | } |
| | | |
| | | /** |
| | | * Returns the total number of entries to be processed when a total update |
| | | * is in progress. |
| | | * |
| | | * @return The total number of entries to be processed when a total update |
| | | * is in progress. |
| | | */ |
| | | long getTotalEntryCount() |
| | | { |
| | | return entryCount; |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of entries still to be processed when a total update |
| | | * is in progress. |
| | | * |
| | | * @return The number of entries still to be processed when a total update |
| | | * is in progress. |
| | | */ |
| | | long getLeftEntryCount() |
| | | { |
| | | return entryLeftCount; |
| | | } |
| | | |
| | | /** |
| | | * Initializes the import/export counters with the provider value. |
| | | * @param total Total number of entries to be processed. |
| | | * @throws DirectoryException if an error occurred. |
| | | */ |
| | | private void initializeCounters(long total) |
| | | throws DirectoryException |
| | | private void initializeCounters(long total) throws DirectoryException |
| | | { |
| | | entryCount = total; |
| | | entryLeftCount = total; |
| | |
| | | * |
| | | * @throws DirectoryException if an error occurred. |
| | | */ |
| | | public void updateCounters(int entriesDone) |
| | | throws DirectoryException |
| | | public void updateCounters(int entriesDone) throws DirectoryException |
| | | { |
| | | entryLeftCount -= entriesDone; |
| | | |
| | |
| | | |
| | | // Recompute the server with the minAck returned,means the slowest server. |
| | | slowestServerId = serverId; |
| | | for (Integer sid : ieContext.ackVals.keySet()) |
| | | for (Integer sid : ieContext.get().ackVals.keySet()) |
| | | { |
| | | if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) |
| | | { |
| | |
| | | int serverRunningTheTask, Task initTask, int initWindow) |
| | | throws DirectoryException |
| | | { |
| | | DirectoryException exportRootException = null; |
| | | |
| | | // Acquire and initialize the export context |
| | | acquireIEContext(false); |
| | | final IEContext ieCtx = acquireIEContext(false); |
| | | |
| | | /* |
| | | We manage the list of servers to initialize in order : |
| | |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | ieContext.startList.add(dsi.getDsId()); |
| | | ieCtx.startList.add(dsi.getDsId()); |
| | | } |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | |
| | | { |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | ieContext.setAckVal(dsi.getDsId(), 0); |
| | | ieCtx.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | } |
| | | } |
| | |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(), |
| | | getBaseDNString(), getServerId(), serverToInitialize)); |
| | | |
| | | ieContext.startList.add(serverToInitialize); |
| | | ieCtx.startList.add(serverToInitialize); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | |
| | | if (dsi.getDsId() == serverToInitialize && |
| | | dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | ieContext.setAckVal(dsi.getDsId(), 0); |
| | | ieCtx.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | } |
| | | } |
| | | |
| | | DirectoryException exportRootException = null; |
| | | |
| | | // loop for the case where the exporter is the initiator |
| | | int attempt = 0; |
| | | boolean done = false; |
| | |
| | | { |
| | | try |
| | | { |
| | | ieContext.exportTarget = serverToInitialize; |
| | | ieCtx.exportTarget = serverToInitialize; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | ieCtx.initializeTask = initTask; |
| | | } |
| | | ieContext.initializeCounters(this.countEntries()); |
| | | ieContext.msgCnt = 0; |
| | | ieContext.initNumLostConnections = broker.getNumLostConnections(); |
| | | ieContext.initWindow = initWindow; |
| | | ieCtx.initializeCounters(this.countEntries()); |
| | | ieCtx.msgCnt = 0; |
| | | ieCtx.initNumLostConnections = broker.getNumLostConnections(); |
| | | ieCtx.initWindow = initWindow; |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( |
| | | getBaseDN(), getServerId(), serverToInitialize, |
| | | serverRunningTheTask, ieContext.entryCount, initWindow); |
| | | serverRunningTheTask, ieCtx.entryCount, initWindow); |
| | | |
| | | broker.publish(initTargetMsg); |
| | | |
| | | // Wait for all servers to be ok |
| | | waitForRemoteStartOfInit(); |
| | | waitForRemoteStartOfInit(ieCtx); |
| | | |
| | | // Servers that left in the list are those for which we could not test |
| | | // that they have been successfully initialized. |
| | | if (!ieContext.failureList.isEmpty()) |
| | | if (!ieCtx.failureList.isEmpty()) |
| | | { |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get( |
| | | ieContext.failureList.toString())); |
| | | ieCtx.failureList.toString())); |
| | | } |
| | | |
| | | exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); |
| | |
| | | catch(DirectoryException exportException) |
| | | { |
| | | // Give priority to the first exception raised - stored in the context |
| | | final DirectoryException ieEx = ieContext.exception; |
| | | final DirectoryException ieEx = ieCtx.exception; |
| | | exportRootException = ieEx != null ? ieEx : exportException; |
| | | } |
| | | |
| | |
| | | continue; |
| | | } |
| | | |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(serverToInitialize, |
| | | exportRootException.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | broker.publish(new ErrorMsg( |
| | | serverToInitialize, exportRootException.getMessageObject())); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | } // attempt loop |
| | | |
| | | // Wait for all servers to be ok, and build the failure list |
| | | waitForRemoteEndOfInit(); |
| | | waitForRemoteEndOfInit(ieCtx); |
| | | |
| | | // Servers that left in the list are those for which we could not test |
| | | // that they have been successfully initialized. |
| | | if (!ieContext.failureList.isEmpty() && exportRootException == null) |
| | | if (!ieCtx.failureList.isEmpty() && exportRootException == null) |
| | | { |
| | | exportRootException = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get( |
| | | Long.toString(getGenerationID()), |
| | | ieContext.failureList.toString())); |
| | | ieCtx.failureList.toString())); |
| | | } |
| | | |
| | | // Don't forget to release IEcontext acquired at beginning. |
| | | releaseIEContext(); |
| | | releaseIEContext(); // FIXME should not this be in a finally? |
| | | |
| | | final String cause = exportRootException == null ? "" |
| | | : exportRootException.getLocalizedMessage(); |
| | |
| | | * - wait it has finished the import and present the expected generationID, |
| | | * - build the failureList. |
| | | */ |
| | | private void waitForRemoteStartOfInit() |
| | | private void waitForRemoteStartOfInit(IEContext ieCtx) |
| | | { |
| | | final Set<Integer> replicasWeAreWaitingFor = |
| | | new HashSet<Integer>(ieContext.startList); |
| | | new HashSet<Integer>(ieCtx.startList); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | + " " + dsi.getStatus() |
| | | + " " + dsi.getGenerationId() |
| | | + " " + getGenerationID()); |
| | | if (ieContext.startList.contains(dsi.getDsId())) |
| | | if (ieCtx.startList.contains(dsi.getDsId())) |
| | | { |
| | | if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | |
| | | } |
| | | while (!done && waitResultAttempt < 1200 && !broker.shuttingDown()); |
| | | |
| | | ieContext.failureList.addAll(replicasWeAreWaitingFor); |
| | | ieCtx.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for start ends with " + ieContext.failureList); |
| | | "[IE] wait for start ends with " + ieCtx.failureList); |
| | | } |
| | | |
| | | /** |
| | |
| | | * - wait it has finished the import and present the expected generationID, |
| | | * - build the failureList. |
| | | */ |
| | | private void waitForRemoteEndOfInit() |
| | | private void waitForRemoteEndOfInit(IEContext ieCtx) |
| | | { |
| | | final Set<Integer> replicasWeAreWaitingFor = |
| | | new HashSet<Integer>(ieContext.startList); |
| | | new HashSet<Integer>(ieCtx.startList); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | while (it.hasNext()) |
| | | { |
| | | int serverId = it.next(); |
| | | if (ieContext.failureList.contains(serverId)) |
| | | if (ieCtx.failureList.contains(serverId)) |
| | | { |
| | | /* |
| | | this server has already been in error during initialization |
| | |
| | | } |
| | | while (!done && !broker.shuttingDown()); // infinite wait |
| | | |
| | | ieContext.failureList.addAll(replicasWeAreWaitingFor); |
| | | ieCtx.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for end ends with " + ieContext.failureList); |
| | | "[IE] wait for end ends with " + ieCtx.failureList); |
| | | } |
| | | |
| | | /** |
| | |
| | | return state; |
| | | } |
| | | |
| | | |
| | | private synchronized void acquireIEContext(boolean importInProgress) |
| | | throws DirectoryException |
| | | /** |
| | | * Acquire and initialize the import/export context, verifying no other |
| | | * import/export is in progress. |
| | | */ |
| | | private IEContext acquireIEContext(boolean importInProgress) |
| | | throws DirectoryException |
| | | { |
| | | if (ieContext != null) |
| | | final IEContext ieCtx = new IEContext(importInProgress); |
| | | if (!ieContext.compareAndSet(null, ieCtx)) |
| | | { |
| | | // Rejects 2 simultaneous exports |
| | | Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | ieContext = new IEContext(importInProgress); |
| | | return ieCtx; |
| | | } |
| | | |
| | | private synchronized void releaseIEContext() |
| | | private void releaseIEContext() |
| | | { |
| | | ieContext = null; |
| | | ieContext.set(null); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param errorMsg The error message received. |
| | | */ |
| | | private void processErrorMsg(ErrorMsg errorMsg) |
| | | private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx) |
| | | { |
| | | //Exporting must not be stopped on the first error, if we run initialize-all |
| | | if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | // The ErrorMsg is received while we have started an initialization |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ieCtx.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, errorMsg.getDetails())); |
| | | |
| | | /* |
| | |
| | | * even after the nextInitAttemptDelay |
| | | * During the import, the ErrorMsg will be received by receiveEntryBytes |
| | | */ |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | if (ieCtx.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask) ieContext.initializeTask) |
| | | .updateTaskCompletionState(ieContext.getException()); |
| | | ((InitializeTask) ieCtx.initializeTask) |
| | | .updateTaskCompletionState(ieCtx.getException()); |
| | | |
| | | releaseIEContext(); |
| | | } |
| | |
| | | ReplicationMsg msg; |
| | | while (true) |
| | | { |
| | | IEContext ieCtx = ieContext.get(); |
| | | try |
| | | { |
| | | // In the context of the total update, we don't want any automatic |
| | |
| | | else |
| | | { |
| | | // Handle connection issues |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ieCtx.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT |
| | | .get(broker.getReplicationServer()))); |
| | | return null; |
| | |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg)msg; |
| | | byte[] entryBytes = entryMsg.getEntryBytes(); |
| | | ieContext.updateCounters(countEntryLimits(entryBytes)); |
| | | ieCtx.updateCounters(countEntryLimits(entryBytes)); |
| | | |
| | | if (ieContext.exporterProtocolVersion >= |
| | | if (ieCtx.exporterProtocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // check the msgCnt of the msg received to check ordering |
| | | if (++ieContext.msgCnt != entryMsg.getMsgId()) |
| | | if (++ieCtx.msgCnt != entryMsg.getMsgId()) |
| | | { |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ieCtx.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get( |
| | | String.valueOf(ieContext.msgCnt), |
| | | String.valueOf(ieCtx.msgCnt), |
| | | String.valueOf(entryMsg.getMsgId())))); |
| | | return null; |
| | | } |
| | | |
| | | // send the ack of flow control mgmt |
| | | if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0) |
| | | if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0) |
| | | { |
| | | final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | getServerId(), entryMsg.getSenderID(), ieContext.msgCnt); |
| | | getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt); |
| | | broker.publish(amsg, false); |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | This is an error termination during the import |
| | | The error is stored and the import is ended by returning null |
| | | */ |
| | | if (ieContext.getException() == null) |
| | | if (ieCtx.getException() == null) |
| | | { |
| | | ErrorMsg errMsg = (ErrorMsg)msg; |
| | | if (errMsg.getCreationTime() > ieContext.startTime) |
| | | if (errMsg.getCreationTime() > ieCtx.startTime) |
| | | { |
| | | ieContext.setException( |
| | | ieCtx.setException( |
| | | new DirectoryException(ResultCode.OTHER,errMsg.getDetails())); |
| | | return null; |
| | | } |
| | |
| | | // Other messages received during an import are trashed except |
| | | // the topologyMsg. |
| | | if (msg instanceof TopologyMsg |
| | | && isRemoteDSConnected(ieContext.importSource) == null) |
| | | && isRemoteDSConnected(ieCtx.importSource) == null) |
| | | { |
| | | Message errMsg = |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | | ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | getBaseDNString(), |
| | | Integer.toString(getServerId()), |
| | | Integer.toString(ieContext.importSource))); |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | Integer.toString(ieCtx.importSource))); |
| | | ieCtx.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, errMsg)); |
| | | return null; |
| | | } |
| | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ieCtx.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); |
| | | } |
| | |
| | | Arrays.toString(lDIFEntry)); |
| | | |
| | | // build the message |
| | | IEContext ieCtx = ieContext.get(); |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieContext.msgCnt); |
| | | getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieCtx.msgCnt); |
| | | |
| | | // Waiting the slowest loop |
| | | while (!broker.shuttingDown()) |
| | |
| | | server that have been stored by the listener thread in the ieContext, |
| | | we just abandon the export by throwing an exception. |
| | | */ |
| | | if (ieContext.getException() != null) |
| | | if (ieCtx.getException() != null) |
| | | { |
| | | throw new IOException(ieContext.getException().getMessage()); |
| | | throw new IOException(ieCtx.getException().getMessage()); |
| | | } |
| | | |
| | | int slowestServerId = ieContext.getSlowestServer(); |
| | | int slowestServerId = ieCtx.getSlowestServer(); |
| | | if (isRemoteDSConnected(slowestServerId)==null) |
| | | { |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ieCtx.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get( |
| | | Integer.toString(ieContext.getSlowestServer())))); |
| | | Integer.toString(ieCtx.getSlowestServer())))); |
| | | |
| | | throw new IOException("IOException with nested DirectoryException", |
| | | ieContext.getException()); |
| | | ieCtx.getException()); |
| | | } |
| | | |
| | | int ourLastExportedCnt = ieContext.msgCnt; |
| | | int slowestCnt = ieContext.ackVals.get(slowestServerId); |
| | | int ourLastExportedCnt = ieCtx.msgCnt; |
| | | int slowestCnt = ieCtx.ackVals.get(slowestServerId); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " + |
| | | " our=" + ourLastExportedCnt + " slowest=" + slowestCnt); |
| | | |
| | | if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow) |
| | | if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting"); |
| | |
| | | |
| | | // process any connection error |
| | | if (broker.hasConnectionError() |
| | | || broker.getNumLostConnections() != ieContext.initNumLostConnections) |
| | | || broker.getNumLostConnections() != ieCtx.initNumLostConnections) |
| | | { |
| | | // publish failed - store the error in the ieContext ... |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( |
| | | Integer.toString(broker.getRsServerId()))); |
| | | ieContext.setExceptionIfNoneSet(de); |
| | | ieCtx.setExceptionIfNoneSet(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | |
| | | // process any publish error |
| | | if (!sent |
| | | || broker.hasConnectionError() |
| | | || broker.getNumLostConnections() != ieContext.initNumLostConnections) |
| | | || broker.getNumLostConnections() != ieCtx.initNumLostConnections) |
| | | { |
| | | // publish failed - store the error in the ieContext ... |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( |
| | | Integer.toString(broker.getRsServerId()))); |
| | | ieContext.setExceptionIfNoneSet(de); |
| | | ieCtx.setExceptionIfNoneSet(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | |
| | | // publish succeeded |
| | | try |
| | | { |
| | | ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length)); |
| | | ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length)); |
| | | } |
| | | catch (DirectoryException de) |
| | | { |
| | | ieContext.setExceptionIfNoneSet(de); |
| | | ieCtx.setExceptionIfNoneSet(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | |
| | | public void initializeFromRemote(int source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | Message errMsg = null; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this); |
| | | } |
| | | |
| | | if (!broker.isConnected()) |
| | | { |
| | | errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString()); |
| | | } |
| | | Message errMsg = !broker.isConnected() |
| | | ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString()) |
| | | : null; |
| | | |
| | | /* |
| | | We must not test here whether the remote source is connected to |
| | |
| | | update the task. |
| | | */ |
| | | |
| | | acquireIEContext(true); //test and set if no import already in progress |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.attemptCnt = 0; |
| | | ieContext.initReqMsgSent = new InitializeRequestMsg( |
| | | final IEContext ieCtx = acquireIEContext(true); |
| | | ieCtx.initializeTask = initTask; |
| | | ieCtx.attemptCnt = 0; |
| | | ieCtx.initReqMsgSent = new InitializeRequestMsg( |
| | | getBaseDN(), getServerId(), source, getInitWindow()); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | | broker.publish(ieCtx.initReqMsgSent); |
| | | |
| | | /* |
| | | The normal success processing is now to receive InitTargetMsg then |
| | |
| | | |
| | | int source = initTargetMsgReceived.getSenderID(); |
| | | |
| | | IEContext ieCtx = ieContext.get(); |
| | | try |
| | | { |
| | | // Log starting |
| | |
| | | server. |
| | | Test and set if no import already in progress |
| | | */ |
| | | acquireIEContext(true); |
| | | ieCtx = acquireIEContext(true); |
| | | } |
| | | |
| | | // Initialize stuff |
| | | ieContext.importSource = source; |
| | | ieContext.initializeCounters(initTargetMsgReceived.getEntryCount()); |
| | | ieContext.initWindow = initTargetMsgReceived.getInitWindow(); |
| | | ieCtx.importSource = source; |
| | | ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount()); |
| | | ieCtx.initWindow = initTargetMsgReceived.getInitWindow(); |
| | | // Protocol version is -1 when not known. |
| | | ieContext.exporterProtocolVersion = getProtocolVersion(source); |
| | | initFromTask = (InitializeTask)ieContext.initializeTask; |
| | | ieCtx.exporterProtocolVersion = getProtocolVersion(source); |
| | | initFromTask = (InitializeTask) ieCtx.initializeTask; |
| | | |
| | | // Launch the import |
| | | importBackend(new ReplInputStream(this)); |
| | |
| | | Store the exception raised. It will be considered if no other exception |
| | | has been previously stored in the context |
| | | */ |
| | | ieContext.setExceptionIfNoneSet(e); |
| | | ieCtx.setExceptionIfNoneSet(e); |
| | | } |
| | | finally |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends import with exception=" + ieContext.getException() |
| | | + " ends import with exception=" + ieCtx.getException() |
| | | + " connected=" + broker.isConnected()); |
| | | } |
| | | |
| | |
| | | */ |
| | | broker.reStart(false); |
| | | |
| | | if (ieContext.getException() != null |
| | | if (ieCtx.getException() != null |
| | | && broker.isConnected() |
| | | && initFromTask != null |
| | | && ++ieContext.attemptCnt < 2) |
| | | && ++ieCtx.attemptCnt < 2) |
| | | { |
| | | /* |
| | | Worth a new attempt |
| | |
| | | the request |
| | | */ |
| | | logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get( |
| | | ieContext.getException().getLocalizedMessage())); |
| | | ieCtx.getException().getLocalizedMessage())); |
| | | |
| | | broker.publish(ieContext.initReqMsgSent); |
| | | broker.publish(ieCtx.initReqMsgSent); |
| | | |
| | | ieContext.initializeCounters(0); |
| | | ieContext.exception = null; |
| | | ieContext.msgCnt = 0; |
| | | ieCtx.initializeCounters(0); |
| | | ieCtx.exception = null; |
| | | ieCtx.msgCnt = 0; |
| | | |
| | | // Processing of the received initTargetMsgReceived is done |
| | | // let's wait for the next one |
| | |
| | | */ |
| | | logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get( |
| | | e.getLocalizedMessage(), |
| | | ieContext.getException().getLocalizedMessage())); |
| | | ieCtx.getException().getLocalizedMessage())); |
| | | } |
| | | } |
| | | |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends initialization with exception=" + ieContext.getException() |
| | | + " ends initialization with exception=" + ieCtx.getException() |
| | | + " connected=" + broker.isConnected() |
| | | + " task=" + initFromTask |
| | | + " attempt=" + ieContext.attemptCnt); |
| | | + " attempt=" + ieCtx.attemptCnt); |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (broker.isConnected() && ieContext.getException() != null) |
| | | if (broker.isConnected() && ieCtx.getException() != null) |
| | | { |
| | | // Let's notify the exporter |
| | | ErrorMsg errorMsg = new ErrorMsg(requesterServerId, |
| | | ieContext.getException().getMessageObject()); |
| | | ieCtx.getException().getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | /* |
| | |
| | | */ |
| | | if (initFromTask != null) |
| | | { |
| | | initFromTask.updateTaskCompletionState(ieContext.getException()); |
| | | initFromTask.updateTaskCompletionState(ieCtx.getException()); |
| | | } |
| | | } |
| | | finally |
| | |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), |
| | | getServerId(), |
| | | (ieContext.getException() == null ? "" |
| | | : ieContext.getException().getLocalizedMessage())); |
| | | (ieCtx.getException() == null ? "" |
| | | : ieCtx.getException().getLocalizedMessage())); |
| | | logError(msg); |
| | | releaseIEContext(); |
| | | } // finally |
| | |
| | | */ |
| | | public boolean ieRunning() |
| | | { |
| | | return ieContext != null; |
| | | return ieContext.get() != null; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a boolean indicating if a total update import is currently |
| | | * in Progress. |
| | | * Returns the Import/Export context associated to this ReplicationDomain. |
| | | * |
| | | * @return A boolean indicating if a total update import is currently |
| | | * in Progress. |
| | | * @return the Import/Export context associated to this ReplicationDomain |
| | | */ |
| | | public boolean importInProgress() |
| | | protected IEContext getImportExportContext() |
| | | { |
| | | return ieContext != null && ieContext.importInProgress; |
| | | } |
| | | |
| | | /** |
| | | * Returns a boolean indicating if a total update export is currently |
| | | * in Progress. |
| | | * |
| | | * @return A boolean indicating if a total update export is currently |
| | | * in Progress. |
| | | */ |
| | | public boolean exportInProgress() |
| | | { |
| | | return ieContext != null && !ieContext.importInProgress; |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of entries still to be processed when a total update |
| | | * is in progress. |
| | | * |
| | | * @return The number of entries still to be processed when a total update |
| | | * is in progress. |
| | | */ |
| | | long getLeftEntryCount() |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | return ieContext.entryLeftCount; |
| | | } |
| | | return 0; |
| | | return ieContext.get(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the total number of entries to be processed when a total update |
| | | * is in progress. |
| | | * |
| | | * @return The total number of entries to be processed when a total update |
| | | * is in progress. |
| | | */ |
| | | long getTotalEntryCount() |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | return ieContext.entryCount; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Set the attributes configured on a server to be included in the ECL. |
| | | * |
| | | * @param serverId |
| | |
| | | * The serverId for which we want the include attributes. |
| | | * @return The attributes. |
| | | */ |
| | | public Set<String> getEclIncludes(int serverId) |
| | | Set<String> getEclIncludes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |
| | |
| | | * The serverId for which we want the include attributes. |
| | | * @return The attributes. |
| | | */ |
| | | public Set<String> getEclIncludesForDeletes(int serverId) |
| | | Set<String> getEclIncludesForDeletes(int serverId) |
| | | { |
| | | synchronized (eclIncludesLock) |
| | | { |