opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.*;
@@ -39,6 +38,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -92,76 +92,10 @@
   */
  public void changeStatusForResetGenId(long newGenId) throws IOException
  {
    final int localRsServerId = replicationServer.getServerId();
    StatusMachineEvent event;
    if (newGenId == -1)
    StatusMachineEvent event = getStatusMachineEvent(newGenId);
    if (event == null)
    {
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    else
    {
      if (newGenId == generationId)
      {
        if (status == ServerStatus.BAD_GEN_ID_STATUS)
        {
          // This server has the good new reference generation id.
          // Close connection with him to force his reconnection: DS will
          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " + localRsServerId + ", closing connection to DS "
                + getServerId() + " for baseDn " + getBaseDN()
                + " to force reconnection as new local"
                + " generationId and remote one match and DS is in bad gen id: "
                + newGenId);
          }
          // Connection closure must not be done calling RSD.stopHandler() as it
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock releases it.
          if (session != null
              // V4 protocol introduced a StopMsg to properly close the
              // connection between servers
              && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            try
            {
              session.publish(new StopMsg());
            } catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
          // will soon disappear after this method call...
          status = ServerStatus.NOT_CONNECTED_STATUS;
          return;
        } else
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS " + localRsServerId + ". DS " + getServerId()
                + " for baseDn " + getBaseDN() + " has already generation id "
                + newGenId + " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
      } else
      {
        // This server has a bad generation id compared to new reference one,
        // let's put it into BAD_GEN_ID_STATUS
        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
      }
      return;
    }
    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
@@ -170,7 +104,7 @@
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
          Integer.toString(localRsServerId),
          Integer.toString(replicationServer.getServerId()),
          getBaseDN(),
          Integer.toString(serverId),
          Long.toString(generationId),
@@ -179,30 +113,73 @@
      return;
    }
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    changeStatus(event, "for reset gen id");
  }
    if (newStatus == ServerStatus.INVALID_STATUS)
  private StatusMachineEvent getStatusMachineEvent(long newGenId)
  {
    if (newGenId == -1)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
          Integer.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return;
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    if (newGenId != generationId)
    {
      // This server has a bad generation id compared to new reference one,
      // let's put it into BAD_GEN_ID_STATUS
      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, ServerStatus.INVALID_STATUS);
    if (status != ServerStatus.BAD_GEN_ID_STATUS)
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("In RS " + replicationServer.getServerId() + ", DS "
            + getServerId() + " for baseDn " + getBaseDN()
            + " has already generation id " + newGenId
            + " so no ChangeStatusMsg sent to him.");
      }
      return null;
    }
    // This server has the good new reference generation id.
    // Close connection with him to force his reconnection: DS will
    // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + localRsServerId
          + " Sending change status for reset gen id to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + ", closing connection to DS " + getServerId()
          + " for baseDn " + getBaseDN() + " to force reconnection as new local"
          + " generationId and remote one match and DS is in bad gen id: "
          + newGenId);
    }
    session.publish(csMsg);
    // Connection closure must not be done calling RSD.stopHandler() as it
    // would rewait the RSD lock that we already must have entering this
    // method. This would lead to a reentrant lock which we do not want.
    // So simply close the session, this will make the hang up appear
    // after the reader thread that took the RSD lock releases it.
    if (session != null
        // V4 protocol introduced a StopMsg to properly close the
        // connection between servers
        && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      try
      {
        session.publish(new StopMsg());
      } catch (IOException ioe)
      {
        // Anyway, going to close session, so nothing to do
      }
    }
    status = newStatus;
    // NOT_CONNECTED_STATUS is the last one in RS session life: handler
    // will soon disappear after this method call...
    status = ServerStatus.NOT_CONNECTED_STATUS;
    return null;
  }
  /**
@@ -215,6 +192,12 @@
  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
      throws IOException
  {
    return changeStatus(event, "from status analyzer");
  }
  private ServerStatus changeStatus(StatusMachineEvent event, String origin)
      throws IOException
  {
    // Check state machine allows this new status (Sanity check)
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
@@ -222,22 +205,20 @@
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
          Integer.toString(serverId), status.toString(), event.toString());
      logError(msg);
      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
      // and vice versa. We may are being trying to change the status while for
      // instance another status has just been entered: e.g a full update has
      // just been engaged. In that case, just ignore attempt to change the
      // status
      // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
      // versa. We may be trying to change the status while another status has
      // just been entered: e.g a full update has just been engaged.
      // In that case, just ignore attempt to change the status
      return newStatus;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, ServerStatus.INVALID_STATUS);
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + " Sending change status from status analyzer to " + getServerId()
          + " Sending change status " + origin + " to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
@@ -589,7 +570,7 @@
        localGenerationId, sslEncryption, getLocalGroupId(),
        replicationServer.getDegradedStatusThreshold(),
        replicationServer.getWeight(),
        replicationServerDomain.getConnectedLDAPservers().size());
        replicationServerDomain.getConnectedDSs().size());
    }
    send(startMsg);
@@ -626,16 +607,10 @@
   * receiving a StopMsg to properly stop the handshake procedure.
   * @return the startSessionMsg received or null DS sent a stop message to
   * not finish the handshake.
   * @throws DirectoryException
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws DataFormatException
   * @throws NotSupportedOldVersionPDUException
   * @throws Exception
   */
  private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
      throws DirectoryException, IOException, ClassNotFoundException,
      DataFormatException, NotSupportedOldVersionPDUException
      throws Exception
  {
    ReplicationMsg msg = session.receive();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -92,7 +92,7 @@
   * we are currently publishing the first update in the balanced tree is the
   * next change that we must push to this particular server.
   */
  private final Map<Integer, DataServerHandler> directoryServers =
  private final Map<Integer, DataServerHandler> connectedDSs =
    new ConcurrentHashMap<Integer, DataServerHandler>();
  /**
@@ -101,7 +101,7 @@
   * in the balanced tree is the next change that we must push to this
   * particular server.
   */
  private final Map<Integer, ReplicationServerHandler> replicationServers =
  private final Map<Integer, ReplicationServerHandler> connectedRSs =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final Queue<MessageHandler> otherHandlers =
@@ -358,12 +358,10 @@
     */
    NotAssuredUpdateMsg notAssuredUpdate = null;
    /*
     * Push the message to the replication servers
     */
    // Push the message to the replication servers
    if (sourceHandler.isDataServer())
    {
      for (ReplicationServerHandler handler : replicationServers.values())
      for (ReplicationServerHandler handler : connectedRSs.values())
      {
        /**
         * Ignore updates to RS with bad gen id
@@ -392,10 +390,8 @@
      }
    }
    /*
     * Push the message to the LDAP servers
     */
    for (DataServerHandler handler : directoryServers.values())
    // Push the message to the LDAP servers
    for (DataServerHandler handler : connectedDSs.values())
    {
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
@@ -576,7 +572,7 @@
    }
    // Look for DS eligible for assured
    for (DataServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : connectedDSs.values())
    {
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
@@ -735,7 +731,7 @@
  private void collectRSsEligibleForAssuredReplication(byte groupId,
      List<Integer> expectedServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (handler.getGroupId() == groupId
        // No ack expected from a RS with different group id
@@ -908,9 +904,8 @@
    List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
    for (Integer serverId : serversInTimeout)
    {
      ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
      ServerHandler expectedRSInTimeout = replicationServers.get(serverId);
      ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
      ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
      if (expectedDSInTimeout != null)
      {
        if (safeRead)
@@ -950,7 +945,7 @@
   */
  public void stopReplicationServers(Collection<String> replServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
      {
@@ -968,13 +963,13 @@
  public void stopAllServers(boolean shutdown)
  {
    // Close session with other replication servers
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    {
      stopServer(serverHandler, shutdown);
    }
    // Close session with other LDAP servers
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      stopServer(serverHandler, shutdown);
    }
@@ -988,12 +983,12 @@
   */
  public boolean checkForDuplicateDS(DataServerHandler handler)
  {
    if (directoryServers.containsKey(handler.getServerId()))
    if (connectedDSs.containsKey(handler.getServerId()))
    {
      // looks like two connected LDAP servers have the same serverId
      Message message = ERR_DUPLICATE_SERVER_ID.get(
          localReplicationServer.getMonitorInstanceName(),
          directoryServers.get(handler.getServerId()).toString(),
          connectedDSs.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
      logError(message);
      return false;
@@ -1047,7 +1042,7 @@
    try
    {
      // Stop useless monitoring publisher if no more RS or DS in domain
      if ( (directoryServers.size() + replicationServers.size() )== 1)
      if ( (connectedDSs.size() + connectedRSs.size() )== 1)
      {
        if (debugEnabled())
        {
@@ -1062,7 +1057,7 @@
    if (handler.isReplicationServer())
    {
      if (replicationServers.containsKey(handler.getServerId()))
      if (connectedRSs.containsKey(handler.getServerId()))
      {
        unregisterServerHandler(handler);
        handler.shutdown();
@@ -1076,11 +1071,11 @@
          buildAndSendTopoInfoToDSs(null);
        }
      }
    } else if (directoryServers.containsKey(handler.getServerId()))
    } else if (connectedDSs.containsKey(handler.getServerId()))
    {
      // If this is the last DS for the domain,
      // shutdown the status analyzer
      if (directoryServers.size() == 1)
      if (connectedDSs.size() == 1)
      {
        if (debugEnabled())
        {
@@ -1193,11 +1188,11 @@
  {
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
      connectedRSs.remove(handler.getServerId());
    }
    else
    {
      directoryServers.remove(handler.getServerId());
      connectedDSs.remove(handler.getServerId());
    }
  }
@@ -1224,9 +1219,9 @@
    // topology and the generationId has never been saved, then we can reset
    // it and the next LDAP server to connect will become the new reference.
    boolean lDAPServersConnectedInTheTopology = false;
    if (directoryServers.isEmpty())
    if (connectedDSs.isEmpty())
    {
      for (ReplicationServerHandler rsh : replicationServers.values())
      for (ReplicationServerHandler rsh : connectedRSs.values())
      {
        if (generationId != rsh.getGenerationId())
        {
@@ -1283,7 +1278,7 @@
      throws DirectoryException
  {
    ReplicationServerHandler oldHandler =
        replicationServers.get(handler.getServerId());
        connectedRSs.get(handler.getServerId());
    if (oldHandler != null)
    {
      if (oldHandler.getServerAddressURL().equals(
@@ -1339,7 +1334,7 @@
  public Set<String> getChangelogs()
  {
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      results.add(handler.getServerAddressURL());
    }
@@ -1358,22 +1353,6 @@
  }
  /**
   * Returns as a set of String the list of LDAP servers connected to us.
   * Each string is the serverID of a connected LDAP server.
   *
   * @return The set of connected LDAP servers
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> results = new ArrayList<String>(0);
    for (DataServerHandler handler : directoryServers.values())
    {
      results.add(String.valueOf(handler.getServerId()));
    }
    return results;
  }
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the resources
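The getConnectedLDAPservers() method removed above returned the connected DS server ids as a List of Strings; callers now read the ids straight from the map exposed by getConnectedDSs() (see the accessor further down in this file). A minimal migration sketch, assuming a ReplicationServerDomain reference named domain (a placeholder name):

  // Before (removed): List<String> ids = domain.getConnectedLDAPservers();
  // After: the ids come from the connected-DS map, as integers.
  Set<Integer> connectedDsIds = domain.getConnectedDSs().keySet();
  int connectedDsCount = connectedDsIds.size();          // replaces ids.size()
  boolean ds1Connected = connectedDsIds.contains(1);     // replaces ids.contains("1")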
@@ -1493,7 +1472,7 @@
    {
      // Send to all replication servers with a least one remote
      // server connected
      for (ReplicationServerHandler rsh : replicationServers.values())
      for (ReplicationServerHandler rsh : connectedRSs.values())
      {
        if (rsh.hasRemoteLDAPServers())
        {
@@ -1503,7 +1482,7 @@
      }
      // Sends to all connected LDAP servers
      for (DataServerHandler destinationHandler : directoryServers.values())
      for (DataServerHandler destinationHandler : connectedDSs.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
@@ -1516,7 +1495,7 @@
    {
      // Destination is one server
      DataServerHandler destinationHandler =
          directoryServers.get(msg.getDestination());
          connectedDSs.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
@@ -1527,13 +1506,13 @@
        // have the targeted server connected.
        if (senderHandler.isDataServer())
        {
          for (ReplicationServerHandler h : replicationServers.values())
          for (ReplicationServerHandler rsHandler : connectedRSs.values())
          {
            // Send to all replication servers with a least one remote
            // server connected
            if (h.isRemoteLDAPServer(msg.getDestination()))
            if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
              servers.add(rsHandler);
            }
          }
        }
@@ -1808,14 +1787,14 @@
    // from the states stored in the serverHandler.
    // - the server state
    // - the older missing change
    for (DataServerHandler lsh : this.directoryServers.values())
    for (DataServerHandler lsh : this.connectedDSs.values())
    {
      monitorMsg.setServerState(lsh.getServerId(), lsh.getServerState(),
          lsh.getApproxFirstMissingDate(), true);
    }
    // Same for the connected RS
    for (ReplicationServerHandler rsh : this.replicationServers.values())
    for (ReplicationServerHandler rsh : this.connectedRSs.values())
    {
      monitorMsg.setServerState(rsh.getServerId(), rsh.getServerState(),
          rsh.getApproxFirstMissingDate(), false);
@@ -1891,7 +1870,7 @@
   */
  public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
  {
    for (DataServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : connectedDSs.values())
    {
      if (notThisOne == null || handler != notThisOne)
        // All except passed one
@@ -1932,7 +1911,7 @@
  public void buildAndSendTopoInfoToRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
@@ -1976,7 +1955,7 @@
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    // Go through every DSs
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      dsInfos.add(serverHandler.toDSInfo());
    }
@@ -2001,7 +1980,7 @@
  {
    // Go through every DSs (except recipient of msg)
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      if (serverHandler.getServerId() == destDsId)
      {
@@ -2017,7 +1996,7 @@
    // Go through every peer RSs (and get their connected DSs), also add info
    // for RSs
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    {
      rsInfos.add(serverHandler.toRSInfo());
@@ -2150,7 +2129,7 @@
      // If we are the first replication server warned,
      // then forwards the reset message to the remote replication servers
      for (ServerHandler rsHandler : replicationServers.values())
      for (ServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
@@ -2170,7 +2149,7 @@
    // Change status of the connected DSs according to the requested new
    // reference generation id
    for (DataServerHandler dsHandler : directoryServers.values())
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      try
      {
@@ -2362,8 +2341,7 @@
        // TODO: i18n
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
            e.getMessage() + " " + stackTraceToSingleLineString(e)));
          e.getMessage() + " " + stackTraceToSingleLineString(e)));
        logError(mb.toMessage());
      }
    }
@@ -2400,10 +2378,10 @@
          + " given local generation Id=" + this.generationId);
    }
    ServerHandler handler = replicationServers.get(serverId);
    ServerHandler handler = connectedRSs.get(serverId);
    if (handler == null)
    {
      handler = directoryServers.get(serverId);
      handler = connectedDSs.get(serverId);
      if (handler == null)
      {
        return false;
@@ -2549,7 +2527,7 @@
      initializePendingMonitorData();
      // Send the monitor requests to the connected replication servers.
      for (ReplicationServerHandler rs : replicationServers.values())
      for (ReplicationServerHandler rs : connectedRSs.values())
      {
        // Add server ID to pending table.
        int serverId = rs.getServerId();
@@ -2657,13 +2635,12 @@
      // So for a given DS connected we can take the state and the max from
      // the DS/state.
      for (ServerHandler ds : directoryServers.values())
      for (ServerHandler ds : connectedDSs.values())
      {
        int serverID = ds.getServerId();
        // the state comes from the state stored in the SH
        ServerState dsState = ds.getServerState() .duplicate();
        ServerState dsState = ds.getServerState().duplicate();
        // the max CN sent by that LS also comes from the SH
        ChangeNumber maxcn = dsState.getChangeNumber(serverID);
@@ -2725,46 +2702,44 @@
      pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
      // Store the remote LDAP servers states
      Iterator<Integer> lsidIterator = msg.ldapIterator();
      while (lsidIterator.hasNext())
      Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
      while (dsServerIdIterator.hasNext())
      {
        int sid = lsidIterator.next();
        ServerState dsServerState = msg.getLDAPServerState(sid);
        int dsServerId = dsServerIdIterator.next();
        ServerState dsServerState = msg.getLDAPServerState(dsServerId);
        pendingMonitorData.setMaxCNs(dsServerState);
        pendingMonitorData.setLDAPServerState(sid, dsServerState);
        pendingMonitorData.setFirstMissingDate(sid,
            msg.getLDAPApproxFirstMissingDate(sid));
        pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
        pendingMonitorData.setFirstMissingDate(dsServerId,
            msg.getLDAPApproxFirstMissingDate(dsServerId));
      }
      // Process the latency reported by the remote RSi on its connections
      // to the other RSes
      Iterator<Integer> rsidIterator = msg.rsIterator();
      while (rsidIterator.hasNext())
      Iterator<Integer> rsServerIdIterator = msg.rsIterator();
      while (rsServerIdIterator.hasNext())
      {
        int rsid = rsidIterator.next();
        if (rsid == localReplicationServer.getServerId())
        int rsServerId = rsServerIdIterator.next();
        long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
        if (rsServerId == localReplicationServer.getServerId())
        {
          // this is the latency of the remote RSi regarding the current RS
          // let's update the fmd of my connected LS
          for (ServerHandler connectedlsh : directoryServers .values())
          for (DataServerHandler connectedDS : connectedDSs.values())
          {
            int connectedlsid = connectedlsh.getServerId();
            Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
            pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
            int connectedServerId = connectedDS.getServerId();
            pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
          }
        }
        else
        {
          // this is the latency of the remote RSi regarding another RSj
          // let's update the latency of the LSes connected to RSj
          ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
          ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
          if (rsjHdr != null)
          {
            for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
            for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
            {
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
              pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
            }
          }
        }
@@ -2773,8 +2748,8 @@
    catch (RuntimeException e)
    {
      // FIXME: do we really expect these???
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
          .getMessage() + stackTraceToSingleLineString(e)));
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
          e.getMessage() + stackTraceToSingleLineString(e)));
    }
    finally
    {
@@ -2808,7 +2783,7 @@
   */
  public Map<Integer, DataServerHandler> getConnectedDSs()
  {
    return directoryServers;
    return connectedDSs;
  }
  /**
@@ -2817,7 +2792,7 @@
   */
  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
  {
    return replicationServers;
    return connectedRSs;
  }
@@ -3131,9 +3106,8 @@
        result.update(mostRecentDbCN);
      }
    } catch (Exception e)
    {
      Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
          " " + stackTraceToSingleLineString(e));
      logError(errMessage);
      logError(ERR_WRITER_UNEXPECTED_EXCEPTION
          .get(stackTraceToSingleLineString(e)));
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }
  }
@@ -3236,13 +3210,13 @@
  private boolean isServerConnected(int serverId)
  {
    if (directoryServers.containsKey(serverId))
    if (connectedDSs.containsKey(serverId))
    {
      return true;
    }
    // not directly connected
    for (ReplicationServerHandler rsHandler : replicationServers.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      if (rsHandler.isRemoteLDAPServer(serverId))
      {
@@ -3283,7 +3257,7 @@
    {
      // If we are the first replication server warned,
      // then forwards the message to the remote replication servers
      for (ReplicationServerHandler rsHandler : replicationServers.values())
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
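The monitor-data hunks above are renames only (sid becomes dsServerId, rsid becomes rsServerId, and the handler maps get their new names) plus hoisting the getRSApproxFirstMissingDate() call out of the inner loops; the propagation logic itself is unchanged. Condensed, the renamed RS-latency loop reads as follows (all names are taken from the hunks above; this is a sketch, not the verbatim method body):

  while (rsServerIdIterator.hasNext())
  {
    int rsServerId = rsServerIdIterator.next();
    long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
    if (rsServerId == localReplicationServer.getServerId())
    {
      // Latency of the remote RS towards this RS: apply it to the DSs
      // connected locally.
      for (DataServerHandler connectedDS : connectedDSs.values())
      {
        pendingMonitorData.setFirstMissingDate(connectedDS.getServerId(), newFmd);
      }
    }
    else
    {
      // Latency of the remote RS towards another RS: apply it to the DSs
      // connected to that other RS.
      ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
      if (rsjHdr != null)
      {
        for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
        {
          pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
        }
      }
    }
  }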
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -27,23 +27,17 @@
 */
package org.opends.server.replication;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.TaskMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
import java.io.File;
import java.net.SocketTimeoutException;
import java.util.*;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
@@ -63,6 +57,14 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.TaskMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Tests contained here:
 *
@@ -202,7 +204,7 @@
        "ds-task-initialize-replica-server-id: all");
  }
  // Tests that entries have been written in the db
  /** Tests that entries have been written in the db */
  private void testEntriesInDb()
  {
    log("TestEntriesInDb");
@@ -260,62 +262,19 @@
   * @param expectedDone The expected number of entries to be processed.
   */
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
      long expectedLeft, long expectedDone) throws Exception
  {
    log("waitTaskCompleted " + taskEntry.toLDIFString());
    try
    {
      // FIXME - Factorize with TasksTestCase
      // Wait until the task completes.
      int timeout = 2000;
      AttributeType completionTimeType = DirectoryServer.getAttributeType(
          ATTR_TASK_COMPLETION_TIME.toLowerCase());
      SearchFilter filter =
          SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      String completionTime = null;
      long startMillisecs = System.currentTimeMillis();
      do
      {
        InternalSearchOperation searchOperation =
            connection.processSearch(taskEntry.getDN(),
                SearchScope.BASE_OBJECT, filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible? Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        completionTime = resultEntry.getAttributeValue(completionTimeType,
            DirectoryStringSyntax.DECODER);
        if (completionTime == null)
        {
          if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
          {
            break;
          }
          Thread.sleep(100);
        }
      } while (completionTime == null);
      if (completionTime == null)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
    Entry resultEntry = getCompletionTime(taskEntry);
    // Check that the task state is as expected.
    AttributeType taskStateType =
        DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
        DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
        resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER);
        resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, expectedState,
        "The task completed in an unexpected state");
@@ -323,7 +282,7 @@
    // Check that the task contains some log messages.
    AttributeType logMessagesType = DirectoryServer.getAttributeType(
        ATTR_TASK_LOG_MESSAGES.toLowerCase());
    ArrayList<String> logMessages = new ArrayList<String>();
    List<String> logMessages = new ArrayList<String>();
    resultEntry.getAttributeValues(logMessagesType,
        DirectoryStringSyntax.DECODER, logMessages);
@@ -333,88 +292,94 @@
      fail("No log messages were written to the task entry on a failed task");
    }
    try
    {
      // Check that the task state is as expected.
      taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
      stateString =
          resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER);
    // Check that the task state is as expected.
    assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_LEFT, expectedLeft,
        "The number of entries to process is not correct.");
      assertEquals(Long.decode(stateString).longValue(),expectedLeft,
          "The number of entries to process is not correct.");
      // Check that the task state is as expected.
      taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
      stateString =
          resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER);
      assertEquals(Long.decode(stateString).longValue(),expectedDone,
          "The number of entries processed is not correct.");
    }
    catch(Exception e)
    {
      fail("Exception"+ e.getMessage()+e.getStackTrace());
    }
    // Check that the task state is as expected.
    assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_DONE, expectedDone,
        "The number of entries processed is not correct.");
    }
    catch(Exception e)
  }
  private Entry getCompletionTime(Entry taskEntry) throws Exception
  {
    // FIXME - Factorize with TasksTestCase
    // Wait until the task completes.
    int timeout = 2000;
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
        ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
    long startMillisecs = System.currentTimeMillis();
    do
    {
      fail("Exception"+ e.getMessage()+e.getStackTrace());
      InternalSearchOperation searchOperation = connection.processSearch(
          taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
      Entry resultEntry = searchOperation.getSearchEntries().getFirst();
      String completionTime = resultEntry.getAttributeValue(
          completionTimeType, DirectoryStringSyntax.DECODER);
      if (completionTime != null)
      {
        return resultEntry;
      }
      if (System.currentTimeMillis() - startMillisecs > 1000 * timeout)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
      Thread.sleep(100);
    } while (true);
  }
  private void assertAttributeValue(Entry resultEntry, String lowerAttrName,
      long expected, String message) throws DirectoryException
  {
    AttributeType type = DirectoryServer.getAttributeType(lowerAttrName, true);
    String value = resultEntry.getAttributeValue(type,
        DirectoryStringSyntax.DECODER);
    assertEquals(Long.decode(value).longValue(), expected, message);
  }
  /**
   * Add to the current DB the entries necessary to the test.
   */
  private void addTestEntriesToDB()
  private void addTestEntriesToDB() throws Exception
  {
    try
    for (String ldifEntry : updatedEntries)
    {
      for (String ldifEntry : updatedEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        addTestEntryToDB(entry);
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
      log("addTestEntriesToDB : " + updatedEntries.length
          + " successfully added to DB");
      Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
      addTestEntryToDB(entry);
      // They will be removed at the end of the test
      entryList.addLast(entry.getDN());
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    log("addTestEntriesToDB : " + updatedEntries.length
        + " successfully added to DB");
  }
  private void addTestEntryToDB(Entry entry)
  {
    try
    AddOperation addOp = new AddOperationBasis(connection,
        InternalClientConnection .nextOperationID(),
        InternalClientConnection.nextMessageID(), null, entry.getDN(),
        entry.getObjectClasses(), entry.getUserAttributes(),
        entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    if (addOp.getResultCode() != ResultCode.SUCCESS)
    {
      AddOperationBasis addOp = new AddOperationBasis(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
          entry.getUserAttributes(), entry.getOperationalAttributes());
      addOp.setInternalOperation(true);
      addOp.run();
      if (addOp.getResultCode() != ResultCode.SUCCESS)
      {
        log("addEntry: Failed" + addOp.getResultCode());
      }
      log("addEntry: Failed" + addOp.getResultCode());
    }
    // They will be removed at the end of the test
    entryList.addLast(entry.getDN());
    }
    catch(Exception e)
    {
      fail("addTestEntryToDB Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    // They will be removed at the end of the test
    entryList.addLast(entry.getDN());
  }
  /*
  /**
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries(int entriesCnt)
@@ -426,8 +391,6 @@
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String[] entries = new String[entriesCnt + 2];
    String filler = "000000000000000000000000000000000000";
    entries[0] = "dn: " + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
@@ -441,6 +404,7 @@
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n";
    String filler = "000000000000000000000000000000000000";
    for (int i=0; i<entriesCnt; i++)
    {
      String useri="0000"+i;
@@ -472,36 +436,25 @@
  private void makeBrokerPublishEntries(ReplicationBroker broker,
      int senderID, int destinationServerID, int requestorID)
  {
    // Send entries
    try
    RoutableMsg initTargetMessage =
        new InitializeTargetMsg(EXAMPLE_DN, server2ID, destinationServerID,
            requestorID, updatedEntries.length, initWindow);
    broker.publish(initTargetMessage);
    int cnt = 0;
    for (String entry : updatedEntries)
    {
      RoutableMsg initTargetMessage = new InitializeTargetMsg(
          EXAMPLE_DN, server2ID, destinationServerID, requestorID,
          updatedEntries.length, initWindow);
      broker.publish(initTargetMessage);
      log("Broker will publish 1 entry: bytes:" + entry.length());
      int cnt = 0;
      for (String entry : updatedEntries)
      {
        log("Broker will publish 1 entry: bytes:"+ entry.length());
        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
            entry.getBytes(), ++cnt);
        broker.publish(entryMsg);
      }
      DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
      broker.publish(doneMsg);
      log("Broker " + senderID + " published entries");
      EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
          entry.getBytes(), ++cnt);
      broker.publish(entryMsg);
    }
    catch(Exception e)
    {
      fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
    broker.publish(doneMsg);
    log("Broker " + senderID + " published entries");
  }
  void receiveUpdatedEntries(ReplicationBroker broker, int serverID,
@@ -565,8 +518,7 @@
    broker.setGenerationID(EMPTY_DN_GENID);
    broker.reStart(true);
    try { Thread.sleep(500); } catch(Exception e) {}
    sleep(500);
  }
  /**
@@ -598,19 +550,18 @@
   * @param changelogId The serverID of the replicationServer to create.
   * @return The new replicationServer.
   */
  private ReplicationServer createChangelogServer(int changelogId, String testCase)
  private ReplicationServer createChangelogServer(int changelogId, String testCase)
      throws Exception
  {
    SortedSet<String> servers = new TreeSet<String>();
    try
    {
      if (changelogId != changelog1ID)
        servers.add("localhost:" + getChangelogPort(changelog1ID));
      if (changelogId != changelog2ID)
        servers.add("localhost:" + getChangelogPort(changelog2ID));
      if (changelogId != changelog3ID)
        servers.add("localhost:" + getChangelogPort(changelog3ID));
    if (changelogId != changelog1ID)
      servers.add("localhost:" + getChangelogPort(changelog1ID));
    if (changelogId != changelog2ID)
      servers.add("localhost:" + getChangelogPort(changelog2ID));
    if (changelogId != changelog3ID)
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      ReplServerFakeConfiguration conf =
    ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
        getChangelogPort(changelogId),
        "initOnlineTest" + getChangelogPort(changelogId) + testCase + "Db",
@@ -619,16 +570,10 @@
        0,
        100,
        servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    Thread.sleep(1000);
    return replicationServer;
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
    return replicationServer;
  }
  /**
@@ -636,52 +581,42 @@
   * replication Server ID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(int changelogID)
  private void connectServer1ToChangelog(int changelogID) throws Exception
  {
    connectServer1ToChangelog(changelogID, 0);
  }
  private void connectServer1ToChangelog(int changelogID, int heartbeat)
  private void connectServer1ToChangelog(int changelogID, int heartbeat)
      throws Exception
  {
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
      String testName = "initOnLineTest";
      String synchroServerLdif =
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider\n"
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: " + testName + "\n"
        + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
        + "ds-cfg-replication-server: localhost:"
        + getChangelogPort(changelogID)+"\n"
        + "ds-cfg-server-id: " + server1ID + "\n"
        + "ds-cfg-receive-status: true\n"
        + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
        + "ds-cfg-window-size: " + WINDOW_SIZE;
    // suffix synchronized
    String testName = "initOnLineTest";
    String synchroServerLdif =
      "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
      + "ds-cfg-replication-server: localhost:"
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-server-id: " + server1ID + "\n"
      + "ds-cfg-receive-status: true\n"
      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
      + "ds-cfg-window-size: " + WINDOW_SIZE;
      // Clear the backend
      LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
      configEntryList.add(synchroServerEntry.getDN());
    configEntryList.add(synchroServerEntry.getDN());
      replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
    replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
      assertTrue(!replDomain.ieRunning(),
    assertTrue(!replDomain.ieRunning(),
        "ReplicationDomain: Import/Export is not expected to be running");
    }
    catch(Exception e)
    {
      log("connectServer1ToChangelog", e);
      fail("connectServer1ToChangelog", e);
    }
  }
  private int getChangelogPort(int changelogID) throws Exception
@@ -748,10 +683,6 @@
      testEntriesInDb();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    finally
    {
      afterTest(testCase);
@@ -967,10 +898,6 @@
      testEntriesInDb();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    finally
    {
      afterTest(testCase);
@@ -1022,10 +949,6 @@
      // createTask(taskInitTargetS2);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    finally
    {
      afterTest(testCase);
@@ -1094,10 +1017,6 @@
      // createTask(taskInitTargetS2);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    finally
    {
      afterTest(testCase);
@@ -1155,40 +1074,32 @@
      // Check that the list of connected LDAP servers is correct
      // in each replication servers
      List<String> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
          getConnectedLDAPservers();
      assertEquals(l1.size(), 1);
      assertEquals(l1.get(0), String.valueOf(server1ID));
      Set<Integer> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l1).containsExactly(server1ID);
      List<String> l2;
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 2);
      assertTrue(l2.contains(String.valueOf(server2ID)));
      assertTrue(l2.contains(String.valueOf(server3ID)));
      Set<Integer> l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
      List<String> l3;
      l3 = changelog3.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l3.size(), 0);
      Set<Integer> l3 = changelog3.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l3).isEmpty();
      // Test updates
      broker3.stop();
      Thread.sleep(1000);
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server2ID));
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID);
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      broker2.stop();
      Thread.sleep(1000);
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server3ID));
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server3ID);
      // TODO Test ReplicationServerDomain.getDestinationServers method.
@@ -1262,10 +1173,6 @@
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      log(testCase + e.getLocalizedMessage());
    }
    finally
    {
      afterTest(testCase);
@@ -1567,18 +1474,11 @@
    // in those cases, loop for a while waiting for completion.
    for (int i = 0; i< 10; i++)
    {
      if (replDomain.ieRunning())
      {
        try
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
        }
      }
      else
      if (!replDomain.ieRunning())
      {
        break;
      }
      sleep(500);
    }
    assertTrue(!replDomain.ieRunning(),
        "ReplicationDomain: Import/Export is not expected to be running");
@@ -1591,14 +1491,14 @@
    if (server2 != null)
    {
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server2 = null;
    }
    if (server3 != null)
    {
      server3.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server3 = null;
    }
@@ -1639,7 +1539,7 @@
    log("Successfully cleaned " + testCase);
  }
  /**
  /**
   * Clean up the environment.
   *
   * @throws Exception If the environment could not be set up.
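The test changes above follow two patterns: helper and test methods now declare throws Exception and let TestNG report the failure instead of wrapping their bodies in try/catch blocks that call fail(), and the connected-DS checks assert on getConnectedDSs().keySet() with AssertJ instead of comparing sizes and String elements of the removed getConnectedLDAPservers() list. A minimal standalone sketch of the same pattern (TestNG plus AssertJ; the class and helper names here are hypothetical):

  import static org.assertj.core.api.Assertions.assertThat;
  import java.util.Collections;
  import java.util.Set;
  import org.testng.annotations.Test;

  public class ConnectedDsAssertionSketch
  {
    // Any exception thrown here simply fails the test; no try/catch + fail() needed.
    @Test
    public void connectedDsIdsAreReported() throws Exception
    {
      Set<Integer> connectedDsIds = fetchConnectedDsIds(); // hypothetical helper
      assertThat(connectedDsIds).containsExactly(1);
    }

    private Set<Integer> fetchConnectedDsIds() throws Exception
    {
      // In InitOnLineTest this is
      // changelog.getReplicationServerDomain(baseDn, false).getConnectedDSs().keySet()
      return Collections.singleton(1);
    }
  }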