mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
23.07.2014 ea96abeb1496bcdca8f86b09b63cb4b994e4f785
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerReader.java
@@ -102,7 +102,8 @@
          {
            handler.checkWindow();
            handler.processAck((AckMsg) msg);
          } else if (msg instanceof UpdateMsg)
          }
          else if (msg instanceof UpdateMsg)
          {
            UpdateMsg updateMsg = (UpdateMsg) msg;
@@ -110,18 +111,17 @@
            // Ignore updates in some cases
            if (handler.isDataServer())
            {
              /**
              /*
               * Ignore updates from DS in bad BAD_GENID_STATUS or
               * FULL_UPDATE_STATUS
               *
               * The RSD lock should not be taken here as it is acceptable to
               * have a delay between the time the server has a wrong status and
               * the fact we detect it: the updates that succeed to pass during
               * this time will have no impact on remote server. But it is
               * interesting to not saturate uselessly the network if the
               * updates are not necessary so this check to stop sending updates
               * is interesting anyway. Not taking the RSD lock allows to have
               * better performances in normal mode (most of the time).
               * FULL_UPDATE_STATUS The RSD lock should not be taken here as it
               * is acceptable to have a delay between the time the server has a
               * wrong status and the fact we detect it: the updates that
               * succeed to pass during this time will have no impact on remote
               * server. But it is interesting to not saturate uselessly the
               * network if the updates are not necessary so this check to stop
               * sending updates is interesting anyway. Not taking the RSD lock
               * allows to have better performances in normal mode (most of the
               * time).
               */
              ServerStatus dsStatus = handler.getStatus();
              if (dsStatus == BAD_GEN_ID_STATUS
@@ -129,33 +129,38 @@
              {
                long referenceGenerationId = handler.getReferenceGenId();
                if (dsStatus == BAD_GEN_ID_STATUS)
                  logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID, handler.getReplicationServerId(),
                      updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
                {
                  logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID,
                      handler.getReplicationServerId(), updateMsg.getCSN(),
                      handler.getBaseDN(), handler.getServerId(),
                      session.getReadableRemoteAddress(),
                      handler.getGenerationId(),
                      referenceGenerationId);
                      handler.getGenerationId(), referenceGenerationId);
                }
                if (dsStatus == FULL_UPDATE_STATUS)
                  logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP, handler.getReplicationServerId(),
                      updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
                {
                  logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP,
                      handler.getReplicationServerId(), updateMsg.getCSN(),
                      handler.getBaseDN(), handler.getServerId(),
                      session.getReadableRemoteAddress());
                }
                filtered = true;
              }
            } else
            }
            else
            {
              /**
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
              /*
               * Ignore updates from RS with bad gen id (no system managed
               * status for a RS)
               */
              long referenceGenerationId = handler.getReferenceGenId();
              if (referenceGenerationId > 0
                  && referenceGenerationId != handler.getGenerationId())
              {
                logger.error(WARN_IGNORING_UPDATE_FROM_RS,
                        handler.getReplicationServerId(),
                        updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
                        session.getReadableRemoteAddress(),
                        handler.getGenerationId(),
                        referenceGenerationId);
                    handler.getReplicationServerId(), updateMsg.getCSN(),
                    handler.getBaseDN(), handler.getServerId(),
                    session.getReadableRemoteAddress(),
                    handler.getGenerationId(), referenceGenerationId);
                filtered = true;
              }
            }
@@ -164,43 +169,53 @@
            {
              handler.put(updateMsg);
            }
          } else if (msg instanceof WindowMsg)
          }
          else if (msg instanceof WindowMsg)
          {
            handler.updateWindow((WindowMsg) msg);
          } else if (msg instanceof RoutableMsg)
          }
          else if (msg instanceof RoutableMsg)
          {
            handler.process((RoutableMsg) msg);
          } else if (msg instanceof ResetGenerationIdMsg)
          }
          else if (msg instanceof ResetGenerationIdMsg)
          {
            handler.processResetGenId((ResetGenerationIdMsg) msg);
          } else if (msg instanceof WindowProbeMsg)
          }
          else if (msg instanceof WindowProbeMsg)
          {
            handler.replyToWindowProbe();
          } else if (msg instanceof TopologyMsg)
          }
          else if (msg instanceof TopologyMsg)
          {
            ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
            ReplicationServerHandler rsh = (ReplicationServerHandler) handler;
            rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
          } else if (msg instanceof ChangeStatusMsg)
          }
          else if (msg instanceof ChangeStatusMsg)
          {
            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
            try
            {
              DataServerHandler dsh = (DataServerHandler)handler;
              DataServerHandler dsh = (DataServerHandler) handler;
              dsh.receiveNewStatus(csMsg);
            }
            catch(Exception e)
            catch (Exception e)
            {
              errMessage = ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
                  handler.getBaseDN(), handler.getServerId(), csMsg);
              logger.error(errMessage);
            }
          } else if (msg instanceof ChangeTimeHeartbeatMsg)
          }
          else if (msg instanceof ChangeTimeHeartbeatMsg)
          {
            handler.process((ChangeTimeHeartbeatMsg) msg);
          } else if (msg instanceof StopMsg)
          }
          else if (msg instanceof StopMsg)
          {
            // Peer server is properly disconnecting: go out of here to
            // properly close the server handler going to finally block.
            /*
             * Peer server is properly disconnecting: go out of here to properly
             * close the server handler going to finally block.
             */
            if (logger.isTraceEnabled())
            {
              logger.trace(handler
@@ -208,7 +223,8 @@
                  + handler.getReplicationServerId());
            }
            return;
          } else if (msg == null)
          }
          else if (msg == null)
          {
            /*
             * The remote server has sent an unknown message, close the
@@ -218,11 +234,14 @@
            logger.info(errMessage);
            return;
          }
        } catch (NotSupportedOldVersionPDUException e)
        }
        catch (NotSupportedOldVersionPDUException e)
        {
          // Received a V1 PDU we do not need to support:
          // we just trash the message and log the event for debug purpose,
          // then continue receiving messages.
          /*
           * Received a V1 PDU we do not need to support: we just trash the
           * message and log the event for debug purpose, then continue
           * receiving messages.
           */
          logException(e);
        }
      }
@@ -230,9 +249,8 @@
    catch (SocketException e)
    {
      /*
       * The connection has been broken
       * Log a message and exit from this loop
       * So that this handler is stopped.
       * The connection has been broken Log a message and exit from this loop So
       * that this handler is stopped.
       */
      logException(e);
      if (!handler.shuttingDown())
@@ -244,10 +262,10 @@
    catch (Exception e)
    {
      /*
       * The remote server has sent an unknown message,
       * close the connection.
       * The remote server has sent an unknown message, close the connection.
       */
      errMessage = NOTE_READER_EXCEPTION.get(handler, stackTraceToSingleLineString(e));
      errMessage = NOTE_READER_EXCEPTION.get(handler,
          stackTraceToSingleLineString(e));
      logger.info(errMessage);
    }
    finally