From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508  These changes implement a window mechanism in the sycnhronization protocol.

---
 opends/src/server/org/opends/server/changelog/Changelog.java |   92 +++++++++++++++++++++++++++------------------
 1 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/opends/src/server/org/opends/server/changelog/Changelog.java b/opends/src/server/org/opends/server/changelog/Changelog.java
index 6660b1b..497e2f4 100644
--- a/opends/src/server/org/opends/server/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/changelog/Changelog.java
@@ -71,14 +71,14 @@
  */
 public class Changelog implements Runnable, ConfigurableComponent
 {
-  static private short serverId;
-  static private String serverURL;
+  private short serverId;
+  private String serverURL;
 
-  private static ServerSocket listenSocket;
-  private static Thread myListenThread;
-  private static Thread myConnectThread;
+  private ServerSocket listenSocket;
+  private Thread myListenThread;
+  private Thread myConnectThread;
 
-  private static boolean runListen = true;
+  private boolean runListen = true;
 
   /* The list of changelog servers configured by the administrator */
   private List<String> changelogServers;
@@ -86,19 +86,22 @@
   /* This table is used to store the list of dn for which we are currently
    * handling servers.
    */
-  private static HashMap<DN, ChangelogCache> baseDNs =
+  private HashMap<DN, ChangelogCache> baseDNs =
           new HashMap<DN, ChangelogCache>();
 
   private String localURL = "null";
-  private static boolean shutdown = false;
+  private boolean shutdown = false;
   private short changelogServerId;
   private DN configDn;
   private List<ConfigAttribute> configAttributes =
           new ArrayList<ConfigAttribute>();
+  private ChangelogDbEnv dbEnv;
+  private int rcvWindow;
 
   static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
   static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
+  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
 
   static final IntegerConfigAttribute changelogPortStub =
     new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -114,6 +117,10 @@
         "changelog server information", true,
         true, false);
 
+  static final IntegerConfigAttribute windowStub =
+    new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
+                               false, false, false, true, 0, false, 0);
+
   /**
    * Check if a ConfigEntry is valid.
    * @param config The config entry that needs to be checked.
@@ -229,6 +236,16 @@
     }
     configAttributes.add(changelogServer);
 
+    IntegerConfigAttribute windowAttr =
+      (IntegerConfigAttribute) config.getConfigAttribute(windowStub);
+    if (windowAttr == null)
+      rcvWindow = 100;  // Attribute is not present : use the default value
+    else
+    {
+      rcvWindow = windowAttr.activeIntValue();
+      configAttributes.add(windowAttr);
+    }
+
     initialize(changelogServerId, changelogPort);
 
     configDn = config.getDN();
@@ -305,9 +322,10 @@
       try
       {
         newSocket =  listenSocket.accept();
+        newSocket.setReceiveBufferSize(1000000);
         ServerHandler handler = new ServerHandler(
                                      new SocketSession(newSocket));
-        handler.start(null);
+        handler.start(null, serverId, serverURL, rcvWindow, this);
       } catch (IOException e)
       {
         // ignore
@@ -378,11 +396,12 @@
       InetSocketAddress ServerAddr = new InetSocketAddress(
                      InetAddress.getByName(hostname), Integer.parseInt(port));
       Socket socket = new Socket();
+      socket.setReceiveBufferSize(1000000);
       socket.connect(ServerAddr, 500);
 
       ServerHandler handler = new ServerHandler(
                                       new SocketSession(socket));
-      handler.start(baseDn);
+      handler.start(baseDn, serverId, serverURL, rcvWindow, this);
     }
     catch (IOException e)
     {
@@ -406,8 +425,9 @@
        * Initialize the changelog database.
        * TODO : the changelog db path should be configurable
        */
-      ChangelogDB.initialize(DirectoryServer.getServerRoot() + File.separator
-          + "changelogDb");
+      dbEnv = new ChangelogDbEnv(
+          DirectoryServer.getServerRoot() + File.separator + "changelogDb",
+          this);
 
       /*
        * create changelog cache
@@ -421,7 +441,9 @@
       String localAdddress = InetAddress.getLocalHost().getHostAddress();
       serverURL = localhostname + ":" + String.valueOf(changelogPort);
       localURL = localAdddress + ":" + String.valueOf(changelogPort);
-      listenSocket = new ServerSocket(changelogPort);
+      listenSocket = new ServerSocket();
+      listenSocket.setReceiveBufferSize(1000000);
+      listenSocket.bind(new InetSocketAddress(changelogPort));
 
       /*
        * create working threads
@@ -460,32 +482,12 @@
   }
 
   /**
-   * Retrieves the unique identifier for this changelog.
-   *
-   * @return  The unique identifier for this changelog.
-   */
-  public static short getServerId()
-  {
-    return serverId;
-  }
-
-  /**
-   * Retrieves the host and port for this changelog, separated by a colon.
-   *
-   * @return  The host and port for this changelog, separated by a colon.
-   */
-  public static String getServerURL()
-  {
-    return serverURL;
-  }
-
-  /**
    * Get the ChangelogCache associated to the base DN given in parameter.
    *
    * @param baseDn The base Dn for which the ChangelogCache must be returned.
    * @return The ChangelogCache associated to the base DN given in parameter.
    */
-  public static ChangelogCache getChangelogCache(DN baseDn)
+  public ChangelogCache getChangelogCache(DN baseDn)
   {
     ChangelogCache changelogCache;
 
@@ -493,7 +495,7 @@
     {
       changelogCache = baseDNs.get(baseDn);
       if (changelogCache == null)
-        changelogCache = new ChangelogCache(baseDn);
+        changelogCache = new ChangelogCache(baseDn, this);
       baseDNs.put(baseDn, changelogCache);
     }
 
@@ -503,7 +505,7 @@
   /**
    * Shutdown the Changelog service and all its connections.
    */
-  public static void shutdown()
+  public void shutdown()
   {
     shutdown = true;
 
@@ -525,6 +527,22 @@
       changelogCache.shutdown();
     }
 
-    ChangelogDB.shutdownDbEnvironment();
+    dbEnv.shutdown();
+  }
+
+
+  /**
+   * Creates a new DB handler for this Changelog and the serverId and
+   * DN given in parameter.
+   *
+   * @param id The serverId for which the dbHandler must be created.
+   * @param baseDn The DN for which the dbHandler muste be created.
+   * @return The new DB handler for this Changelog and the serverId and
+   *         DN given in parameter.
+   * @throws DatabaseException in case of underlying database problem.
+   */
+  public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
+  {
+    return new DbHandler(id, baseDn, this, dbEnv);
   }
 }

--
Gitblit v1.10.0