issue 508
These changes implement a window mechanism in the synchronization protocol.
Up to now, the flow control mechanism used by the synchronization
was the TCP flow control mechanism. However, since TCP is not aware of
the type of the synchronization mechanism, this prevented ACK messages
from being sent when the TCP connection was saturated.
This was also preventing the implementation of the prioritized synchronization.
With these changes the TCP windows are set to a very large value and the
flow control is based on a configurable window size on the changelog servers
and on the LDAP servers.
These changes also add monitoring information for the current and max window sizes.
I also took the opportunity to remove most of the static variables and methods that were
preventing multiple instantiation of the Changelog class.
I have also added tests for the encoding/decoding of ServerStartMessage,
ChangelogStartMessage and WindowMessage, and for testing the window mechanism.
Also added the possibility to choose the port number used by the LDAP server when running
the unit tests, using the property: org.opends.server.LdapPort
This can be useful for debugging purposes.
reviewed by Daniel
3 files added
19 files modified
| | |
| | | NAME 'ds-cfg-bind-with-dn-requires-password' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.288 |
| | | NAME 'ds-cfg-window-size' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.164 |
| | | NAME 'ds-cfg-max-receive-queue' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE |
| | |
| | | STRUCTURAL MUST ( ds-cfg-changelog-server $ ds-cfg-directory-server-id |
| | | $ ds-cfg-synchronization-dn ) |
| | | MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $ |
| | | ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay ) |
| | | ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $ |
| | | ds-cfg-window-size ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.59 |
| | | NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator |
| | |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME |
| | | 'ds-cfg-synchronization-changelog-server-config' SUP top |
| | | STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port ) |
| | | MAY ( ds-cfg-changelog-server $ cn ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory' |
| | | SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | |
| | | */ |
| | | public class Changelog implements Runnable, ConfigurableComponent |
| | | { |
| | | static private short serverId; |
| | | static private String serverURL; |
| | | private short serverId; |
| | | private String serverURL; |
| | | |
| | | private static ServerSocket listenSocket; |
| | | private static Thread myListenThread; |
| | | private static Thread myConnectThread; |
| | | private ServerSocket listenSocket; |
| | | private Thread myListenThread; |
| | | private Thread myConnectThread; |
| | | |
| | | private static boolean runListen = true; |
| | | private boolean runListen = true; |
| | | |
| | | /* The list of changelog servers configured by the administrator */ |
| | | private List<String> changelogServers; |
| | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private static HashMap<DN, ChangelogCache> baseDNs = |
| | | private HashMap<DN, ChangelogCache> baseDNs = |
| | | new HashMap<DN, ChangelogCache>(); |
| | | |
| | | private String localURL = "null"; |
| | | private static boolean shutdown = false; |
| | | private boolean shutdown = false; |
| | | private short changelogServerId; |
| | | private DN configDn; |
| | | private List<ConfigAttribute> configAttributes = |
| | | new ArrayList<ConfigAttribute>(); |
| | | private ChangelogDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | |
| | | static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id"; |
| | | static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; |
| | | static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; |
| | | |
| | | static final IntegerConfigAttribute changelogPortStub = |
| | | new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port", |
| | |
| | | "changelog server information", true, |
| | | true, false); |
| | | |
| | | static final IntegerConfigAttribute windowStub = |
| | | new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | /** |
| | | * Check if a ConfigEntry is valid. |
| | | * @param config The config entry that needs to be checked. |
| | |
| | | } |
| | | configAttributes.add(changelogServer); |
| | | |
| | | IntegerConfigAttribute windowAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(windowStub); |
| | | if (windowAttr == null) |
| | | rcvWindow = 100; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | rcvWindow = windowAttr.activeIntValue(); |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | initialize(changelogServerId, changelogPort); |
| | | |
| | | configDn = config.getDN(); |
| | |
| | | try |
| | | { |
| | | newSocket = listenSocket.accept(); |
| | | newSocket.setReceiveBufferSize(1000000); |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(newSocket)); |
| | | handler.start(null); |
| | | handler.start(null, serverId, serverURL, rcvWindow, this); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore |
| | |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(socket)); |
| | | handler.start(baseDn); |
| | | handler.start(baseDn, serverId, serverURL, rcvWindow, this); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | * Initialize the changelog database. |
| | | * TODO : the changelog db path should be configurable |
| | | */ |
| | | ChangelogDB.initialize(DirectoryServer.getServerRoot() + File.separator |
| | | + "changelogDb"); |
| | | dbEnv = new ChangelogDbEnv( |
| | | DirectoryServer.getServerRoot() + File.separator + "changelogDb", |
| | | this); |
| | | |
| | | /* |
| | | * create changelog cache |
| | |
| | | String localAdddress = InetAddress.getLocalHost().getHostAddress(); |
| | | serverURL = localhostname + ":" + String.valueOf(changelogPort); |
| | | localURL = localAdddress + ":" + String.valueOf(changelogPort); |
| | | listenSocket = new ServerSocket(changelogPort); |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.setReceiveBufferSize(1000000); |
| | | listenSocket.bind(new InetSocketAddress(changelogPort)); |
| | | |
| | | /* |
| | | * create working threads |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the unique identifier for this changelog. |
| | | * |
| | | * @return The unique identifier for this changelog. |
| | | */ |
| | | public static short getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the host and port for this changelog, separated by a colon. |
| | | * |
| | | * @return The host and port for this changelog, separated by a colon. |
| | | */ |
| | | public static String getServerURL() |
| | | { |
| | | return serverURL; |
| | | } |
| | | |
| | | /** |
| | | * Get the ChangelogCache associated to the base DN given in parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ChangelogCache must be returned. |
| | | * @return The ChangelogCache associated to the base DN given in parameter. |
| | | */ |
| | | public static ChangelogCache getChangelogCache(DN baseDn) |
| | | public ChangelogCache getChangelogCache(DN baseDn) |
| | | { |
| | | ChangelogCache changelogCache; |
| | | |
| | |
| | | { |
| | | changelogCache = baseDNs.get(baseDn); |
| | | if (changelogCache == null) |
| | | changelogCache = new ChangelogCache(baseDn); |
| | | changelogCache = new ChangelogCache(baseDn, this); |
| | | baseDNs.put(baseDn, changelogCache); |
| | | } |
| | | |
| | |
| | | /** |
| | | * Shutdown the Changelog service and all its connections. |
| | | */ |
| | | public static void shutdown() |
| | | public void shutdown() |
| | | { |
| | | shutdown = true; |
| | | |
| | |
| | | changelogCache.shutdown(); |
| | | } |
| | | |
| | | ChangelogDB.shutdownDbEnvironment(); |
| | | dbEnv.shutdown(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Builds the DB handler that serves the given server and base DN |
| | | * on behalf of this Changelog. |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler must be created. |
| | | * @return The newly created DB handler, bound to this Changelog and |
| | | * to its database environment. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |
| | | public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException |
| | | { |
| | | final DbHandler handler = new DbHandler(id, baseDn, this, dbEnv); |
| | | return handler; |
| | | } |
| | | } |
| | |
| | | */ |
| | | private Map<Short, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | private Changelog changelog; |
| | | |
| | | /** |
| | | * Creates a new ChangelogCache associated to the DN baseDn. |
| | | * |
| | | * @param baseDn The baseDn associated to the ChangelogCache. |
| | | * @param changelog the Changelog that created this changelog cache. |
| | | */ |
| | | public ChangelogCache(DN baseDn) |
| | | public ChangelogCache(DN baseDn, Changelog changelog) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.changelog = changelog; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | dbHandler = new DbHandler(id, baseDn); |
| | | dbHandler = changelog.newDbHandler(id, baseDn); |
| | | } catch (DatabaseException e) |
| | | { |
| | | /* |
| | |
| | | * from at least one LDAP server. |
| | | * This changelog therefore can't do its job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | * TODO : log error |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | return; |
| | | } |
| | | sourceDbHandlers.put(id, dbHandler); |
| | |
| | | /** |
| | | * creates a new ChangelogDB with specified identifier. |
| | | * @param id the identifier of the new ChangelogDB. |
| | | * @param baseDn the baseDn of the new ChangelogDB. |
| | | * @param db the new db. |
| | | * |
| | | * @throws DatabaseException If a database error happened. |
| | | */ |
| | | public void newDb(short id, DN baseDn) throws DatabaseException |
| | | public void newDb(short id, DbHandler db) throws DatabaseException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | sourceDbHandlers.put(id , new DbHandler(id, baseDn)); |
| | | sourceDbHandlers.put(id , db); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void ack(AckMessage message, short fromServerId) |
| | | { |
| | | /* |
| | | * there are 2 possible cases here : |
| | | * - the message that was acked comes from a server to which |
| | | * we are directly connected. |
| | | * In this case, we can find the handler from the connectedServers map |
| | | * - the message that was acked comes from a server to which we are not |
| | | * connected. |
| | | * In this case we need to find the changelog server that forwarded |
| | | * the change and send back the ack to this server. |
| | | */ |
| | | ServerHandler handler = connectedServers.get( |
| | | message.getChangeNumber().getServerId()); |
| | | if (handler != null) |
| | |
| | | { |
| | | return "ChangelogCache " + baseDn; |
| | | } |
| | | |
| | | |
| | | } |
| | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.util.List; |
| | | import java.io.File; |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Environment; |
| | | import com.sleepycat.je.EnvironmentConfig; |
| | | import com.sleepycat.je.Database; |
| | | import com.sleepycat.je.DatabaseConfig; |
| | | import com.sleepycat.je.LockMode; |
| | | import com.sleepycat.je.OperationStatus; |
| | | import com.sleepycat.je.Transaction; |
| | |
| | | */ |
| | | public class ChangelogDB |
| | | { |
| | | private static Environment dbEnvironment = null; |
| | | private Database db = null; |
| | | private static Database stateDb = null; |
| | | private String stringId = null; |
| | | private ChangelogDbEnv dbenv = null; |
| | | private Changelog changelog; |
| | | private Short serverId; |
| | | private DN baseDn; |
| | | |
| | | /** |
| | | * Creates a new database or open existing database that will be used |
| | | * to store and retrieve changes from an LDAP server. |
| | | * @param serverId Identifier of the LDAP server. |
| | | * @param baseDn baseDn of the LDAP server. |
| | | * @param changelog the Changelog that needs to be shutdown |
| | | * @param dbenv the Db environment to use to create the db |
| | | * @throws DatabaseException if a database problem happened |
| | | */ |
| | | public ChangelogDB(Short serverId, DN baseDn) |
| | | public ChangelogDB(Short serverId, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | try { |
| | | stringId = serverId.toString() + " " + baseDn.toNormalizedString(); |
| | | byte[] byteId = stringId.getBytes("UTF-8"); |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn; |
| | | this.dbenv = dbenv; |
| | | this.changelog = changelog; |
| | | db = dbenv.getOrAddDb(serverId, baseDn); |
| | | |
| | | // Open the database. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | |
| | | db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // never happens |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Initialize this class. |
| | | * Creates Db environment that will be used to create databases. |
| | | * It also reads the currently known databases from the "changelogstate" |
| | | * database. |
| | | * @param path Path where the backing files must be created. |
| | | * @throws DatabaseException If a DatabaseException occured that prevented |
| | | * the initialization to happen. |
| | | * @throws ChangelogDBException If a changelog internal error caused |
| | | * a failure of the changelog processing. |
| | | */ |
| | | public static void initialize(String path) throws DatabaseException, |
| | | ChangelogDBException |
| | | { |
| | | EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* Create the DB Environment that will be used for all |
| | | * the Changelog activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | | envConfig.setConfigParam("je.cleaner.expunge", "true"); |
| | | // TODO : the DB cache size should be configurable |
| | | // For now set 5M is OK for being efficient in 64M total for the JVM |
| | | envConfig.setConfigParam("je.maxMemory", "5000000"); |
| | | dbEnvironment = new Environment(new File(path), envConfig); |
| | | |
| | | /* |
| | | * One database is created to store the update from each LDAP |
| | | * server in the topology. |
| | | * The database "changelogstate" is used to store the list of all |
| | | * the servers that have been seen in the past. |
| | | */ |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | |
| | | stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig); |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | try |
| | | { |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | String[] str = stringData.split(" ", 2); |
| | | short serverId = new Short(str[0]); |
| | | DN baseDn = null; |
| | | try |
| | | { |
| | | baseDn = DN.decode(str[1]); |
| | | } catch (DirectoryException e) |
| | | { |
| | | int msgID = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER; |
| | | String message = getMessage(msgID, str[1]); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | Changelog.getChangelogCache(baseDn).newDb(serverId, baseDn); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogDBException(0, |
| | | "changelog state database has a wrong format"); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | throw new ChangelogDBException(0, "need UTF-8 support"); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | cursor.close(); |
| | | |
| | | } catch (DatabaseException dbe) { |
| | | cursor.close(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * add a list of changes to the underlying db. |
| | | * |
| | | * @param changes The list of changes to add to the underlying db. |
| | |
| | | |
| | | try |
| | | { |
| | | txn = dbEnvironment.beginTransaction(null, null); |
| | | txn = dbenv.beginTransaction(); |
| | | |
| | | for (UpdateMessage change : changes) |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | } |
| | | |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | |
| | | } catch (DatabaseException e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_CLOSING_DATABASE; |
| | | String message = getMessage(msgID, stringId) + |
| | | String message = getMessage(msgID, this.toString()) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | return null; |
| | | } |
| | | } |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | * {@inheritDoc} |
| | | */ |
| | | public static void shutdownDbEnvironment() |
| | | @Override |
| | | public String toString() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | int msgID = MSGID_ERROR_CLOSING_CHANGELOG_ENV; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | return serverId.toString() + baseDn.toString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private ChangelogCursor() throws DatabaseException |
| | | { |
| | | txn = dbEnvironment.beginTransaction(null, null); |
| | | txn = dbenv.beginTransaction(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | if (txn != null) |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.opends.server.messages.MessageHandler.getMessage; |
| | | import static org.opends.server.synchronization.SynchMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.File; |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.Database; |
| | | import com.sleepycat.je.DatabaseConfig; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Environment; |
| | | import com.sleepycat.je.EnvironmentConfig; |
| | | import com.sleepycat.je.LockMode; |
| | | import com.sleepycat.je.OperationStatus; |
| | | import com.sleepycat.je.Transaction; |
| | | |
| | | /** |
| | | * This class is used to represent a Db environment that can be used |
| | | * to create ChangelogDB. |
| | | */ |
| | | public class ChangelogDbEnv |
| | | { |
| | | private Environment dbEnvironment = null; |
| | | private Database stateDb = null; |
| | | private Changelog changelog = null; |
| | | |
| | | /** |
| | | * Initialize this class. |
| | | * Creates Db environment that will be used to create databases. |
| | | * It also reads the currently known databases from the "changelogstate" |
| | | * database. |
| | | * @param path Path where the backing files must be created. |
| | | * @param changelog the Changelog that creates this ChangelogDbEnv. |
| | | * @throws DatabaseException If a DatabaseException occured that prevented |
| | | * the initialization to happen. |
| | | * @throws ChangelogDBException If a changelog internal error caused |
| | | * a failure of the changelog processing. |
| | | */ |
| | | public ChangelogDbEnv(String path, Changelog changelog) |
| | | throws DatabaseException, ChangelogDBException |
| | | { |
| | | this.changelog = changelog; |
| | | EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* Create the DB Environment that will be used for all |
| | | * the Changelog activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | | envConfig.setConfigParam("je.cleaner.expunge", "true"); |
| | | // TODO : the DB cache size should be configurable |
| | | // For now set 5M is OK for being efficient in 64M total for the JVM |
| | | envConfig.setConfigParam("je.maxMemory", "5000000"); |
| | | dbEnvironment = new Environment(new File(path), envConfig); |
| | | |
| | | /* |
| | | * One database is created to store the update from each LDAP |
| | | * server in the topology. |
| | | * The database "changelogstate" is used to store the list of all |
| | | * the servers that have been seen in the past. |
| | | */ |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | |
| | | stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig); |
| | | start(); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Read the list of known servers from the database and start dbHandler |
| | | * for each of them. |
| | | * |
| | | * Each record of the "changelogstate" database is expected to hold |
| | | * "serverId baseDn" encoded in UTF-8. A record whose DN cannot be decoded |
| | | * is logged and skipped: going on with a null DN would only fail later |
| | | * with a NullPointerException when the DN is normalized to open the |
| | | * database. A record with no separator or with a non numeric server id |
| | | * is reported as a ChangelogDBException. |
| | | * |
| | | * @throws DatabaseException in case of underlying DatabaseException |
| | | * @throws ChangelogDBException when the information from the database |
| | | * cannot be decoded correctly. |
| | | */ |
| | | private void start() throws DatabaseException, ChangelogDBException |
| | | { |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | try |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | String[] str = stringData.split(" ", 2); |
| | | if (str.length < 2) |
| | | { |
| | | // No separator : same failure as a non numeric server id. |
| | | throw new ChangelogDBException(0, |
| | | "changelog state database has a wrong format"); |
| | | } |
| | | short serverId = Short.parseShort(str[0]); |
| | | DN baseDn = null; |
| | | try |
| | | { |
| | | baseDn = DN.decode(str[1]); |
| | | } catch (DirectoryException e) |
| | | { |
| | | int msgID = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER; |
| | | String message = getMessage(msgID, str[1]); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | if (baseDn != null) |
| | | { |
| | | // Only start a handler for records that could be decoded; |
| | | // bad records have been logged above and are ignored. |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, changelog, this); |
| | | changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler); |
| | | } |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogDBException(0, |
| | | "changelog state database has a wrong format"); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogDBException(0, "need UTF-8 support"); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // Release the cursor on every exit path, including when a |
| | | // ChangelogDBException or a runtime exception interrupts the scan. |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Find or create the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * |
| | | * The database is named "serverId normalizedBaseDn". When this name is |
| | | * not yet recorded in the "changelogstate" database it is added there in |
| | | * its own transaction. If that registration fails, the database handle |
| | | * that was just opened is closed again before the exception is |
| | | * propagated, so that no open handle is left behind. |
| | | * |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the server. |
| | | * @return the Database, or null if UTF-8 is not supported (which should |
| | | * not happen). |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(Short serverId, DN baseDn) |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | { |
| | | String stringId = serverId.toString() + " " + baseDn.toNormalizedString(); |
| | | byte[] byteId; |
| | | |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | |
| | | // Open the database. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | try |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = |
| | | stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | return db; |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // The caller never receives the handle : close it here. |
| | | try |
| | | { |
| | | db.close(); |
| | | } catch (DatabaseException ignored) |
| | | { |
| | | // The original failure is the one worth reporting. |
| | | } |
| | | throw dbe; |
| | | } |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Starts a new transaction in the Db environment of this object, with |
| | | * no parent transaction and the default transaction configuration. |
| | | * |
| | | * @return the transaction. |
| | | * @throws DatabaseException in case of underlying database Exception. |
| | | */ |
| | | public Transaction beginTransaction() throws DatabaseException |
| | | { |
| | | final Transaction transaction = |
| | | dbEnvironment.beginTransaction(null, null); |
| | | return transaction; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | * Closes the "changelogstate" database and then the environment itself. |
| | | * The environment close is attempted even when closing the state |
| | | * database fails. A DatabaseException raised by either close is logged |
| | | * and not propagated. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | try |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | } |
| | | finally |
| | | { |
| | | // Do not skip the environment close because the state db |
| | | // could not be closed. |
| | | dbEnvironment.close(); |
| | | } |
| | | } catch (DatabaseException e) |
| | | { |
| | | int msgID = MSGID_ERROR_CLOSING_CHANGELOG_ENV; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDn of the DB. |
| | | * @param changelog the Changelog that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the Changelog DB. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn) throws DatabaseException |
| | | public DbHandler(short id, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | this.serverId = id; |
| | | this.baseDn = baseDn; |
| | | db = new ChangelogDB(id, baseDn); |
| | | db = new ChangelogDB(id, baseDn, changelog, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, "changelog db " + id + " " + baseDn); |
| | |
| | | {} |
| | | } |
| | | } |
| | | |
| | | while (msgQueue.size() != 0) |
| | | flush(); |
| | | |
| | | db.shutdown(); |
| | | } |
| | | |
| | |
| | | import java.util.Map; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigEntry; |
| | |
| | | import org.opends.server.synchronization.ServerState; |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | import org.opends.server.synchronization.UpdateMessage; |
| | | import org.opends.server.synchronization.WindowMessage; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | /** |
| | |
| | | private MsgQueue msgQueue = new MsgQueue(); |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>();; |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ChangelogCache changelogCache = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | |
| | | private ServerWriter writer = null; |
| | | private DN baseDn = null; |
| | | private String serverAddressURL; |
| | | private int rcvWindow; |
| | | private int rcvWindowSizeHalf; |
| | | private int maxRcvWindow; |
| | | private ServerReader reader; |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | |
| | | private static Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * @param changelogId The identifier of the changelog that creates this |
| | | * server handler. |
| | | * @param changelogURL The URL of the changelog that creates this |
| | | * server handler. |
| | | * @param windowSize the window size that this server handler must use. |
| | | * @param changelog the Changelog that created this server handler. |
| | | */ |
| | | public void start(DN baseDn) |
| | | public void start(DN baseDn, short changelogId, String changelogURL, |
| | | int windowSize, Changelog changelog) |
| | | { |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | | { |
| | | this.baseDn = baseDn; |
| | | changelogCache = Changelog.getChangelogCache(baseDn); |
| | | changelogCache = changelog.getChangelogCache(baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage msg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | baseDn, localServerState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | baseDn, windowSize, localServerState); |
| | | |
| | | session.publish(msg); |
| | | } |
| | |
| | | restartSendDelay = 0; |
| | | serverIsLDAPserver = true; |
| | | |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage myStartMsg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | this.baseDn, localServerState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, localServerState); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else if (msg.getClass() == Class.forName( |
| | | "org.opends.server.synchronization.ChangelogStartMessage")) |
| | | else if (msg instanceof ChangelogStartMessage) |
| | | { |
| | | ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg; |
| | | serverId = receivedMsg.getServerId(); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState serverState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage outMsg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | this.baseDn, serverState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, serverState); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | | this.baseDn = baseDn; |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else |
| | | { |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | |
| | | |
| | | writer = new ServerWriter(session, serverId, this, changelogCache); |
| | | |
| | | ServerReader reader = new ServerReader(session, serverId, this, |
| | | reader = new ServerReader(session, serverId, this, |
| | | changelogCache); |
| | | |
| | | reader.start(); |
| | |
| | | // ignore |
| | | } |
| | | } |
| | | |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public UpdateMessage take() |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMessage msg = getnextMessage(); |
| | | do { |
| | | try |
| | | { |
| | | sendWindow.acquire(); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while (interrupted); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | | * Get the next update that must be sent to the server |
| | | * from the message queue or from the database. |
| | | * |
| | | * @return The next update that must be sent to the server. |
| | | */ |
| | | private UpdateMessage getnextMessage() |
| | | { |
| | | UpdateMessage msg; |
| | | do |
| | | { |
| | |
| | | msg1 = msgQueue.removeFirst(); |
| | | } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); |
| | | this.updateServerState(msg); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | /* get the next change from the lateQueue */ |
| | | msg = lateQueue.removeFirst(); |
| | | this.updateServerState(msg); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | * by the other server. |
| | | * Otherwise just loop to select the next message. |
| | | */ |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | String str = changelogCache.getBaseDn().toString() + |
| | | String str = baseDn.toString() + |
| | | " " + serverURL + " " + String.valueOf(serverId); |
| | | |
| | | if (serverIsLDAPserver) |
| | |
| | | attributes.add(new Attribute("server-id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", |
| | | changelogCache.getBaseDn().toString())); |
| | | baseDn.toString())); |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | |
| | | String.valueOf(getInAckCount()))); |
| | | attributes.add(new Attribute("approximate-delay", |
| | | String.valueOf(getApproxDelay()))); |
| | | attributes.add(new Attribute("max-send-window", |
| | | String.valueOf(sendWindowSize))); |
| | | attributes.add(new Attribute("current-send-window", |
| | | String.valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(new Attribute("max-rcv-window", |
| | | String.valueOf(maxRcvWindow))); |
| | | attributes.add(new Attribute("current-rcv-window", |
| | | String.valueOf(rcvWindow))); |
| | | long olderUpdateTime = getOlderUpdateTime(); |
| | | if (olderUpdateTime != 0) |
| | | { |
| | |
| | | |
| | | return localString; |
| | | } |
| | | |
| | | /** |
| | | * Consume one slot of the receive window and, once the window has |
| | | * dropped below half of its size, publish a WindowMessage that grants |
| | | * half a window of additional credit to the other end of the session. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void checkWindow() throws IOException |
| | | { |
| | | rcvWindow -= 1; |
| | | if (rcvWindow >= rcvWindowSizeHalf) |
| | | { |
| | | // still at least half a window left : nothing to send |
| | | return; |
| | | } |
| | | |
| | | final int credit = rcvWindowSizeHalf; |
| | | session.publish(new WindowMessage(credit)); |
| | | outAckCount++; |
| | | rcvWindow += credit; |
| | | } |
| | | |
| | | /** |
| | | * Give back to the send window the credit carried by the given |
| | | * window message. |
| | | * |
| | | * @param windowMsg The Window Message holding the number of permits |
| | | * to release into the send window. |
| | | */ |
| | | public void updateWindow(WindowMessage windowMsg) |
| | | { |
| | | final int credit = windowMsg.getNumAck(); |
| | | sendWindow.release(credit); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.synchronization.AckMessage; |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | import org.opends.server.synchronization.UpdateMessage; |
| | | import org.opends.server.synchronization.WindowMessage; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | |
| | |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.checkWindow(); |
| | | changelogCache.put(update, handler); |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | handler.updateWindow(windowMsg); |
| | | } |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | |
| | | public SocketSession(Socket socket) throws IOException |
| | | { |
| | | this.socket = socket; |
| | | /* |
| | | * Use a window instead of the TCP flow control. |
| | | * Therefore a very large value is wanted for the send and receive |
| | | * buffer sizes. |
| | | * NOTE(review): no buffer size is set in this constructor; presumably |
| | | * the creator of the socket is expected to configure it before |
| | | * connecting - confirm. |
| | | */ |
| | | input = socket.getInputStream(); |
| | | output = socket.getOutputStream(); |
| | | } |
| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.io.IOException; |
| | | import java.net.ConnectException; |
| | | import java.net.InetAddress; |
| | |
| | | private int maxReceiveDelay; |
| | | private int maxSendQueue; |
| | | private int maxReceiveQueue; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow; |
| | | private int halfRcvWindow; |
| | | private int maxRcvWindow; |
| | | private int timeout = 0; |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular SynchronizationDomain. |
| | |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | * @param window The size of the send and receive window to use. |
| | | */ |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay ) |
| | | int maxSendDelay, int window) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | this.state = state; |
| | | replayOperations = |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator()); |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window/2; |
| | | } |
| | | |
| | | /** |
| | |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.connect(ServerAddr, 500); |
| | | session = new SocketSession(socket); |
| | | |
| | |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage( serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | state); |
| | | halfRcvWindow*2, state); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ChangelogStartMessage) session.receive(); |
| | | session.setSoTimeout(0); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a changelog that has not |
| | |
| | | (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber))) |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | break; |
| | | } |
| | |
| | | else |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any changelog server. |
| | | */ |
| | | try |
| | | { |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // TODO Auto-generated catch block |
| | | e.printStackTrace(); |
| | | } |
| | | checkState = false; |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES; |
| | | String message = getMessage(msgID); |
| | |
| | | { |
| | | if (this.connected == false) |
| | | this.reStart(failingSession); |
| | | |
| | | if (msg instanceof UpdateMessage) |
| | | sendWindow.acquire(); |
| | | session.publish(msg); |
| | | done = true; |
| | | } catch (IOException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | ProtocolSession failingSession = session; |
| | | try |
| | | { |
| | | return session.receive(); |
| | | SynchronizationMessage msg = session.receive(); |
| | | if (msg instanceof WindowMessage) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | else |
| | | { |
| | | if (msg instanceof UpdateMessage) |
| | | { |
| | | rcvWindow--; |
| | | if (rcvWindow < halfRcvWindow) |
| | | { |
| | | session.publish(new WindowMessage(halfRcvWindow)); |
| | | rcvWindow += halfRcvWindow; |
| | | } |
| | | } |
| | | return msg; |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | if (e instanceof SocketTimeoutException) |
| | |
| | | */ |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | { |
| | | // Record the requested timeout in the timeout field before applying |
| | | // it to the current session. |
| | | // NOTE(review): the recorded value appears to be re-applied to the |
| | | // session after the start message exchange - confirm. |
| | | this.timeout = timeout; |
| | | session.setSoTimeout(timeout); |
| | | } |
| | | |
| | |
| | | { |
| | | // TODO to be implemented |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * This is the window size that was given to the constructor. |
| | | * |
| | | * @return The maximum receive window size. |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return maxRcvWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the current receive window size. |
| | | * The counter is decremented for each update message received and is |
| | | * increased again each time a WindowMessage is sent. |
| | | * |
| | | * @return The current receive window size. |
| | | */ |
| | | public int getCurrentRcvWindow() |
| | | { |
| | | return rcvWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum send window size. |
| | | * The value is assigned from the window size of the |
| | | * ChangelogStartMessage received when connecting. |
| | | * |
| | | * @return The maximum send window size. |
| | | */ |
| | | public int getMaxSendWindow() |
| | | { |
| | | return maxSendWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the current send window size. |
| | | * |
| | | * @return The current send window size. |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | if (connected) |
| | | return sendWindow.availablePermits(); |
| | | else |
| | | return 0; |
| | | } |
| | | } |
| | |
| | | private String serverURL; |
| | | private ServerState serverState; |
| | | |
| | | private int windowSize; |
| | | |
| | | /** |
| | | * Create a ChangelogStartMessage. |
| | | * |
| | | * @param serverId changelog server id |
| | | * @param serverURL changelog server URL |
| | | * @param baseDn base DN for which the ChangelogStartMessage is created. |
| | | * @param windowSize The window size. |
| | | * @param serverState our ServerState for this baseDn. |
| | | */ |
| | | public ChangelogStartMessage(short serverId, String serverURL, DN baseDn, |
| | | int windowSize, |
| | | ServerState serverState) |
| | | { |
| | | this.serverId = serverId; |
| | |
| | | this.baseDn = baseDn.toNormalizedString(); |
| | | else |
| | | this.baseDn = null; |
| | | this.windowSize = windowSize; |
| | | this.serverState = serverState; |
| | | } |
| | | |
| | |
| | | public ChangelogStartMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The ChangelogStartMessage is encoded in the form : |
| | | * <baseDn><ServerId><ServerUrl><ServerState> |
| | | * <baseDn><ServerId><ServerUrl><windowsize><ServerState> |
| | | */ |
| | | try |
| | | { |
| | |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the window size |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | |
| | | public byte[] getBytes() |
| | | { |
| | | /* The ChangelogStartMessage is stored in the form : |
| | | * <operation type><basedn><serverid><serverURL><serverState> |
| | | * <operation type><basedn><serverid><serverURL><windowsize><serverState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | | byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8"); |
| | | byte[] byteServerUrl = serverURL.getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8"); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + byteServerState.length + 1; |
| | | byteServerUrl.length + 1 + byteWindowSize.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | |
| | | /* put the ServerURL */ |
| | | pos = addByteArray(byteServerUrl, resultByteArray, pos); |
| | | |
| | | /* put the window size */ |
| | | pos = addByteArray(byteWindowSize, resultByteArray, pos); |
| | | |
| | | /* put the ServerState */ |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the window size for the server that created this message. |
| | | * |
| | | * @return The window size for the server that created this message. |
| | | */ |
| | | public int getWindowSize() |
| | | { |
| | | return windowSize; |
| | | } |
| | | } |
| | |
| | | |
| | | // shutdown the Changelog Service if necessary |
| | | if (changelog != null) |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | |
| | | /** |
| | |
| | | private int maxSendQueue; |
| | | private int maxReceiveDelay; |
| | | private int maxSendDelay; |
| | | private int windowSize; |
| | | private ServerState serverState = null; |
| | | |
| | | /** |
| | |
| | | * @param maxReceiveQueue The max receive Queue for this server. |
| | | * @param maxSendDelay The max Send Delay from this server. |
| | | * @param maxSendQueue The max send Queue from this server. |
| | | * @param windowSize The window size used by this server. |
| | | * @param serverState The state of this server. |
| | | */ |
| | | public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay, |
| | | int maxReceiveQueue, int maxSendDelay, |
| | | int maxSendQueue, ServerState serverState) |
| | | int maxSendQueue, int windowSize, |
| | | ServerState serverState) |
| | | { |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn.toString(); |
| | |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | this.serverState = serverState; |
| | | this.windowSize = windowSize; |
| | | |
| | | try |
| | | { |
| | |
| | | { |
| | | /* The ServerStartMessage is encoded in the form : |
| | | * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><ServerState> |
| | | * <maxSendDelay><maxSendQueue><window><ServerState> |
| | | */ |
| | | try |
| | | { |
| | |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the windowSize |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | |
| | | /* |
| | | * ServerStartMessage contains. |
| | | * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><ServerState> |
| | | * <maxSendDelay><maxSendQueue><windowsize><ServerState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | |
| | | String.valueOf(maxSendDelay).getBytes("UTF-8"); |
| | | byte[] byteMaxSendQueue = |
| | | String.valueOf(maxSendQueue).getBytes("UTF-8"); |
| | | byte[] byteWindowSize = |
| | | String.valueOf(windowSize).getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | |
| | | byteMaxRecvQueue.length + 1 + |
| | | byteMaxSendDelay.length + 1 + |
| | | byteMaxSendQueue.length + 1 + |
| | | byteWindowSize.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | |
| | | |
| | | pos = addByteArray(byteMaxSendQueue, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteWindowSize, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the window size for the LDAP server that created the message. |
| | | * |
| | | * @return The window size for the LDAP server that created the message. |
| | | */ |
| | | public int getWindowSize() |
| | | { |
| | | return windowSize; |
| | | } |
| | | } |
| | |
| | | static String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay"; |
| | | static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue"; |
| | | static String MAX_SEND_DELAY = "ds-cfg-max-send-delay"; |
| | | static String WINDOW_SIZE = "ds-cfg-window-size"; |
| | | |
| | | private static final StringConfigAttribute changelogStub = |
| | | new StringConfigAttribute(CHANGELOG_SERVER_ATTR, |
| | |
| | | configAttributes.add(maxSendDelayAttr); |
| | | } |
| | | |
| | | Integer window; |
| | | IntegerConfigAttribute windowStub = |
| | | new IntegerConfigAttribute(WINDOW_SIZE, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | IntegerConfigAttribute windowAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub); |
| | | if (windowAttr == null) |
| | | window = 100; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | window = windowAttr.activeIntValue(); |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | configDn = configEntry.getDN(); |
| | | DirectoryServer.registerConfigurableComponent(this); |
| | | |
| | |
| | | try |
| | | { |
| | | broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay); |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window); |
| | | synchronized (broker) |
| | | { |
| | | broker.start(changelogServers); |
| | |
| | | */ |
| | | public int getPendingUpdatesCount() |
| | | { |
| | | synchronized (pendingChanges) |
| | | { |
| | | return pendingChanges.size(); |
| | | } |
| | | return pendingChanges.size(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * The value is obtained from the broker. |
| | | * |
| | | * @return The maximum receive window size. |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return broker.getMaxRcvWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the current receive window size. |
| | | * The value is obtained from the broker. |
| | | * |
| | | * @return The current receive window size. |
| | | */ |
| | | public int getCurrentRcvWindow() |
| | | { |
| | | return broker.getCurrentRcvWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum send window size. |
| | | * The value is obtained from the broker. |
| | | * |
| | | * @return The maximum send window size. |
| | | */ |
| | | public int getMaxSendWindow() |
| | | { |
| | | return broker.getMaxSendWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the current send window size. |
| | | * The value is obtained from the broker. |
| | | * |
| | | * @return The current send window size. |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | return broker.getCurrentSendWindow(); |
| | | } |
| | | } |
| | |
| | | static final byte MSG_TYPE_ACK = 5; |
| | | static final byte MSG_TYPE_SERVER_START = 6; |
| | | static final byte MSG_TYPE_CHANGELOG_START = 7; |
| | | static final byte MSG_TYPE_WINDOW = 8; |
| | | |
| | | /** |
| | | * Do the processing necessary when the message is received. |
| | |
| | | case MSG_TYPE_CHANGELOG_START: |
| | | msg = new ChangelogStartMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_WINDOW: |
| | | msg = new WindowMessage(buffer); |
| | | break; |
| | | default: |
| | | throw new DataFormatException("received message with unknown type"); |
| | | } |
| | |
| | | attributes.add(attr); |
| | | |
| | | /* get number of received updates */ |
| | | final String ATTR_UPDATE_RECVD = "received-updates"; |
| | | AttributeType type = |
| | | DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_RECVD); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumRcvdUpdates()))); |
| | | attr = new Attribute(type, "received-updates", values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates()); |
| | | |
| | | /* get number of updates sent */ |
| | | final String ATTR_UPDATE_SENT = "sent-updates"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_SENT); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumSentUpdates()))); |
| | | attr = new Attribute(type, "sent-updates", values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "sent-updates", domain.getNumSentUpdates()); |
| | | |
| | | /* get number of changes in the pending list */ |
| | | final String ATTR_UPDATE_PENDING = "pending-updates"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_PENDING); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getPendingUpdatesCount()))); |
| | | attr = new Attribute(type, "pending-updates", values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "pending-updates", |
| | | domain.getPendingUpdatesCount()); |
| | | |
| | | /* get number of changes replayed */ |
| | | final String ATTR_REPLAYED_UPDATE = "replayed-updates"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumProcessedUpdates()))); |
| | | attr = new Attribute(type, ATTR_REPLAYED_UPDATE, values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "replayed-updates", |
| | | domain.getNumProcessedUpdates()); |
| | | |
| | | /* get number of changes successfully replayed */ |
| | | final String ATTR_REPLAYED_UPDATE_OK = "replayed-updates-ok"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE_OK); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumReplayedPostOpCalled()))); |
| | | attr = new Attribute(type, ATTR_REPLAYED_UPDATE_OK, values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "replayed-updates-ok", |
| | | domain.getNumReplayedPostOpCalled()); |
| | | |
| | | /* get debugCount */ |
| | | final String DEBUG_COUNT = "debug-count"; |
| | | type = DirectoryServer.getDefaultAttributeType(DEBUG_COUNT); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getDebugCount()))); |
| | | attr = new Attribute(type, DEBUG_COUNT, values); |
| | | attributes.add(attr); |
| | | /* get window information */ |
| | | addMonitorData(attributes, "max-rcv-window", domain.getMaxRcvWindow()); |
| | | addMonitorData(attributes, "current-rcv-window", |
| | | domain.getCurrentRcvWindow()); |
| | | addMonitorData(attributes, "max-send-window", |
| | | domain.getMaxSendWindow()); |
| | | addMonitorData(attributes, "current-send-window", |
| | | domain.getCurrentSendWindow()); |
| | | |
| | | /* get the Server State */ |
| | | final String ATTR_SERVER_STATE = "server-state"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | AttributeType type = |
| | | DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | for (String str : domain.getServerState().toStringSet()) |
| | | { |
| | | values.add(new AttributeValue(type,str)); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an attribute with an integer value to the list of monitoring |
| | | * attributes. |
| | | * |
| | | * @param attributes the list of monitoring attributes |
| | | * @param name the name of the attribute to add. |
| | | * @param value The integer value of he attribute to add. |
| | | */ |
| | | private void addMonitorData(ArrayList<Attribute> attributes, |
| | | String name, int value) |
| | | { |
| | | Attribute attr; |
| | | AttributeType type; |
| | | LinkedHashSet<AttributeValue> values; |
| | | type = DirectoryServer.getDefaultAttributeType(name); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, String.valueOf(value))); |
| | | attr = new Attribute(type, name, values); |
| | | attributes.add(attr); |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the length of time in milliseconds that should elapse between |
| | | * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero |
| | | * return value indicates that the <CODE>updateMonitorData()</CODE> method |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | |
| | | /** |
| | | * Flow-control message of the synchronization protocol. |
| | | * It carries the number of messages acknowledged by its sender; |
| | | * the window is increased by this number. |
| | | */ |
| | | public class WindowMessage extends SynchronizationMessage implements |
| | | Serializable |
| | | { |
| | | private static final long serialVersionUID = 8442267608764026867L; |
| | | private final int numAck; |
| | | |
| | | |
| | | /** |
| | | * Create a new WindowMessage. |
| | | * |
| | | * @param numAck The number of acknowledged messages. |
| | | * The window will be increased by this number. |
| | | */ |
| | | public WindowMessage(int numAck) |
| | | { |
| | | this.numAck = numAck; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new WindowMessage from its encoded form. |
| | | * |
| | | * @param in The byte array containing the encoded form of the |
| | | * WindowMessage. |
| | | * @throws DataFormatException If the byte array is null, empty, does not |
| | | * start with the window message type, or does |
| | | * not contain a parsable number of acks. |
| | | */ |
| | | public WindowMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The WindowMessage is encoded in the form : |
| | | * <type byte><numAck as a UTF-8 decimal string><one trailing byte> |
| | | */ |
| | | |
| | | /* first byte is the type; reject missing or empty input up front so |
| | | * that callers only ever see a DataFormatException. |
| | | */ |
| | | if ((in == null) || (in.length == 0) || (in[0] != MSG_TYPE_WINDOW)) |
| | | throw new DataFormatException("input is not a valid Window Message"); |
| | | |
| | | try |
| | | { |
| | | int pos = 1; |
| | | |
| | | /* |
| | | * read the number of acks contained in this message. |
| | | * first calculate the length then construct the string |
| | | */ |
| | | int length = getNextLength(in, pos); |
| | | String numAckStr = new String(in, pos, length, "UTF-8"); |
| | | numAck = Integer.parseInt(numAckStr); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | /* a corrupted counter is a format error, not a runtime failure */ |
| | | throw new DataFormatException( |
| | | "input is not a valid Window Message : " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | /* |
| | | * WindowMessage contains. |
| | | * <numAck> |
| | | */ |
| | | try { |
| | | byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8"); |
| | | |
| | | /* one byte for the type, the counter, and one trailing byte */ |
| | | int length = 1 + byteNumAck.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_WINDOW; |
| | | |
| | | /* the counter starts right after the type byte */ |
| | | addByteArray(byteNumAck, resultByteArray, 1); |
| | | |
| | | return resultByteArray; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | /* kept as in the original contract : null when UTF-8 is unavailable */ |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Get the number of messages acknowledged by the Window Message. |
| | | * |
| | | * @return the number of messages acknowledged by the Window Message. |
| | | */ |
| | | public int getNumAck() |
| | | { |
| | | return numAck; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public UpdateMessage processReceive(SynchronizationDomain domain) |
| | | { |
| | | /* a window message produces no update message : always null */ |
| | | return null; |
| | | } |
| | | } |
| | |
| | | "org.opends.server.BuildRoot"; |
| | | |
| | | /** |
| | | * The name of the system property that specifies the ldap port. |
| | | * Set this property when running the server if you want to use a given |
| | | * port number, otherwise a port is chosen randomly at test startup time. |
| | | */ |
| | | public static final String PROPERTY_LDAP_PORT = |
| | | "org.opends.server.LdapPort"; |
| | | |
| | | /** |
| | | * The string representation of the DN that will be used as the base entry for |
| | | * the test backend. This must not be changed, as there are a number of test |
| | | * cases that depend on this specific value of "o=test". |
| | |
| | | ServerSocket serverJmxSocket = null; |
| | | ServerSocket serverLdapsSocket = null; |
| | | |
| | | serverLdapSocket = bindFreePort(); |
| | | serverLdapPort = serverLdapSocket.getLocalPort(); |
| | | String ldapPort = System.getProperty(PROPERTY_LDAP_PORT); |
| | | if (ldapPort == null) |
| | | { |
| | | serverLdapSocket = bindFreePort(); |
| | | serverLdapPort = serverLdapSocket.getLocalPort(); |
| | | } |
| | | else |
| | | { |
| | | serverLdapPort = Integer.valueOf(ldapPort); |
| | | serverLdapSocket = bindPort(serverLdapPort); |
| | | } |
| | | |
| | | serverJmxSocket = bindFreePort(); |
| | | serverJmxPort = serverJmxSocket.getLocalPort(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Binds a new server socket to the given port on the local host. |
| | | * |
| | | * @param port the port number to bind to on 127.0.0.1. |
| | | * @return the bound Server socket. |
| | | * |
| | | * @throws IOException in case of underlying exception. |
| | | * @throws SocketException in case of underlying exception. |
| | | */ |
| | | private static ServerSocket bindPort(int port) |
| | | throws IOException, SocketException |
| | | { |
| | | ServerSocket serverLdapSocket = new ServerSocket(); |
| | | try |
| | | { |
| | | serverLdapSocket.setReuseAddress(true); |
| | | serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port)); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // do not leak the unbound socket when the requested port is unavailable |
| | | try |
| | | { |
| | | serverLdapSocket.close(); |
| | | } |
| | | catch (IOException ignored) |
| | | { |
| | | // the bind failure is the error worth reporting, not the close failure |
| | | } |
| | | throw e; |
| | | } |
| | | return serverLdapSocket; |
| | | } |
| | | |
| | | /** |
| | | * Find and binds to a free server socket port on the local host. |
| | | * @return the bounded Server socket. |
| | | * |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.*; |
| | | |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.Operation; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPException; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.OperationType; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | /** |
| | | * Test the window mechanism (flow control) of the synchronization protocol |
| | | * between the changelog server and its clients. |
| | | */ |
| | | public class ProtocolWindowTest |
| | | { |
| | | private static final int WINDOW_SIZE = 10; |
| | | |
| | | private static final String SYNCHRONIZATION_STRESS_TEST = |
| | | "Synchronization Stress Test"; |
| | | |
| | | /** |
| | | * The internal connection used for operation |
| | | */ |
| | | private InternalClientConnection connection; |
| | | |
| | | /** |
| | | * Created entries that need to be deleted for cleanup |
| | | */ |
| | | private ArrayList<Entry> entryList = new ArrayList<Entry>(); |
| | | |
| | | /** |
| | | * The Synchronization config manager entry |
| | | */ |
| | | private String synchroStringDN; |
| | | |
| | | /** |
| | | * The synchronization plugin entry |
| | | */ |
| | | private String synchroPluginStringDN; |
| | | |
| | | private Entry synchroPluginEntry; |
| | | |
| | | /** |
| | | * The Server synchro entry |
| | | */ |
| | | private String synchroServerStringDN; |
| | | |
| | | private Entry synchroServerEntry; |
| | | |
| | | /** |
| | | * The Change log entry |
| | | */ |
| | | private String changeLogStringDN; |
| | | |
| | | private Entry changeLogEntry; |
| | | |
| | | /** |
| | | * A "person" entry |
| | | */ |
| | | private Entry personEntry; |
| | | |
| | | /** |
| | | * schema check flag |
| | | */ |
| | | private boolean schemaCheck; |
| | | |
| | | // WORKAROUND FOR BUG #639 - BEGIN - |
| | | /** |
| | | * |
| | | */ |
| | | MultimasterSynchronization mms; |
| | | |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | /** |
| | | * Test the window mechanism by : |
| | | * - creating a Changelog service client using the ChangelogBroker class. |
| | | * - set a small window size. |
| | | * - perform more than the window size operations. |
| | | * - check that the Changelog has not sent more than window size operations. |
| | | * - receive all messages from the ChangelogBroker, check that |
| | | * the client receives the correct number of operations. |
| | | * |
| | | * The test relies on fixed sleeps (1500 ms and 500 ms) to let the monitoring |
| | | * information and the messages settle, and on the broker receive timing |
| | | * out (SocketTimeoutException) to detect that nothing is left to read. |
| | | * |
| | | * NOTE(review): the finally block deregisters a monitor provider named |
| | | * SYNCHRONIZATION_STRESS_TEST although this method never registers one; |
| | | * confirm whether this call is needed. |
| | | * |
| | | * @throws Exception If the test fails unexpectedly. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | | public void saturateAndRestart() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 13); |
| | | |
| | | try { |
| | | |
| | | /* Test that the changelog monitor and the synchro plugin monitor |
| | | * information publish the correct window size. |
| | | * This allows us both to check the monitoring code and to test that |
| | | * the configuration is working. |
| | | */ |
| | | Thread.sleep(1500); |
| | | assertTrue(checkWindows(WINDOW_SIZE)); |
| | | |
| | | // Create an Entry (add operation) that will be used later in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check that the client has received the ADD message |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | // perform twice as many modify operations as the window size |
| | | int count = WINDOW_SIZE * 2; |
| | | processModify(count); |
| | | |
| | | // leave some time for the messages to reach the changelog client |
| | | Thread.sleep(500); |
| | | |
| | | // check that the changelog only sent WINDOW_SIZE messages |
| | | assertTrue(searchUpdateSent()); |
| | | |
| | | int rcvCount=0; |
| | | try |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | rcvCount++; |
| | | } |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | {} |
| | | /* |
| | | * The receive loop above ends on a socket timeout; |
| | | * check that we received all updates |
| | | */ |
| | | assertEquals(rcvCount, WINDOW_SIZE*2); |
| | | } |
| | | finally { |
| | | broker.stop(); |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check that the window configuration has been successful by searching |
| | | * the monitoring information under cn=monitor for the entries that |
| | | * publish the given max-rcv-window value. |
| | | * |
| | | * @param windowSize the window size that the monitor entries must publish. |
| | | * @return true if exactly 3 monitor entries were returned by the search. |
| | | * @throws LDAPException if the search filter cannot be decoded. |
| | | */ |
| | | private boolean checkWindows(int windowSize) throws LDAPException |
| | | { |
| | | LDAPFilter windowFilter = |
| | | LDAPFilter.decode("(max-rcv-window=" + windowSize + ")"); |
| | | InternalSearchOperation search = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | windowFilter); |
| | | assertEquals(search.getResultCode(), ResultCode.SUCCESS); |
| | | return (search.getEntriesSent() == 3); |
| | | } |
| | | |
| | | /** |
| | | * Check that the changelog has stopped sending changes after having |
| | | * reached the limit of the window size, by searching the monitoring |
| | | * information under cn=monitor for an update-sent value of WINDOW_SIZE. |
| | | * |
| | | * @return true if exactly 1 monitor entry was returned by the search. |
| | | * @throws Exception if the search filter cannot be decoded. |
| | | */ |
| | | private boolean searchUpdateSent() throws Exception |
| | | { |
| | | LDAPFilter sentFilter = |
| | | LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")"); |
| | | InternalSearchOperation search = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | sentFilter); |
| | | assertEquals(search.getResultCode(), ResultCode.SUCCESS); |
| | | return (search.getEntriesSent() == 1); |
| | | } |
| | | |
| | | /** |
| | | * Set up the environment for performing the tests in this class : |
| | | * starts the server, disables the schema check (the previous setting is |
| | | * kept in schemaCheck), adds the top level backend entries, builds from |
| | | * LDIF the synchronization plugin, changelog server, synchronization |
| | | * domain and person entries, then configures the synchronization. |
| | | * Both the changelog server and the synchronization domain are configured |
| | | * with a window size of WINDOW_SIZE. |
| | | * |
| | | * @throws Exception |
| | | * If the environment could not be set up. |
| | | */ |
| | | @BeforeClass |
| | | public void setUp() throws Exception |
| | | { |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // Disable schema check, remembering the current setting |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | DirectoryServer.setCheckSchema(false); |
| | | |
| | | // Create an internal connection |
| | | connection = new InternalClientConnection(); |
| | | |
| | | // Create backend top level entries |
| | | String[] topEntries = new String[2]; |
| | | topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n" |
| | | + "objectClass: domain\n"; |
| | | topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n" |
| | | + "objectClass: organizationalUnit\n" |
| | | + "entryUUID: 11111111-1111-1111-1111-111111111111\n"; |
| | | Entry entry; |
| | | for (int i = 0; i < topEntries.length; i++) |
| | | { |
| | | entry = TestCaseUtils.entryFromLdifString(topEntries[i]); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(), |
| | | entry.getUserAttributes(), entry.getOperationalAttributes()); |
| | | addOp.setInternalOperation(true); |
| | | addOp.run(); |
| | | entryList.add(entry); |
| | | } |
| | | |
| | | // top level synchro provider |
| | | synchroStringDN = "cn=Synchronization Providers,cn=config"; |
| | | |
| | | // Multimaster Synchro plugin |
| | | synchroPluginStringDN = "cn=Multimaster Synchronization, " |
| | | + synchroStringDN; |
| | | String synchroPluginLdif = "dn: " |
| | | + synchroPluginStringDN |
| | | + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-provider\n" |
| | | + "ds-cfg-synchronization-provider-enabled: true\n" |
| | | + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n"; |
| | | synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif); |
| | | |
| | | // Change log server, listening on port 8989 with the test window size |
| | | changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; |
| | | String changeLogLdif = "dn: " + changeLogStringDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n" |
| | | + "ds-cfg-changelog-server-id: 1\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); |
| | | |
| | | // synchronized suffix, connected to the changelog server above |
| | | synchroServerStringDN = "cn=example, " + synchroPluginStringDN; |
| | | String synchroServerLdif = "dn: " + synchroServerStringDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-provider-config\n" |
| | | + "cn: example\n" |
| | | + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n" |
| | | + "ds-cfg-changelog-server: localhost:8989\n" |
| | | + "ds-cfg-directory-server-id: 1\n" |
| | | + "ds-cfg-receive-status: true\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); |
| | | |
| | | String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n" |
| | | + "objectClass: top\n" + "objectClass: person\n" |
| | | + "objectClass: organizationalPerson\n" |
| | | + "objectClass: inetOrgPerson\n" + "uid: user.1\n" |
| | | + "homePhone: 951-245-7634\n" |
| | | + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" |
| | | + "mobile: 027-085-0537\n" |
| | | + "postalAddress: Aaccf Amar$17984 Thirteenth Street" |
| | | + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" |
| | | + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" |
| | | + "street: 17984 Thirteenth Street\n" |
| | | + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" |
| | | + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" |
| | | + "userPassword: password\n" + "initials: AA\n"; |
| | | personEntry = TestCaseUtils.entryFromLdifString(personLdif); |
| | | |
| | | configureSynchronization(); |
| | | } |
| | | |
| | | /** |
| | | * Clean up the environment : restores the schema check setting saved in |
| | | * schemaCheck, deregisters and finalizes the synchronization provider, |
| | | * then deletes the entries created by the tests. |
| | | * |
| | | * @throws Exception |
| | | * If the environment could not be cleaned up. |
| | | */ |
| | | @AfterClass |
| | | public void classCleanUp() throws Exception |
| | | { |
| | | DirectoryServer.setCheckSchema(schemaCheck); |
| | | |
| | | // WORKAROUND FOR BUG #639 - BEGIN - |
| | | DirectoryServer.deregisterSynchronizationProvider(mms); |
| | | mms.finalizeSynchronizationProvider(); |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | cleanEntries(); |
| | | } |
| | | |
| | | /** |
| | | * Suppress the entries created by the tests in this class. |
| | | * The entries recorded in entryList are deleted in reverse order of |
| | | * creation. Deletion is best effort : a failure to delete one entry |
| | | * does not prevent the deletion of the others. |
| | | */ |
| | | private void cleanEntries() |
| | | { |
| | | // Work on a snapshot of the list of created entries |
| | | Entry entries[] = entryList.toArray(new Entry[0]); |
| | | // The guard is "i > 0" rather than "i != 0" so that the loop terminates |
| | | // immediately when the list is empty (the index then starts at -1). |
| | | // NOTE(review): as before, the entry at index 0 is never deleted; |
| | | // confirm that this is intended. |
| | | for (int i = entries.length - 1; i > 0; i--) |
| | | { |
| | | try |
| | | { |
| | | DeleteOperation op = new DeleteOperation(connection, |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), null, |
| | | entries[i].getDN()); |
| | | op.run(); |
| | | } catch (Exception e) |
| | | { |
| | | // best effort cleanup : ignore the failure and go on with the next entry |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Build a list of modifications made of a single REPLACE modification. |
| | | * |
| | | * @param attrName the name of the attribute to replace; its lower case |
| | | * form is used to look up the attribute type. |
| | | * @param attrValue the new value of the attribute. |
| | | * @return a new list holding one REPLACE modification of the attribute. |
| | | */ |
| | | private List<Modification> generatemods(String attrName, String attrValue) |
| | | { |
| | | AttributeType type = |
| | | DirectoryServer.getAttributeType(attrName.toLowerCase(), true); |
| | | LinkedHashSet<AttributeValue> valueSet = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | valueSet.add(new AttributeValue(type, attrValue)); |
| | | List<Modification> result = new ArrayList<Modification>(); |
| | | result.add(new Modification(ModificationType.REPLACE, |
| | | new Attribute(type, attrName, valueSet))); |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * Open a changelog session to the local Changelog server |
| | | * (localhost:8989) using a window size of WINDOW_SIZE, then read from |
| | | * it until a receive fails so that no message from a previous test is left. |
| | | * |
| | | * @param baseDn the base DN of the session. |
| | | * @param serverId the server id to use for the session. |
| | | * @return the started broker. |
| | | * @throws Exception if the session cannot be opened. |
| | | * @throws SocketException if the socket timeout cannot be set. |
| | | */ |
| | | private ChangelogBroker openChangelogSession(final DN baseDn, short serverId) |
| | | throws Exception, SocketException |
| | | { |
| | | ServerState serverState = new ServerState(baseDn); |
| | | serverState.loadState(); |
| | | |
| | | ChangelogBroker session = new ChangelogBroker( |
| | | serverState, baseDn, serverId, 0, 0, 0, 0, WINDOW_SIZE); |
| | | |
| | | ArrayList<String> changelogServers = new ArrayList<String>(1); |
| | | changelogServers.add("localhost:8989"); |
| | | session.start(changelogServers); |
| | | session.setSoTimeout(5000); |
| | | |
| | | // Drain the session : the loop only ends when receive() throws, |
| | | // which is how the absence of pending messages is detected. |
| | | try |
| | | { |
| | | for (;;) |
| | | { |
| | | session.receive(); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { } |
| | | return session; |
| | | } |
| | | |
| | | /** |
| | | * Configure the Synchronization for this test : adds to the configuration, |
| | | * in this order, the Multimaster synchronization plugin entry, the |
| | | * changelog server entry and the synchronization domain entry, checking |
| | | * after each addition that the entry can be read back. |
| | | * The synchronization provider is also created, initialized and |
| | | * registered by hand (workaround for bug #639). |
| | | * |
| | | * @throws Exception if the configuration cannot be updated. |
| | | */ |
| | | private void configureSynchronization() throws Exception |
| | | { |
| | | // |
| | | // Add the Multimaster synchronization plugin |
| | | DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null); |
| | | entryList.add(synchroPluginEntry); |
| | | assertNotNull(DirectoryServer.getConfigEntry(DN |
| | | .decode(synchroPluginStringDN)), |
| | | "Unable to add the Multimaster synchronization plugin"); |
| | | |
| | | // WORKAROUND FOR BUG #639 - BEGIN - |
| | | DN dn = DN.decode(synchroPluginStringDN); |
| | | ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn); |
| | | mms = new MultimasterSynchronization(); |
| | | try |
| | | { |
| | | mms.initializeSynchronizationProvider(mmsConfigEntry); |
| | | } |
| | | catch (ConfigException e) |
| | | { |
| | | assertTrue(false, |
| | | "Unable to initialize the Multimaster synchronization plugin"); |
| | | } |
| | | DirectoryServer.registerSynchronizationProvider(mms); |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | // |
| | | // Add the changelog server |
| | | DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), |
| | | "Unable to add the changeLog server"); |
| | | entryList.add(changeLogEntry); |
| | | |
| | | // |
| | | // We also have a replicated suffix (synchronization domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the syncrhonized server"); |
| | | entryList.add(synchroServerEntry); |
| | | } |
| | | |
| | | /** |
| | | * Perform the given number of modify operations on the person entry, |
| | | * each one replacing its telephonenumber, and check that every operation |
| | | * is successful. |
| | | * |
| | | * @param count the number of modify operations to perform. |
| | | */ |
| | | private void processModify(int count) |
| | | { |
| | | for (int remaining = count; remaining > 0; remaining--) |
| | | { |
| | | // a fresh list of modifications is built for every operation because |
| | | // the list is modified while the operation is processed. |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | ModifyOperation modOp = |
| | | connection.processModify(personEntry.getDN(), mods); |
| | | assertEquals(modOp.getResultCode(), ResultCode.SUCCESS); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertNotNull; |
| | | import static org.testng.Assert.assertTrue; |
| | |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | |
| | | @Test(enabled=true, groups="slow") |
| | | public void fromServertoBroker() throws Exception |
| | | { |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting Synchronization StressTest : fromServertoBroker" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | final int TOTAL_MESSAGES = 1000; |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 18); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | |
| | | try { |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | while (true) |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | broker.receive(); |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | catch (Exception e) |
| | | { } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | reader = new BrokerReader(broker); |
| | | reader.start(); |
| | | reader = new BrokerReader(broker); |
| | | reader.start(); |
| | | |
| | | long startTime = TimeThread.getTime(); |
| | | int count = TOTAL_MESSAGES; |
| | | long startTime = TimeThread.getTime(); |
| | | int count = TOTAL_MESSAGES; |
| | | |
| | | // Create a number of writer thread that will loop modifying the entry |
| | | List<Thread> writerThreadList = new LinkedList<Thread>(); |
| | | for (int n = 0; n < 1; n++) |
| | | { |
| | | BrokerWriter writer = new BrokerWriter(count); |
| | | writerThreadList.add(writer); |
| | | } |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.start(); |
| | | } |
| | | // wait for all the threads to finish. |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.join(); |
| | | } |
| | | // Create a number of writer thread that will loop modifying the entry |
| | | List<Thread> writerThreadList = new LinkedList<Thread>(); |
| | | for (int n = 0; n < 1; n++) |
| | | { |
| | | BrokerWriter writer = new BrokerWriter(count); |
| | | writerThreadList.add(writer); |
| | | } |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.start(); |
| | | } |
| | | // wait for all the threads to finish. |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.join(); |
| | | } |
| | | |
| | | long afterSendTime = TimeThread.getTime(); |
| | | long afterSendTime = TimeThread.getTime(); |
| | | |
| | | int rcvCount = reader.getCount(); |
| | | long afterReceiveTime = TimeThread.getTime(); |
| | | int rcvCount = reader.getCount(); |
| | | |
| | | long afterReceiveTime = TimeThread.getTime(); |
| | | |
| | | if (rcvCount != TOTAL_MESSAGES) |
| | | { |
| | | fail("some messages were lost : expected : " +TOTAL_MESSAGES + |
| | | " received : " + rcvCount); |
| | | } |
| | | if (rcvCount != TOTAL_MESSAGES) |
| | | { |
| | | fail("some messages were lost : expected : " +TOTAL_MESSAGES + |
| | | " received : " + rcvCount); |
| | | } |
| | | |
| | | } |
| | | finally { |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | broker.stop(); |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | |
| | | ServerState state = new ServerState(baseDn); |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker(state, baseDn, |
| | | serverId, 0, 0, 0, 0); |
| | | serverId, 0, 0, 0, 0, 100); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:8989"); |
| | | broker.start(servers); |
| | |
| | | // We also have a replicated suffix (synchronization domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the syncrhonized server"); |
| | | "Unable to add the synchronized server"); |
| | | entryList.add(synchroServerEntry); |
| | | } |
| | | |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | SynchronizationMessage msg = broker.receive(); |
| | | if (msg == null) |
| | | break; |
| | | count ++; |
| | | } |
| | | } catch (Exception e) { |
| | |
| | | return count; |
| | | try |
| | | { |
| | | this.wait(); |
| | | this.wait(60); |
| | | return count; |
| | | } catch (InterruptedException e) |
| | | { |
| | |
| | | // Check that retrieved CN is OK |
| | | msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes()); |
| | | } |
| | | |
| | | @DataProvider(name="serverStart") |
| | | public Object [][] createServerStartMessageTestData() throws Exception |
| | | { |
| | | DN baseDN = DN.decode("dc=example, dc=com"); |
| | | ServerState state = new ServerState(baseDN); |
| | | return new Object [][] { {(short)1, baseDN, 100, state} }; |
| | | } |
| | | /** |
| | | * Test that ServerStartMessage encoding and decoding works |
| | | * by checking that a message rebuilt from msg.getBytes() reports the same |
| | | * server id, base DN, window size and max change number (for server id 1) |
| | | * as the original message. |
| | | * |
| | | * @param serverId the server id stored in the message |
| | | * @param baseDN the base DN stored in the message |
| | | * @param window the value passed for each of the five int constructor |
| | | * arguments of ServerStartMessage |
| | | * @param state the server state stored in the message |
| | | * @throws Exception if the message cannot be built or decoded |
| | | */ |
| | | @Test(dataProvider="serverStart") |
| | | public void ServerStartMessageTest(short serverId, DN baseDN, int window, |
| | | ServerState state) throws Exception |
| | | { |
| | | // Record one change in the state; its max change number is compared below. |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ServerStartMessage msg = new ServerStartMessage(serverId, baseDN, |
| | | window, window, window, window, window, state); |
| | | // Round trip: encode the message and decode it into a new instance. |
| | | ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | } |
| | | |
| | | @DataProvider(name="changelogStart") |
| | | public Object [][] createChangelogStartMessageTestData() throws Exception |
| | | { |
| | | DN baseDN = DN.decode("dc=example, dc=com"); |
| | | ServerState state = new ServerState(baseDN); |
| | | return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} }; |
| | | } |
| | | |
| | | /** |
| | | * Test that ChangelogStartMessage encoding and decoding works |
| | | * by checking that a message rebuilt from msg.getBytes() reports the same |
| | | * server id, base DN, window size and max change number (for server id 1) |
| | | * as the original message. |
| | | * |
| | | * @param serverId the server id stored in the message |
| | | * @param baseDN the base DN stored in the message |
| | | * @param window the window size stored in the message |
| | | * @param url the server URL stored in the message |
| | | * @param state the server state stored in the message |
| | | * @throws Exception if the message cannot be built or decoded |
| | | */ |
| | | @Test(dataProvider="changelogStart") |
| | | public void ChangelogStartMessageTest(short serverId, DN baseDN, int window, |
| | | String url, ServerState state) throws Exception |
| | | { |
| | | // Record one change in the state; its max change number is compared below. |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ChangelogStartMessage msg = new ChangelogStartMessage(serverId, |
| | | url, baseDN, window, state); |
| | | // Round trip: encode the message and decode it into a new instance. |
| | | ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | } |
| | | |
| | | /** |
| | | * Test that WindowMessage encoding and decoding works |
| | | * by checking that a WindowMessage rebuilt from msg.getBytes() reports |
| | | * the same numAck value as the message it was encoded from. |
| | | * |
| | | * @throws Exception if the message cannot be built or decoded |
| | | */ |
| | | @Test() |
| | | public void WindowMessageTest() throws Exception |
| | | { |
| | | WindowMessage sent = new WindowMessage(123); |
| | | // Round trip through the encoded form. |
| | | WindowMessage decoded = new WindowMessage(sent.getBytes()); |
| | | assertEquals(sent.getNumAck(), decoded.getNumAck()); |
| | | } |
| | | |
| | | /** |
| | | * Test PendingChange |
| | |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.*; |
| | | |
| | | import java.net.SocketException; |
| | |
| | | @Test(enabled=true) |
| | | public void namingConflicts() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : namingConflicts" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | /* |
| | |
| | | @Test(enabled=true, dataProvider="assured") |
| | | public void updateOperations(boolean assured) throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : updateOperations " + assured , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 27); |
| | | try { |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0); |
| | | |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | // broker.setSoTimeout(100); |
| | | try |
| | | { |
| | | while (true) |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | broker.receive(); |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // broker.setSoTimeout(1000); |
| | | } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | catch (Exception e) |
| | | {} |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | // Create an Entry (add operation) |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | // Modify the entry |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | // Modify the entry |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | |
| | | ModifyOperation modOp = new ModifyOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), mods); |
| | | modOp.setInternalOperation(true); |
| | | modOp.run(); |
| | | ModifyOperation modOp = new ModifyOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), mods); |
| | | modOp.setInternalOperation(true); |
| | | modOp.run(); |
| | | |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyMsg, |
| | | "The received synchronization message is not a MODIFY msg"); |
| | | ModifyMsg modMsg = (ModifyMsg) msg; |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyMsg, |
| | | "The received synchronization message is not a MODIFY msg"); |
| | | ModifyMsg modMsg = (ModifyMsg) msg; |
| | | |
| | | receivedOp = modMsg.createOperation(connection); |
| | | assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY synchronization message is not for the excepted DN"); |
| | | receivedOp = modMsg.createOperation(connection); |
| | | assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY synchronization message is not for the excepted DN"); |
| | | |
| | | // Modify the entry DN |
| | | DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ; |
| | | ModifyDNOperation modDNOp = new ModifyDNOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), RDN |
| | | .decode("uid=new person"), true, DN |
| | | .decode("ou=People,dc=example,dc=com")); |
| | | modDNOp.run(); |
| | | assertNotNull(DirectoryServer.getEntry(newDN), |
| | | "The MOD_DN operation didn't create the new person entry"); |
| | | assertNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The MOD_DN operation didn't delete the old person entry"); |
| | | entryList.add(DirectoryServer.getEntry(newDN)); |
| | | // Modify the entry DN |
| | | DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ; |
| | | ModifyDNOperation modDNOp = new ModifyDNOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), RDN |
| | | .decode("uid=new person"), true, DN |
| | | .decode("ou=People,dc=example,dc=com")); |
| | | modDNOp.run(); |
| | | assertNotNull(DirectoryServer.getEntry(newDN), |
| | | "The MOD_DN operation didn't create the new person entry"); |
| | | assertNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The MOD_DN operation didn't delete the old person entry"); |
| | | entryList.add(DirectoryServer.getEntry(newDN)); |
| | | |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyDNMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | ModifyDNMsg moddnMsg = (ModifyDNMsg) msg; |
| | | receivedOp = moddnMsg.createOperation(connection); |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyDNMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | ModifyDNMsg moddnMsg = (ModifyDNMsg) msg; |
| | | receivedOp = moddnMsg.createOperation(connection); |
| | | |
| | | assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY_DN message is not for the excepted DN"); |
| | | assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY_DN message is not for the excepted DN"); |
| | | |
| | | // Delete the entry |
| | | Entry newPersonEntry = DirectoryServer.getEntry(newDN) ; |
| | | DeleteOperation delOp = new DeleteOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")); |
| | | delOp.run(); |
| | | assertNull(DirectoryServer.getEntry(newDN), |
| | | "Unable to delete the new person Entry"); |
| | | entryList.remove(newPersonEntry); |
| | | // Delete the entry |
| | | Entry newPersonEntry = DirectoryServer.getEntry(newDN) ; |
| | | DeleteOperation delOp = new DeleteOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")); |
| | | delOp.run(); |
| | | assertNull(DirectoryServer.getEntry(newDN), |
| | | "Unable to delete the new person Entry"); |
| | | entryList.remove(newPersonEntry); |
| | | |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof DeleteMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | DeleteMsg delMsg = (DeleteMsg) msg; |
| | | receivedOp = delMsg.createOperation(connection); |
| | | assertTrue(DN.decode(delMsg.getDn()).compareTo(DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")) == 0, |
| | | "The received DELETE message is not for the excepted DN"); |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof DeleteMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | DeleteMsg delMsg = (DeleteMsg) msg; |
| | | receivedOp = delMsg.createOperation(connection); |
| | | assertTrue(DN.decode(delMsg.getDn()).compareTo(DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")) == 0, |
| | | "The received DELETE message is not for the excepted DN"); |
| | | |
| | | /* |
| | | * Now check that when we send message to the Changelog server |
| | | * and that they are received and correctly replayed by the server. |
| | | * |
| | | * Start by testing the Add message reception |
| | | */ |
| | | addMsg = new AddMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN().toString(), |
| | | user1entryUUID, baseUUID, |
| | | personWithUUIDEntry.getObjectClassAttribute(), |
| | | personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); |
| | | if (assured) |
| | | addMsg.setAssured(); |
| | | broker.publish(addMsg); |
| | | /* |
| | | * Now check that when we send message to the Changelog server |
| | | * and that they are received and correctly replayed by the server. |
| | | * |
| | | * Start by testing the Add message reception |
| | | */ |
| | | addMsg = new AddMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN().toString(), |
| | | user1entryUUID, baseUUID, |
| | | personWithUUIDEntry.getObjectClassAttribute(), |
| | | personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); |
| | | if (assured) |
| | | addMsg.setAssured(); |
| | | broker.publish(addMsg); |
| | | |
| | | /* |
| | | * Check that the entry has been created in the local DS. |
| | | */ |
| | | Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true); |
| | | assertNotNull(resultEntry, |
| | | "The send ADD synchronization message was not applied"); |
| | | entryList.add(resultEntry); |
| | | /* |
| | | * Check that the entry has been created in the local DS. |
| | | */ |
| | | Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true); |
| | | assertNotNull(resultEntry, |
| | | "The send ADD synchronization message was not applied"); |
| | | entryList.add(resultEntry); |
| | | |
| | | /* |
| | | * Test the reception of Modify Msg |
| | | */ |
| | | modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(), |
| | | mods, user1entryUUID); |
| | | if (assured) |
| | | modMsg.setAssured(); |
| | | broker.publish(modMsg); |
| | | /* |
| | | * Test the reception of Modify Msg |
| | | */ |
| | | modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(), |
| | | mods, user1entryUUID); |
| | | if (assured) |
| | | modMsg.setAssured(); |
| | | broker.publish(modMsg); |
| | | |
| | | boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), |
| | | "telephonenumber", "01 02 45", 1000); |
| | | boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), |
| | | "telephonenumber", "01 02 45", 1000); |
| | | |
| | | if (found == false) |
| | | fail("The modification has not been correctly replayed."); |
| | | if (found == false) |
| | | fail("The modification has not been correctly replayed."); |
| | | |
| | | /* |
| | | * Test the Reception of Modify Dn Msg |
| | | */ |
| | | moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(), |
| | | gen.NewChangeNumber(), |
| | | user1entryUUID, null, |
| | | true, null, "uid= new person"); |
| | | if (assured) |
| | | moddnMsg.setAssured(); |
| | | broker.publish(moddnMsg); |
| | | /* |
| | | * Test the Reception of Modify Dn Msg |
| | | */ |
| | | moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(), |
| | | gen.NewChangeNumber(), |
| | | user1entryUUID, null, |
| | | true, null, "uid= new person"); |
| | | if (assured) |
| | | moddnMsg.setAssured(); |
| | | broker.publish(moddnMsg); |
| | | |
| | | resultEntry = getEntry( |
| | | DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true); |
| | | resultEntry = getEntry( |
| | | DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true); |
| | | |
| | | assertNotNull(resultEntry, |
| | | "The modify DN synchronization message was not applied"); |
| | | assertNotNull(resultEntry, |
| | | "The modify DN synchronization message was not applied"); |
| | | |
| | | /* |
| | | * Test the Reception of Delete Msg |
| | | */ |
| | | delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com", |
| | | gen.NewChangeNumber(), user1entryUUID); |
| | | if (assured) |
| | | delMsg.setAssured(); |
| | | broker.publish(delMsg); |
| | | resultEntry = getEntry( |
| | | /* |
| | | * Test the Reception of Delete Msg |
| | | */ |
| | | delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com", |
| | | gen.NewChangeNumber(), user1entryUUID); |
| | | if (assured) |
| | | delMsg.setAssured(); |
| | | broker.publish(delMsg); |
| | | resultEntry = getEntry( |
| | | DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, false); |
| | | |
| | | assertNull(resultEntry, |
| | | "The DELETE synchronization message was not replayed"); |
| | | |
| | | broker.stop(); |
| | | assertNull(resultEntry, |
| | | "The DELETE synchronization message was not replayed"); |
| | | } |
| | | finally |
| | | { |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | ServerState state = new ServerState(baseDn); |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker(state, baseDn, |
| | | serverId, 0, 0, 0, 0); |
| | | serverId, 0, 0, 0, 0, 100); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:8989"); |
| | | broker.start(servers); |
| | |
| | | @Test(enabled=true) |
| | | public void deleteNoSuchObject() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : deleteNoSuchObject" , 1); |
| | | |
| | | DN dn = DN.decode("cn=No Such Object,ou=People,dc=example,dc=com"); |
| | | Operation op = |
| | | new DeleteOperation(connection, |
| | |
| | | @Test(enabled=true) |
| | | public void infiniteReplayLoop() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : infiniteReplayLoop" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | Thread.sleep(2000); |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 11); |
| | | try |
| | | { |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0); |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0); |
| | | |
| | | // Create a test entry. |
| | | String personLdif = "dn: uid=user.2,ou=People,dc=example,dc=com\n" |