opends/src/server/org/opends/server/protocols/LDIFConnectionHandler.java
@@ -110,6 +110,9 @@ // The thread used to run the connection handler. private Thread connectionHandlerThread; // Help to not warn permanently and fullfill the log file // in debug mode. private boolean alreadyWarn = false; /** @@ -123,6 +126,7 @@ isStopped = true; stopRequested = false; connectionHandlerThread = null; alreadyWarn = false; } @@ -288,11 +292,12 @@ } else { if (debugEnabled()) if (!alreadyWarn && debugEnabled()) { TRACER.debugInfo("LDIF connection handler directory " + dir.getAbsolutePath() + "doesn't exist or isn't a file"); alreadyWarn = true; } } opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -60,7 +60,9 @@ */ public ListenerThread(ReplicationDomain listener) { super("Replication Listener thread"); super("Replication Listener thread " + "serverID=" + listener.serverId + " domain=" + listener.getName()); this.listener = listener; } opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -203,8 +203,6 @@ private long generationId = -1; private boolean generationIdSavedStatus = false; private long rejectedGenerationId = -1; private boolean requestedResetSinceLastStart = false; /** * This object is used to store the list of update currently being @@ -901,29 +899,6 @@ ErrorMessage errorMsg = (ErrorMessage)msg; logError(ERR_ERROR_MSG_RECEIVED.get( errorMsg.getDetails())); if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId()) { TRACER.debugInfo("requestedResetSinceLastStart=" + requestedResetSinceLastStart + "rejectedGenerationId=" + rejectedGenerationId); if (requestedResetSinceLastStart && (rejectedGenerationId>0)) { // When the last generation presented was refused and we are // the 'reseter' server then restart automatically to become // the 'master' state.clear(); rejectedGenerationId = -1; requestedResetSinceLastStart = false; broker.stop(); broker.start(replicationServers); } } if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId()) { rejectedGenerationId = generationId; } } } else if (msg instanceof UpdateMessage) @@ -2534,8 +2509,11 @@ */ public void resetGenerationId(Long generationIdNewValue) { if (debugEnabled()) TRACER.debugInfo( this.getName() + "resetGenerationId" + generationIdNewValue); ResetGenerationId genIdMessage = null; requestedResetSinceLastStart = true; if (generationIdNewValue == null) { genIdMessage = new ResetGenerationId(this.generationId); @@ -3267,7 +3245,6 @@ baseDN.toNormalizedString(), e.getLocalizedMessage())); } rejectedGenerationId = -1; if (debugEnabled()) TRACER.debugInfo( opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -1004,7 +1004,7 @@ { for (ServerHandler handler : connectedServers.values()) { handler.resetGenerationId(); handler.warnBadGenerationId(); } } @@ -1031,41 +1031,40 @@ " baseDN=" + baseDn + " RCache.resetGenerationId received new ref genId=" + newGenId); // Notifies the others LDAP servers that from now on // they have the bad generationId // Notifies the remote LDAP servers that from now on // they have a generationId different from the reference one for (ServerHandler handler : connectedServers.values()) { if (newGenId != handler.getGenerationId()) { handler.resetGenerationId(); handler.warnBadGenerationId(); } } // Propagates the reset message to the others replication servers // dealing with the same domain. if (senderHandler.isLDAPserver()) { for (ServerHandler handler : replicationServers.values()) // If we are the first replication server warned, // then forwards the reset message to the remote replication servers for (ServerHandler rsHandler : replicationServers.values()) { try { handler.sendGenerationId(genIdMsg); rsHandler.setGenerationId(newGenId); if (senderHandler.isLDAPserver()) { rsHandler.forwardGenerationIdToRS(genIdMsg); } } catch (IOException e) { logError(ERR_CHANGELOG_ERROR_SENDING_INFO. get(handler.getMonitorInstanceName())); } get(rsHandler.getMonitorInstanceName())); } } if (this.generationId != newGenId) { clearDbs(); // Reset the in memory domain generationId generationId = newGenId; } } /** opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -557,7 +557,9 @@ // Create a thread to send heartbeat messages. if (heartbeatInterval > 0) { heartbeatThread = new HeartbeatThread("replication Heartbeat", heartbeatThread = new HeartbeatThread( "replication Heartbeat to " + serverURL + " for " + this.baseDn, session, heartbeatInterval); heartbeatThread.start(); } @@ -1738,7 +1740,7 @@ /** * Resets the generationId for this domain. */ public void resetGenerationId() public void warnBadGenerationId() { // Notify the peer that it is now invalid regarding the generationId // We are now waiting a startServer message from this server with @@ -1764,10 +1766,20 @@ * @throws IOException When it occurs while sending the message, * */ public void sendGenerationId(ResetGenerationId msg) public void forwardGenerationIdToRS(ResetGenerationId msg) throws IOException { generationId = msg.getGenerationId(); session.publish(msg); } /** * Set a new generation ID. * * @param generationId The new generation ID * */ public void setGenerationId(long generationId) { this.generationId = generationId; } } opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -108,7 +108,9 @@ // Ignore update to be sent to a replica with a bad generation ID long referenceGenerationId = replicationCache.getGenerationId(); if (referenceGenerationId != handler.getGenerationId()) if ((referenceGenerationId != handler.getGenerationId()) || (referenceGenerationId == -1) || (handler.getGenerationId() == -1)) { logError(ERR_IGNORING_UPDATE_TO.get( update.getDn(),