opends/src/messages/messages/replication.properties
@@ -435,5 +435,7 @@ server for %s in local server id %s SEVERE_ERR_SERVER_BADLY_DISCONNECTED_181= %s has badly disconnected from this \ replication server %s NOTICE_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR_182=Error when loading a virtual \ NOTICE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR_182=Error when loading a virtual \ attribute for external change log: Attribute: %s , Error: %s NOTICE_ERR_UNABLE_TO_ENABLE_ECL_183=Error in %s when enabling the external \ change log: %s opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -157,6 +157,7 @@ import org.opends.server.types.operation.PreOperationModifyDNOperation; import org.opends.server.types.operation.PreOperationModifyOperation; import org.opends.server.types.operation.PreOperationOperation; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import org.opends.server.workflowelement.localbackend.*; /** @@ -896,7 +897,7 @@ } /** * Utility class to have get a sting iterator from an AtributeValue iterator. * Utility class to have get a string iterator from an AtributeValue iterator. * Assuming the attribute values are strings. */ public static class AttributeValueStringIterator implements Iterator<String> @@ -4181,6 +4182,31 @@ super.sessionInitiated( initStatus, replicationServerState,generationID, session); // Now that we are connected , we can enable ECL if : // 1/ RS must in the same JVM and created an ECL_WORKFLOW_ELEMENT // and 2/ this domain must NOT be private if (!getBackend().isPrivateBackend()) { try { ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement( ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); if (wfe!=null) wfe.getReplicationServer().enableECL(); } catch(DirectoryException de) { //FIXME:DirectoryException is raised by initializeECL => fix err msg Message message = NOTE_ERR_UNABLE_TO_ENABLE_ECL.get( "Replication Domain on" + baseDn, de.getMessage() + " " + de.getCause().getMessage()); logError(message); // and go on } } // Now for bad data set status if needed if (force_bad_data_set) { @@ -4297,6 +4323,7 @@ e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); } } /** opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -185,8 +185,11 @@ */ private static final DebugTracer TRACER = getTracer(); private String externalChangeLogWorkflowID = "External Changelog Workflow ID"; private static String externalChangeLogWorkflowID = "External Changelog Workflow ID"; ECLWorkflowElement eclwe; WorkflowImpl externalChangeLogWorkflowImpl = null; private static HashSet<Integer> localPorts = new HashSet<Integer>(); // used to synchronize the domain creation with the connect thread. @@ -501,15 +504,6 @@ socket.setTcpNoDelay(true); socket.connect(ServerAddr, 500); /* ServerHandler handler = new ServerHandler( replSessionSecurity.createClientSession(serverURL, socket, ReplSessionSecurity.HANDSHAKE_TIMEOUT), queueSize); handler.start(baseDn, serverId, this.serverURL, rcvWindow, sslEncryption, this); */ ReplicationServerHandler handler = new ReplicationServerHandler( replSessionSecurity.createClientSession(remoteServerURL, socket, @@ -578,9 +572,14 @@ serverId , this); listenThread.start(); // Initialize the External Changelog // FIXME: how is WF creation enabed/disabled in the RS ? initializeECL(); // Creates the ECL workflow elem so that DS (LDAPReplicationDomain) // can know me and really enableECL. if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) != null) { // Already done . Nothing to do return; } eclwe = new ECLWorkflowElement(this); if (debugEnabled()) TRACER.debugInfo("RS " +getMonitorInstanceName()+ @@ -615,41 +614,38 @@ } /** * Initializes the ECL access by creating a dedicated workflow element. * @throws DirectoryException * Enable the ECL access by creating a dedicated workflow element. * @throws DirectoryException when an error occurs. */ private void initializeECL() public void enableECL() throws DirectoryException { WorkflowImpl externalChangeLogWorkflow; if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) !=null) if (externalChangeLogWorkflowImpl!=null) { // do nothing if ECL is already enabled return; ECLWorkflowElement eclwe = new ECLWorkflowElement(this); } // Create the workflow for the base DN and register the workflow with // the server. externalChangeLogWorkflow = new WorkflowImpl( externalChangeLogWorkflowImpl = new WorkflowImpl( externalChangeLogWorkflowID, DN.decode(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT), eclwe.getWorkflowElementID(), eclwe); externalChangeLogWorkflow.register(); externalChangeLogWorkflowImpl.register(); NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup(); defaultNetworkGroup.registerWorkflow(externalChangeLogWorkflow); defaultNetworkGroup.registerWorkflow(externalChangeLogWorkflowImpl); // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup? NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup(); adminNetworkGroup.registerWorkflow(externalChangeLogWorkflow); adminNetworkGroup.registerWorkflow(externalChangeLogWorkflowImpl); // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup? NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup(); internalNetworkGroup.registerWorkflow(externalChangeLogWorkflow); internalNetworkGroup.registerWorkflow(externalChangeLogWorkflowImpl); try { enableECLVirtualAttr("lastexternalchangelogcookie", new LastCookieVirtualProvider()); enableECLVirtualAttr("firstchangenumber", @@ -659,14 +655,10 @@ enableECLVirtualAttr("changelog", new ChangelogBaseDNVirtualAttributeProvider()); } catch (Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } private void enableECLVirtualAttr(String attrName, private static void enableECLVirtualAttr(String attrName, VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> provider) throws DirectoryException { Set<DN> baseDNs = new HashSet<DN>(0); Set<DN> groupDNs = new HashSet<DN>(0); @@ -694,13 +686,13 @@ baseDNs, groupDNs, filters, conflictBehavior); DirectoryServer.registerVirtualAttribute(rule); } catch (Exception e) { Message message = NOTE_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e.toString()); logError(message); NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e.toString()); throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -138,6 +138,7 @@ import org.opends.server.util.LDIFWriter; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -216,12 +217,32 @@ @Test(enabled=true) public void ECLReplicationServerTest() { // No RSDomain created yet => RS only case => ECL is not a supported ECLIsNotASupportedSuffix(); // Following test does not create RSDomain (only broker) but want to test // ECL .. so let's enable ECl manually // Now that we tested that ECl is not available try { ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement( ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); if (wfe!=null) wfe.getReplicationServer().enableECL(); } catch(DirectoryException de) { fail("Ending test " + " with exception:" + stackTraceToSingleLineString(de)); } // Test all types of ops. ECLAllOps(); // Do not clean the db for the next test // First and last should be ok whenever a request has been done or not // in compat mode. ECLCompatTestLimits(1,4);replicationServer.clearDb(); ECLCompatTestLimits(1,4,true);replicationServer.clearDb(); // Test with a mix of domains, a mix of DSes ECLTwoDomains(); replicationServer.clearDb(); @@ -259,7 +280,7 @@ // First and last should be ok whenever a request has been done or not // in compat mode. ECLCompatTestLimits(1,4);replicationServer.clearDb(); ECLCompatTestLimits(1,4, true);replicationServer.clearDb(); // Test remote API (ECL through replication protocol) with NON empty ECL ECLRemoteNonEmpty();replicationServer.clearDb(); @@ -319,7 +340,7 @@ ECLCompatReadFromTo(5,7); // Test first and last draft changenumber ECLCompatTestLimits(1,8); ECLCompatTestLimits(1,8, true); // Test first and last draft changenumber, a dd a new change, do not // search again the ECL, but search fro first and last @@ -329,7 +350,7 @@ ECLPurgeDraftCNDbAfterChangelogClear(); // Test first and last are updated ECLCompatTestLimits(0,0); ECLCompatTestLimits(0,0, true); // Persistent search in changesOnly mode ECLPsearch(true, true);replicationServer.clearDb(); @@ -346,6 +367,11 @@ } private void ECLIsNotASupportedSuffix() { ECLCompatTestLimits(0,0, false); } //======================================================= // Objectives // - Test that everything id ok with no changes @@ -1182,6 +1208,7 @@ try { // Root DSE InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf(""), @@ -3263,7 +3290,8 @@ debugInfo(tn, "Ending test with success"); } private void ECLCompatTestLimits(int expectedFirst, int expectedLast) private void ECLCompatTestLimits(int expectedFirst, int expectedLast, boolean eclEnabled) { String tn = "ECLCompatTestLimits"; debugInfo(tn, "Starting test\n\n"); @@ -3302,6 +3330,8 @@ i++; debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString()); ldifWriter.writeEntry(resultEntry); if (eclEnabled) { checkValue(resultEntry,"firstchangenumber", String.valueOf(expectedFirst)); checkValue(resultEntry,"lastchangenumber", @@ -3309,6 +3339,19 @@ checkValue(resultEntry,"changelog", String.valueOf("cn=changelog")); } else { assertEquals(getAttributeValue(resultEntry, "firstchangenumber"), null); assertEquals(getAttributeValue(resultEntry, "lastchangenumber"), null); assertEquals(getAttributeValue(resultEntry, "changelog"), null); assertEquals(getAttributeValue(resultEntry, "lastExternalChangelogCookie"), null); } } } } catch(Exception e) @@ -3326,7 +3369,7 @@ debugInfo(tn, "Starting test\n\n"); try { ECLCompatTestLimits(expectedFirst, expectedLast); ECLCompatTestLimits(expectedFirst, expectedLast, true); // Creates broker on o=test ReplicationBroker server01 = openReplicationSession( @@ -3346,7 +3389,7 @@ sleep(500); server01.stop(); ECLCompatTestLimits(expectedFirst, expectedLast+1); ECLCompatTestLimits(expectedFirst, expectedLast+1, true); } catch(Exception e)