| | |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | * The context related to an import or export being processed. |
| | | * Null when none is being processed. |
| | | */ |
| | | volatile IEContext ieContext; |
| | | private final AtomicReference<IEContext> ieContext = |
| | | new AtomicReference<IEContext>(); |
| | | |
| | | /** |
| | | * The Thread waiting for incoming update messages for this domain and pushing |
| | |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | IEContext ieCtx = ieContext; |
| | | IEContext ieCtx = ieContext.get(); |
| | | if (ieCtx != null) |
| | | { |
| | | /* |
| | |
| | | } |
| | | else if (msg instanceof InitializeRcvAckMsg) |
| | | { |
| | | IEContext ieCtx = ieContext; |
| | | IEContext ieCtx = ieContext.get(); |
| | | if (ieCtx != null) |
| | | { |
| | | InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg; |
| | |
| | | |
| | | // Recompute the server with the minAck returned, i.e. the slowest server. |
| | | slowestServerId = serverId; |
| | | for (Integer sid : ieContext.ackVals.keySet()) |
| | | for (Integer sid : ieContext.get().ackVals.keySet()) |
| | | { |
| | | if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) |
| | | { |
| | |
| | | int serverRunningTheTask, Task initTask, int initWindow) |
| | | throws DirectoryException |
| | | { |
| | | DirectoryException exportRootException = null; |
| | | |
| | | // Acquire and initialize the export context |
| | | acquireIEContext(false); |
| | | final IEContext ieCtx = acquireIEContext(false); |
| | | |
| | | /* |
| | | We manage the list of servers to initialize in order : |
| | |
| | | - to update the task with the server(s) where this test failed |
| | | */ |
| | | |
| | | IEContext ieCtx = ieContext; |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( |
| | |
| | | } |
| | | } |
| | | |
| | | DirectoryException exportRootException = null; |
| | | |
| | | // loop for the case where the exporter is the initiator |
| | | int attempt = 0; |
| | | boolean done = false; |
| | |
| | | } |
| | | |
| | | // Don't forget to release the IEContext acquired at the beginning. |
| | | releaseIEContext(); |
| | | releaseIEContext(); // FIXME should not this be in a finally? |
| | | |
| | | final String cause = exportRootException == null ? "" |
| | | : exportRootException.getLocalizedMessage(); |
| | |
| | | return state; |
| | | } |
| | | |
| | | |
| | | private synchronized void acquireIEContext(boolean importInProgress) |
| | | /** |
| | | * Acquire and initialize the import/export context, verifying no other |
| | | * import/export is in progress. |
| | | * <p> |
| | | * A new context is created and installed with a single compare-and-set |
| | | * against {@code null}, so the "is one already set?" check and the |
| | | * assignment cannot be separated by another caller. |
| | | * |
| | | * @param importInProgress |
| | | * value handed to the {@code IEContext} constructor; the import paths |
| | | * visible in this file pass {@code true} and the export path passes |
| | | * {@code false} |
| | | * @return the newly created context that was installed |
| | | * @throws DirectoryException |
| | | * with {@code ResultCode.OTHER} when a context is already set |
| | | */ |
| | | private IEContext acquireIEContext(boolean importInProgress) |
| | | throws DirectoryException |
| | | { |
| | | if (ieContext != null) |
| | | final IEContext ieCtx = new IEContext(importInProgress); |
| | | if (!ieContext.compareAndSet(null, ieCtx)) |
| | | { |
| | | // Reject: another import or export already holds the context |
| | | Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | ieContext = new IEContext(importInProgress); |
| | | return ieCtx; |
| | | } |
| | | |
| | | private synchronized void releaseIEContext() |
| | | private void releaseIEContext() |
| | | { |
| | | ieContext = null; |
| | | ieContext.set(null); /* unconditionally clears the reference, whatever context it currently holds */ |
| | | } |
| | | |
| | | /** |
| | |
| | | ReplicationMsg msg; |
| | | while (true) |
| | | { |
| | | IEContext ieCtx = ieContext; |
| | | IEContext ieCtx = ieContext.get(); |
| | | try |
| | | { |
| | | // In the context of the total update, we don't want any automatic |
| | |
| | | Arrays.toString(lDIFEntry)); |
| | | |
| | | // build the message |
| | | IEContext ieCtx = ieContext; |
| | | IEContext ieCtx = ieContext.get(); |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieCtx.msgCnt); |
| | |
| | | public void initializeFromRemote(int source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | Message errMsg = null; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this); |
| | | } |
| | | |
| | | if (!broker.isConnected()) |
| | | { |
| | | errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString()); |
| | | } |
| | | Message errMsg = !broker.isConnected() |
| | | ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString()) |
| | | : null; |
| | | |
| | | /* |
| | | We must not test here whether the remote source is connected to |
| | |
| | | update the task. |
| | | */ |
| | | |
| | | acquireIEContext(true); //test and set if no import already in progress |
| | | IEContext ieCtx = ieContext; |
| | | final IEContext ieCtx = acquireIEContext(true); |
| | | ieCtx.initializeTask = initTask; |
| | | ieCtx.attemptCnt = 0; |
| | | ieCtx.initReqMsgSent = new InitializeRequestMsg( |
| | |
| | | |
| | | int source = initTargetMsgReceived.getSenderID(); |
| | | |
| | | IEContext ieCtx = ieContext; |
| | | IEContext ieCtx = ieContext.get(); |
| | | try |
| | | { |
| | | // Log starting |
| | |
| | | server. |
| | | Test and set if no import already in progress |
| | | */ |
| | | acquireIEContext(true); |
| | | ieCtx = acquireIEContext(true); |
| | | } |
| | | |
| | | // Initialize stuff |
| | |
| | | */ |
| | | public boolean ieRunning() |
| | | { |
| | | return ieContext != null; |
| | | return ieContext.get() != null; /* true while a context is installed in the reference */ |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | protected IEContext getImportExportContext() |
| | | { |
| | | return ieContext; |
| | | return ieContext.get(); /* current context; null when none is set */ |
| | | } |
| | | |
| | | /** |