mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
30.30.2008 7ee84fd2ff21c5e25b9a234f974c08b49070fcaf
Fix for #3543: Replication protocol incompatibility between v1 and v2: cannot upgrade a running replicated topology from v1 to v2
8 files modified
456 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java 6 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 5 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java 6 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java 2 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java 93 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java 282 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java 9 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 53 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -350,15 +350,13 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Using the current protocol version here should normally not happen, as
    // callers would normally use the getBytes() method instead. This check is
    // therefore only a safety net.
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
    // Supported older protocol versions
    switch (reqProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -101,10 +101,7 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Using the current protocol version here should normally not happen, as
    // callers would normally use the getBytes() method instead. This check is
    // therefore only a safety net.
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -348,15 +348,13 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Using the current protocol version here should normally not happen, as
    // callers would normally use the getBytes() method instead. This check is
    // therefore only a safety net.
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
    // Supported older protocol versions
    switch (reqProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -50,6 +50,8 @@
  public ReplicationData(UpdateMsg change)
         throws UnsupportedEncodingException
  {
    // Always keep messages in the replication DB with the current protocol
    // version
    this.setData(change.getBytes());
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -728,7 +728,7 @@
                  // if the 2 RS have different generationID
                  if (replicationServerDomain.getGenerationIdSavedStatus())
                  {
                    // it the present RS has received changes regarding its
                    // if the present RS has received changes regarding its
                    //     gen ID and so won't change without a reset
                    // then  we are just degrading the peer.
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
@@ -925,6 +925,97 @@
            replicationServerDomain.release();
          return;
        }
      } else
      {
        // Terminate connection from a V1 RS
        // if the remote RS and the local RS have the same genID
        // then it's ok and nothing else to do
        if (generationId == localGenerationId)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS V1 with serverID=" + serverId +
              " is connected with the right generation ID");
          }
        } else
        {
          if (localGenerationId > 0)
          {
            // if the local RS is initialized
            if (generationId > 0)
            {
              // if the remote RS is initialized
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationServerDomain.getGenerationIdSavedStatus())
                {
                  // if the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
                  // then  we are just degrading the peer.
                  Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                    this.baseDn.toNormalizedString(),
                    Short.toString(serverId),
                    Long.toString(generationId),
                    Long.toString(localGenerationId));
                  logError(message);
                } else
                {
                  // The present RS has never received changes regarding its
                  // gen ID.
                  //
                  // Example case:
                  // - we are in RS1
                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                  // - RS1 has genId1 from LS1 /genId1 comes from data in
                  //   suffix
                  // - we are in RS1 and we receive a START msg from RS2
                  // - Each RS keeps its genID / is degraded and when LS2
                  //   will be populated from LS1 everything will become ok.
                  //
                  // Issue:
                  // FIXME : Would it be a good idea in some cases to just
                  //         set the gen ID received from the peer RS
                  //         especially if the peer has a non null state and
                  //         we have a null state ?
                  // replicationServerDomain.
                  // setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                    this.baseDn.toNormalizedString(),
                    Short.toString(serverId),
                    Long.toString(generationId),
                    Long.toString(localGenerationId));
                  logError(message);
                }
              }
            } else
            {
              // The remote RS has no genId. We don't change anything for the
              // current RS.
            }
          } else
          {
            // The local RS is not initialized - take the one received
            oldGenerationId =
              replicationServerDomain.setGenerationId(generationId, false);
          }
        }
        // Alright, connected with new incoming V1 RS: store handler.
        Map<Short, ServerHandler> connectedRSs =
          replicationServerDomain.getConnectedRSs();
        connectedRSs.put(serverId, this);
        // Note: the supported scenario for V1->V2 upgrade is to upgrade the
        // servers of the topology one by one. We prefer not to send a
        // TopologyMsg here, to avoid giving partial/false information to the
        // V2 servers: for instance, we do not have the connected DS of the
        // V1 RS. When the V1 RS is upgraded in its turn, topology info will be
        // sent and will be accurate. That way, there is no risk of having
        // false/incomplete information in other servers.
      }
      /*
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -123,50 +123,80 @@
    {
      while (true)
      {
        ReplicationMsg msg = session.receive();
        try
        {
          ReplicationMsg msg = session.receive();
        /*
        if (debugEnabled())
        {
        TRACER.debugInfo(
        "In RS " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        (handler.isReplicationServer()?" From RS ":" From LS")+
        " with serverId=" + serverId + " receives " + msg);
        }
         */
        if (msg instanceof AckMsg)
        {
          AckMsg ack = (AckMsg) msg;
          handler.checkWindow();
          replicationServerDomain.ack(ack, serverId);
        } else if (msg instanceof UpdateMsg)
        {
          boolean filtered = false;
          /* Ignore updates in some cases */
          if (handler.isLDAPserver())
          /*
          if (debugEnabled())
          {
            /**
             * Ignore updates from DS in bad BAD_GENID_STATUS or
             * FULL_UPDATE_STATUS
             *
             * The RSD lock should not be taken here as it is acceptable to have
             * a delay between the time the server has a wrong status and the
             * fact we detect it: the updates that succeed to pass during this
             * time will have no impact on remote server. But it is interesting
             * to not saturate uselessly the network if the updates are not
             * necessary so this check to stop sending updates is interesting
             * anyway. Not taking the RSD lock allows to have better
             * performances in normal mode (most of the time).
             */
            ServerStatus dsStatus = handler.getStatus();
            if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
              (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
          TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" From RS ":" From LS")+
          " with serverId=" + serverId + " receives " + msg);
          }
           */
          if (msg instanceof AckMsg)
          {
            AckMsg ack = (AckMsg) msg;
            handler.checkWindow();
            replicationServerDomain.ack(ack, serverId);
          } else if (msg instanceof UpdateMsg)
          {
            boolean filtered = false;
            /* Ignore updates in some cases */
            if (handler.isLDAPserver())
            {
              /**
               * Ignore updates from DS in bad BAD_GENID_STATUS or
               * FULL_UPDATE_STATUS
               *
               * The RSD lock should not be taken here as it is acceptable to
               * have a delay between the time the server has a wrong status and
               * the fact we detect it: the updates that succeed to pass during
               * this time will have no impact on remote server. But it is
               * interesting to not saturate uselessly the network if the
               * updates are not necessary so this check to stop sending updates
               * is interesting anyway. Not taking the RSD lock allows to have
               * better performances in normal mode (most of the time).
               */
              ServerStatus dsStatus = handler.getStatus();
              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
              {
                long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                    Short.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn().toNormalizedString(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Short.toString(handler.getServerId()),
                    Long.toString(referenceGenerationId),
                    Long.toString(handler.getGenerationId())));
                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                    Short.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn().toNormalizedString(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Short.toString(handler.getServerId())));
                filtered = true;
              }
            } else
            {
              /**
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
               */
              long referenceGenerationId =
                replicationServerDomain.getGenerationId();
              if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
              if ((referenceGenerationId > 0) &&
                (referenceGenerationId != handler.getGenerationId()))
              {
                logError(ERR_IGNORING_UPDATE_FROM_RS.get(
                  Short.toString(replicationServerDomain.getReplicationServer().
                  getServerId()),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
@@ -174,103 +204,86 @@
                  Short.toString(handler.getServerId()),
                  Long.toString(referenceGenerationId),
                  Long.toString(handler.getGenerationId())));
              if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                  Short.toString(replicationServerDomain.getReplicationServer().
                  getServerId()),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  ((UpdateMsg) msg).getChangeNumber().toString(),
                  Short.toString(handler.getServerId())));
              filtered = true;
                filtered = true;
              }
            }
          } else
          {
            /**
             * Ignore updates from RS with bad gen id
             * (no system managed status for a RS)
             */
            long referenceGenerationId =
              replicationServerDomain.getGenerationId();
            if ((referenceGenerationId > 0) &&
              (referenceGenerationId != handler.getGenerationId()))
            {
              logError(ERR_IGNORING_UPDATE_FROM_RS.get(
                Short.toString(replicationServerDomain.getReplicationServer().
                getServerId()),
                replicationServerDomain.getBaseDn().toNormalizedString(),
                ((UpdateMsg) msg).getChangeNumber().toString(),
                Short.toString(handler.getServerId()),
                Long.toString(referenceGenerationId),
                Long.toString(handler.getGenerationId())));
              filtered = true;
            }
          }
          if (!filtered)
            if (!filtered)
            {
              UpdateMsg update = (UpdateMsg) msg;
              handler.decAndCheckWindow();
              replicationServerDomain.put(update, handler);
            }
          } else if (msg instanceof WindowMsg)
          {
            UpdateMsg update = (UpdateMsg) msg;
            handler.decAndCheckWindow();
            replicationServerDomain.put(update, handler);
            WindowMsg windowMsg = (WindowMsg) msg;
            handler.updateWindow(windowMsg);
          } else if (msg instanceof InitializeRequestMsg)
          {
            InitializeRequestMsg initializeMsg =
              (InitializeRequestMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof InitializeTargetMsg)
          {
            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
            handler.process(initializeMsg);
          } else if (msg instanceof EntryMsg)
          {
            EntryMsg entryMsg = (EntryMsg) msg;
            handler.process(entryMsg);
          } else if (msg instanceof DoneMsg)
          {
            DoneMsg doneMsg = (DoneMsg) msg;
            handler.process(doneMsg);
          } else if (msg instanceof ErrorMsg)
          {
            ErrorMsg errorMsg = (ErrorMsg) msg;
            handler.process(errorMsg);
          } else if (msg instanceof ResetGenerationIdMsg)
          {
            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
            replicationServerDomain.resetGenerationId(handler, genIdMsg);
          } else if (msg instanceof WindowProbeMsg)
          {
            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
            handler.process(windowProbeMsg);
          } else if (msg instanceof TopologyMsg)
          {
            TopologyMsg topoMsg = (TopologyMsg) msg;
            replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
              handler, true);
          } else if (msg instanceof ChangeStatusMsg)
          {
            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
            replicationServerDomain.processNewStatus(handler, csMsg);
          } else if (msg instanceof MonitorRequestMsg)
          {
            MonitorRequestMsg replServerMonitorRequestMsg =
              (MonitorRequestMsg) msg;
            handler.process(replServerMonitorRequestMsg);
          } else if (msg instanceof MonitorMsg)
          {
            MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
            handler.process(replServerMonitorMsg);
          } else if (msg == null)
          {
            /*
             * The remote server has sent an unknown message,
             * close the connection.
             */
            Message message = NOTE_READER_NULL_MSG.get(handler.toString());
            logError(message);
            return;
          }
        } else if (msg instanceof WindowMsg)
        } catch (NotSupportedOldVersionPDUException e)
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          handler.updateWindow(windowMsg);
        } else if (msg instanceof InitializeRequestMsg)
        {
          InitializeRequestMsg initializeMsg =
            (InitializeRequestMsg) msg;
          handler.process(initializeMsg);
        } else if (msg instanceof InitializeTargetMsg)
        {
          InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
          handler.process(initializeMsg);
        } else if (msg instanceof EntryMsg)
        {
          EntryMsg entryMsg = (EntryMsg) msg;
          handler.process(entryMsg);
        } else if (msg instanceof DoneMsg)
        {
          DoneMsg doneMsg = (DoneMsg) msg;
          handler.process(doneMsg);
        } else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg) msg;
          handler.process(errorMsg);
        } else if (msg instanceof ResetGenerationIdMsg)
        {
          ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
          replicationServerDomain.resetGenerationId(handler, genIdMsg);
        } else if (msg instanceof WindowProbeMsg)
        {
          WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
          handler.process(windowProbeMsg);
        } else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg) msg;
          replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
        } else if (msg instanceof ChangeStatusMsg)
        {
          ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
          replicationServerDomain.processNewStatus(handler, csMsg);
        } else if (msg instanceof MonitorRequestMsg)
        {
          MonitorRequestMsg replServerMonitorRequestMsg =
            (MonitorRequestMsg) msg;
          handler.process(replServerMonitorRequestMsg);
        } else if (msg instanceof MonitorMsg)
        {
          MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
          handler.process(replServerMonitorMsg);
        } else if (msg == null)
        {
          /*
           * The remote server has sent an unknown message,
           * close the connection.
           */
          Message message = NOTE_READER_NULL_MSG.get(handler.toString());
          logError(message);
          return;
          // Received a V1 PDU we do not need to support:
          // we just trash the message and log the event for debug purpose,
          // then continue receiving messages.
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServerDomain.
              getReplicationServer().
              getMonitorInstanceName() + ":" + e.getMessage());
        }
      }
    } catch (IOException e)
@@ -304,13 +317,6 @@
       */
      Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(message);
    } catch (NotSupportedOldVersionPDUException e)
    {
      // Received a V1 PDU we do not need to support:
      // we just trash the message and log the event for debug purpose
      if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + ":" + e.getMessage());
    } catch (Exception e)
    {
      if (debugEnabled())
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -191,14 +191,7 @@
        // Publish the update to the remote server using a protocol version
        // it supports
        short pduProtocolVersion = update.getVersion();
        if (protocolVersion < pduProtocolVersion)
        { // The remote server wants a lower protocol version than the PDU one,
          // send it the PDU, serializing it with the supported older version
          session.publish(update, protocolVersion);
        } else {
          session.publish(update);
        }
        session.publish(update, protocolVersion);
      }
    }
    catch (NoSuchElementException e)
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -33,7 +33,6 @@
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -50,10 +49,8 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -353,56 +350,6 @@
    }
  }
  /**
   * Checks that a broker degrades its protocol version when it connects to a
   * replication server that uses an older version of the protocol.
   *
   * The broker is created while the default current version is in effect;
   * the global current version is then forced to V1 before the broker
   * connects, and the test asserts that the broker reports V1 afterwards.
   *
   * @throws Exception if anything unexpected happens during the test.
   */
  @Test(enabled=true)
  public void protocolVersion() throws Exception
  {
    logError(Message.raw(
        Category.SYNC, Severity.INFORMATION,
        "Starting Replication ProtocolWindowTest : protocolVersion"));
    ReplicationBroker broker = null;
    try
    {
      // Test : Make a broker degrade its version when connecting to an old
      // replication server.
      // Start from the default current version so the broker is created with it.
      ProtocolVersion.resetCurrentVersion();
      broker = new ReplicationBroker(null,
        new ServerState(),
        baseDn,
        (short) 13, 0, 0, 0, 0, 1000, 0,
        ReplicationTestCase.getGenerationId(baseDn),
        getReplSessionSecurity(), (byte)1);
      // Check broker hard-coded version
      short pversion = broker.getProtocolVersion();
      assertEquals(pversion, ProtocolVersion.getCurrentVersion());
      // Connect the broker to the replication server
      // The global current version is switched to V1 first; presumably this
      // makes the in-process replication server answer as a V1 server
      // (NOTE(review): confirm against ProtocolVersion / ReplicationServer).
      ProtocolVersion.setCurrentVersion(ProtocolVersion.REPLICATION_PROTOCOL_V1);
      ArrayList<String> servers = new ArrayList<String>(1);
      servers.add("localhost:" + replServerPort);
      broker.start(servers);
      TestCaseUtils.sleep(3000); // wait for connection established
      // Check broker negotiated version
      pversion = broker.getProtocolVersion();
      assertEquals(pversion, ProtocolVersion.REPLICATION_PROTOCOL_V1);
      logError(Message.raw(
        Category.SYNC, Severity.INFORMATION,
        "Ending Replication ProtocolWindowTest : protocolVersion"));
    } finally
    {
      // Always stop the broker and restore the global protocol version so
      // the forced V1 setting does not remain in effect after this test.
      if (broker != null)
        broker.stop();
      ProtocolVersion.resetCurrentVersion();
    }
  }
  /**
   * Clean up the environment.
   *