opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -350,15 +350,13 @@ public byte[] getBytes(short reqProtocolVersion) throws UnsupportedEncodingException { // Using current protocol version should normally not be done as we would // normally call the getBytes() method instead for that. So this check // for security // Of course, always support current protocol version if (reqProtocolVersion == ProtocolVersion.getCurrentVersion()) { return getBytes(); } // Supported older protocol versions switch (reqProtocolVersion) { case ProtocolVersion.REPLICATION_PROTOCOL_V1: opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -101,10 +101,7 @@ public byte[] getBytes(short reqProtocolVersion) throws UnsupportedEncodingException { // Using current protocol version should normally not be done as we would // normally call the getBytes() method instead for that. So this check // for security // Of course, always support current protocol version if (reqProtocolVersion == ProtocolVersion.getCurrentVersion()) { return getBytes(); opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -348,15 +348,13 @@ public byte[] getBytes(short reqProtocolVersion) throws UnsupportedEncodingException { // Using current protocol version should normally not be done as we would // normally call the getBytes() method instead for that. So this check // for security // Of course, always support current protocol version if (reqProtocolVersion == ProtocolVersion.getCurrentVersion()) { return getBytes(); } // Supported older protocol versions switch (reqProtocolVersion) { case ProtocolVersion.REPLICATION_PROTOCOL_V1: opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -50,6 +50,8 @@ public ReplicationData(UpdateMsg change) throws UnsupportedEncodingException { // Always keep messages in the replication DB with the current protocol // version this.setData(change.getBytes()); } opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -728,7 +728,7 @@ // if the 2 RS have different generationID if (replicationServerDomain.getGenerationIdSavedStatus()) { // it the present RS has received changes regarding its // if the present RS has received changes regarding its // gen ID and so won't change without a reset // then we are just degrading the peer. Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( @@ -925,6 +925,97 @@ replicationServerDomain.release(); return; } } else { // Terminate connection from a V1 RS // if the remote RS and the local RS have the same genID // then it's ok and nothing else to do if (generationId == localGenerationId) { if (debugEnabled()) { TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " RS V1 with serverID=" + serverId + " is connected with the right generation ID"); } } else { if (localGenerationId > 0) { // if the local RS is initialized if (generationId > 0) { // if the remote RS is initialized if (generationId != localGenerationId) { // if the 2 RS have different generationID if (replicationServerDomain.getGenerationIdSavedStatus()) { // if the present RS has received changes regarding its // gen ID and so won't change without a reset // then we are just degrading the peer. Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( this.baseDn.toNormalizedString(), Short.toString(serverId), Long.toString(generationId), Long.toString(localGenerationId)); logError(message); } else { // The present RS has never received changes regarding its // gen ID. // // Example case: // - we are in RS1 // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) // - RS1 has genId1 from LS1 /genId1 comes from data in // suffix // - we are in RS1 and we receive a START msg from RS2 // - Each RS keeps its genID / is degraded and when LS2 // will be populated from LS1 everything will become ok. // // Issue: // FIXME : Would it be a good idea in some cases to just // set the gen ID received from the peer RS // specially if the peer has a non null state and // we have a nul state ? // replicationServerDomain. // setGenerationId(generationId, false); Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get( this.baseDn.toNormalizedString(), Short.toString(serverId), Long.toString(generationId), Long.toString(localGenerationId)); logError(message); } } } else { // The remote RS has no genId. We don't change anything for the // current RS. } } else { // The local RS is not initialized - take the one received oldGenerationId = replicationServerDomain.setGenerationId(generationId, false); } } // Alright, connected with new incoming V1 RS: store handler. Map<Short, ServerHandler> connectedRSs = replicationServerDomain.getConnectedRSs(); connectedRSs.put(serverId, this); // Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1 // all the servers of the topology. We prefer not not send a TopologyMsg // for giving partial/false information to the V2 servers as for // instance we don't have the connected DS of the V1 RS...When the V1 // RS will be upgraded in his turn, topo info will be sent and accurate. // That way, there is no risk to have false/incomplete information in // other servers. } /* opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -123,6 +123,8 @@ { while (true) { try { ReplicationMsg msg = session.receive(); /* @@ -150,14 +152,14 @@ * Ignore updates from DS in bad BAD_GENID_STATUS or * FULL_UPDATE_STATUS * * The RSD lock should not be taken here as it is acceptable to have * a delay between the time the server has a wrong status and the * fact we detect it: the updates that succeed to pass during this * time will have no impact on remote server. But it is interesting * to not saturate uselessly the network if the updates are not * necessary so this check to stop sending updates is interesting * anyway. Not taking the RSD lock allows to have better * performances in normal mode (most of the time). * The RSD lock should not be taken here as it is acceptable to * have a delay between the time the server has a wrong status and * the fact we detect it: the updates that succeed to pass during * this time will have no impact on remote server. But it is * interesting to not saturate uselessly the network if the * updates are not necessary so this check to stop sending updates * is interesting anyway. Not taking the RSD lock allows to have * better performances in normal mode (most of the time). */ ServerStatus dsStatus = handler.getStatus(); if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || @@ -167,8 +169,8 @@ replicationServerDomain.getGenerationId(); if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get( Short.toString(replicationServerDomain.getReplicationServer(). getServerId()), Short.toString(replicationServerDomain. getReplicationServer().getServerId()), replicationServerDomain.getBaseDn().toNormalizedString(), ((UpdateMsg) msg).getChangeNumber().toString(), Short.toString(handler.getServerId()), @@ -176,8 +178,8 @@ Long.toString(handler.getGenerationId()))); if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get( Short.toString(replicationServerDomain.getReplicationServer(). getServerId()), Short.toString(replicationServerDomain. getReplicationServer().getServerId()), replicationServerDomain.getBaseDn().toNormalizedString(), ((UpdateMsg) msg).getChangeNumber().toString(), Short.toString(handler.getServerId()))); @@ -248,7 +250,8 @@ } else if (msg instanceof TopologyMsg) { TopologyMsg topoMsg = (TopologyMsg) msg; replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true); replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true); } else if (msg instanceof ChangeStatusMsg) { ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; @@ -272,6 +275,16 @@ logError(message); return; } } catch (NotSupportedOldVersionPDUException e) { // Received a V1 PDU we do not need to support: // we just trash the message and log the event for debug purpose, // then continue receiving messages. if (debugEnabled()) TRACER.debugInfo("In " + replicationServerDomain. getReplicationServer(). getMonitorInstanceName() + ":" + e.getMessage()); } } } catch (IOException e) { @@ -304,13 +317,6 @@ */ Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); logError(message); } catch (NotSupportedOldVersionPDUException e) { // Received a V1 PDU we do not need to support: // we just trash the message and log the event for debug purpose if (debugEnabled()) TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + ":" + e.getMessage()); } catch (Exception e) { if (debugEnabled()) opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -191,14 +191,7 @@ // Publish the update to the remote server using a protocol version he // it supports short pduProtocolVersion = update.getVersion(); if (protocolVersion < pduProtocolVersion) { // The remote server wants a lower protocol version than the PDU one, // send it the PDU, serializing it with the supported older version session.publish(update, protocolVersion); } else { session.publish(update); } } } catch (NoSuchElementException e) opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -33,7 +33,6 @@ import java.net.ServerSocket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -50,10 +49,8 @@ import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; @@ -353,56 +350,6 @@ } } @Test(enabled=true) public void protocolVersion() throws Exception { logError(Message.raw( Category.SYNC, Severity.INFORMATION, "Starting Replication ProtocolWindowTest : protocolVersion")); ReplicationBroker broker = null; try { // Test : Make a broker degrade its version when connecting to an old // replication server. ProtocolVersion.resetCurrentVersion(); broker = new ReplicationBroker(null, new ServerState(), baseDn, (short) 13, 0, 0, 0, 0, 1000, 0, ReplicationTestCase.getGenerationId(baseDn), getReplSessionSecurity(), (byte)1); // Check broker hard-coded version short pversion = broker.getProtocolVersion(); assertEquals(pversion, ProtocolVersion.getCurrentVersion()); // Connect the broker to the replication server ProtocolVersion.setCurrentVersion(ProtocolVersion.REPLICATION_PROTOCOL_V1); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + replServerPort); broker.start(servers); TestCaseUtils.sleep(3000); // wait for connection established // Check broker negociated version pversion = broker.getProtocolVersion(); assertEquals(pversion, ProtocolVersion.REPLICATION_PROTOCOL_V1); logError(Message.raw( Category.SYNC, Severity.INFORMATION, "Ending Replication ProtocolWindowTest : protocolVersion")); } finally { if (broker != null) broker.stop(); ProtocolVersion.resetCurrentVersion(); } } /** * Clean up the environment. *