mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
29.04.2007 ca243a420602b9f8b441e2d4d53b96601c756e97
Fix for 1323 : Error message on startup with synchronization enabled

When 2 replication server instances open a connection to each other simultaneously,
one of these connections must be closed. In that case an error message
was produced even though this is not an error condition.

This change removes the error message in that condition and also improves the
message that gets printed to the error log in the case where 2 different servers
have been assigned the same server ID.
5 files modified
142 ■■■■ changed files
opends/src/server/org/opends/server/messages/ReplicationMessages.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 22 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationCache.java 46 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 53 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -381,6 +381,20 @@
  public static final int MSGID_READER_EXCEPTION =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53;
  /**
   * Two servers have the same server ID.
   * (this is reported as a severe error)
   */
  public static final int MSGID_DUPLICATE_SERVER_ID =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 54;
  /**
   * Two replication servers have the same server ID.
   * (this is reported as a severe error)
   */
  public static final int MSGID_DUPLICATE_REPLICATION_SERVER_ID =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 55;
  /**
@@ -520,6 +534,10 @@
        "The provider class does not allow the operation requested");
    registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME,
        "The hostname %s could not be resolved as an IP address");
    registerMessage(MSGID_DUPLICATE_SERVER_ID,
        "Servers %s and %s have the same ServerId : %d");
    registerMessage(MSGID_DUPLICATE_REPLICATION_SERVER_ID,
        "Replication Servers %s and %s have the same ServerId : %d");
    registerMessage(MSGID_READER_NULL_MSG,
        "Received a Null Msg from %s");
    registerMessage(MSGID_READER_EXCEPTION,
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -405,20 +405,16 @@
           * This server could not find any replicationServer
           * Let's wait a little and try again.
           */
          synchronized (this)
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE, message, msgID);
          try
          {
            checkState = false;
            int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
            String message = getMessage(msgID);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
                message, msgID);
            try
            {
              this.wait(1000);
            } catch (InterruptedException e)
            {
            }
            Thread.sleep(1000);
          } catch (InterruptedException e)
          {
          }
        }
      }
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -229,20 +229,31 @@
   *
   * @param handler handler for the server that must be started
   * @throws Exception when method has failed
   * @return A boolean indicating if the start was successful.
   */
  // NOTE(review): two signatures follow because this listing is a rendered
  // diff without +/- markers. The void form appears to be the pre-change
  // line and the boolean form its replacement -- confirm against the repo.
  public void startServer(ServerHandler handler) throws Exception
  public boolean startServer(ServerHandler handler) throws Exception
  {
    /*
     * create the balanced tree that will be used to forward changes
     */
    // The lookup and the registration below both run while holding the
    // connectedServers monitor.
    synchronized (connectedServers)
    {
      // Handler already stored under this server ID, if any; it is only used
      // to name the existing server in the duplicate-ID message.
      ServerHandler oldHandler = connectedServers.get(handler.getServerId());
      if (connectedServers.containsKey(handler.getServerId()))
      {
        /* TODO : handle error properly */
        // NOTE(review): this throw looks like the pre-change behaviour that
        // the logging and "return false" below replace -- confirm.
        throw new Exception("serverId already registered");
        // looks like two LDAP servers have the same serverId
        // log an error message and drop this connection.
        int    msgID   = MSGID_DUPLICATE_SERVER_ID;
        String message = getMessage(msgID, oldHandler.toString(),
            handler.toString(), handler.getServerId());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        // Report to the caller that the handler was not registered.
        return false;
      }
      // No handler is stored under this server ID yet: record this one.
      connectedServers.put(handler.getServerId(), handler);
      return true;
    }
  }
@@ -267,20 +278,41 @@
   *
   * @param handler the server ID to which we want to forward changes
   * @throws Exception in case of errors
   * @return A boolean indicating if the start was successful.
   */
  // NOTE(review): two signatures follow because this listing is a rendered
  // diff without +/- markers. The void form appears to be the pre-change
  // line and the boolean form its replacement -- confirm against the repo.
  public void startReplicationServer(ServerHandler handler) throws Exception
  public boolean startReplicationServer(ServerHandler handler) throws Exception
  {
    /*
     * create the balanced tree that will be used to forward changes
     * TODO throw proper exception
     */
    // The lookup and the registration below both run while holding the
    // replicationServers monitor.
    synchronized (replicationServers)
    {
      // NOTE(review): the containsKey test on the next line looks like the
      // pre-change condition; the oldHandler lookup and null check after it
      // appear to be its replacement -- confirm.
      if (replicationServers.containsKey(handler.getServerId()))
      ServerHandler oldHandler = replicationServers.get(handler.getServerId());
      if ((oldHandler != null))
      {
        // NOTE(review): this throw looks like the pre-change behaviour that
        // the branch below replaces -- confirm.
        throw new Exception("Replication Server Id already registered");
        // Same server ID already registered: compare the address URLs of the
        // two handlers to decide whether this is worth an error message.
        if (oldHandler.getServerAddressURL().equals(
            handler.getServerAddressURL()))
        {
          // this is the same server, this means that our ServerStart messages
          // have been sent at about the same time and 2 connections
          // have been established.
          // Silently drop this connection.
        }
        else
        {
          // looks like two replication servers have the same serverId
          // log an error message and drop this connection.
          int    msgID   = MSGID_DUPLICATE_REPLICATION_SERVER_ID;
          String message = getMessage(msgID, oldHandler.getServerAddressURL(),
                handler.getServerAddressURL(), handler.getServerId());
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
        }
        // In both cases the new handler is not registered.
        return false;
      }
      // No handler is stored under this server ID yet: record this one.
      replicationServers.put(handler.getServerId(), handler);
      return true;
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -288,7 +288,8 @@
        synchronized (this)
        {
          /* check if we are connected every second */
          wait(1000);
          int randomizer = (int) Math.random()*100;
          wait(1000 + randomizer);
        }
      } catch (InterruptedException e)
      {
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -302,34 +302,49 @@
      replicationCache = replicationServer.getReplicationCache(this.baseDn);
      boolean started;
      if (serverIsLDAPserver)
      {
        replicationCache.startServer(this);
        started = replicationCache.startServer(this);
      }
      else
      {
        replicationCache.startReplicationServer(this);
        started = replicationCache.startReplicationServer(this);
      }
      writer = new ServerWriter(session, serverId, this, replicationCache);
      reader = new ServerReader(session, serverId, this,
                                             replicationCache);
      reader.start();
      writer.start();
      // Create a thread to send heartbeat messages.
      if (heartbeatInterval > 0)
      if (started)
      {
        heartbeatThread = new HeartbeatThread("replication Heartbeat",
                                              session, heartbeatInterval);
        heartbeatThread.start();
        writer = new ServerWriter(session, serverId, this, replicationCache);
        reader = new ServerReader(session, serverId, this,
            replicationCache);
        reader.start();
        writer.start();
        // Create a thread to send heartbeat messages.
        if (heartbeatInterval > 0)
        {
          heartbeatThread = new HeartbeatThread("replication Heartbeat",
              session, heartbeatInterval);
          heartbeatThread.start();
        }
        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
        DirectoryServer.registerMonitorProvider(this);
      }
      DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
      DirectoryServer.registerMonitorProvider(this);
      else
      {
        // the connection is not valid, close it.
        try
        {
          session.close();
        } catch (IOException e1)
        {
          // ignore
        }
      }
    }
    catch (Exception e)
    {