| | |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | |
| | | import org.opends.server.replication.protocol.HeartbeatMsg; |
| | | import org.opends.server.replication.protocol.InitializeRequestMsg; |
| | | import org.opends.server.replication.protocol.InitializeTargetMsg; |
| | | import org.opends.server.replication.protocol.InitializeRcvAckMsg; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | |
| | | private Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | |
| | | /** |
| | | * Window size used during initialization, between: |
| | | * - the initializer/exporter DS, which listens for acknowledges and |
| | | * slows down data msg publishing based on the slowest server, |
| | | * - and each initialized/importer DS, which publishes an acknowledge for |
| | | * every WINDOW/2 data msgs received. |
| | | */ |
| | | protected int initWindow = 100; |
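| | |
| | | /* |
| | | * A minimal illustrative sketch (not part of the original file) of the |
| | | * flow-control arithmetic described above. The parameter names below are |
| | | * hypothetical; the real counters live in the IEContext (msgCnt, ackVals) |
| | | * together with this initWindow field. |
| | | */ |
| | | private static boolean exporterMustWait(int msgsSent, int slowestAck, int initWindow) |
| | | { |
| | | // The importer acknowledges every initWindow/2 data msgs it receives; |
| | | // the exporter pauses while it is more than initWindow msgs ahead of |
| | | // the slowest importer. |
| | | return (msgsSent - slowestAck) > initWindow; |
| | | } |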
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | | // Indicates the date when the status changed. This may be used to indicate |
| | |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating in a given Replication Domain. |
| | | * @param initWindow Window used during initialization. |
| | | */ |
| | | public ReplicationDomain(String serviceID, int serverID, int initWindow) |
| | | { |
| | | this.serviceID = serviceID; |
| | | this.serverID = serverID; |
| | | this.initWindow = initWindow; |
| | | this.state = new ServerState(); |
| | | this.generator = new ChangeNumberGenerator(serverID, state); |
| | | |
| | | domains.put(serviceID, this); |
| | | } |
| | | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * |
| | | * @param serviceID The identifier of the Replication Domain in which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating in a given Replication Domain. |
| | | */ |
| | | public ReplicationDomain(String serviceID, int serverID) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if a remote replica (DS) is connected to the topology based on |
| | | * the TopologyMsg we received when the remote replica connected or |
| | | * disconnected. |
| | | * |
| | | * @param serverId The provided serverId of the remote replica |
| | | * @return whether the remote replica is connected or not. |
| | | */ |
| | | public boolean isRemoteDSConnected(int serverId) |
| | | { |
| | | for (DSInfo remoteDS : getReplicasList()) |
| | | if (remoteDS.getDsId() == serverId) |
| | | return true; |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Gets the States of all the Replicas currently in the |
| | | * Topology. |
| | | * When this method is called, a Monitoring message will be sent |
| | |
| | | |
| | | /** |
| | | * Receives an update message from the replicationServer. |
| | | * Also responsible for updating the list of pending changes. |
| | | * The other types of messages are processed in an opaque way for the caller. |
| | | * @return the received message - null if none |
| | | */ |
| | | UpdateMsg receive() |
| | |
| | | |
| | | while (update == null) |
| | | { |
| | | InitializeRequestMsg initMsg = null; |
| | | InitializeRequestMsg initReqMsg = null; |
| | | ReplicationMsg msg; |
| | | try |
| | | { |
| | | msg = broker.receive(true); |
| | | msg = broker.receive(true, true, false); |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMsg)msg; |
| | | initReqMsg = (InitializeRequestMsg)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMsg importMsg = (InitializeTargetMsg) msg; |
| | | InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg; |
| | | |
| | | try |
| | | { |
| | | // This must be done while we are still holding the |
| | | // broker lock because we are now going to receive a |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | initialize(importMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | | TRACER.debugInfo(Message.toString(mb.toMessage())); |
| | | broker.publish(errorMsg); |
| | | logError(de.getMessageObject()); |
| | | } |
| | | // This must be done while we are still holding the |
| | | // broker lock because we are now going to receive a |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | initialize(initTargetMsg, initTargetMsg.getSenderID()); |
| | | } |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMsg)msg); |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find the import source. |
| | | // |
| | | // A remote error during the import will be received in the |
| | | // receiveEntryBytes() method. |
| | | // |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] processErrorMsg:" + this.serverID + |
| | | " serviceID: " + this.serviceID + |
| | | " Error Msg received: " + errorMsg); |
| | | |
| | | if (errorMsg.getCreationTime() > ieContext.startTime) |
| | | { |
| | | // consider only ErrorMsg that relate to the current import/export |
| | | processErrorMsg(errorMsg); |
| | | } |
| | | else |
| | | { |
| | | // Simply log - happens when the ErrorMsg relates to a previous |
| | | // attempt of initialization while we have started a new one |
| | | // on this side. |
| | | logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails())); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * Log error message |
| | | */ |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | // Simply log - happens if import/export has been terminated |
| | | // on our side before receiving this ErrorMsg. |
| | | logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails())); |
| | | } |
| | | } |
| | | else if (msg instanceof ChangeStatusMsg) |
| | |
| | | update = (UpdateMsg) msg; |
| | | generator.adjust(update.getChangeNumber()); |
| | | } |
| | | else if (msg instanceof InitializeRcvAckMsg) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg; |
| | | ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck()); |
| | | } |
| | | // Trash this msg when no import/export is running - should never happen |
| | | } |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | |
| | | // entries to the remote can be handled by the other |
| | | // replay thread when they call this method and therefore the |
| | | // broker.receive() method. |
| | | if (initMsg != null) |
| | | if (initReqMsg != null) |
| | | { |
| | | // Do this work in a thread to allow the replay thread to continue working |
| | | ExportThread exportThread = new ExportThread(initMsg.getsenderID()); |
| | | ExportThread exportThread = new ExportThread( |
| | | initReqMsg.getSenderID(), initReqMsg.getInitWindow()); |
| | | exportThread.start(); |
| | | } |
| | | } |
| | |
| | | */ |
| | | |
| | | /** |
| | | * This thread is launched when we want to export data to another server that |
| | | * has requested to be initialized with the data of our backend. |
| | | * This thread is launched when we want to export data to another server. |
| | | * |
| | | * When a task is created locally (so this local server is the initiator |
| | | * of the export, for example: dsreplication initialize-all), |
| | | * this thread is NOT used; the task thread runs the export instead. |
| | | */ |
| | | private class ExportThread extends DirectoryThread |
| | | { |
| | | // Id of server that will receive updates |
| | | private int target; |
| | | // Id of server that will be initialized |
| | | private int serverToInitialize; |
| | | private int initWindow; |
| | | |
| | | /** |
| | | * Constructor for the ExportThread. |
| | | * |
| | | * @param i Id of server that will receive updates |
| | | * @param serverToInitialize serverId of the server that will receive entries |
| | | * @param initWindow Window used for flow control during the initialization. |
| | | */ |
| | | public ExportThread(int i) |
| | | public ExportThread(int serverToInitialize, int initWindow) |
| | | { |
| | | super("Export thread " + serverID); |
| | | this.target = i; |
| | | super("Export thread from serverId=" + serverID |
| | | + " to serverId=" + serverToInitialize); |
| | | this.serverToInitialize = serverToInitialize; |
| | | this.initWindow = initWindow; |
| | | } |
| | | |
| | | /** |
| | |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Export thread starting."); |
| | | } |
| | | |
| | | TRACER.debugInfo("[IE] starting " + this.getName()); |
| | | try |
| | | { |
| | | initializeRemote(target, target, null); |
| | | initializeRemote(serverToInitialize, serverToInitialize, null, |
| | | initWindow); |
| | | } catch (DirectoryException de) |
| | | { |
| | | // An error message has been sent to the peer |
| | | // Nothing more to do locally |
| | | // An error message has been sent to the peer |
| | | // This server is not the initiator of the export so there is |
| | | // nothing more to do locally. |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Export thread stopping."); |
| | | } |
| | | TRACER.debugInfo("[IE] ending " + this.getName()); |
| | | } |
| | | } |
| | | |
| | |
| | | // The count of entries not yet processed |
| | | long entryLeftCount = 0; |
| | | |
| | | // The exception raised when any |
| | | // Exception raised during the initialization. |
| | | DirectoryException exception = null; |
| | | |
| | | // A boolean indicating if the context is related to an |
| | | // import or an export. |
| | | // Whether the context is related to an import or an export. |
| | | boolean importInProgress; |
| | | |
| | | // Current counter of messages exchanged during the initialization |
| | | int msgCnt = 0; |
| | | |
| | | // Number of connections lost when we start the initialization. |
| | | // Used to detect connections lost during the initialization. |
| | | int initNumLostConnections = 0; |
| | | |
| | | // Request message sent when this server has the initializeFromRemote task. |
| | | InitializeRequestMsg initReqMsgSent = null; |
| | | |
| | | // Start time of the initialization process. ErrorMsgs timestamped |
| | | // before this startTime will be ignored. |
| | | long startTime; |
| | | |
| | | // List of replicas (DS) connected to the topology when |
| | | // initialization started. |
| | | Set<Integer> startList = new HashSet<Integer>(0); |
| | | |
| | | // List of replicas (DS) with a failure (disconnected from the topology) |
| | | // since the initialization started. |
| | | Set<Integer> failureList = new HashSet<Integer>(0); |
| | | |
| | | // Flow control during initialization |
| | | // - for each remote server, counter of messages received |
| | | private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>(); |
| | | // - serverId of the slowest server (the one with the smallest non null |
| | | // counter) |
| | | private int slowestServerId = -1; |
| | | |
| | | short exporterProtocolVersion = -1; |
| | | |
| | | // Window used during this initialization |
| | | int initWindow; |
| | | |
| | | // Number of attempts already made for this initialization |
| | | short attemptCnt; |
| | | |
| | | /** |
| | | * Creates a new IEContext. |
| | | * |
| | |
| | | public IEContext(boolean importInProgress) |
| | | { |
| | | this.importInProgress = importInProgress; |
| | | this.startTime = System.currentTimeMillis(); |
| | | this.attemptCnt = 0; |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Initializes the import/export counters with the provided value. |
| | | * @param total Total number of entries to be processed. |
| | | * @param left Remaining number of entries to be processed. |
| | | * @throws DirectoryException if an error occurred. |
| | | */ |
| | | public void setCounters(long total, long left) |
| | | private void initializeCounters(long total) |
| | | throws DirectoryException |
| | | { |
| | | entryCount = total; |
| | | entryLeftCount = left; |
| | | entryLeftCount = total; |
| | | |
| | | if (initializeTask != null) |
| | | { |
| | |
| | | { |
| | | this.exception = exception; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sets the id of the EntryMsg acknowledged by a receiver (importer) server. |
| | | * (updated via the listener thread) |
| | | * @param serverId serverId of the acknowledger/receiver/importer server. |
| | | * @param numAck id of the message received. |
| | | */ |
| | | public void setAckVal(int serverId, int numAck) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck); |
| | | |
| | | this.ackVals.put(serverId, numAck); |
| | | |
| | | // Recompute the server with the smallest ack returned, i.e. the slowest server. |
| | | slowestServerId = serverId; |
| | | for (Integer sid : ieContext.ackVals.keySet()) |
| | | if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) |
| | | slowestServerId = sid; |
| | | } |
| | | |
| | | /** |
| | | * Returns the serverId of the server that acknowledged the smallest |
| | | * EntryMsg id. |
| | | * @return serverId of the server with the smallest acknowledged id; |
| | | * 0 when no ack has been received yet. |
| | | */ |
| | | public int getSlowestServer() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId |
| | | + " " + this.ackVals.get(slowestServerId)); |
| | | |
| | | return this.slowestServerId; |
| | | } |
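| | |
| | | /* |
| | | * Illustrative sketch (not part of the original file) isolating the |
| | | * "slowest server" rule used above: the slowest importer is the one whose |
| | | * last acknowledged msg id is the smallest. The method name is hypothetical. |
| | | */ |
| | | private int slowestOf(java.util.Map<Integer, Integer> acks) |
| | | { |
| | | int slowest = -1; |
| | | for (java.util.Map.Entry<Integer, Integer> entry : acks.entrySet()) |
| | | { |
| | | if ((slowest == -1) || (entry.getValue() < acks.get(slowest))) |
| | | slowest = entry.getKey(); |
| | | } |
| | | return slowest; |
| | | } |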
| | | } |
| | | /** |
| | | * Verifies that the given string represents a valid source |
| | | * from which this server can be initialized. |
| | |
| | | public void initializeRemote(int target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeRemote(target, serverID, initTask); |
| | | |
| | | if (target == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | // Check the status of all remote servers to see if they |
| | | // are all finished with the import. |
| | | boolean done = true; |
| | | do |
| | | { |
| | | done = true; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | done = false; |
| | | try |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // just loop again. |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | while (!done); |
| | | } |
| | | initializeRemote(target, this.serverID, initTask, this.initWindow); |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | * specified by the target argument when this initialization specifying the |
| | | * server that requests the initialization. |
| | | * |
| | | * @param target The target that should be initialized. |
| | | * @param target2 The server that initiated the export. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * @param serverToInitialize The target server that should be initialized. |
| | | * @param serverRunningTheTask The server that initiated the export. It can |
| | | * be the serverID of this server, or the serverID of a remote server. |
| | | * @param initTask The task in this server that triggers this initialization |
| | | * and that should be updated with its progress. Null when the export is done |
| | | * following a request coming from a remote server (task is remote). |
| | | * @param initWindow The value of the initialization window for flow control |
| | | * between the importer and the exporter. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | * @exception DirectoryException When an error occurs. No exception raised |
| | | * means success. |
| | | */ |
| | | protected void initializeRemote(int target, int target2, |
| | | Task initTask) throws DirectoryException |
| | | protected void initializeRemote(int serverToInitialize, |
| | | int serverRunningTheTask, Task initTask, int initWindow) |
| | | throws DirectoryException |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Integer.toString(target2)); |
| | | logError(msg); |
| | | DirectoryException exportRootException = null; |
| | | boolean contextAcquired = false; |
| | | |
| | | boolean contextAcquired=false; |
| | | |
| | | // Acquire and initialize the export context |
| | | acquireIEContext(false); |
| | | contextAcquired = true; |
| | | ieContext.exportTarget = target; |
| | | |
| | | if (initTask != null) |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | Integer.toString(serverID), Long.toString(countEntries()), serviceID, |
| | | Integer.toString(serverToInitialize)); |
| | | logError(msg); |
| | | |
| | | // We manage the list of servers to initialize in order : |
| | | // - to test at the end that all expected servers have reconnected |
| | | // after their import and with the right genId |
| | | // - to update the task with the server(s) where this test failed |
| | | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | for (DSInfo dsi : getReplicasList()) |
| | | ieContext.startList.add(dsi.getDsId()); |
| | | else |
| | | ieContext.startList.add(serverToInitialize); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | ieContext.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | |
| | | // The number of entries to be exported is the number of entries under |
| | | // the base DN entry, plus the base entry itself. |
| | | long entryCount = this.countEntries(); |
| | | |
| | | |
| | | ieContext.setCounters(entryCount, entryCount); |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initializeMessage = new InitializeTargetMsg( |
| | | serviceID, serverID, target, target2, entryCount); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | try |
| | | // loop for the case where the exporter is the initiator |
| | | int attempt = 0; |
| | | boolean done = false; |
| | | while ((!done) && (++attempt<2)) // attempt loop |
| | | { |
| | | exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); |
| | | try |
| | | { |
| | | ieContext.exportTarget = serverToInitialize; |
| | | if (initTask != null) |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.initializeCounters(this.countEntries()); |
| | | ieContext.msgCnt = 0; |
| | | ieContext.initNumLostConnections = broker.getNumLostConnections(); |
| | | ieContext.initWindow = initWindow; |
| | | |
| | | // Notify the peer of the success |
| | | DoneMsg doneMsg = new DoneMsg(serverID, |
| | | initializeMessage.getDestination()); |
| | | broker.publish(doneMsg); |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( |
| | | serviceID, serverID, serverToInitialize, serverRunningTheTask, |
| | | ieContext.entryCount, initWindow); |
| | | |
| | | broker.publish(initTargetMsg); |
| | | |
| | | // Wait for all servers to be ok |
| | | waitForRemoteStartOfInit(); |
| | | |
| | | // Servers left in the list are those for which we could not verify |
| | | // that they have been successfully initialized. |
| | | if (!ieContext.failureList.isEmpty()) |
| | | { |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get( |
| | | ieContext.failureList.toString())); |
| | | } |
| | | |
| | | exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination()); |
| | | broker.publish(doneMsg); |
| | | |
| | | } |
| | | catch(DirectoryException exportException) |
| | | { |
| | | // Give priority to the first exception raised - stored in the context |
| | | if (ieContext.exception != null) |
| | | exportRootException = ieContext.exception; |
| | | else |
| | | exportRootException = exportException; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] In " + this.monitor.getMonitorInstanceName() |
| | | + " export ends with " |
| | | + " connected=" + broker.isConnected() |
| | | + " exportRootException=" + exportRootException); |
| | | |
| | | if (exportRootException != null) |
| | | { |
| | | try |
| | | { |
| | | // Handling the errors during export |
| | | |
| | | // Note: we could have lost the connection and another thread |
| | | // (the listener one) has already managed to reconnect. |
| | | // So we MUST rely on the test broker.isConnected() |
| | | // ONLY to do 'wait to be reconnected by another thread' |
| | | // (if not yet reconnected already). |
| | | |
| | | if (!broker.isConnected()) |
| | | { |
| | | // We are still disconnected, so we wait for the listener thread |
| | | // to reconnect - wait 10s |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] Exporter wait for reconnection by the listener thread"); |
| | | int att=0; |
| | | while ((!broker.shuttingDown()) && |
| | | (!broker.isConnected())&& (++att<100)) |
| | | try { Thread.sleep(100); } catch(Exception e){} |
| | | } |
| | | |
| | | if ((initTask != null) && broker.isConnected() && |
| | | (serverToInitialize != RoutableMsg.ALL_SERVERS)) |
| | | { |
| | | // NewAttempt case: in the case where |
| | | // - it's not an InitializeAll |
| | | // - AND the previous export attempt failed |
| | | // - AND we are (now) connected |
| | | // - AND we own the task |
| | | // Let's: |
| | | // - sleep to give the other peer time to reconnect if needed |
| | | // - and launch another attempt |
| | | try { Thread.sleep(1000); } catch(Exception e){} |
| | | logError(NOTE_RESENDING_INIT_TARGET.get((exportRootException!=null? |
| | | exportRootException.getLocalizedMessage():""))); |
| | | |
| | | continue; |
| | | } |
| | | |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(serverToInitialize, |
| | | exportRootException.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Ignore the failure raised while processing the root failure |
| | | } |
| | | } |
| | | |
| | | // We are always done for this export ... |
| | | // ... except in the NewAttempt case (see above) |
| | | done = true; |
| | | |
| | | } // attempt loop |
| | | |
| | | // Wait for all servers to be ok, and build the failure list |
| | | waitForRemoteEndOfInit(); |
| | | |
| | | // Servers left in the list are those for which we could not verify |
| | | // that they have been successfully initialized. |
| | | if (!ieContext.failureList.isEmpty()) |
| | | { |
| | | if (exportRootException == null) |
| | | exportRootException = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get( |
| | | Long.toString(getGenerationID()), |
| | | ieContext.failureList.toString())); |
| | | } |
| | | |
| | | if (contextAcquired) |
| | | releaseIEContext(); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Notify the peer of the failure |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(target, |
| | | de.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | |
| | | if (contextAcquired) |
| | | releaseIEContext(); |
| | | |
| | | throw(de); |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Integer.toString(target2)); |
| | | serviceID, |
| | | Integer.toString(serverToInitialize), |
| | | (exportRootException!=null?exportRootException.getLocalizedMessage():"")); |
| | | logError(msg); |
| | | |
| | | if (exportRootException != null) |
| | | { |
| | | throw(exportRootException); |
| | | } |
| | | |
| | | } |
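| | |
| | | /* |
| | | * Illustrative sketch (not part of the original file): how a local task |
| | | * thread might trigger a full export of this domain to a single remote |
| | | * replica. The serverId value (22) and the surrounding method are |
| | | * hypothetical; initializeRemote(int, Task) is the entry point shown above. |
| | | */ |
| | | private void exampleExportToOneReplica(Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | // Export this domain's backend to the replica with serverId 22, using |
| | | // this domain's configured initialization window for flow control. |
| | | initializeRemote(22, initTask); |
| | | } |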
| | | |
| | | /* |
| | | * For all remote servers in the start list, |
| | | * - wait until each has started the full update |
| | | * - build the failureList |
| | | */ |
| | | private void waitForRemoteStartOfInit() |
| | | { |
| | | int waitResultAttempt = 0; |
| | | Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0); |
| | | |
| | | for (Integer sid : ieContext.startList) |
| | | replicasWeAreWaitingFor.add(sid); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); |
| | | |
| | | boolean done = true; |
| | | do |
| | | { |
| | | done = true; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for start dsid " + dsi.getDsId() |
| | | + " " + dsi.getStatus() |
| | | + " " + dsi.getGenerationId() |
| | | + " " + this.getGenerationID()); |
| | | if (ieContext.startList.contains(dsi.getDsId())) |
| | | { |
| | | if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | // this one is still not doing the Full Update ... retry later |
| | | done = false; |
| | | try |
| | | { Thread.sleep(100); } catch (InterruptedException e) {} |
| | | waitResultAttempt++; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // this one is ok |
| | | replicasWeAreWaitingFor.remove(dsi.getDsId()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | while ((!done) && (waitResultAttempt<1200) // 2 minutes |
| | | && (!broker.shuttingDown())); |
| | | |
| | | // Add to the failure list the servers that were here at start time but |
| | | // that never started the full update. |
| | | for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0])) |
| | | ieContext.failureList.add(sid); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for start ends with " + ieContext.failureList); |
| | | } |
| | | |
| | | /* |
| | | * For all remote servers in the start list, |
| | | * - wait until each has finished the import and presents the expected generationID |
| | | * - build the failureList |
| | | */ |
| | | private void waitForRemoteEndOfInit() |
| | | { |
| | | int waitResultAttempt = 0; |
| | | Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0); |
| | | |
| | | for (Integer sid : ieContext.startList) |
| | | replicasWeAreWaitingFor.add(sid); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); |
| | | |
| | | // In case some new servers appear during the init, we want them to be |
| | | // considered when sorting the successfully initialized servers |
| | | // from the others. |
| | | for (DSInfo dsi : getReplicasList()) |
| | | replicasWeAreWaitingFor.add(dsi.getDsId()); |
| | | |
| | | boolean done = true; |
| | | do |
| | | { |
| | | done = true; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for end dsid " + dsi.getDsId() |
| | | + " " + dsi.getStatus() |
| | | + " " + dsi.getGenerationId() |
| | | + " " + this.getGenerationID()); |
| | | if (!ieContext.failureList.contains(dsi.getDsId())) |
| | | { |
| | | if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | // this one is still doing the Full Update ... retry later |
| | | done = false; |
| | | try |
| | | { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s |
| | | waitResultAttempt++; |
| | | break; |
| | | } |
| | | else |
| | | { |
| | | // this one is done with the Full Update |
| | | if (dsi.getGenerationId() == this.getGenerationID()) |
| | | { |
| | | // and with the expected generationId |
| | | replicasWeAreWaitingFor.remove(dsi.getDsId()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | while ((!done) && (!broker.shuttingDown())); // infinite wait |
| | | |
| | | // Add to the failure list the servers that were here at start time but |
| | | // that never ended with the right generationId. |
| | | for (Integer sid : replicasWeAreWaitingFor.toArray(new Integer[0])) |
| | | ieContext.failureList.add(sid); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] wait for end ends with " + ieContext.failureList); |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Processes an error message received while an import/export is |
| | | * ongoing. |
| | | * Processes an error message received while an export is |
| | | * ongoing, or before an import has started. |
| | | * |
| | | * @param errorMsg The error message received. |
| | | */ |
| | | void abandonImportExport(ErrorMsg errorMsg) |
| | | private void processErrorMsg(ErrorMsg errorMsg) |
| | | { |
| | | // FIXME TBD Treat the case where the error happens while entries |
| | | // are being exported |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " abandonImportExport:" + this.serverID + |
| | | " serviceID: " + this.serviceID + |
| | | " Error Msg received: " + errorMsg); |
| | | |
| | | if (ieContext != null) |
| | | { |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails())); |
| | | |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(ieContext.getException()); |
| | | // The ErrorMsg is received while we have started an initialization |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails())); |
| | | |
| | | releaseIEContext(); |
| | | /* |
| | | * This can happen: |
| | | * - on the first InitReqMsg sent, when the source is not known for example |
| | | * - on the next attempt, when the source crashed and did not reconnect |
| | | * even after the nextInitAttemptDelay |
| | | * During the import, the ErrorMsg will be received by receiveEntryBytes |
| | | */ |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(ieContext.getException()); |
| | | |
| | | releaseIEContext(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // When we are the exporter in the initializeAll case, |
| | | // the export must not be stopped on the first error. |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | msg = broker.receive(); |
| | | // In the context of the total update, we don't want any automatic |
| | | // re-connection done transparently by the broker because of a better |
| | | // RS or because of a connection failure. |
| | | // We want to be notified of topology change in order to track a |
| | | // potential disconnection of the exporter. |
| | | msg = broker.receive(false, false, true); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " sid:" + serverID + |
| | | " base DN:" + serviceID + |
| | | " Import EntryBytes received " + msg); |
| | | TRACER.debugInfo( |
| | | "[IE] In " + this.monitor.getMonitorInstanceName() + |
| | | ", receiveEntryBytes " + msg); |
| | | |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | if (broker.shuttingDown()) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | else |
| | | { |
| | | // Handle connection issues |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException( |
| | | ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_IMPORT.get( |
| | | broker.getReplicationServer()))); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | // Check that msgs are received in the expected sequence |
| | | if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg)msg; |
| | | byte[] entryBytes = entryMsg.getEntryBytes(); |
| | | ieContext.updateCounters(countEntryLimits(entryBytes)); |
| | | |
| | | if (ieContext.exporterProtocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // check the msgCnt of the received msg to verify sequentiality |
| | | if (++ieContext.msgCnt != entryMsg.getMsgId()) |
| | | { |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get( |
| | | String.valueOf(ieContext.msgCnt), |
| | | String.valueOf(entryMsg.getMsgId())))); |
| | | return null; |
| | | } |
| | | |
| | | // send the ack of flow control mgmt |
| | | if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0) |
| | | { |
| | | InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | this.serverID, |
| | | entryMsg.getSenderID(), |
| | | ieContext.msgCnt); |
| | | broker.publish(amsg, false); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] In " + this.monitor.getMonitorInstanceName() + |
| | | ", publish InitializeRcvAckMsg" + amsg); |
| | | } |
| | | } |
| | | return entryBytes; |
| | | } |
| | | else if (msg instanceof DoneMsg) |
| | |
| | | // This is an error termination during the import |
| | | // The error is stored and the import is ended |
| | | // by returning null |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails())); |
| | | return null; |
| | | if (ieContext.getException() == null) |
| | | { |
| | | ErrorMsg errMsg = (ErrorMsg)msg; |
| | | if (errMsg.getCreationTime() > ieContext.startTime) |
| | | { |
| | | ieContext.setException( |
| | | new DirectoryException(ResultCode.OTHER,errMsg.getDetails())); |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // Other messages received during an import are trashed |
| | | // Other messages received during an import are trashed except |
| | | // the topologyMsg. |
| | | if ((msg instanceof TopologyMsg) && |
| | | (!this.isRemoteDSConnected(ieContext.importSource))) |
| | | { |
| | | Message errMsg = |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | | ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | this.serviceID, |
| | | Integer.toString(this.serverID), |
| | | Integer.toString(ieContext.importSource))); |
| | | if (ieContext.getException()==null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | errMsg)); |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // TODO: i18n |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | Message.raw("received an unexpected message type" + |
| | | e.getLocalizedMessage()))); |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); |
| | | } |
| | | } |
| | | } |
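| | |
| | | /* |
| | | * Illustrative sketch (not part of the original file) of the importer-side |
| | | * ack cadence used above: an InitializeRcvAckMsg is published every |
| | | * initWindow/2 received EntryMsgs. The method name is hypothetical. |
| | | */ |
| | | private boolean shouldAckNow(int msgCnt, int initWindow) |
| | | { |
| | | // msgCnt is the count of EntryMsgs received so far in this import. |
| | | return (msgCnt % (initWindow / 2)) == 0; |
| | | } |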
| | |
| | | * |
| | | * @throws IOException when an error occurred. |
| | | */ |
| | | void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) throws IOException |
| | | public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) |
| | | throws IOException |
| | | { |
| | | // If an error was raised - like receiving an ErrorMsg |
| | | // we just let down the export. |
| | | if (ieContext.getException() != null) |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry); |
| | | |
| | | // build the message |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | serverID,ieContext.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieContext.msgCnt); |
| | | |
| | | // Loop waiting for the slowest importer |
| | | while (!broker.shuttingDown()) |
| | | { |
| | | IOException ioe = new IOException(ieContext.getException().getMessage()); |
| | | ieContext = null; |
| | | throw ioe; |
| | | // If an error was raised - like an ErrorMsg received from a remote |
| | | // server and stored by the listener thread in the ieContext - |
| | | // we just abandon the export by throwing an exception. |
| | | if (ieContext.getException() != null) |
| | | throw(new IOException(ieContext.getException().getMessage())); |
| | | |
| | | int slowestServerId = ieContext.getSlowestServer(); |
| | | if (!isRemoteDSConnected(slowestServerId)) |
| | | { |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get( |
| | | Integer.toString(ieContext.getSlowestServer())))); |
| | | // .. and abandon the export by throwing an exception. |
| | | IOException ioe = |
| | | new IOException("IOException with nested DirectoryException"); |
| | | ioe.initCause(ieContext.getException()); |
| | | throw ioe; |
| | | } |
| | | |
| | | int ourLastExportedCnt = ieContext.msgCnt; |
| | | int slowestCnt = ieContext.ackVals.get(slowestServerId); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " + |
| | | " our=" + ourLastExportedCnt + " slowest=" + slowestCnt); |
| | | |
| | | if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting"); |
| | | |
| | | // our export is too far ahead of the slowest importer - let's wait |
| | | try { Thread.sleep(100); } catch(Exception e) {} |
| | | |
| | | // process any connection error |
| | | if ((broker.hasConnectionError())|| |
| | | (broker.getNumLostConnections()!= ieContext.initNumLostConnections)) |
| | | { |
| | | // connection error - store the error in the ieContext ... |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( |
| | | Integer.toString(broker.getRsServerId()))); |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] slowest got to us => stop waiting"); |
| | | break; |
| | | } |
| | | } // end of loop waiting for the slowest importer |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry); |
| | | |
| | | // publish the message |
| | | boolean sent = broker.publish(entryMessage, false); |
| | | |
| | | // process any publish error |
| | | if (((!sent)|| |
| | | (broker.hasConnectionError()))|| |
| | | (broker.getNumLostConnections() != ieContext.initNumLostConnections)) |
| | | { |
| | | // publish failed - store the error in the ieContext ... |
| | | DirectoryException de = new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( |
| | | Integer.toString(broker.getRsServerId()))); |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | | |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | serverID,ieContext.getExportTarget(), lDIFEntry, pos, length); |
| | | broker.publish(entryMessage); |
| | | |
| | | // publish succeeded |
| | | try |
| | | { |
| | | ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length)); |
| | | } |
| | | catch (DirectoryException de) |
| | | { |
| | | // store the error in the ieContext ... |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(de); |
| | | // .. and abandon the export by throwing an exception. |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Initializes this domain from another source server. |
| | | * Asynchronously initializes this domain from a remote source server. |
| | | * For the provided task: |
| | | * - the progress counters are updated during the initialization using |
| | | * setTotal() and setLeft(), |
| | | * - the end of the initialization is reported using |
| | | * updateTaskCompletionState(). |
| | | * <p> |
| | | * When this method is called, a request for initialization will |
| | | * be sent to the source server asking for initialization. |
| | | * When this method is called, a request for initialization is sent to the |
| | | * remote source server requesting initialization. |
| | | * <p> |
| | | * The {@link #exportBackend(OutputStream)} will therefore be called |
| | | * on the source server, and the {@link #importBackend(InputStream)} |
| | | * will be called on this server. |
| | | * <p> |
| | | * The InputStream and OutputStream given as parameters to those |
| | | * methods will be connected through the replication protocol. |
| | | * |
| | | * @param source The server-id of the source from which to initialize. |
| | | * The source can be discovered using the |
| | | * {@link #getReplicasList()} method. |
| | | * |
| | | * @param initTask The task that launched the initialization |
| | | * and should be updated of its progress. |
| | | * |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | * The task state is updated. |
| | | */ |
| | | public void initializeFromRemote(int source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | Message errMsg = null; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Entering initializeFromRemote"); |
| | | TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this); |
| | | |
| | | if (!broker.isConnected()) |
| | | { |
| | | if (initTask instanceof InitializeTask) |
| | | { |
| | | InitializeTask task = (InitializeTask) initTask; |
| | | task.updateTaskCompletionState( |
| | | new DirectoryException( |
| | | ResultCode.OTHER, ERR_INITIALIZATION_FAILED_NOCONN.get( |
| | | getServiceID()))); |
| | | } |
| | | return; |
| | | errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID()); |
| | | } |
| | | |
| | | acquireIEContext(true); |
| | | ieContext.initializeTask = initTask; |
| | | // We must not test here whether the remote source is connected to |
| | | // the topology by testing whether it appears in the replicas list. |
| | | // In the case of a re-attempt of initialization, the listener thread is |
| | | // running this method directly, coming from the initialize() method, and |
| | | // has not processed any topology message in between the failure and the |
| | | // new attempt. |
| | | try |
| | | { |
| | | // We must immediately acquire a context to store the task inside. |
| | | // The context will be used when we (the listener thread) receive |
| | | // the InitializeTargetMsg, process the import, and at the end |
| | | // update the task. |
| | | |
| | | InitializeRequestMsg initializeMsg = new InitializeRequestMsg( |
| | | serviceID, serverID, source); |
| | | acquireIEContext(true); //test and set if no import already in progress |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.attemptCnt = 0; |
| | | ieContext.initReqMsgSent = new InitializeRequestMsg( |
| | | serviceID, serverID, source, this.initWindow); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(initializeMsg); |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | | |
| | | // .. we expect to receive entries or err after that |
| | | // The normal success processing is now to receive InitTargetMsg then |
| | | // entries from the remote server. |
| | | // The error cases are: |
| | | // - either a local error immediately caught below |
| | | // - a remote error we will receive as an ErrorMsg |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | errMsg = de.getMessageObject(); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Should not happen |
| | | errMsg = Message.raw(Category.SYNC, Severity.NOTICE, |
| | | e.getLocalizedMessage()); |
| | | logError(errMsg); |
| | | } |
| | | |
| | | // When error, update the task and raise the error to the caller |
| | | if (errMsg != null) |
| | | { |
| | | // No need to call updateTaskCompletionState here - it will be done |
| | | // by the caller |
| | | releaseIEContext(); |
| | | DirectoryException de = new DirectoryException( |
| | | ResultCode.OTHER, |
| | | errMsg); |
| | | throw (de); |
| | | } |
| | | } |
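| | |
| | | /* |
| | | * Illustrative sketch (not part of the original file): a task asking this |
| | | * domain to be re-initialized from the replica whose serverId is 11 (a |
| | | * hypothetical id that would come from getReplicasList()). The surrounding |
| | | * method is hypothetical; initializeFromRemote(int, Task) is shown above. |
| | | */ |
| | | private void exampleInitializeFromReplica11(Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | // Publishes an InitializeRequestMsg; the entries then arrive |
| | | // asynchronously and are consumed by importBackend() via the |
| | | // listener thread. |
| | | initializeFromRemote(11, initTask); |
| | | } |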
| | | |
| | | /** |
| | | * Initializes the domain's backend with received entries. |
| | | * @param initializeMessage The message that initiated the import. |
| | | * @exception DirectoryException Thrown when an error occurs. |
| | | * Processes an InitializeTargetMsg received from a remote server |
| | | * that is, processes an initialization from the entries expected to be |
| | | * received now. |
| | | * |
| | | * @param initTargetMsgReceived The message received from the remote server. |
| | | * |
| | | * @param requestorServerId The serverId of the server that requested the |
| | | * initialization meaning the server where the |
| | | * task has initially been created (this server, |
| | | * or the remote server). |
| | | */ |
| | | void initialize(InitializeTargetMsg initializeMessage) |
| | | throws DirectoryException |
| | | void initialize(InitializeTargetMsg initTargetMsgReceived, |
| | | int requestorServerId) |
| | | { |
| | | DirectoryException de = null; |
| | | InitializeTask initFromtask = null; |
| | | |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering initialize - domain=" + this); |
| | | |
| | | // Go into full update status |
| | | setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT); |
| | | |
| | | if (initializeMessage.getRequestorID() == serverID) |
| | | { |
| | | // The import responds to a request we did so the IEContext |
| | | // is already acquired |
| | | } |
| | | else |
| | | { |
| | | acquireIEContext(true); |
| | | } |
| | | |
| | | ieContext.importSource = initializeMessage.getsenderID(); |
| | | ieContext.entryLeftCount = initializeMessage.getEntryCount(); |
| | | ieContext.setCounters( |
| | | initializeMessage.getEntryCount(), |
| | | initializeMessage.getEntryCount()); |
| | | int source = initTargetMsgReceived.getSenderID(); |
| | | |
| | | try |
| | | { |
| | | // Log starting |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Long.toString(initTargetMsgReceived.getInitiatorID())); |
| | | logError(msg); |
| | | |
| | | // Go into full update status |
| | | setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT); |
| | | |
| | | // Acquire an import context if not already done (and initialize it). |
| | | if (initTargetMsgReceived.getInitiatorID() == this.serverID) |
| | | { |
| | | // The initTargetMsgReceived is the answer to a request that |
| | | // we (this server) sent previously. In that case, the IEContext |
| | | // has already been acquired when the request was published, in order |
| | | // to store the task (to be updated with the status at the end). |
| | | } |
| | | else |
| | | { |
| | | // The initTargetMsgReceived is for an import initiated by the remote |
| | | // server. |
| | | // Test and set if no import already in progress |
| | | acquireIEContext(true); |
| | | } |
| | | |
| | | // Initialize stuff |
| | | ieContext.importSource = source; |
| | | ieContext.initializeCounters(initTargetMsgReceived.getEntryCount()); |
| | | ieContext.initWindow = initTargetMsgReceived.getInitWindow(); |
| | | // Protocol version is -1 when not known. |
| | | ieContext.exporterProtocolVersion = getProtocolVersion(source); |
| | | initFromtask = (InitializeTask)ieContext.initializeTask; |
| | | |
| | | // Launch the import |
| | | importBackend(new ReplInputStream(this)); |
| | | broker.reStart(); |
| | | |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | de = e; |
| | | // Store the exception raised, if no other exception |
| | | // has been previously stored in the context. |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(e); |
| | | } |
| | | finally |
| | | { |
| | | if ((ieContext != null) && (ieContext.getException() != null)) |
| | | de = ieContext.getException(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends import with exception=" + ieContext.getException() |
| | | + " connected=" + broker.isConnected()); |
| | | |
| | | // Update the task that initiated the import |
| | | if ((ieContext != null ) && (ieContext.initializeTask != null)) |
| | | // It is necessary to restart (reconnect to the RS) for different reasons: |
| | | // - when everything went well, reconnect in order to exchange the |
| | | // new state and new generation ID |
| | | // - when we had a connection failure, reconnect to retry a new import |
| | | // right here, right now |
| | | // We never want retryOnFailure if we fail to reconnect in the restart. |
| | | broker.reStart(false); |
| | | |
| | | if (ieContext.getException() != null) |
| | | { |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(de); |
| | | if (broker.isConnected() && (initFromtask != null) |
| | | && (++ieContext.attemptCnt<2)) |
| | | { |
| | | // Worth a new attempt |
| | | // since initFromtask is in this server, connection is ok |
| | | try |
| | | { |
| | | |
| | | // Wait for the exporter to stabilize - it may also need to reconnect |
| | | // if it was connected to the same RS as the one we lost ... |
| | | Thread.sleep(1000); |
| | | |
| | | // Restart the whole import protocol exchange by sending again |
| | | // the request |
| | | logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get( |
| | | ieContext.getException().getLocalizedMessage())); |
| | | |
| | | broker.publish(ieContext.initReqMsgSent); |
| | | |
| | | ieContext.initializeCounters(0); |
| | | ieContext.exception = null; |
| | | ieContext.msgCnt = 0; |
| | | |
| | | // Processing of the received initTargetMsgReceived is done |
| | | // let's wait for the next one |
| | | return; |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // An error occurred when sending a new request for a new import. |
| | | // This error is not stored, preferring to keep the initial one. |
| | | logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get( |
| | | e.getLocalizedMessage(), |
| | | ieContext.getException().getLocalizedMessage())); |
| | | } |
| | | } |
| | | } |
| | | releaseIEContext(); |
| | | } |
| | | |
| | | // Sends up the root error. |
| | | if (de != null) |
| | | // =================== |
| | | // No new attempt case |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends initialization with exception=" + ieContext.getException() |
| | | + " connected=" + broker.isConnected() |
| | | + " task=" + initFromtask |
| | | + " attempt=" + ieContext.attemptCnt); |
| | | |
| | | try |
| | | { |
| | | if (broker.isConnected() && (ieContext.getException() != null)) |
| | | { |
| | | // Let's notify the exporter |
| | | ErrorMsg errorMsg = new ErrorMsg(requestorServerId, |
| | | ieContext.getException().getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | else // !broker.isConnected() |
| | | { |
| | | // Don't try to reconnect here. |
| | | // The current running thread is the listener thread and will loop on |
| | | // receive(), which is expected to manage reconnect attempts. |
| | | } |
| | | |
| | | // Updating the task that initiated the import must be the last thing. |
| | | // In particular, broker.restart() after a successful import must be done |
| | | // before some other operations/tasks are launched, |
| | | // like resetting the generation ID. |
| | | if (initFromtask != null) |
| | | { |
| | | initFromtask.updateTaskCompletionState(ieContext.getException()); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Long.toString(initTargetMsgReceived.getInitiatorID()), |
| | | (ieContext.getException()!=null? |
| | | ieContext.getException().getLocalizedMessage():"")); |
| | | logError(msg); |
| | | releaseIEContext(); |
| | | } // finally |
| | | } // finally |
| | | } |
| | | |
| | | /** |
| | | * Returns the protocol version of the DS with the provided serverId. |
| | | * Returns -1 when the protocol version is not known. |
| | | * @param dsServerId The provided serverId. |
| | | * @return The protocol version. |
| | | */ |
| | | short getProtocolVersion(int dsServerId) |
| | | { |
| | | short protocolVersion = -1; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | throw de; |
| | | if (dsi.getDsId() == dsServerId) |
| | | { |
| | | protocolVersion = dsi.getProtocolVersion(); |
| | | break; |
| | | } |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | Integer.toString(serverID), |
| | | serviceID, |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | | return protocolVersion; |
| | | } |
| | | |
| | | /** |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Server id " + serverID + " and domain " + serviceID |
| | | + "resetGenerationId" + generationIdNewValue); |
| | | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | |
| | | ResetGenerationIdMsg genIdMessage = null; |
| | | |
| | |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(generationIdNewValue); |
| | | } |
| | | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID, |
| | | Integer.toString(serverID), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | broker.publish(genIdMessage); |
| | | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | |
| | | // Wait for the listener thread to stop |
| | | if (listenerThread != null) |
| | | listenerThread.waitForShutdown(); |
| | | |
| | | } |
| | | |
| | | /** |