From 93b35d25ca1affdaed002b9467c3a6b6a369eae4 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 03 Jan 2008 14:00:38 +0000
Subject: [PATCH] fix for 2787 : Replication Server sessions fails when disconnecting and re-connecting When a ReplicationBroker disconnect and reconnect quickly to a Replication Server the reconnection sometimes fails.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 8 -
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java | 9 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 23 ++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 99 ++++++++++++++++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 49 ---------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 4
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java | 69 +++++++++++--
9 files changed, 175 insertions(+), 100 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index 5d48ea1..d523595 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -22,16 +22,14 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.io.IOException;
import java.io.OutputStream;
-import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.ServerConstants;
/**
@@ -41,11 +39,6 @@
public class ReplLDIFOutputStream
extends OutputStream
{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
// The synchronization domain on which the export is done
ReplicationDomain domain;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 1c1b59a..34c7882 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -114,12 +114,11 @@
* @param baseDn the baseDn for which this DB was created.
* @param replicationServer The ReplicationServer that creates this dbHandler.
* @param dbenv the Database Env to use to create the ReplicationServer DB.
- * @param generationId The generationId of the data contained in the LDAP
* server for this domain.
* @throws DatabaseException If a database problem happened
*/
public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
- ReplicationDbEnv dbenv, long generationId)
+ ReplicationDbEnv dbenv)
throws DatabaseException
{
this.serverId = id;
@@ -369,7 +368,8 @@
mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
mb.append(stackTraceToSingleLineString(end));
logError(mb.toMessage());
- replicationServer.shutdown();
+ if (replicationServer != null)
+ replicationServer.shutdown();
break;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index e1b7373..3df442d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.*;
@@ -257,7 +257,7 @@
+ " serverId=" + serverId);
DbHandler dbHandler =
- new DbHandler(serverId, baseDn, replicationServer, this, 1);
+ new DbHandler(serverId, baseDn, replicationServer, this);
replicationServer.getReplicationServerDomain(baseDn, true).
setDbHandler(serverId, dbHandler);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index a7d69e5..bf81c46 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -544,16 +544,14 @@
*
* @param id The serverId for which the dbHandler must be created.
* @param baseDn The DN for which the dbHandler muste be created.
- * @param generationId The generationId for this server and this
- * replicationServerDomain.
* @return The new DB handler for this ReplicationServer and the serverId and
* DN given in parameter.
* @throws DatabaseException in case of underlying database problem.
*/
- public DbHandler newDbHandler(short id, DN baseDn, long generationId)
+ public DbHandler newDbHandler(short id, DN baseDn)
throws DatabaseException
{
- return new DbHandler(id, baseDn, this, dbEnv, generationId);
+ return new DbHandler(id, baseDn, this, dbEnv);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6382ca3..d97d2ad 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -215,6 +215,11 @@
}
}
+ if (generationId < 0)
+ {
+ generationId = sourceHandler.getGenerationId();
+ }
+
// look for the dbHandler that is responsible for the LDAP server which
// generated the change.
DbHandler dbHandler = null;
@@ -225,8 +230,7 @@
{
try
{
- dbHandler = replicationServer.newDbHandler(id,
- baseDn, generationId);
+ dbHandler = replicationServer.newDbHandler(id, baseDn);
generationIdSavedStatus = true;
}
catch (DatabaseException e)
@@ -277,8 +281,25 @@
handler.add(update, sourceHandler);
}
+ }
-
+ /**
+ * Wait a short while for ServerId disconnection.
+ *
+ * @param serverId the serverId to be checked.
+ */
+ public void waitDisconnection(short serverId)
+ {
+ if (connectedServers.containsKey(serverId))
+ {
+ // try again
+ try
+ {
+ Thread.sleep(100);
+ } catch (InterruptedException e)
+ {
+ }
+ }
}
/**
@@ -339,22 +360,27 @@
" for " + baseDn + " " +
" stopServer " + handler.getMonitorInstanceName());
- handler.stopHandler();
- if (handler.isReplicationServer())
- {
- replicationServers.remove(handler.getServerId());
- }
- else
- {
- connectedServers.remove(handler.getServerId());
- }
+ if (handler.isReplicationServer())
+ {
+ if (replicationServers.containsValue(handler))
+ {
+ replicationServers.remove(handler.getServerId());
+ handler.stopHandler();
+ }
+ }
+ else
+ {
+ if (connectedServers.containsValue(handler))
+ {
+ connectedServers.remove(handler.getServerId());
+ handler.stopHandler();
+ }
+ }
- mayResetGenerationId();
-
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendReplServerInfo();
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
}
/**
@@ -1238,6 +1264,45 @@
return replicationServer;
}
+ /**
+ * Process reception of a ReplServerInfoMessage.
+ *
+ * @param infoMsg The received message.
+ * @param handler The handler that received the message.
+ * @throws IOException when raised by the underlying session.
+ */
+ public void receiveReplServerInfo(
+ ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException
+ {
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ TRACER.debugInfo(
+ "In RS " + getReplicationServer().getServerId() +
+ " Receiving replServerInfo from " + handler.getServerId() +
+ " baseDn=" + baseDn +
+ " genId=" + infoMsg.getGenerationId());
+ }
+
+ mayResetGenerationId();
+ if (generationId < 0)
+ generationId = handler.getGenerationId();
+ if (generationId > 0 && (generationId != infoMsg.getGenerationId()))
+ {
+ Message message = NOTE_BAD_GENERATION_ID.get(
+ baseDn.toNormalizedString(),
+ Short.toString(handler.getServerId()),
+ Long.toString(infoMsg.getGenerationId()),
+ Long.toString(generationId));
+
+ ErrorMessage errorMsg = new ErrorMessage(
+ getReplicationServer().getServerId(),
+ handler.getServerId(),
+ message);
+ handler.sendError(errorMsg);
+ }
+ }
+
/*
* Monitor Data generation
*/
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index cd499e3..fb71b78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -283,6 +283,10 @@
// Get or Create the ReplicationServerDomain
replicationServerDomain =
replicationServer.getReplicationServerDomain(this.baseDn, true);
+
+ replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
+ replicationServerDomain.mayResetGenerationId();
+
localGenerationId = replicationServerDomain.getGenerationId();
ServerState localServerState =
@@ -1010,16 +1014,18 @@
{
}
}
- do {
+ boolean acquired = false;
+ do
+ {
try
{
- sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
+ acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
interrupted = false;
} catch (InterruptedException e)
{
// loop until not interrupted
}
- } while ((interrupted) && (!shutdown));
+ } while (((interrupted) || (!acquired )) && (!shutdown));
this.incrementOutCount();
return msg;
}
@@ -1800,6 +1806,17 @@
}
/**
+ * Send an ErrorMessage to the peer.
+ *
+ * @param errorMsg The message to be sent
+ * @throws IOException when raised by the underlying session
+ */
+ public void sendError(ErrorMessage errorMsg) throws IOException
+ {
+ session.publish(errorMsg);
+ }
+
+ /**
* Process the reception of a WindowProbe message.
*
* @param windowProbeMsg The message to process.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 9193c99..323751e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -199,54 +199,7 @@
{
ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
handler.receiveReplServerInfo(infoMsg);
-
- if (debugEnabled())
- {
- if (handler.isReplicationServer())
- TRACER.debugInfo(
- "In RS " + replicationServerDomain.getReplicationServer().
- getServerId() +
- " Receiving replServerInfo from " + handler.getServerId() +
- " baseDn=" + replicationServerDomain.getBaseDn() +
- " genId=" + infoMsg.getGenerationId());
- }
-
- if (replicationServerDomain.getGenerationId()<0)
- {
- // Here is the case where a ReplicationServer receives from
- // another ReplicationServer the generationId for a domain
- // for which the generation ID has never been set.
- replicationServerDomain.
- setGenerationId(infoMsg.getGenerationId(),false);
- }
- else
- {
- if (infoMsg.getGenerationId()<0)
- {
- // Here is the case where another ReplicationServer
- // signals that it has no generationId set for the domain.
- // If we have generationId set locally and no server currently
- // connected for that domain in the topology then we may also
- // reset the generationId localy.
- replicationServerDomain.mayResetGenerationId();
- }
-
- if (replicationServerDomain.getGenerationId() !=
- infoMsg.getGenerationId())
- {
- Message message = NOTE_BAD_GENERATION_ID.get(
- replicationServerDomain.getBaseDn().toNormalizedString(),
- Short.toString(handler.getServerId()),
- Long.toString(infoMsg.getGenerationId()),
- Long.toString(replicationServerDomain.getGenerationId()));
-
- ErrorMessage errorMsg = new ErrorMessage(
- replicationServerDomain.getReplicationServer().getServerId(),
- handler.getServerId(),
- message);
- session.publish(errorMsg);
- }
- }
+ replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
}
else if (msg instanceof MonitorRequestMessage)
{
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index 5ba1a2d..7011c1f 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication;
@@ -721,7 +721,6 @@
// Read generationId - should be not retrievable since no entry
debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")");
connectServer1ToChangelog(changelog1ID);
- Thread.sleep(1000);
debugInfo(testCase + " Expect genId attribute to be not retrievable");
genId = readGenId();
@@ -848,13 +847,15 @@
debugInfo("Create again replServer1");
replServer1 = createReplicationServer(changelog1ID, false, testCase);
debugInfo("Delay to allow DS to reconnect to replServer1");
- Thread.sleep(200);
+ Thread.sleep(1000);
long genIdAfterRestart = replServer1.getGenerationId(baseDn);
debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
assertTrue(replServer1!=null, "Replication server creation failed.");
assertTrue(genIdBeforeShut == genIdAfterRestart,
- "generationId is expected to have the same value after replServer1 restart");
+ "generationId is expected to have the same value" +
+ " after replServer1 restart. Before : " + genIdBeforeShut +
+ " after : " + genIdAfterRestart);
try
{
@@ -1072,12 +1073,6 @@
disconnectFromReplServer(changelog1ID);
Thread.sleep(1000);
- debugInfo("Expect genId to be unset(-1) in all servers since no server is " +
- " connected and no change ever occurred");
- assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1");
- assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2");
- assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3");
-
debugInfo("Add entries to DS");
this.addTestEntriesToDB(updatedEntries);
@@ -1278,6 +1273,8 @@
debugInfo(testCase + " Expect genId attribute to be retrievable");
genId = readGenId();
assertEquals(genId, 3211313L);
+
+ disconnectFromReplServer(changelog1ID);
}
finally
{
@@ -1285,11 +1282,63 @@
debugInfo("Successfully ending " + testCase);
}
}
+
+ /**
+ * Loop opening sessions to the Replication Server
+ * to check that it handle correctly deconnection and reconnection.
+ */
+ @Test(enabled=false, groups="slow")
+ public void testLoop() throws Exception
+ {
+ String testCase = "testLoop";
+ debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled());
+ long rgenId;
+
+ ReplicationDomain.clearJEBackend(false,
+ "userRoot",
+ baseDn.toNormalizedString());
+
+ replServer1 = createReplicationServer(changelog1ID, false, testCase);
+ replServer1.clearDb();
+
+ ReplicationBroker broker = null;
+ try
+ {
+ for (int i=0; i< 100; i++)
+ {
+ long generationId = 1000+i;
+ broker = openReplicationSession(baseDn,
+ server2ID, 100, getChangelogPort(changelog1ID),
+ 1000, !emptyOldChanges, generationId);
+
+ debugInfo(testCase + " Expect genId to be set in memory on the replication " +
+ " server side even if not wrote on disk/db since no change occurred.");
+ rgenId = replServer1.getGenerationId(baseDn);
+ if (rgenId != generationId)
+ {
+ fail("replication server failed to set generation ID");
+ replServer1.getGenerationId(baseDn);
+ }
+ broker.stop();
+ broker = null;
+ }
+ } finally
+ {
+ if (broker != null)
+ broker.stop();
+ }
+ }
+
+ /**
+ * This is used to make sure that the 3 tests are run in the
+ * specified order since this is necessary.
+ */
@Test(enabled=true, groups="slow")
public void generationIdTest() throws Exception
{
testSingleRS();
testMultiRS();
testServerStop();
+ testLoop();
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 46f45bb..7af4ec5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -73,7 +73,7 @@
ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
DbHandler handler =
- new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
+ new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +153,7 @@
ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
DbHandler handler =
- new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
+ new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
// Creates changes added to the dbHandler
ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
--
Gitblit v1.10.0