opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -293,4 +293,10 @@ return sslEncryption; } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " " + (sslEncryption ? "with SSL" : ""); } } opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -82,7 +82,7 @@ */ public final static String NO_CONNECTED_SERVER = "Not connected"; private volatile String replicationServer = NO_CONNECTED_SERVER; private volatile Session session = null; private volatile Session session; private final ServerState state; private final DN baseDN; private final int serverId; @@ -1284,7 +1284,8 @@ // Send our Start Session StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg(); startECLSessionMsg.setOperationId("-1"); session.publish(startECLSessionMsg); final Session localSession = session; localSession.publish(startECLSessionMsg); // FIXME ECL In the handshake phase two, should RS send back a topo msg ? if (debugEnabled()) @@ -1294,7 +1295,7 @@ } // Alright set the timeout to the desired value session.setSoTimeout(timeout); localSession.setSoTimeout(timeout); connected = true; } catch (Exception e) { @@ -1319,8 +1320,6 @@ private TopologyMsg performPhaseTwoHandshake(String server, ServerStatus initStatus) { TopologyMsg topologyMsg; try { /* @@ -1347,12 +1346,13 @@ startSessionMsg = new StartSessionMsg(initStatus, new ArrayList<String>()); } session.publish(startSessionMsg); final Session localSession = session; localSession.publish(startSessionMsg); /* * Read the TopologyMsg that should come back. */ topologyMsg = (TopologyMsg) session.receive(); TopologyMsg topologyMsg = (TopologyMsg) localSession.receive(); if (debugEnabled()) { @@ -1361,8 +1361,8 @@ } // Alright set the timeout to the desired value session.setSoTimeout(timeout); localSession.setSoTimeout(timeout); return topologyMsg; } catch (Exception e) { Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, @@ -1372,9 +1372,8 @@ setSession(null); // Be sure to return null. topologyMsg = null; return null; } return topologyMsg; } /** @@ -1423,6 +1422,7 @@ * local DS * - replication server in the same VM as local DS one */ // TODO JNR log why an RS was evicted as best server Map<Integer, ReplicationServerInfo> bestServers = rsInfos; /* The list of best replication servers is filtered with each criteria. At @@ -2225,10 +2225,10 @@ Check the session. If it has changed, some disconnection or reconnection happened and we need to restart from scratch. */ if (session != null && session == currentSession) final Session localSession = session; if (localSession != null && session == currentSession) { session.publish(msg); localSession.publish(msg); done = true; } } @@ -2243,8 +2243,10 @@ window update message was lost somehow... then loop to check again if connection was closed. */ if (session != null) { session.publish(new WindowProbeMsg()); Session localSession = session; if (localSession != null) { localSession.publish(new WindowProbeMsg()); } } } @@ -2330,8 +2332,8 @@ // Save session information for later in case we need it for log messages // after the session has been closed and/or failed. final Session savedSession = session; if (savedSession == null) final Session localSession = session; if (localSession == null) { // Must be shutting down. break; @@ -2340,7 +2342,7 @@ final int previousRsServerID = rsServerId; try { ReplicationMsg msg = savedSession.receive(); ReplicationMsg msg = localSession.receive(); if (msg instanceof UpdateMsg) { synchronized (this) @@ -2372,12 +2374,12 @@ { // RS performs a proper disconnection Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get( previousRsServerID, savedSession.getReadableRemoteAddress(), previousRsServerID, localSession.getReadableRemoteAddress(), serverId, baseDN.toNormalizedString()); logError(message); // Try to find a suitable RS reStart(savedSession, true); reStart(localSession, true); } else if (msg instanceof MonitorMsg) { @@ -2436,14 +2438,15 @@ { message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( serverId, previousRsServerID, savedSession.getReadableRemoteAddress(), localSession.getReadableRemoteAddress(), baseDN.toNormalizedString()); } else { // TODO JNR log why an RS was evicted as best server message = NOTE_NEW_BEST_REPLICATION_SERVER.get( serverId, previousRsServerID, savedSession.getReadableRemoteAddress(), localSession.getReadableRemoteAddress(), bestServerInfo.getServerId(), baseDN.toNormalizedString()); } @@ -2480,13 +2483,13 @@ // We did not initiate the close on our side, log an error message. Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( serverId, baseDN.toNormalizedString(), previousRsServerID, savedSession.getReadableRemoteAddress()); localSession.getReadableRemoteAddress()); logError(message); } if (reconnectOnFailure) { reStart(savedSession, true); reStart(localSession, true); } else { @@ -2546,9 +2549,10 @@ try { updateDoneCount++; if ((updateDoneCount >= halfRcvWindow) && (session != null)) final Session localSession = session; if (updateDoneCount >= halfRcvWindow && localSession != null) { session.publish(new WindowMsg(updateDoneCount)); localSession.publish(new WindowMsg(updateDoneCount)); rcvWindow += updateDoneCount; updateDoneCount = 0; } @@ -2598,9 +2602,10 @@ public void setSoTimeout(int timeout) throws SocketException { this.timeout = timeout; if (session != null) final Session localSession = session; if (localSession != null) { session.setSoTimeout(timeout); localSession.setSoTimeout(timeout); } } @@ -2905,14 +2910,14 @@ // Start a CSN heartbeat thread. if (changeTimeHeartbeatSendInterval > 0) { String threadName = "Replica DS(" + getServerId() final Session localSession = session; final String threadName = "Replica DS(" + getServerId() + ") change time heartbeat publisher for domain \"" + this.baseDN + "\" to RS(" + getRsServerId() + ") at " + session.getReadableRemoteAddress(); + baseDN + "\" to RS(" + getRsServerId() + ") at " + localSession.getReadableRemoteAddress(); ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( threadName, session, changeTimeHeartbeatSendInterval, serverId); threadName, localSession, changeTimeHeartbeatSendInterval, serverId); ctHeartbeatPublisherThread.start(); } else { @@ -3030,4 +3035,11 @@ DirectoryServer.deregisterMonitorProvider(monitor); } } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " " + baseDN + " " + serverId; } } opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -69,7 +69,7 @@ * should read the list of replication servers from the configuration, * instantiate a {@link ServerState} then start the publish service * by calling * {@link #startPublishService(Collection, int, long, long)}. * {@link #startPublishService(Set, int, long, long)}. * At this point it can start calling the {@link #publish(UpdateMsg)} * method if needed. * <p> @@ -3675,4 +3675,11 @@ { return state.getCSN(serverID); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -27,6 +27,7 @@ */ package org.opends.server.replication; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -375,14 +376,12 @@ // Check that the modify has been replayed. found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), "description", "Description was changed", 10000, true); "description", "Description was changed", 10000, true); assertTrue(found, "The second modification was not replayed."); // Delete the entries to clean the database. DeleteMsg delMsg = new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID); broker.publish(delMsg); broker.publish( new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID)); assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false), "The DELETE replication message was not replayed"); } @@ -552,16 +551,12 @@ * Open a session to the replicationServer using the ReplicationServer broker API. * This must use a serverId different from the LDAP server ID */ final int serverId = 2; ReplicationBroker broker = openReplicationSession(baseDN, 2, 100, replServerPort, 1000, true); openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true); try { /* * Create a CSN generator to generate new CSNs when we need to send * operations messages to the replicationServer. */ CSNGenerator gen = new CSNGenerator(2, 0); CSNGenerator gen = new CSNGenerator(serverId, 0); /* * Test that the conflict resolution code is able to find entries @@ -658,11 +653,10 @@ */ // send a delete operation with a wrong dn but the unique ID of the entry // used above DN delDN = DN.decode("cn=anotherdn," + baseDN); DeleteMsg delMsg = new DeleteMsg(delDN, gen.newCSN(), user1entryUUID); updateMonitorCount(baseDN, resolvedMonitorAttr); alertCount = DummyAlertHandler.getAlertCount(); broker.publish(delMsg); DN delDN = DN.decode("cn=anotherdn," + baseDN); broker.publish(new DeleteMsg(delDN, gen.newCSN(), user1entryUUID)); // check that the delete operation has been applied assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false), @@ -739,10 +733,10 @@ * To achieve this send a delete operation with a correct DN * but a wrong unique ID. */ delMsg = new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab"); updateMonitorCount(baseDN, resolvedMonitorAttr); alertCount = DummyAlertHandler.getAlertCount(); broker.publish(delMsg); broker.publish( new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab")); // check that the delete operation has not been applied assertNotNull(getEntry(newPersonDN, 10000, true), @@ -824,19 +818,13 @@ // delete the entries to clean the database DN delDN2 = DN.decode("entryUUID = " + user1entrysecondUUID + "+" + user1dn.getRDN() + "," + baseDN); delMsg = new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID); broker.publish(delMsg); // check that the delete operation has been applied DN delDN2 = DN.decode( "entryUUID = " + user1entrysecondUUID + "+" + user1dn.getRDN() + "," + baseDN); broker.publish(new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID)); assertNull(getEntry(delDN2, 10000, false), "The DELETE replication message was not replayed"); delMsg = new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID); broker.publish(delMsg); // check that the delete operation has been applied broker.publish(new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID)); assertNull(getEntry(reallyNewDN, 10000, false), "The DELETE replication message was not replayed"); @@ -930,8 +918,7 @@ alertCount = DummyAlertHandler.getAlertCount(); // delete domain1 delMsg = new DeleteMsg(domain1dn, olderCSN, domain1uid); broker.publish(delMsg); broker.publish(new DeleteMsg(domain1dn, olderCSN, domain1uid)); // check that the domain1 has correctly been deleted assertNull(getEntry(domain1dn, 10000, false), @@ -975,8 +962,7 @@ alertCount = DummyAlertHandler.getAlertCount(); // delete domain1 delMsg = new DeleteMsg(domain1dn, gen.newCSN(), domain1uid); broker.publish(delMsg); broker.publish(new DeleteMsg(domain1dn, gen.newCSN(), domain1uid)); // check that the domain1 has correctly been deleted assertNull(getEntry(domain1dn, 10000, false), @@ -1131,11 +1117,11 @@ // Cleanup from previous run cleanupTest(); final int serverId = 27; ReplicationBroker broker = openReplicationSession(baseDN, 27, 100, replServerPort, 2000, true); openReplicationSession(baseDN, serverId, 100, replServerPort, 2000, true); try { CSNGenerator gen = new CSNGenerator( 27, 0); CSNGenerator gen = new CSNGenerator(serverId, 0); /* * Test that operations done on this server are sent to the @@ -1303,12 +1289,12 @@ logError(Message.raw(Category.SYNC, Severity.INFORMATION, "Starting replication test : infiniteReplayLoop")); Thread.sleep(2000); int serverId = 11; ReplicationBroker broker = openReplicationSession(baseDN, 11, 100, replServerPort, 1000, true); openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true); try { CSNGenerator gen = new CSNGenerator( 11, 0); CSNGenerator gen = new CSNGenerator(serverId, 0); // Create a test entry. Entry tmp = TestCaseUtils.entryFromLdifString( @@ -1400,7 +1386,6 @@ public void csnGeneratorAdjust() throws Exception { testSetUp("csnGeneratorAdjust"); int serverId = 88; logError(Message.raw(Category.SYNC, Severity.INFORMATION, "Starting synchronization test : CSNGeneratorAdjust")); @@ -1408,16 +1393,13 @@ * Open a session to the replicationServer using the broker API. * This must use a different serverId to that of the directory server. */ final int serverId = 88; ReplicationBroker broker = openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true); consumeAllMessages(broker); // clean leftover messages from lostHeartbeatFailover() try { /* * Create a CSN generator to generate new CSNs * when we need to send operation messages to the replicationServer. */ long inTheFuture = System.currentTimeMillis() + (3600 * 1000); final long inTheFuture = System.currentTimeMillis() + (3600 * 1000); CSNGenerator gen = new CSNGenerator(serverId, inTheFuture); // Create and publish an update message to add an entry. @@ -1453,4 +1435,31 @@ broker.stop(); } } /** * Consumes all the messages sent to this broker. This is useful at the start * of a test to avoid leftover messages from previous test runs. */ private void consumeAllMessages(ReplicationBroker broker) { final List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>(); try { while (true) { msgs.add(broker.receive()); } } catch (SocketTimeoutException expectedAtSomeStage) { // this is expected to happen when there will not be any more messages to // consume from the socket } if (!msgs.isEmpty()) { logError(Message.raw(Category.SYNC, Severity.SEVERE_ERROR, "Leftover messages from previous test runs " + msgs)); } } }