| | |
| | | NAME 'ds-cfg-bind-with-dn-requires-password' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.288 |
| | | NAME 'ds-cfg-window-size' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.164 |
| | | NAME 'ds-cfg-max-receive-queue' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE |
| | |
| | | STRUCTURAL MUST ( ds-cfg-changelog-server $ ds-cfg-directory-server-id |
| | | $ ds-cfg-synchronization-dn ) |
| | | MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $ |
| | | ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay ) |
| | | ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $ |
| | | ds-cfg-window-size ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.59 |
| | | NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator |
| | |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.65 NAME |
| | | 'ds-cfg-synchronization-changelog-server-config' SUP top |
| | | STRUCTURAL MUST (ds-cfg-changelog-server-id $ ds-cfg-changelog-port ) |
| | | MAY ( ds-cfg-changelog-server $ cn ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | MAY ( ds-cfg-changelog-server $ cn $ ds-cfg-window-size ) X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.66 NAME 'ds-backup-directory' |
| | | SUP top STRUCTURAL MUST ( ds-backup-directory-path $ ds-backup-backend-dn ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | |
| | | */ |
| | | public class Changelog implements Runnable, ConfigurableComponent |
| | | { |
| | | static private short serverId; |
| | | static private String serverURL; |
| | | private short serverId; |
| | | private String serverURL; |
| | | |
| | | private static ServerSocket listenSocket; |
| | | private static Thread myListenThread; |
| | | private static Thread myConnectThread; |
| | | private ServerSocket listenSocket; |
| | | private Thread myListenThread; |
| | | private Thread myConnectThread; |
| | | |
| | | private static boolean runListen = true; |
| | | private boolean runListen = true; |
| | | |
| | | /* The list of changelog servers configured by the administrator */ |
| | | private List<String> changelogServers; |
| | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private static HashMap<DN, ChangelogCache> baseDNs = |
| | | private HashMap<DN, ChangelogCache> baseDNs = |
| | | new HashMap<DN, ChangelogCache>(); |
| | | |
| | | private String localURL = "null"; |
| | | private static boolean shutdown = false; |
| | | private boolean shutdown = false; |
| | | private short changelogServerId; |
| | | private DN configDn; |
| | | private List<ConfigAttribute> configAttributes = |
| | | new ArrayList<ConfigAttribute>(); |
| | | private ChangelogDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | |
| | | static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id"; |
| | | static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; |
| | | static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; |
| | | |
| | | static final IntegerConfigAttribute changelogPortStub = |
| | | new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port", |
| | |
| | | "changelog server information", true, |
| | | true, false); |
| | | |
| | | static final IntegerConfigAttribute windowStub = |
| | | new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | /** |
| | | * Check if a ConfigEntry is valid. |
| | | * @param config The config entry that needs to be checked. |
| | |
| | | } |
| | | configAttributes.add(changelogServer); |
| | | |
| | | IntegerConfigAttribute windowAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(windowStub); |
| | | if (windowAttr == null) |
| | | rcvWindow = 100; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | rcvWindow = windowAttr.activeIntValue(); |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | initialize(changelogServerId, changelogPort); |
| | | |
| | | configDn = config.getDN(); |
| | |
| | | try |
| | | { |
| | | newSocket = listenSocket.accept(); |
| | | newSocket.setReceiveBufferSize(1000000); |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(newSocket)); |
| | | handler.start(null); |
| | | handler.start(null, serverId, serverURL, rcvWindow, this); |
| | | } catch (IOException e) |
| | | { |
| | | // ignore |
| | |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(socket)); |
| | | handler.start(baseDn); |
| | | handler.start(baseDn, serverId, serverURL, rcvWindow, this); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | * Initialize the changelog database. |
| | | * TODO : the changelog db path should be configurable |
| | | */ |
| | | ChangelogDB.initialize(DirectoryServer.getServerRoot() + File.separator |
| | | + "changelogDb"); |
| | | dbEnv = new ChangelogDbEnv( |
| | | DirectoryServer.getServerRoot() + File.separator + "changelogDb", |
| | | this); |
| | | |
| | | /* |
| | | * create changelog cache |
| | |
| | | String localAdddress = InetAddress.getLocalHost().getHostAddress(); |
| | | serverURL = localhostname + ":" + String.valueOf(changelogPort); |
| | | localURL = localAdddress + ":" + String.valueOf(changelogPort); |
| | | listenSocket = new ServerSocket(changelogPort); |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.setReceiveBufferSize(1000000); |
| | | listenSocket.bind(new InetSocketAddress(changelogPort)); |
| | | |
| | | /* |
| | | * create working threads |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the unique identifier for this changelog. |
| | | * |
| | | * @return The unique identifier for this changelog. |
| | | */ |
| | | public static short getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the host and port for this changelog, separated by a colon. |
| | | * |
| | | * @return The host and port for this changelog, separated by a colon. |
| | | */ |
| | | public static String getServerURL() |
| | | { |
| | | return serverURL; |
| | | } |
| | | |
| | | /** |
| | | * Get the ChangelogCache associated to the base DN given in parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ChangelogCache must be returned. |
| | | * @return The ChangelogCache associated to the base DN given in parameter. |
| | | */ |
| | | public static ChangelogCache getChangelogCache(DN baseDn) |
| | | public ChangelogCache getChangelogCache(DN baseDn) |
| | | { |
| | | ChangelogCache changelogCache; |
| | | |
| | |
| | | { |
| | | changelogCache = baseDNs.get(baseDn); |
| | | if (changelogCache == null) |
| | | changelogCache = new ChangelogCache(baseDn); |
| | | changelogCache = new ChangelogCache(baseDn, this); |
| | | baseDNs.put(baseDn, changelogCache); |
| | | } |
| | | |
| | |
| | | /** |
| | | * Shutdown the Changelog service and all its connections. |
| | | */ |
| | | public static void shutdown() |
| | | public void shutdown() |
| | | { |
| | | shutdown = true; |
| | | |
| | |
| | | changelogCache.shutdown(); |
| | | } |
| | | |
| | | ChangelogDB.shutdownDbEnvironment(); |
| | | dbEnv.shutdown(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Creates a new DB handler for this Changelog and the serverId and |
| | | * DN given in parameter. |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler muste be created. |
| | | * @return The new DB handler for this Changelog and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |
| | | public DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException |
| | | { |
| | | return new DbHandler(id, baseDn, this, dbEnv); |
| | | } |
| | | } |
| | |
| | | */ |
| | | private Map<Short, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | private Changelog changelog; |
| | | |
| | | /** |
| | | * Creates a new ChangelogCache associated to the DN baseDn. |
| | | * |
| | | * @param baseDn The baseDn associated to the ChangelogCache. |
| | | * @param changelog the Changelog that created this changelog cache. |
| | | */ |
| | | public ChangelogCache(DN baseDn) |
| | | public ChangelogCache(DN baseDn, Changelog changelog) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.changelog = changelog; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | dbHandler = new DbHandler(id, baseDn); |
| | | dbHandler = changelog.newDbHandler(id, baseDn); |
| | | } catch (DatabaseException e) |
| | | { |
| | | /* |
| | |
| | | * from at least one LDAP server. |
| | | * This changelog therefore can't do it's job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | * TODO : log error |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | return; |
| | | } |
| | | sourceDbHandlers.put(id, dbHandler); |
| | |
| | | /** |
| | | * creates a new ChangelogDB with specified identifier. |
| | | * @param id the identifier of the new ChangelogDB. |
| | | * @param baseDn the baseDn of the new ChangelogDB. |
| | | * @param db the new db. |
| | | * |
| | | * @throws DatabaseException If a database error happened. |
| | | */ |
| | | public void newDb(short id, DN baseDn) throws DatabaseException |
| | | public void newDb(short id, DbHandler db) throws DatabaseException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | sourceDbHandlers.put(id , new DbHandler(id, baseDn)); |
| | | sourceDbHandlers.put(id , db); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void ack(AckMessage message, short fromServerId) |
| | | { |
| | | /* |
| | | * there are 2 possible cases here : |
| | | * - the message that was acked comes from a server to which |
| | | * we are directly connected. |
| | | * In this case, we can find the handler from the connectedServers map |
| | | * - the message that was acked comes from a server to which we are not |
| | | * connected. |
| | | * In this case we need to find the changelog server that forwarded |
| | | * the change and send back the ack to this server. |
| | | */ |
| | | ServerHandler handler = connectedServers.get( |
| | | message.getChangeNumber().getServerId()); |
| | | if (handler != null) |
| | |
| | | { |
| | | return "ChangelogCache " + baseDn; |
| | | } |
| | | |
| | | |
| | | } |
| | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.util.List; |
| | | import java.io.File; |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Environment; |
| | | import com.sleepycat.je.EnvironmentConfig; |
| | | import com.sleepycat.je.Database; |
| | | import com.sleepycat.je.DatabaseConfig; |
| | | import com.sleepycat.je.LockMode; |
| | | import com.sleepycat.je.OperationStatus; |
| | | import com.sleepycat.je.Transaction; |
| | |
| | | */ |
| | | public class ChangelogDB |
| | | { |
| | | private static Environment dbEnvironment = null; |
| | | private Database db = null; |
| | | private static Database stateDb = null; |
| | | private String stringId = null; |
| | | private ChangelogDbEnv dbenv = null; |
| | | private Changelog changelog; |
| | | private Short serverId; |
| | | private DN baseDn; |
| | | |
| | | /** |
| | | * Creates a new database or open existing database that will be used |
| | | * to store and retrieve changes from an LDAP server. |
| | | * @param serverId Identifier of the LDAP server. |
| | | * @param baseDn baseDn of the LDAP server. |
| | | * @param changelog the Changelog that needs to be shutdown |
| | | * @param dbenv the Db encironemnet to use to create the db |
| | | * @throws DatabaseException if a database problem happened |
| | | */ |
| | | public ChangelogDB(Short serverId, DN baseDn) |
| | | public ChangelogDB(Short serverId, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | try { |
| | | stringId = serverId.toString() + " " + baseDn.toNormalizedString(); |
| | | byte[] byteId = stringId.getBytes("UTF-8"); |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn; |
| | | this.dbenv = dbenv; |
| | | this.changelog = changelog; |
| | | db = dbenv.getOrAddDb(serverId, baseDn); |
| | | |
| | | // Open the database. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | |
| | | db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // never happens |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Initialize this class. |
| | | * Creates Db environment that will be used to create databases. |
| | | * It also reads the currently known databases from the "changelogstate" |
| | | * database. |
| | | * @param path Path where the backing files must be created. |
| | | * @throws DatabaseException If a DatabaseException occured that prevented |
| | | * the initialization to happen. |
| | | * @throws ChangelogDBException If a changelog internal error caused |
| | | * a failure of the changelog processing. |
| | | */ |
| | | public static void initialize(String path) throws DatabaseException, |
| | | ChangelogDBException |
| | | { |
| | | EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* Create the DB Environment that will be used for all |
| | | * the Changelog activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | | envConfig.setConfigParam("je.cleaner.expunge", "true"); |
| | | // TODO : the DB cache size should be configurable |
| | | // For now set 5M is OK for being efficient in 64M total for the JVM |
| | | envConfig.setConfigParam("je.maxMemory", "5000000"); |
| | | dbEnvironment = new Environment(new File(path), envConfig); |
| | | |
| | | /* |
| | | * One database is created to store the update from each LDAP |
| | | * server in the topology. |
| | | * The database "changelogstate" is used to store the list of all |
| | | * the servers that have been seen in the past. |
| | | */ |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | |
| | | stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig); |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | try |
| | | { |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | String[] str = stringData.split(" ", 2); |
| | | short serverId = new Short(str[0]); |
| | | DN baseDn = null; |
| | | try |
| | | { |
| | | baseDn = DN.decode(str[1]); |
| | | } catch (DirectoryException e) |
| | | { |
| | | int msgID = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER; |
| | | String message = getMessage(msgID, str[1]); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | Changelog.getChangelogCache(baseDn).newDb(serverId, baseDn); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogDBException(0, |
| | | "changelog state database has a wrong format"); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | throw new ChangelogDBException(0, "need UTF-8 support"); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | cursor.close(); |
| | | |
| | | } catch (DatabaseException dbe) { |
| | | cursor.close(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * add a list of changes to the underlying db. |
| | | * |
| | | * @param changes The list of changes to add to the underlying db. |
| | |
| | | |
| | | try |
| | | { |
| | | txn = dbEnvironment.beginTransaction(null, null); |
| | | txn = dbenv.beginTransaction(); |
| | | |
| | | for (UpdateMessage change : changes) |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | } |
| | | |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | |
| | | } catch (DatabaseException e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_CLOSING_DATABASE; |
| | | String message = getMessage(msgID, stringId) + |
| | | String message = getMessage(msgID, this.toString()) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | return null; |
| | | } |
| | | } |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | * {@inheritDoc} |
| | | */ |
| | | public static void shutdownDbEnvironment() |
| | | @Override |
| | | public String toString() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | int msgID = MSGID_ERROR_CLOSING_CHANGELOG_ENV; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | return serverId.toString() + baseDn.toString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private ChangelogCursor() throws DatabaseException |
| | | { |
| | | txn = dbEnvironment.beginTransaction(null, null); |
| | | txn = dbenv.beginTransaction(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | if (txn != null) |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.opends.server.messages.MessageHandler.getMessage; |
| | | import static org.opends.server.synchronization.SynchMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.File; |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | |
| | | import com.sleepycat.je.Cursor; |
| | | import com.sleepycat.je.Database; |
| | | import com.sleepycat.je.DatabaseConfig; |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | import com.sleepycat.je.DatabaseException; |
| | | import com.sleepycat.je.Environment; |
| | | import com.sleepycat.je.EnvironmentConfig; |
| | | import com.sleepycat.je.LockMode; |
| | | import com.sleepycat.je.OperationStatus; |
| | | import com.sleepycat.je.Transaction; |
| | | |
| | | /** |
| | | * This class is used to represent a Db environement that can be used |
| | | * to create ChangelogDB. |
| | | */ |
| | | public class ChangelogDbEnv |
| | | { |
| | | private Environment dbEnvironment = null; |
| | | private Database stateDb = null; |
| | | private Changelog changelog = null; |
| | | |
| | | /** |
| | | * Initialize this class. |
| | | * Creates Db environment that will be used to create databases. |
| | | * It also reads the currently known databases from the "changelogstate" |
| | | * database. |
| | | * @param path Path where the backing files must be created. |
| | | * @param changelog the Changelog that creates this ChangelogDbEnv. |
| | | * @throws DatabaseException If a DatabaseException occured that prevented |
| | | * the initialization to happen. |
| | | * @throws ChangelogDBException If a changelog internal error caused |
| | | * a failure of the changelog processing. |
| | | */ |
| | | public ChangelogDbEnv(String path, Changelog changelog) |
| | | throws DatabaseException, ChangelogDBException |
| | | { |
| | | this.changelog = changelog; |
| | | EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* Create the DB Environment that will be used for all |
| | | * the Changelog activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | | envConfig.setConfigParam("je.cleaner.expunge", "true"); |
| | | // TODO : the DB cache size should be configurable |
| | | // For now set 5M is OK for being efficient in 64M total for the JVM |
| | | envConfig.setConfigParam("je.maxMemory", "5000000"); |
| | | dbEnvironment = new Environment(new File(path), envConfig); |
| | | |
| | | /* |
| | | * One database is created to store the update from each LDAP |
| | | * server in the topology. |
| | | * The database "changelogstate" is used to store the list of all |
| | | * the servers that have been seen in the past. |
| | | */ |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | |
| | | stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig); |
| | | start(); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Read the list of known servers from the database and start dbHandler |
| | | * for each of them. |
| | | * |
| | | * @throws DatabaseException in case of underlying DatabaseException |
| | | * @throws ChangelogDBException when the information from the database |
| | | * cannot be decoded correctly. |
| | | */ |
| | | private void start() throws DatabaseException, ChangelogDBException |
| | | { |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | try |
| | | { |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | try |
| | | { |
| | | String stringData = new String(data.getData(), "UTF-8"); |
| | | String[] str = stringData.split(" ", 2); |
| | | short serverId = new Short(str[0]); |
| | | DN baseDn = null; |
| | | try |
| | | { |
| | | baseDn = DN.decode(str[1]); |
| | | } catch (DirectoryException e) |
| | | { |
| | | int msgID = MSGID_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER; |
| | | String message = getMessage(msgID, str[1]); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, changelog, this); |
| | | changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogDBException(0, |
| | | "changelog state database has a wrong format"); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | throw new ChangelogDBException(0, "need UTF-8 support"); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | cursor.close(); |
| | | |
| | | } catch (DatabaseException dbe) { |
| | | cursor.close(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Find or create the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the server. |
| | | * @return the Database. |
| | | * @throws DatabaseException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(Short serverId, DN baseDn) |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | { |
| | | String stringId = serverId.toString() + " " + baseDn.toNormalizedString(); |
| | | byte[] byteId; |
| | | |
| | | byteId = stringId.getBytes("UTF-8"); |
| | | |
| | | // Open the database. Create it if it does not already exist. |
| | | DatabaseConfig dbConfig = new DatabaseConfig(); |
| | | dbConfig.setAllowCreate(true); |
| | | dbConfig.setTransactional(true); |
| | | Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); |
| | | |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try { |
| | | data.setData(byteId); |
| | | stateDb.put(txn, key, data); |
| | | txn.commitWriteNoSync(); |
| | | } catch (DatabaseException dbe) |
| | | { |
| | | // Abort the txn and propagate the Exception to the caller |
| | | txn.abort(); |
| | | throw dbe; |
| | | } |
| | | } |
| | | return db; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new transaction. |
| | | * |
| | | * @return the transaction. |
| | | * @throws DatabaseException in case of underlying database Exception. |
| | | */ |
| | | public Transaction beginTransaction() throws DatabaseException |
| | | { |
| | | return dbEnvironment.beginTransaction(null, null); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Db environment. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | dbEnvironment.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | int msgID = MSGID_ERROR_CLOSING_CHANGELOG_ENV; |
| | | String message = getMessage(msgID) + stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDn of the DB. |
| | | * @param changelog the Changelog that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the Changelog DB. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn) throws DatabaseException |
| | | public DbHandler(short id, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | this.serverId = id; |
| | | this.baseDn = baseDn; |
| | | db = new ChangelogDB(id, baseDn); |
| | | db = new ChangelogDB(id, baseDn, changelog, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, "changelog db " + id + " " + baseDn); |
| | |
| | | {} |
| | | } |
| | | } |
| | | |
| | | while (msgQueue.size() != 0) |
| | | flush(); |
| | | |
| | | db.shutdown(); |
| | | } |
| | | |
| | |
| | | import java.util.Map; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigEntry; |
| | |
| | | import org.opends.server.synchronization.ServerState; |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | import org.opends.server.synchronization.UpdateMessage; |
| | | import org.opends.server.synchronization.WindowMessage; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | /** |
| | |
| | | private MsgQueue msgQueue = new MsgQueue(); |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>();; |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ChangelogCache changelogCache = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | |
| | | private ServerWriter writer = null; |
| | | private DN baseDn = null; |
| | | private String serverAddressURL; |
| | | private int rcvWindow; |
| | | private int rcvWindowSizeHalf; |
| | | private int maxRcvWindow; |
| | | private ServerReader reader; |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | |
| | | private static Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * @param changelogId The identifier of the changelog that creates this |
| | | * server handler. |
| | | * @param changelogURL The URL of the changelog that creates this |
| | | * server handler. |
| | | * @param windowSize the window size that this server handler must use. |
| | | * @param changelog the Changelog that created this server handler. |
| | | */ |
| | | public void start(DN baseDn) |
| | | public void start(DN baseDn, short changelogId, String changelogURL, |
| | | int windowSize, Changelog changelog) |
| | | { |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | | { |
| | | this.baseDn = baseDn; |
| | | changelogCache = Changelog.getChangelogCache(baseDn); |
| | | changelogCache = changelog.getChangelogCache(baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage msg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | baseDn, localServerState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | baseDn, windowSize, localServerState); |
| | | |
| | | session.publish(msg); |
| | | } |
| | |
| | | restartSendDelay = 0; |
| | | serverIsLDAPserver = true; |
| | | |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage myStartMsg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | this.baseDn, localServerState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, localServerState); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else if (msg.getClass() == Class.forName( |
| | | "org.opends.server.synchronization.ChangelogStartMessage")) |
| | | else if (msg instanceof ChangelogStartMessage) |
| | | { |
| | | ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg; |
| | | serverId = receivedMsg.getServerId(); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState serverState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage outMsg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | this.baseDn, serverState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, serverState); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | | this.baseDn = baseDn; |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else |
| | | { |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | |
| | | |
| | | writer = new ServerWriter(session, serverId, this, changelogCache); |
| | | |
| | | ServerReader reader = new ServerReader(session, serverId, this, |
| | | reader = new ServerReader(session, serverId, this, |
| | | changelogCache); |
| | | |
| | | reader.start(); |
| | |
| | | // ignore |
| | | } |
| | | } |
| | | |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public UpdateMessage take() |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMessage msg = getnextMessage(); |
| | | do { |
| | | try |
| | | { |
| | | sendWindow.acquire(); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while (interrupted); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | | * Get the next update that must be sent to the server |
| | | * from the message queue or from the database. |
| | | * |
| | | * @return The next update that must be sent to the server. |
| | | */ |
| | | private UpdateMessage getnextMessage() |
| | | { |
| | | UpdateMessage msg; |
| | | do |
| | | { |
| | |
| | | msg1 = msgQueue.removeFirst(); |
| | | } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); |
| | | this.updateServerState(msg); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | /* get the next change from the lateQueue */ |
| | | msg = lateQueue.removeFirst(); |
| | | this.updateServerState(msg); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | * by the other server. |
| | | * Otherwise just loop to select the next message. |
| | | */ |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | String str = changelogCache.getBaseDn().toString() + |
| | | String str = baseDn.toString() + |
| | | " " + serverURL + " " + String.valueOf(serverId); |
| | | |
| | | if (serverIsLDAPserver) |
| | |
| | | attributes.add(new Attribute("server-id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", |
| | | changelogCache.getBaseDn().toString())); |
| | | baseDn.toString())); |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | |
| | | String.valueOf(getInAckCount()))); |
| | | attributes.add(new Attribute("approximate-delay", |
| | | String.valueOf(getApproxDelay()))); |
| | | attributes.add(new Attribute("max-send-window", |
| | | String.valueOf(sendWindowSize))); |
| | | attributes.add(new Attribute("current-send-window", |
| | | String.valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(new Attribute("max-rcv-window", |
| | | String.valueOf(maxRcvWindow))); |
| | | attributes.add(new Attribute("current-rcv-window", |
| | | String.valueOf(rcvWindow))); |
| | | long olderUpdateTime = getOlderUpdateTime(); |
| | | if (olderUpdateTime != 0) |
| | | { |
| | |
| | | |
| | | return localString; |
| | | } |
| | | |
| | | /** |
| | | * Check the protocol window and send WindowMessage if necessary. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void checkWindow() throws IOException |
| | | { |
| | | rcvWindow--; |
| | | if (rcvWindow < rcvWindowSizeHalf) |
| | | { |
| | | WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the send window size based on the credit specified in the |
| | | * given window message. |
| | | * |
| | | * @param windowMsg The Window Message containing the information |
| | | * necessary for updating the window size. |
| | | */ |
| | | public void updateWindow(WindowMessage windowMsg) |
| | | { |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.synchronization.AckMessage; |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | import org.opends.server.synchronization.UpdateMessage; |
| | | import org.opends.server.synchronization.WindowMessage; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | |
| | |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.checkWindow(); |
| | | changelogCache.put(update, handler); |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | handler.updateWindow(windowMsg); |
| | | } |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | |
| | | public SocketSession(Socket socket) throws IOException |
| | | { |
| | | this.socket = socket; |
| | | /* |
| | | * Use a window instead of the TCP flow control. |
| | | * Therefore set a very large value for send and receive buffer sizes. |
| | | */ |
| | | input = socket.getInputStream(); |
| | | output = socket.getOutputStream(); |
| | | } |
| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.io.IOException; |
| | | import java.net.ConnectException; |
| | | import java.net.InetAddress; |
| | |
| | | private int maxReceiveDelay; |
| | | private int maxSendQueue; |
| | | private int maxReceiveQueue; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow; |
| | | private int halfRcvWindow; |
| | | private int maxRcvWindow; |
| | | private int timeout = 0; |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular SynchronizationDomain. |
| | |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | * @param window The size of the send and receive window to use. |
| | | */ |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay ) |
| | | int maxSendDelay, int window) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | this.state = state; |
| | | replayOperations = |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator()); |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window/2; |
| | | } |
| | | |
| | | /** |
| | |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.connect(ServerAddr, 500); |
| | | session = new SocketSession(socket); |
| | | |
| | |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage( serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | state); |
| | | halfRcvWindow*2, state); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ChangelogStartMessage) session.receive(); |
| | | session.setSoTimeout(0); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a changelog that has not |
| | |
| | | (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber))) |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | break; |
| | | } |
| | |
| | | else |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any changelog server. |
| | | */ |
| | | try |
| | | { |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // TODO Auto-generated catch block |
| | | e.printStackTrace(); |
| | | } |
| | | checkState = false; |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES; |
| | | String message = getMessage(msgID); |
| | |
| | | { |
| | | if (this.connected == false) |
| | | this.reStart(failingSession); |
| | | |
| | | if (msg instanceof UpdateMessage) |
| | | sendWindow.acquire(); |
| | | session.publish(msg); |
| | | done = true; |
| | | } catch (IOException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | ProtocolSession failingSession = session; |
| | | try |
| | | { |
| | | return session.receive(); |
| | | SynchronizationMessage msg = session.receive(); |
| | | if (msg instanceof WindowMessage) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | else |
| | | { |
| | | if (msg instanceof UpdateMessage) |
| | | { |
| | | rcvWindow--; |
| | | if (rcvWindow < halfRcvWindow) |
| | | { |
| | | session.publish(new WindowMessage(halfRcvWindow)); |
| | | rcvWindow += halfRcvWindow; |
| | | } |
| | | } |
| | | return msg; |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | if (e instanceof SocketTimeoutException) |
| | |
| | | */ |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | { |
| | | this.timeout = timeout; |
| | | session.setSoTimeout(timeout); |
| | | } |
| | | |
| | |
| | | { |
| | | // TODO to be implemented |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return maxRcvWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the current receive window size. |
| | | * |
| | | * @return The current receive window size. |
| | | */ |
| | | public int getCurrentRcvWindow() |
| | | { |
| | | return rcvWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum send window size. |
| | | * |
| | | * @return The maximum send window size. |
| | | */ |
| | | public int getMaxSendWindow() |
| | | { |
| | | return maxSendWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the current send window size. |
| | | * |
| | | * @return The current send window size. |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | if (connected) |
| | | return sendWindow.availablePermits(); |
| | | else |
| | | return 0; |
| | | } |
| | | } |
| | |
| | | private String serverURL; |
| | | private ServerState serverState; |
| | | |
| | | private int windowSize; |
| | | |
| | | /** |
| | | * Create a ChangelogStartMessage. |
| | | * |
| | | * @param serverId changelog server id |
| | | * @param serverURL changelog server URL |
| | | * @param baseDn base DN for which the ChangelogStartMessage is created. |
| | | * @param windowSize The window size. |
| | | * @param serverState our ServerState for this baseDn. |
| | | */ |
| | | public ChangelogStartMessage(short serverId, String serverURL, DN baseDn, |
| | | int windowSize, |
| | | ServerState serverState) |
| | | { |
| | | this.serverId = serverId; |
| | |
| | | this.baseDn = baseDn.toNormalizedString(); |
| | | else |
| | | this.baseDn = null; |
| | | this.windowSize = windowSize; |
| | | this.serverState = serverState; |
| | | } |
| | | |
| | |
| | | public ChangelogStartMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The ChangelogStartMessage is encoded in the form : |
| | | * <baseDn><ServerId><ServerUrl><ServerState> |
| | | * <baseDn><ServerId><ServerUrl><windowsize><ServerState> |
| | | */ |
| | | try |
| | | { |
| | |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the window size |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | |
| | | public byte[] getBytes() |
| | | { |
| | | /* The ChangelogStartMessage is stored in the form : |
| | | * <operation type><basedn><serverid><serverURL><serverState> |
| | | * <operation type><basedn><serverid><serverURL><windowsize><serverState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | | byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8"); |
| | | byte[] byteServerUrl = serverURL.getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8"); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + byteServerState.length + 1; |
| | | byteServerUrl.length + 1 + byteWindowSize.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | |
| | | /* put the ServerURL */ |
| | | pos = addByteArray(byteServerUrl, resultByteArray, pos); |
| | | |
| | | /* put the window size */ |
| | | pos = addByteArray(byteWindowSize, resultByteArray, pos); |
| | | |
| | | /* put the ServerState */ |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * get the window size for the server that created this message. |
| | | * |
| | | * @return The window size for the server that created this message. |
| | | */ |
| | | public int getWindowSize() |
| | | { |
| | | return windowSize; |
| | | } |
| | | } |
| | |
| | | |
| | | // shutdown the Changelog Service if necessary |
| | | if (changelog != null) |
| | | Changelog.shutdown(); |
| | | changelog.shutdown(); |
| | | } |
| | | |
| | | /** |
| | |
| | | private int maxSendQueue; |
| | | private int maxReceiveDelay; |
| | | private int maxSendDelay; |
| | | private int windowSize; |
| | | private ServerState serverState = null; |
| | | |
| | | /** |
| | |
| | | * @param maxReceiveQueue The max receive Queue for this server. |
| | | * @param maxSendDelay The max Send Delay from this server. |
| | | * @param maxSendQueue The max send Queue from this server. |
| | | * @param windowSize The window size used by this server. |
| | | * @param serverState The state of this server. |
| | | */ |
| | | public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay, |
| | | int maxReceiveQueue, int maxSendDelay, |
| | | int maxSendQueue, ServerState serverState) |
| | | int maxSendQueue, int windowSize, |
| | | ServerState serverState) |
| | | { |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn.toString(); |
| | |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | this.serverState = serverState; |
| | | this.windowSize = windowSize; |
| | | |
| | | try |
| | | { |
| | |
| | | { |
| | | /* The ServerStartMessage is encoded in the form : |
| | | * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><ServerState> |
| | | * <maxSendDelay><maxSendQueue><window><ServerState> |
| | | */ |
| | | try |
| | | { |
| | |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the windowSize |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | |
| | | /* |
| | | * ServerStartMessage contains. |
| | | * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><ServerState> |
| | | * <maxSendDelay><maxSendQueue><windowsize><ServerState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | |
| | | String.valueOf(maxSendDelay).getBytes("UTF-8"); |
| | | byte[] byteMaxSendQueue = |
| | | String.valueOf(maxSendQueue).getBytes("UTF-8"); |
| | | byte[] byteWindowSize = |
| | | String.valueOf(windowSize).getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | |
| | | byteMaxRecvQueue.length + 1 + |
| | | byteMaxSendDelay.length + 1 + |
| | | byteMaxSendQueue.length + 1 + |
| | | byteWindowSize.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | |
| | | |
| | | pos = addByteArray(byteMaxSendQueue, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteWindowSize, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the window size for the ldap server that created the message. |
| | | * |
| | | * @return The window size for the ldap server that created the message. |
| | | */ |
| | | public int getWindowSize() |
| | | { |
| | | return windowSize; |
| | | } |
| | | } |
| | |
| | | static String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay"; |
| | | static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue"; |
| | | static String MAX_SEND_DELAY = "ds-cfg-max-send-delay"; |
| | | static String WINDOW_SIZE = "ds-cfg-window-size"; |
| | | |
| | | private static final StringConfigAttribute changelogStub = |
| | | new StringConfigAttribute(CHANGELOG_SERVER_ATTR, |
| | |
| | | configAttributes.add(maxSendDelayAttr); |
| | | } |
| | | |
| | | Integer window; |
| | | IntegerConfigAttribute windowStub = |
| | | new IntegerConfigAttribute(WINDOW_SIZE, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | IntegerConfigAttribute windowAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub); |
| | | if (windowAttr == null) |
| | | window = 100; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | window = windowAttr.activeIntValue(); |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | configDn = configEntry.getDN(); |
| | | DirectoryServer.registerConfigurableComponent(this); |
| | | |
| | |
| | | try |
| | | { |
| | | broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay); |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window); |
| | | synchronized (broker) |
| | | { |
| | | broker.start(changelogServers); |
| | |
| | | */ |
| | | public int getPendingUpdatesCount() |
| | | { |
| | | synchronized (pendingChanges) |
| | | { |
| | | return pendingChanges.size(); |
| | | } |
| | | return pendingChanges.size(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return broker.getMaxRcvWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the current receive window size. |
| | | * |
| | | * @return The current receive window size. |
| | | */ |
| | | public int getCurrentRcvWindow() |
| | | { |
| | | return broker.getCurrentRcvWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum send window size. |
| | | * |
| | | * @return The maximum send window size. |
| | | */ |
| | | public int getMaxSendWindow() |
| | | { |
| | | return broker.getMaxSendWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the current send window size. |
| | | * |
| | | * @return The current send window size. |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | return broker.getCurrentSendWindow(); |
| | | } |
| | | } |
| | |
| | | static final byte MSG_TYPE_ACK = 5; |
| | | static final byte MSG_TYPE_SERVER_START = 6; |
| | | static final byte MSG_TYPE_CHANGELOG_START = 7; |
| | | static final byte MSG_TYPE_WINDOW = 8; |
| | | |
| | | /** |
| | | * Do the processing necessary when the message is received. |
| | |
| | | case MSG_TYPE_CHANGELOG_START: |
| | | msg = new ChangelogStartMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_WINDOW: |
| | | msg = new WindowMessage(buffer); |
| | | break; |
| | | default: |
| | | throw new DataFormatException("received message with unknown type"); |
| | | } |
| | |
| | | attributes.add(attr); |
| | | |
| | | /* get number of received updates */ |
| | | final String ATTR_UPDATE_RECVD = "received-updates"; |
| | | AttributeType type = |
| | | DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_RECVD); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumRcvdUpdates()))); |
| | | attr = new Attribute(type, "received-updates", values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates()); |
| | | |
| | | /* get number of updates sent */ |
| | | final String ATTR_UPDATE_SENT = "sent-updates"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_SENT); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumSentUpdates()))); |
| | | attr = new Attribute(type, "sent-updates", values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "sent-updates", domain.getNumSentUpdates()); |
| | | |
| | | /* get number of changes in the pending list */ |
| | | final String ATTR_UPDATE_PENDING = "pending-updates"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_UPDATE_PENDING); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getPendingUpdatesCount()))); |
| | | attr = new Attribute(type, "pending-updates", values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "pending-updates", |
| | | domain.getPendingUpdatesCount()); |
| | | |
| | | /* get number of changes replayed */ |
| | | final String ATTR_REPLAYED_UPDATE = "replayed-updates"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumProcessedUpdates()))); |
| | | attr = new Attribute(type, ATTR_REPLAYED_UPDATE, values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "replayed-updates", |
| | | domain.getNumProcessedUpdates()); |
| | | |
| | | /* get number of changes successfully */ |
| | | final String ATTR_REPLAYED_UPDATE_OK = "replayed-updates-ok"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_REPLAYED_UPDATE_OK); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getNumReplayedPostOpCalled()))); |
| | | attr = new Attribute(type, ATTR_REPLAYED_UPDATE_OK, values); |
| | | attributes.add(attr); |
| | | addMonitorData(attributes, "replayed-updates-ok", |
| | | domain.getNumReplayedPostOpCalled()); |
| | | |
| | | /* get debugCount */ |
| | | final String DEBUG_COUNT = "debug-count"; |
| | | type = DirectoryServer.getDefaultAttributeType(DEBUG_COUNT); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, |
| | | String.valueOf(domain.getDebugCount()))); |
| | | attr = new Attribute(type, DEBUG_COUNT, values); |
| | | attributes.add(attr); |
| | | /* get window information */ |
| | | addMonitorData(attributes, "max-rcv-window", domain.getMaxRcvWindow()); |
| | | addMonitorData(attributes, "current-rcv-window", |
| | | domain.getCurrentRcvWindow()); |
| | | addMonitorData(attributes, "max-send-window", |
| | | domain.getMaxSendWindow()); |
| | | addMonitorData(attributes, "current-send-window", |
| | | domain.getCurrentSendWindow()); |
| | | |
| | | /* get the Server State */ |
| | | final String ATTR_SERVER_STATE = "server-state"; |
| | | type = DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | AttributeType type = |
| | | DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | for (String str : domain.getServerState().toStringSet()) |
| | | { |
| | | values.add(new AttributeValue(type,str)); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an attribute with an integer value to the list of monitoring |
| | | * attributes. |
| | | * |
| | | * @param attributes the list of monitoring attributes |
| | | * @param name the name of the attribute to add. |
| | | * @param value The integer value of he attribute to add. |
| | | */ |
| | | private void addMonitorData(ArrayList<Attribute> attributes, |
| | | String name, int value) |
| | | { |
| | | Attribute attr; |
| | | AttributeType type; |
| | | LinkedHashSet<AttributeValue> values; |
| | | type = DirectoryServer.getDefaultAttributeType(name); |
| | | values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(type, String.valueOf(value))); |
| | | attr = new Attribute(type, name, values); |
| | | attributes.add(attr); |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the length of time in milliseconds that should elapse between |
| | | * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero |
| | | * return value indicates that the <CODE>updateMonitorData()</CODE> method |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | |
| | | /** |
| | | * This message is used by LDAP server when they first connect. |
| | | * to a changelog server to let them know who they are and what is their state |
| | | * (their RUV) |
| | | */ |
| | | public class WindowMessage extends SynchronizationMessage implements |
| | | Serializable |
| | | { |
| | | private static final long serialVersionUID = 8442267608764026867L; |
| | | private final int numAck; |
| | | |
| | | |
| | | /** |
| | | * Create a new WindowMessage. |
| | | * |
| | | * @param numAck The number of acknowledged messages. |
| | | * The window will be increase by this number. |
| | | */ |
| | | public WindowMessage(int numAck) |
| | | { |
| | | this.numAck = numAck; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new WindowMessage from its encoded form. |
| | | * |
| | | * @param in The byte array containing the encoded form of the |
| | | * WindowMessage. |
| | | * @throws DataFormatException If the byte array does not contain a valid |
| | | * encoded form of the WindowMessage. |
| | | */ |
| | | public WindowMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The WindowMessage is encoded in the form : |
| | | * <numAck> |
| | | */ |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_WINDOW) |
| | | throw new DataFormatException("input is not a valid Window Message"); |
| | | int pos = 1; |
| | | |
| | | /* |
| | | * read the number of acks contained in this message. |
| | | * first calculate the length then construct the string |
| | | */ |
| | | int length = getNextLength(in, pos); |
| | | String numAckStr = new String(in, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | numAck = Integer.parseInt(numAckStr); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | /* |
| | | * WindowMessage contains. |
| | | * <numAck> |
| | | */ |
| | | try { |
| | | byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8"); |
| | | |
| | | int length = 1 + byteNumAck.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_WINDOW; |
| | | int pos = 1; |
| | | |
| | | pos = addByteArray(byteNumAck, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Get the number of message acknowledged by the Window Message. |
| | | * |
| | | * @return the number of message acknowledged by the Window Message. |
| | | */ |
| | | public int getNumAck() |
| | | { |
| | | return numAck; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public UpdateMessage processReceive(SynchronizationDomain domain) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | |
| | | "org.opends.server.BuildRoot"; |
| | | |
| | | /** |
| | | * The name of the system property that specifies the ldap port. |
| | | * Set this prtoperty when running the server if you want to use a given |
| | | * port number, otherwise a port is choosed randomly at test startup time. |
| | | */ |
| | | public static final String PROPERTY_LDAP_PORT = |
| | | "org.opends.server.LdapPort"; |
| | | |
| | | /** |
| | | * The string representation of the DN that will be used as the base entry for |
| | | * the test backend. This must not be changed, as there are a number of test |
| | | * cases that depend on this specific value of "o=test". |
| | |
| | | ServerSocket serverJmxSocket = null; |
| | | ServerSocket serverLdapsSocket = null; |
| | | |
| | | serverLdapSocket = bindFreePort(); |
| | | serverLdapPort = serverLdapSocket.getLocalPort(); |
| | | String ldapPort = System.getProperty(PROPERTY_LDAP_PORT); |
| | | if (ldapPort == null) |
| | | { |
| | | serverLdapSocket = bindFreePort(); |
| | | serverLdapPort = serverLdapSocket.getLocalPort(); |
| | | } |
| | | else |
| | | { |
| | | serverLdapPort = Integer.valueOf(ldapPort); |
| | | serverLdapSocket = bindPort(serverLdapPort); |
| | | } |
| | | |
| | | serverJmxSocket = bindFreePort(); |
| | | serverJmxPort = serverJmxSocket.getLocalPort(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Binds to the given socket port on the local host. |
| | | * @return the bounded Server socket. |
| | | * |
| | | * @throws IOException in case of underlying exception. |
| | | * @throws SocketException in case of underlying exception. |
| | | */ |
| | | private static ServerSocket bindPort(int port) |
| | | throws IOException, SocketException |
| | | { |
| | | ServerSocket serverLdapSocket; |
| | | serverLdapSocket = new ServerSocket(); |
| | | serverLdapSocket.setReuseAddress(true); |
| | | serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port)); |
| | | return serverLdapSocket; |
| | | } |
| | | |
| | | /** |
| | | * Find and binds to a free server socket port on the local host. |
| | | * @return the bounded Server socket. |
| | | * |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.*; |
| | | |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.Operation; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPException; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.OperationType; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | /** |
| | | * Test the contructors, encoders and decoders of the synchronization AckMsg, |
| | | * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg |
| | | */ |
| | | public class ProtocolWindowTest |
| | | { |
| | | private static final int WINDOW_SIZE = 10; |
| | | |
| | | private static final String SYNCHRONIZATION_STRESS_TEST = |
| | | "Synchronization Stress Test"; |
| | | |
| | | /** |
| | | * The internal connection used for operation |
| | | */ |
| | | private InternalClientConnection connection; |
| | | |
| | | /** |
| | | * Created entries that need to be deleted for cleanup |
| | | */ |
| | | private ArrayList<Entry> entryList = new ArrayList<Entry>(); |
| | | |
| | | /** |
| | | * The Synchronization config manager entry |
| | | */ |
| | | private String synchroStringDN; |
| | | |
| | | /** |
| | | * The synchronization plugin entry |
| | | */ |
| | | private String synchroPluginStringDN; |
| | | |
| | | private Entry synchroPluginEntry; |
| | | |
| | | /** |
| | | * The Server synchro entry |
| | | */ |
| | | private String synchroServerStringDN; |
| | | |
| | | private Entry synchroServerEntry; |
| | | |
| | | /** |
| | | * The Change log entry |
| | | */ |
| | | private String changeLogStringDN; |
| | | |
| | | private Entry changeLogEntry; |
| | | |
| | | /** |
| | | * A "person" entry |
| | | */ |
| | | private Entry personEntry; |
| | | |
| | | /** |
| | | * schema check flag |
| | | */ |
| | | private boolean schemaCheck; |
| | | |
| | | // WORKAROUND FOR BUG #639 - BEGIN - |
| | | /** |
| | | * |
| | | */ |
| | | MultimasterSynchronization mms; |
| | | |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | /** |
| | | * Test the window mechanism by : |
| | | * - creating a Changelog service client using the ChangelogBroker class. |
| | | * - set a small window size. |
| | | * - perform more than the window size operations. |
| | | * - check that the Changelog has not sent more than window size operations. |
| | | * - receive all messages from the ChangelogBroker, check that |
| | | * the client receives the correct number of operations. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | | public void saturateAndRestart() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 13); |
| | | |
| | | try { |
| | | |
| | | /* Test that changelog monitor and synchro plugin monitor informations |
| | | * publish the correct window size. |
| | | * This allows both the check the monitoring code and to test that |
| | | * configuration is working. |
| | | */ |
| | | Thread.sleep(1500); |
| | | assertTrue(checkWindows(WINDOW_SIZE)); |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | // send twice the window modify operations |
| | | int count = WINDOW_SIZE * 2; |
| | | processModify(count); |
| | | |
| | | // let some time to the message to reach the changelog client |
| | | Thread.sleep(500); |
| | | |
| | | // check that the changelog only sent WINDOW_SIZE messages |
| | | assertTrue(searchUpdateSent()); |
| | | |
| | | int rcvCount=0; |
| | | try |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | rcvCount++; |
| | | } |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | {} |
| | | /* |
| | | * check that we received all updates |
| | | */ |
| | | assertEquals(rcvCount, WINDOW_SIZE*2); |
| | | } |
| | | finally { |
| | | broker.stop(); |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check that the window configuration has been successfull |
| | | * by reading the monitoring information and checking |
| | | * that we do have 2 entries with the configured max-rcv-window. |
| | | */ |
| | | private boolean checkWindows(int windowSize) throws LDAPException |
| | | { |
| | | InternalSearchOperation op = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(max-rcv-window=" + windowSize + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | return (op.getEntriesSent() == 3); |
| | | } |
| | | |
| | | /** |
| | | * Search that the changelog has stopped sending changes after |
| | | * having reach the limit of the window size. |
| | | * Do this by checking the monitoring information. |
| | | */ |
| | | private boolean searchUpdateSent() throws Exception |
| | | { |
| | | InternalSearchOperation op = connection.processSearch( |
| | | new ASN1OctetString("cn=monitor"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(update-sent=" + WINDOW_SIZE + ")")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | return (op.getEntriesSent() == 1); |
| | | } |
| | | |
| | | /** |
| | | * Set up the environment for performing the tests in this Class. |
| | | * synchronization |
| | | * |
| | | * @throws Exception |
| | | * If the environment could not be set up. |
| | | */ |
| | | @BeforeClass |
| | | public void setUp() throws Exception |
| | | { |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // Disable schema check |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | DirectoryServer.setCheckSchema(false); |
| | | |
| | | // Create an internal connection |
| | | connection = new InternalClientConnection(); |
| | | |
| | | // Create backend top level entries |
| | | String[] topEntries = new String[2]; |
| | | topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n" |
| | | + "objectClass: domain\n"; |
| | | topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n" |
| | | + "objectClass: organizationalUnit\n" |
| | | + "entryUUID: 11111111-1111-1111-1111-111111111111\n"; |
| | | Entry entry; |
| | | for (int i = 0; i < topEntries.length; i++) |
| | | { |
| | | entry = TestCaseUtils.entryFromLdifString(topEntries[i]); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(), |
| | | entry.getUserAttributes(), entry.getOperationalAttributes()); |
| | | addOp.setInternalOperation(true); |
| | | addOp.run(); |
| | | entryList.add(entry); |
| | | } |
| | | |
| | | // top level synchro provider |
| | | synchroStringDN = "cn=Synchronization Providers,cn=config"; |
| | | |
| | | // Multimaster Synchro plugin |
| | | synchroPluginStringDN = "cn=Multimaster Synchronization, " |
| | | + synchroStringDN; |
| | | String synchroPluginLdif = "dn: " |
| | | + synchroPluginStringDN |
| | | + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-provider\n" |
| | | + "ds-cfg-synchronization-provider-enabled: true\n" |
| | | + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n"; |
| | | synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif); |
| | | |
| | | // Change log |
| | | changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; |
| | | String changeLogLdif = "dn: " + changeLogStringDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-changelog-server-config\n" |
| | | + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n" |
| | | + "ds-cfg-changelog-server-id: 1\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); |
| | | |
| | | // suffix synchronized |
| | | synchroServerStringDN = "cn=example, " + synchroPluginStringDN; |
| | | String synchroServerLdif = "dn: " + synchroServerStringDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: ds-cfg-synchronization-provider-config\n" |
| | | + "cn: example\n" |
| | | + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n" |
| | | + "ds-cfg-changelog-server: localhost:8989\n" |
| | | + "ds-cfg-directory-server-id: 1\n" |
| | | + "ds-cfg-receive-status: true\n" |
| | | + "ds-cfg-window-size: " + WINDOW_SIZE; |
| | | synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); |
| | | |
| | | String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n" |
| | | + "objectClass: top\n" + "objectClass: person\n" |
| | | + "objectClass: organizationalPerson\n" |
| | | + "objectClass: inetOrgPerson\n" + "uid: user.1\n" |
| | | + "homePhone: 951-245-7634\n" |
| | | + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" |
| | | + "mobile: 027-085-0537\n" |
| | | + "postalAddress: Aaccf Amar$17984 Thirteenth Street" |
| | | + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" |
| | | + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" |
| | | + "street: 17984 Thirteenth Street\n" |
| | | + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" |
| | | + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" |
| | | + "userPassword: password\n" + "initials: AA\n"; |
| | | personEntry = TestCaseUtils.entryFromLdifString(personLdif); |
| | | |
| | | configureSynchronization(); |
| | | } |
| | | |
| | | /** |
| | | * Clean up the environment. return null; |
| | | * |
| | | * @throws Exception |
| | | * If the environment could not be set up. |
| | | */ |
| | | @AfterClass |
| | | public void classCleanUp() throws Exception |
| | | { |
| | | DirectoryServer.setCheckSchema(schemaCheck); |
| | | |
| | | // WORKAROUND FOR BUG #639 - BEGIN - |
| | | DirectoryServer.deregisterSynchronizationProvider(mms); |
| | | mms.finalizeSynchronizationProvider(); |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | cleanEntries(); |
| | | } |
| | | |
| | | /** |
| | | * suppress all the entries created by the tests in this class |
| | | */ |
| | | private void cleanEntries() |
| | | { |
| | | DeleteOperation op; |
| | | // Delete entries |
| | | Entry entries[] = entryList.toArray(new Entry[0]); |
| | | for (int i = entries.length - 1; i != 0; i--) |
| | | { |
| | | try |
| | | { |
| | | op = new DeleteOperation(connection, InternalClientConnection |
| | | .nextOperationID(), InternalClientConnection.nextMessageID(), null, |
| | | entries[i].getDN()); |
| | | op.run(); |
| | | } catch (Exception e) |
| | | { |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * @return |
| | | */ |
| | | private List<Modification> generatemods(String attrName, String attrValue) |
| | | { |
| | | AttributeType attrType = |
| | | DirectoryServer.getAttributeType(attrName.toLowerCase(), true); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(attrType, attrValue)); |
| | | Attribute attr = new Attribute(attrType, attrName, values); |
| | | List<Modification> mods = new ArrayList<Modification>(); |
| | | Modification mod = new Modification(ModificationType.REPLACE, attr); |
| | | mods.add(mod); |
| | | return mods; |
| | | } |
| | | |
| | | /** |
| | | * Open a changelog session to the local Changelog server. |
| | | * |
| | | */ |
| | | private ChangelogBroker openChangelogSession(final DN baseDn, short serverId) |
| | | throws Exception, SocketException |
| | | { |
| | | ServerState state = new ServerState(baseDn); |
| | | state.loadState(); |
| | | ChangelogBroker broker = |
| | | new ChangelogBroker(state, baseDn, serverId, 0, 0, 0, 0, WINDOW_SIZE); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:8989"); |
| | | broker.start(servers); |
| | | broker.setSoTimeout(5000); |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { } |
| | | return broker; |
| | | } |
| | | |
| | | /** |
| | | * Configure the Synchronization for this test. |
| | | */ |
| | | private void configureSynchronization() throws Exception |
| | | { |
| | | // |
| | | // Add the Multimaster synchronization plugin |
| | | DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null); |
| | | entryList.add(synchroPluginEntry); |
| | | assertNotNull(DirectoryServer.getConfigEntry(DN |
| | | .decode(synchroPluginStringDN)), |
| | | "Unable to add the Multimaster synchronization plugin"); |
| | | |
| | | // WORKAROUND FOR BUG #639 - BEGIN - |
| | | DN dn = DN.decode(synchroPluginStringDN); |
| | | ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn); |
| | | mms = new MultimasterSynchronization(); |
| | | try |
| | | { |
| | | mms.initializeSynchronizationProvider(mmsConfigEntry); |
| | | } |
| | | catch (ConfigException e) |
| | | { |
| | | assertTrue(false, |
| | | "Unable to initialize the Multimaster synchronization plugin"); |
| | | } |
| | | DirectoryServer.registerSynchronizationProvider(mms); |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | // |
| | | // Add the changelog server |
| | | DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), |
| | | "Unable to add the changeLog server"); |
| | | entryList.add(changeLogEntry); |
| | | |
| | | // |
| | | // We also have a replicated suffix (synchronization domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the syncrhonized server"); |
| | | entryList.add(synchroServerEntry); |
| | | } |
| | | |
| | | private void processModify(int count) |
| | | { |
| | | while (count>0) |
| | | { |
| | | count--; |
| | | // must generate the mods for every operation because they are modified |
| | | // by processModify. |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | |
| | | ModifyOperation modOp = |
| | | connection.processModify(personEntry.getDN(), mods); |
| | | assertEquals(modOp.getResultCode(), ResultCode.SUCCESS); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertNotNull; |
| | | import static org.testng.Assert.assertTrue; |
| | |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | |
| | | @Test(enabled=true, groups="slow") |
| | | public void fromServertoBroker() throws Exception |
| | | { |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting Synchronization StressTest : fromServertoBroker" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | final int TOTAL_MESSAGES = 1000; |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 18); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | |
| | | try { |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | while (true) |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | broker.receive(); |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | catch (Exception e) |
| | | { } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | reader = new BrokerReader(broker); |
| | | reader.start(); |
| | | reader = new BrokerReader(broker); |
| | | reader.start(); |
| | | |
| | | long startTime = TimeThread.getTime(); |
| | | int count = TOTAL_MESSAGES; |
| | | long startTime = TimeThread.getTime(); |
| | | int count = TOTAL_MESSAGES; |
| | | |
| | | // Create a number of writer thread that will loop modifying the entry |
| | | List<Thread> writerThreadList = new LinkedList<Thread>(); |
| | | for (int n = 0; n < 1; n++) |
| | | { |
| | | BrokerWriter writer = new BrokerWriter(count); |
| | | writerThreadList.add(writer); |
| | | } |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.start(); |
| | | } |
| | | // wait for all the threads to finish. |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.join(); |
| | | } |
| | | // Create a number of writer thread that will loop modifying the entry |
| | | List<Thread> writerThreadList = new LinkedList<Thread>(); |
| | | for (int n = 0; n < 1; n++) |
| | | { |
| | | BrokerWriter writer = new BrokerWriter(count); |
| | | writerThreadList.add(writer); |
| | | } |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.start(); |
| | | } |
| | | // wait for all the threads to finish. |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.join(); |
| | | } |
| | | |
| | | long afterSendTime = TimeThread.getTime(); |
| | | long afterSendTime = TimeThread.getTime(); |
| | | |
| | | int rcvCount = reader.getCount(); |
| | | long afterReceiveTime = TimeThread.getTime(); |
| | | int rcvCount = reader.getCount(); |
| | | |
| | | long afterReceiveTime = TimeThread.getTime(); |
| | | |
| | | if (rcvCount != TOTAL_MESSAGES) |
| | | { |
| | | fail("some messages were lost : expected : " +TOTAL_MESSAGES + |
| | | " received : " + rcvCount); |
| | | } |
| | | if (rcvCount != TOTAL_MESSAGES) |
| | | { |
| | | fail("some messages were lost : expected : " +TOTAL_MESSAGES + |
| | | " received : " + rcvCount); |
| | | } |
| | | |
| | | } |
| | | finally { |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | broker.stop(); |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | |
| | | ServerState state = new ServerState(baseDn); |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker(state, baseDn, |
| | | serverId, 0, 0, 0, 0); |
| | | serverId, 0, 0, 0, 0, 100); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:8989"); |
| | | broker.start(servers); |
| | |
| | | // We also have a replicated suffix (synchronization domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the syncrhonized server"); |
| | | "Unable to add the synchronized server"); |
| | | entryList.add(synchroServerEntry); |
| | | } |
| | | |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | SynchronizationMessage msg = broker.receive(); |
| | | if (msg == null) |
| | | break; |
| | | count ++; |
| | | } |
| | | } catch (Exception e) { |
| | |
| | | return count; |
| | | try |
| | | { |
| | | this.wait(); |
| | | this.wait(60); |
| | | return count; |
| | | } catch (InterruptedException e) |
| | | { |
| | |
| | | // Check that retrieved CN is OK |
| | | msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes()); |
| | | } |
| | | |
| | | @DataProvider(name="serverStart") |
| | | public Object [][] createServerStartMessageTestData() throws Exception |
| | | { |
| | | DN baseDN = DN.decode("dc=example, dc=com"); |
| | | ServerState state = new ServerState(baseDN); |
| | | return new Object [][] { {(short)1, baseDN, 100, state} }; |
| | | } |
| | | /** |
| | | * Test that ServerStartMessage encoding and decoding works |
| | | * by checking that : msg == new ServerStartMessage(msg.getBytes()). |
| | | */ |
| | | @Test(dataProvider="serverStart") |
| | | public void ServerStartMessageTest(short serverId, DN baseDN, int window, |
| | | ServerState state) throws Exception |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ServerStartMessage msg = new ServerStartMessage(serverId, baseDN, |
| | | window, window, window, window, window, state); |
| | | ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | } |
| | | |
| | | @DataProvider(name="changelogStart") |
| | | public Object [][] createChangelogStartMessageTestData() throws Exception |
| | | { |
| | | DN baseDN = DN.decode("dc=example, dc=com"); |
| | | ServerState state = new ServerState(baseDN); |
| | | return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} }; |
| | | } |
| | | |
| | | /** |
| | | * Test that changelogStartMessage encoding and decoding works |
| | | * by checking that : msg == new ChangelogStartMessage(msg.getBytes()). |
| | | */ |
| | | @Test(dataProvider="changelogStart") |
| | | public void ChangelogStartMessageTest(short serverId, DN baseDN, int window, |
| | | String url, ServerState state) throws Exception |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ChangelogStartMessage msg = new ChangelogStartMessage(serverId, |
| | | url, baseDN, window, state); |
| | | ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | } |
| | | |
| | | /** |
| | | * Test that WindowMessageTest encoding and decoding works |
| | | * by checking that : msg == new WindowMessageTest(msg.getBytes()). |
| | | */ |
| | | @Test() |
| | | public void WindowMessageTest() throws Exception |
| | | { |
| | | WindowMessage msg = new WindowMessage(123); |
| | | WindowMessage newMsg = new WindowMessage(msg.getBytes()); |
| | | assertEquals(msg.getNumAck(), newMsg.getNumAck()); |
| | | } |
| | | |
| | | /** |
| | | * Test PendingChange |
| | |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.*; |
| | | |
| | | import java.net.SocketException; |
| | |
| | | @Test(enabled=true) |
| | | public void namingConflicts() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : namingConflicts" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | /* |
| | |
| | | @Test(enabled=true, dataProvider="assured") |
| | | public void updateOperations(boolean assured) throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : updateOperations " + assured , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 27); |
| | | try { |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0); |
| | | |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | // broker.setSoTimeout(100); |
| | | try |
| | | { |
| | | while (true) |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | broker.receive(); |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // broker.setSoTimeout(1000); |
| | | } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | catch (Exception e) |
| | | {} |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | // Create an Entry (add operation) |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | // Modify the entry |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | // Modify the entry |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | |
| | | ModifyOperation modOp = new ModifyOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), mods); |
| | | modOp.setInternalOperation(true); |
| | | modOp.run(); |
| | | ModifyOperation modOp = new ModifyOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), mods); |
| | | modOp.setInternalOperation(true); |
| | | modOp.run(); |
| | | |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyMsg, |
| | | "The received synchronization message is not a MODIFY msg"); |
| | | ModifyMsg modMsg = (ModifyMsg) msg; |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyMsg, |
| | | "The received synchronization message is not a MODIFY msg"); |
| | | ModifyMsg modMsg = (ModifyMsg) msg; |
| | | |
| | | receivedOp = modMsg.createOperation(connection); |
| | | assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY synchronization message is not for the excepted DN"); |
| | | receivedOp = modMsg.createOperation(connection); |
| | | assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY synchronization message is not for the excepted DN"); |
| | | |
| | | // Modify the entry DN |
| | | DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ; |
| | | ModifyDNOperation modDNOp = new ModifyDNOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), RDN |
| | | .decode("uid=new person"), true, DN |
| | | .decode("ou=People,dc=example,dc=com")); |
| | | modDNOp.run(); |
| | | assertNotNull(DirectoryServer.getEntry(newDN), |
| | | "The MOD_DN operation didn't create the new person entry"); |
| | | assertNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The MOD_DN operation didn't delete the old person entry"); |
| | | entryList.add(DirectoryServer.getEntry(newDN)); |
| | | // Modify the entry DN |
| | | DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ; |
| | | ModifyDNOperation modDNOp = new ModifyDNOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, personEntry.getDN(), RDN |
| | | .decode("uid=new person"), true, DN |
| | | .decode("ou=People,dc=example,dc=com")); |
| | | modDNOp.run(); |
| | | assertNotNull(DirectoryServer.getEntry(newDN), |
| | | "The MOD_DN operation didn't create the new person entry"); |
| | | assertNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The MOD_DN operation didn't delete the old person entry"); |
| | | entryList.add(DirectoryServer.getEntry(newDN)); |
| | | |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyDNMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | ModifyDNMsg moddnMsg = (ModifyDNMsg) msg; |
| | | receivedOp = moddnMsg.createOperation(connection); |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof ModifyDNMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | ModifyDNMsg moddnMsg = (ModifyDNMsg) msg; |
| | | receivedOp = moddnMsg.createOperation(connection); |
| | | |
| | | assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY_DN message is not for the excepted DN"); |
| | | assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0, |
| | | "The received MODIFY_DN message is not for the excepted DN"); |
| | | |
| | | // Delete the entry |
| | | Entry newPersonEntry = DirectoryServer.getEntry(newDN) ; |
| | | DeleteOperation delOp = new DeleteOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")); |
| | | delOp.run(); |
| | | assertNull(DirectoryServer.getEntry(newDN), |
| | | "Unable to delete the new person Entry"); |
| | | entryList.remove(newPersonEntry); |
| | | // Delete the entry |
| | | Entry newPersonEntry = DirectoryServer.getEntry(newDN) ; |
| | | DeleteOperation delOp = new DeleteOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")); |
| | | delOp.run(); |
| | | assertNull(DirectoryServer.getEntry(newDN), |
| | | "Unable to delete the new person Entry"); |
| | | entryList.remove(newPersonEntry); |
| | | |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof DeleteMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | DeleteMsg delMsg = (DeleteMsg) msg; |
| | | receivedOp = delMsg.createOperation(connection); |
| | | assertTrue(DN.decode(delMsg.getDn()).compareTo(DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")) == 0, |
| | | "The received DELETE message is not for the excepted DN"); |
| | | // See if the client has received the msg |
| | | msg = broker.receive(); |
| | | assertTrue(msg instanceof DeleteMsg, |
| | | "The received synchronization message is not a MODIFY DN msg"); |
| | | DeleteMsg delMsg = (DeleteMsg) msg; |
| | | receivedOp = delMsg.createOperation(connection); |
| | | assertTrue(DN.decode(delMsg.getDn()).compareTo(DN |
| | | .decode("uid= new person,ou=People,dc=example,dc=com")) == 0, |
| | | "The received DELETE message is not for the excepted DN"); |
| | | |
| | | /* |
| | | * Now check that when we send message to the Changelog server |
| | | * and that they are received and correctly replayed by the server. |
| | | * |
| | | * Start by testing the Add message reception |
| | | */ |
| | | addMsg = new AddMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN().toString(), |
| | | user1entryUUID, baseUUID, |
| | | personWithUUIDEntry.getObjectClassAttribute(), |
| | | personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); |
| | | if (assured) |
| | | addMsg.setAssured(); |
| | | broker.publish(addMsg); |
| | | /* |
| | | * Now check that when we send message to the Changelog server |
| | | * and that they are received and correctly replayed by the server. |
| | | * |
| | | * Start by testing the Add message reception |
| | | */ |
| | | addMsg = new AddMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN().toString(), |
| | | user1entryUUID, baseUUID, |
| | | personWithUUIDEntry.getObjectClassAttribute(), |
| | | personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); |
| | | if (assured) |
| | | addMsg.setAssured(); |
| | | broker.publish(addMsg); |
| | | |
| | | /* |
| | | * Check that the entry has been created in the local DS. |
| | | */ |
| | | Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true); |
| | | assertNotNull(resultEntry, |
| | | "The send ADD synchronization message was not applied"); |
| | | entryList.add(resultEntry); |
| | | /* |
| | | * Check that the entry has been created in the local DS. |
| | | */ |
| | | Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000, true); |
| | | assertNotNull(resultEntry, |
| | | "The send ADD synchronization message was not applied"); |
| | | entryList.add(resultEntry); |
| | | |
| | | /* |
| | | * Test the reception of Modify Msg |
| | | */ |
| | | modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(), |
| | | mods, user1entryUUID); |
| | | if (assured) |
| | | modMsg.setAssured(); |
| | | broker.publish(modMsg); |
| | | /* |
| | | * Test the reception of Modify Msg |
| | | */ |
| | | modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(), |
| | | mods, user1entryUUID); |
| | | if (assured) |
| | | modMsg.setAssured(); |
| | | broker.publish(modMsg); |
| | | |
| | | boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), |
| | | "telephonenumber", "01 02 45", 1000); |
| | | boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), |
| | | "telephonenumber", "01 02 45", 1000); |
| | | |
| | | if (found == false) |
| | | fail("The modification has not been correctly replayed."); |
| | | if (found == false) |
| | | fail("The modification has not been correctly replayed."); |
| | | |
| | | /* |
| | | * Test the Reception of Modify Dn Msg |
| | | */ |
| | | moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(), |
| | | gen.NewChangeNumber(), |
| | | user1entryUUID, null, |
| | | true, null, "uid= new person"); |
| | | if (assured) |
| | | moddnMsg.setAssured(); |
| | | broker.publish(moddnMsg); |
| | | /* |
| | | * Test the Reception of Modify Dn Msg |
| | | */ |
| | | moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(), |
| | | gen.NewChangeNumber(), |
| | | user1entryUUID, null, |
| | | true, null, "uid= new person"); |
| | | if (assured) |
| | | moddnMsg.setAssured(); |
| | | broker.publish(moddnMsg); |
| | | |
| | | resultEntry = getEntry( |
| | | DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true); |
| | | resultEntry = getEntry( |
| | | DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, true); |
| | | |
| | | assertNotNull(resultEntry, |
| | | "The modify DN synchronization message was not applied"); |
| | | assertNotNull(resultEntry, |
| | | "The modify DN synchronization message was not applied"); |
| | | |
| | | /* |
| | | * Test the Reception of Delete Msg |
| | | */ |
| | | delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com", |
| | | gen.NewChangeNumber(), user1entryUUID); |
| | | if (assured) |
| | | delMsg.setAssured(); |
| | | broker.publish(delMsg); |
| | | resultEntry = getEntry( |
| | | /* |
| | | * Test the Reception of Delete Msg |
| | | */ |
| | | delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com", |
| | | gen.NewChangeNumber(), user1entryUUID); |
| | | if (assured) |
| | | delMsg.setAssured(); |
| | | broker.publish(delMsg); |
| | | resultEntry = getEntry( |
| | | DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000, false); |
| | | |
| | | assertNull(resultEntry, |
| | | "The DELETE synchronization message was not replayed"); |
| | | |
| | | broker.stop(); |
| | | assertNull(resultEntry, |
| | | "The DELETE synchronization message was not replayed"); |
| | | } |
| | | finally |
| | | { |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | ServerState state = new ServerState(baseDn); |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker(state, baseDn, |
| | | serverId, 0, 0, 0, 0); |
| | | serverId, 0, 0, 0, 0, 100); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:8989"); |
| | | broker.start(servers); |
| | |
| | | @Test(enabled=true) |
| | | public void deleteNoSuchObject() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : deleteNoSuchObject" , 1); |
| | | |
| | | DN dn = DN.decode("cn=No Such Object,ou=People,dc=example,dc=com"); |
| | | Operation op = |
| | | new DeleteOperation(connection, |
| | |
| | | @Test(enabled=true) |
| | | public void infiniteReplayLoop() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : infiniteReplayLoop" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | Thread.sleep(2000); |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 11); |
| | | try |
| | | { |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0); |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0); |
| | | |
| | | // Create a test entry. |
| | | String personLdif = "dn: uid=user.2,ou=People,dc=example,dc=com\n" |