Fix for 2787: Replication Server sessions fail when disconnecting and reconnecting
When a ReplicationBroker disconnects from and quickly reconnects to a Replication
Server, the reconnection sometimes fails.
These changes:
- add a unit test for this condition
- fix race conditions in the Replication Server code so that this test
succeeds
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | /** |
| | |
| | | public class ReplLDIFOutputStream |
| | | extends OutputStream |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // The synchronization domain on which the export is done |
| | | ReplicationDomain domain; |
| | | |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | * @param baseDn the baseDn for which this DB was created. |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * @param generationId The generationId of the data contained in the LDAP |
| | | * server for this domain. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv, long generationId) |
| | | ReplicationDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | this.serverId = id; |
| | |
| | | mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get()); |
| | | mb.append(stackTraceToSingleLineString(end)); |
| | | logError(mb.toMessage()); |
| | | replicationServer.shutdown(); |
| | | if (replicationServer != null) |
| | | replicationServer.shutdown(); |
| | | break; |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.*; |
| | |
| | | + " serverId=" + serverId); |
| | | |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, replicationServer, this, 1); |
| | | new DbHandler(serverId, baseDn, replicationServer, this); |
| | | |
| | | replicationServer.getReplicationServerDomain(baseDn, true). |
| | | setDbHandler(serverId, dbHandler); |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler muste be created. |
| | | * @param generationId The generationId for this server and this |
| | | * replicationServerDomain. |
| | | * @return The new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |
| | | public DbHandler newDbHandler(short id, DN baseDn, long generationId) |
| | | public DbHandler newDbHandler(short id, DN baseDn) |
| | | throws DatabaseException |
| | | { |
| | | return new DbHandler(id, baseDn, this, dbEnv, generationId); |
| | | return new DbHandler(id, baseDn, this, dbEnv); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | |
| | | if (generationId < 0) |
| | | { |
| | | generationId = sourceHandler.getGenerationId(); |
| | | } |
| | | |
| | | // look for the dbHandler that is responsible for the LDAP server which |
| | | // generated the change. |
| | | DbHandler dbHandler = null; |
| | |
| | | { |
| | | try |
| | | { |
| | | dbHandler = replicationServer.newDbHandler(id, |
| | | baseDn, generationId); |
| | | dbHandler = replicationServer.newDbHandler(id, baseDn); |
| | | generationIdSavedStatus = true; |
| | | } |
| | | catch (DatabaseException e) |
| | |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Wait a short while for ServerId disconnection. |
| | | * <p> |
| | | * If the given serverId is still a key of connectedServers, the calling |
| | | * thread sleeps once for 100 ms and then returns. The map is not checked |
| | | * again, so the server may still be registered when this method returns. |
| | | * An InterruptedException raised during the sleep is ignored. |
| | | * |
| | | * @param serverId the serverId to be checked. |
| | | */ |
| | | public void waitDisconnection(short serverId) |
| | | { |
| | | if (connectedServers.containsKey(serverId)) |
| | | { |
| | | // still registered: pause once (no re-check) to leave time for the disconnection |
| | | try |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (InterruptedException e) |
| | | { |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | " for " + baseDn + " " + |
| | | " stopServer " + handler.getMonitorInstanceName()); |
| | | |
| | | handler.stopHandler(); |
| | | |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | replicationServers.remove(handler.getServerId()); |
| | | } |
| | | else |
| | | { |
| | | connectedServers.remove(handler.getServerId()); |
| | | } |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | if (replicationServers.containsValue(handler)) |
| | | { |
| | | replicationServers.remove(handler.getServerId()); |
| | | handler.stopHandler(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (connectedServers.containsValue(handler)) |
| | | { |
| | | connectedServers.remove(handler.getServerId()); |
| | | handler.stopHandler(); |
| | | } |
| | | } |
| | | |
| | | mayResetGenerationId(); |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | | } |
| | | |
| | | /** |
| | |
| | | return replicationServer; |
| | | } |
| | | |
| | | /** |
| | | * Process reception of a ReplServerInfoMessage. |
| | | * <p> |
| | | * Calls mayResetGenerationId() and then, if the local generationId is |
| | | * still negative, takes the generationId of the handler. If the local |
| | | * generationId is strictly positive and differs from the one carried by |
| | | * the message, an ErrorMessage built from NOTE_BAD_GENERATION_ID is sent |
| | | * back through the handler. |
| | | * When debug is enabled and the handler is a replication server, the |
| | | * reception is traced. |
| | | * |
| | | * @param infoMsg The received message. |
| | | * @param handler The handler that received the message. |
| | | * @throws IOException when raised by the underlying session. |
| | | */ |
| | | public void receiveReplServerInfo( |
| | | ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | TRACER.debugInfo( |
| | | "In RS " + getReplicationServer().getServerId() + |
| | | " Receiving replServerInfo from " + handler.getServerId() + |
| | | " baseDn=" + baseDn + |
| | | " genId=" + infoMsg.getGenerationId()); |
| | | } |
| | | |
| | | mayResetGenerationId(); |
| | | if (generationId < 0) |
| | | generationId = handler.getGenerationId(); |
| | | if (generationId > 0 && (generationId != infoMsg.getGenerationId())) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | baseDn.toNormalizedString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(infoMsg.getGenerationId()), |
| | | Long.toString(generationId)); |
| | | |
| | | ErrorMessage errorMsg = new ErrorMessage( |
| | | getReplicationServer().getServerId(), |
| | | handler.getServerId(), |
| | | message); |
| | | handler.sendError(errorMsg); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * Monitor Data generation |
| | | */ |
| | |
| | | // Get or Create the ReplicationServerDomain |
| | | replicationServerDomain = |
| | | replicationServer.getReplicationServerDomain(this.baseDn, true); |
| | | |
| | | replicationServerDomain.waitDisconnection(receivedMsg.getServerId()); |
| | | replicationServerDomain.mayResetGenerationId(); |
| | | |
| | | localGenerationId = replicationServerDomain.getGenerationId(); |
| | | |
| | | ServerState localServerState = |
| | |
| | | { |
| | | } |
| | | } |
| | | do { |
| | | boolean acquired = false; |
| | | do |
| | | { |
| | | try |
| | | { |
| | | sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS); |
| | | acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while ((interrupted) && (!shutdown)); |
| | | } while (((interrupted) || (!acquired )) && (!shutdown)); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Send an ErrorMessage to the peer. |
| | | * <p> |
| | | * The message is published on the session as is; nothing else is done. |
| | | * |
| | | * @param errorMsg The message to be sent |
| | | * @throws IOException when raised by the underlying session |
| | | */ |
| | | public void sendError(ErrorMessage errorMsg) throws IOException |
| | | { |
| | | session.publish(errorMsg); |
| | | } |
| | | |
| | | /** |
| | | * Process the reception of a WindowProbe message. |
| | | * |
| | | * @param windowProbeMsg The message to process. |
| | |
| | | { |
| | | ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg; |
| | | handler.receiveReplServerInfo(infoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServerDomain.getReplicationServer(). |
| | | getServerId() + |
| | | " Receiving replServerInfo from " + handler.getServerId() + |
| | | " baseDn=" + replicationServerDomain.getBaseDn() + |
| | | " genId=" + infoMsg.getGenerationId()); |
| | | } |
| | | |
| | | if (replicationServerDomain.getGenerationId()<0) |
| | | { |
| | | // Here is the case where a ReplicationServer receives from |
| | | // another ReplicationServer the generationId for a domain |
| | | // for which the generation ID has never been set. |
| | | replicationServerDomain. |
| | | setGenerationId(infoMsg.getGenerationId(),false); |
| | | } |
| | | else |
| | | { |
| | | if (infoMsg.getGenerationId()<0) |
| | | { |
| | | // Here is the case where another ReplicationServer |
| | | // signals that it has no generationId set for the domain. |
| | | // If we have generationId set locally and no server currently |
| | | // connected for that domain in the topology then we may also |
| | | // reset the generationId localy. |
| | | replicationServerDomain.mayResetGenerationId(); |
| | | } |
| | | |
| | | if (replicationServerDomain.getGenerationId() != |
| | | infoMsg.getGenerationId()) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | replicationServerDomain.getBaseDn().toNormalizedString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(infoMsg.getGenerationId()), |
| | | Long.toString(replicationServerDomain.getGenerationId())); |
| | | |
| | | ErrorMessage errorMsg = new ErrorMessage( |
| | | replicationServerDomain.getReplicationServer().getServerId(), |
| | | handler.getServerId(), |
| | | message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | replicationServerDomain.receiveReplServerInfo(infoMsg, handler); |
| | | } |
| | | else if (msg instanceof MonitorRequestMessage) |
| | | { |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | |
| | | // Read generationId - should be not retrievable since no entry |
| | | debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")"); |
| | | connectServer1ToChangelog(changelog1ID); |
| | | Thread.sleep(1000); |
| | | |
| | | debugInfo(testCase + " Expect genId attribute to be not retrievable"); |
| | | genId = readGenId(); |
| | |
| | | debugInfo("Create again replServer1"); |
| | | replServer1 = createReplicationServer(changelog1ID, false, testCase); |
| | | debugInfo("Delay to allow DS to reconnect to replServer1"); |
| | | Thread.sleep(200); |
| | | Thread.sleep(1000); |
| | | |
| | | long genIdAfterRestart = replServer1.getGenerationId(baseDn); |
| | | debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart); |
| | | assertTrue(replServer1!=null, "Replication server creation failed."); |
| | | assertTrue(genIdBeforeShut == genIdAfterRestart, |
| | | "generationId is expected to have the same value after replServer1 restart"); |
| | | "generationId is expected to have the same value" + |
| | | " after replServer1 restart. Before : " + genIdBeforeShut + |
| | | " after : " + genIdAfterRestart); |
| | | |
| | | try |
| | | { |
| | |
| | | disconnectFromReplServer(changelog1ID); |
| | | Thread.sleep(1000); |
| | | |
| | | debugInfo("Expect genId to be unset(-1) in all servers since no server is " + |
| | | " connected and no change ever occurred"); |
| | | assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1"); |
| | | assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2"); |
| | | assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3"); |
| | | |
| | | debugInfo("Add entries to DS"); |
| | | this.addTestEntriesToDB(updatedEntries); |
| | | |
| | |
| | | debugInfo(testCase + " Expect genId attribute to be retrievable"); |
| | | genId = readGenId(); |
| | | assertEquals(genId, 3211313L); |
| | | |
| | | disconnectFromReplServer(changelog1ID); |
| | | } |
| | | finally |
| | | { |
| | |
| | | debugInfo("Successfully ending " + testCase); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Loop opening sessions to the Replication Server |
| | | * to check that it correctly handles disconnection and reconnection. |
| | | * <p> |
| | | * Opens and stops 100 replication sessions in a row against replServer1, |
| | | * each with its own generationId (1000 + i). After each session is opened, |
| | | * the test fails if the generationId reported by replServer1 for baseDn |
| | | * differs from the generationId of that session. A broker still open when |
| | | * the loop exits is stopped in the finally clause. |
| | | * <p> |
| | | * The annotation disables it as a standalone test (enabled=false); it is |
| | | * called from generationIdTest(). |
| | | * <p> |
| | | * NOTE(review): the getGenerationId() call placed after fail() looks |
| | | * unreachable if fail() throws - confirm and remove if so. |
| | | */ |
| | | @Test(enabled=false, groups="slow") |
| | | public void testLoop() throws Exception |
| | | { |
| | | String testCase = "testLoop"; |
| | | debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled()); |
| | | long rgenId; |
| | | |
| | | ReplicationDomain.clearJEBackend(false, |
| | | "userRoot", |
| | | baseDn.toNormalizedString()); |
| | | |
| | | replServer1 = createReplicationServer(changelog1ID, false, testCase); |
| | | replServer1.clearDb(); |
| | | |
| | | ReplicationBroker broker = null; |
| | | try |
| | | { |
| | | for (int i=0; i< 100; i++) |
| | | { |
| | | long generationId = 1000+i; |
| | | broker = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog1ID), |
| | | 1000, !emptyOldChanges, generationId); |
| | | |
| | | debugInfo(testCase + " Expect genId to be set in memory on the replication " + |
| | | " server side even if not wrote on disk/db since no change occurred."); |
| | | rgenId = replServer1.getGenerationId(baseDn); |
| | | if (rgenId != generationId) |
| | | { |
| | | fail("replication server failed to set generation ID"); |
| | | replServer1.getGenerationId(baseDn); |
| | | } |
| | | broker.stop(); |
| | | broker = null; |
| | | } |
| | | } finally |
| | | { |
| | | if (broker != null) |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This is used to make sure that the tests are run in the |
| | | * specified order since this is necessary: testSingleRS, testMultiRS, |
| | | * testServerStop and finally testLoop. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | | public void generationIdTest() throws Exception |
| | | { |
| | | testSingleRS(); |
| | | testMultiRS(); |
| | | testServerStop(); |
| | | testLoop(); |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer); |
| | | |
| | | DbHandler handler = |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1); |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv); |
| | | |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0); |
| | | ChangeNumber changeNumber1 = gen.newChangeNumber(); |
| | |
| | | ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer); |
| | | |
| | | DbHandler handler = |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1); |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv); |
| | | |
| | | // Creates changes added to the dbHandler |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0); |