| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2012 ForgeRock AS |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | import java.io.OutputStream; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | |
| | | * to be able to correlate all the coming back acks to the original |
| | | * operation. |
| | | */ |
| | | private final SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs = |
| | | new TreeMap<ChangeNumber, UpdateMsg>(); |
| | | private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs = |
| | | new ConcurrentHashMap<ChangeNumber, UpdateMsg>(); |
| | | |
| | | |
| | | /** |
| | |
| | | // that have not been successfully acknowledged (either because of timeout, |
| | | // wrong status or error at replay) for a particular server (DS or RS). String |
| | | // format: <server id>:<number of failed updates> |
| | | private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | // Number of updates received in Assured Mode, Safe Read request |
| | | private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0); |
| | |
| | | // Multiple values allowed: number of updates sent in Assured Mode, Safe Data, |
| | | // that have not been successfully acknowledged because of timeout for a |
| | | // particular RS. String format: <server id>:<number of failed updates> |
| | | private Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | |
| | | /** |
| | |
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | | // Indicates the date when the status changed. This may be used to indicate |
| | | // the date the session with the current replication server started (when |
| | | // status is NORMAL for instance). All the above assured monitoring fields |
| | | // are also reset each time the status is changed |
| | | /** |
| | | * Indicates the date when the status changed. This may be used to indicate |
| | | * the date the session with the current replication server started (when |
| | | * status is NORMAL for instance). All the above assured monitoring fields |
| | | * are also reset each time the status is changed |
| | | */ |
| | | private Date lastStatusChangeDate = new Date(); |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns informations about the DS server related to the provided serverId. |
| | | * Returns information about the DS server related to the provided serverId. |
| | | * based on the TopologyMsg we received when the remote replica connected or |
| | | * disconnected. Return null when no server with the provided serverId is |
| | | * connected. |
| | |
| | | */ |
| | | public void setURLs(Set<String> referralsUrl) |
| | | { |
| | | for (String url : referralsUrl) |
| | | this.refUrls.add(url); |
| | | this.refUrls.addAll(referralsUrl); |
| | | } |
| | | |
| | | /** |
| | |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg; |
| | | |
| | | // This must be done while we are still holding the |
| | | // broker lock because we are now going to receive a |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | /* |
| | | This must be done while we are still holding the broker lock |
| | | because we are now going to receive a bunch of entries from the |
| | | remote server and we want the import thread to catch them and |
| | | not the ListenerThread. |
| | | */ |
| | | initialize(initTargetMsg, initTargetMsg.getSenderID()); |
| | | } |
| | | else if (msg instanceof ErrorMsg) |
| | |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find the import source. |
| | | // |
| | | // A remote error during the import will be received in the |
| | | // receiveEntryBytes() method. |
| | | // |
| | | /* |
| | | This is an error termination for the 2 following cases : |
| | | - either during an export |
| | | - or before an import really started |
| | | For example, when we publish a request and the |
| | | replicationServer did not find the import source. |
| | | |
| | | A remote error during the import will be received in the |
| | | receiveEntryBytes() method. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] processErrorMsg:" + this.serverID + |
| | |
| | | } |
| | | else |
| | | { |
| | | // Simply log - happen when the ErrorMsg relates to a previous |
| | | // attempt of initialization while we have started a new one |
| | | // on this side. |
| | | /* |
| | | Simply log - happen when the ErrorMsg relates to a previous |
| | | attempt of initialization while we have started a new one |
| | | on this side. |
| | | */ |
| | | logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails())); |
| | | } |
| | | } |
| | |
| | | { |
| | | // just retry |
| | | } |
| | | // Test if we have received and export request message and |
| | | // if that's the case handle it now. |
| | | // This must be done outside of the portion of code protected |
| | | // by the broker lock so that we keep receiveing update |
| | | // when we are doing and export and so that a possible |
| | | // closure of the socket happening when we are publishing the |
| | | // entries to the remote can be handled by the other |
| | | // replay thread when they call this method and therefore the |
| | | // broker.receive() method. |
| | | /* |
| | | Test if we have received and export request message and |
| | | if that's the case handle it now. |
| | | This must be done outside of the portion of code protected |
| | | by the broker lock so that we keep receiving update |
| | | when we are doing and export and so that a possible |
| | | closure of the socket happening when we are publishing the |
| | | entries to the remote can be handled by the other |
| | | replay thread when they call this method and therefore the |
| | | broker.receive() method. |
| | | */ |
| | | if (initReqMsg != null) |
| | | { |
| | | // Do this work in a thread to allow replay thread continue working |
| | |
| | | * particular server in the list. This increments the counter of error for the |
| | | * passed server, or creates an initial value of 1 error for it if the server |
| | | * is not yet present in the map. |
| | | * @param errorList |
| | | * @param sid |
| | | * @param errorsByServer map of number of errors per serverID |
| | | * @param sid the ID of the server which produced an error |
| | | */ |
| | | private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer, |
| | | Integer sid) |
| | |
| | | { |
| | | // Server already present in list, just increment number of |
| | | // errors for the server |
| | | int val = serverErrCount.intValue(); |
| | | int val = serverErrCount; |
| | | val++; |
| | | errorsByServer.put(sid, val); |
| | | } |
| | |
| | | |
| | | // Remove the message for pending ack list (this may already make the thread |
| | | // that is waiting for the ack be aware of its reception) |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.remove(changeNumber); |
| | | } |
| | | update = waitingAckMsgs.remove(changeNumber); |
| | | |
| | | // Signal waiting thread ack has been received |
| | | if (update != null) |
| | |
| | | |
| | | if ( hasTimeout || hasReplayErrors || hasWrongStatus) |
| | | { |
| | | // Some problems detected: message not correclty reached every requested |
| | | // servers. Log problem |
| | | /* |
| | | Some problems detected: message did not correctly reach every |
| | | requested servers. Log problem |
| | | */ |
| | | Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get( |
| | | serviceID, Integer.toString(serverID), |
| | | update.toString(), ack.errorsToString()); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Retrieves a replication domain based on the baseDn. |
| | | * |
| | | * @param serviceID The identifier of the domain to retrieve. |
| | | * |
| | | * @return The domain retrieved. |
| | | * |
| | | * @throws DirectoryException When an error occurred or no domain |
| | | * match the provided baseDn. |
| | | */ |
| | | static ReplicationDomain retrievesReplicationDomain(String serviceID) |
| | | throws DirectoryException |
| | | { |
| | | ReplicationDomain replicationDomain = domains.get(serviceID); |
| | | if (replicationDomain == null) |
| | | { |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | ERR_NO_MATCHING_DOMAIN.get(serviceID)); |
| | | } |
| | | return replicationDomain; |
| | | } |
| | | |
| | | /* |
| | | * After this point the code is related to the Total Update. |
| | |
| | | * This thread is launched when we want to export data to another server. |
| | | * |
| | | * When a task is created locally (so this local server is the initiator) |
| | | * of the export (Exemple: dsreplication initialize-all), |
| | | * of the export (Example: dsreplication initialize-all), |
| | | * this thread is NOT used but the task thread is running the export instead). |
| | | */ |
| | | private class ExportThread extends DirectoryThread |
| | |
| | | initWindow); |
| | | } catch (DirectoryException de) |
| | | { |
| | | // An error message has been sent to the peer |
| | | // This server is not the initiator of the export so there is |
| | | // nothing more to do locally. |
| | | /* |
| | | An error message has been sent to the peer |
| | | This server is not the initiator of the export so there is |
| | | nothing more to do locally. |
| | | */ |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | |
| | | /** |
| | | * Update the counters of the task for each entry processed during |
| | | * an import or export. |
| | | * @throws DirectoryException if an error occurred. |
| | | */ |
| | | public void updateCounters() |
| | | throws DirectoryException |
| | | { |
| | | entryLeftCount--; |
| | | |
| | | if (initializeTask != null) |
| | | { |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | ((InitializeTask)initializeTask).setLeft(entryLeftCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the counters of the task for each entry processed during |
| | | * an import or export. |
| | | * |
| | | * @param entriesDone The number of entries that were processed |
| | | * since the last time this method was called. |
| | |
| | | * on this server, and the {@code importBackend(InputStream)} |
| | | * will be called on the remote server. |
| | | * <p> |
| | | * The InputStream and OutpuStream given as a parameter to those |
| | | * The InputStream and OutputStream given as a parameter to those |
| | | * methods will be connected through the replication protocol. |
| | | * |
| | | * @param target The server-id of the server that should be initialized. |
| | |
| | | public void initializeRemote(int target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | |
| | | initializeRemote(target, this.serverID, initTask, this.initWindow); |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | // Acquire and initialize the export context |
| | | acquireIEContext(false); |
| | | |
| | | // We manage the list of servers to initialize in order : |
| | | // - to test at the end that all expected servers have reconnected |
| | | // after their import and with the right genId |
| | | // - to update the task with the server(s) where this test failed |
| | | /* |
| | | We manage the list of servers to initialize in order : |
| | | - to test at the end that all expected servers have reconnected |
| | | after their import and with the right genId |
| | | - to update the task with the server(s) where this test failed |
| | | */ |
| | | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | // Handling the errors during export |
| | | /* |
| | | Handling the errors during export |
| | | |
| | | // Note: we could have lost the connection and another thread |
| | | // the listener one) has already managed to reconnect. |
| | | // So we MUST rely on the test broker.isConnected() |
| | | // ONLY to do 'wait to be reconnected by another thread' |
| | | // (if not yet reconnected already). |
| | | |
| | | Note: we could have lost the connection and another thread |
| | | the listener one) has already managed to reconnect. |
| | | So we MUST rely on the test broker.isConnected() |
| | | ONLY to do 'wait to be reconnected by another thread' |
| | | (if not yet reconnected already). |
| | | */ |
| | | if (!broker.isConnected()) |
| | | { |
| | | // We are still disconnected, so we wait for the listener thread |
| | |
| | | if ((initTask != null) && broker.isConnected() && |
| | | (serverToInitialize != RoutableMsg.ALL_SERVERS)) |
| | | { |
| | | // NewAttempt case : In the case where |
| | | // - it's not an InitializeAll |
| | | // - AND the previous export attempt failed |
| | | // - AND we are (now) connected |
| | | // - and we own the task and this task is not an InitializeAll |
| | | // Let's : |
| | | // - sleep to let time to the other peer to reconnect if needed |
| | | // - and launch another attempt |
| | | /* |
| | | NewAttempt case : In the case where |
| | | - it's not an InitializeAll |
| | | - AND the previous export attempt failed |
| | | - AND we are (now) connected |
| | | - and we own the task and this task is not an InitializeAll |
| | | Let's : |
| | | - sleep to let time to the other peer to reconnect if needed |
| | | - and launch another attempt |
| | | */ |
| | | try { Thread.sleep(1000); } catch(Exception e){} |
| | | logError(NOTE_RESENDING_INIT_TARGET.get( |
| | | exportRootException.getLocalizedMessage())); |
| | |
| | | int waitResultAttempt = 0; |
| | | Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0); |
| | | |
| | | for (Integer sid : ieContext.startList) |
| | | replicasWeAreWaitingFor.add(sid); |
| | | replicasWeAreWaitingFor.addAll(ieContext.startList); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | { |
| | | // this one is still not doing the Full Update ... retry later |
| | | done = false; |
| | | try |
| | | { Thread.sleep(100); } catch (InterruptedException e) {} |
| | | try { Thread.sleep(100); |
| | | } |
| | | catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | waitResultAttempt++; |
| | | break; |
| | | } |
| | |
| | | while ((!done) && (waitResultAttempt<1200) // 2mn |
| | | && (!broker.shuttingDown())); |
| | | |
| | | ieContext.failureList.addAll( |
| | | Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0]))); |
| | | ieContext.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | TRACER.debugInfo( |
| | | "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); |
| | | |
| | | // In case some new servers appear during the init, we want them to be |
| | | // considered in the processing of sorting the successfully initialized |
| | | // and the others |
| | | /* |
| | | In case some new servers appear during the init, we want them to be |
| | | considered in the processing of sorting the successfully initialized |
| | | and the others |
| | | */ |
| | | for (DSInfo dsi : getReplicasList()) |
| | | replicasWeAreWaitingFor.add(dsi.getDsId()); |
| | | |
| | |
| | | done = true; |
| | | short reconnectMaxDelayInSec = 10; |
| | | short reconnectWait = 0; |
| | | Integer[] servers = replicasWeAreWaitingFor.toArray(new Integer[0]); |
| | | for (int serverId : servers) |
| | | for (int serverId : replicasWeAreWaitingFor) |
| | | { |
| | | if (ieContext.failureList.contains(serverId)) |
| | | { |
| | | // this server has already been in error during initialization |
| | | // dont't wait for it |
| | | /* |
| | | this server has already been in error during initialization |
| | | don't wait for it |
| | | */ |
| | | continue; |
| | | } |
| | | |
| | | DSInfo dsInfo = isRemoteDSConnected(serverId); |
| | | if (dsInfo == null) |
| | | { |
| | | // this server is disconnected |
| | | // may be for a long time if it crashed or had been stopped |
| | | // may be just the time to reconnect after import : should be short |
| | | /* |
| | | this server is disconnected |
| | | may be for a long time if it crashed or had been stopped |
| | | may be just the time to reconnect after import : should be short |
| | | */ |
| | | if (++reconnectWait<reconnectMaxDelayInSec) |
| | | { |
| | | // let's still wait to give a chance to this server to reconnect |
| | |
| | | } |
| | | while ((!done) && (!broker.shuttingDown())); // infinite wait |
| | | |
| | | ieContext.failureList.addAll( |
| | | Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0]))); |
| | | ieContext.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | |
| | | } |
| | | else |
| | | { |
| | | // When we are the exporter in the case of initializeAll |
| | | // exporting must not be stopped on the first error. |
| | | /* |
| | | When we are the exporter in the case of initializeAll |
| | | exporting must not be stopped on the first error. |
| | | */ |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | // Check good sequentiality of msg received |
| | | // Check good ordering of msg received |
| | | if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg)msg; |
| | |
| | | if (ieContext.exporterProtocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // check the msgCnt of the msg received to check sequenciality |
| | | // check the msgCnt of the msg received to check ordering |
| | | if (++ieContext.msgCnt != entryMsg.getMsgId()) |
| | | { |
| | | if (ieContext.getException() == null) |
| | |
| | | } |
| | | else if (msg instanceof DoneMsg) |
| | | { |
| | | // This is the normal termination of the import |
| | | // No error is stored and the import is ended |
| | | // by returning null |
| | | /* |
| | | This is the normal termination of the import |
| | | No error is stored and the import is ended |
| | | by returning null |
| | | */ |
| | | return null; |
| | | } |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | // This is an error termination during the import |
| | | // The error is stored and the import is ended |
| | | // by returning null |
| | | /* |
| | | This is an error termination during the import |
| | | The error is stored and the import is ended |
| | | by returning null |
| | | */ |
| | | if (ieContext.getException() == null) |
| | | { |
| | | ErrorMsg errMsg = (ErrorMsg)msg; |
| | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // TODO: i18n |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(new DirectoryException(ResultCode.OTHER, |
| | | ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); |
| | |
| | | * This is based on the hypothesis that the entries are separated |
| | | * by a "\n\n" String. |
| | | * |
| | | * @param entryBytes |
| | | * @param entryBytes the set of bytes containing one or more entries. |
| | | * @return The number of entries in the provided byte[]. |
| | | */ |
| | | private int countEntryLimits(byte[] entryBytes) |
| | |
| | | * This is based on the hypothesis that the entries are separated |
| | | * by a "\n\n" String. |
| | | * |
| | | * @param entryBytes |
| | | * @param entryBytes the set of bytes containing one or more entries. |
| | | * @return The number of entries in the provided byte[]. |
| | | */ |
| | | private int countEntryLimits(byte[] entryBytes, int pos, int length) |
| | |
| | | throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry); |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + |
| | | Arrays.toString(lDIFEntry)); |
| | | |
| | | // build the message |
| | | EntryMsg entryMessage = new EntryMsg( |
| | |
| | | // Waiting the slowest loop |
| | | while (!broker.shuttingDown()) |
| | | { |
| | | // If an error was raised - like receiving an ErrorMsg from a remote |
| | | // server that have been stored by the listener thread in the ieContext, |
| | | // we just abandon the export by throwing an exception. |
| | | /* |
| | | If an error was raised - like receiving an ErrorMsg from a remote |
| | | server that have been stored by the listener thread in the ieContext, |
| | | we just abandon the export by throwing an exception. |
| | | */ |
| | | if (ieContext.getException() != null) |
| | | throw(new IOException(ieContext.getException().getMessage())); |
| | | |
| | |
| | | } // Waiting the slowest loop |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry); |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" |
| | | + Arrays.toString(lDIFEntry)); |
| | | |
| | | // publish the message |
| | | boolean sent = broker.publish(entryMessage, false); |
| | |
| | | errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID()); |
| | | } |
| | | |
| | | // We must not test here whether the remote source is connected to |
| | | // the topology by testing if it stands in the replicas list since. |
| | | // In the case of a re-attempt of initialization, the listener thread is |
| | | // running this method directly coming from initailize() method and did |
| | | // not processed any topology message in between the failure and the |
| | | // new attempt. |
| | | /* |
| | | We must not test here whether the remote source is connected to |
| | | the topology by testing if it stands in the replicas list since. |
| | | In the case of a re-attempt of initialization, the listener thread is |
| | | running this method directly coming from initialize() method and did |
| | | not processed any topology message in between the failure and the |
| | | new attempt. |
| | | */ |
| | | try |
| | | { |
| | | // We must immediatly acquire a context to store the task inside |
| | | // The context will be used when we (the listener thread) will receive |
| | | // the InitializeTargetMsg, process the import, and at the end |
| | | // update the task. |
| | | /* |
| | | We must immediately acquire a context to store the task inside |
| | | The context will be used when we (the listener thread) will receive |
| | | the InitializeTargetMsg, process the import, and at the end |
| | | update the task. |
| | | */ |
| | | |
| | | acquireIEContext(true); //test and set if no import already in progress |
| | | ieContext.initializeTask = initTask; |
| | |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | | |
| | | // The normal success processing is now to receive InitTargetMsg then |
| | | // entries from the remote server. |
| | | // The error cases are : |
| | | // - either local error immediatly caught below |
| | | // - a remote error we will receive as an ErrorMsg |
| | | /* |
| | | The normal success processing is now to receive InitTargetMsg then |
| | | entries from the remote server. |
| | | The error cases are : |
| | | - either local error immediately caught below |
| | | - a remote error we will receive as an ErrorMsg |
| | | */ |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | * |
| | | * @param initTargetMsgReceived The message received from the remote server. |
| | | * |
| | | * @param requestorServerId The serverId of the server that requested the |
| | | * @param requesterServerId The serverId of the server that requested the |
| | | * initialization meaning the server where the |
| | | * task has initially been created (this server, |
| | | * or the remote server). |
| | | */ |
| | | void initialize(InitializeTargetMsg initTargetMsgReceived, |
| | | int requestorServerId) |
| | | int requesterServerId) |
| | | { |
| | | InitializeTask initFromtask = null; |
| | | InitializeTask initFromTask = null; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering initialize - domain=" + this); |
| | |
| | | // Acquire an import context if no already done (and initialize). |
| | | if (initTargetMsgReceived.getInitiatorID() == this.serverID) |
| | | { |
| | | // The initTargetMsgReceived received is the answer to a request that |
| | | // we (this server) sent previously. In this case, so the IEContext |
| | | // has been already acquired when the request was published in order |
| | | // to store the task (to be updated with the status at the end). |
| | | /* |
| | | The initTargetMsgReceived received is the answer to a request that |
| | | we (this server) sent previously. In this case, so the IEContext |
| | | has been already acquired when the request was published in order |
| | | to store the task (to be updated with the status at the end). |
| | | */ |
| | | } |
| | | else |
| | | { |
| | | // The initTargetMsgReceived is for an import initiated by the remote |
| | | // server. |
| | | // Test and set if no import already in progress |
| | | /* |
| | | The initTargetMsgReceived is for an import initiated by the remote |
| | | server. |
| | | Test and set if no import already in progress |
| | | */ |
| | | acquireIEContext(true); |
| | | } |
| | | |
| | |
| | | ieContext.initWindow = initTargetMsgReceived.getInitWindow(); |
| | | // Protocol version is -1 when not known. |
| | | ieContext.exporterProtocolVersion = getProtocolVersion(source); |
| | | initFromtask = (InitializeTask)ieContext.initializeTask; |
| | | initFromTask = (InitializeTask)ieContext.initializeTask; |
| | | |
| | | // Lauch the import |
| | | // Launch the import |
| | | importBackend(new ReplInputStream(this)); |
| | | |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // Store the exception raised. It will be considered if no other exception |
| | | // has been previously stored in the context |
| | | /* |
| | | Store the exception raised. It will be considered if no other exception |
| | | has been previously stored in the context |
| | | */ |
| | | if (ieContext.getException() == null) |
| | | ieContext.setException(e); |
| | | } |
| | |
| | | + " ends import with exception=" + ieContext.getException() |
| | | + " connected=" + broker.isConnected()); |
| | | |
| | | // It is necessary to restart (reconnect to RS) for different reasons |
| | | // - when everything went well, reconnect in order to exchange |
| | | // new state, new generation ID |
| | | // - when we have connection failure, reconnect to retry a new import |
| | | // right here, right now |
| | | // we never want retryOnFailure if we fails reconnecting in the restart. |
| | | /* |
| | | It is necessary to restart (reconnect to RS) for different reasons |
| | | - when everything went well, reconnect in order to exchange |
| | | new state, new generation ID |
| | | - when we have connection failure, reconnect to retry a new import |
| | | right here, right now |
| | | we never want retryOnFailure if we fails reconnecting in the restart. |
| | | */ |
| | | broker.reStart(false); |
| | | |
| | | if (ieContext.getException() != null) |
| | | { |
| | | if (broker.isConnected() && (initFromtask != null) |
| | | if (broker.isConnected() && (initFromTask != null) |
| | | && (++ieContext.attemptCnt<2)) |
| | | { |
| | | // Worth a new attempt |
| | | // since initFromtask is in this server, connection is ok |
| | | /* |
| | | Worth a new attempt |
| | | since initFromTask is in this server, connection is ok |
| | | */ |
| | | try |
| | | { |
| | | |
| | | // Wait for the exporter to stabilize - eventually reconnect as |
| | | // well if it was connected to the same RS than the one we lost ... |
| | | /* |
| | | Wait for the exporter to stabilize - eventually reconnect as |
| | | well if it was connected to the same RS than the one we lost ... |
| | | */ |
| | | Thread.sleep(1000); |
| | | |
| | | // Restart the whole import protocol exchange by sending again |
| | | // the request |
| | | /* |
| | | Restart the whole import protocol exchange by sending again |
| | | the request |
| | | */ |
| | | logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get( |
| | | ieContext.getException().getLocalizedMessage())); |
| | | |
| | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // An error occurs when sending a new request for a new import. |
| | | // This error is not stored, prefering to keep the initial one. |
| | | /* |
| | | An error occurs when sending a new request for a new import. |
| | | This error is not stored, prefering to keep the initial one. |
| | | */ |
| | | logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get( |
| | | e.getLocalizedMessage(), |
| | | ieContext.getException().getLocalizedMessage())); |
| | |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | + " ends initialization with exception=" + ieContext.getException() |
| | | + " connected=" + broker.isConnected() |
| | | + " task=" + initFromtask |
| | | + " task=" + initFromTask |
| | | + " attempt=" + ieContext.attemptCnt); |
| | | |
| | | try |
| | |
| | | if (broker.isConnected() && (ieContext.getException() != null)) |
| | | { |
| | | // Let's notify the exporter |
| | | ErrorMsg errorMsg = new ErrorMsg(requestorServerId, |
| | | ErrorMsg errorMsg = new ErrorMsg(requesterServerId, |
| | | ieContext.getException().getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | else // !broker.isConnected() |
| | | { |
| | | // Don't try to reconnect here. |
| | | // The current running thread is the listener thread and will loop on |
| | | // receive() that is expected to manage reconnects attempt. |
| | | /* |
| | | Don't try to reconnect here. |
| | | The current running thread is the listener thread and will loop on |
| | | receive() that is expected to manage reconnects attempt. |
| | | */ |
| | | } |
| | | |
| | | // Update the task that initiated the import must be the last thing. |
| | | // Particularly, broker.restart() after import success must be done |
| | | // before some other operations/tasks to be launched, |
| | | // like resetting the generation ID. |
| | | if (initFromtask != null) |
| | | /* |
| | | Update the task that initiated the import must be the last thing. |
| | | Particularly, broker.restart() after import success must be done |
| | | before some other operations/tasks to be launched, |
| | | like resetting the generation ID. |
| | | */ |
| | | if (initFromTask != null) |
| | | { |
| | | initFromtask.updateTaskCompletionState(ieContext.getException()); |
| | | initFromTask.updateTaskCompletionState(ieContext.getException()); |
| | | } |
| | | } |
| | | finally |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return the protocol version of the DS related to the provided serverid. |
| | | * Return the protocol version of the DS related to the provided serverId. |
| | | * Returns -1 when the protocol version is not known. |
| | | * @param dsServerId The provided serverid. |
| | | * @return The procotol version. |
| | | * @param dsServerId The provided serverId. |
| | | * @return The protocol version. |
| | | */ |
| | | short getProtocolVersion(int dsServerId) |
| | | { |
| | |
| | | private void checkGenerationID(long generationID) |
| | | throws DirectoryException |
| | | { |
| | | boolean allset = true; |
| | | boolean allSet = true; |
| | | |
| | | for (int i = 0; i< 50; i++) |
| | | { |
| | | allset = true; |
| | | allSet = true; |
| | | for (RSInfo rsInfo : getRsList()) |
| | | { |
| | | // the 'empty' RSes (generationId==-1) are considered as good citizens |
| | |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | allset = false; |
| | | allSet = false; |
| | | break; |
| | | } |
| | | } |
| | | if (allset) |
| | | if (allSet) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | if (!allset) |
| | | if (!allSet) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID); |
| | |
| | | */ |
| | | void processUpdateDoneSynchronous(UpdateMsg msg) |
| | | { |
| | | // Warning: in synchronous mode, no way to tell the replay of an update went |
| | | // wrong. Just put null in processUpdateDone so that if assured replication |
| | | // is used the ack is sent without error at replay flag. |
| | | /* |
| | | Warning: in synchronous mode, no way to tell the replay of an update went |
| | | wrong. Just put null in processUpdateDone so that if assured replication |
| | | is used the ack is sent without error at replay flag. |
| | | */ |
| | | processUpdateDone(msg, null); |
| | | state.update(msg.getChangeNumber()); |
| | | } |
| | |
| | | */ |
| | | public boolean isConnected() |
| | | { |
| | | if (broker != null) |
| | | return broker.isConnected(); |
| | | else |
| | | return false; |
| | | return broker != null && broker.isConnected(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean hasConnectionError() |
| | | { |
| | | if (broker != null) |
| | | return broker.hasConnectionError(); |
| | | else |
| | | return true; |
| | | return broker == null || broker.hasConnectionError(); |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * Gets the number of updates sent in assured safe read mode that have not |
| | | * been acknowledged per server. |
| | | * @return The number of updates sent in assured safe read mode that have not |
| | | * been acknowledged per server. |
| | | * @return A copy of the map that contains the number of updates sent in |
| | | * assured safe read mode that have not been acknowledged per server. |
| | | */ |
| | | public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates() |
| | | { |
| | | // Clone a snapshot with synchronized section to have a consistent view in |
| | | // monitoring |
| | | Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>(); |
| | | synchronized(assuredSrServerNotAcknowledgedUpdates) |
| | | { |
| | | Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet(); |
| | | for (Integer serverId : keySet) |
| | | { |
| | | Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId); |
| | | snapshot.put(serverId, i); |
| | | } |
| | | return new HashMap<Integer, Integer>( |
| | | assuredSrServerNotAcknowledgedUpdates); |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * Gets the number of updates sent in assured safe data mode that have not |
| | | * been acknowledged due to timeout error per server. |
| | | * @return The number of updates sent in assured safe data mode that have not |
| | | * been acknowledged due to timeout error per server. |
| | | * @return A copy of the map that contains the number of updates sent in |
| | | * assured safe data mode that have not been acknowledged due to timeout |
| | | * error per server. |
| | | */ |
| | | public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates() |
| | | { |
| | | // Clone a snapshot with synchronized section to have a consistent view in |
| | | // monitoring |
| | | Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>(); |
| | | synchronized(assuredSdServerTimeoutUpdates) |
| | | { |
| | | Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet(); |
| | | for (Integer serverId : keySet) |
| | | { |
| | | Integer i = assuredSdServerTimeoutUpdates.get(serverId); |
| | | snapshot.put(serverId, i); |
| | | } |
| | | return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates); |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** |
| | |
| | | assuredSrTimeoutUpdates = new AtomicInteger(0); |
| | | assuredSrWrongStatusUpdates = new AtomicInteger(0); |
| | | assuredSrReplayErrorUpdates = new AtomicInteger(0); |
| | | assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>(); |
| | | synchronized (assuredSrServerNotAcknowledgedUpdates) |
| | | { |
| | | assuredSrServerNotAcknowledgedUpdates.clear(); |
| | | } |
| | | assuredSrReceivedUpdates = new AtomicInteger(0); |
| | | assuredSrReceivedUpdatesAcked = new AtomicInteger(0); |
| | | assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); |
| | | assuredSdSentUpdates = new AtomicInteger(0); |
| | | assuredSdAcknowledgedUpdates = new AtomicInteger(0); |
| | | assuredSdTimeoutUpdates = new AtomicInteger(0); |
| | | assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>(); |
| | | synchronized (assuredSdServerTimeoutUpdates) |
| | | { |
| | | assuredSdServerTimeoutUpdates.clear(); |
| | | } |
| | | } |
| | | |
| | | /* |
| | |
| | | { |
| | | synchronized (sessionLock) |
| | | { |
| | | // Stop the broker first in order to prevent the listener from |
| | | // reconnecting - see OPENDJ-457. |
| | | /* |
| | | Stop the broker first in order to prevent the listener from |
| | | reconnecting - see OPENDJ-457. |
| | | */ |
| | | if (broker != null) |
| | | { |
| | | broker.stop(); |
| | |
| | | { |
| | | broker.updateWindowAfterReplay(); |
| | | |
| | | // Send an ack if it was requested and the group id is the same as the RS |
| | | // one. Only Safe Read mode makes sense in DS for returning an ack. |
| | | /* |
| | | Send an ack if it was requested and the group id is the same as the RS |
| | | one. Only Safe Read mode makes sense in DS for returning an ack. |
| | | */ |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | if (msg.isAssured()) |
| | | { |
| | |
| | | if (replayErrorMsg != null) |
| | | { |
| | | // Mark the error in the ack |
| | | // -> replay error occured |
| | | // -> replay error occurred |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occured in our server |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | ackMsg.setFailedServers(idList); |
| | |
| | | logError(errorMsg); |
| | | } else |
| | | { |
| | | // In safe data mode assured update that comes up to a DS requires no |
| | | // ack from a destinator DS. Safe data mode is based on RS acks only |
| | | /* |
| | | In safe data mode assured update that comes up to a DS requires no |
| | | ack from a recipient DS. Safe data mode is based on RS acks only |
| | | */ |
| | | } |
| | | } |
| | | } |
| | |
| | | * If assured configured, set message accordingly to request an ack in the |
| | | * right assured mode. |
| | | * No ack requested for a RS with a different group id. Assured |
| | | * replication suported for the same locality, i.e: a topology working in |
| | | * replication supported for the same locality, i.e: a topology working in |
| | | * the same |
| | | * geographical location. If we are connected to a RS which is not in our |
| | | * locality, no need to ask for an ack. |
| | |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | msg.setSafeDataLevel(assuredSdLevel); |
| | | |
| | | // Add the assured message to the list of updates that are |
| | | // waiting for acks |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | waitingAckMsgs.put(msg.getChangeNumber(), msg); |
| | | } |
| | | /* |
| | | Add the assured message to the list of updates that are |
| | | waiting for acks |
| | | */ |
| | | waitingAckMsgs.put(msg.getChangeNumber(), msg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | // WARNING: this timeout may be difficult to optimize: too low, it |
| | | // may use too much CPU, too high, it may penalize performance... |
| | | /* |
| | | WARNING: this timeout may be difficult to optimize: too low, it |
| | | may use too much CPU, too high, it may penalize performance... |
| | | */ |
| | | msg.wait(10); |
| | | } catch (InterruptedException e) |
| | | { |
| | |
| | | // Timeout ? |
| | | if ( (System.currentTimeMillis() - startTime) >= assuredTimeout ) |
| | | { |
| | | // Timeout occured, be sure that ack is not being received and if so, |
| | | // remove the update from the wait list, log the timeout error and |
| | | // also update assured monitoring counters |
| | | /* |
| | | Timeout occurred, be sure that ack is not being received and if so, |
| | | remove the update from the wait list, log the timeout error and |
| | | also update assured monitoring counters |
| | | */ |
| | | UpdateMsg update; |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.remove(cn); |
| | | } |
| | | update = waitingAckMsgs.remove(cn); |
| | | |
| | | if (update != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Publish informations to the Replication Service (not assured mode). |
| | | * Publish information to the Replication Service (not assured mode). |
| | | * |
| | | * @param msg The byte array containing the informations that should |
| | | * @param msg The byte array containing the information that should |
| | | * be sent to the remote entities. |
| | | */ |
| | | public void publish(byte[] msg) |
| | |
| | | synchronized (this) |
| | | { |
| | | update = new UpdateMsg(generator.newChangeNumber(), msg); |
| | | |
| | | // If assured replication is configured, this will prepare blocking |
| | | // mechanism. If assured replication is disabled, this returns |
| | | // immediately |
| | | /* |
| | | If assured replication is configured, this will prepare blocking |
| | | mechanism. If assured replication is disabled, this returns |
| | | immediately |
| | | */ |
| | | prepareWaitForAckIfAssuredEnabled(update); |
| | | |
| | | publish(update); |
| | |
| | | |
| | | try |
| | | { |
| | | // If assured replication is enabled, this will wait for the matching |
| | | // ack or time out. If assured replication is disabled, this returns |
| | | // immediately |
| | | /* |
| | | If assured replication is enabled, this will wait for the matching |
| | | ack or time out. If assured replication is disabled, this returns |
| | | immediately |
| | | */ |
| | | waitForAckIfAssuredEnabled(update); |
| | | } catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString( |
| | | assuredTimeout), msg.toString()); |
| | | assuredTimeout), update.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public boolean importInProgress() |
| | | { |
| | | if (ieContext == null) |
| | | return false; |
| | | else |
| | | return ieContext.importInProgress; |
| | | return ieContext != null && ieContext.importInProgress; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean exportInProgress() |
| | | { |
| | | if (ieContext == null) |
| | | return false; |
| | | else |
| | | return !ieContext.importInProgress; |
| | | return ieContext != null && !ieContext.importInProgress; |
| | | } |
| | | |
| | | /** |