opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -444,6 +444,12 @@ /** * The connection to the curent Replication Server has failed. */ public static final int MSGID_DISCONNECTED_FROM_CHANGELOG = CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63; /** * Register the messages from this class in the core server. * */ @@ -607,6 +613,9 @@ "The Replication is configured for suffix %s " + "but was not able to connect to any Replication Server"); registerMessage(MSGID_NOW_FOUND_CHANGELOG, "A Replication Server was found for suffix %s"); "Replication Server %s now used for Replication Domain %s"); registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG, "The connection to Replication Server %s has been dropped by the " + "Replication Server"); } } opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -54,7 +54,6 @@ public HistoricalCsnOrderingMatchingRule() { super(); // TODO Auto-generated constructor stub } /** @@ -81,7 +80,7 @@ @Override public void initializeMatchingRule(OrderingMatchingRuleCfg configuration) { // TODO Auto-generated method stub // No implementation needed here. } /** opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -106,7 +106,7 @@ * * @return The number of update currently in the list. */ public synchronized int size() public int size() { return pendingChanges.size(); } opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@ import java.util.LinkedHashSet; import java.util.TreeSet; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.protocols.internal.InternalClientConnection; @@ -60,6 +61,7 @@ import org.opends.server.replication.protocol.SocketSession; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.ErrorLogCategory; @@ -83,7 +85,6 @@ private boolean shutdown = false; private Collection<String> servers; private boolean connected = false; private final Object lock = new Object(); private String replicationServer = "Not connected"; private TreeSet<FakeOperation> replayOperations; private ProtocolSession session = null; @@ -120,7 +121,7 @@ private int numLostConnections = 0; /** * When the broker cannort connect to any replication server * When the broker cannot connect to any replication server * it log an error and keeps continuing every second. * This boolean is set when the first failure happens and is used * to avoid repeating the error message for further failure to connect @@ -129,6 +130,8 @@ */ private boolean connectionError = false; private Object connectPhaseLock = new Object(); /** * Creates a new ReplicationServer Broker for a particular ReplicationDomain. * @@ -217,6 +220,8 @@ boolean checkState = true; boolean receivedResponse = true; synchronized (connectPhaseLock) { while ((!connected) && (!shutdown) && (receivedResponse)) { receivedResponse = false; @@ -269,7 +274,8 @@ * We must not publish changes to a replicationServer that has not * seen all our previous changes because this could cause some * other ldap servers to miss those changes. 
* Check that the ReplicationServer has seen all our previous changes. * Check that the ReplicationServer has seen all our previous * changes. * If not, try another replicationServer. * If no other replicationServer has seen all our changes, recover * those changes and send them again to any replicationServer. @@ -278,13 +284,13 @@ startMsg.getServerState().getMaxChangeNumber(serverID); if (replServerMaxChangeNumber == null) replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); if ((ourMaxChangeNumber == null) || (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) { replicationServer = ServerAddr.toString(); maxSendWindow = startMsg.getWindowSize(); this.sendWindow = new Semaphore(maxSendWindow); connected = true; startHeartBeat(); break; @@ -306,6 +312,9 @@ else { replayOperations.clear(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "going to search for changes", 1); /* * Get all the changes that have not been seen by this * replicationServer and update it @@ -327,8 +336,10 @@ { /* * An error happened trying to search for the updates * This server therefore can't start acepting new updates. * TODO : should stop the LDAP server (how to ?) * This server will start acepting again new updates but * some inconsistencies will stay between servers. * TODO : REPAIR : log an error for the repair tool * that will need to resynchronize the servers. 
*/ int msgID = MSGID_CANNOT_RECOVER_CHANGES; String message = getMessage(msgID); @@ -340,13 +351,18 @@ { replicationServer = ServerAddr.toString(); maxSendWindow = startMsg.getWindowSize(); this.sendWindow = new Semaphore(maxSendWindow); connected = true; for (FakeOperation replayOp : replayOperations) { publish(replayOp.generateMessage()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "sendingChange", 1); session.publish(replayOp.generateMessage()); } startHeartBeat(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "changes sent", 1); break; } } @@ -372,10 +388,10 @@ catch (Exception e) { int msgID = MSGID_EXCEPTION_STARTING_SESSION; String message = getMessage(msgID) + stackTraceToSingleLineString(e); String message = getMessage(msgID); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); message + stackTraceToSingleLineString(e), msgID); } finally { @@ -383,10 +399,6 @@ { if (session != null) { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "Broker : connect closing session" , 1); session.close(); session = null; } @@ -406,12 +418,6 @@ logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); try { Thread.sleep(500); } catch (InterruptedException e) { } checkState = false; } } @@ -419,17 +425,21 @@ if (connected) { // This server has connected correctly. // let's check if it was previosuly on error, in this case log // a message to let the administratot know that the failure was resolved. if (connectionError) { // Log a message to let the administrator know that the failure was // resolved. // wakeup all the thread that were waiting on the window // on the previous connection. 
/* NOTE(review): release(Integer.MAX_VALUE) below is added to whatever permits the previous semaphore still holds; if that count is above zero the sum no longer fits in the int permit count of java.util.concurrent.Semaphore - consider releasing a bounded number of permits. Also notify() wakes at most one thread waiting on connectPhaseLock, while several publishing threads may be waiting; notifyAll() may be what is intended - TODO confirm. */ connectionError = false; if (sendWindow != null) sendWindow.release(Integer.MAX_VALUE); this.sendWindow = new Semaphore(maxSendWindow); connectPhaseLock.notify(); int msgID = MSGID_NOW_FOUND_CHANGELOG; String message = getMessage(msgID, baseDn.toString()); String message = getMessage(msgID, replicationServer, baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); } } else { /* @@ -441,6 +451,7 @@ { checkState = false; connectionError = true; connectPhaseLock.notify(); int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; String message = getMessage(msgID, baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, @@ -448,6 +459,7 @@ } } } } /** * Start the heartbeat monitor thread. @@ -530,30 +542,90 @@ public void publish(ReplicationMessage msg) { boolean done = false; ProtocolSession failingSession = session; while (!done) { if (connectionError) { // It was not possible to connect to any replication server. // Since the operation was already processed, we have no other // choice than to return without sending the ReplicationMessage // and relying on the resend procedure of the connect phase to // fix the problem when we finally connect. return; synchronized (lock) } try { boolean credit; ProtocolSession current_session; Semaphore currentWindowSemaphore; // save the session at the time when we acquire the // sendwindow credit so that we can make sure later // that the session did not change in between. // This is necessary to make sure that we don't publish a message // on a session with a credit that was acquired from a previous // session. synchronized (connectPhaseLock) { current_session = session; currentWindowSemaphore = sendWindow; } if (msg instanceof UpdateMessage) { // Acquiring the window credit must be done outside of the // connectPhaseLock because it can be blocking and we don't // want to hold off reconnection in case the connection dropped. 
/* NOTE(review): in the !credit branch below the WindowProbe is published through the session field, read outside connectPhaseLock, rather than through the current_session snapshot taken earlier in this method; if the field can be null or replaced while a reconnection is in progress (not visible here) the call could fail with something other than the IOException handled below - TODO confirm. */ credit = currentWindowSemaphore.tryAcquire( (long) 500, TimeUnit.MILLISECONDS); } else { credit = true; } if (credit) { synchronized (connectPhaseLock) { // check the session. If it has changed, some // deconnection/reconnection happened and we need to restart from // scratch. if (session == current_session) { session.publish(msg); done = true; } } } if (!credit) { // the window is still closed. // Send a WindowProbe message to wakeup the receiver in case the // window update message was lost somehow... // then loop to check again if connection was closed. session.publish(new WindowProbe()); } } catch (IOException e) { // The receive threads should handle reconnection or // mark this broker in error. Just retry. synchronized (connectPhaseLock) { try { if (this.connected == false) this.reStart(failingSession); if (msg instanceof UpdateMessage) sendWindow.acquire(); session.publish(msg); done = true; } catch (IOException e) connectPhaseLock.wait(100); } catch (InterruptedException e1) { this.reStart(failingSession); // ignore } } } catch (InterruptedException e) { this.reStart(failingSession); } // just loop. } } } @@ -561,6 +633,10 @@ /** * Receive a message. * This method is not multithread safe and should either always be * called in a single thread or protected by a locking mechanism * before being called. * * @return the received message * @throws SocketTimeoutException if the timeout set by setSoTimeout * has expired @@ -603,13 +679,15 @@ { if (shutdown == false) { synchronized (lock) { int msgID = MSGID_DISCONNECTED_FROM_CHANGELOG; String message = getMessage(msgID, replicationServer); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message + " " + e.getMessage(), msgID); this.reStart(failingSession); } } } } return null; } opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -561,7 +561,7 @@ { if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) { // this policy imply that we always aceept updates. // this policy imply that we always accept updates. return true; } if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) @@ -587,7 +587,7 @@ } } // we should never get there as the only possible policies are // ACCEPT_UPDATES and DENY_UPDATES // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES return true; } opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -59,7 +59,7 @@ @Override() public void initializeMonitorProvider(MonitorProviderCfg configuration) { // TODO Auto-generated method stub // no implementation needed. } /** opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -52,6 +52,7 @@ static final byte MSG_TYPE_ENTRY = 12; static final byte MSG_TYPE_DONE = 13; static final byte MSG_TYPE_ERROR = 14; static final byte MSG_TYPE_WINDOW_PROBE = 15; // Adding a new type of message here probably requires to // change accordingly generateMsg method below @@ -136,6 +137,9 @@ case MSG_TYPE_ERROR: msg = new ErrorMessage(buffer); break; case MSG_TYPE_WINDOW_PROBE: msg = new WindowProbe(buffer); break; default: throw new DataFormatException("received message with unknown type"); } opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
@@ -32,9 +32,13 @@ /** * This message is used by LDAP server when they first connect. * to a replication server to let them know who they are and what is their state * (their RUV) * This message is used by LDAP server or by Replication Servers to * update the send window of the remote entities. * * A receiving entity should create such a message with a given credit * when it wants to open the send window of the remote entity. * A LDAP or Replication Server should increase its send window when receiving * such a message. */ public class WindowMessage extends ReplicationMessage implements Serializable @@ -47,7 +51,7 @@ * Create a new WindowMessage. * * @param numAck The number of acknowledged messages. * The window will be increase by this number. * The window will be increase by this credit number. */ public WindowMessage(int numAck) { opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;

import java.io.Serializable;
import java.util.zip.DataFormatException;

/**
 * This message is used by an LDAP server or a Replication Server that has
 * been out of credit for a while and wants to check that the remote server
 * really considers the send window to be closed.
 *
 * A sending entity that is blocked because its send window has been closed
 * for a while should create such a message to check that the window
 * closure is valid.
 *
 * An entity that receives such a message should respond with a
 * WindowMessage indicating the current credit available.
 *
 * The encoded form consists of a single byte: the message type.
 */
public class WindowProbe extends ReplicationMessage implements Serializable
{
  private static final long serialVersionUID = 8442267608764026867L;

  /**
   * Create a new WindowProbe message.
   */
  public WindowProbe()
  {
  }

  /**
   * Creates a new WindowProbe from its encoded form.
   *
   * @param in The byte array containing the encoded form of the
   *           WindowProbe. Only the first byte (the message type) is
   *           examined; any following bytes are ignored.
   * @throws DataFormatException If the byte array is null, is empty, or
   *                             does not start with the WindowProbe
   *                             message type.
   */
  public WindowProbe(byte[] in) throws DataFormatException
  {
    // WindowProbe Message only contains its type.
    // Null or empty input must be reported as a format error rather than
    // escaping as a NullPointerException/ArrayIndexOutOfBoundsException.
    if ((in == null) || (in.length == 0) || (in[0] != MSG_TYPE_WINDOW_PROBE))
    {
      throw new DataFormatException("input is not a valid WindowProbe Message");
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    // WindowProbe Message only contains its type.
    return new byte[] { MSG_TYPE_WINDOW_PROBE };
  }
}
// Change listing continues with:
// opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -60,6 +60,7 @@ import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; @@ -1372,4 +1373,34 @@ session.publish(msg); } /** * Process the reception of a WindowProbe message. * When rcvWindow is still positive, a WindowMessage carrying rcvWindow is * published on the session and outAckCount is incremented; otherwise * checkWindow() is called. * * @param windowProbeMsg The message to process (its content is not read here). * * @throws IOException When the session becomes unavailable. */ public void process(WindowProbe windowProbeMsg) throws IOException { if (rcvWindow > 0) { // The LDAP server believes that its window is closed // while it is not, this means that some problem happened in the // window exchange procedure ! // let's update the LDAP server with our current window size and hope // that everything will work better in the future. // TODO also log an error message. WindowMessage msg = new WindowMessage(rcvWindow); session.publish(msg); outAckCount++; } else { // Both the LDAP server and the replication server believe that the // window is closed. Let's check the flow control in case we // can now resume operations and send a windowMessage if necessary. checkWindow(); } } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -45,6 +45,7 @@ import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.loggers.debug.DebugTracer; @@ -159,6 +160,11 @@ ErrorMessage errorMsg = (ErrorMessage) msg; handler.process(errorMsg); } else if (msg instanceof WindowProbe) { WindowProbe windowProbeMsg = (WindowProbe) msg; handler.process(windowProbeMsg); } else if (msg == null) { /* @@ -184,7 +190,7 @@ String message = getMessage(msgID, handler.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message + e.getMessage(), msgID); message + ": " + e.getMessage(), msgID); } catch (ClassNotFoundException e) { /* opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -534,7 +534,7 @@ /** * Test that WindowMessageTest encoding and decoding works * by checking that : msg == new WindowMessageTest(msg.getBytes()). * by checking that : msg == new WindowMessage(msg.getBytes()). */ @Test() public void WindowMessageTest() throws Exception @@ -545,6 +545,18 @@ } /** * Test that WindowProbe encoding and decoding works * by checking that : new WindowProbe(msg.getBytes()) does not throw * an exception. */ @Test() public void WindowProbeTest() throws Exception { WindowProbe msg = new WindowProbe(); new WindowProbe(msg.getBytes()); } /** * Test that EntryMessage encoding and decoding works * by checking that : msg == new EntryMessageTest(msg.getBytes()). */ opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -27,11 +27,15 @@ package org.opends.server.replication.server; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import static org.opends.server.replication.protocol.OperationContext.*; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.SortedSet; @@ -49,7 +53,13 @@ import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.ModifyDnContext; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplServerStartMessage; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.ServerStartMessage; import org.opends.server.replication.protocol.SocketSession; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.Attribute; import org.opends.server.types.DN; @@ -748,6 +758,56 @@ } /** * Test that the Replication Server correctly sends back a WindowMessage * each time it receives a WindowProbe, and that the advertised window * is the same for two consecutive probes. * NOTE(review): neither the socket nor the session opened here is closed * in the visible code - consider closing them in a finally block. */ @Test() public void windowProbeTest() throws Exception { final int WINDOW = 10; /* * Open a socket connection to the replication server */ InetSocketAddress ServerAddr = new InetSocketAddress( InetAddress.getByName("localhost"), replicationServerPort); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(ServerAddr, 500); SocketSession session = new SocketSession(socket); /* * Send our ServerStartMessage. 
*/ ServerStartMessage msg = new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"), 0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(), ProtocolVersion.currentVersion()); session.publish(msg); /* * Read the ReplServerStartMessage that should come back. */ session.setSoTimeout(10000); ReplServerStartMessage replStartMsg = (ReplServerStartMessage) session.receive(); int serverwindow = replStartMsg.getWindowSize(); // push a WindowProbe message session.publish(new WindowProbe()); WindowMessage windowMsg = (WindowMessage) session.receive(); assertEquals(serverwindow, windowMsg.getNumAck()); // check that this did not change the window by sending a probe again. session.publish(new WindowProbe()); windowMsg = (WindowMessage) session.receive(); assertEquals(serverwindow, windowMsg.getNumAck()); } /** * After the tests stop the replicationServer. */ @AfterClass()