opends/resource/config/config.ldif
@@ -2439,6 +2439,17 @@ ds-cfg-filter: (&(objectClass=groupOfUniqueNames)(objectClass=ds-virtual-static-group)) ds-cfg-allow-retrieving-membership: false dn: cn=Last External Changelog Cookie,cn=Virtual Attributes,cn=config objectClass: ds-cfg-virtual-attribute objectClass: ds-cfg-user-defined-virtual-attribute objectClass: top cn: Last External Changelog Cookie ds-cfg-attribute-type: lastExternalChangelogCookie ds-cfg-enabled: true ds-cfg-java-class: org.opends.server.replication.common.LastCookieVirtualProvider ds-cfg-value: 1 ds-cfg-filter: (objectClass=ds-root-dse) dn: cn=Work Queue,cn=config objectClass: top objectClass: ds-cfg-work-queue opends/resource/schema/00-core.ldif
@@ -154,6 +154,8 @@ X-ORIGIN 'RFC 2256' ) attributeTypes: ( 2.5.4.50 NAME 'uniqueMember' EQUALITY uniqueMemberMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.34 X-ORIGIN 'RFC 4519' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.585 NAME 'lastExternalChangelogCookie' SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 2.5.4.51 NAME 'houseIdentifier' EQUALITY caseIgnoreMatch SUBSTR caseIgnoreSubstringsMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15{32768} X-ORIGIN 'RFC 4519' ) opends/src/server/org/opends/server/replication/common/ExternalChangeLogSession.java
@@ -49,4 +49,11 @@ */ public abstract void close() throws DirectoryException; /** * Returns the last (newest) cookie value. * @return the last cookie value. */ public abstract MultiDomainServerState getLastCookie(); } opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
New file @@ -0,0 +1,205 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.common; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; import org.opends.server.api.VirtualAttributeProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.replication.server.ExternalChangeLogSessionImpl; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.ByteString; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.Entry; import org.opends.server.types.InitializationException; import org.opends.server.types.ResultCode; import org.opends.server.types.VirtualAttributeRule; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; /** * This class implements a virtual attribute provider in the root-dse entry * that contains the last (newest) cookie (cross domain state) * available in the server. */ public class LastCookieVirtualProvider extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg> { // The current configuration for this virtual attribute provider. private UserDefinedVirtualAttributeCfg currentConfig; /** * Creates a new instance of this member virtual attribute provider. */ public LastCookieVirtualProvider() { super(); // All initialization should be performed in the // initializeVirtualAttributeProvider method. } /** * {@inheritDoc} */ @Override() public void initializeVirtualAttributeProvider( UserDefinedVirtualAttributeCfg configuration) throws ConfigException, InitializationException { this.currentConfig = configuration; configuration.addUserDefinedChangeListener(this); } /** * {@inheritDoc} */ @Override() public void finalizeVirtualAttributeProvider() { currentConfig.removeUserDefinedChangeListener(this); } /** * {@inheritDoc} */ @Override() public boolean isMultiValued() { if (currentConfig == null) { return true; } else { return (currentConfig.getValue().size() > 1); } } /** * {@inheritDoc} */ @Override() public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) { Set<AttributeValue> values = new HashSet<AttributeValue>(); try { ECLWorkflowElement eclwe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); if (eclwe!=null) { ExternalChangeLogSessionImpl eclsession = new ExternalChangeLogSessionImpl(eclwe.getReplicationServer()); String lastCookie = eclsession.getLastCookie().toString(); AttributeValue value = AttributeValues.create( ByteString.valueOf(lastCookie), ByteString.valueOf(lastCookie)); values=Collections.singleton(value); } return values; } catch(Exception e) { return values; } } /** * {@inheritDoc} */ @Override() public boolean isSearchable(VirtualAttributeRule rule, SearchOperation searchOperation) { // We will not allow searches based only on user-defined virtual attributes. return false; } /** * {@inheritDoc} */ @Override() public void processSearch(VirtualAttributeRule rule, SearchOperation searchOperation) { searchOperation.setResultCode(ResultCode.UNWILLING_TO_PERFORM); return; } /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable( UserDefinedVirtualAttributeCfg configuration, List<Message> unacceptableReasons) { // The new configuration should always be acceptable. return true; } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationChange( UserDefinedVirtualAttributeCfg configuration) { // Just accept the new configuration as-is. currentConfig = configuration; return new ConfigChangeResult(ResultCode.SUCCESS, false); } } opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -107,6 +107,18 @@ } /** * Update the ServerState of the provided serviceId with the * provided server state. * * @param serviceId The provided serviceId. * @param serverState The provided serverState. */ public void update(String serviceId, ServerState serverState) { list.put(serviceId,serverState); } /** * Create an object from a string representation. * @param mdss The provided string representation of the state. */ opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1308,6 +1308,7 @@ } } } if (debugEnabled()) TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + "," + this + " getGeneralizedOldestChange() " + @@ -1352,6 +1353,7 @@ { // non blocking UpdateMsg newMsg = clDomCtxts[idomain].mh.getnextMessage(false); if (debugEnabled()) TRACER.debugInfo(this + " getNextElligibleMessage got the next changelogmsg " + " from " + clDomCtxts[idomain].mh.getServiceId() @@ -1443,6 +1445,8 @@ String s = "=> "; ReplicationServer rs = replicationServerDomain.getReplicationServer(); if (debugEnabled()) TRACER.debugInfo("ECLSH.computeNewCrossDomainElligibleCN() " + " periodic starts rs="+rs); @@ -1470,6 +1474,7 @@ } } if (debugEnabled()) TRACER.debugInfo("SH.computeNewCrossDomainElligibleCN() periodic " + " ends with " + " the following domainElligibleCN for each domain :" + s + @@ -1507,6 +1512,7 @@ // then the server is considered down and not considered for eligibility if (TimeThread.getTime()-storedCN.getTime()>2000) { if (debugEnabled()) TRACER.debugInfo( "For RSD." + rsd.getBaseDn() + " Server " + sid + " is not considered for eligibility ... potentially down"); @@ -1518,6 +1524,8 @@ elligibleCN = storedCN; } } if (debugEnabled()) TRACER.debugInfo( "For RSD." + rsd.getBaseDn() + " ElligibleCN()=" + elligibleCN); return elligibleCN; opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -26,10 +26,14 @@ */ package org.opends.server.replication.server; import java.util.Iterator; import org.opends.server.replication.common.ExternalChangeLogSession; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.protocol.ECLUpdateMsg; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.types.DirectoryException; import org.opends.server.util.ServerConstants; /** * This interface defines a session used to search the external changelog @@ -45,6 +49,17 @@ /** * Create a new external changelog session. * @param rs The replication server to which we will request the log. * @throws DirectoryException When an error occurs. */ public ExternalChangeLogSessionImpl(ReplicationServer rs) throws DirectoryException { this.rs = rs; } /** * Create a new external changelog session. * @param rs The replication server to which we will request the log. * @param startECLSessionMsg The start session message containing the * details of the search request on the ECL. * @throws DirectoryException When an error occurs. @@ -82,4 +97,28 @@ { handler.getDomain().stopServer(handler); } /** * Returns the last (newest) cookie value. * @return the last cookie value. */ public MultiDomainServerState getLastCookie() { MultiDomainServerState result = new MultiDomainServerState(); // Initialize start state for all running domains with empty state Iterator<ReplicationServerDomain> rsdk = this.rs.getCacheIterator(); if (rsdk != null) { while (rsdk.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdk.next(); if (rsd.getBaseDn().compareToIgnoreCase( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0) continue; result.update(rsd.getBaseDn(), rsd.getCLElligibleState()); } } return result; } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2928,7 +2928,6 @@ public ServerState getStartState() { ServerState domainStartState = new ServerState(); Iterator<Short> it = this.getDbServerState().iterator(); for (DbHandler dbHandler : sourceDbHandlers.values()) { domainStartState.update(dbHandler.getFirstChange()); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ExternalChangeLogTest.java
@@ -94,6 +94,7 @@ import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ExternalChangeLogSessionImpl; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; @@ -171,31 +172,6 @@ replicationServerPort = socket.getLocalPort(); socket.close(); /* // Add the replication server to the configuration TestCaseUtils.dsconfig( "create-replication-server", "--provider-name", "Multimaster Synchronization", "--set", "replication-db-directory:" + "externalChangeLogTestConfigureDb", "--set", "replication-port:" + replicationServerPort, "--set", "replication-server-id:71"); // Retrieves the replicationServer object DirectoryServer.getSynchronizationProviders(); for (SynchronizationProvider<?> provider : DirectoryServer .getSynchronizationProviders()) { if (provider instanceof MultimasterReplication) { MultimasterReplication mmp = (MultimasterReplication) provider; ReplicationServerListener list = mmp.getReplicationServerListener(); if (list != null) { replicationServer = list.getReplicationServer(); if (replicationServer != null) { break; } } } } */ ReplServerFakeConfiguration conf1 = new ReplServerFakeConfiguration( replicationServerPort, "ExternalChangeLogTestDb", @@ -643,9 +619,9 @@ s2test2.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); cn = new ChangeNumber(time++, ts++, s2test2.getServerId()); ChangeNumber cn3 = new ChangeNumber(time++, ts++, s2test2.getServerId()); delMsg = new DeleteMsg("uid="+tn+"3," + TEST_ROOT_DN_STRING2, cn, tn+"uuid3"); new DeleteMsg("uid="+tn+"3," + TEST_ROOT_DN_STRING2, cn3, tn+"uuid3"); s2test2.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); @@ -764,9 +740,9 @@ debugInfo(tn, "STEP 3 - from cookie" + cookie); cn = new ChangeNumber(time++, ts++, s1test.getServerId()); ChangeNumber cn5 = new ChangeNumber(time++, ts++, s1test.getServerId()); delMsg = new DeleteMsg("uid="+tn+"5," + TEST_ROOT_DN_STRING, cn, tn+"uuid5"); new DeleteMsg("uid="+tn+"5," + TEST_ROOT_DN_STRING, cn5, tn+"uuid5"); s1test.publish(delMsg); sleep(500); @@ -876,7 +852,7 @@ 1000, true); sleep(500); // Test startState of the domain for the first cookie feature // Test startState ("first cookie") of the ECL time = TimeThread.getTime(); cn = new ChangeNumber(time++, ts++, s1test2.getServerId()); delMsg = @@ -888,14 +864,14 @@ new DeleteMsg("uid="+tn+"7," + TEST_ROOT_DN_STRING, cn, tn+"uuid7"); s2test.publish(delMsg); cn = new ChangeNumber(time++, ts++, s1test2.getServerId()); ChangeNumber cn8 = new ChangeNumber(time++, ts++, s1test2.getServerId()); delMsg = new DeleteMsg("uid="+tn+"8," + TEST_ROOT_DN_STRING2, cn, tn+"uuid8"); new DeleteMsg("uid="+tn+"8," + TEST_ROOT_DN_STRING2, cn8, tn+"uuid8"); s1test2.publish(delMsg); cn = new ChangeNumber(time++, ts++, s2test.getServerId()); ChangeNumber cn9 = new ChangeNumber(time++, ts++, s2test.getServerId()); delMsg = new DeleteMsg("uid="+tn+"9," + TEST_ROOT_DN_STRING, cn, tn+"uuid9"); new DeleteMsg("uid="+tn+"9," + TEST_ROOT_DN_STRING, cn9, tn+"uuid9"); s2test.publish(delMsg); sleep(500); @@ -911,6 +887,58 @@ assertTrue(startState.getMaxChangeNumber(s2test2.getServerId()).getSeqnum()==2); assertTrue(startState.getMaxChangeNumber(s1test2.getServerId()).getSeqnum()==6); // Test lastState ("last cookie") of the ECL // create an ECL sessionm and request lastCookie ExternalChangeLogSessionImpl session = new ExternalChangeLogSessionImpl(replicationServer); MultiDomainServerState expectedLastCookie = new MultiDomainServerState("o=test:"+cn5+" "+cn9+";o=test2:"+cn3+" "+cn8+";"); MultiDomainServerState lastCookie = session.getLastCookie(); assertTrue(expectedLastCookie.equalsTo(lastCookie), " ExpectedLastCookie=" + expectedLastCookie + " lastCookie=" + lastCookie); // LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>(); lastcookieattribute.add("lastExternalChangelogCookie"); searchOp = connection.processSearch( ByteString.valueOf(""), SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(objectclass=*)"), lastcookieattribute, null, null); assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); cookie = ""; entries = searchOp.getSearchEntries(); if (entries != null) { for (SearchResultEntry resultEntry : entries) { debugInfo(tn, "Result entry=\n" + resultEntry.toLDIFString()); ldifWriter.writeEntry(resultEntry); try { List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie"); cookie = l.get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)), " Expected last cookie attribute value:" + expectedLastCookie + " Read from server: " + cookie + " are equal :"); s1test.stop(); s1test2.stop(); s2test.stop();