From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 113 ++++++++++++++++++++++++++++++++++++++++++++++----------
1 files changed, 92 insertions(+), 21 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index ed6270d..6733366 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,8 +25,7 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -93,6 +92,7 @@
private int maxRcvWindow;
private int timeout = 0;
private short protocolVersion;
+ private long generationId = -1;
private ReplSessionSecurity replSessionSecurity;
/**
@@ -143,12 +143,15 @@
* @param window The size of the send and receive window to use.
* @param heartbeatInterval The interval between heartbeats requested of the
* replicationServer, or zero if no heartbeats are requested.
+ *
+ * @param generationId The generationId for the server associated to the
+ * provided serverID and for the domain associated to the provided baseDN.
* @param replSessionSecurity The session security configuration.
*/
public ReplicationBroker(ServerState state, DN baseDn, short serverID,
int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
int maxSendDelay, int window, long heartbeatInterval,
- ReplSessionSecurity replSessionSecurity)
+ long generationId, ReplSessionSecurity replSessionSecurity)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -164,6 +167,7 @@
this.halfRcvWindow = window/2;
this.heartbeatInterval = heartbeatInterval;
this.protocolVersion = ProtocolVersion.currentVersion();
+ this.generationId = generationId;
this.replSessionSecurity = replSessionSecurity;
}
@@ -198,7 +202,7 @@
*/
private void connect()
{
- ReplServerStartMessage startMsg;
+ ReplServerStartMessage replServerStartMsg;
// Stop any existing heartbeat monitor from a previous session.
if (heartbeatMonitor != null)
@@ -207,8 +211,24 @@
heartbeatMonitor = null;
}
+ // checkState is true for the first loop on all replication servers
+ // looking for one already up-to-date.
+ // If we found some responding replication servers but none up-to-date
+ // then we set check-state to false and do a second loop where the first
+ // found will be the one elected and then we will update this replication
+ // server.
boolean checkState = true;
boolean receivedResponse = true;
+
+ // TODO: We do two loops here, opening, closing and reopening sessions to
+ // the same servers, which risks 'same server id' errors.
+ // It would be better to do only one loop, keeping the best candidate while
+ // traversing the list of replication servers to connect to.
+ if (servers.size()==1)
+ {
+ checkState = false;
+ }
+
synchronized (connectPhaseLock)
{
while ((!connected) && (!shutdown) && (receivedResponse))
@@ -240,7 +260,7 @@
ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
halfRcvWindow*2, heartbeatInterval, state,
- protocolVersion, isSslEncryption);
+ protocolVersion, generationId, isSslEncryption);
session.publish(msg);
@@ -248,7 +268,7 @@
* Read the ReplServerStartMessage that should come back.
*/
session.setSoTimeout(1000);
- startMsg = (ReplServerStartMessage) session.receive();
+ replServerStartMsg = (ReplServerStartMessage) session.receive();
receivedResponse = true;
/*
@@ -257,7 +277,7 @@
* if it is an old replication server).
*/
protocolVersion = ProtocolVersion.minWithCurrent(
- startMsg.getVersion());
+ replServerStartMsg.getVersion());
session.setSoTimeout(timeout);
if (!isSslEncryption)
@@ -276,7 +296,7 @@
* those changes and send them again to any replicationServer.
*/
ChangeNumber replServerMaxChangeNumber =
- startMsg.getServerState().getMaxChangeNumber(serverID);
+ replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
if (replServerMaxChangeNumber == null)
replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
ChangeNumber ourMaxChangeNumber =
@@ -285,7 +305,7 @@
(ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
+ maxSendWindow = replServerStartMsg.getWindowSize();
connected = true;
startHeartBeat();
break;
@@ -298,7 +318,8 @@
* of our changes, we are going to try another server
* but before log a notice message
*/
- Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server);
+ Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
+ baseDn.toNormalizedString());
logError(message);
}
else
@@ -341,7 +362,7 @@
else
{
replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
+ maxSendWindow = replServerStartMsg.getWindowSize();
connected = true;
for (FakeOperation replayOp : replayOperations)
{
@@ -371,8 +392,9 @@
}
catch (Exception e)
{
- Message message =
- ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage());
+ Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
+ baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
logError(message);
}
finally
@@ -392,7 +414,9 @@
}
}
}
- }
+ } // for servers
+
+ // We have traversed all the replication servers
if ((!connected) && (checkState == true) && receivedResponse)
{
@@ -401,12 +425,16 @@
* changes that this server has already processed, start again
* the loop looking for any replicationServer.
*/
- Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get();
+ Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
+ baseDn.toNormalizedString());
logError(message);
checkState = false;
}
}
+ // We have traversed all the replication servers as many times as needed
+ // to find one if one is up and running.
+
if (connected)
{
// This server has connected correctly.
@@ -462,9 +490,9 @@
/**
* restart the ReplicationBroker.
*/
- private void reStart()
+ public void reStart()
{
- reStart(null);
+ reStart(this.session);
}
/**
@@ -472,7 +500,7 @@
*
* @param failingSession the socket which failed
*/
- private void reStart(ProtocolSession failingSession)
+ public void reStart(ProtocolSession failingSession)
{
try
{
@@ -498,7 +526,8 @@
} catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()));
+ mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
+ baseDn.toNormalizedString(), e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
@@ -533,6 +562,13 @@
// choice than to return without sending the ReplicationMessage
// and relying on the resend procedure of the connect phase to
// fix the problem when we finally connect.
+
+ if (debugEnabled())
+ {
+ debugInfo("ReplicationBroker.publish() Publishing a " +
+ " message is not possible due to existing connection error.");
+ }
+
return;
}
@@ -601,12 +637,22 @@
} catch (InterruptedException e1)
{
// ignore
+ if (debugEnabled())
+ {
+ debugInfo("ReplicationBroker.publish() " +
+ "IO exception raised : " + e.getLocalizedMessage());
+ }
}
}
}
catch (InterruptedException e)
{
// just loop.
+ if (debugEnabled())
+ {
+ debugInfo("ReplicationBroker.publish() " +
+ "Interrupted exception raised." + e.getLocalizedMessage());
+ }
}
}
}
@@ -628,7 +674,7 @@
{
if (!connected)
{
- reStart();
+ reStart(null);
}
ProtocolSession failingSession = session;
@@ -663,6 +709,9 @@
Message message =
NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
logError(message);
+
+ debugInfo("ReplicationBroker.receive() " + baseDn +
+ " Exception raised." + e + e.getLocalizedMessage());
this.reStart(failingSession);
}
}
@@ -683,7 +732,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("ReplicationBroker Stop Closing session");
+ debugInfo("ReplicationBroker is stopping and will " +
+ "close the connection");
}
if (session != null)
@@ -713,6 +763,20 @@
}
/**
+ * Set the value of the generationId for this broker. Normally the
+ * generationId is set through the constructor, but there are cases where
+ * it must be changed while the broker already exists, for example after
+ * an on-line import.
+ *
+ * @param generationId The new value of the generationId.
+ *
+ */
+ public void setGenerationId(long generationId)
+ {
+ this.generationId = generationId;
+ }
+
+ /**
* Get the name of the replicationServer to which this broker is currently
* connected.
*
@@ -857,6 +921,13 @@
return !connectionError;
}
+ private boolean debugEnabled() { return true; } // NOTE(review): unconditionally true; looks like temporary tracing - confirm.
+ private static final void debugInfo(String s) // Emits s to two sinks:
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); // 1) error log, category SYNC, severity NOTICE
+ TRACER.debugInfo(s); // 2) debug tracer
+ }
+
/**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
--
Gitblit v1.10.0