opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/MultimasterDomainConfiguration.xml
@@ -244,4 +244,44 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="isolation-policy" mandatory="false" multi-valued="false"> <adm:synopsis> <adm:user-friendly-name /> indicates the behavior of the LDAP server if an update is attempted when replication has been configured but none of the configured Replication Servers are up and running when the update is received. </adm:synopsis> <adm:default-behavior> <adm:defined> <adm:value>reject-all-updates</adm:value></adm:defined></adm:default-behavior> <adm:syntax> <adm:enumeration> <adm:value name="accept-all-updates"> <adm:synopsis> Indicates that updates should be accepted even though it is not possible to send them to any Replication Server. Best effort will be made to resend those updates to a Replication Server when one of them is available, however those changes will be at risk because they will only be available from the historical information. This mode may also introduce high replication latency. </adm:synopsis> </adm:value> <adm:value name="reject-all-updates"> <adm:synopsis> Indicates that all updates attempted on this replicated base-dn on this server when no Replication Server is available will be denied. </adm:synopsis> </adm:value> </adm:enumeration> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:oid>1.3.6.1.4.1.26027.1.1.383</ldap:oid> <ldap:name>ds-cfg-isolation-policy</ldap:name> </ldap:attribute> </adm:profile> </adm:property> </adm:managed-object> opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -427,6 +427,21 @@ public static final int MSGID_CHANGELOG_UNSUPPORTED_UTF8_ENCODING = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 60; /** * An update operation is aborted on error because the replication * is defined but the replicationDomain could not contact any * of the ReplicationServer. */ public static final int MSGID_REPLICATION_COULD_NOT_CONNECT = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 61; /** * After a failure to connect to any replication server the * replication was finally able to connect. */ public static final int MSGID_NOW_FOUND_CHANGELOG = CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 62; /** * Register the messages from this class in the core server. @@ -483,7 +498,8 @@ "Could not find a replication server that has seen all the local" + " changes. Going to replay changes"); registerMessage(MSGID_COULD_NOT_FIND_CHANGELOG, "Could not connect to any replication server, retrying..."); "Could not connect to any replication server on suffix %s, " +"retrying..."); registerMessage(MSGID_EXCEPTION_CLOSING_DATABASE, "Error closing changelog database %s : "); registerMessage(MSGID_EXCEPTION_DECODING_OPERATION, @@ -587,5 +603,10 @@ "The JVM does not support UTF-8. This is required to be able to " + "encode the changes in the database. " + "This replication server will now shutdown"); registerMessage(MSGID_REPLICATION_COULD_NOT_CONNECT, "The Replication is configured for suffix %s " + "but was not able to connect to any Replication Server"); registerMessage(MSGID_NOW_FOUND_CHANGELOG, "A Replication Server was found for suffix %s"); } } opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -119,6 +119,15 @@ */ private int numLostConnections = 0; /** * When the broker cannort connect to any replication server * it log an error and keeps continuing every second. * This boolean is set when the first failure happens and is used * to avoid repeating the error message for further failure to connect * and to know that it is necessary to print a new message when the broker * finally succeed to connect. */ private boolean connectionError = false; /** * Creates a new ReplicationServer Broker for a particular ReplicationDomain. @@ -349,11 +358,16 @@ * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING; String message = getMessage(msgID, server); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); if (!connectionError ) { // the error message is only logged once to avoid overflowing // the error log int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING; String message = getMessage(msgID, server); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); } } catch (Exception e) { @@ -402,18 +416,36 @@ } } if (!connected) if (connected) { // This server has connected correctly. // let's check if it was previosuly on error, in this case log // a message to let the administratot know that the failure was resolved. if (connectionError) { connectionError = false; int msgID = MSGID_NOW_FOUND_CHANGELOG; String message = getMessage(msgID, baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); } } else { /* * This server could not find any replicationServer * It's going to start in degraded mode. 
* Log a message */ checkState = false; int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; String message = getMessage(msgID); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); if (!connectionError) { checkState = false; connectionError = true; int msgID = MSGID_COULD_NOT_FIND_CHANGELOG; String message = getMessage(msgID, baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); } } } @@ -502,6 +534,8 @@ while (!done) { if (connectionError) return; synchronized (lock) { try @@ -536,7 +570,9 @@ while (shutdown == false) { if (!connected) { reStart(); } ProtocolSession failingSession = session; try @@ -750,4 +786,14 @@ return protocolVersion; } /** * Check if the broker is connected to a ReplicationServer and therefore * ready to received and send Replication Messages. * * @return true if the server is connected, false if not. */ public boolean isConnected() { return !connectionError; } } opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -52,6 +52,7 @@ import java.util.zip.DataFormatException; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*; import org.opends.server.admin.std.server.MultimasterDomainCfg; import org.opends.server.admin.std.server.BackendCfg; import org.opends.server.api.Backend; @@ -192,6 +193,41 @@ private long heartbeatInterval = 0; short serverId; // The context related to an import or export being processed // Null when none is being processed. private IEContext ieContext = null; // The backend information necessary to make an import or export. private Backend backend; private List<DN> branches = new ArrayList<DN>(0); private int listenerThreadNumber = 10; private Collection<String> replicationServers; private DN baseDN; private boolean shutdown = false; private InternalClientConnection conn = InternalClientConnection.getRootConnection(); private boolean solveConflictFlag = true; private boolean disabled = false; private boolean stateSavingDisabled = false; private int window = 100; /** * The isoalation policy that this domain is going to use. * This field describes the behavior of the domain when an update is * attempted and the domain could not connect to any Replication Server. * Possible values are accept-updates or deny-updates, but other values * may be added in the futur. */ private IsolationPolicy isolationpolicy; /** * This class contain the context related to an import or export * launched on the domain. @@ -272,33 +308,6 @@ } } // The context related to an import or export being processed // Null when none is being processed. private IEContext ieContext = null; // The backend information necessary to make an import or export. 
private Backend backend; private List<DN> branches = new ArrayList<DN>(0); private int listenerThreadNumber = 10; private Collection<String> replicationServers; private DN baseDN; private boolean shutdown = false; private InternalClientConnection conn = InternalClientConnection.getRootConnection(); private boolean solveConflictFlag = true; private boolean disabled = false; private boolean stateSavingDisabled = false; private int window = 100; /** * Creates a new ReplicationDomain using configuration from configEntry. * @@ -320,6 +329,7 @@ maxSendDelay = (int) configuration.getMaxSendDelay(); window = configuration.getWindowSize(); heartbeatInterval = configuration.getHeartbeatInterval(); isolationpolicy = configuration.getIsolationPolicy(); /* * Modify conflicts are solved for all suffixes but the schema suffix @@ -412,6 +422,12 @@ public SynchronizationProviderResult handleConflictResolution( DeleteOperation deleteOperation) { if ((!deleteOperation.isSynchronizationOperation()) && (!brokerIsConnected(deleteOperation))) { return new SynchronizationProviderResult(false); } DeleteContext ctx = (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); Entry deletedEntry = deleteOperation.getEntryToDelete(); @@ -464,6 +480,12 @@ public SynchronizationProviderResult handleConflictResolution( AddOperation addOperation) { if ((!addOperation.isSynchronizationOperation()) && (!brokerIsConnected(addOperation))) { return new SynchronizationProviderResult(false); } if (addOperation.isSynchronizationOperation()) { AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); @@ -520,6 +542,54 @@ } /** * Check that the broker associated to this ReplicationDomain has found * a Replication Server and that this LDAP server is therefore able to * process operations. * If not set the ResultCode and the response message, * interrupt the operation, and return false * * @param Operation The Operation that needs to be checked. 
* * @return true when it OK to process the Operation, false otherwise. * When false is returned the resultCode and the reponse message * is also set in the Operation. */ private boolean brokerIsConnected(Operation op) { if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) { // this policy imply that we always aceept updates. return true; } if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) { // this isolation policy specifies that the updates are denied // when the broker is not connected. if (broker.isConnected()) { return true; } else { String msg = getMessage(MSGID_REPLICATION_COULD_NOT_CONNECT, baseDN.toString()); DirectoryException result = new DirectoryException( ResultCode.UNWILLING_TO_PERFORM, msg, MSGID_REPLICATION_COULD_NOT_CONNECT); op.setResponseData(result); return false; } } // we should never get there as the only possible policies are // ACCEPT_UPDATES and DENY_UPDATES return true; } /** * Implement the handleConflictResolution phase of the ModifyDNOperation. * * @param modifyDNOperation The ModifyDNOperation. 
@@ -529,6 +599,12 @@ public SynchronizationProviderResult handleConflictResolution( ModifyDNOperation modifyDNOperation) { if ((!modifyDNOperation.isSynchronizationOperation()) && (!brokerIsConnected(modifyDNOperation))) { return new SynchronizationProviderResult(false); } ModifyDnContext ctx = (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); if (ctx != null) @@ -600,6 +676,12 @@ public SynchronizationProviderResult handleConflictResolution( ModifyOperation modifyOperation) { if ((!modifyOperation.isSynchronizationOperation()) && (!brokerIsConnected(modifyOperation))) { return new SynchronizationProviderResult(false); } ModifyContext ctx = (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); @@ -2808,7 +2890,6 @@ MultimasterDomainCfg configuration, List<String> unacceptableReasons) { // Check that there is not already a domain with the same DN // TODO : Check that the server id is a short DN dn = configuration.getReplicationDN(); if (MultimasterReplication.findDomain(dn,null) != null) { @@ -2826,6 +2907,8 @@ MultimasterDomainCfg configuration) { // server id and base dn are readonly. // isolationPolicy can be set immediately and will apply // to the next updates. // The other parameters needs to be renegociated with the ReplicationServer. // so that requires restarting the session with the ReplicationServer. replicationServers = configuration.getReplicationServer(); @@ -2837,6 +2920,7 @@ heartbeatInterval = configuration.getHeartbeatInterval(); broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window, heartbeatInterval); isolationpolicy = configuration.getIsolationPolicy(); return new ConfigChangeResult(ResultCode.SUCCESS, false); } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -32,6 +32,7 @@ import org.opends.server.admin.PropertyProvider; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.client.MultimasterDomainCfgClient; import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.IsolationPolicy; import org.opends.server.admin.std.server.MultimasterDomainCfg; import org.opends.server.types.DN; @@ -45,6 +46,7 @@ private int serverId; private SortedSet<String> replicationServers; private long heartbeatInterval = 1000; private IsolationPolicy policy = IsolationPolicy.REJECT_ALL_UPDATES; /** * Creates a new Domain with the provided information @@ -180,4 +182,21 @@ heartbeatInterval = interval; } /** * Get the isolation policy. */ public IsolationPolicy getIsolationPolicy() { return policy; } /** * Set the isolation policy. * * @param policy the policy that must now be used. */ public void setIsolationPolicy(IsolationPolicy policy) { this.policy = policy; } } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
New file @@ -0,0 +1,174 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.ServerSocket; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import static org.testng.Assert.*; import org.opends.server.TestCaseUtils; import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.IsolationPolicy; import org.opends.server.api.SynchronizationProvider; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.types.DN; import org.opends.server.types.ResultCode; import org.testng.annotations.Test; /** * Test behavior of an LDAP server that is not able to connect * to any of the configured Replication Server. 
*/ public class IsolationTest extends ReplicationTestCase { private static final String BASEDN_STRING = "dc=example,dc=com"; /** * Check that the server correctly accept or reject updates when * the replication is configured but could not connect to * any of the configured replication server. */ @SuppressWarnings("unchecked") @Test() public void noUpdateIsolationPolicyTest() throws Exception { ReplicationDomain domain = null; DN baseDn = DN.decode(BASEDN_STRING); SynchronizationProvider replicationPlugin = null; short serverId = 1; cleanDB(); try { // configure and start replication of dc=example,dc=com on the server // using a replication server that is not started replicationPlugin = new MultimasterReplication(); DirectoryServer.registerSynchronizationProvider(replicationPlugin); // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); int replServerPort = socket.getLocalPort(); socket.close(); SortedSet<String> replServers = new TreeSet<String>(); replServers.add("localhost:" + replServerPort); DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, serverId, replServers); domainConf.setHeartbeatInterval(100000); domain = MultimasterReplication.createNewDomain(domainConf); // check that the udates fail with the unwilling to perform error. InternalClientConnection conn = InternalClientConnection.getRootConnection(); ModifyOperation op = conn.processModify(baseDn, generatemods("description", "test")); // check that the update failed. assertEquals(ResultCode.UNWILLING_TO_PERFORM, op.getResultCode()); // now configure the domain to accept changes even though it is not // connectetd to any replication server. domainConf.setIsolationPolicy(IsolationPolicy.ACCEPT_ALL_UPDATES); domain.applyConfigurationChange(domainConf); // try a new modify operation on the base entry. op = conn.processModify(baseDn, generatemods("description", "test")); // chek that the operation was successful. // check that the update failed. 
assertEquals(ResultCode.SUCCESS, op.getResultCode()); } finally { if (domain != null) MultimasterReplication.deleteDomain(baseDn); if (replicationPlugin != null) DirectoryServer.deregisterSynchronizationProvider(replicationPlugin); } } /** * Clean the database and replace with a single entry. * * @throws FileNotFoundException * @throws IOException * @throws Exception */ private void cleanDB() throws FileNotFoundException, IOException, Exception { String baseentryldif = "dn:" + BASEDN_STRING + "\n" + "objectClass: top\n" + "objectClass: domain\n" + "dc: example\n" + "entryuuid: " + stringUID(1) + "\n"; // Initialization : // Load the database with a single entry : String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = buildRoot + File.separator + "build" + File.separator + "unit-tests" + File.separator + "package"+ File.separator + "addModDelDependencyTest"; OutputStream out = new FileOutputStream(new File(path)); out.write(baseentryldif.getBytes()); task("dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks\n" + "objectclass: top\n" + "objectclass: ds-task\n" + "objectclass: ds-task-import\n" + "ds-task-class-name: org.opends.server.tasks.ImportTask\n" + "ds-task-import-backend-id: userRoot\n" + "ds-task-import-ldif-file: " + path + "\n" + "ds-task-import-reject-file: " + path + "reject\n"); } /** * Builds and return a uuid from an integer. * This methods assume that unique integers are used and does not make any * unicity checks. It is only responsible for generating a uid with a * correct syntax. */ private String stringUID(int i) { return String.format("11111111-1111-1111-1111-%012x", i); } }