| | |
| | | * The context related to an import or export being processed |
| | | * Null when none is being processed. |
| | | */ |
| | | private final AtomicReference<IEContext> ieContext = |
| | | new AtomicReference<IEContext>(); |
| | | private final AtomicReference<ImportExportContext> importExportContext = |
| | | new AtomicReference<ImportExportContext>(); |
| | | |
| | | /** |
| | | * The Thread waiting for incoming update messages for this domain and pushing |
| | |
| | | * @return the info related to this remote server if it is connected, |
| | | * null is the server is NOT connected. |
| | | */ |
| | | private DSInfo isRemoteDSConnected(int dsId) |
| | | private DSInfo getConnectedRemoteDS(int dsId) |
| | | { |
| | | return getReplicaInfos().get(dsId); |
| | | } |
| | |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | IEContext ieCtx = ieContext.get(); |
| | | ImportExportContext ieCtx = importExportContext.get(); |
| | | if (ieCtx != null) |
| | | { |
| | | /* |
| | |
| | | } |
| | | else if (msg instanceof InitializeRcvAckMsg) |
| | | { |
| | | IEContext ieCtx = ieContext.get(); |
| | | ImportExportContext ieCtx = importExportContext.get(); |
| | | if (ieCtx != null) |
| | | { |
| | | InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg; |
| | |
| | | } |
| | | |
| | | /** |
| | | * This class contain the context related to an import or export |
| | | * launched on the domain. |
| | | * This class contains the context related to an import or export launched on |
| | | * the domain. |
| | | */ |
| | | protected class IEContext |
| | | protected class ImportExportContext |
| | | { |
| | | /** The private task that initiated the operation. */ |
| | | private Task initializeTask; |
| | |
| | | * for and import, false if the IEContext |
| | | * will be used for and export. |
| | | */ |
| | | private IEContext(boolean importInProgress) |
| | | private ImportExportContext(boolean importInProgress) |
| | | { |
| | | this.importInProgress = importInProgress; |
| | | this.startTime = System.currentTimeMillis(); |
| | |
| | | |
| | | // Recompute the server with the minAck returned,means the slowest server. |
| | | slowestServerId = serverId; |
| | | for (Integer sid : ieContext.get().ackVals.keySet()) |
| | | for (Integer sid : importExportContext.get().ackVals.keySet()) |
| | | { |
| | | if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) |
| | | { |
| | |
| | | int serverRunningTheTask, Task initTask, int initWindow) |
| | | throws DirectoryException |
| | | { |
| | | final IEContext ieCtx = acquireIEContext(false); |
| | | final ImportExportContext ieCtx = acquireIEContext(false); |
| | | |
| | | /* |
| | | We manage the list of servers to initialize in order : |
| | |
| | | logger.trace( |
| | | "[IE] Exporter wait for reconnection by the listener thread"); |
| | | int att=0; |
| | | while (!broker.shuttingDown() && !broker.isConnected() |
| | | while (!broker.shuttingDown() |
| | | && !broker.isConnected() |
| | | && ++att < 100) |
| | | { |
| | | try { Thread.sleep(100); } |
| | |
| | | } |
| | | } |
| | | |
| | | if (initTask != null && broker.isConnected() |
| | | if (initTask != null |
| | | && broker.isConnected() |
| | | && serverToInitialize != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | /* |
| | |
| | | * - wait it has finished the import and present the expected generationID, |
| | | * - build the failureList. |
| | | */ |
| | | private void waitForRemoteStartOfInit(IEContext ieCtx) |
| | | private void waitForRemoteStartOfInit(ImportExportContext ieCtx) |
| | | { |
| | | final Set<Integer> replicasWeAreWaitingFor = |
| | | new HashSet<Integer>(ieCtx.startList); |
| | |
| | | * - wait it has finished the import and present the expected generationID, |
| | | * - build the failureList. |
| | | */ |
| | | private void waitForRemoteEndOfInit(IEContext ieCtx) |
| | | private void waitForRemoteEndOfInit(ImportExportContext ieCtx) |
| | | { |
| | | final Set<Integer> replicasWeAreWaitingFor = |
| | | new HashSet<Integer>(ieCtx.startList); |
| | |
| | | continue; |
| | | } |
| | | |
| | | DSInfo dsInfo = isRemoteDSConnected(serverId); |
| | | DSInfo dsInfo = getConnectedRemoteDS(serverId); |
| | | if (dsInfo == null) |
| | | { |
| | | /* |
| | |
| | | * Acquire and initialize the import/export context, verifying no other |
| | | * import/export is in progress. |
| | | */ |
| | | private IEContext acquireIEContext(boolean importInProgress) |
| | | private ImportExportContext acquireIEContext(boolean importInProgress) |
| | | throws DirectoryException |
| | | { |
| | | final IEContext ieCtx = new IEContext(importInProgress); |
| | | if (!ieContext.compareAndSet(null, ieCtx)) |
| | | final ImportExportContext ieCtx = new ImportExportContext(importInProgress); |
| | | if (!importExportContext.compareAndSet(null, ieCtx)) |
| | | { |
| | | // Rejects 2 simultaneous exports |
| | | LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); |
| | |
| | | |
| | | private void releaseIEContext() |
| | | { |
| | | ieContext.set(null); |
| | | importExportContext.set(null); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param errorMsg The error message received. |
| | | */ |
| | | private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx) |
| | | private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx) |
| | | { |
| | | //Exporting must not be stopped on the first error, if we run initialize-all |
| | | if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS) |
| | |
| | | ReplicationMsg msg; |
| | | while (true) |
| | | { |
| | | IEContext ieCtx = ieContext.get(); |
| | | ImportExportContext ieCtx = importExportContext.get(); |
| | | try |
| | | { |
| | | // In the context of the total update, we don't want any automatic |
| | |
| | | // Other messages received during an import are trashed except |
| | | // the topologyMsg. |
| | | if (msg instanceof TopologyMsg |
| | | && isRemoteDSConnected(ieCtx.importSource) == null) |
| | | && getConnectedRemoteDS(ieCtx.importSource) == null) |
| | | { |
| | | LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | getBaseDNString(), getServerId(), ieCtx.importSource); |
| | |
| | | Arrays.toString(lDIFEntry)); |
| | | |
| | | // build the message |
| | | IEContext ieCtx = ieContext.get(); |
| | | ImportExportContext ieCtx = importExportContext.get(); |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieCtx.msgCnt); |
| | |
| | | } |
| | | |
| | | int slowestServerId = ieCtx.getSlowestServer(); |
| | | if (isRemoteDSConnected(slowestServerId)==null) |
| | | if (getConnectedRemoteDS(slowestServerId) == null) |
| | | { |
| | | ieCtx.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer()))); |
| | |
| | | update the task. |
| | | */ |
| | | |
| | | final IEContext ieCtx = acquireIEContext(true); |
| | | final ImportExportContext ieCtx = acquireIEContext(true); |
| | | ieCtx.initializeTask = initTask; |
| | | ieCtx.attemptCnt = 0; |
| | | ieCtx.initReqMsgSent = new InitializeRequestMsg( |
| | |
| | | |
| | | int source = initTargetMsgReceived.getSenderID(); |
| | | |
| | | IEContext ieCtx = ieContext.get(); |
| | | ImportExportContext ieCtx = importExportContext.get(); |
| | | try |
| | | { |
| | | // Log starting |
| | |
| | | */ |
| | | public boolean ieRunning() |
| | | { |
| | | return ieContext.get() != null; |
| | | return importExportContext.get() != null; |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return the Import/Export context associated to this ReplicationDomain |
| | | */ |
| | | protected IEContext getImportExportContext() |
| | | protected ImportExportContext getImportExportContext() |
| | | { |
| | | return ieContext.get(); |
| | | return importExportContext.get(); |
| | | } |
| | | |
| | | /** |