From a00e725a364e9eb859ca8ed2c914e6637c94cb9d Mon Sep 17 00:00:00 2001
From: neil_a_wilson <neil_a_wilson@localhost>
Date: Thu, 29 Mar 2007 21:12:30 +0000
Subject: [PATCH] Back out the commit included in revision 1539 because it does not build properly (it appears to reference classes that are not in the repository).
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 2
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java | 50 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java | 65 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java | 14
opendj-sdk/opends/src/server/org/opends/server/messages/TaskMessages.java | 17
opendj-sdk/opends/src/server/org/opends/server/config/ConfigConstants.java | 56 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java | 2
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java | 374 +++-------
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java | 20
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java | 3
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java | 32
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 63 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java | 1196 ---------------------------------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java | 112 ---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java | 2
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java | 11
opendj-sdk/opends/resource/schema/02-config.ldif | 33
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java | 27
18 files changed, 189 insertions(+), 1,890 deletions(-)
diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index b60575c..b5791ca 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1045,10 +1045,6 @@
NAME 'ds-cfg-heartbeat-interval'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.306
- NAME 'ds-cfg-changelog-db-directory'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.307
NAME 'ds-privilege-name' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' )
@@ -1107,22 +1103,6 @@
NAME 'ds-cfg-case-sensitive-validation'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.332
- NAME 'ds-task-initialize-domain-dn'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.333
- NAME 'ds-task-initialize-replica-server-id'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.334
- NAME 'ds-task-unprocessed-entry-count'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.335
- NAME 'ds-task-processed-entry-count'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1445,8 +1425,7 @@
'ds-cfg-synchronization-changelog-server-config' SUP top
STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port )
MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size $
- ds-cfg-changelog-max-queue-size $ds-cfg-changelog-db-directory )
- X-ORIGIN 'OpenDS Directory Server' )
+ ds-cfg-changelog-max-queue-size ) X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory'
SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn )
X-ORIGIN 'OpenDS Directory Server' )
@@ -1553,14 +1532,4 @@
SUP ds-cfg-password-validator STRUCTURAL
MUST ( ds-cfg-maximum-consecutive-length $ ds-cfg-case-sensitive-validation )
X-ORIGIN 'OpenDS Directory Server' )
-objectClasses: ( 1.3.6.1.4.1.26027.1.2.91
- NAME 'ds-task-initialize-from-remote-replica' SUP ds-task
- MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
- MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
- X-ORIGIN 'OpenDS Directory Server' )
-objectClasses: ( 1.3.6.1.4.1.26027.1.2.92
- NAME 'ds-task-initialize-remote-replica' SUP ds-task
- MUST ( ds-task-initialize-domain-dn $ ds-task-initialize-replica-server-id )
- MAY ( ds-task-processed-entry-count $ ds-task-unprocessed-entry-count )
- X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opendj-sdk/opends/src/server/org/opends/server/config/ConfigConstants.java b/opendj-sdk/opends/src/server/org/opends/server/config/ConfigConstants.java
index 5a666c2..8eb3ad5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/config/ConfigConstants.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3665,62 +3665,6 @@
NAME_PREFIX_TASK + "import-is-encrypted";
- /**
- * The name of the objectclass that will be used for a Directory Server
- * initialize task definition.
- */
- public static final String OC_INITIALIZE_TASK =
- NAME_PREFIX_TASK + "initialize-from-remote-replica";
-
- /**
- * The name of the attribute in an initialize task definition that specifies
- * the base dn related to the synchonization domain to initialize.
- */
- public static final String ATTR_TASK_INITIALIZE_DOMAIN_DN =
- NAME_PREFIX_TASK + "initialize-domain-dn";
-
- /**
- * The name of the attribute in an initialize target task definition that
- * specifies the source in terms of source server from which to initialize.
- */
- public static final String ATTR_TASK_INITIALIZE_SOURCE =
- NAME_PREFIX_TASK + "initialize-replica-server-id";
-
- /**
- * The name of the objectclass that will be used for a Directory Server
- * initialize target task definition.
- */
- public static final String OC_INITIALIZE_TARGET_TASK =
- NAME_PREFIX_TASK + "initialize-remote-replica";
-
- /**
- * The name of the attribute in an initialize target task definition that
- * specifies the base dn related to the synchonization domain to initialize.
- */
- public static final String ATTR_TASK_INITIALIZE_TARGET_DOMAIN_DN =
- NAME_PREFIX_TASK + "initialize-domain-dn";
-
- /**
- * The name of the attribute in an initialize target task definition that
- * specifies the scope in terms of servers to initialize.
- */
- public static final String ATTR_TASK_INITIALIZE_TARGET_SCOPE =
- NAME_PREFIX_TASK + "initialize-replica-server-id";
-
- /**
- * The name of the attribute in an initialize target task definition that
- * specifies the scope in terms of servers to initialize.
- */
- public static final String ATTR_TASK_INITIALIZE_LEFT =
- NAME_PREFIX_TASK + "unprocessed-entry-count";
-
- /**
- * The name of the attribute in an initialize target task definition that
- * specifies the scope in terms of servers to initialize.
- */
- public static final String ATTR_TASK_INITIALIZE_DONE =
- NAME_PREFIX_TASK + "processed-entry-count";
-
/**
* The name of the objectclass that will be used for a Directory Server
diff --git a/opendj-sdk/opends/src/server/org/opends/server/messages/TaskMessages.java b/opendj-sdk/opends/src/server/org/opends/server/messages/TaskMessages.java
index 7349988..16843e3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/messages/TaskMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/messages/TaskMessages.java
@@ -211,19 +211,7 @@
public static final int MSGID_TASK_ADDSCHEMAFILE_CANNOT_NOTIFY_SYNC_PROVIDER =
CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 17;
- /**
- * The message ID for the message that will be used when an invalid domain
- * base DN is provided as argument to the initialize target task.
- */
- public static final int MSGID_TASK_INITIALIZE_TARGET_INVALID_DN =
- CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 18;
- /**
- * The message ID for the message that will be used when an invalid domain
- * base DN is provided as argument to the initialize task.
- */
- public static final int MSGID_TASK_INITIALIZE_INVALID_DN =
- CATEGORY_MASK_TASK | SEVERITY_MASK_SEVERE_ERROR | 19;
/**
* Associates a set of generic messages with the message IDs defined in this
@@ -291,11 +279,6 @@
registerMessage(MSGID_TASK_LDIFEXPORT_INSUFFICIENT_PRIVILEGES,
"You do not have sufficient privileges to initiate an " +
"LDIF export.");
-
- registerMessage(MSGID_TASK_INITIALIZE_TARGET_INVALID_DN,
- "Invalid DN provided with the Initialize Target task.");
- registerMessage(MSGID_TASK_INITIALIZE_INVALID_DN,
- "Invalid DN provided with the Initialize task.");
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 531569c..64bf887 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -111,7 +111,7 @@
static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
- static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
+ static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index 51f099e..d7e0e2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -26,31 +26,26 @@
*/
package org.opends.server.synchronization.changelog;
-import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.loggers.Error.logError;
+
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.InitializeRequestMessage;
-import org.opends.server.synchronization.protocol.ErrorMessage;
-import org.opends.server.synchronization.protocol.RoutableMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
+import com.sleepycat.je.DatabaseException;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
-import com.sleepycat.je.DatabaseException;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* This class define an in-memory cache that will be used to store
@@ -430,263 +425,148 @@
}
}
+
/**
- * Retrieves the destination handlers for a routable message.
+ * Send back an ack to the server that sent the change.
*
- * @param msg The message to route.
- * @param senderHandler The handler of the server that published this message.
- * @return The list of destination handlers.
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
*/
- protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
- ServerHandler senderHandler)
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
{
+ short serverId = changeNumber.getServerId();
+ sendAck(changeNumber, isLDAPserver, serverId);
+ }
- List<ServerHandler> servers =
- new ArrayList<ServerHandler>();
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "getDestinationServers"
- + " msgDest:" + msg.getDestination() , 1);
-
- if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
- {
- // TODO Import from the "closest server" to be implemented
- }
- else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
- {
- if (!senderHandler.isChangelogServer())
- {
- // Send to all changelogServers
- for (ServerHandler destinationHandler : changelogServers.values())
- {
- servers.add(destinationHandler);
- }
- }
-
- // Send to all connected LDAP servers
- for (ServerHandler destinationHandler : connectedServers.values())
- {
- // Don't loop on the sender
- if (destinationHandler == senderHandler)
- continue;
- servers.add(destinationHandler);
- }
- }
+ /**
+ *
+ * Send back an ack to a server that sent the change.
+ *
+ * @param changeNumber The ChangeNumber of the change that must be acked.
+ * @param isLDAPserver This boolean indicates if the server that sent the
+ * change was an LDAP server or a Changelog server.
+ * @param serverId The identifier of the server from which we
+ * received the change..
+ */
+ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+ short serverId)
+ {
+ ServerHandler handler;
+ if (isLDAPserver)
+ handler = connectedServers.get(serverId);
else
- {
- // Destination is one server
- ServerHandler destinationHandler =
- connectedServers.get(msg.getDestination());
- if (destinationHandler != null)
- {
- servers.add(destinationHandler);
- }
- else
- {
- // the targeted server is NOT connected
- if (senderHandler.isLDAPserver())
- {
- // let's forward to the other changelogs
- servers.addAll(changelogServers.values());
- }
- }
- }
+ handler = changelogServers.get(serverId);
- return servers;
+ // TODO : check for null handler and log error
+ try
+ {
+ handler.sendAck(changeNumber);
+ } catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ handler.shutdown();
+ }
}
/**
- * Process an InitializeRequestMessage.
- *
- * @param msg The message received and to be processed.
- * @param senderHandler The server handler of the server that emitted
- * the message.
+ * Shutdown this ChangelogCache.
*/
- public void process(RoutableMessage msg, ServerHandler senderHandler)
+ public void shutdown()
{
-
- List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
-
- if (servers.isEmpty())
+ // Close session with other changelogs
+ for (ServerHandler serverHandler : changelogServers.values())
{
- if (!(msg instanceof InitializeRequestMessage))
- {
- // TODO A more elaborated policy is probably needed
- }
- else
- {
- ErrorMessage errMsg = new ErrorMessage(
- msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
- "serverID:" + msg.getDestination());
-
- try
- {
- senderHandler.send(errMsg);
- }
- catch(IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- }
- }
- return;
+ serverHandler.shutdown();
}
- for (ServerHandler targetHandler : servers)
+ // Close session with other LDAP servers
+ for (ServerHandler serverHandler : connectedServers.values())
{
- try
- {
- targetHandler.send(msg);
- }
- catch(IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- }
+ serverHandler.shutdown();
}
+ // Shutdown the dbHandlers
+ synchronized (sourceDbHandlers)
+ {
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ dbHandler.shutdown();
+ }
+ sourceDbHandlers.clear();
+ }
}
- /**
- * Send back an ack to the server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+ /**
+ * Returns the ServerState describing the last change from this replica.
+ *
+ * @return The ServerState describing the last change from this replica.
+ */
+ public ServerState getDbServerState()
+ {
+ ServerState serverState = new ServerState();
+ for (DbHandler db : sourceDbHandlers.values())
{
- short serverId = changeNumber.getServerId();
- sendAck(changeNumber, isLDAPserver, serverId);
+ serverState.update(db.getLastChange());
+ }
+ return serverState;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ return "ChangelogCache " + baseDn;
+ }
+
+ /**
+ * Check if some server Handler should be removed from flow control state.
+ * @throws IOException If an error happened.
+ */
+ public void checkAllSaturation() throws IOException
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ handler.checkWindow();
}
- /**
- *
- * Send back an ack to a server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a Changelog server.
- * @param serverId The identifier of the server from which we
- * received the change..
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
- short serverId)
+ for (ServerHandler handler : connectedServers.values())
{
- ServerHandler handler;
- if (isLDAPserver)
- handler = connectedServers.get(serverId);
- else
- handler = changelogServers.get(serverId);
-
- // TODO : check for null handler and log error
- try
- {
- handler.sendAck(changeNumber);
- } catch (IOException e)
- {
- /*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
- */
- int msgID = MSGID_CHANGELOG_ERROR_SENDING_ACK;
- String message = getMessage(msgID, this.toString())
- + stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- handler.shutdown();
- }
+ handler.checkWindow();
}
+ }
- /**
- * Shutdown this ChangelogCache.
- */
- public void shutdown()
+ /**
+ * Check if a server that was in flow control can now restart
+ * sending updates.
+ * @param sourceHandler The server that must be checked.
+ * @return true if the server can restart sending changes.
+ * false if the server can't restart sending changes.
+ */
+ public boolean restartAfterSaturation(ServerHandler sourceHandler)
+ {
+ for (ServerHandler handler : changelogServers.values())
{
- // Close session with other changelogs
- for (ServerHandler serverHandler : changelogServers.values())
- {
- serverHandler.shutdown();
- }
-
- // Close session with other LDAP servers
- for (ServerHandler serverHandler : connectedServers.values())
- {
- serverHandler.shutdown();
- }
-
- // Shutdown the dbHandlers
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- dbHandler.shutdown();
- }
- sourceDbHandlers.clear();
- }
- }
-
- /**
- * Returns the ServerState describing the last change from this replica.
- *
- * @return The ServerState describing the last change from this replica.
- */
- public ServerState getDbServerState()
- {
- ServerState serverState = new ServerState();
- for (DbHandler db : sourceDbHandlers.values())
- {
- serverState.update(db.getLastChange());
- }
- return serverState;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString()
- {
- return "ChangelogCache " + baseDn;
- }
-
- /**
- * Check if some server Handler should be removed from flow control state.
- * @throws IOException If an error happened.
- */
- public void checkAllSaturation() throws IOException
- {
- for (ServerHandler handler : changelogServers.values())
- {
- handler.checkWindow();
- }
-
- for (ServerHandler handler : connectedServers.values())
- {
- handler.checkWindow();
- }
- }
-
- /**
- * Check if a server that was in flow control can now restart
- * sending updates.
- * @param sourceHandler The server that must be checked.
- * @return true if the server can restart sending changes.
- * false if the server can't restart sending changes.
- */
- public boolean restartAfterSaturation(ServerHandler sourceHandler)
- {
- for (ServerHandler handler : changelogServers.values())
- {
- if (!handler.restartAfterSaturation(sourceHandler))
+ if (!handler.restartAfterSaturation(sourceHandler))
return false;
- }
-
- for (ServerHandler handler : connectedServers.values())
- {
- if (!handler.restartAfterSaturation(sourceHandler))
- return false;
- }
- return true;
}
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
+ return false;
+ }
+ return true;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index a4e1ae8..3cee58d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -27,8 +27,6 @@
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -46,18 +44,6 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.HeartbeatThread;
-import org.opends.server.synchronization.protocol.ProtocolSession;
-import org.opends.server.synchronization.protocol.RoutableMessage;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -65,6 +51,17 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ServerState;
+import org.opends.server.synchronization.protocol.AckMessage;
+import org.opends.server.synchronization.protocol.ChangelogStartMessage;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import org.opends.server.synchronization.protocol.ServerStartMessage;
+import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.synchronization.protocol.WindowMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -111,7 +108,6 @@
// flow controled and should
// be stopped from sending messsages.
private int saturationCount = 0;
- private short changelogId;
/**
* The time in milliseconds between heartbeats from the synchronization
@@ -159,7 +155,6 @@
public void start(DN baseDn, short changelogId, String changelogURL,
int windowSize, Changelog changelog)
{
- this.changelogId = changelogId;
rcvWindowSizeHalf = windowSize/2;
maxRcvWindow = windowSize;
rcvWindow = windowSize;
@@ -1268,40 +1263,4 @@
{
return heartbeatInterval;
}
-
- /**
- * Processes a routable message.
- *
- * @param msg The message to be processed.
- */
- public void process(RoutableMessage msg)
- {
- if (debugEnabled())
- debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
-
- changelogCache.process(msg, this);
- }
-
- /**
- * Send an InitializeRequestMessage to the server connected through this
- * handler.
- *
- * @param msg The message to be processed
- * @throws IOException when raised by the underlying session
- */
- public void send(RoutableMessage msg) throws IOException
- {
- if (debugEnabled())
- debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
-
- session.publish(msg);
- }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
index 2dc7a75..6f2e451 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -34,11 +34,6 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.ErrorMessage;
-import org.opends.server.synchronization.protocol.DoneMessage;
-import org.opends.server.synchronization.protocol.EntryMessage;
-import org.opends.server.synchronization.protocol.InitializeRequestMessage;
-import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ProtocolSession;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
@@ -121,33 +116,6 @@
WindowMessage windowMsg = (WindowMessage) msg;
handler.updateWindow(windowMsg);
}
- else if (msg instanceof InitializeRequestMessage)
- {
- InitializeRequestMessage initializeMsg =
- (InitializeRequestMessage) msg;
- handler.process(initializeMsg);
- }
- else if (msg instanceof InitializeTargetMessage)
- {
- InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
- handler.process(initializeMsg);
- }
- else if (msg instanceof EntryMessage)
- {
- EntryMessage entryMsg = (EntryMessage) msg;
- handler.process(entryMsg);
- }
- else if (msg instanceof DoneMessage)
- {
- DoneMessage doneMsg = (DoneMessage) msg;
- handler.process(doneMsg);
- }
- else if (msg instanceof ErrorMessage)
- {
- ErrorMessage errorMsg = (ErrorMessage) msg;
- handler.process(errorMsg);
- }
-
}
} catch (IOException e)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index d2fc088..bab9527 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -317,61 +317,13 @@
CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
/**
- * The message id for the description of the attribute used to configure
+ * The message id for thedescription of the attribute used to configure
* the purge delay of the Changelog Servers.
*/
public static final int MSGID_PURGE_DELAY_ATTR =
CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
- /**
- * The message id for the error raised when export/import
- * is rejected due to an export/import already in progress.
- */
- public static final int MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED =
- CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 44;
- /**
- * The message id for the error raised when import
- * is rejected due to an invalid source of data imported.
- */
- public static final int MSGID_INVALID_IMPORT_SOURCE =
- CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 45;
-
- /**
- * The message id for the error raised when export
- * is rejected due to an invalid target to export datas.
- */
- public static final int MSGID_INVALID_EXPORT_TARGET =
- CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 46;
-
- /**
- * The message id for the error raised when import/export message
- * cannot be routed to an up-and-running target in the domain.
- */
- public static final int MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN =
- CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 47;
-
- /**
- * The message ID for the message that will be used when no domain
- * can be found matching the provided domain base DN.
- */
- public static final int MSGID_NO_MATCHING_DOMAIN =
- CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 48;
-
- /**
- * The message ID for the message that will be used when no domain
- * can be found matching the provided domain base DN.
- */
- public static final int MSGID_MULTIPLE_MATCHING_DOMAIN
- = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 49;
-
-
- /**
- * The message ID for the message that will be used when the domain
- * belongs to a provider class that does not allow the export.
- */
- public static final int MSGID_INVALID_PROVIDER =
- CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 50;
/**
* Register the messages from this class in the core server.
@@ -497,20 +449,5 @@
" restored because changelog servers would not be able to refresh" +
" LDAP servers with older versions of the data. A zero value" +
" can be used to specify an infinite delay (or never purge).");
- MessageHandler.registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
- "The current request is rejected due to an import or an export" +
- " already in progress for the same data.");
- MessageHandler.registerMessage(MSGID_INVALID_IMPORT_SOURCE,
- "Invalid source for the import.");
- MessageHandler.registerMessage(MSGID_INVALID_EXPORT_TARGET,
- "Invalid target for the export.");
- MessageHandler.registerMessage(MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
- "No reachable peer in the domain.");
- MessageHandler.registerMessage(MSGID_NO_MATCHING_DOMAIN,
- "No domain matches the base DN provided.");
- MessageHandler.registerMessage(MSGID_MULTIPLE_MATCHING_DOMAIN,
- "Multiple domains match the base DN provided.");
- MessageHandler.registerMessage(MSGID_INVALID_PROVIDER,
- "The provider class does not allow the operation requested.");
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index a985e6b..8fb1917 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -27,8 +27,6 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.loggers.Error.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -347,10 +345,6 @@
{
if (session != null)
{
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "Broker : connect closing session" , 1);
-
session.close();
session = null;
}
@@ -504,7 +498,6 @@
try
{
SynchronizationMessage msg = session.receive();
-
if (msg instanceof WindowMessage)
{
WindowMessage windowMsg = (WindowMessage) msg;
@@ -551,11 +544,6 @@
connected = false;
try
{
- if (debugEnabled())
- {
- debugInfo("ChangelogBroker Stop Closing session");
- }
-
session.close();
} catch (IOException e)
{}
@@ -694,12 +682,4 @@
{
return numLostConnections;
}
-
- private void log(String message)
- {
- int msgID = MSGID_UNKNOWN_TYPE;
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
index 670db40..b0bc862 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -27,13 +27,11 @@
package org.opends.server.synchronization.plugin;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
-
-import java.io.IOException;
-
import org.opends.server.api.DirectoryThread;
import org.opends.server.synchronization.protocol.ProtocolSession;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import java.io.IOException;
/**
* This class implements a thread to monitor heartbeat messages from the
@@ -105,9 +103,6 @@
long lastReceiveTime = session.getLastReceiveTime();
if (now > lastReceiveTime + 2 * heartbeatInterval)
{
- debugInfo("Heartbeat monitor is closing the broker session " +
- "because it could not detect a heartbeat.");
-
// Heartbeat is well overdue so the server is assumed to be dead.
if (debugEnabled())
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
index edbb868..e71fd3b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -485,7 +485,7 @@
* Can be null is the request has no associated operation.
* @return The Synchronization domain for this DN.
*/
- public static SynchronizationDomain findDomain(DN dn, Operation op)
+ private static SynchronizationDomain findDomain(DN dn, Operation op)
{
/*
* Don't run the special synchronization code on Operation that are
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
index 1753d0e..c0d02fd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -233,8 +233,7 @@
{
int msgID = MSGID_ERROR_UPDATING_RUV;
String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
- op.toString(), op.getErrorMessage(), baseDn.toString(),
- Thread.currentThread().getStackTrace());
+ op.toString(), op.getErrorMessage(), baseDn.toString());
logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
message, msgID);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 11dea9d..7b2e078 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,59 +26,47 @@
*/
package org.opends.server.synchronization.plugin;
-import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
-import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
-import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
-import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
-import static org.opends.server.loggers.Error.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
-import static org.opends.server.messages.ConfigMessages.*;
-import static org.opends.server.messages.MessageHandler.getMessage;
-import static org.opends.server.messages.ToolMessages.*;
-import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
-import static org.opends.server.synchronization.protocol.OperationContext.*;
-import static org.opends.server.util.ServerConstants.*;
-import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.util.ServerConstants.
+ TIME_UNIT_MILLISECONDS_ABBR;
+import static org.opends.server.util.ServerConstants.
+ TIME_UNIT_MILLISECONDS_FULL;
+import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
+import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
+import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.synchronization.plugin.Historical.*;
+import static org.opends.server.synchronization.protocol.OperationContext.*;
+import static org.opends.server.loggers.Error.*;
+import static org.opends.server.messages.MessageHandler.*;
-import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
-import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
-import org.opends.server.api.SynchronizationProvider;
-import org.opends.server.backends.jeb.BackendImpl;
-import org.opends.server.backends.task.Task;
-import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
+import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
-import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -89,30 +77,19 @@
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
-import org.opends.server.synchronization.protocol.DoneMessage;
-import org.opends.server.synchronization.protocol.EntryMessage;
-import org.opends.server.synchronization.protocol.ErrorMessage;
-import org.opends.server.synchronization.protocol.InitializeRequestMessage;
-import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
-import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.tasks.InitializeTargetTask;
-import org.opends.server.tasks.InitializeTask;
-import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
-import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -160,96 +137,8 @@
* server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
- short serverId;
- /**
- * This class contain the context related to an import or export
- * launched on the domain.
- */
- private class IEContext
- {
- // The task that initiated the operation.
- Task initializeTask;
- // The input stream for the import
- SynchroLDIFInputStream ldifImportInputStream = null;
- // The target in the case of an export
- short exportTarget = RoutableMessage.UNKNOWN_SERVER;
- // The source in the case of an import
- short importSource = RoutableMessage.UNKNOWN_SERVER;
-
- // The total entry count expected to be processed
- long entryCount = 0;
- // The count for the entry left to be processed
- long entryLeftCount = 0;
-
- // The exception raised when any
- DirectoryException exception = null;
-
- /**
- * Initializes the counters of the task with the provider value.
- * @param count The value with which to initialize the counters.
- */
- public void initTaskCounters(long count)
- {
- entryCount = count;
- entryLeftCount = count;
-
- if (initializeTask != null)
- {
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setTotal(entryCount);
- ((InitializeTask)initializeTask).setLeft(entryCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setTotal(entryCount);
- ((InitializeTargetTask)initializeTask).setLeft(entryCount);
- }
- }
- }
-
- /**
- * Update the counters of the task for each entry processed during
- * an import or export.
- */
- public void updateTaskCounters()
- {
- entryLeftCount--;
-
- if (initializeTask != null)
- {
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setLeft(entryLeftCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
- }
- }
- }
-
- /**
- * Update the state of the task.
- */
- protected TaskState updateTaskCompletionState()
- {
- if (exception == null)
- return TaskState.COMPLETED_SUCCESSFULLY;
- else
- return TaskState.STOPPED_BY_ERROR;
- }
- }
-
- // The context related to an import or export being processed
- // Null when none is being processed.
- private IEContext ieContext = null;
-
- // The backend informations necessary to make an import or export.
- private Backend backend;
- private ConfigEntry backendConfigEntry;
- private List<DN> branches = new ArrayList<DN>(0);
+ private short serverId;
private int listenerThreadNumber = 10;
private boolean receiveStatus = true;
@@ -271,7 +160,6 @@
private boolean solveConflictFlag = true;
private boolean disabled = false;
- private boolean stateSavingDisabled = false;
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -317,6 +205,8 @@
timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
}
+
+
/**
* Creates a new SynchronizationDomain using configuration from configEntry.
*
@@ -327,7 +217,6 @@
public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
{
super("Synchronization flush");
-
/*
* read the centralized changelog server configuration
* this is a multivalued attribute
@@ -508,10 +397,6 @@
if (!receiveStatus)
broker.suspendReceive();
}
-
- // Retrieves the related backend and its config entry
- retrievesBackendInfos(baseDN);
-
} catch (Exception e)
{
/* TODO should mark that changelog service is
@@ -918,9 +803,9 @@
}
/**
- * Receives an update message from the changelog.
+ * Receive an update message from the changelog.
* also responsible for updating the list of pending changes
- * @return the received message - null if none
+ * @return the received message
*/
public UpdateMessage receive()
{
@@ -938,7 +823,7 @@
// The server is in the shutdown process
return null;
}
- log("Broker received message :" + msg);
+
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -949,56 +834,6 @@
update = (UpdateMessage) msg;
receiveUpdate(update);
}
- else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
- InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
- try
- {
- initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
- null);
- }
- catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- int msgID = de.getErrorMessageID();
- ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
- msgID, de.getMessage());
- broker.publish(errorMsg);
- }
- }
- else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
-
- try
- {
- importBackend(initMsg);
- }
- catch(DirectoryException de)
- {
- // Return an error message to notify the sender
- int msgID = de.getErrorMessageID();
- ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
- msgID, de.getMessage());
- log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
- broker.publish(errorMsg);
- }
- }
- else if (msg instanceof ErrorMessage)
- {
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // changelog did not find any import source.
- abandonImportExport((ErrorMessage)msg);
- }
- }
} catch (SocketTimeoutException e)
{
// just retry
@@ -1041,9 +876,7 @@
public void receiveAck(AckMessage ack)
{
UpdateMessage update;
- ChangeNumber changeNumber;
-
- changeNumber = ack.getChangeNumber();
+ ChangeNumber changeNumber = ack.getChangeNumber();
synchronized (pendingChanges)
{
@@ -1272,7 +1105,7 @@
synchronized (this)
{
this.wait(1000);
- if (!disabled && !stateSavingDisabled )
+ if (!disabled )
{
// save the RUV
state.save();
@@ -1318,8 +1151,6 @@
this.notify();
}
- DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
-
// stop the ChangelogBroker
broker.stop();
}
@@ -2026,7 +1857,6 @@
return broker.getNumLostConnections();
}
-
/**
* Check if the domain solve conflicts.
*
@@ -2103,988 +1933,6 @@
// Nothing is needed at the moment
}
- /*
- * Total Update >>
- */
-
- /**
- * Receives bytes related to an entry in the context of an import to
- * initialize the domain (called by SynchronizationDomainLDIFInputStream).
- *
- * @return The bytes. Null when the Done or Err message has been received
- */
- public byte[] receiveEntryBytes()
- {
- SynchronizationMessage msg;
- while (true)
- {
- try
- {
- msg = broker.receive();
-
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
- log("receiveEntryBytes: received " + msg);
- if (msg instanceof EntryMessage)
- {
- // FIXME
- EntryMessage entryMsg = (EntryMessage)msg;
- byte[] entryBytes = entryMsg.getEntryBytes().clone();
- ieContext.updateTaskCounters();
- return entryBytes;
- }
- else if (msg instanceof DoneMessage)
- {
- // This is the normal termination of the import
- // No error is stored and the import is ended
- // by returning null
- return null;
- }
- else if (msg instanceof ErrorMessage)
- {
- // This is an error termination during the import
- // The error is stored and the import is ended
- // by returning null
- ErrorMessage errorMsg = (ErrorMessage)msg;
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails() , errorMsg.getMsgID());
- return null;
- }
- else
- {
- // Other messages received during an import are trashed
- }
- }
- catch(Exception e)
- {
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- "received an unexpected message type" , 1, e);
- }
- return null;
- }
- }
-
- /**
- * Processes an error message received while an import/export is
- * on going.
- * @param errorMsg The error message received.
- */
- protected void abandonImportExport(ErrorMessage errorMsg)
- {
- // FIXME TBD Treat the case where the error happens while entries
- // are being exported
-
- if (ieContext != null)
- {
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails() , errorMsg.getMsgID());
-
- if (ieContext.initializeTask instanceof InitializeTask)
- {
- // Update the task that initiated the import
- ((InitializeTask)ieContext.initializeTask).
- setState(ieContext.updateTaskCompletionState(),ieContext.exception);
-
- ieContext = null;
- }
- }
- }
-
- /**
- * Clears all the entries from the JE backend determined by the
- * be id passed into the method.
- *
- * @param createBaseEntry Indicate whether to automatically create the base
- * entry and add it to the backend.
- * @param beID The be id to clear.
- * @param dn The suffix of the backend to create if the the createBaseEntry
- * boolean is true.
- * @throws Exception If an unexpected problem occurs.
- */
- public static void clearJEBackend(boolean createBaseEntry, String beID,
- String dn) throws Exception
- {
- BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
- DN[] baseDNs = backend.getBaseDNs();
-
- // FIXME Should getConfigEntry be part of TaskUtils ?
- ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
-
- // FIXME Should setBackendEnabled be part of TaskUtils ?
- TaskUtils.setBackendEnabled(configEntry, false);
-
- try
- {
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
-
- if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
- {
- throw new RuntimeException(failureReason.toString());
- }
-
- try
- {
- backend.clearBackend(configEntry, baseDNs);
- }
- finally
- {
- LockFileManager.releaseLock(lockFile, failureReason);
- }
- }
- finally
- {
- TaskUtils.setBackendEnabled(configEntry, true);
- }
-
- if (createBaseEntry)
- {
- DN baseDN = DN.decode(dn);
- Entry e = createEntry(baseDN);
- backend = (BackendImpl)DirectoryServer.getBackend(beID);
- backend.addEntry(e, null);
- }
- }
-
- /**
- * Log debug message.
- * @param message The message to log.
- */
- private void log(String message)
- {
- if (debugEnabled())
- {
- debugInfo("DebugInfo" + message);
- int msgID = MSGID_UNKNOWN_TYPE;
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "SynchronizationDomain/ " + message, msgID);
- }
- }
-
- /**
- * Export the entries.
- * @throws DirectoryException when an error occured
- */
- protected void exportBackend() throws DirectoryException
- {
- // FIXME Temporary workaround - will probably be fixed when implementing
- // dynamic config
- retrievesBackendInfos(this.baseDN);
-
- // Acquire a shared lock for the backend.
- try
- {
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
- if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
- {
- int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
- String message = getMessage(msgID, backend.getBackendID(),
- String.valueOf(failureReason));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
- String message = getMessage(msgID, backend.getBackendID());
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
- message + " " + stackTraceToSingleLineString(e), msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
- SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
-
- LDIFExportConfig exportConfig = new LDIFExportConfig(os);
-
- // Launch the export.
- try
- {
- DN[] baseDNs = {this.baseDN};
- backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
- }
- catch (DirectoryException de)
- {
- int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
- String message = getMessage(msgID, de.getErrorMessage());
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
- msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
- String message = getMessage(msgID, stackTraceToSingleLineString(e));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
- msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- finally
- {
- // Clean up after the export by closing the export config.
- exportConfig.close();
-
- // Release the shared lock on the backend.
- try
- {
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
- if (! LockFileManager.releaseLock(lockFile, failureReason))
- {
- int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
- String message = getMessage(msgID, backend.getBackendID(),
- String.valueOf(failureReason));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
- message, msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
- String message = getMessage(msgID, backend.getBackendID(),
- stackTraceToSingleLineString(e));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
- message, msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- }
- }
-
- /**
- * Retrieves the backend object related to the domain and the backend's
- * config entry. They will be used for import and export.
- * TODO This should be in a shared package rather than here.
- *
- * @param baseDN The baseDN to retrieve the backend
- * @throws DirectoryException when an error occired
- */
- protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
- {
- ArrayList<Backend> backendList = new ArrayList<Backend>();
- ArrayList<ConfigEntry> entryList = new ArrayList<ConfigEntry>();
- ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
-
- Backend backend = null;
- ConfigEntry backendConfigEntry = null;
- List<DN> branches = new ArrayList<DN>(0);
-
- // Retrieves the backend related to this domain
- Backend domainBackend = DirectoryServer.getBackend(baseDN);
- if (domainBackend == null)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
- String message = getMessage(msgID, DN_BACKEND_BASE);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
- // Retrieves its config entry and its DNs
- int code = getBackends(backendList, entryList, dnList);
- if (code != 0)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
- String message = getMessage(msgID, DN_BACKEND_BASE);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
- int numBackends = backendList.size();
- for (int i=0; i < numBackends; i++)
- {
- Backend b = backendList.get(i);
-
- if (domainBackend.getBackendID() != b.getBackendID())
- {
- continue;
- }
-
- if (backend == null)
- {
- backend = domainBackend;
- backendConfigEntry = entryList.get(i).duplicate();
- branches = dnList.get(i);
- }
- else
- {
- int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
- String message = getMessage(msgID, domainBackend.getBackendID());
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- }
-
- if (backend == null)
- {
- int msgID = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
- String message = getMessage(msgID, domainBackend.getBackendID());
- logError(ErrorLogCategory.BACKEND,
- ErrorLogSeverity.SEVERE_ERROR, message, msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- else if (! backend.supportsLDIFExport())
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_IMPORT;
- String message = getMessage(msgID, 0); // FIXME
- logError(ErrorLogCategory.BACKEND,
- ErrorLogSeverity.SEVERE_ERROR, message, msgID);
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
-
- this.backend = backend;
- this.backendConfigEntry = backendConfigEntry;
- this.branches = branches;
- }
-
-
- /**
- * Sends lDIFEntry entry lines to the export target currently set.
- *
- * @param lDIFEntry The lines for the LDIF entry.
- * @throws IOException when an error occured.
- */
- public void sendEntryLines(String lDIFEntry) throws IOException
- {
- // If an error was raised - like receiving an ErrorMessage
- // we just let down the export.
- if (ieContext.exception != null)
- {
- IOException ioe = new IOException(ieContext.exception.getMessage());
- ieContext = null;
- throw ioe;
- }
-
- // new entry then send the current one
- EntryMessage entryMessage = new EntryMessage(
- serverId, ieContext.exportTarget, lDIFEntry.getBytes());
- broker.publish(entryMessage);
-
- ieContext.updateTaskCounters();
- }
-
- /**
- * Retrieves information about the backends defined in the Directory Server
- * configuration.
- *
- * @param backendList A list into which instantiated (but not initialized)
- * backend instances will be placed.
- * @param entryList A list into which the config entries associated with
- * the backends will be placed.
- * @param dnList A list into which the set of base DNs for each backend
- * will be placed.
- */
- private static int getBackends(ArrayList<Backend> backendList,
- ArrayList<ConfigEntry> entryList,
- ArrayList<List<DN>> dnList)
- throws DirectoryException
- {
- // Get the base entry for all backend configuration.
- DN backendBaseDN = null;
- try
- {
- backendBaseDN = DN.decode(DN_BACKEND_BASE);
- }
- catch (DirectoryException de)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
- String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
- String message = getMessage(msgID, DN_BACKEND_BASE,
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
- ConfigEntry baseEntry = null;
- try
- {
- baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
- }
- catch (ConfigException ce)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
- String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
- String message = getMessage(msgID, DN_BACKEND_BASE,
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
-
- // Iterate through the immediate children, attempting to parse them as
- // backends.
- for (ConfigEntry configEntry : baseEntry.getChildren().values())
- {
- // Get the backend ID attribute from the entry. If there isn't one, then
- // skip the entry.
- String backendID = null;
- try
- {
- int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
- StringConfigAttribute idStub =
- new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
- true, false, true);
- StringConfigAttribute idAttr =
- (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
- if (idAttr == null)
- {
- continue;
- }
- else
- {
- backendID = idAttr.activeValue();
- }
- }
- catch (ConfigException ce)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
- String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
- ce.getMessage());
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
- String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
-
- // Get the backend class name attribute from the entry. If there isn't
- // one, then just skip the entry.
- String backendClassName = null;
- try
- {
- int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
- StringConfigAttribute classStub =
- new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
- true, false, false);
- StringConfigAttribute classAttr =
- (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
- if (classAttr == null)
- {
- continue;
- }
- else
- {
- backendClassName = classAttr.activeValue();
- }
- }
- catch (ConfigException ce)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
- String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
- ce.getMessage());
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
- String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
- Class backendClass = null;
- try
- {
- backendClass = Class.forName(backendClassName);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS;
- String message = getMessage(msgID, backendClassName,
- String.valueOf(configEntry.getDN()),
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null);
- }
-
- Backend backend = null;
- try
- {
- backend = (Backend) backendClass.newInstance();
- backend.setBackendID(backendID);
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS;
- String message = getMessage(msgID, backendClassName,
- String.valueOf(configEntry.getDN()),
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null); }
-
-
- // Get the base DN attribute from the entry. If there isn't one, then
- // just skip this entry.
- List<DN> baseDNs = null;
- try
- {
- int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
- DNConfigAttribute baseDNStub =
- new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
- true, true, true);
- DNConfigAttribute baseDNAttr =
- (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
- if (baseDNAttr == null)
- {
- msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND;
- String message = getMessage(msgID,
- String.valueOf(configEntry.getDN()));
- throw new DirectoryException(
- DirectoryServer.getServerErrorResultCode(), message,msgID, null);
- }
- else
- {
- baseDNs = baseDNAttr.activeValues();
- }
- }
- catch (Exception e)
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND;
- String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
- stackTraceToSingleLineString(e));
- throw new DirectoryException(
- ResultCode.OTHER, message, msgID, null); }
-
-
- backendList.add(backend);
- entryList.add(configEntry);
- dnList.add(baseDNs);
- }
- return 0;
- }
-
- /**
- * Initializes this domain from another source server.
- *
- * @param source The source from which to initialize
- * @param initTask The task that launched the initialization
- * and should be updated of its progress.
- * @throws DirectoryException when an error occurs
- */
- public void initialize(short source, Task initTask)
- throws DirectoryException
- {
- acquireIEContext();
- ieContext.initializeTask = initTask;
-
- InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
- baseDN, serverId, source);
-
- // Publish Init request msg
- broker.publish(initializeMsg);
-
- // .. we expect to receive entries or err after that
- }
-
- /**
- * Verifies that the given string represents a valid source
- * from which this server can be initialized.
- * @param sourceString The string representaing the source
- * @return The source as a short value
- * @throws DirectoryException if the string is not valid
- */
- public short decodeSource(String sourceString)
- throws DirectoryException
- {
- short source = 0;
- Throwable cause = null;
- try
- {
- source = Integer.decode(sourceString).shortValue();
- if (source >= -1)
- {
- // TODO Verifies serverID is in the domain
- // We shold check here that this is a server implied
- // in the current domain.
-
- log("Source decoded for import:" + source);
- return source;
- }
- }
- catch(Exception e)
- {
- cause = e;
- }
-
- ResultCode resultCode = ResultCode.OTHER;
- int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
- String message = getMessage(errorMessageID);
-
- if (cause != null)
- throw new DirectoryException(
- resultCode, message, errorMessageID, cause);
- else
- throw new DirectoryException(
- resultCode, message, errorMessageID);
- }
-
- /**
- * Verifies that the given string represents a valid source
- * from which this server can be initialized.
- * @param targetString The string representing the source
- * @return The source as a short value
- * @throws DirectoryException if the string is not valid
- */
- public short decodeTarget(String targetString)
- throws DirectoryException
- {
- short target = 0;
- Throwable cause;
- if (targetString.equalsIgnoreCase("all"))
- {
- return RoutableMessage.ALL_SERVERS;
- }
-
- // So should be a serverID
- try
- {
- target = Integer.decode(targetString).shortValue();
- if (target >= 0)
- {
- // FIXME Could we check now that it is a know server in the domain ?
- }
- return target;
- }
- catch(Exception e)
- {
- cause = e;
- }
- ResultCode resultCode = ResultCode.OTHER;
- int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
- String message = getMessage(errorMessageID);
-
- if (cause != null)
- throw new DirectoryException(
- resultCode, message, errorMessageID, cause);
- else
- throw new DirectoryException(
- resultCode, message, errorMessageID);
-
- }
-
- private synchronized void acquireIEContext()
- throws DirectoryException
- {
- if (ieContext != null)
- {
- // Rejects 2 simultaneous exports
- int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
- String message = getMessage(msgID);
- throw new DirectoryException(ResultCode.OTHER,
- message, msgID);
- }
-
- ieContext = new IEContext();
- }
-
- private synchronized void releaseIEContext()
- {
- ieContext = null;
- }
-
- /**
- * Process the initialization of some other server or servers in the topology
- * specified by the target argument.
- * @param target The target that should be initialized
- * @param initTask The task that triggers this initialization and that should
- * be updated with its progress.
- * @exception DirectoryException When an error occurs.
- */
- public void initializeTarget(short target, Task initTask)
- throws DirectoryException
- {
- initializeTarget(target, serverId, initTask);
- }
-
- /**
- * Process the initialization of some other server or servers in the topology
- * specified by the target argument when this initialization has been
- * initiated by another server than this one.
- * @param target The target that should be initialized.
- * @param requestorID The server that initiated the export.
- * @param initTask The task that triggers this initialization and that should
- * be updated with its progress.
- * @exception DirectoryException When an error occurs.
- */
- public void initializeTarget(short target, short requestorID, Task initTask)
- throws DirectoryException
- {
- acquireIEContext();
-
- ieContext.exportTarget = target;
- ieContext.initializeTask = initTask;
- ieContext.initTaskCounters(backend.getEntryCount());
-
- // Send start message
- InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
- baseDN, serverId, ieContext.exportTarget, requestorID,
- ieContext.entryLeftCount);
-
- log("SD : publishes " + initializeMessage +
- " for #entries=" + ieContext.entryCount);
-
- broker.publish(initializeMessage);
-
- // make an export and send entries
- exportBackend();
-
- // Successfull termnation
- DoneMessage doneMsg = new DoneMessage(serverId,
- initializeMessage.getDestination());
- broker.publish(doneMsg);
-
- if (ieContext != null)
- {
- ieContext.updateTaskCompletionState();
- ieContext = null;
- }
- }
-
- /**
- * Process backend before import.
- * @param backend The backend.
- * @param backendConfigEntry The config entry of the backend.
- * @throws Exception
- */
- private void preBackendImport(Backend backend,
- ConfigEntry backendConfigEntry)
- throws Exception
- {
- // Stop saving state
- stateSavingDisabled = true;
-
- // Clear the backend
- clearJEBackend(false,backend.getBackendID(),null);
-
- // FIXME setBackendEnabled should be part of TaskUtils ?
- TaskUtils.setBackendEnabled(backendConfigEntry, false);
-
- // Acquire an exclusive lock for the backend.
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
- if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
- String message = getMessage(msgID, backend.getBackendID(),
- String.valueOf(failureReason));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- throw new DirectoryException(ResultCode.OTHER, message, msgID);
- }
- }
-
- /**
- * Initializes the domain's backend with received entries.
- * @param initializeMessage The message that initiated the import.
- * @exception DirectoryException Thrown when an error occurs.
- */
- protected void importBackend(InitializeTargetMessage initializeMessage)
- throws DirectoryException
- {
- LDIFImportConfig importConfig = null;
- try
- {
- log("startImport");
-
- if (initializeMessage.getRequestorID() == serverId)
- {
- // The import responds to a request we did so the IEContext
- // is already acquired
- }
- else
- {
- acquireIEContext();
- }
-
- ieContext.importSource = initializeMessage.getsenderID();
- ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.initTaskCounters(initializeMessage.getEntryCount());
-
- preBackendImport(this.backend, this.backendConfigEntry);
-
- DN[] baseDNs = {baseDN};
- ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
- importConfig =
- new LDIFImportConfig(ieContext.ldifImportInputStream);
- importConfig.setIncludeBranches(this.branches);
-
- // TODO How to deal with rejected entries during the import
- // importConfig.writeRejectedEntries("rejectedImport",
- // ExistingFileBehavior.OVERWRITE);
-
- // Process import
- this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
-
- stateSavingDisabled = false;
-
- // Re-exchange state with SS
- broker.stop();
- broker.start(changelogServers);
-
- }
- catch(Exception e)
- {
- throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
- 2);// FIXME
- }
- finally
- {
- // Cleanup
- importConfig.close();
-
- // Re-enable backend
- closeBackendImport(this.backend, this.backendConfigEntry);
-
- // Update the task that initiated the import
- if ((ieContext != null ) && (ieContext.initializeTask != null))
- {
- ((InitializeTask)ieContext.initializeTask).
- setState(ieContext.updateTaskCompletionState(),ieContext.exception);
- }
-
- releaseIEContext();
-
- log("End importBackend");
- }
- // Success
- }
-
- /**
- * Make post import operations.
- * @param backend The backend implied in the import.
- * @param backendConfigEntry The config entry of the backend.
- * @exception DirectoryException Thrown when an error occurs.
- */
- protected void closeBackendImport(Backend backend,
- ConfigEntry backendConfigEntry)
- throws DirectoryException
- {
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
-
- // Release lock
- if (!LockFileManager.releaseLock(lockFile, failureReason))
- {
- int msgID = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
- String message = getMessage(msgID, backend.getBackendID(),
- String.valueOf(failureReason));
- logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- new DirectoryException(ResultCode.OTHER, message, msgID);
- }
-
- // FIXME setBackendEnabled should be part taskUtils ?
- TaskUtils.setBackendEnabled(backendConfigEntry, true);
- }
-
- /**
- * Retrieves a synchronization domain based on the baseDN.
- *
- * @param baseDN The baseDN of the domain to retrieve
- * @return The domain retrieved
- * @throws DirectoryException When an error occured.
- */
- public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
- throws DirectoryException
- {
- SynchronizationDomain synchronizationDomain = null;
-
- // Retrieves the domain
- DirectoryServer.getSynchronizationProviders();
- for (SynchronizationProvider provider :
- DirectoryServer.getSynchronizationProviders())
- {
- if (!( provider instanceof MultimasterSynchronization))
- {
- int msgID = LogMessages.MSGID_INVALID_PROVIDER;
- String message = getMessage(msgID);
- throw new DirectoryException(ResultCode.OTHER,
- message, msgID);
- }
-
- // From the domainDN retrieves the synchronization domain
- SynchronizationDomain sdomain =
- MultimasterSynchronization.findDomain(baseDN, null);
- if (sdomain == null)
- {
- int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
- String message = getMessage(msgID) + " " + baseDN;
- throw new DirectoryException(ResultCode.OTHER,
- message, msgID);
- }
-
- if (synchronizationDomain != null)
- {
- // Should never happen
- int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
- String message = getMessage(msgID);
- throw new DirectoryException(ResultCode.OTHER,
- message, msgID);
- }
- synchronizationDomain = sdomain;
- }
- return synchronizationDomain;
- }
-
- /**
- * Returns the backend associated to this domain.
- * @return The associated backend.
- */
- public Backend getBackend()
- {
- return backend;
- }
-
- /**
- * Returns a boolean indiciating if an import or export is currently
- * processed.
- * @return The status
- */
- public boolean ieRunning()
- {
- return (ieContext != null);
- }
- /*
- * <<Total Update
- */
-
-
/**
* Push the modifications contain the in given parameter has
* a modification that would happen on a local server.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
index d1ceb76..b57cc5d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -112,16 +112,11 @@
/* Read the first 8 bytes containing the packet length */
int length = 0;
- /* Let's start the stop-watch before waiting on read */
- /* for the heartbeat check to be operationnal */
- lastReceiveTime = System.currentTimeMillis();
-
while (length<8)
{
int read = input.read(rcvLengthBuf, length, 8-length);
if (read == -1)
{
- lastReceiveTime=0;
throw new IOException("no more data");
}
else
@@ -140,9 +135,8 @@
{
length += input.read(buffer, length, totalLength - length);
}
- /* We do not want the heartbeat to close the session when */
- /* we are processing a message even a time consuming one. */
- lastReceiveTime=0;
+
+ lastReceiveTime = System.currentTimeMillis();
return SynchronizationMessage.generateMsg(buffer);
}
catch (OutOfMemoryError e)
@@ -165,10 +159,6 @@
*/
public long getLastReceiveTime()
{
- if (lastReceiveTime==0)
- {
- return System.currentTimeMillis();
- }
return lastReceiveTime;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
index 59777e5..aab19be 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -47,13 +47,6 @@
static final byte MSG_TYPE_CHANGELOG_START = 7;
static final byte MSG_TYPE_WINDOW = 8;
static final byte MSG_TYPE_HEARTBEAT = 9;
- static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
- static final byte MSG_TYPE_INITIALIZE_TARGET = 11;
- static final byte MSG_TYPE_ENTRY = 12;
- static final byte MSG_TYPE_DONE = 13;
- static final byte MSG_TYPE_ERROR = 14;
- // Adding a new type of message here probably requires to
- // change accordingly generateMsg method below
/**
* Return the byte[] representation of this message.
@@ -67,11 +60,6 @@
* MSG_TYPE_CHANGELOG_START
* MSG_TYPE_WINDOW
* MSG_TYPE_HEARTBEAT
- * MSG_TYPE_INITIALIZE
- * MSG_TYPE_INITIALIZE_TARGET
- * MSG_TYPE_ENTRY
- * MSG_TYPE_DONE
- * MSG_TYPE_ERROR
*
* @return the byte[] representation of this message.
*/
@@ -119,21 +107,6 @@
case MSG_TYPE_HEARTBEAT:
msg = new HeartbeatMessage(buffer);
break;
- case MSG_TYPE_INITIALIZE_REQUEST:
- msg = new InitializeRequestMessage(buffer);
- break;
- case MSG_TYPE_INITIALIZE_TARGET:
- msg = new InitializeTargetMessage(buffer);
- break;
- case MSG_TYPE_ENTRY:
- msg = new EntryMessage(buffer);
- break;
- case MSG_TYPE_DONE:
- msg = new DoneMessage(buffer);
- break;
- case MSG_TYPE_ERROR:
- msg = new ErrorMessage(buffer);
- break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index f45713f..f89e5a2 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.synchronization;
-import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -55,8 +54,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ByteStringFactory;
-import org.opends.server.types.ErrorLogCategory;
-import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -161,11 +158,7 @@
}
}
catch (Exception e)
- {
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "SynchronizationTestCase/openChangelogSession" + e.getMessage(), 1);
- }
+ { }
}
return broker;
}
@@ -228,8 +221,7 @@
}
}
catch (Exception e)
- {
- }
+ { }
}
return broker;
}
@@ -239,10 +231,6 @@
*/
protected void cleanEntries()
{
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "SynchronizationTestCase/Cleaning entries" , 1);
-
DeleteOperation op;
// Delete entries
try
@@ -250,10 +238,6 @@
while (true)
{
DN dn = entryList.removeLast();
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "cleaning entry " + dn, 1);
-
op = new DeleteOperation(connection, InternalClientConnection
.nextOperationID(), InternalClientConnection.nextMessageID(), null,
dn);
@@ -280,13 +264,8 @@
// WORKAROUND FOR BUG #639 - BEGIN -
if (mms != null)
{
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "SynchronizationTestCase/FinalizeSynchronization Provider" , 1);
-
DirectoryServer.deregisterSynchronizationProvider(mms);
mms.finalizeSynchronizationProvider();
- mms = null;
}
// WORKAROUND FOR BUG #639 - END -
@@ -324,22 +303,17 @@
//
// Add the changelog server
- if (changeLogEntry!=null)
- {
- DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
+ DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
"Unable to add the changeLog server");
- entryList.add(changeLogEntry.getDN());
- }
-
- if (synchroServerEntry!=null)
- {
- // We also have a replicated suffix (synchronization domain)
- DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
- "Unable to add the synchronized suffix");
- entryList.add(synchroServerEntry.getDN());
- }
+ entryList.add(changeLogEntry.getDN());
+
+ //
+ // We also have a replicated suffix (synchronization domain)
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the synchronized server");
+ entryList.add(synchroServerEntry.getDN());
}
/**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index dcf23e0..0b1bc2e 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -611,7 +611,7 @@
+ "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
+ "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
+ "ds-cfg-window-size: 100" + "\n"
- + "ds-cfg-changelog-db-directory: changelogDb"+i;
+ + "ds-cfg-changelog-db-dirname: changelogDb"+i;
Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
changelogs[i] = new Changelog(changelogConfig);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
index 0dccba8..20e8a20 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -26,18 +26,16 @@
*/
package org.opends.server.synchronization.protocol;
-import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.UUID;
import java.util.zip.DataFormatException;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -58,8 +56,8 @@
import org.opends.server.types.ObjectClass;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
+
+import static org.opends.server.synchronization.protocol.OperationContext.*;
/**
* Test the contructors, encoders and decoders of the synchronization
@@ -522,104 +520,6 @@
}
/**
- * Test that EntryMessage encoding and decoding works
- * by checking that : msg == new EntryMessageTest(msg.getBytes()).
- */
- @Test()
- public void EntryMessageTest() throws Exception
- {
- String taskInitFromS2 = new String(
- "dn: ds-task-id=" + UUID.randomUUID() +
- ",cn=Scheduled Tasks,cn=Tasks\n" +
- "objectclass: top\n" +
- "objectclass: ds-task\n" +
- "objectclass: ds-task-initialize\n" +
- "ds-task-class-name: org.opends.server.tasks.InitializeTask" +
- "ds-task-initialize-domain-dn: dc=example,dc=com" +
- "ds-task-initialize-source: 1");
- short sender = 1;
- short target = 2;
- byte[] entry = taskInitFromS2.getBytes();
- EntryMessage msg = new EntryMessage(sender, target, entry);
- EntryMessage newMsg = new EntryMessage(msg.getBytes());
- assertEquals(msg.getsenderID(), newMsg.getsenderID());
- assertEquals(msg.getDestination(), newMsg.getDestination());
- assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
- }
-
- /**
- * Test that InitializeRequestMessage encoding and decoding works
- */
- @Test()
- public void InitializeRequestMessageTest() throws Exception
- {
- short sender = 1;
- short target = 2;
- InitializeRequestMessage msg = new InitializeRequestMessage(
- DN.decode("dc=example"), sender, target);
- InitializeRequestMessage newMsg = new InitializeRequestMessage(msg.getBytes());
- assertEquals(msg.getsenderID(), newMsg.getsenderID());
- assertEquals(msg.getDestination(), newMsg.getDestination());
- assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
- }
-
- /**
- * Test that InitializeTargetMessage encoding and decoding works
- */
- @Test()
- public void InitializeTargetMessageTest() throws Exception
- {
- short senderID = 1;
- short targetID = 2;
- short requestorID = 3;
- long entryCount = 4;
- DN baseDN = DN.decode("dc=example");
-
- InitializeTargetMessage msg = new InitializeTargetMessage(
- baseDN, senderID, targetID, requestorID, entryCount);
- InitializeTargetMessage newMsg = new InitializeTargetMessage(msg.getBytes());
- assertEquals(msg.getsenderID(), newMsg.getsenderID());
- assertEquals(msg.getDestination(), newMsg.getDestination());
- assertEquals(msg.getRequestorID(), newMsg.getRequestorID());
- assertEquals(msg.getEntryCount(), newMsg.getEntryCount());
- assertTrue(msg.getBaseDN().equals(newMsg.getBaseDN())) ;
-
- assertEquals(senderID, newMsg.getsenderID());
- assertEquals(targetID, newMsg.getDestination());
- assertEquals(requestorID, newMsg.getRequestorID());
- assertEquals(entryCount, newMsg.getEntryCount());
- assertTrue(baseDN.equals(newMsg.getBaseDN())) ;
-
- }
-
- /**
- * Test that DoneMessage encoding and decoding works
- */
- @Test()
- public void DoneMessage() throws Exception
- {
- DoneMessage msg = new DoneMessage((short)1, (short)2);
- DoneMessage newMsg = new DoneMessage(msg.getBytes());
- assertEquals(msg.getsenderID(), newMsg.getsenderID());
- assertEquals(msg.getDestination(), newMsg.getDestination());
- }
-
- /**
- * Test that ErrorMessage encoding and decoding works
- */
- @Test()
- public void ErrorMessage() throws Exception
- {
- ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
- ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
- assertEquals(msg.getsenderID(), newMsg.getsenderID());
- assertEquals(msg.getDestination(), newMsg.getDestination());
- assertEquals(msg.getMsgID(), newMsg.getMsgID());
- assertEquals(msg.getDetails(), newMsg.getDetails());
- }
-
-
- /**
* Test PendingChange
*/
private void testPendingChange(ChangeNumber cn, Operation op, SynchronizationMessage msg)
--
Gitblit v1.10.0