From e2447ca29d7539529ef05a40a26abc2f7ae35d8c Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 04 Jul 2007 15:12:04 +0000
Subject: [PATCH] The problem was that the publisher thread is stuck waiting for the window to re-open on a connection that has been closed without notifying the publisher.
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 510 ++++++++++++++++++++++----------------
opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java | 11
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 14 +
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 31 ++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java | 4
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 60 ++++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java | 84 ++++++
13 files changed, 516 insertions(+), 229 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 938c941..72f4ad0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -444,6 +444,12 @@
/**
+ * The connection to the curent Replication Server has failed.
+ */
+ public static final int MSGID_DISCONNECTED_FROM_CHANGELOG =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
+
+ /**
* Register the messages from this class in the core server.
*
*/
@@ -607,6 +613,9 @@
"The Replication is configured for suffix %s "
+ "but was not able to connect to any Replication Server");
registerMessage(MSGID_NOW_FOUND_CHANGELOG,
- "A Replication Server was found for suffix %s");
+ "Replication Server %s now used for Replication Domain %s");
+ registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
+ "The connection to Replication Server %s has been dropped by the "
+ + "Replication Server");
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
index 70d43f9..36b7003 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -54,7 +54,6 @@
public HistoricalCsnOrderingMatchingRule()
{
super();
- // TODO Auto-generated constructor stub
}
/**
@@ -81,7 +80,7 @@
@Override
public void initializeMatchingRule(OrderingMatchingRuleCfg configuration)
{
- // TODO Auto-generated method stub
+ // No implementation needed here.
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 7d181f0..ff5105f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -106,7 +106,7 @@
*
* @return The number of update currently in the list.
*/
- public synchronized int size()
+ public int size()
{
return pendingChanges.size();
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index bb298eb..c1fdbb6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
private boolean shutdown = false;
private Collection<String> servers;
private boolean connected = false;
- private final Object lock = new Object();
private String replicationServer = "Not connected";
private TreeSet<FakeOperation> replayOperations;
private ProtocolSession session = null;
@@ -120,7 +121,7 @@
private int numLostConnections = 0;
/**
- * When the broker cannort connect to any replication server
+ * When the broker cannot connect to any replication server
* it log an error and keeps continuing every second.
* This boolean is set when the first failure happens and is used
* to avoid repeating the error message for further failure to connect
@@ -129,6 +130,8 @@
*/
private boolean connectionError = false;
+ private Object connectPhaseLock = new Object();
+
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
*
@@ -217,234 +220,243 @@
boolean checkState = true;
boolean receivedResponse = true;
- while ((!connected) && (!shutdown) && (receivedResponse))
+ synchronized (connectPhaseLock)
{
- receivedResponse = false;
- for (String server : servers)
+ while ((!connected) && (!shutdown) && (receivedResponse))
{
- int separator = server.lastIndexOf(':');
- String port = server.substring(separator + 1);
- String hostname = server.substring(0, separator);
-
- try
+ receivedResponse = false;
+ for (String server : servers)
{
- /*
- * Open a socket connection to the next candidate.
- */
- InetSocketAddress ServerAddr = new InetSocketAddress(
- InetAddress.getByName(hostname), Integer.parseInt(port));
- Socket socket = new Socket();
- socket.setReceiveBufferSize(1000000);
- socket.setTcpNoDelay(true);
- socket.connect(ServerAddr, 500);
- session = new SocketSession(socket);
+ int separator = server.lastIndexOf(':');
+ String port = server.substring(separator + 1);
+ String hostname = server.substring(0, separator);
- /*
- * Send our ServerStartMessage.
- */
- ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
- maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow*2, heartbeatInterval, state,
- protocolVersion);
- session.publish(msg);
-
-
- /*
- * Read the ReplServerStartMessage that should come back.
- */
- session.setSoTimeout(1000);
- startMsg = (ReplServerStartMessage) session.receive();
- receivedResponse = true;
-
- /*
- * We have sent our own protocol version to the replication server.
- * The replication server will use the same one (or an older one
- * if it is an old replication server).
- */
- protocolVersion = ProtocolVersion.minWithCurrent(
- startMsg.getVersion());
- session.setSoTimeout(timeout);
-
- /*
- * We must not publish changes to a replicationServer that has not
- * seen all our previous changes because this could cause some
- * other ldap servers to miss those changes.
- * Check that the ReplicationServer has seen all our previous changes.
- * If not, try another replicationServer.
- * If no other replicationServer has seen all our changes, recover
- * those changes and send them again to any replicationServer.
- */
- ChangeNumber replServerMaxChangeNumber =
- startMsg.getServerState().getMaxChangeNumber(serverID);
- if (replServerMaxChangeNumber == null)
- replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
- ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID);
- if ((ourMaxChangeNumber == null) ||
- (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+ try
{
- replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
- this.sendWindow = new Semaphore(maxSendWindow);
- connected = true;
- startHeartBeat();
- break;
- }
- else
- {
- if (checkState == true)
+ /*
+ * Open a socket connection to the next candidate.
+ */
+ InetSocketAddress ServerAddr = new InetSocketAddress(
+ InetAddress.getByName(hostname), Integer.parseInt(port));
+ Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
+ socket.setTcpNoDelay(true);
+ socket.connect(ServerAddr, 500);
+ session = new SocketSession(socket);
+
+ /*
+ * Send our ServerStartMessage.
+ */
+ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+ maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+ halfRcvWindow*2, heartbeatInterval, state,
+ protocolVersion);
+ session.publish(msg);
+
+
+ /*
+ * Read the ReplServerStartMessage that should come back.
+ */
+ session.setSoTimeout(1000);
+ startMsg = (ReplServerStartMessage) session.receive();
+ receivedResponse = true;
+
+ /*
+ * We have sent our own protocol version to the replication server.
+ * The replication server will use the same one (or an older one
+ * if it is an old replication server).
+ */
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ startMsg.getVersion());
+ session.setSoTimeout(timeout);
+
+ /*
+ * We must not publish changes to a replicationServer that has not
+ * seen all our previous changes because this could cause some
+ * other ldap servers to miss those changes.
+ * Check that the ReplicationServer has seen all our previous
+ * changes.
+ * If not, try another replicationServer.
+ * If no other replicationServer has seen all our changes, recover
+ * those changes and send them again to any replicationServer.
+ */
+ ChangeNumber replServerMaxChangeNumber =
+ startMsg.getServerState().getMaxChangeNumber(serverID);
+ if (replServerMaxChangeNumber == null)
+ replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+ ChangeNumber ourMaxChangeNumber =
+ state.getMaxChangeNumber(serverID);
+ if ((ourMaxChangeNumber == null) ||
+ (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
- /* This replicationServer is missing some
- * of our changes, we are going to try another server
- * but before log a notice message
- */
- int msgID = MSGID_CHANGELOG_MISSING_CHANGES;
- String message = getMessage(msgID, server);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ connected = true;
+ startHeartBeat();
+ break;
}
else
{
- replayOperations.clear();
- /*
- * Get all the changes that have not been seen by this
- * replicationServer and update it
- */
- InternalClientConnection conn =
- InternalClientConnection.getRootConnection();
- LDAPFilter filter = LDAPFilter.decode(
- "("+ Historical.HISTORICALATTRIBUTENAME +
- ">=dummy:" + replServerMaxChangeNumber + ")");
- LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
- attrs.add(Historical.HISTORICALATTRIBUTENAME);
- InternalSearchOperation op = conn.processSearch(
- new ASN1OctetString(baseDn.toString()),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, 0, false, filter,
- attrs, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
+ if (checkState == true)
{
- /*
- * An error happened trying to search for the updates
- * This server therefore can't start acepting new updates.
- * TODO : should stop the LDAP server (how to ?)
+ /* This replicationServer is missing some
+ * of our changes, we are going to try another server
+ * but before log a notice message
*/
- int msgID = MSGID_CANNOT_RECOVER_CHANGES;
- String message = getMessage(msgID);
+ int msgID = MSGID_CHANGELOG_MISSING_CHANGES;
+ String message = getMessage(msgID, server);
logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.FATAL_ERROR,
- message, msgID);
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
}
else
{
- replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
- this.sendWindow = new Semaphore(maxSendWindow);
- connected = true;
- for (FakeOperation replayOp : replayOperations)
+ replayOperations.clear();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "going to search for changes", 1);
+ /*
+ * Get all the changes that have not been seen by this
+ * replicationServer and update it
+ */
+ InternalClientConnection conn =
+ InternalClientConnection.getRootConnection();
+ LDAPFilter filter = LDAPFilter.decode(
+ "("+ Historical.HISTORICALATTRIBUTENAME +
+ ">=dummy:" + replServerMaxChangeNumber + ")");
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(Historical.HISTORICALATTRIBUTENAME);
+ InternalSearchOperation op = conn.processSearch(
+ new ASN1OctetString(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs, this);
+ if (op.getResultCode() != ResultCode.SUCCESS)
{
- publish(replayOp.generateMessage());
+ /*
+ * An error happened trying to search for the updates
+ * This server will start acepting again new updates but
+ * some inconsistencies will stay between servers.
+ * TODO : REPAIR : log an error for the repair tool
+ * that will need to resynchronize the servers.
+ */
+ int msgID = MSGID_CANNOT_RECOVER_CHANGES;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.FATAL_ERROR,
+ message, msgID);
}
- startHeartBeat();
- break;
+ else
+ {
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ connected = true;
+ for (FakeOperation replayOp : replayOperations)
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "sendingChange", 1);
+ session.publish(replayOp.generateMessage());
+ }
+ startHeartBeat();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "changes sent", 1);
+ break;
+ }
+ }
+ }
+ }
+ catch (ConnectException e)
+ {
+ /*
+ * There was no server waiting on this host:port
+ * Log a notice and try the next replicationServer in the list
+ */
+ if (!connectionError )
+ {
+ // the error message is only logged once to avoid overflowing
+ // the error log
+ int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING;
+ String message = getMessage(msgID, server);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_EXCEPTION_STARTING_SESSION;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message + stackTraceToSingleLineString(e), msgID);
+ }
+ finally
+ {
+ if (connected == false)
+ {
+ if (session != null)
+ {
+ session.close();
+ session = null;
}
}
}
}
- catch (ConnectException e)
+
+ if ((!connected) && (checkState == true) && receivedResponse)
{
/*
- * There was no server waiting on this host:port
- * Log a notice and try the next replicationServer in the list
+ * We could not find a replicationServer that has seen all the
+ * changes that this server has already processed, start again
+ * the loop looking for any replicationServer.
*/
- if (!connectionError )
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING;
- String message = getMessage(msgID, server);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
- }
- }
- catch (Exception e)
- {
- int msgID = MSGID_EXCEPTION_STARTING_SESSION;
- String message = getMessage(msgID) + stackTraceToSingleLineString(e);
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
+ String message = getMessage(msgID);
logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- finally
- {
- if (connected == false)
- {
- if (session != null)
- {
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "Broker : connect closing session" , 1);
-
- session.close();
- session = null;
- }
- }
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
+ checkState = false;
}
}
- if ((!connected) && (checkState == true) && receivedResponse)
+ if (connected)
+ {
+ // This server has connected correctly.
+ // Log a message to let the administrator know that the failure was
+ // resolved.
+ // wakeup all the thread that were waiting on the window
+ // on the previous connection.
+ connectionError = false;
+ if (sendWindow != null)
+ sendWindow.release(Integer.MAX_VALUE);
+ this.sendWindow = new Semaphore(maxSendWindow);
+ connectPhaseLock.notify();
+ int msgID = MSGID_NOW_FOUND_CHANGELOG;
+ String message =
+ getMessage(msgID, replicationServer, baseDn.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE, message, msgID);
+ }
+ else
{
/*
- * We could not find a replicationServer that has seen all the
- * changes that this server has already processed, start again
- * the loop looking for any replicationServer.
+ * This server could not find any replicationServer
+ * It's going to start in degraded mode.
+ * Log a message
*/
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
- String message = getMessage(msgID);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
- try
+ if (!connectionError)
{
- Thread.sleep(500);
- } catch (InterruptedException e)
- {
+ checkState = false;
+ connectionError = true;
+ connectPhaseLock.notify();
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
+ String message = getMessage(msgID, baseDn.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE, message, msgID);
}
- checkState = false;
- }
- }
-
- if (connected)
- {
- // This server has connected correctly.
- // let's check if it was previosuly on error, in this case log
- // a message to let the administratot know that the failure was resolved.
- if (connectionError)
- {
- connectionError = false;
- int msgID = MSGID_NOW_FOUND_CHANGELOG;
- String message = getMessage(msgID, baseDn.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE, message, msgID);
- }
- }
- else
- {
- /*
- * This server could not find any replicationServer
- * It's going to start in degraded mode.
- * Log a message
- */
- if (!connectionError)
- {
- checkState = false;
- connectionError = true;
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
- String message = getMessage(msgID, baseDn.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE, message, msgID);
}
}
}
@@ -530,30 +542,90 @@
public void publish(ReplicationMessage msg)
{
boolean done = false;
- ProtocolSession failingSession = session;
while (!done)
{
if (connectionError)
- return;
- synchronized (lock)
{
- try
+ // It was not possible to connect to any replication server.
+ // Since the operation was already processed, we have no other
+ // choice than to return without sending the ReplicationMessage
+ // and relying on the resend procedure of the connect phase to
+ // fix the problem when we finally connect.
+ return;
+ }
+
+ try
+ {
+ boolean credit;
+ ProtocolSession current_session;
+ Semaphore currentWindowSemaphore;
+
+ // save the session at the time when we acquire the
+ // sendwindow credit so that we can make sure later
+ // that the session did not change in between.
+ // This is necessary to make sure that we don't publish a message
+ // on a session with a credit that was acquired from a previous
+ // session.
+ synchronized (connectPhaseLock)
{
- if (this.connected == false)
- this.reStart(failingSession);
- if (msg instanceof UpdateMessage)
- sendWindow.acquire();
- session.publish(msg);
- done = true;
- } catch (IOException e)
- {
- this.reStart(failingSession);
+ current_session = session;
+ currentWindowSemaphore = sendWindow;
}
- catch (InterruptedException e)
+
+ if (msg instanceof UpdateMessage)
{
- this.reStart(failingSession);
+ // Acquiring the window credit must be done outside of the
+ // connectPhaseLock because it can be blocking and we don't
+ // want to hold off reconnection in case the connection dropped.
+ credit =
+ currentWindowSemaphore.tryAcquire(
+ (long) 500, TimeUnit.MILLISECONDS);
}
+ else
+ {
+ credit = true;
+ }
+ if (credit)
+ {
+ synchronized (connectPhaseLock)
+ {
+ // check the session. If it has changed, some
+ // deconnection/reconnection happened and we need to restart from
+ // scratch.
+ if (session == current_session)
+ {
+ session.publish(msg);
+ done = true;
+ }
+ }
+ }
+ if (!credit)
+ {
+ // the window is still closed.
+ // Send a WindowProbe message to wakeup the receiver in case the
+ // window update message was lost somehow...
+ // then loop to check again if connection was closed.
+ session.publish(new WindowProbe());
+ }
+ } catch (IOException e)
+ {
+ // The receive threads should handle reconnection or
+ // mark this broker in error. Just retry.
+ synchronized (connectPhaseLock)
+ {
+ try
+ {
+ connectPhaseLock.wait(100);
+ } catch (InterruptedException e1)
+ {
+ // ignore
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // just loop.
}
}
}
@@ -561,6 +633,10 @@
/**
* Receive a message.
+ * This method is not multithread safe and should either always be
+ * called in a single thread or protected by a locking mechanism
+ * before being called.
+ *
* @return the received message
* @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
@@ -603,10 +679,12 @@
{
if (shutdown == false)
{
- synchronized (lock)
- {
- this.reStart(failingSession);
- }
+ int msgID = MSGID_DISCONNECTED_FROM_CHANGELOG;
+ String message = getMessage(msgID, replicationServer);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ message + " " + e.getMessage(), msgID);
+ this.reStart(failingSession);
}
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 29f3aa6..ac429e8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -561,7 +561,7 @@
{
if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
{
- // this policy imply that we always aceept updates.
+ // this policy imply that we always accept updates.
return true;
}
if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
@@ -587,7 +587,7 @@
}
}
// we should never get there as the only possible policies are
- // ACCEPT_UPDATES and DENY_UPDATES
+ // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
return true;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
index 7cac348..3dffce0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -59,7 +59,7 @@
@Override()
public void initializeMonitorProvider(MonitorProviderCfg configuration)
{
- // TODO Auto-generated method stub
+ // no implementation needed.
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index 3cd873d..15847ed 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -52,6 +52,7 @@
static final byte MSG_TYPE_ENTRY = 12;
static final byte MSG_TYPE_DONE = 13;
static final byte MSG_TYPE_ERROR = 14;
+ static final byte MSG_TYPE_WINDOW_PROBE = 15;
// Adding a new type of message here probably requires to
// change accordingly generateMsg method below
@@ -136,6 +137,9 @@
case MSG_TYPE_ERROR:
msg = new ErrorMessage(buffer);
break;
+ case MSG_TYPE_WINDOW_PROBE:
+ msg = new WindowProbe(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
index de39bf3..c311b3b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
@@ -32,9 +32,13 @@
/**
- * This message is used by LDAP server when they first connect.
- * to a replication server to let them know who they are and what is their state
- * (their RUV)
+ * This message is used by LDAP server or by Replication Servers to
+ * update the send window of the remote entities.
+ *
+ * A receiving entity should create such a message with a given credit
+ * when it wants to open the send window of the remote entity.
+ * A LDAP or Replication Server should increase its send window when receiving
+ * such a message.
*/
public class WindowMessage extends ReplicationMessage implements
Serializable
@@ -47,7 +51,7 @@
* Create a new WindowMessage.
*
* @param numAck The number of acknowledged messages.
- * The window will be increase by this number.
+ * The window will be increase by this credit number.
*/
public WindowMessage(int numAck)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
new file mode 100644
index 0000000..f30a219
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
@@ -0,0 +1,84 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This message is used by LDAP or Replication Server that have been
+ * out of credit for a while and want to check that the remote servers.
+ *
+ * A sending entity that is blocked because its send window is closed
+ * for a while should create such a message to check that the window
+ * closure is valid.
+ *
+ * An entity that received such a message should respond with a
+ * WindowUpdate message indicating the curent credit available.
+ */
+public class WindowProbe extends ReplicationMessage implements
+ Serializable
+{
+ private static final long serialVersionUID = 8442267608764026867L;
+
+ /**
+ * Create a new WindowProbe message.
+ */
+ public WindowProbe()
+ {
+ }
+
+ /**
+ * Creates a new WindowProbe from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the
+ * WindowMessage.
+ * @throws DataFormatException If the byte array does not contain a valid
+ * encoded form of the WindowMessage.
+ */
+ public WindowProbe(byte[] in) throws DataFormatException
+ {
+ // WindowProbe Message only contains its type.
+ if (in[0] != MSG_TYPE_WINDOW_PROBE)
+ throw new DataFormatException("input is not a valid Window Message");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ // WindowProbe Message only contains its type.
+
+ byte[] resultByteArray = new byte[1];
+ resultByteArray[0] = MSG_TYPE_WINDOW_PROBE;
+
+ return resultByteArray;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a98df94..7a0f72c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -60,6 +60,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -1372,4 +1373,34 @@
session.publish(msg);
}
+
+ /**
+ * Process the reception of a WindowProbe message.
+ *
+ * @param windowProbeMsg The message to process.
+ *
+ * @throws IOException When the session becomes unavailable.
+ */
+ public void process(WindowProbe windowProbeMsg) throws IOException
+ {
+ if (rcvWindow > 0)
+ {
+ // The LDAP server believes that its window is closed
+ // while it is not, this means that some problem happened in the
+ // window exchange procedure !
+ // lets update the LDAP server with out current window size and hope
+ // that everything will work better in the futur.
+ // TODO also log an error message.
+ WindowMessage msg = new WindowMessage(rcvWindow);
+ session.publish(msg);
+ outAckCount++;
+ }
+ else
+ {
+ // Both the LDAP server and the replication server believes that the
+ // window is closed. Lets check the flowcontrol in case we
+ // can now resume operations and send a windowMessage if necessary.
+ checkWindow();
+ }
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 41fb483..16373fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
@@ -159,6 +160,11 @@
ErrorMessage errorMsg = (ErrorMessage) msg;
handler.process(errorMsg);
}
+ else if (msg instanceof WindowProbe)
+ {
+ WindowProbe windowProbeMsg = (WindowProbe) msg;
+ handler.process(windowProbeMsg);
+ }
else if (msg == null)
{
/*
@@ -184,7 +190,7 @@
String message = getMessage(msgID, handler.toString());
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
- message + e.getMessage(), msgID);
+ message + ": " + e.getMessage(), msgID);
} catch (ClassNotFoundException e)
{
/*
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index bd48b77..c203fea 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -534,7 +534,7 @@
/**
* Test that WindowMessageTest encoding and decoding works
- * by checking that : msg == new WindowMessageTest(msg.getBytes()).
+ * by checking that : msg == new WindowMessage(msg.getBytes()).
*/
@Test()
public void WindowMessageTest() throws Exception
@@ -543,6 +543,18 @@
WindowMessage newMsg = new WindowMessage(msg.getBytes());
assertEquals(msg.getNumAck(), newMsg.getNumAck());
}
+
+ /**
+ * Test that WindowProbe encoding and decoding works
+ * by checking that : new WindowProbe(msg.getBytes()) does not throws
+ * an exception.
+ */
+ @Test()
+ public void WindowProbeTest() throws Exception
+ {
+ WindowProbe msg = new WindowProbe();
+ new WindowProbe(msg.getBytes());
+ }
/**
* Test that EntryMessage encoding and decoding works
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index b1a1eb2..15a03e3 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -27,11 +27,15 @@
package org.opends.server.replication.server;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.opends.server.replication.protocol.OperationContext.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
@@ -49,7 +53,13 @@
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.ServerStartMessage;
+import org.opends.server.replication.protocol.SocketSession;
+import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
@@ -748,6 +758,56 @@
}
/**
+ * Test that the Replication sends back correctly WindowsUpdate
+ * when we send a WindowProbe.
+ */
+ @Test()
+ public void windowProbeTest() throws Exception
+ {
+ final int WINDOW = 10;
+ /*
+ * Open a socket connection to the replication server
+ */
+ InetSocketAddress ServerAddr = new InetSocketAddress(
+ InetAddress.getByName("localhost"), replicationServerPort);
+ Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
+ socket.setTcpNoDelay(true);
+ socket.connect(ServerAddr, 500);
+ SocketSession session = new SocketSession(socket);
+
+ /*
+ * Send our ServerStartMessage.
+ */
+ ServerStartMessage msg =
+ new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
+ 0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
+ ProtocolVersion.currentVersion());
+ session.publish(msg);
+
+ /*
+ * Read the ReplServerStartMessage that should come back.
+ */
+ session.setSoTimeout(10000);
+ ReplServerStartMessage replStartMsg =
+ (ReplServerStartMessage) session.receive();
+ int serverwindow = replStartMsg.getWindowSize();
+
+ // push a WindowProbe message
+ session.publish(new WindowProbe());
+
+ WindowMessage windowMsg = (WindowMessage) session.receive();
+ assertEquals(serverwindow, windowMsg.getNumAck());
+
+ // check that this did not change the window by sending a probe again.
+ session.publish(new WindowProbe());
+
+ windowMsg = (WindowMessage) session.receive();
+ assertEquals(serverwindow, windowMsg.getNumAck());
+ }
+
+
+ /**
* After the tests stop the replicationServer.
*/
@AfterClass()
--
Gitblit v1.10.0