From 4fe72a4bef946169b0f50bc05bd9dc3b4b1131d3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 14 Aug 2009 12:37:19 +0000
Subject: [PATCH] Support for External change log compatible with draft-good-ldap-changelog-04.txt , March 2003
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 401 ++++++++++++++++++++++++++++++++++++++++++++++-----------
1 files changed, 322 insertions(+), 79 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3bae821..4f844d4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -63,6 +63,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.MonitorMsg;
@@ -75,9 +76,11 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
+
import com.sleepycat.je.DatabaseException;
/**
@@ -173,6 +176,8 @@
// every n number of treated assured messages
private int assuredTimeoutTimerPurgeCounter = 0;
+ ServerState ctHeartbeatState = null;
+
/**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
*
@@ -360,8 +365,7 @@
if ( (generationId>0) && (generationId != handler.getGenerationId()) )
{
if (debugEnabled())
- TRACER.debugInfo("In RS " +
- replicationServer.getServerId() +
+ TRACER.debugInfo("In " + this.getName() +
" for dn " + baseDn + ", update " +
update.getChangeNumber().toString() +
" will not be sent to replication server " +
@@ -426,8 +430,7 @@
if (debugEnabled())
{
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
- TRACER.debugInfo("In RS " +
- replicationServer.getServerId() +
+ TRACER.debugInfo("In " + this +
" for dn " + baseDn + ", update " +
update.getChangeNumber().toString() +
" will not be sent to directory server " +
@@ -1024,10 +1027,9 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " domain=" + this +
- " stopServer(SH)" + handler.getMonitorInstanceName() +
- " " + stackTraceToSingleLineString(new Exception()));
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " domain=" + this + " stopServer() on the server handler " +
+ handler.getMonitorInstanceName());
/*
* We must prevent deadlock on replication server domain lock, when for
* instance this code is called from dying ServerReader but also dying
@@ -1119,10 +1121,9 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " domain=" + this +
- " stopServer(MH)" + handler.getMonitorInstanceName() +
- " " + stackTraceToSingleLineString(new Exception()));
+ "In " + this.replicationServer.getMonitorInstanceName()
+ + " domain=" + this + " stopServer() on the message handler "
+ + handler.getMonitorInstanceName());
/*
* We must prevent deadlock on replication server domain lock, when for
* instance this code is called from dying ServerReader but also dying
@@ -1363,7 +1364,40 @@
try
{
- return handler.generateIterator(changeNumber);
+ ReplicationIterator it = handler.generateIterator(changeNumber);
+ if (it.next()==false)
+ {
+ it.releaseCursor();
+ throw new Exception("no new change");
+ }
+ return it;
+ } catch (Exception e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Creates and returns an iterator.
+ * When the iterator is not used anymore, the caller MUST call the
+ * ReplicationIterator.releaseCursor() method to free the resources
+ * and locks used by the ReplicationIterator.
+ *
+ * @param serverId Identifier of the server for which the iterator is created.
+ * @param changeNumber Starting point for the iterator.
+ * @return the created ReplicationIterator. Null when no DB is available
+ * for the provided server Id.
+ */
+ public ReplicationIterator getIterator(short serverId,
+ ChangeNumber changeNumber)
+ {
+ DbHandler handler = sourceDbHandlers.get(serverId);
+ if (handler == null)
+ return null;
+ try
+ {
+ ReplicationIterator it = handler.generateIterator(changeNumber);
+ return it;
} catch (Exception e)
{
return null;
@@ -1955,12 +1989,10 @@
ResetGenerationIdMsg genIdMsg)
{
if (debugEnabled())
- {
TRACER.debugInfo(
- "In RS " + getReplicationServer().getServerId() +
+ "In " + this +
" Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
" for baseDn " + baseDn + ":\n" + genIdMsg);
- }
try
{
@@ -1982,14 +2014,12 @@
{
// Order to take a gen id we already have, just ignore
if (debugEnabled())
- {
TRACER.debugInfo(
- "In RS " + getReplicationServer().getServerId()
+ "In " + this
+ " Reset generation id requested for baseDn " + baseDn
+ " but generation id was already " + this.generationId
+ ":\n" + genIdMsg);
}
- }
// If we are the first replication server warned,
// then forwards the reset message to the remote replication servers
@@ -2002,7 +2032,7 @@
rsHandler.setGenerationId(newGenId);
if (senderHandler.isDataServer())
{
- rsHandler.forwardGenerationIdToRS(genIdMsg);
+ rsHandler.send(genIdMsg);
}
} catch (IOException e)
{
@@ -2158,7 +2188,7 @@
}
/**
- * Clears the Db associated with that cache.
+ * Clears the Db associated with that domain.
*/
public void clearDbs()
{
@@ -2181,12 +2211,6 @@
}
}
stopDbHandlers();
-
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDN=" + baseDn +
- " The source db handler has been cleared");
}
try
{
@@ -2471,11 +2495,6 @@
*/
public void receivesMonitorDataResponse(MonitorMsg msg)
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- "Receiving " + msg + " from " + msg.getsenderID());
-
try
{
synchronized (monitorDataLock)
@@ -2543,7 +2562,7 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
+ "In " + this +
" baseDn=" + baseDn +
" Processed msg from " + msg.getsenderID() +
" New monitor data: " + wrkMonitorData.toString());
@@ -2819,24 +2838,29 @@
* Return the state that contain for each server the time of eligibility.
* @return the state.
*/
- public ServerState getHeartbeatState()
+ public ServerState getChangeTimeHeartbeatState()
{
- // TODO:ECL Eligility must be supported
- return this.getDbServerState();
+ if (ctHeartbeatState == null)
+ {
+ ctHeartbeatState = this.getDbServerState().duplicate();
+ }
+ return ctHeartbeatState;
}
+
/**
+ * TODO: code cleaning - remove this method.
* Computes the change number eligible to the ECL.
* @return null if the domain does not play in eligibility.
*/
- public ChangeNumber computeEligibleCN()
+ public ChangeNumber computeEligibleCN2()
{
- ChangeNumber elligibleCN = null;
- ServerState heartbeatState = getHeartbeatState();
+ ChangeNumber eligibleCN = null;
+ ServerState heartbeatState = getChangeTimeHeartbeatState();
if (heartbeatState==null)
return null;
- // compute elligible CN
+ // compute eligible CN
ServerState hbState = heartbeatState.duplicate();
Iterator<Short> it = hbState.iterator();
@@ -2850,70 +2874,87 @@
if (TimeThread.getTime()-storedCN.getTime()>2000)
{
if (debugEnabled())
- TRACER.debugInfo(
- "For RSD." + this.baseDn + " Server " + sid
+ TRACER.debugInfo("In " + this.getName() +
+ " Server " + sid
+ " is not considered for eligibility ... potentially down");
continue;
}
- if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
+ if ((eligibleCN == null) || (storedCN.older(eligibleCN)))
{
- elligibleCN = storedCN;
+ eligibleCN = storedCN;
}
}
if (debugEnabled())
- TRACER.debugInfo(
- "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN);
- return elligibleCN;
+ TRACER.debugInfo("In " + this.getName() +
+ " computeEligibleCN() returns " + eligibleCN);
+ return eligibleCN;
}
/**
- * Computes the eligible server state by minimizing the dbServerState and the
- * elligibleCN.
+ * Computes the eligible server state for the domain.
+ * Consists in taking the most recent change from the dbServerState and the
+ * eligibleCN.
+ * @param eligibleCN The provided eligibleCN.
* @return The computed eligible server state.
*/
- public ServerState getCLElligibleState()
+ public ServerState getEligibleState(ChangeNumber eligibleCN)
{
- // ChangeNumber elligibleCN = computeEligibleCN();
- ServerState res = new ServerState();
- ServerState dbState = this.getDbServerState();
- res = dbState;
+ ServerState result = new ServerState();
- /* TODO:ECL Eligibility is not yet implemented
- Iterator<Short> it = dbState.iterator();
- while (it.hasNext())
+ ServerState dbState = this.getDbServerState();
+
+ result = dbState.duplicate();
+
+ if (eligibleCN != null)
{
- Short sid = it.next();
- DbHandler h = sourceDbHandlers.get(sid);
- ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
- try
+ Iterator<Short> it = dbState.iterator();
+ while (it.hasNext())
{
- if ((elligibleCN!=null)&&(elligibleCN.older(dbCN)))
+ Short sid = it.next();
+ DbHandler h = sourceDbHandlers.get(sid);
+ ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
+ try
{
- // some CN exist in the db newer than elligible CN
- ReplicationIterator ri = h.generateIterator(elligibleCN);
- ChangeNumber newCN = ri.getCurrentCN();
- res.update(newCN);
- ri.releaseCursor();
+ if (eligibleCN.older(dbCN))
+ {
+ // some CN exist in the db newer than eligible CN
+ // let's get it
+ ReplicationIterator ri = h.generateIterator(eligibleCN);
+ try
+ {
+ if ((ri != null) && (ri.getChange()!=null))
+ {
+ ChangeNumber newCN = ri.getChange().getChangeNumber();
+ result.update(newCN);
+ }
+ }
+ finally
+ {
+ ri.releaseCursor();
+ ri = null;
+ }
+ }
+ else
+ {
+ // no CN exist in the db newer than elligible CN
+ result.update(dbCN);
+ }
}
- else
+ catch(Exception e)
{
- // no CN exist in the db newer than elligible CN
- res.update(dbCN);
+ Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
+ " " + stackTraceToSingleLineString(e));
+ logError(errMessage);
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
}
- catch(Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
}
- */
-
if (debugEnabled())
- TRACER.debugInfo("In " + this.getName()
- + " getCLElligibleState returns:" + res);
- return res;
+ TRACER.debugInfo("In " + this
+ + " getEligibleState() result is " + result);
+ return result;
}
/**
@@ -2930,4 +2971,206 @@
}
return domainStartState;
}
+
+ /**
+ * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
+ * state.
+ * For each DS, take the oldest CN from the changetime hearbeat state
+ * and from the changelog db last CN. Can be null.
+ * @return the eligible CN.
+ */
+ public ChangeNumber getEligibleCN()
+ {
+ ChangeNumber eligibleCN = null;
+
+ for (DbHandler db : sourceDbHandlers.values())
+ {
+ // Consider this producer (DS/db).
+ short sid = db.getServerId();
+
+ ChangeNumber changelogLastCN = db.getLastChange();
+ if (changelogLastCN != null)
+ {
+ if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN)))
+ {
+ eligibleCN = changelogLastCN;
+ }
+ }
+
+ ChangeNumber heartbeatLastDN =
+ getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
+
+ if ((heartbeatLastDN != null) &&
+ ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
+ {
+ eligibleCN = heartbeatLastDN;
+ }
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.getName() + " getEligibleCN() returns result ="
+ + eligibleCN);
+ return eligibleCN;
+ }
+
+ /**
+ * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
+ * value received, and forwarding the message to the other RSes.
+ * @param senderHandler The handler for the server that sent the heartbeat.
+ * @param msg The message to process.
+ */
+ public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
+ ChangeTimeHeartbeatMsg msg )
+ {
+ try
+ {
+ // Acquire lock on domain (see more details in comment of start() method
+ // of ServerHandler)
+ lock();
+ } catch (InterruptedException ex)
+ {
+ // Try doing job anyway...
+ }
+
+ try
+ {
+ storeReceivedCTHeartbeat(msg.getChangeNumber());
+
+ // If we are the first replication server warned,
+ // then forwards the reset message to the remote replication servers
+ for (ReplicationServerHandler rsHandler : replicationServers.values())
+ {
+ try
+ {
+ // After we'll have sent the message , the remote RS will adopt
+ // the new genId
+ if (senderHandler.isDataServer())
+ {
+ rsHandler.send(msg);
+ }
+ } catch (IOException e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName()));
+ stopServer(rsHandler);
+ }
+ }
+ }
+ finally
+ {
+ release();
+ }
+ }
+
+ /**
+ * Store a change time value received from a data server.
+ * @param cn The provided change time.
+ */
+ public void storeReceivedCTHeartbeat(ChangeNumber cn)
+ {
+ // TODO:May be we can spare processing by only storing CN (timestamp)
+ // instead of a server state.
+ getChangeTimeHeartbeatState().update(cn);
+
+ /*
+ if (debugEnabled())
+ {
+ Set<String> ss = ctHeartbeatState.toStringSet();
+ String dss = "";
+ for (String s : ss)
+ {
+ dss = dss + " \\ " + s;
+ }
+ TRACER.debugInfo("In " + this.getName() + " " + dss);
+ }
+ */
+ }
+
+ /**
+ * This methods count the changes, server by server :
+ * - from a start point (cn taken from the provided startState)
+ * - to an end point (the provided endCN).
+ * @param startState The provided start server state.
+ * @param endCN The provided end change number.
+ * @return The number of changes between startState and endCN.
+ */
+ public long getEligibleCount(ServerState startState, ChangeNumber endCN)
+ {
+ long res = 0;
+ ReplicationIterator ri=null;
+
+ // Parses the dbState of the domain , server by server
+ ServerState dbState = this.getDbServerState();
+ Iterator<Short> it = dbState.iterator();
+ while (it.hasNext())
+ {
+ // for each server
+ Short sid = it.next();
+ DbHandler h = sourceDbHandlers.get(sid);
+
+ try
+ {
+ // Set on the change related to the startState
+ ChangeNumber startCN = null;
+ try
+ {
+ ri = h.generateIterator(startState.getMaxChangeNumber(sid));
+ startCN = ri.getChange().getChangeNumber();
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ // no change found (purge from CL)
+ startCN = null;
+ }
+ finally
+ {
+ if (ri!=null)
+ {
+ ri.releaseCursor();
+ ri = null;
+ }
+ }
+
+ if (startCN != null)
+ {
+ // Set on the change related to the endCN
+ ChangeNumber upperCN;
+ try
+ {
+ // Build a changenumber for this very server, with the timestamp
+ // of the endCN
+ ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
+ ri = h.generateIterator(f);
+ upperCN = ri.getChange().getChangeNumber();
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ // no new change
+ upperCN = h.getLastChange();
+ }
+ finally
+ {
+ if (ri!=null)
+ {
+ ri.releaseCursor();
+ ri = null;
+ }
+ }
+
+ long diff = upperCN.getSeqnum() - startCN.getSeqnum() + 1;
+
+ res += diff;
+ }
+ // TODO:ECL We should compute if changenumber.seqnum has turned !
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ return res;
+ }
}
--
Gitblit v1.10.0