From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 346 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 326 insertions(+), 20 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index d618cf8..990eb1f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -28,6 +28,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -47,6 +49,7 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import com.sleepycat.je.DatabaseException;
@@ -106,6 +109,15 @@
new ConcurrentHashMap<Short, DbHandler>();
private ReplicationServer replicationServer;
+ /* GenerationId management */
+ private long generationId = -1;
+ private boolean generationIdSavedStatus = false;
+
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
/**
* Creates a new ReplicationCache associated to the DN baseDn.
*
@@ -117,7 +129,13 @@
{
this.baseDn = baseDn;
this.replicationServer = replicationServer;
- }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Created Cache for " + baseDn + " " +
+ stackTraceToSingleLineString(new Exception()));
+}
/**
* Add an update that has been received to the list of
@@ -138,6 +156,7 @@
* other replication server before pushing it to the LDAP servers
*/
+ short id = update.getChangeNumber().getServerId();
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
@@ -158,19 +177,21 @@
}
}
- // look for the dbHandler that is responsible for the master server which
+ // look for the dbHandler that is responsible for the LDAP server which
// generated the change.
DbHandler dbHandler = null;
synchronized (sourceDbHandlers)
{
- short id = update.getChangeNumber().getServerId();
dbHandler = sourceDbHandlers.get(id);
if (dbHandler == null)
{
try
{
- dbHandler = replicationServer.newDbHandler(id, baseDn);
- } catch (DatabaseException e)
+ dbHandler = replicationServer.newDbHandler(id,
+ baseDn, generationId);
+ generationIdSavedStatus = true;
+ }
+ catch (DatabaseException e)
{
/*
* Because of database problem we can't save any more changes
@@ -250,6 +271,15 @@
}
connectedServers.put(handler.getServerId(), handler);
+ // It can be that the server that connects here is the
+ // first server connected for a domain.
+ // In that case, we will establish the appriopriate connections
+ // to the other repl servers for this domain and receive
+ // their ReplServerInfo messages.
+ // FIXME: Is it necessary to end this above processing BEFORE listening
+ // to incoming messages for that domain ? But the replica
+ // would raise Read Timeout for replica that connects.
+
// Update the remote replication servers with our list
// of connected LDAP servers
sendReplServerInfo();
@@ -265,17 +295,90 @@
*/
public void stopServer(ServerHandler handler)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " stopServer " + handler.getMonitorInstanceName());
+
handler.stopHandler();
if (handler.isReplicationServer())
+ {
replicationServers.remove(handler.getServerId());
+ }
else
{
connectedServers.remove(handler.getServerId());
+ }
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendReplServerInfo();
+ mayResetGenerationId();
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
+ }
+
+ /**
+ * Resets the generationId for this domain if there is no LDAP
+ * server currently connected and if the generationId has never
+ * been saved.
+ */
+ protected void mayResetGenerationId()
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " mayResetGenerationId generationIdSavedStatus=" +
+ generationIdSavedStatus);
+
+ // If there is no more any LDAP server connected to this domain in the
+ // topology and the generationId has never been saved, then we can reset
+ // it and the next LDAP server to connect will become the new reference.
+ boolean lDAPServersConnectedInTheTopology = false;
+ if (connectedServers.isEmpty())
+ {
+ for (ServerHandler rsh : replicationServers.values())
+ {
+ if (generationId != rsh.getGenerationId())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
+ " thas different genId");
+ }
+ else
+ {
+ if (!rsh.getRemoteLDAPServers().isEmpty())
+ {
+ lDAPServersConnectedInTheTopology = true;
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
+ " has servers connected to it - will not reset generationId");
+ }
+ }
+ }
+ }
+ else
+ {
+ lDAPServersConnectedInTheTopology = true;
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " for " + baseDn + " " +
+ " has servers connected to it - will not reset generationId");
+ }
+
+ if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus))
+ {
+ setGenerationId(-1, false);
}
}
@@ -321,7 +424,7 @@
// Update this server with the list of LDAP servers
// already connected
handler.sendInfo(
- new ReplServerInfoMessage(getConnectedLDAPservers()));
+ new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
return true;
}
@@ -437,17 +540,20 @@
}
/**
- * creates a new ReplicationDB with specified identifier.
- * @param id the identifier of the new ReplicationDB.
- * @param db the new db.
+ * Sets the provided DbHandler associated to the provided serverId.
+ *
+ * @param serverId the serverId for the server to which is
+ * associated the Dbhandler.
+ * @param dbHandler the dbHandler associated to the serverId.
*
* @throws DatabaseException If a database error happened.
*/
- public void newDb(short id, DbHandler db) throws DatabaseException
+ public void setDbHandler(short serverId, DbHandler dbHandler)
+ throws DatabaseException
{
synchronized (sourceDbHandlers)
{
- sourceDbHandlers.put(id , db);
+ sourceDbHandlers.put(serverId , dbHandler);
}
}
@@ -557,7 +663,8 @@
}
/**
- * Process an InitializeRequestMessage.
+ * Processes a message coming from one server in the topology
+ * and potentially forwards it to one or all other servers.
*
* @param msg The message received and to be processed.
* @param senderHandler The server handler of the server that emitted
@@ -565,6 +672,23 @@
*/
public void process(RoutableMessage msg, ServerHandler senderHandler)
{
+ // A replication server is not expected to be the destination
+ // of a routable message except for an error message.
+ if (msg.getDestination() == this.replicationServer.getServerId())
+ {
+ if (msg instanceof ErrorMessage)
+ {
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+ }
+ else
+ {
+ logError(NOTE_ERR_ROUTING_TO_SERVER.get(
+ msg.getClass().getCanonicalName()));
+ }
+ return;
+ }
List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
@@ -572,9 +696,13 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
- mb.append("serverID:" + msg.getDestination());
+ mb.append(" unreachable server ID=" + msg.getDestination());
+ mb.append(" unroutable message =" + msg);
ErrorMessage errMsg = new ErrorMessage(
- msg.getsenderID(), mb.toMessage());
+ this.replicationServer.getServerId(),
+ msg.getsenderID(),
+ mb.toMessage());
+
try
{
senderHandler.send(errMsg);
@@ -583,8 +711,8 @@
{
// TODO Handle error properly (sender timeout in addition)
/*
- * An error happened trying the send back an error to this server.
- * Log an error and close the connection to the sender server.
+ * An error happened trying to send an error msg to this server.
+ * Log an error and close the connection to this server.
*/
MessageBuilder mb2 = new MessageBuilder();
mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -788,7 +916,7 @@
private void sendReplServerInfo()
{
ReplServerInfoMessage info =
- new ReplServerInfoMessage(getConnectedLDAPservers());
+ new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
for (ServerHandler handler : replicationServers.values())
{
try
@@ -811,6 +939,26 @@
}
/**
+ * Get the generationId associated to this domain.
+ *
+ * @return The generationId
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * Get the generationId saved status.
+ *
+ * @return The generationId saved status.
+ */
+ public boolean getGenerationIdSavedStatus()
+ {
+ return generationIdSavedStatus;
+ }
+
+ /**
* Sets the replication server informations for the provided
* handler from the provided ReplServerInfoMessage.
*
@@ -822,4 +970,162 @@
{
handler.setReplServerInfo(infoMsg);
}
+
+ /**
+ * Sets the provided value as the new in memory generationId.
+ *
+ * @param generationId The new value of generationId.
+ * @param savedStatus The saved status of the generationId.
+ */
+ synchronized public void setGenerationId(long generationId,
+ boolean savedStatus)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " RCache.set GenerationId=" + generationId);
+
+ if (generationId == this.generationId)
+ return;
+
+ if (this.generationId>0)
+ {
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.resetGenerationId();
+ }
+ }
+
+ this.generationId = generationId;
+ this.generationIdSavedStatus = savedStatus;
+
+ }
+
+ /**
+ * Resets the generationID.
+ *
+ * @param senderHandler The handler associated to the server
+ * that requested to reset the generationId.
+ */
+ public void resetGenerationId(ServerHandler senderHandler)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " RCache.resetGenerationId");
+
+ // Notifies the others LDAP servers that from now on
+ // they have the bad generationId
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.resetGenerationId();
+ }
+
+ // Propagates the reset message to the others replication servers
+ // dealing with the same domain.
+ if (senderHandler.isLDAPserver())
+ {
+ for (ServerHandler handler : replicationServers.values())
+ {
+ try
+ {
+ handler.sendGenerationId(new ResetGenerationId());
+ }
+ catch (IOException e)
+ {
+ logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
+ get(handler.getMonitorInstanceName()));
+ }
+ }
+ }
+
+ // Reset the localchange and state db for the current domain
+ synchronized (sourceDbHandlers)
+ {
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ try
+ {
+ dbHandler.clear();
+ }
+ catch (Exception e)
+ {
+ // TODO: i18n
+ logError(Message.raw(
+ "Exception caught while clearing dbHandler:" +
+ e.getLocalizedMessage()));
+ }
+ }
+ sourceDbHandlers.clear();
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " The source db handler has been cleared");
+ }
+ try
+ {
+ replicationServer.clearGenerationId(baseDn);
+ }
+ catch (Exception e)
+ {
+ // TODO: i18n
+ logError(Message.raw(
+ "Exception caught while clearing generationId:" +
+ e.getLocalizedMessage()));
+ }
+
+ // Reset the in memory domain generationId
+ generationId = -1;
+ }
+
+ /**
+ * Returns whether the provided server is in degraded
+ * state due to the fact that the peer server has an invalid
+ * generationId for this domain.
+ *
+ * @param serverId The serverId for which we want to know the
+ * the state.
+ * @return Whether it is degraded or not.
+ */
+
+ public boolean isDegradedDueToGenerationId(short serverId)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " isDegraded serverId=" + serverId +
+ " given local generation Id=" + this.generationId);
+
+ ServerHandler handler = replicationServers.get(serverId);
+ if (handler == null)
+ {
+ handler = connectedServers.get(serverId);
+ if (handler == null)
+ {
+ return false;
+ }
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ " Compute degradation of serverId=" + serverId +
+ " LS server generation Id=" + handler.getGenerationId());
+ return (handler.getGenerationId() != this.generationId);
+ }
+
+ /**
+ * Return the associated replication server.
+ * @return The replication server.
+ */
+ public ReplicationServer getReplicationServer()
+ {
+ return replicationServer;
+ }
}
--
Gitblit v1.10.0