mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.32.2013 c63e1f305327734be21f5ce0e21bdd2f7a4d143b
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
      if (debugEnabled())
        TRACER.debugInfo("In " +
          ((handler != null) ? handler.toString() : "Replication Server") +
          " closing session with err=" +
          providedMsg.toString());
          " closing session with err=" + providedMsg);
      logError(providedMsg);
    }
@@ -125,7 +124,7 @@
   */
  protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
  // Number of updates received from the server in assured safe data mode.
   * Number of updates received from the server in assured safe data mode.
   */
  protected int assuredSdReceivedUpdates = 0;
  /**
@@ -169,7 +168,7 @@
  /**
   * The initial size of the sending window.
   */
  int sendWindowSize;
  protected int sendWindowSize;
  /**
   * remote generation id.
   */
@@ -185,7 +184,7 @@
  /**
   * Group id of this remote server.
   */
  protected byte groupId = (byte) -1;
  protected byte groupId = -1;
  /**
   * The SSL encryption after the negotiation with the peer.
   */
@@ -254,8 +253,7 @@
      closeSession(localSession, reason, this);
    }
    if ((replicationServerDomain != null) &&
        replicationServerDomain.hasLock())
    if (replicationServerDomain != null && replicationServerDomain.hasLock())
      replicationServerDomain.release();
    // If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
    // peer server and the last topo message sent may have failed being
    // sent: in that case retrieve old value of generation id for
    // replication server domain
    if (oldGenerationId != -100)
    if (oldGenerationId != -100 && replicationServerDomain != null)
    {
      if (replicationServerDomain!=null)
        replicationServerDomain.changeGenerationId(oldGenerationId, false);
      replicationServerDomain.changeGenerationId(oldGenerationId, false);
    }
  }
@@ -304,7 +301,6 @@
  @Override
  public boolean engageShutdown()
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
@@ -340,13 +336,11 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, this,
          replicationServerDomain);
      writer = new ServerWriter(session, this, replicationServerDomain);
      reader = new ServerReader(session, this);
      session.setName("Replication server RS("
          + this.getReplicationServerId()
          + ") session thread to " + this.toString() + " at "
      session.setName("Replication server RS(" + getReplicationServerId()
          + ") session thread to " + this + " at "
          + session.getReadableRemoteAddress());
      session.start();
      try
@@ -366,9 +360,8 @@
      // Create a thread to send heartbeat messages.
      if (heartbeatInterval > 0)
      {
        String threadName = "Replication server RS("
            + this.getReplicationServerId()
            + ") heartbeat publisher to " + this.toString() + " at "
        String threadName = "Replication server RS(" + getReplicationServerId()
            + ") heartbeat publisher to " + this + " at "
            + session.getReadableRemoteAddress();
        heartbeatThread = new HeartbeatThread(threadName, session,
            heartbeatInterval / 3);
@@ -788,7 +781,7 @@
   */
  public boolean isReplicationServer()
  {
    return (!this.isDataServer());
    return !this.isDataServer();
  }
@@ -827,62 +820,58 @@
    // it will be created and locked later in the method
    if (!timedout)
    {
      // !timedout
      if (!replicationServerDomain.hasLock())
        replicationServerDomain.lock();
      return;
    }
    else
    /**
     * Take the lock on the domain.
     * WARNING: Here we try to acquire the lock with a timeout. This
     * is for preventing a deadlock that may happen if there are cross
     * connection attempts (for same domain) from this replication
     * server and from a peer one:
     * Here is the scenario:
     * - RS1 connect thread takes the domain lock and starts
     * connection to RS2
     * - at the same time RS2 connect thread takes his domain lock and
     * start connection to RS2
     * - RS2 listen thread starts processing received
     * ReplServerStartMsg from RS1 and wants to acquire the lock on
     * the domain (here) but cannot as RS2 connect thread already has
     * it
     * - RS1 listen thread starts processing received
     * ReplServerStartMsg from RS2 and wants to acquire the lock on
     * the domain (here) but cannot as RS1 connect thread already has
     * it
     * => Deadlock: 4 threads are locked.
     * So to prevent that in such situation, the listen threads here
     * will both timeout trying to acquire the lock. The random time
     * for the timeout should allow on connection attempt to be
     * aborted whereas the other one should have time to finish in the
     * same time.
     * Warning: the minimum time (3s) should be big enough to allow
     * normal situation connections to terminate. The added random
     * time should represent a big enough range so that the chance to
     * have one listen thread timing out a lot before the peer one is
     * great. When the first listen thread times out, the remote
     * connect thread should release the lock and allow the peer
     * listen thread to take the lock it was waiting for and process
     * the connection attempt.
     */
    Random random = new Random();
    int randomTime = random.nextInt(6); // Random from 0 to 5
    // Wait at least 3 seconds + (0 to 5 seconds)
    long timeout = 3000 + (randomTime * 1000);
    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
    if (!lockAcquired)
    {
      // timedout
      /**
       * Take the lock on the domain.
       * WARNING: Here we try to acquire the lock with a timeout. This
       * is for preventing a deadlock that may happen if there are cross
       * connection attempts (for same domain) from this replication
       * server and from a peer one:
       * Here is the scenario:
       * - RS1 connect thread takes the domain lock and starts
       * connection to RS2
       * - at the same time RS2 connect thread takes his domain lock and
       * start connection to RS2
       * - RS2 listen thread starts processing received
       * ReplServerStartMsg from RS1 and wants to acquire the lock on
       * the domain (here) but cannot as RS2 connect thread already has
       * it
       * - RS1 listen thread starts processing received
       * ReplServerStartMsg from RS2 and wants to acquire the lock on
       * the domain (here) but cannot as RS1 connect thread already has
       * it
       * => Deadlock: 4 threads are locked.
       * So to prevent that in such situation, the listen threads here
       * will both timeout trying to acquire the lock. The random time
       * for the timeout should allow on connection attempt to be
       * aborted whereas the other one should have time to finish in the
       * same time.
       * Warning: the minimum time (3s) should be big enough to allow
       * normal situation connections to terminate. The added random
       * time should represent a big enough range so that the chance to
       * have one listen thread timing out a lot before the peer one is
       * great. When the first listen thread times out, the remote
       * connect thread should release the lock and allow the peer
       * listen thread to take the lock it was waiting for and process
       * the connection attempt.
       */
      Random random = new Random();
      int randomTime = random.nextInt(6); // Random from 0 to 5
      // Wait at least 3 seconds + (0 to 5 seconds)
      long timeout = 3000 + (randomTime * 1000);
      boolean noTimeout = replicationServerDomain.tryLock(timeout);
      if (!noTimeout)
      {
        // Timeout
        Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
            getBaseDN(),
            serverId,
            session.getReadableRemoteAddress(),
            getReplicationServerId());
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
          getBaseDN(),
          serverId,
          session.getReadableRemoteAddress(),
          getReplicationServerId());
      throw new DirectoryException(ResultCode.OTHER, message);
    }
  }
@@ -1011,9 +1000,7 @@
      session.close();
    }
    /*
     * Stop the heartbeat thread.
     */
    // Stop the heartbeat thread.
    if (heartbeatThread != null)
    {
      heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
     */
    try
    {
      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
      if (writer != null && !Thread.currentThread().equals(writer))
      {
        writer.join(SHUTDOWN_JOIN_TIMEOUT);
      }
      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
      if (reader != null && !Thread.currentThread().equals(reader))
      {
        reader.join(SHUTDOWN_JOIN_TIMEOUT);
      }
@@ -1068,7 +1054,7 @@
      {
        // loop until not interrupted
      }
    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
    } while ((interrupted || !acquired) && !shutdownWriter);
    if (msg != null)
    {
      incrementOutCount();
@@ -1078,10 +1064,9 @@
        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
        {
          incrementAssuredSrSentUpdates();
        } else
        } else if (!isDataServer())
        {
          if (!isDataServer())
            incrementAssuredSdSentUpdates();
          incrementAssuredSdSentUpdates();
        }
      }
    }
@@ -1094,22 +1079,10 @@
   */
  public RSInfo toRSInfo()
  {
    return new RSInfo(serverId, serverURL, generationId, groupId,
      weight);
    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  protected void createMonitoringPublisher()
  {
    if (!replicationServerDomain.isRunningMonitoringPublisher())
    {
      replicationServerDomain.startMonitoringPublisher();
    }
  }
  /**
   * Update the send window size based on the credit specified in the
   * given window message.
   *
@@ -1132,11 +1105,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
        this.replicationServer.getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
        "\nAND REPLIED:\n" + outStartMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
          + "\nAND REPLIED:\n" + outStartMsg);
    }
  }
@@ -1151,12 +1123,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
        this.replicationServer.getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE SENT("+ this +
        "):\n" + outStartMsg.toString()+
        "\nAND RECEIVED:\n" + inStartMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
          + inStartMsg);
    }
  }
@@ -1171,11 +1141,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
          + outTopoMsg);
    }
  }
@@ -1190,11 +1159,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
          "\nAND RECEIVED:\n" + inTopoMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + ":"
          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
          + inTopoMsg);
    }
  }
@@ -1209,11 +1177,10 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
          + "\nAND REPLIED:\n" + outTopoMsg);
    }
  }
@@ -1224,10 +1191,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
    }
  }
@@ -1240,11 +1206,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          inStartECLSessionMsg.toString());
      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
          + ", " + getClass().getSimpleName() + " " + this + " :"
          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
    }
  }
@@ -1264,10 +1228,9 @@
   */
  public long getReferenceGenId()
  {
    long refgenid = -1;
    if (replicationServerDomain!=null)
      refgenid = replicationServerDomain.getGenerationId();
    return refgenid;
    if (replicationServerDomain != null)
      return replicationServerDomain.getGenerationId();
    return -1;
  }
  /**
@@ -1285,8 +1248,7 @@
   * @param update the update message received.
   * @throws IOException when it occurs.
   */
  public void put(UpdateMsg update)
  throws IOException
  public void put(UpdateMsg update) throws IOException
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.put(update, this);