From ed847e95ab009b3f8a7b57636aa3bbe977bf875d Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 19 Oct 2009 07:56:29 +0000
Subject: [PATCH] Fix #4270 ECL Should not establish connections between RSes
---
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 14 +
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 11 +
opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java | 3
opends/src/server/org/opends/server/replication/server/ServerReader.java | 88 ++++--------
opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 10 +
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 9
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java | 35 ++--
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 37 ++--
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 5
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 98 +++++++++++--
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 3
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 5
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 50 +++++-
13 files changed, 232 insertions(+), 136 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 78178b7..d1531e8 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4533,8 +4533,9 @@
{
AttributeType atype = DirectoryServer.getAttributeType(name);
List<Attribute> attrs = entry.getAttribute(atype);
- for (Attribute a : attrs)
- newattrs.add(a);
+ if (attrs != null)
+ for (Attribute a : attrs)
+ newattrs.add(a);
}
((DeleteMsg)msg).setEclIncludes(newattrs);
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index c20d895..ff6ea07 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -28,6 +28,7 @@
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.io.InputStream;
@@ -103,7 +104,7 @@
if (debugEnabled())
{
TRACER.debugInfo("Closing SocketSession." +
- Thread.currentThread().getStackTrace());
+ stackTraceToSingleLineString(new Exception("Stack:")));
}
if (plainSocket != null && !plainSocket.isClosed())
{
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 40fe57d..cbc6bad 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -780,4 +780,14 @@
}
return startSessionMsg;
}
+
+ /**
+ * Process message of a remote server changing his status.
+ * @param csMsg The message containing the new status
+ */
+ public void receiveNewStatus(ChangeStatusMsg csMsg)
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.processNewStatus(this, csMsg);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 923e1b6..d9e67d6 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -371,19 +371,17 @@
replicationServerURL,
getServiceId(),
maxRcvWindow,
- replicationServerDomain.getDbServerState(),
+ new ServerState(),
protocolVersion,
localGenerationId,
sslEncryption,
getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold(),
+ 0,
replicationServer.getWeight(),
- replicationServerDomain.getConnectedLDAPservers().size());
+ 0);
session.publish(outReplServerStartDSMsg);
-
return outReplServerStartDSMsg;
}
}
@@ -462,9 +460,11 @@
processStartFromRemote(inECLStartMsg);
// lock with timeout
- lockDomain(true);
+ if (this.replicationServerDomain != null)
+ lockDomain(true);
- this.localGenerationId = replicationServerDomain.getGenerationId();
+// this.localGenerationId = replicationServerDomain.getGenerationId();
+ this.localGenerationId = -1;
// send start to remote
StartMsg outStartMsg =
@@ -708,7 +708,7 @@
{
HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
- ReplicationServer rs = replicationServerDomain.getReplicationServer();
+ ReplicationServer rs = this.replicationServer;
// Parse the provided cookie and overwrite startState from it.
if ((providedCookie != null) && (providedCookie.length()!=0))
@@ -740,6 +740,10 @@
if (excludedServiceIDs.contains(rsd.getBaseDn()))
continue;
+ // skip unused domains
+ if (rsd.getDbServerState().isEmpty())
+ continue;
+
// Creates the new domain context
DomainContext newDomainCtxt = new DomainContext();
newDomainCtxt.active = true;
@@ -826,7 +830,8 @@
*/
private void registerIntoDomain()
{
- replicationServerDomain.registerHandler(this);
+ if (replicationServerDomain!=null)
+ replicationServerDomain.registerHandler(this);
}
/**
@@ -877,7 +882,7 @@
String str = serverURL + " " + String.valueOf(serverId);
return "Connected External Changelog Server " + str +
- ",cn=" + replicationServerDomain.getMonitorInstanceName();
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
}
/**
@@ -982,8 +987,7 @@
sendWindow = new Semaphore(sendWindowSize);
// create reader
- reader = new ServerReader(session, serverId,
- this, replicationServerDomain);
+ reader = new ServerReader(session, serverId, this);
reader.start();
if (writer == null)
@@ -1132,8 +1136,7 @@
ECLUpdateMsg oldestChange = null;
if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + "," + this +
+ TRACER.debugInfo("In cn=changelog" + this +
" getNextECLUpdate starts: " + dumpState());
try
@@ -1443,8 +1446,7 @@
// starvation of changelog messages
// all domain have been unactived means are covered
if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + "," + this + " closeInitPhase(): "
+ TRACER.debugInfo("In cn=changelog" + "," + this + " closeInitPhase(): "
+ dumpState());
// go to persistent phase if one
@@ -1503,8 +1505,7 @@
}
if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName()
+ TRACER.debugInfo("In cn=changelog"
+ "," + this + " getOldestChangeFromDomainCtxts() returns " +
((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 2567751..31ce932 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -193,7 +193,8 @@
// Can't do much more : ignore
}
}
- replicationServerDomain.stopServer(handler);
+ if (replicationServerDomain!=null)
+ replicationServerDomain.stopServer(handler);
}
}
@@ -243,7 +244,7 @@
// session is null in pusherOnly mode
// Done is used to end phase 1
session.publish(new DoneMsg(
- replicationServerDomain.getReplicationServer().getServerId(),
+ handler.getReplicationServerId(),
handler.getServerId()), protocolVersion);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
index 6dddacd..b66e8d0 100644
--- a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
+++ b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -92,6 +92,7 @@
*/
public void close()
{
- handler.getDomain().stopServer(handler);
+ if (handler.getDomain() != null)
+ handler.getDomain().stopServer(handler);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 96cdaa6..4cb7198 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -747,7 +747,8 @@
else
{
this.serviceId = serviceId;
- this.replicationServerDomain = getDomain(true, isDataServer);
+ if (!serviceId.equalsIgnoreCase("cn=changelog"))
+ this.replicationServerDomain = getDomain(true, isDataServer);
}
}
@@ -802,4 +803,12 @@
return replicationServer.getGroupId();
}
+ /**
+ * Get the serverId of the hosting replication server.
+ * @return the replication serverId.
+ */
+ public int getReplicationServerId()
+ {
+ return this.replicationServerId;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index b24c5e5..c0c6094 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1596,11 +1596,9 @@
while (rsdi.hasNext())
{
ReplicationServerDomain domain = rsdi.next();
-
- if (excludedServiceIDs.contains(domain.getBaseDn()))
- {
+ if ((excludedServiceIDs != null) &&
+ excludedServiceIDs.contains(domain.getBaseDn()))
continue;
- }
ChangeNumber domainEligibleCN = domain.getEligibleCN();
String dates = "";
@@ -1830,6 +1828,9 @@
&& (excludedServiceIDs.contains(rsd.getBaseDn())))
continue;
+ if (rsd.getDbServerState().isEmpty())
+ continue;
+
result.update(rsd.getBaseDn(), rsd.getEligibleState(
getEligibleCN()));
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a301fc3..3531d5a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -194,7 +194,6 @@
{
super("Replication Server " + replicationServer.getReplicationPort() + " "
+ baseDn + " " + replicationServer.getServerId());
-
this.baseDn = baseDn;
this.replicationServer = replicationServer;
this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
@@ -2316,7 +2315,7 @@
/*
* Store DS connected to remote RS and update information about the peer RS
*/
- handler.receiveTopoInfoFromRS(topoMsg);
+ handler.processTopoInfoFromRS(topoMsg);
/*
* Handle generation id
@@ -2904,8 +2903,19 @@
/**
* Computes the eligible server state for the domain.
- * Consists in taking the most recent change from the dbServerState and the
- * eligibleCN.
+ *
+ * s1 s2 s3
+ * -- -- --
+ * cn31
+ * cn15
+ *
+ * ----------------------------------------- eligibleCN
+ * cn14
+ * cn26
+ * cn13
+ *
+ * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
+ *
* @param eligibleCN The provided eligibleCN.
* @return The computed eligible server state.
*/
@@ -2915,6 +2925,8 @@
ServerState dbState = this.getDbServerState();
+ // The result is initialized from the dbState.
+ // From it, we don't want to kepp the changes newer than eligibleCN.
result = dbState.duplicate();
if (eligibleCN != null)
@@ -2924,32 +2936,44 @@
{
int sid = it.next();
DbHandler h = sourceDbHandlers.get(sid);
- ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
+ ChangeNumber mostRecentDbCN = dbState.getMaxChangeNumber(sid);
try
{
- if (eligibleCN.older(dbCN))
+ // Is the most recent change in the Db newer than eligible CN ?
+ // if yes (like cn15 in the example above, then we have to go back
+ // to the Db and look for the change older than eligible CN (cn14)
+ if (eligibleCN.olderOrEqual(mostRecentDbCN))
{
- // some CN exist in the db newer than eligible CN
- // let's get it
- ReplicationIterator ri = h.generateIterator(eligibleCN);
+ // let's try to seek the first change <= eligibleCN
+ ReplicationIterator ri = null;
try
{
+ ri = h.generateIterator(eligibleCN);
if ((ri != null) && (ri.getChange()!=null))
{
ChangeNumber newCN = ri.getChange().getChangeNumber();
result.update(newCN);
}
}
+ catch(Exception e)
+ {
+ // there's no change older than eligibleCN (case of s3/cn31)
+ result.update(new ChangeNumber(0,0,sid));
+ }
finally
{
- ri.releaseCursor();
- ri = null;
+ if (ri != null)
+ {
+ ri.releaseCursor();
+ ri = null;
+ }
}
}
else
{
- // no CN exist in the db newer than elligible CN
- result.update(dbCN);
+ // for this serverid, all changes in the ChangelogDb are holder
+ // than eligibleCN , the most recent in the db is our guy.
+ result.update(mostRecentDbCN);
}
}
catch(Exception e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 6ce4627..91015f3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -638,7 +638,7 @@
*
* @param topoMsg The received topology message
*/
- public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
+ public void processTopoInfoFromRS(TopologyMsg topoMsg)
{
// Store info for remote RS
List<RSInfo> rsInfos = topoMsg.getRsList();
@@ -836,4 +836,16 @@
session.publish(msg);
}
+ /**
+ * Receives a topology msg.
+ * @param topoMsg The message received.
+ * @throws DirectoryException when it occurs.
+ * @throws IOException when it occurs.
+ */
+ public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
+ throws DirectoryException, IOException
+ {
+ if (replicationServerDomain != null)
+ replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 489dbae..784f01b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -46,13 +46,15 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
@@ -297,7 +299,8 @@
// replication server domain
if (oldGenerationId != -100)
{
- replicationServerDomain.changeGenerationId(oldGenerationId, false);
+ if (replicationServerDomain!=null)
+ replicationServerDomain.changeGenerationId(oldGenerationId, false);
}
}
@@ -363,8 +366,7 @@
writer = new ServerWriter(session, serverId,
this, replicationServerDomain);
- reader = new ServerReader(session, serverId,
- this, replicationServerDomain);
+ reader = new ServerReader(session, serverId, this);
reader.start();
writer.start();
@@ -947,6 +949,20 @@
}
/**
+ * Processes a change time heartbeat msg.
+ *
+ * @param msg The message to be processed.
+ */
+ public void process(ChangeTimeHeartbeatMsg msg)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + this +
+ " processes received msg:\n" + msg);
+ replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
+ }
+
+ /**
* Process the reception of a WindowProbeMsg message.
*
* @param windowProbeMsg The message to process.
@@ -1231,8 +1247,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
"\nAND REPLIED:\n" + outStartMsg.toString());
@@ -1251,8 +1266,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH START HANDSHAKE SENT("+ this +
"):\n" + outStartMsg.toString()+
@@ -1272,8 +1286,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
"\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1292,8 +1305,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + ":" +
"\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
"\nAND RECEIVED:\n" + inTopoMsg.toString());
@@ -1312,8 +1324,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
"\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
"\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1328,8 +1339,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
"\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
}
@@ -1345,11 +1355,63 @@
if (debugEnabled())
{
TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + ", " +
+ this.replicationServer.getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
"\nSH SESSION HANDSHAKE RECEIVED:\n" +
inStartECLSessionMsg.toString());
}
}
+
+ /**
+ * Process a Ack message received.
+ * @param ack the message received.
+ */
+ public void processAck(AckMsg ack)
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.processAck(ack, this);
+ }
+
+ /**
+ * Get the reference generation id (associated with the changes in the db).
+ * @return the reference generation id.
+ */
+ public long getReferenceGenId()
+ {
+ long refgenid = -1;
+ if (replicationServerDomain!=null)
+ refgenid = replicationServerDomain.getGenerationId();
+ return refgenid;
+ }
+
+ /**
+ * Process a ResetGenerationIdMsg message received.
+ * @param msg the message received.
+ */
+ public void processResetGenId(ResetGenerationIdMsg msg)
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.resetGenerationId(this, msg);
+ }
+
+ /**
+ * Put a new update message received.
+ * @param update the update message received.
+ * @throws IOException when it occurs.
+ */
+ public void put(UpdateMsg update)
+ throws IOException
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.put(update, this);
+ }
+
+ /**
+ * Stop this handler.
+ */
+ public void doStop()
+ {
+ if (replicationServerDomain!=null)
+ replicationServerDomain.stopServer(this);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 4b5037d..934fe4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -60,7 +60,6 @@
private int serverId;
private ProtocolSession session;
private ServerHandler handler;
- private ReplicationServerDomain replicationServerDomain;
/**
* Constructor for the LDAP server reader part of the replicationServer.
@@ -68,20 +67,15 @@
* @param session The ProtocolSession from which to read the data.
* @param serverId The server ID of the server from which we read messages.
* @param handler The server handler for this server reader.
- * @param replicationServerDomain The ReplicationServerDomain for this server
- * reader.
*/
public ServerReader(ProtocolSession session, int serverId,
- ServerHandler handler,
- ReplicationServerDomain replicationServerDomain)
+ ServerHandler handler)
{
- super("Replication Reader Thread for handler of " +
- handler.toString() +
- " in " + replicationServerDomain);
+ super("Replication Reader Thread for RS handler " +
+ handler.getMonitorInstanceName());
this.session = session;
this.serverId = serverId;
this.handler = handler;
- this.replicationServerDomain = replicationServerDomain;
}
/**
@@ -109,15 +103,14 @@
if (debugEnabled())
{
- TRACER.debugInfo("In " + replicationServerDomain + " " +
- getName() + " receives " + msg);
+ TRACER.debugInfo("In " + getName() + " receives " + msg);
}
if (msg instanceof AckMsg)
{
AckMsg ack = (AckMsg) msg;
handler.checkWindow();
- replicationServerDomain.processAck(ack, handler);
+ handler.processAck(ack);
} else if (msg instanceof UpdateMsg)
{
boolean filtered = false;
@@ -141,22 +134,19 @@
if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
(dsStatus == ServerStatus.FULL_UPDATE_STATUS))
{
- long referenceGenerationId =
- replicationServerDomain.getGenerationId();
+ long referenceGenerationId = handler.getReferenceGenId();
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
- replicationServerDomain.getBaseDn(),
+ Integer.toString(handler.getReplicationServerId()),
+ handler.getServiceId(),
((UpdateMsg) msg).getChangeNumber().toString(),
Integer.toString(handler.getServerId()),
Long.toString(referenceGenerationId),
Long.toString(handler.getGenerationId())));
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
- replicationServerDomain.getBaseDn(),
+ Integer.toString(handler.getReplicationServerId()),
+ handler.getServiceId(),
((UpdateMsg) msg).getChangeNumber().toString(),
Integer.toString(handler.getServerId())));
filtered = true;
@@ -167,17 +157,15 @@
* Ignore updates from RS with bad gen id
* (no system managed status for a RS)
*/
- long referenceGenerationId =
- replicationServerDomain.getGenerationId();
+ long referenceGenerationId =handler.getReferenceGenId();
if ((referenceGenerationId > 0) &&
(referenceGenerationId != handler.getGenerationId()))
{
logError(
ERR_IGNORING_UPDATE_FROM_RS.get(
Integer.toString(
- replicationServerDomain.getReplicationServer().
- getServerId()),
- replicationServerDomain.getBaseDn(),
+ handler.getReplicationServerId()),
+ handler.getServiceId(),
((UpdateMsg) msg).getChangeNumber().toString(),
Integer.toString(handler.getServerId()),
Long.toString(referenceGenerationId),
@@ -190,7 +178,7 @@
{
UpdateMsg update = (UpdateMsg) msg;
handler.decAndCheckWindow();
- replicationServerDomain.put(update, handler);
+ handler.put(update);
}
} else if (msg instanceof WindowMsg)
{
@@ -220,7 +208,7 @@
} else if (msg instanceof ResetGenerationIdMsg)
{
ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
- replicationServerDomain.resetGenerationId(handler, genIdMsg);
+ handler.processResetGenId(genIdMsg);
} else if (msg instanceof WindowProbeMsg)
{
WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
@@ -231,8 +219,7 @@
try
{
ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
- replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
- rsh, true);
+ rsh.receiveTopoInfoFromRS(topoMsg);
}
catch(Exception e)
{
@@ -247,13 +234,13 @@
try
{
DataServerHandler dsh = (DataServerHandler)handler;
- replicationServerDomain.processNewStatus(dsh, csMsg);
+ dsh.receiveNewStatus(csMsg);
}
catch(Exception e)
{
errMessage =
ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
- replicationServerDomain.getBaseDn(),
+ handler.getServiceId(),
Integer.toString(handler.getServerId()),
csMsg.toString());
logError(errMessage);
@@ -270,8 +257,7 @@
} else if (msg instanceof ChangeTimeHeartbeatMsg)
{
ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
- replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
- cthbMsg);
+ handler.process(cthbMsg);
} else if (msg instanceof StopMsg)
{
// Peer server is properly disconnecting: go out of here to
@@ -280,8 +266,7 @@
{
TRACER.debugInfo(handler.toString() + " has properly " +
"disconnected from this replication server " +
- Integer.toString(replicationServerDomain.getReplicationServer().
- getServerId()));
+ Integer.toString(handler.getReplicationServerId()));
}
return;
} else if (msg == null)
@@ -300,9 +285,8 @@
// we just trash the message and log the event for debug purpose,
// then continue receiving messages.
if (debugEnabled())
- TRACER.debugInfo("In " + replicationServerDomain.
- getReplicationServer().
- getMonitorInstanceName() + ":" + e.getMessage());
+ TRACER.debugInfo(
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
}
}
}
@@ -315,24 +299,16 @@
*/
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " reader IO EXCEPTION for serverID=" + serverId + " " +
- this + " " +
- stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()));
+ Integer.toString(handler.getReplicationServerId()));
logError(errMessage);
}
catch (ClassNotFoundException e)
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS <" + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " reader CNF EXCEPTION serverID=" + serverId +
- stackTraceToSingleLineString(e));
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
* close the connection.
@@ -344,10 +320,7 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS <" + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- " server reader EXCEPTION serverID=" + serverId +
- " " + stackTraceToSingleLineString(e));
+ "In " + this.getName() + " " + stackTraceToSingleLineString(e));
/*
* The remote server has sent an unknown message,
* close the connection.
@@ -364,11 +337,6 @@
*/
try
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- this + " is closing the session");
if (handler.getProtocolVersion() >=
ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
@@ -382,12 +350,14 @@
// Anyway, going to close session, so nothing to do
}
}
+ if (debugEnabled())
+ TRACER.debugInfo("In " + this.getName() + " closing the session");
session.close();
} catch (IOException e)
{
// ignore
}
- replicationServerDomain.stopServer(handler);
+ handler.doStop();
if (debugEnabled())
{
TRACER.debugInfo(this.getName() + " stopped " + errMessage);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index e646e29..2be61af 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -37,6 +37,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import static org.opends.server.loggers.ErrorLogger.logError;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
@@ -230,7 +231,7 @@
// Write additional changes and read ECL from a provided draft change number
ts = ECLCompatWriteReadAllOps(5);replicationServer.clearDb();
- // ECLIncludeAttributes();replicationServer.clearDb();
+ ECLIncludeAttributes();replicationServer.clearDb();
}
@Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
@@ -260,7 +261,7 @@
ECLRemoteNonEmpty();replicationServer.clearDb();
// Test with a mix of domains, a mix of DSes
- //ECLTwoDomains();
+ ECLTwoDomains();
// changelogDb required NOT empty for the next test
// Test ECL after changelog triming
@@ -1340,10 +1341,10 @@
// test success
waitOpResult(searchOp, ResultCode.SUCCESS);
// test 4 entries returned
- String cookie1 = "o=test:"+cn1.toString()+";o=test2:;";
- String cookie2 = "o=test:"+cn2.toString()+";o=test2:;";
- String cookie3 = "o=test:"+cn3.toString()+";o=test2:;";
- String cookie4 = "o=test:"+cn4.toString()+";o=test2:;";
+ String cookie1 = "o=test:"+cn1.toString()+";";
+ String cookie2 = "o=test:"+cn2.toString()+";";
+ String cookie3 = "o=test:"+cn3.toString()+";";
+ String cookie4 = "o=test:"+cn4.toString()+";";
assertEquals(searchOp.getSearchEntries().size(), 4);
LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
@@ -2017,7 +2018,7 @@
s2 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
org.opends.server.tools.LDAPReader r2 = new org.opends.server.tools.LDAPReader(s2);
LDAPWriter w2 = new LDAPWriter(s2);
- s2.setSoTimeout(15000);
+ s2.setSoTimeout(30000);
bindAsManager(w2, r2);
// Connects and bind
@@ -2483,6 +2484,7 @@
private static void removeTestBackend2(Backend backend)
{
MemoryBackend memoryBackend = (MemoryBackend)backend;
+ memoryBackend.clearMemoryBackend();
memoryBackend.finalizeBackend();
DirectoryServer.deregisterBackend(memoryBackend);
}
@@ -2757,7 +2759,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","delete");
- checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
@@ -2775,7 +2777,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","add");
- checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
} else if (i==3)
@@ -2790,7 +2792,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","modify");
- checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
} else if (i==4)
@@ -2802,7 +2804,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","modrdn");
- checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"newrdn","uid="+tn+"new4");
checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
@@ -2849,7 +2851,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","delete");
- checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
@@ -2867,7 +2869,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","add");
- checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
} else if (i==3)
@@ -2882,7 +2884,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","modify");
- checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
} else if (i==4)
@@ -2894,7 +2896,7 @@
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
checkValue(resultEntry,"changetype","modrdn");
- checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"newrdn","uid="+tn+"new4");
checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
@@ -2967,7 +2969,7 @@
checkValue(resultEntry,"replicationcsn",gblCN.toString());
checkValue(resultEntry,"replicaidentifier","1201");
checkValue(resultEntry,"changetype","add");
- checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
+ checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
checkValue(resultEntry,"targetentryuuid",user1entryUUID);
checkValue(resultEntry,"changenumber","6");
}
@@ -3603,6 +3605,7 @@
waitOpResult(searchOp, ResultCode.SUCCESS);
LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
+ sleep(2000);
assertTrue(entries != null);
String s = tn + " entries returned= ";
--
Gitblit v1.10.0