opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -381,6 +381,20 @@ public static final int MSGID_READER_EXCEPTION = CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53; /** * A replication server received a null messsage from * another server. */ public static final int MSGID_DUPLICATE_SERVER_ID = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 54; /** * A server disconnected from the replication server. * (this is an informational message) */ public static final int MSGID_DUPLICATE_REPLICATION_SERVER_ID = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 55; /** @@ -520,6 +534,10 @@ "The provider class does not allow the operation requested"); registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME, "The hostname %s could not be resolved as an IP address"); registerMessage(MSGID_DUPLICATE_SERVER_ID, "Servers %s and %s have the same ServerId : %d"); registerMessage(MSGID_DUPLICATE_REPLICATION_SERVER_ID, "Replication Servers %s and %s have the same ServerId : %d"); registerMessage(MSGID_READER_NULL_MSG, "Received a Null Msg from %s"); registerMessage(MSGID_READER_EXCEPTION, opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -405,20 +405,16 @@ * This server could not find any replicationServer * Let's wait a little and try again. */ synchronized (this) checkState = false; int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; String message = getMessage(msgID); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); try { checkState = false; int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; String message = getMessage(msgID); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); try { this.wait(1000); } catch (InterruptedException e) { } Thread.sleep(1000); } catch (InterruptedException e) { } } } opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -229,20 +229,31 @@ * * @param handler handler for the server that must be started * @throws Exception when method has failed * @return A boolean indicating if the start was successfull. */ public void startServer(ServerHandler handler) throws Exception public boolean startServer(ServerHandler handler) throws Exception { /* * create the balanced tree that will be used to forward changes */ synchronized (connectedServers) { ServerHandler oldHandler = connectedServers.get(handler.getServerId()); if (connectedServers.containsKey(handler.getServerId())) { /* TODO : handle error properly */ throw new Exception("serverId already registered"); // looks like two LDAP servers have the same serverId // log an error message and drop this connection. int msgID = MSGID_DUPLICATE_SERVER_ID; String message = getMessage(msgID, oldHandler.toString(), handler.toString(), handler.getServerId()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); return false; } connectedServers.put(handler.getServerId(), handler); return true; } } @@ -267,20 +278,41 @@ * * @param handler the server ID to which we want to forward changes * @throws Exception in case of errors * @return A boolean indicating if the start was successfull. */ public void startReplicationServer(ServerHandler handler) throws Exception public boolean startReplicationServer(ServerHandler handler) throws Exception { /* * create the balanced tree that will be used to forward changes * TODO throw proper exception */ synchronized (replicationServers) { if (replicationServers.containsKey(handler.getServerId())) ServerHandler oldHandler = replicationServers.get(handler.getServerId()); if ((oldHandler != null)) { throw new Exception("Replication Server Id already registered"); if (oldHandler.getServerAddressURL().equals( handler.getServerAddressURL())) { // this is the same server, this means that our ServerStart messages // have been sent at about the same time and 2 connections // have been established. // Silently drop this connection. } else { // looks like two replication servers have the same serverId // log an error message and drop this connection. int msgID = MSGID_DUPLICATE_REPLICATION_SERVER_ID; String message = getMessage(msgID, oldHandler.getServerAddressURL(), handler.getServerAddressURL(), handler.getServerId()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } return false; } replicationServers.put(handler.getServerId(), handler); return true; } } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -288,7 +288,8 @@ synchronized (this) { /* check if we are connected every second */ wait(1000); int randomizer = (int) Math.random()*100; wait(1000 + randomizer); } } catch (InterruptedException e) { opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -302,34 +302,49 @@ replicationCache = replicationServer.getReplicationCache(this.baseDn); boolean started; if (serverIsLDAPserver) { replicationCache.startServer(this); started = replicationCache.startServer(this); } else { replicationCache.startReplicationServer(this); started = replicationCache.startReplicationServer(this); } writer = new ServerWriter(session, serverId, this, replicationCache); reader = new ServerReader(session, serverId, this, replicationCache); reader.start(); writer.start(); // Create a thread to send heartbeat messages. if (heartbeatInterval > 0) if (started) { heartbeatThread = new HeartbeatThread("replication Heartbeat", session, heartbeatInterval); heartbeatThread.start(); writer = new ServerWriter(session, serverId, this, replicationCache); reader = new ServerReader(session, serverId, this, replicationCache); reader.start(); writer.start(); // Create a thread to send heartbeat messages. if (heartbeatInterval > 0) { heartbeatThread = new HeartbeatThread("replication Heartbeat", session, heartbeatInterval); heartbeatThread.start(); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); else { // the connection is not valid, close it. try { session.close(); } catch (IOException e1) { // ignore } } } catch (Exception e) {