From 6d84d49bcafa262c5b9bb545810778721597421a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 26 Aug 2013 13:51:36 +0000
Subject: [PATCH] Found problems in the replication ECL code. Also made the code more explicit.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 106 +++++++++++---------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java | 52 ++++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 11 +-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java | 20 ++--
5 files changed, 97 insertions(+), 96 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 67b977c..387b190 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -565,7 +565,7 @@
private String findCookie(final int startDraftCN) throws ChangelogException,
DirectoryException
{
- ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
if (startDraftCN <= 1)
{
@@ -580,9 +580,10 @@
return null;
}
- final int firstKey = changelogDB.getFirstKey();
- String crossDomainStartState = changelogDB.getPreviousCookie(firstKey);
- changelogDBIter = changelogDB.generateIterator(firstKey);
+ final int firstDraftCN = changelogDB.getFirstDraftCN();
+ final String crossDomainStartState =
+ changelogDB.getPreviousCookie(firstDraftCN);
+ changelogDBIter = changelogDB.generateIterator(firstDraftCN);
return crossDomainStartState;
}
@@ -633,7 +634,7 @@
return null;
}
- final int lastKey = changelogDB.getLastKey();
+ final int lastKey = changelogDB.getLastDraftCN();
crossDomainStartState = changelogDB.getPreviousCookie(lastKey);
changelogDBIter = changelogDB.generateIterator(lastKey);
return crossDomainStartState;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 7b16799..7455fa0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -89,8 +89,8 @@
private Thread listenThread;
private Thread connectThread;
- /** The list of replication servers configured by the administrator. */
- private Collection<String> replicationServers;
+ /** The list of replication server URLs configured by the administrator. */
+ private Collection<String> replicationServerUrls;
/**
* This table is used to store the list of dn for which we are currently
@@ -219,9 +219,9 @@
{
replicationPort = configuration.getReplicationPort();
serverId = configuration.getReplicationServerId();
- replicationServers = configuration.getReplicationServer();
- if (replicationServers == null)
- replicationServers = new ArrayList<String>();
+ replicationServerUrls = configuration.getReplicationServer();
+ if (replicationServerUrls == null)
+ replicationServerUrls = new ArrayList<String>();
queueSize = configuration.getQueueSize();
purgeDelay = configuration.getReplicationPurgeDelay();
dbDirname = configuration.getReplicationDBDirectory();
@@ -259,8 +259,8 @@
configuration.addChangeListener(this);
try
{
- backendConfigEntryDN = DN.decode(
- "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
+ backendConfigEntryDN =
+ DN.decode("ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
} catch (Exception e) { /* do nothing */ }
// Creates the backend associated to this ReplicationServer
@@ -404,14 +404,14 @@
/*
* check that all replication server in the config are in the
- * connected Set. If not create the connection
+ * connected Set. If not, create the connection
*/
- for (String aServerURL : replicationServers)
+ for (String rsURL : replicationServerUrls)
{
- final int separator = aServerURL.lastIndexOf(':');
- final String portString = aServerURL.substring(separator + 1);
- final int port = Integer.parseInt(portString);
- final String hostname = aServerURL.substring(0, separator);
+ final int separator = rsURL.lastIndexOf(':');
+ final String hostname = rsURL.substring(0, separator);
+ final int port = Integer.parseInt(rsURL.substring(separator + 1));
+
final InetAddress inetAddress;
try
{
@@ -436,13 +436,13 @@
}
// Don't connect to a server if it is already connected.
- final String normalizedServerURL = normalizeServerURL(aServerURL);
+ final String normalizedServerURL = normalizeServerURL(rsURL);
if (connectedRSUrls.contains(normalizedServerURL))
{
continue;
}
- connect(aServerURL, domain.getBaseDn());
+ connect(rsURL, domain.getBaseDn());
}
}
@@ -538,10 +538,7 @@
listenSocket = new ServerSocket();
listenSocket.bind(new InetSocketAddress(replicationPort));
- /*
- * creates working threads
- * We must first connect, then start to listen.
- */
+ // creates working threads: we must first connect, then start to listen.
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
" creates connect thread");
@@ -559,7 +556,7 @@
// can know me and really enableECL.
if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) != null)
{
- // Already done . Nothing to do
+ // Already done. Nothing to do
return;
}
eclwe = new ECLWorkflowElement(this);
@@ -567,7 +564,6 @@
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
" successfully initialized");
-
} catch (ChangelogException e)
{
Message message = ERR_COULD_NOT_READ_DB.get(
@@ -927,7 +923,7 @@
try
{
- lastGeneratedDraftCN = changelogDB.getLastKey();
+ lastGeneratedDraftCN = changelogDB.getLastDraftCN();
}
catch (Exception ignored)
{
@@ -995,9 +991,9 @@
disconnectRemovedReplicationServers(configuration.getReplicationServer());
- replicationServers = configuration.getReplicationServer();
- if (replicationServers == null)
- replicationServers = new ArrayList<String>();
+ replicationServerUrls = configuration.getReplicationServer();
+ if (replicationServerUrls == null)
+ replicationServerUrls = new ArrayList<String>();
queueSize = configuration.getQueueSize();
long newPurgeDelay = configuration.getReplicationPurgeDelay();
@@ -1088,8 +1084,8 @@
broadcastConfigChange();
}
- if ((configuration.getReplicationDBDirectory() != null) &&
- (!dbDirname.equals(configuration.getReplicationDBDirectory())))
+ final String newDir = configuration.getReplicationDBDirectory();
+ if (newDir != null && !dbDirname.equals(newDir))
{
return new ConfigChangeResult(ResultCode.SUCCESS, true);
}
@@ -1109,25 +1105,24 @@
* First try the set of configured replication servers to see if one of them
* is this replication server (this should always be the case).
*/
- for (String rs : replicationServers)
+ for (String rsUrl : replicationServerUrls)
{
/*
* No need validate the string format because the admin framework has
* already done it.
*/
- final int index = rs.lastIndexOf(":");
- final String hostname = rs.substring(0, index);
- final int port = Integer.parseInt(rs.substring(index + 1));
+ final int index = rsUrl.lastIndexOf(":");
+ final String hostname = rsUrl.substring(0, index);
+ final int port = Integer.parseInt(rsUrl.substring(index + 1));
+
if (port == replicationPort && isLocalAddress(hostname))
{
- serverURL = rs;
+ serverURL = rsUrl;
return;
}
}
- /*
- * Fall-back to the machine hostname.
- */
+ // Fall-back to the machine hostname.
serverURL = InetAddress.getLocalHost().getHostName() + ":"
+ replicationPort;
}
@@ -1249,8 +1244,7 @@
public void remove()
{
if (debugEnabled())
- TRACER.debugInfo("RS " +getMonitorInstanceName()+
- " starts removing");
+ TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
shutdown();
removeBackend();
@@ -1471,23 +1465,20 @@
{
Collection<String> serversToDisconnect = new ArrayList<String>();
- for (String server: replicationServers)
+ for (String rsUrl : replicationServerUrls)
{
- if (!newReplServers.contains(server))
+ if (!newReplServers.contains(rsUrl))
{
try
{
- // translate the server name into IP address
- // and keep the port number
- String[] host = server.split(":");
+ // translate the server name into IP address and keep the port number
+ String[] host = rsUrl.split(":");
serversToDisconnect.add(
- (InetAddress.getByName(host[0])).getHostAddress()
- + ":" + host[1]);
+ InetAddress.getByName(host[0]).getHostAddress() + ":" + host[1]);
}
catch (IOException e)
{
- Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(server);
- logError(message);
+ logError(ERR_COULD_NOT_SOLVE_HOSTNAME.get(rsUrl));
}
}
}
@@ -1686,7 +1677,7 @@
{
if (changelogDB != null)
{
- return changelogDB.getFirstKey();
+ return changelogDB.getFirstDraftCN();
}
return 0;
}
@@ -1702,7 +1693,7 @@
{
if (changelogDB != null)
{
- return changelogDB.getLastKey();
+ return changelogDB.getLastDraftCN();
}
return 0;
}
@@ -1754,10 +1745,10 @@
int lastDraftCN;
boolean dbEmpty = false;
- long newestDate = 0L;
- ChangelogDB changelogDB = getChangelogDB();
+ long newestDate = 0;
+ final ChangelogDB changelogDB = getChangelogDB();
- int firstDraftCN = changelogDB.getFirstKey();
+ int firstDraftCN = changelogDB.getFirstDraftCN();
Map<String,ServerState> domainsServerStateForLastSeqnum = null;
ChangeNumber changeNumberForLastSeqnum = null;
String domainForLastSeqnum = null;
@@ -1769,7 +1760,7 @@
}
else
{
- lastDraftCN = changelogDB.getLastKey();
+ lastDraftCN = changelogDB.getLastDraftCN();
// Get the generalized state associated with the current last DraftCN
// and initializes from it the startStates table
@@ -1813,7 +1804,7 @@
// (may be this domain was disabled when this record was returned).
// In that case, are counted the changes from
// the date of the most recent change from this last draft record
- if (newestDate == 0L)
+ if (newestDate == 0)
{
newestDate = changeNumberForLastSeqnum.getTime();
}
@@ -1900,7 +1891,6 @@
}
-
private String normalizeServerURL(final String url)
{
final int separator = url.lastIndexOf(':');
@@ -1909,16 +1899,12 @@
try
{
final InetAddress inetAddress = InetAddress.getByName(hostname);
-
if (isLocalAddress(inetAddress))
{
- // It doesn't matter whether we use an IP or hostname here.
+ // It does not matter whether we use an IP or hostname here.
return InetAddress.getLocalHost().getHostAddress() + ":" + portString;
}
- else
- {
- return inetAddress.getHostAddress() + ":" + portString;
- }
+ return inetAddress.getHostAddress() + ":" + portString;
}
catch (UnknownHostException e)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index f5382d7..4883047 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -70,14 +70,14 @@
*
* @return Returns the first draftCN in the DB.
*/
- int getFirstKey();
+ int getFirstDraftCN();
/**
* Get the lastChange.
*
* @return Returns the last draftCN in the DB
*/
- int getLastKey();
+ int getLastDraftCN();
/**
* Add an update to the list of messages that must be saved to the db managed
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
index 6aac0d9..10e0b71 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -77,8 +77,16 @@
private static int NO_KEY = 0;
private DraftCNDB db;
- private int firstkey = NO_KEY;
- private int lastkey = NO_KEY;
+ /**
+ * FIXME Is this field that useful? {@link #getFirstDraftCN()} does not even
+ * use it!
+ */
+ private int firstDraftCN = NO_KEY;
+ /**
+ * FIXME Is this field that useful? {@link #getLastDraftCN()} does not even
+ * use it! It is not even updated.
+ */
+ private int lastDraftCN = NO_KEY;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private boolean shutdown = false;
private boolean trimDone = false;
@@ -117,8 +125,8 @@
// DB initialization
db = new DraftCNDB(dbenv);
- firstkey = db.readFirstDraftCN();
- lastkey = db.readLastDraftCN();
+ firstDraftCN = db.readFirstDraftCN();
+ lastDraftCN = db.readLastDraftCN();
// Trimming thread
thread = new DirectoryThread(this, "Replication DraftCN db");
@@ -147,14 +155,14 @@
/** {@inheritDoc} */
@Override
- public int getFirstKey()
+ public int getFirstDraftCN()
{
return db.readFirstDraftCN();
}
/** {@inheritDoc} */
@Override
- public int getLastKey()
+ public int getLastDraftCN()
{
return db.readLastDraftCN();
}
@@ -309,11 +317,18 @@
return;
}
+ // FIXME is this correct?
+ // This code is not setting the excludedBaseDNs of the RS which means it
+ // could take any value set by one of the other methods!
+ // In addition, this code is not thread safe, but I suspect it is used in a
+ // multi-threaded way.
+ // The call to RS.getEligibleCN() is not reliable in any way and could
+ // return very different values even if the DB content did not change!!
ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
for (int i = 0; i < 100; i++)
{
- DraftCNDBCursor cursor = db.openDeleteCursor();
+ final DraftCNDBCursor cursor = db.openDeleteCursor();
try
{
for (int j = 0; j < 50; j++)
@@ -326,16 +341,15 @@
}
// From the draftCNDb change record, get the domain and changeNumber
- ChangeNumber cn = cursor.currentChangeNumber();
- String baseDN = cursor.currentBaseDN();
- if ((baseDNToClear != null)
- && (baseDNToClear.equalsIgnoreCase(baseDN)))
+ final ChangeNumber cn = cursor.currentChangeNumber();
+ final String baseDN = cursor.currentBaseDN();
+ if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN))
{
cursor.delete();
continue;
}
- ReplicationServerDomain domain = replicationServer
+ final ReplicationServerDomain domain = replicationServer
.getReplicationServerDomain(baseDN, false);
if (domain == null)
{
@@ -346,14 +360,14 @@
continue;
}
- ServerState startState = domain.getStartState();
+ final ServerState startState = domain.getStartState();
// We don't use the returned endState but it's updating CN as reading
domain.getEligibleState(crossDomainEligibleCN);
- ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
+ final ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
- int currentKey = cursor.currentKey();
+ final int currentDraftCN = cursor.currentKey();
if (cn.older(fcn))
{
@@ -391,7 +405,7 @@
continue;
}
- firstkey = currentKey;
+ firstDraftCN = currentDraftCN;
cursor.close();
return;
}
@@ -465,7 +479,7 @@
@Override
public String toString()
{
- return "draftCNdb:" + " " + firstkey + " " + lastkey;
+ return "draftCNdb:" + " " + firstDraftCN + " " + lastDraftCN;
}
/**
@@ -482,8 +496,8 @@
public void clear() throws ChangelogException
{
db.clear();
- firstkey = db.readFirstDraftCN();
- lastkey = db.readLastDraftCN();
+ firstDraftCN = db.readFirstDraftCN();
+ lastDraftCN = db.readLastDraftCN();
}
private ReentrantLock lock = new ReentrantLock();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
index 73e3696..8d5450c 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -111,11 +111,11 @@
handler.add(sn3, value3, baseDN3, changeNumber3);
// The ChangeNumber should not get purged
- int firstkey = handler.getFirstKey();
- assertEquals(firstkey, sn1);
- assertEquals(handler.getLastKey(), sn3);
+ final int firstDraftCN = handler.getFirstDraftCN();
+ assertEquals(firstDraftCN, sn1);
+ assertEquals(handler.getLastDraftCN(), sn3);
- DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
+ DraftCNDBCursor dbc = handler.getReadCursor(firstDraftCN);
try
{
assertEquals(dbc.currentChangeNumber(), changeNumber1);
@@ -149,8 +149,8 @@
{
Thread.sleep(200);
}
- assertEquals(handler.getFirstKey(), 0);
- assertEquals(handler.getLastKey(), 0);
+ assertEquals(handler.getFirstDraftCN(), 0);
+ assertEquals(handler.getLastDraftCN(), 0);
} finally
@@ -239,8 +239,8 @@
Thread.sleep(500);
// Checks
- assertEquals(handler.getFirstKey(), sn1);
- assertEquals(handler.getLastKey(), sn3);
+ assertEquals(handler.getFirstDraftCN(), sn1);
+ assertEquals(handler.getLastDraftCN(), sn3);
assertEquals(handler.count(), 3, "Db count");
@@ -260,8 +260,8 @@
handler.clear();
// Check the db is cleared.
- assertEquals(handler.getFirstKey(), 0);
- assertEquals(handler.getLastKey(), 0);
+ assertEquals(handler.getFirstDraftCN(), 0);
+ assertEquals(handler.getLastDraftCN(), 0);
assertEquals(handler.count(), 0);
} finally
{
--
Gitblit v1.10.0