From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 313 +++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 286 insertions(+), 27 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 858a498..cc97ccf 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -51,6 +51,8 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
+
+import org.opends.messages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -67,6 +69,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -143,6 +146,7 @@
private short replicationServerId;
private short protocolVersion;
+ private long generationId=-1;
/**
@@ -189,7 +193,7 @@
* Then create the reader and writer thread.
*
* @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
- * null if this is an incoming connection.
+ * null if this is an incoming connection (listen).
* @param replicationServerId The identifier of the replicationServer that
* creates this server handler.
* @param replicationServerURL The URL of the replicationServer that creates
@@ -206,22 +210,34 @@
int windowSize, boolean sslEncryption,
ReplicationServer replicationServer)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
+ " starts a new LS or RS " +
+ ((baseDn == null)?"incoming connection":"outgoing connection"));
+
this.replicationServerId = replicationServerId;
rcvWindowSizeHalf = windowSize/2;
maxRcvWindow = windowSize;
rcvWindow = windowSize;
+ long localGenerationId=-1;
try
{
if (baseDn != null)
{
// This is an outgoing connection. Publish our start message.
this.baseDn = baseDn;
- replicationCache = replicationServer.getReplicationCache(baseDn);
+
+ // Get or create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(baseDn, true);
+ localGenerationId = replicationCache.getGenerationId();
+
ServerState localServerState = replicationCache.getDbServerState();
ReplServerStartMessage msg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
baseDn, windowSize, localServerState,
- protocolVersion, sslEncryption);
+ protocolVersion, localGenerationId,
+ sslEncryption);
+
session.publish(msg);
}
@@ -229,9 +245,10 @@
ReplicationMessage msg = session.receive();
if (msg instanceof ServerStartMessage)
{
- // The remote server is an LDAP Server
+ // The remote server is an LDAP Server.
ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+ generationId = receivedMsg.getGenerationId();
protocolVersion = ProtocolVersion.minWithCurrent(
receivedMsg.getVersion());
serverId = receivedMsg.getServerId();
@@ -281,15 +298,69 @@
serverIsLDAPserver = true;
- // This an incoming connection. Publish our start message
- replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ // Get or Create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(this.baseDn,
+ true);
+ localGenerationId = replicationCache.getGenerationId();
+
ServerState localServerState = replicationCache.getDbServerState();
+ // This is an incoming connection. Publish our start message
ReplServerStartMessage myStartMsg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
this.baseDn, windowSize, localServerState,
- protocolVersion, sslEncryption);
+ protocolVersion, localGenerationId,
+ sslEncryption);
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
+
+ /* Until here the session is encrypted; then it depends on the negotiation */
+ if (!sslEncryption)
+ {
+ session.stopEncryption();
+ }
+
+ if (debugEnabled())
+ {
+ Set<String> ss = this.serverState.toStringSet();
+ Set<String> lss = replicationCache.getDbServerState().toStringSet();
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ ", SH received START from LS serverId=" + serverId +
+ " baseDN=" + this.baseDn +
+ " generationId=" + generationId +
+ " localGenerationId=" + localGenerationId +
+ " state=" + ss +
+ " and sent ReplServerStart with state=" + lss);
+ }
+
+ /*
+ * If a generationID is already set locally for the domain
+ * then
+ * if the connecting replica does not have the same one
+ * then it is degraded locally and notified by an error message
+ * else
+ * we set the generationID from the one received
+ * (not yet saved on disk; will be set with the first change received)
+ */
+ if (localGenerationId>0)
+ {
+ if (generationId != localGenerationId)
+ {
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ receivedMsg.getBaseDn().toNormalizedString(),
+ Short.toString(receivedMsg.getServerId()),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ }
+ else
+ {
+ replicationCache.setGenerationId(generationId, false);
+ }
}
else if (msg instanceof ReplServerStartMessage)
{
@@ -297,6 +368,7 @@
ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
protocolVersion = ProtocolVersion.minWithCurrent(
receivedMsg.getVersion());
+ generationId = receivedMsg.getGenerationId();
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
int separator = serverURL.lastIndexOf(':');
@@ -306,7 +378,10 @@
this.baseDn = receivedMsg.getBaseDn();
if (baseDn == null)
{
- replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ // Get or create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(this.baseDn,
+ true);
+ localGenerationId = replicationCache.getGenerationId();
ServerState serverState = replicationCache.getDbServerState();
// The session initiator decides whether to use SSL.
@@ -317,7 +392,9 @@
new ReplServerStartMessage(replicationServerId,
replicationServerURL,
this.baseDn, windowSize, serverState,
- protocolVersion, sslEncryption);
+ protocolVersion,
+ localGenerationId,
+ sslEncryption);
session.publish(outMsg);
}
else
@@ -326,6 +403,107 @@
}
this.serverState = receivedMsg.getServerState();
sendWindowSize = receivedMsg.getWindowSize();
+
+ /* Until here the session is encrypted; then it depends on the negotiation */
+ if (!sslEncryption)
+ {
+ session.stopEncryption();
+ }
+
+ if (debugEnabled())
+ {
+ Set<String> ss = this.serverState.toStringSet();
+ Set<String> lss = replicationCache.getDbServerState().toStringSet();
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ ", SH received START from RS serverId=" + serverId +
+ " baseDN=" + this.baseDn +
+ " generationId=" + generationId +
+ " localGenerationId=" + localGenerationId +
+ " state=" + ss +
+ " and sent ReplServerStart with state=" + lss);
+ }
+
+ // if the remote RS and the local RS have the same genID
+ // then it's ok and nothing else to do
+ if (generationId == localGenerationId)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() + " RS with serverID=" + serverId +
+ " is connected with the right generation ID");
+ }
+ }
+ else
+ {
+ if (localGenerationId>0)
+ {
+ // if the local RS is initialized
+ if (generationId>0)
+ {
+ // if the remote RS is initialized
+ if (generationId != localGenerationId)
+ {
+ // if the 2 RS have different generationID
+ if (replicationCache.getGenerationIdSavedStatus())
+ {
+ // if the present RS has received changes regarding its
+ // gen ID and so won't change without a reset
+ // then we are just degrading the peer.
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ this.baseDn.toNormalizedString(),
+ Short.toString(receivedMsg.getServerId()),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ else
+ {
+ // The present RS has never received changes regarding its
+ // gen ID.
+ //
+ // Example case:
+ // - we are in RS1
+ // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+ // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
+ // - we are in RS1 and we receive a START msg from RS2
+ // - Each RS keeps its genID / is degraded and when LS2 is
+ // populated from LS1 everything will become ok.
+ //
+ // Issue:
+ // FIXME : Would it be a good idea in some cases to just
+ // set the gen ID received from the peer RS
+ // especially if the peer has a non-null state and
+ // we have a null state?
+ // replicationCache.setGenerationId(generationId, false);
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ this.baseDn.toNormalizedString(),
+ Short.toString(receivedMsg.getServerId()),
+ Long.toString(generationId),
+ Long.toString(localGenerationId));
+
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ }
+ }
+ else
+ {
+ // The remote has no genId. We don't change anything for the
+ // current RS.
+ }
+ }
+ else
+ {
+ // The local RS is not initialized - take the one received
+ replicationCache.setGenerationId(generationId, false);
+ }
+ }
}
else
{
@@ -333,12 +511,9 @@
return; // we did not recognize the message, ignore it
}
- if (!sslEncryption)
- {
- session.stopEncryption();
- }
-
- replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ // Get or create the ReplicationCache
+ replicationCache = replicationServer.getReplicationCache(this.baseDn,
+ true);
boolean started;
if (serverIsLDAPserver)
@@ -352,10 +527,11 @@
if (started)
{
- writer = new ServerWriter(session, serverId, this, replicationCache);
+ // sendWindow MUST be created before starting the writer
+ sendWindow = new Semaphore(sendWindowSize);
- reader = new ServerReader(session, serverId, this,
- replicationCache);
+ writer = new ServerWriter(session, serverId, this, replicationCache);
+ reader = new ServerReader(session, serverId, this, replicationCache);
reader.start();
writer.start();
@@ -377,6 +553,12 @@
// the connection is not valid, close it.
try
{
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() + " RS failed to start locally " +
+ " the connection from serverID="+serverId);
+ }
session.close();
} catch (IOException e1)
{
@@ -388,7 +570,8 @@
{
// some problem happened, reject the connection
MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString()));
+ mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
+ this.getMonitorInstanceName()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
try
@@ -399,7 +582,6 @@
// ignore
}
}
- sendWindow = new Semaphore(sendWindowSize);
}
/**
@@ -720,6 +902,21 @@
*/
public void add(UpdateMessage update, ServerHandler sourceHandler)
{
+ /*
+ * Ignore updates to be sent to a server that is degraded due to
+ * its inconsistent generationId
+ */
+ long referenceGenerationId = replicationCache.getGenerationId();
+ if ((referenceGenerationId>0) &&
+ (referenceGenerationId != generationId))
+ {
+ logError(ERR_IGNORING_UPDATE_TO.get(
+ update.getDn(),
+ this.getMonitorInstanceName()));
+
+ return;
+ }
+
synchronized (msgQueue)
{
/*
@@ -1164,7 +1361,7 @@
if (serverIsLDAPserver)
return "Remote LDAP Server " + str;
else
- return "Remote Replication Server " + str;
+ return "Remote Repl Server " + str;
}
/**
@@ -1261,7 +1458,10 @@
attributes.add(attr);
attributes.add(new Attribute("ssl-encryption",
- String.valueOf(session.isEncrypted())));
+ String.valueOf(session.isEncrypted())));
+
+ attributes.add(new Attribute("generation-id",
+ String.valueOf(generationId)));
return attributes;
}
@@ -1385,9 +1585,10 @@
public void process(RoutableMessage msg)
{
if (debugEnabled())
- TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
- msg + " from " + serverId);
-
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " processes received msg=" + msg);
replicationCache.process(msg, this);
}
@@ -1401,6 +1602,12 @@
public void sendInfo(ReplServerInfoMessage info)
throws IOException
{
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " sends message=" + info);
+
session.publish(info);
}
@@ -1412,7 +1619,13 @@
*/
public void setReplServerInfo(ReplServerInfoMessage infoMsg)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " sets replServerInfo " + "<" + infoMsg + ">");
remoteLDAPservers = infoMsg.getConnectedServers();
+ generationId = infoMsg.getGenerationId();
}
/**
@@ -1458,8 +1671,10 @@
public void send(RoutableMessage msg) throws IOException
{
if (debugEnabled())
- TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
- msg + " to " + serverId);
+ TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ getMonitorInstanceName() +
+ " SH for remote server " + this.getMonitorInstanceName() +
+ " sends message=" + msg);
session.publish(msg);
}
@@ -1492,4 +1707,48 @@
checkWindow();
}
}
+
+ /**
+ * Returns the generationId recorded by this handler for its remote server.
+ * @return The value of the generationId (initialized to -1).
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * Notifies the peer server that its generationId is no longer valid.
+ */
+ public void resetGenerationId()
+ {
+ // The ids are passed in the same order as the other ErrorMessage call
+ // sites of this class (local replicationServerId first, then the peer
+ // serverId). We then wait for a start message with a valid generationId.
+ try
+ {
+ Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
+ ErrorMessage errorMsg =
+ new ErrorMessage(replicationServerId, serverId, message);
+ session.publish(errorMsg);
+ }
+ catch (Exception e)
+ {
+ // FIXME Log exception when sending reset error message
+ }
+ }
+
+ /**
+ * Sends a message containing a generationId to a peer server.
+ * The peer is expected to be a replication server.
+ *
+ * @param msg The ResetGenerationId message to be sent.
+ * @throws IOException When it occurs while sending the message.
+ *
+ */
+ public void sendGenerationId(ResetGenerationId msg)
+ throws IOException
+ {
+ session.publish(msg);
+ }
}
--
Gitblit v1.10.0