From 13231f3def739a90b963d42853dea768789925f1 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 05 Jun 2007 14:59:58 +0000
Subject: [PATCH] Fix for issue 1764 : infinite loop when no replication server up and running.
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 129 ++++++++++++++++++++++--------------------
1 files changed, 67 insertions(+), 62 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 94cbe62..229f9e9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,8 +207,10 @@
}
boolean checkState = true;
- while ((!connected) && (!shutdown))
+ boolean receivedResponse = true;
+ while ((!connected) && (!shutdown) && (receivedResponse))
{
+ receivedResponse = false;
for (String server : servers)
{
int separator = server.lastIndexOf(':');
@@ -243,6 +245,7 @@
*/
session.setSoTimeout(1000);
startMsg = (ReplServerStartMessage) session.receive();
+ receivedResponse = true;
/*
* We have sent our own protocol version to the replication server.
@@ -274,6 +277,7 @@
maxSendWindow = startMsg.getWindowSize();
this.sendWindow = new Semaphore(maxSendWindow);
connected = true;
+ startHeartBeat();
break;
}
else
@@ -333,6 +337,7 @@
{
publish(replayOp.generateMessage());
}
+ startHeartBeat();
break;
}
}
@@ -375,51 +380,48 @@
}
}
- if (!connected)
+ if ((!connected) && (checkState == true) && receivedResponse)
{
- if (checkState == true)
+ /*
+ * We could not find a replicationServer that has seen all the
+ * changes that this server has already processed, start again
+ * the loop looking for any replicationServer.
+ */
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
+ try
{
- /*
- * We could not find a replicationServer that has seen all the
- * changes that this server has already processed, start again
- * the loop looking for any replicationServer.
- */
- try
- {
- Thread.sleep(500);
- } catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- checkState = false;
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
- String message = getMessage(msgID);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
- }
- else
+ Thread.sleep(500);
+ } catch (InterruptedException e)
{
- /*
- * This server could not find any replicationServer
- * Let's wait a little and try again.
- */
- checkState = false;
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
- String message = getMessage(msgID);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE, message, msgID);
- try
- {
- Thread.sleep(1000);
- } catch (InterruptedException e)
- {
- }
}
+ checkState = false;
}
}
+ if (!connected)
+ {
+ /*
+ * This server could not find any replicationServer
+ * It's going to start in degraded mode.
+ * Log a message
+ */
+ checkState = false;
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE, message, msgID);
+ }
+ }
+
+ /**
+ * Start the heartbeat monitor thread.
+ */
+ private void startHeartBeat()
+ {
// Start a heartbeat monitor thread.
if (heartbeatInterval > 0)
{
@@ -432,17 +434,27 @@
/**
+ * restart the ReplicationBroker.
+ */
+ private void reStart()
+ {
+ reStart(null);
+ }
+
+ /**
* Restart the ReplicationServer broker after a failure.
*
* @param failingSession the socket which failed
*/
private void reStart(ProtocolSession failingSession)
{
- numLostConnections++;
-
try
{
- failingSession.close();
+ if (failingSession != null)
+ {
+ failingSession.close();
+ numLostConnections++;
+ }
} catch (IOException e1)
{
// ignore
@@ -465,6 +477,16 @@
ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
}
+ if ((!connected) && (!shutdown))
+ {
+ try
+ {
+ Thread.sleep(500);
+ } catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
}
}
@@ -513,6 +535,9 @@
{
while (shutdown == false)
{
+ if (!connected)
+ reStart();
+
ProtocolSession failingSession = session;
try
{
@@ -575,26 +600,6 @@
}
/**
- * restart the server after a suspension.
- * @throws Exception in case of errors.
- */
- public void restartReceive() throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /**
- * Suspend message reception.
- * @throws Exception in case of errors.
- */
- public void suspendReceive() throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /**
* Set a timeout value.
* With this option set to a non-zero value, calls to the receive() method
* block for only this amount of time after which a
--
Gitblit v1.10.0