opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java
@@ -3974,6 +3974,7 @@
               ne), ne);
       }
     }
+    resetGenerationId(ctx, suffixDn, true, sourceServerDisplay);
   }

   /**
@@ -4023,4 +4024,138 @@
   {
     return (random.nextInt() & modulo);
   }
+
+  private void resetGenerationId(InitialLdapContext ctx,
+      String suffixDn, boolean displayProgress, String sourceServerDisplay)
+  throws ApplicationException
+  {
+    boolean taskCreated = false;
+    int i = 1;
+    boolean isOver = false;
+    String dn = null;
+    BasicAttributes attrs = new BasicAttributes();
+    Attribute oc = new BasicAttribute("objectclass");
+    oc.add("top");
+    oc.add("ds-task");
+    oc.add("ds-task-reset-generation-id");
+    attrs.put(oc);
+    attrs.put("ds-task-class-name",
+        "org.opends.server.tasks.SetGenerationIdTask");
+    attrs.put("ds-task-reset-generation-id-domain-base-dn", suffixDn);
+    while (!taskCreated)
+    {
+      String id = "quicksetup-reset-generation-id-"+i;
+      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
+      attrs.put("ds-task-id", id);
+      try
+      {
+        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
+        taskCreated = true;
+        LOG.log(Level.INFO, "created task entry: "+attrs);
+        dirCtx.close();
+      }
+      catch (NameAlreadyBoundException x)
+      {
+      }
+      catch (NamingException ne)
+      {
+        LOG.log(Level.SEVERE, "Error creating task "+attrs, ne);
+        throw new ApplicationException(
+            ReturnCode.APPLICATION_ERROR,
+            getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(
+                sourceServerDisplay
+            ), ne), ne);
+      }
+      i++;
+    }
+    // Wait until it is over
+    SearchControls searchControls = new SearchControls();
+    searchControls.setCountLimit(1);
+    searchControls.setSearchScope(
+        SearchControls.OBJECT_SCOPE);
+    String filter = "objectclass=*";
+    searchControls.setReturningAttributes(
+        new String[] { "ds-task-log-message", "ds-task-state" });
+    Message lastDisplayedMsg = null;
+    String lastLogMsg = null;
+    long lastTimeMsgDisplayed = -1;
+    while (!isOver)
+    {
+      try
+      {
+        Thread.sleep(500);
+      }
+      catch (Throwable t)
+      {
+      }
+      try
+      {
+        NamingEnumeration res = ctx.search(dn, filter, searchControls);
+        SearchResult sr = (SearchResult)res.next();
+        String logMsg = getFirstValue(sr, "ds-task-log-message");
+        if (logMsg != null)
+        {
+          if (!logMsg.equals(lastLogMsg))
+          {
+            LOG.log(Level.INFO, logMsg);
+            lastLogMsg = logMsg;
+          }
+        }
+        InstallerHelper helper = new InstallerHelper();
+        String state = getFirstValue(sr, "ds-task-state");
+        if (helper.isDone(state) || helper.isStoppedByError(state))
+        {
+          isOver = true;
+          Message errorMsg;
+          if (lastLogMsg == null)
+          {
+            errorMsg = INFO_ERROR_DURING_INITIALIZATION_NO_LOG.get(
+                sourceServerDisplay, state, sourceServerDisplay);
+          }
+          else
+          {
+            errorMsg = INFO_ERROR_DURING_INITIALIZATION_LOG.get(
+                sourceServerDisplay, lastLogMsg, state, sourceServerDisplay);
+          }
+          if (helper.isCompletedWithErrors(state))
+          {
+            notifyListeners(getFormattedWarning(errorMsg));
+          }
+          else if (!helper.isSuccessful(state) ||
+              helper.isStoppedByError(state))
+          {
+            ApplicationException ae = new ApplicationException(
+                ReturnCode.APPLICATION_ERROR, errorMsg, null);
+            throw ae;
+          }
+          else if (displayProgress)
+          {
+            notifyListeners(getFormattedProgress(
+                INFO_SUFFIX_INITIALIZED_SUCCESSFULLY.get()));
+          }
+        }
+      }
+      catch (NameNotFoundException x)
+      {
+        isOver = true;
+        notifyListeners(getFormattedProgress(
+            INFO_SUFFIX_INITIALIZED_SUCCESSFULLY.get()));
+      }
+      catch (NamingException ne)
+      {
+        throw new ApplicationException(
+            ReturnCode.APPLICATION_ERROR,
+            getThrowableMsg(INFO_ERROR_POOLING_INITIALIZATION.get(
+                sourceServerDisplay), ne), ne);
+      }
+    }
+  }
 }
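Note: the new resetGenerationId() helper above schedules a ds-task-reset-generation-id entry under cn=Scheduled Tasks,cn=Tasks and then polls its ds-task-state. For reference, here is a minimal standalone sketch of just the task-creation step, written against the standard JNDI API; the class and method names are illustrative and the already-authenticated context is assumed (this sketch is not part of the patch):

    import javax.naming.directory.Attribute;
    import javax.naming.directory.BasicAttribute;
    import javax.naming.directory.BasicAttributes;
    import javax.naming.directory.DirContext;
    import javax.naming.ldap.InitialLdapContext;

    public class ResetGenIdTaskSketch
    {
      // Creates a reset-generation-id task entry with the same attributes the
      // installer uses above; taskId must be unique under cn=Scheduled Tasks.
      static void scheduleResetTask(InitialLdapContext ctx, String suffixDn,
          String taskId) throws Exception
      {
        BasicAttributes attrs = new BasicAttributes();
        Attribute oc = new BasicAttribute("objectclass");
        oc.add("top");
        oc.add("ds-task");
        oc.add("ds-task-reset-generation-id");
        attrs.put(oc);
        attrs.put("ds-task-id", taskId);
        attrs.put("ds-task-class-name",
            "org.opends.server.tasks.SetGenerationIdTask");
        attrs.put("ds-task-reset-generation-id-domain-base-dn", suffixDn);

        DirContext taskCtx = ctx.createSubcontext(
            "ds-task-id=" + taskId + ",cn=Scheduled Tasks,cn=Tasks", attrs);
        taskCtx.close();
      }
    }

The installer code above additionally retries with an incrementing task id on NameAlreadyBoundException and then polls ds-task-state until the task completes.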
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -122,7 +122,7 @@
    */
   private boolean connectionError = false;

-  private Object connectPhaseLock = new Object();
+  private final Object connectPhaseLock = new Object();

   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2415,7 +2415,7 @@
   public void resetGenerationId()
   {
     requestedResetSinceLastStart = true;
-    ResetGenerationId genIdMessage = new ResetGenerationId();
+    ResetGenerationId genIdMessage = new ResetGenerationId(this.generationId);
     broker.publish(genIdMessage);
   }

opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
@@ -26,7 +26,10 @@
  */
 package org.opends.server.replication.protocol;

+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.Serializable;
+import java.io.UnsupportedEncodingException;

 import java.util.zip.DataFormatException;
@@ -38,13 +41,15 @@
        Serializable
 {
   private static final long serialVersionUID = 7657049716115572226L;

+  private long generationId;
+
   /**
    * Creates a new message.
+   * @param generationId The new reference value of the generationID.
    */
-  public ResetGenerationId()
+  public ResetGenerationId(long generationId)
   {
+    this.generationId = generationId;
   }

   /**
@@ -57,9 +62,24 @@
    */
   public ResetGenerationId(byte[] in) throws DataFormatException
   {
-    if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
-      throw new DataFormatException("input is not a valid GenerationId Message");
+    try
+    {
+      if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
+        throw new DataFormatException(
+            "input is not a valid GenerationId Message");
+      int pos = 1;
+
+      /* read the generationId */
+      int length = getNextLength(in, pos);
+      generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
   }

   /**
@@ -68,11 +88,33 @@
   @Override
   public byte[] getBytes()
   {
-    int length = 1;
-    byte[] resultByteArray = new byte[length];
-
-    /* put the type of the operation */
-    resultByteArray[0] = MSG_TYPE_RESET_GENERATION_ID;
-    return resultByteArray;
+    try
+    {
+      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+
+      /* Put the message type */
+      oStream.write(MSG_TYPE_RESET_GENERATION_ID);
+
+      // Put the generationId
+      oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
+      oStream.write(0);
+
+      return oStream.toByteArray();
+    }
+    catch (IOException e)
+    {
+      // never happens
+      return null;
+    }
+  }
+
+  /**
+   * Returns the generation Id set in this message.
+   * @return the value of the generation ID.
+   *
+   */
+  public long getGenerationId()
+  {
+    return this.generationId;
   }
 }
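Note: with the changes above, a ResetGenerationId message now carries the reference generation id on the wire as the message type byte, the id rendered as a UTF-8 decimal string, and a 0 terminator. A self-contained round-trip of that layout, using illustrative names and a placeholder type value instead of the real protocol constants (not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class ResetGenIdWireSketch
    {
      // Placeholder for MSG_TYPE_RESET_GENERATION_ID; the real value lives in
      // the protocol package.
      static final int TYPE_RESET_GENERATION_ID = 0x42;

      // Encode: [type byte][generationId as UTF-8 decimal digits][0 terminator]
      static byte[] encode(long generationId) throws IOException
      {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(TYPE_RESET_GENERATION_ID);
        out.write(String.valueOf(generationId).getBytes("UTF-8"));
        out.write(0);
        return out.toByteArray();
      }

      // Decode: check the type byte, then read the zero-terminated decimal string.
      static long decode(byte[] in) throws IOException
      {
        if ((in[0] & 0xff) != TYPE_RESET_GENERATION_ID)
          throw new IOException("not a reset-generation-id message");
        int end = 1;
        while (in[end] != 0)
          end++;
        return Long.parseLong(new String(in, 1, end - 1, "UTF-8"));
      }

      public static void main(String[] args) throws IOException
      {
        long genId = 4815162342L;
        System.out.println(decode(encode(genId)) == genId); // prints true
      }
    }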
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -959,19 +959,6 @@
   }

   /**
-   * Sets the replication server informations for the provided
-   * handler from the provided ReplServerInfoMessage.
-   *
-   * @param handler The server handler from which the info was received.
-   * @param infoMsg The information message that was received.
-   */
-  public void setReplServerInfo(
-      ServerHandler handler, ReplServerInfoMessage infoMsg)
-  {
-    handler.setReplServerInfo(infoMsg);
-  }
-
-  /**
    * Sets the provided value as the new in memory generationId.
    *
    * @param generationId The new value of generationId.
@@ -1007,20 +994,27 @@
    *
    * @param senderHandler The handler associated to the server
    *        that requested to reset the generationId.
+   * @param genIdMsg The reset generation ID msg received.
    */
-  public void resetGenerationId(ServerHandler senderHandler)
+  public void resetGenerationId(ServerHandler senderHandler,
+      ResetGenerationId genIdMsg)
   {
+    long newGenId = genIdMsg.getGenerationId();
+
     if (debugEnabled())
       TRACER.debugInfo(
         "In " + this.replicationServer.getMonitorInstanceName() +
         " baseDN=" + baseDn +
-        " RCache.resetGenerationId");
+        " RCache.resetGenerationId received new ref genId=" + newGenId);

     // Notifies the others LDAP servers that from now on
     // they have the bad generationId
     for (ServerHandler handler : connectedServers.values())
     {
-      handler.resetGenerationId();
+      if (newGenId != handler.getGenerationId())
+      {
+        handler.resetGenerationId();
+      }
     }

     // Propagates the reset message to the others replication servers
@@ -1031,7 +1025,7 @@
     {
       try
       {
-        handler.sendGenerationId(new ResetGenerationId());
+        handler.sendGenerationId(genIdMsg);
       }
       catch (IOException e)
       {
@@ -1041,45 +1035,48 @@
       }
     }

-    // Reset the localchange and state db for the current domain
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
-      {
-        try
-        {
-          dbHandler.clear();
-        }
-        catch (Exception e)
-        {
-          // TODO: i18n
-          logError(Message.raw(
-            "Exception caught while clearing dbHandler:" +
-            e.getLocalizedMessage()));
-        }
-      }
-      sourceDbHandlers.clear();
-
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDN=" + baseDn +
-          " The source db handler has been cleared");
-    }
-    try
-    {
-      replicationServer.clearGenerationId(baseDn);
-    }
-    catch (Exception e)
-    {
-      // TODO: i18n
-      logError(Message.raw(
-        "Exception caught while clearing generationId:" +
-        e.getLocalizedMessage()));
-    }
-
-    // Reset the in memory domain generationId
-    generationId = -1;
+    if (this.generationId != newGenId)
+    {
+      // Reset the localchange and state db for the current domain
+      synchronized (sourceDbHandlers)
+      {
+        for (DbHandler dbHandler : sourceDbHandlers.values())
+        {
+          try
+          {
+            dbHandler.clear();
+          }
+          catch (Exception e)
+          {
+            // TODO: i18n
+            logError(Message.raw(
+              "Exception caught while clearing dbHandler:" +
+              e.getLocalizedMessage()));
+          }
+        }
+        sourceDbHandlers.clear();
+
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDN=" + baseDn +
+            " The source db handler has been cleared");
+      }
+      try
+      {
+        replicationServer.clearGenerationId(baseDn);
+      }
+      catch (Exception e)
+      {
+        // TODO: i18n
+        logError(Message.raw(
+          "Exception caught while clearing generationId:" +
+          e.getLocalizedMessage()));
+      }
+
+      // Reset the in memory domain generationId
+      generationId = newGenId;
+    }
   }

   /**
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -130,6 +130,14 @@
   // ID of the backend
   private static final String backendId = "replicationChanges";

+  // At startup, the listen thread wait on this flag for the connet
+  // thread to look for other servers in the topology.
+  // TODO when a replication server is out of date (has old changes
+  // to receive from other servers, the listen thread should not accept
+  // connection from ldap servers. (issue 1302)
+  private boolean connectedInTopology = false;
+  private final Object connectedInTopologyLock = new Object();
+
   /**
    * The tracer object for the debug logger.
    */
@@ -211,6 +219,23 @@
   void runListen()
   {
     Socket newSocket;

+    // wait for the connect thread to find other replication
+    // servers in the topology before starting to accept connections
+    // from the ldap servers.
+    synchronized (connectedInTopologyLock)
+    {
+      if (connectedInTopology == false)
+      {
+        try
+        {
+          connectedInTopologyLock.wait(1000);
+        }
+        catch (InterruptedException e)
+        {
+        }
+      }
+    }
+
     while ((shutdown == false) && (stopListen == false))
     {
       // Wait on the replicationServer port.
@@ -286,6 +311,15 @@
           }
         }
       }
+
+      synchronized (connectedInTopologyLock)
+      {
+        // wake up the listen thread if necessary.
+        if (connectedInTopology == false)
+        {
+          connectedInTopologyLock.notify();
+          connectedInTopology = true;
+        }
+      }
       try
       {
         synchronized (this)
@@ -391,7 +425,7 @@
     // FIXME : Is it better to have the time to receive the ReplServerInfo
     // from all the other replication servers since this info is necessary
     // to route an early received total update request.
-
+    try { Thread.sleep(300);} catch(Exception e) {}
     if (debugEnabled())
       TRACER.debugInfo("RS " +getMonitorInstanceName()+
           " creates listen threads");
@@ -798,6 +832,7 @@
         // Add the replication backend
         DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
       }
+      ldifImportConfig.close();
     }
     catch(Exception e)
     {
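Note: the connectedInTopology handshake added above is the classic guarded wait/notify idiom: the listen thread blocks for at most 1000 ms until the connect thread signals that other replication servers in the topology have been found. A minimal generic sketch of the same pattern, with illustrative names (not part of the patch):

    public class StartupGate
    {
      private final Object lock = new Object();
      private boolean ready = false;

      // Listen-thread side: wait until signalled, or give up after the timeout
      // so startup is never blocked indefinitely.
      void awaitReady(long timeoutMillis)
      {
        synchronized (lock)
        {
          if (!ready)
          {
            try
            {
              lock.wait(timeoutMillis);
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
            }
          }
        }
      }

      // Connect-thread side: flip the flag and wake the waiter once the
      // topology has been discovered.
      void signalReady()
      {
        synchronized (lock)
        {
          if (!ready)
          {
            ready = true;
            lock.notify();
          }
        }
      }
    }

As in the patch, a single timed wait is used rather than a wait loop, so the timeout itself is the fallback path if no other replication server is found.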
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1617,7 +1617,7 @@
    *
    * @param infoMsg The information message.
    */
-  public void setReplServerInfo(ReplServerInfoMessage infoMsg)
+  public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
@@ -1749,6 +1749,7 @@
   public void sendGenerationId(ResetGenerationId msg)
   throws IOException
   {
+    generationId = msg.getGenerationId();
     session.publish(msg);
   }
 }
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -182,7 +182,7 @@
           else if (msg instanceof ResetGenerationId)
           {
             ResetGenerationId genIdMsg = (ResetGenerationId) msg;
-            replicationCache.resetGenerationId(this.handler);
+            replicationCache.resetGenerationId(this.handler, genIdMsg);
           }
           else if (msg instanceof WindowProbe)
           {
@@ -192,7 +192,7 @@
           else if (msg instanceof ReplServerInfoMessage)
           {
             ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
-            handler.setReplServerInfo(infoMsg);
+            handler.receiveReplServerInfo(infoMsg);
             if (debugEnabled())
             {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -719,7 +719,6 @@
       long genId;

       replServer1 = createReplicationServer(changelog1ID, false, testCase);
-      assertEquals(replServer1.getGenerationId(baseDn), -1);

       /*
        * Test : empty replicated backend
@@ -892,6 +891,13 @@
        * Test: Reset the replication server in order to allow new data set.
        */
+      debugInfo("Launch an on-line import on DS.");
+      genId=-1;
+      Entry importTask = getTaskImport();
+      addTask(importTask, ResultCode.SUCCESS, null);
+      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
+      Thread.sleep(500);
+
       Entry taskReset = TestCaseUtils.makeEntry(
         "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() +
             ",cn=Scheduled Tasks,cn=Tasks",
@@ -908,33 +914,17 @@
       // TODO: Test that replication server db has been cleared
-      assertEquals(replServer1.getGenerationId(baseDn), -1,
-          "Expected genId to be reset in replServer1");
-
-      ReplicationMessage rcvmsg = broker2.receive();
-      if (!(rcvmsg instanceof ErrorMessage))
-      {
-        fail("Broker2 is expected to receive an ErrorMessage " +
-            " to signal degradation due to reset" + rcvmsg);
-      }
-      ErrorMessage emsg = (ErrorMessage)rcvmsg;
-      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
-
-      rcvmsg = broker3.receive();
-      if (!(rcvmsg instanceof ErrorMessage))
-      {
-        fail("Broker3 is expected to receive an ErrorMessage " +
-            " to signal degradation due to reset" + rcvmsg);
-      }
-      emsg = (ErrorMessage)rcvmsg;
-      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
-
-      rgenId = replServer1.getGenerationId(baseDn);
-      assertTrue(rgenId==-1,
-          "Expecting that genId has been reset in replServer1: rgenId="+rgenId);
-
-      assertTrue(replServer1.getReplicationCache(baseDn, false).
+      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
+      genId = readGenId();
+      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
+          " after on-line import but genId=" + genId);
+
+      rgenId = replServer1.getGenerationId(baseDn);
+      assertEquals(genId, rgenId,
+          "DS and replServer are expected to have same genId.");
+
+      assertTrue(!replServer1.getReplicationCache(baseDn, false).
           isDegradedDueToGenerationId(server1ID),
-          "Expecting that DS is degraded since domain genId has been reset");
+          "Expecting that DS is not degraded since domain genId has been reset");

       assertTrue(replServer1.getReplicationCache(baseDn, false).
           isDegradedDueToGenerationId(server2ID),
@@ -946,8 +936,39 @@
       // Now create a change that normally would be replicated
       // but will not be replicated here since DS and brokers are degraded
-      String[] ent2 = { createEntry(UUID.randomUUID()) };
-      this.addTestEntriesToDB(ent2);
+      String[] ent3 = { createEntry(UUID.randomUUID()) };
+      this.addTestEntriesToDB(ent3);
+
+      try
+      {
+        ReplicationMessage msg = broker2.receive();
+        if (!(msg instanceof ErrorMessage))
+        {
+          fail("Broker 2 connection is expected to receive an ErrorMessage."
+              + msg);
+        }
+        ErrorMessage emsg = (ErrorMessage)msg;
+        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+      }
+      catch(SocketTimeoutException se)
+      {
+        fail("Broker 2 is expected to receive an ErrorMessage.");
+      }
+      try
+      {
+        ReplicationMessage msg = broker3.receive();
+        if (!(msg instanceof ErrorMessage))
+        {
+          fail("Broker 3 connection is expected to receive an ErrorMessage."
+              + msg);
+        }
+        ErrorMessage emsg = (ErrorMessage)msg;
+        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
+      }
+      catch(SocketTimeoutException se)
+      {
+        fail("Broker 3 is expected to receive an ErrorMessage.");
      }

       try
       {
@@ -969,23 +990,7 @@
         ReplicationMessage msg = broker3.receive();
         fail("No update message is supposed to be received by degraded broker3"+ msg);
       }
       catch(SocketTimeoutException e)
       { /* expected */ }

-      debugInfo("Launch an on-line import on DS.");
-      genId=-1;
-      Entry importTask = getTaskImport();
-      addTask(importTask, ResultCode.SUCCESS, null);
-      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
-      Thread.sleep(500);
-
-      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
-      genId = readGenId();
-      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
-          " after on-line import but genId=" + genId);
-
-      rgenId = replServer1.getGenerationId(baseDn);
-      assertEquals(genId, rgenId,
-          "DS and replServer are expected to have same genId.");

       // In S1 launch the total update to initialize S2
       addTask(taskInitRemoteS2, ResultCode.SUCCESS, null);
@@ -1008,7 +1013,7 @@
       Thread.sleep(200);

       debugInfo("Verifying that replServer1 has been reset.");
-      assertEquals(replServer1.getGenerationId(baseDn), -1);
+      assertEquals(replServer1.getGenerationId(baseDn), rgenId);

       debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
       disconnectFromReplServer(changelog1ID);
@@ -1163,9 +1168,9 @@
       debugInfo("Verifying that all replservers genIds have been reset.");
       genId = readGenId();
-      assertEquals(replServer1.getGenerationId(baseDn), -1);
-      assertEquals(replServer2.getGenerationId(baseDn), -1);
-      assertEquals(replServer3.getGenerationId(baseDn), -1);
+      assertEquals(replServer1.getGenerationId(baseDn), genId);
+      assertEquals(replServer2.getGenerationId(baseDn), genId);
+      assertEquals(replServer3.getGenerationId(baseDn), genId);

       debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
       disconnectFromReplServer(changelog1ID);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -48,6 +48,9 @@
 import org.opends.server.backends.task.TaskState;
 import org.opends.server.core.ModifyDNOperationBasis;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -75,6 +78,7 @@
 import org.opends.server.types.ModificationType;
 import org.opends.server.types.RDN;
 import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchScope;
 import org.opends.server.util.TimeThread;
 import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
 import org.testng.annotations.AfterClass;
@@ -1198,4 +1202,4 @@
     catch(Exception e) {};
     return l;
   }
-}
+}