| | |
| | | while (update == null) |
| | | { |
| | | InitializeRequestMessage initMsg = null; |
| | | synchronized (broker) |
| | | ReplicationMessage msg; |
| | | try |
| | | { |
| | | ReplicationMessage msg; |
| | | try |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | { |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMessage)) |
| | | TRACER.debugVerbose("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | receiveAck(ack); |
| | | } |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMessage)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage importMsg = (InitializeTargetMessage) msg; |
| | | |
| | | try |
| | | { |
| | | // This must be done while we are still holding the |
| | | // broker lock because we are now going to receive a |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | initialize(importMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | | TRACER.debugInfo(Message.toString(mb.toMessage())); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | else |
| | | { |
| | | /* We can receive an error message from the replication server |
| | | * in the following cases : |
| | | * - we connected with an incorrect generation id |
| | | */ |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | // just retry |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMessage)) |
| | | TRACER.debugVerbose("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | receiveAck(ack); |
| | | } |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMessage)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage importMsg = (InitializeTargetMessage) msg; |
| | | |
| | | try |
| | | { |
| | | // This must be done while we are still holding the |
| | | // broker lock because we are now going to receive a |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | initialize(importMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | | TRACER.debugInfo(Message.toString(mb.toMessage())); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | else |
| | | { |
| | | /* We can receive an error message from the replication server |
| | | * in the following cases : |
| | | * - we connected with an incorrect generation id |
| | | */ |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | // just retry |
| | | } |
| | | // Test if we have received and export request message and |
| | | // if that's the case handle it now. |
| | |
| | | shutdown = true; |
| | | |
| | | // Stop the listener thread |
| | | listenerThread.shutdown(); |
| | | if (listenerThread != null) |
| | | { |
| | | listenerThread.shutdown(); |
| | | } |
| | | |
| | | synchronized (this) |
| | | { |
| | |
| | | broker.stop(); |
| | | |
| | | // Wait for the listener thread to stop |
| | | listenerThread.waitForShutdown(); |
| | | if (listenerThread != null) |
| | | listenerThread.waitForShutdown(); |
| | | |
| | | // wait for completion of the persistentServerState thread. |
| | | try |
| | |
| | | { |
| | | if (!dependency) |
| | | { |
| | | broker.updateWindowAfterReplay(); |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |