From 7b84e53457bce1f0733afa87797afc9928568c52 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 09 Oct 2006 14:15:55 +0000
Subject: [PATCH] - Change the synchronization code so that the changelog server can now be used directly through the ChangelogBroker class as a client API.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java |   81 ++++++++++++++++++++++++++++++++--------
 1 files changed, 65 insertions(+), 16 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index 3a4d466..b265bb3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -39,6 +39,8 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 
 import org.opends.server.changelog.ProtocolSession;
 import org.opends.server.changelog.SocketSession;
@@ -47,6 +49,7 @@
 import org.opends.server.protocols.internal.InternalSearchListener;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.types.DN;
 import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
@@ -63,43 +66,64 @@
 {
   private boolean shutdown = false;
   private List<String> servers;
-  private Short identifier;
   private boolean connected = false;
-  private SynchronizationDomain domain;
   private final Object lock = new Object();
   private String changelogServer = "Not connected";
   private TreeSet<FakeOperation> replayOperations;
   private ProtocolSession session = null;
+  private final ServerState state;
+  private final DN baseDn;
+  private final short serverID;
+  private int maxSendDelay;
+  private int maxReceiveDelay;
+  private int maxSendQueue;
+  private int maxReceiveQueue;
 
   /**
    * Creates a new Changelog Broker for a particular SynchronizationDomain.
    *
-   * @param domain The SynchronizationDomain for which the borker is created.
+   * @param state The ServerState that should be used by this broker
+   *              when negociating the session with the changelog servers.
+   * @param baseDn The base DN that should be used by this broker
+   *              when negociating the session with the changelog servers.
+   * @param serverID The server ID that should be used by this broker
+   *              when negociating the session with the changelog servers.
+   * @param maxReceiveQueue The maximum size of the receive queue to use on
+   *                         the changelog server.
+   * @param maxReceiveDelay The maximum replication delay to use on the
+   *                        changelog server.
+   * @param maxSendQueue The maximum size of the send queue to use on
+   *                     the changelog server.
+   * @param maxSendDelay The maximum send delay to use on the changelog server.
    */
-  public ChangelogBroker(SynchronizationDomain domain)
+  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
+      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+      int maxSendDelay )
   {
-    this.domain = domain;
+    this.baseDn = baseDn;
+    this.serverID = serverID;
+    this.maxReceiveDelay = maxReceiveDelay;
+    this.maxSendDelay = maxSendDelay;
+    this.maxReceiveQueue = maxReceiveQueue;
+    this.maxSendQueue = maxSendQueue;
+    this.state = state;
     replayOperations =
       new TreeSet<FakeOperation>(new FakeOperationComparator());
   }
 
-
   /**
    * Start the ChangelogBroker.
    *
-   * @param identifier identifier of the changelog
    * @param servers list of servers used
    * @throws Exception : in case of errors
    */
-  public void start(Short identifier,
-                    List<String> servers)
+  public void start(List<String> servers)
                     throws Exception
   {
     /*
      * Open Socket to the Changelog
      * Send the Start message
      */
-    this.identifier = identifier;
     this.servers = servers;
     if (servers.size() < 1)
     {
@@ -147,7 +171,9 @@
           /*
            * Send our ServerStartMessage.
            */
-          ServerStartMessage msg = domain.newServerStartMessage();
+          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
+              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+              state);
           session.publish(msg);
 
 
@@ -167,10 +193,10 @@
            * those changes and send them again to any changelog server.
            */
           ChangeNumber changelogMaxChangeNumber =
-            startMsg.getServerState().getMaxChangeNumber(identifier);
+            startMsg.getServerState().getMaxChangeNumber(serverID);
           if (changelogMaxChangeNumber == null)
-            changelogMaxChangeNumber = new ChangeNumber(0, 0, identifier);
-          ChangeNumber ourMaxChangeNumber =  domain.getMaxChangeNumber();
+            changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
           if ((ourMaxChangeNumber == null) ||
               (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
           {
@@ -206,7 +232,7 @@
               LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
               attrs.add(Historical.HISTORICALATTRIBUTENAME);
               InternalSearchOperation op = conn.processSearch(
-                  new ASN1OctetString(domain.getBaseDN().toString()),
+                  new ASN1OctetString(baseDn.toString()),
                   SearchScope.WHOLE_SUBTREE,
                   DereferencePolicy.NEVER_DEREF_ALIASES,
                   0, 0, false, filter,
@@ -381,8 +407,10 @@
   /**
    * Receive a message.
    * @return the received message
+   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+   *         has expired
    */
-  public SynchronizationMessage receive()
+  public SynchronizationMessage receive() throws SocketTimeoutException
   {
     while (shutdown == false)
     {
@@ -392,6 +420,11 @@
         return session.receive();
       } catch (Exception e)
       {
+        if (e instanceof SocketTimeoutException)
+        {
+          SocketTimeoutException e1 = (SocketTimeoutException) e;
+          throw e1;
+        }
         if (shutdown == false)
         {
           synchronized (lock)
@@ -439,6 +472,22 @@
   }
 
   /**
+   * Set a timeout value.
+   * With this option set to a non-zero value, calls to the receive() method
+   * block for only this amount of time after which a
+   * java.net.SocketTimeoutException is raised.
+   * The Broker is valid and useable even after such an Exception is raised.
+   *
+   * @param timeout the specified timeout, in milliseconds.
+   * @throws SocketException if there is an error in the underlying protocol,
+   *         such as a TCP error.
+   */
+  public void setSoTimeout(int timeout) throws SocketException
+  {
+    session.setSoTimeout(timeout);
+  }
+
+  /**
    * Get the name of the changelog server to which this broker is currently
    * connected.
    *

--
Gitblit v1.10.0