opends/resource/config/config.ldif
@@ -2468,6 +2468,28 @@ ds-cfg-value: 1 ds-cfg-filter: (objectClass=ds-root-dse) dn: cn=External Changelog First Draft Change Number,cn=Virtual Attributes,cn=config objectClass: ds-cfg-virtual-attribute objectClass: ds-cfg-user-defined-virtual-attribute objectClass: top cn: External Changelog First Draft Change Number ds-cfg-attribute-type: firstChangeNumber ds-cfg-enabled: true ds-cfg-java-class: org.opends.server.replication.common.FirstChangeNumberVirtualAttributeProvider ds-cfg-value: 0 ds-cfg-filter: (objectClass=ds-root-dse) dn: cn=External Changelog Last Draft Change Number,cn=Virtual Attributes,cn=config objectClass: ds-cfg-virtual-attribute objectClass: ds-cfg-user-defined-virtual-attribute objectClass: top cn: External Changelog Last Draft Change Number ds-cfg-attribute-type: lastChangeNumber ds-cfg-enabled: true ds-cfg-java-class: org.opends.server.replication.common.LastChangeNumberVirtualAttributeProvider ds-cfg-value: 0 ds-cfg-filter: (objectClass=ds-root-dse) dn: cn=Work Queue,cn=config objectClass: top objectClass: ds-cfg-work-queue opends/resource/schema/00-core.ldif
@@ -156,6 +156,10 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.34 X-ORIGIN 'RFC 4519' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.585 NAME 'lastExternalChangelogCookie' SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.593 NAME 'firstChangeNumber' SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.594 NAME 'lastChangeNumber' SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 2.5.4.51 NAME 'houseIdentifier' EQUALITY caseIgnoreMatch SUBSTR caseIgnoreSubstringsMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15{32768} X-ORIGIN 'RFC 4519' ) opends/resource/schema/02-config.ldif
@@ -2410,7 +2410,11 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 USAGE directoryOperation X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.595 NAME 'ds-cfg-changetime-heartbeat-interval' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.592 NAME 'ds-cfg-solve-conflicts' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 @@ -2976,7 +2980,8 @@ ds-cfg-referrals-url $ ds-cfg-fractional-exclude $ ds-cfg-fractional-include $ ds-cfg-solve-conflicts ) ds-cfg-solve-conflicts $ ds-cfg-changetime-heartbeat-interval) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.58 NAME 'ds-cfg-length-based-password-validator' opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
@@ -153,6 +153,30 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="changetime-heartbeat-interval" advanced="true"> <adm:synopsis> Specifies the heart-beat interval that the Directory Server will use when sending its local change time to the Replication Server. </adm:synopsis> <adm:description> The Directory Server sends a regular heart-beat to the Replication Server within the specified interval. The heart-beat indicates the change time of the Directory Server to the Replication Server. </adm:description> <adm:default-behavior> <adm:defined> <adm:value>1000ms</adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:duration base-unit="ms" lower-limit="0" /> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-changetime-heartbeat-interval</ldap:name> </ldap:attribute> </adm:profile> </adm:property> <adm:property name="isolation-policy"> <adm:synopsis> Specifies the behavior of the Directory Server if a write opends/src/messages/messages/replication.properties
@@ -410,3 +410,6 @@ domain %s from server %s to all other servers of the topology is forbidden as \ the source server has some fractional configuration : only fractional servers \ in a replicated topology does not makes sense MILD_ERR_DRAFT_CHANGENUMBER_DATABASE_173=An error occurred when accessing the \ database of the draft change number : %s opends/src/server/org/opends/server/core/PersistentSearch.java
@@ -302,6 +302,9 @@ // Make sure that the entry matches the target filter. try { TRACER.debugInfo(this + " " + entry + " +filter=" + filter.matchesEntry(entry)); if (!filter.matchesEntry(entry)) { return; opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
New file @@ -0,0 +1,209 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.common; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; import org.opends.server.api.VirtualAttributeProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.ByteString; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.Entry; import org.opends.server.types.InitializationException; import org.opends.server.types.ResultCode; import org.opends.server.types.VirtualAttributeRule; import 
org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; /** * This class implements a virtual attribute provider whose single value is * the first draft change number reported by the replication server of the * "EXTERNAL CHANGE LOG" workflow element. The value is "0" when that * workflow element is not registered or when the lookup throws an * exception. */ public class FirstChangeNumberVirtualAttributeProvider extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg> { private static final DebugTracer TRACER = getTracer(); // The current configuration for this virtual attribute provider. private UserDefinedVirtualAttributeCfg currentConfig; /** * Creates a new instance of this first change number virtual attribute provider. */ public FirstChangeNumberVirtualAttributeProvider() { super(); // All initialization should be performed in the // initializeVirtualAttributeProvider method. 
} /** * {@inheritDoc} */ @Override() public void initializeVirtualAttributeProvider( UserDefinedVirtualAttributeCfg configuration) throws ConfigException, InitializationException { this.currentConfig = configuration; configuration.addUserDefinedChangeListener(this); } /** * {@inheritDoc} */ @Override() public void finalizeVirtualAttributeProvider() { currentConfig.removeUserDefinedChangeListener(this); } /** * {@inheritDoc} */ @Override() public boolean isMultiValued() { if (currentConfig == null) { return true; } else { return (currentConfig.getValue().size() > 1); } } /** * {@inheritDoc} */ @Override() public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) { Set<AttributeValue> values = new HashSet<AttributeValue>(); String first="0"; try { ECLWorkflowElement eclwe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); if (eclwe!=null) { first = String.valueOf( eclwe.getReplicationServer().getFirstDraftChangeNumber()); } } catch(Exception e) { /* NOTE(review): the exception is swallowed; the default "0" is returned. */ } AttributeValue value = AttributeValues.create( ByteString.valueOf(first), ByteString.valueOf(first)); values=Collections.singleton(value); return values; } /** * {@inheritDoc} */ @Override() public boolean isSearchable(VirtualAttributeRule rule, SearchOperation searchOperation) { // We will not allow searches based only on this virtual attribute. return false; } /** * {@inheritDoc} */ @Override() public void processSearch(VirtualAttributeRule rule, SearchOperation searchOperation) { searchOperation.setResultCode(ResultCode.UNWILLING_TO_PERFORM); return; } /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable( UserDefinedVirtualAttributeCfg configuration, List<Message> unacceptableReasons) { // The new configuration should always be acceptable. return true; } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationChange( UserDefinedVirtualAttributeCfg configuration) { // Just accept the new configuration as-is. 
currentConfig = configuration; return new ConfigChangeResult(ResultCode.SUCCESS, false); } } opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
New file @@ -0,0 +1,209 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.common; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; import org.opends.server.api.VirtualAttributeProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.ByteString; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.Entry; import org.opends.server.types.InitializationException; import org.opends.server.types.ResultCode; import org.opends.server.types.VirtualAttributeRule; import 
org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; /** * This class implements a virtual attribute provider whose single value is * the last draft change number reported by the replication server of the * "EXTERNAL CHANGE LOG" workflow element. The value is "0" when that * workflow element is not registered or when the lookup throws an * exception. */ public class LastChangeNumberVirtualAttributeProvider extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg> { private static final DebugTracer TRACER = getTracer(); // The current configuration for this virtual attribute provider. private UserDefinedVirtualAttributeCfg currentConfig; /** * Creates a new instance of this last change number virtual attribute provider. */ public LastChangeNumberVirtualAttributeProvider() { super(); // All initialization should be performed in the // initializeVirtualAttributeProvider method. 
} /** * {@inheritDoc} */ @Override() public void initializeVirtualAttributeProvider( UserDefinedVirtualAttributeCfg configuration) throws ConfigException, InitializationException { this.currentConfig = configuration; configuration.addUserDefinedChangeListener(this); } /** * {@inheritDoc} */ @Override() public void finalizeVirtualAttributeProvider() { currentConfig.removeUserDefinedChangeListener(this); } /** * {@inheritDoc} */ @Override() public boolean isMultiValued() { if (currentConfig == null) { return true; } else { return (currentConfig.getValue().size() > 1); } } /** * {@inheritDoc} */ @Override() public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) { Set<AttributeValue> values = new HashSet<AttributeValue>(); String last = "0"; try { ECLWorkflowElement eclwe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); if (eclwe!=null) { last = String.valueOf( eclwe.getReplicationServer().getLastDraftChangeNumber()); } } catch(Exception e) { /* NOTE(review): the exception is swallowed; the default "0" is returned. */ } AttributeValue value = AttributeValues.create( ByteString.valueOf(last), ByteString.valueOf(last)); values=Collections.singleton(value); return values; } /** * {@inheritDoc} */ @Override() public boolean isSearchable(VirtualAttributeRule rule, SearchOperation searchOperation) { // We will not allow searches based only on this virtual attribute. return false; } /** * {@inheritDoc} */ @Override() public void processSearch(VirtualAttributeRule rule, SearchOperation searchOperation) { searchOperation.setResultCode(ResultCode.UNWILLING_TO_PERFORM); return; } /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable( UserDefinedVirtualAttributeCfg configuration, List<Message> unacceptableReasons) { // The new configuration should always be acceptable. return true; } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationChange( UserDefinedVirtualAttributeCfg configuration) { // Just accept the new configuration as-is. 
currentConfig = configuration; return new ConfigChangeResult(ResultCode.SUCCESS, false); } } opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -26,9 +26,16 @@ */ package org.opends.server.replication.common; import java.util.HashMap; import java.util.Iterator; import java.util.TreeMap; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; /** * This object is used to store a list of ServerState object, one by @@ -106,7 +113,7 @@ */ public void update(String serviceId, ServerState serverState) { list.put(serviceId,serverState); list.put(serviceId,serverState.duplicate()); } /** @@ -208,11 +215,59 @@ { ServerState state = list.get(serviceId); ServerState coveredState = covered.list.get(serviceId); if ((coveredState == null) || (!state.cover(coveredState))) if ((state==null)||(coveredState == null) || (!state.cover(coveredState))) { return false; } } return true; } /** * Splits the provided multi-domain server state, a String with the * following syntax: "domain1:state1;domain2:state2;..." * where each state is a space-separated list of change numbers, * to a hashmap of (domain DN, domain ServerState). A domain given * without a ":state" part is mapped to a ServerState that received * no update. * @param multidomainserverstate the provided state * @exception DirectoryException when any exception is raised while * splitting or parsing the provided state * @return the split state. 
*/ public static HashMap<String,ServerState> splitGenStateToServerStates( String multidomainserverstate) throws DirectoryException { HashMap<String,ServerState> startStates = new HashMap<String,ServerState>(); try { // Split the provided multidomainserverstate into domains String[] domains = multidomainserverstate.split(";"); for (String domain : domains) { // For each domain, split the changenumbers by server // and build a server state (SHOULD BE OPTIMIZED) ServerState serverStateByDomain = new ServerState(); String[] fields = domain.split(":"); String domainBaseDN = fields[0]; if (fields.length>1) { String strState = fields[1]; String[] strCN = strState.split(" "); for (String sr : strCN) { ChangeNumber fromChangeNumber = new ChangeNumber(sr); serverStateByDomain.update(fromChangeNumber); } } startStates.put(domainBaseDN, serverStateByDomain); } } catch(Exception e) { throw new DirectoryException( ResultCode.OPERATIONS_ERROR, Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: "), e); } return startStates; } } opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -222,7 +222,8 @@ { ChangeNumber change = list.get(key); Date date = new Date(change.getTime()); set.add(change.toString() + " " + date.toString()); set.add(change.toString() + " " + date.toString() + " " + change.getTime()); } } opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -530,7 +530,8 @@ saveGenerationId(generationId); } startPublishService(replicationServers, window, heartbeatInterval); startPublishService(replicationServers, window, heartbeatInterval, configuration.getChangetimeHeartbeatInterval()); /* * ChangeNumberGenerator is used to create new unique ChangeNumbers opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -315,22 +315,22 @@ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) { return "AddMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag; } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2) { return "AddMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag + "\nassuredMode: " + assuredMode + "\nsafeDataLevel: " + safeDataLevel; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag + " assuredMode: " + assuredMode + " safeDataLevel: " + safeDataLevel; } return "!!! Unknown version: " + protocolVersion + "!!!"; } opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
New file @@ -0,0 +1,148 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import org.opends.server.replication.common.ChangeNumber; /** * Class that defines the message sent by a replication domain (DS) * to the replication server to let the RS know the DS current * change time. The change time is carried inside a ChangeNumber. */ public class ChangeTimeHeartbeatMsg extends ReplicationMsg { /** * The ChangeNumber containing the change time. */ private final ChangeNumber changeNumber; /** * Constructor of a Change Time Heartbeat message carrying a * ChangeNumber built from the values (0, 0, 0). */ public ChangeTimeHeartbeatMsg() { this.changeNumber = new ChangeNumber((long)0,0,(short)0); } /** * Constructor of a Change Time Heartbeat message providing * the change time value in a change number. * @param cn The provided change number. */ public ChangeTimeHeartbeatMsg(ChangeNumber cn) { this.changeNumber = cn; } /** * Get a change number with the transmitted change time. 
* @return the ChangeNumber */ public ChangeNumber getChangeNumber() { return changeNumber; } /** * Encode a change time message: the string form of the change number * as UTF-8 bytes, without the message type byte that getBytes() writes. * NOTE(review): the buffer is sized to exactly the change number bytes; * confirm that addByteArray does not also write a terminating byte, * otherwise this overflows the buffer. * @return The encoded message. * @throws UnsupportedEncodingException When an error occurs. */ public byte[] encode() throws UnsupportedEncodingException { byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); int length = changeNumberByte.length; byte[] encodedMsg = new byte[length]; /* Put the ChangeNumber */ addByteArray(changeNumberByte, encodedMsg, 0); return encodedMsg; } /** * Creates a message from a provided byte array, whose first byte must * be MSG_TYPE_CT_HEARTBEAT and whose following bytes hold the string * form of the change number. * @param in The provided byte array. * @throws DataFormatException When the type byte is wrong, or when the * change number string cannot be decoded or parsed. */ public ChangeTimeHeartbeatMsg(byte[] in) throws DataFormatException { try { /* Read the changeNumber */ /* First byte is the type */ if (in[0] != MSG_TYPE_CT_HEARTBEAT) { throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg"); } int pos = 1; int length = getNextLength(in, pos); String changenumberStr = new String(in, pos, length, "UTF-8"); changeNumber = new ChangeNumber(changenumberStr); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } catch (IllegalArgumentException e) { throw new DataFormatException(e.getMessage()); } } /** * Get a byte array from the message: the message type byte, the * UTF-8 string form of the change number, then a terminating zero byte. * @return The byte array containing the PDU of the message. * @throws UnsupportedEncodingException When an error occurs. */ public byte[] getBytes() throws UnsupportedEncodingException { try { ByteArrayOutputStream oStream = new ByteArrayOutputStream(); /* Put the type of the operation */ oStream.write(MSG_TYPE_CT_HEARTBEAT); /* Put the ChangeNumber */ byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); oStream.write(changeNumberByte); oStream.write(0); return oStream.toByteArray(); } catch (IOException e) { // never happens return null; } } } opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -125,22 +125,22 @@ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) { return "DeleteMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag; } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2) { return "DeleteMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag + "\nassuredMode: " + assuredMode + "\nsafeDataLevel: " + safeDataLevel; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag + " assuredMode: " + assuredMode + " safeDataLevel: " + safeDataLevel; } return "!!! Unknown version: " + protocolVersion + "!!!"; } opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -45,18 +45,23 @@ // The value of the cookie updated with the current change private MultiDomainServerState cookie; // The changenumber as specified by draft-good-ldap-changelog. private int draftChangeNumber; /** * Creates a new message. * @param update The provided update. * @param cookie The provided cookie value * @param serviceId The provided serviceId. * @param draftChangeNumber The provided draft change number. */ public ECLUpdateMsg(LDAPUpdateMsg update, MultiDomainServerState cookie, String serviceId) String serviceId, int draftChangeNumber) { this.cookie = cookie; this.serviceId = serviceId; this.updateMsg = update; this.draftChangeNumber = draftChangeNumber; } /** @@ -93,6 +98,12 @@ this.serviceId = new String(in, pos, length, "UTF-8"); pos += length + 1; // Decode the draft changeNumber length = getNextLength(in, pos); this.draftChangeNumber = Integer.valueOf( new String(in, pos, length, "UTF-8")); pos += length + 1; // Decode the msg /* Read the mods : all the remaining bytes but the terminating 0 */ length = in.length - pos - 1; @@ -152,9 +163,10 @@ public String toString() { return "ECLUpdateMsg:[" + "updateMsg: " + updateMsg + "cookie: " + cookie + "serviceId: " + serviceId + "]"; " updateMsg: " + updateMsg + " cookie: " + cookie + " draftChangeNumber: " + draftChangeNumber + " serviceId: " + serviceId + "]"; } /** @@ -165,10 +177,13 @@ { byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8"); byte[] byteServiceId = String.valueOf(serviceId).getBytes("UTF-8"); byte[] byteDraftChangeNumber = Integer.toString(draftChangeNumber).getBytes("UTF-8"); byte[] byteUpdateMsg = updateMsg.getBytes(); int length = 1 + byteCookie.length + 1 + byteServiceId.length + 1 + byteDraftChangeNumber.length + 1 + byteUpdateMsg.length + 1; byte[] resultByteArray = new byte[length]; @@ -183,9 +198,31 @@ // Encode serviceid pos = addByteArray(byteServiceId, resultByteArray, pos); /* Put the draftChangeNumber */ pos = addByteArray(byteDraftChangeNumber, 
resultByteArray, pos); // Encode msg pos = addByteArray(byteUpdateMsg, resultByteArray, pos); return resultByteArray; } /** * Setter for the draftChangeNumber of this change. * @param draftChangeNumber the provided draftChangeNumber for this change. */ public void setDraftChangeNumber(int draftChangeNumber) { this.draftChangeNumber = draftChangeNumber; } /** * Getter for the draftChangeNumber of this change. * @return the draftChangeNumber of this change. */ public int getDraftChangeNumber() { return this.draftChangeNumber; } } opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -74,8 +74,8 @@ * Create a heartbeat monitor thread. * @param threadName The name of the heartbeat thread. * @param session The session on which heartbeats are to be monitored. * @param heartbeatInterval The expected interval between heartbeats in * milliseconds. * @param heartbeatInterval The expected interval between heartbeats received * (in milliseconds). */ public HeartbeatMonitor(String threadName, ProtocolSession session, long heartbeatInterval) @@ -93,7 +93,6 @@ shutdown = true; } /** * {@inheritDoc} */ opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -70,9 +70,12 @@ static final byte MSG_TYPE_START_SESSION = 27; static final byte MSG_TYPE_CHANGE_STATUS = 28; static final byte MSG_TYPE_GENERIC_UPDATE = 29; // Protocol version : 3 static final byte MSG_TYPE_START_ECL = 30; static final byte MSG_TYPE_START_ECL_SESSION = 31; static final byte MSG_TYPE_ECL_UPDATE = 32; static final byte MSG_TYPE_CT_HEARTBEAT = 33; // Adding a new type of message here probably requires to // change accordingly generateMsg method below @@ -232,6 +235,9 @@ case MSG_TYPE_ECL_UPDATE: msg = new ECLUpdateMsg(buffer); break; case MSG_TYPE_CT_HEARTBEAT: msg = new ChangeTimeHeartbeatMsg(buffer); break; default: throw new DataFormatException("received message with unknown type"); } opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -101,7 +101,7 @@ private ChangeNumber changeNumber; // Specifies whether the search is persistent and changesOnly private short isPersistent; private short isPersistent = NON_PERSISTENT; // A string helping debuging and tracing the client operation related when // processing, on the RS side, a request on the ECL. @@ -203,7 +203,14 @@ */ public StartECLSessionMsg() { eclRequestType = REQUEST_TYPE_FROM_COOKIE; crossDomainServerState = ""; firstDraftChangeNumber = -1; lastDraftChangeNumber = -1; changeNumber = new ChangeNumber((short)0,0,(short)0); isPersistent = NON_PERSISTENT; operationId = "-1"; excludedServiceIDs = new ArrayList<String>(); } /** opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -28,6 +28,7 @@ import org.opends.messages.MessageBuilder; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -46,6 +47,7 @@ import org.opends.server.types.InitializationException; import org.opends.server.util.TimeThread; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; @@ -121,6 +123,7 @@ * */ private long trimage; private static final DebugTracer TRACER = getTracer(); /** * Creates a new dbHandler associated to a given LDAP server. @@ -291,13 +294,14 @@ ChangeNumber recentChangeNumber = null; if (changeNumber == null) { flush(); } synchronized (msgQueue) { try { UpdateMsg msg = msgQueue.getFirst(); UpdateMsg msg = msgQueue.getLast(); recentChangeNumber = msg.getChangeNumber(); } catch (NoSuchElementException e) @@ -654,4 +658,14 @@ lastChange = db.readLastChange(); } } /** * Getter for the serverID of the server for which this database is managed. * * @return the serverId. */ public short getServerId() { return this.serverId; } } opends/src/server/org/opends/server/replication/server/DraftCNDB.java
New file @@ -0,0 +1,742 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DeadlockException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;

/**
 * This class implements the interface between the underlying database
 * and the DraftCNDbHandler class.
 * This is the only class that should have code using the BDB interfaces.
 * <p>
 * Every access to the database handle is done while holding the read side of
 * {@code dbCloseLock}; closing or clearing the database takes the write side,
 * so that the handle is never closed while a cursor or a write is in
 * progress.
 */
public class DraftCNDB
{
  private static final DebugTracer TRACER = getTracer();

  private Database db = null;
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;

  // NOTE(review): this field is never assigned in this class, so it is
  // always null; toString() must therefore be null-safe.
  private DN baseDn;

  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;

  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;

  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param replicationServer The ReplicationServer that needs to be shutdown
   *                          when an unrecoverable database error happens.
   * @param dbenv The Db environment to use to create the db.
   * @throws DatabaseException If a database problem happened.
   */
  public DraftCNDB(
      ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
      throws DatabaseException
  {
    this.dbenv = dbenv;
    this.replicationServer = replicationServer;

    // Get or create the associated ReplicationServerDomain and Db.
    db = dbenv.getOrCreateDraftCNDb();

    // Fair lock, so that a pending close/clear is not starved by readers.
    dbCloseLock = new ReentrantReadWriteLock(true);
  }

  /**
   * Add an entry to the database.
   * On an unrecoverable error the problem is logged and the
   * ReplicationServer is shut down; no exception is propagated.
   *
   * @param draftCN      the provided draftCN.
   * @param value        the provided value to be stored associated
   *                     with this draftCN.
   * @param domainBaseDN the provided domainBaseDn to be stored associated
   *                     with this draftCN.
   * @param changeNumber the provided replication change number to be
   *                     stored associated with this draftCN.
   */
  public void addEntry(int draftCN, String value, String domainBaseDN,
      ChangeNumber changeNumber)
  {
    Transaction txn = null;
    try
    {
      int tries = 0;
      boolean done = false;

      // The database can return a Deadlock Exception if several threads are
      // accessing the database at the same time. This Exception is a
      // transient state, when it happens the transaction is aborted and
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();

          DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
          DatabaseEntry data = new DraftCNData(draftCN,
              value, domainBaseDN, changeNumber);
          db.put(txn, key, data);
          txn.commitWriteNoSync();
          txn = null;
          done = true;
        }
        catch (DeadlockException e)
        {
          if (txn != null)
          {
            txn.abort();
          }
          txn = null;
        }
        finally
        {
          dbCloseLock.readLock().unlock();
        }
      }

      if (!done)
      {
        // Could not write to the DB after DEADLOCK_RETRIES tries.
        // This ReplicationServer is not reliable and will be shutdown.
        abortQuietly(txn);
        shutdownOnDatabaseError(null);
      }
    }
    catch (DatabaseException e)
    {
      abortQuietly(txn);
      shutdownOnDatabaseError(e);
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      abortQuietly(txn);
      // Shut down exactly once (the previous code requested it twice).
      replicationServer.shutdown();
    }
  }

  /**
   * Shutdown the database.
   * Blocks until every cursor and pending write has released the close lock.
   */
  public void shutdown()
  {
    try
    {
      dbCloseLock.writeLock().lock();
      try
      {
        db.close();
      }
      finally
      {
        dbCloseLock.writeLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
    }
  }

  /**
   * Create a cursor that can be used to search or iterate on this DB.
   * The returned cursor holds the close lock until it is closed or aborted.
   *
   * @param draftCN The draftCN from which the cursor must start.
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the cursor creation failed.
   * @return The cursor.
   */
  public DraftCNDBCursor openReadCursor(int draftCN)
      throws DatabaseException, Exception
  {
    return new DraftCNDBCursor(draftCN);
  }

  /**
   * Create a cursor that can be used to delete some record from this
   * ReplicationServer database.
   * The returned cursor holds the close lock until it is closed or aborted.
   *
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the cursor creation failed.
   *
   * @return The cursor.
   */
  public DraftCNDBCursor openDeleteCursor()
      throws DatabaseException, Exception
  {
    return new DraftCNDBCursor();
  }

  /**
   * Closes the provided cursor (when not null) and, in every case, releases
   * the read side of the close lock held by the calling thread.
   */
  private void closeLockedCursor(Cursor cursor)
      throws DatabaseException
  {
    try
    {
      if (cursor != null)
      {
        cursor.close();
      }
    }
    finally
    {
      dbCloseLock.readLock().unlock();
    }
  }

  /**
   * Aborts the provided transaction, ignoring a null transaction and any
   * database error raised by the abort itself.
   */
  private void abortQuietly(Transaction txn)
  {
    if (txn == null)
    {
      return;
    }
    try
    {
      txn.abort();
    }
    catch (DatabaseException ignored)
    {
      // can't do much more. The ReplicationServer is shutting down.
    }
  }

  /**
   * Logs the "database error" message, followed by the stack trace of the
   * provided cause when there is one, then shuts the ReplicationServer down.
   */
  private void shutdownOnDatabaseError(Exception cause)
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
    if (cause != null)
    {
      mb.append(stackTraceToSingleLineString(cause));
    }
    logError(mb.toMessage());
    replicationServer.shutdown();
  }

  /**
   * Decodes the draftCN stored, as a UTF-8 decimal string, in a record key.
   */
  private static int decodeDraftCN(DatabaseEntry key)
      throws UnsupportedEncodingException
  {
    return Integer.parseInt(new String(key.getData(), "UTF-8"));
  }

  /**
   * Read the first draftCN from the database, 0 when none.
   * @return the first draftCN, or 0 when the database is empty or could not
   *         be read.
   */
  public int readFirstSeqnum()
  {
    try
    {
      Cursor cursor = null;
      dbCloseLock.readLock().lock();
      try
      {
        try
        {
          cursor = db.openCursor(null, null);
        }
        catch (DatabaseException e)
        {
          // Failing to open the cursor is reported as "no change" without
          // shutting the server down; the lock is released by the finally.
          return 0;
        }

        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry entry = new DatabaseEntry();
        if (cursor.getFirst(key, entry, LockMode.DEFAULT)
            != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return 0;
        }
        return decodeDraftCN(key);
      }
      finally
      {
        closeLockedCursor(cursor);
      }
    }
    catch (Exception e)
    {
      /* database is faulty */
      shutdownOnDatabaseError(e);
      return 0;
    }
  }

  /**
   * Return the record count.
   * @return the record count, or 0 when it could not be read.
   */
  public long count()
  {
    try
    {
      return db.count();
    }
    catch (Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }
    return 0L;
  }

  /**
   * Read the last draftCN from the database.
   * @return the last draftCN, or 0 when the database is empty or could not
   *         be read.
   */
  public int readLastDraftCN()
  {
    try
    {
      Cursor cursor = null;
      dbCloseLock.readLock().lock();
      try
      {
        cursor = db.openCursor(null, null);

        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry entry = new DatabaseEntry();
        if (cursor.getLast(key, entry, LockMode.DEFAULT)
            != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return 0;
        }
        return decodeDraftCN(key);
      }
      finally
      {
        closeLockedCursor(cursor);
      }
    }
    catch (Exception e)
    {
      // Always say why the server is being shut down.
      shutdownOnDatabaseError(e);
      return 0;
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    // baseDn may be null: this method is used while logging errors and must
    // never throw.
    return (baseDn == null) ? "DraftCNDB" : "DraftCNDB:" + baseDn.toString();
  }

  /**
   * This Class implements a cursor that can be used to browse the database.
   * A cursor holds the read side of the close lock from its creation until
   * {@link #close()} or {@link #abort()} is called, so it must always be
   * released by the thread that created it.
   */
  public class DraftCNDBCursor
  {
    private Cursor cursor = null;

    // The transaction that will protect the actions done with the cursor
    // Will be let null for a read cursor
    // Will be set non null for a write cursor
    private Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry entry = new DatabaseEntry();

    /**
     * Creates a cursor that can be used for browsing the db.
     *
     * @param startingDraftCN the draftCN from which the cursor must
     *        start; a negative value leaves the cursor unpositioned.
     * @throws Exception when the startingDraftCN does not exist.
     */
    private DraftCNDBCursor(int startingDraftCN)
        throws Exception
    {
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();

        cursor = db.openCursor(txn, null);
        if (startingDraftCN >= 0)
        {
          key = new ReplicationDraftCNKey(startingDraftCN);
          entry = new DatabaseEntry();

          if (cursor.getSearchKey(key, entry, LockMode.DEFAULT)
              != OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingDraftCN
            if (cursor.getSearchKeyRange(key, entry, LockMode.DEFAULT)
                != OperationStatus.SUCCESS)
            {
              // We could not even move the cursor close to it => failure
              throw new Exception("ChangeLog Draft Change Number "
                  + startingDraftCN + " is not available");
            }

            // We can move close to the startingDraftCN.
            // Let's create a cursor from that point.
            DatabaseEntry prevKey = new DatabaseEntry();
            DatabaseEntry prevData = new DatabaseEntry();
            if (cursor.getPrev(prevKey, prevData, LockMode.DEFAULT)
                != OperationStatus.SUCCESS)
            {
              // Nothing before: restart from a fresh, unpositioned cursor.
              closeLockedCursor(cursor);
              // Forget the closed handle so that a failure of the re-open
              // below does not close it a second time.
              cursor = null;
              dbCloseLock.readLock().lock();
              cursor = db.openCursor(txn, null);
            }
          }
          // else success : key has the right value
        }
      }
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        closeLockedCursor(cursor);
        throw (e);
      }
    }

    /**
     * Creates a cursor, protected by its own transaction, that can be used
     * to delete records.
     *
     * @throws DatabaseException when the transaction or the cursor cannot
     *         be created.
     */
    private DraftCNDBCursor() throws DatabaseException
    {
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();

        // Create the transaction that will protect whatever done with this
        // write cursor.
        txn = dbenv.beginTransaction();

        cursor = db.openCursor(txn, null);
      }
      catch (DatabaseException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        abortQuietly(txn);
        closeLockedCursor(cursor);
        throw (e);
      }
    }

    /**
     * Close the ReplicationServer Cursor, release the close lock and commit
     * the associated transaction when there is one.
     * Calling this method on an already closed or aborted cursor does
     * nothing.
     */
    public void close()
    {
      if (cursor == null)
      {
        // Already closed or aborted: the lock has been released and the
        // transaction completed, doing it again would be an error.
        return;
      }

      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (DatabaseException e)
      {
        shutdownOnDatabaseError(e);
      }

      if (txn != null)
      {
        try
        {
          txn.commit();
        }
        catch (DatabaseException e)
        {
          shutdownOnDatabaseError(e);
        }
      }
    }

    /**
     * Abort the Cursor after a Deadlock Exception.
     * This method catch and ignore the DeadlockException because
     * this must be done when aborting a cursor after a DeadlockException
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
    public void abort()
    {
      if (cursor == null)
      {
        return;
      }

      try
      {
        closeLockedCursor(cursor);
        cursor = null;
      }
      catch (DeadlockException e1)
      {
        // The DB documentation states that a DeadlockException
        // on the close method of a cursor that is aborting should
        // be ignored.
      }
      catch (DatabaseException e)
      {
        shutdownOnDatabaseError(e);
      }

      if (txn != null)
      {
        try
        {
          txn.abort();
        }
        catch (DatabaseException e)
        {
          shutdownOnDatabaseError(e);
        }
      }
    }

    /**
     * Reads and decodes the record at the current cursor position.
     *
     * @return the decoded record, null when the cursor has no current
     *         record.
     * @throws Exception when the record cannot be read or decoded.
     */
    private DraftCNData currentData() throws Exception
    {
      OperationStatus status =
          cursor.getCurrent(key, entry, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        return null;
      }
      return new DraftCNData(entry.getData());
    }

    /**
     * Getter for the value field of the current cursor.
     * @return The current value field, null when it cannot be read.
     * @throws DatabaseException When an error happens.
     */
    public String currentValue() throws DatabaseException
    {
      try
      {
        DraftCNData data = currentData();
        return (data == null) ? null : data.getValue();
      }
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return null;
    }

    /**
     * Getter for the serviceID field of the current cursor.
     * @return The current serviceID, null when it cannot be read.
     * @throws DatabaseException When an error happens.
     */
    public String currentServiceID() throws DatabaseException
    {
      try
      {
        DraftCNData data = currentData();
        return (data == null) ? null : data.getServiceID();
      }
      catch (Exception e)
      {
        // Traced like in the sibling getters instead of silently dropped.
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return null;
    }

    /**
     * Returns the replication changeNumber associated with the current key.
     * @return the replication changeNumber, null when it cannot be read.
     * @throws DatabaseException when a problem occurs.
     */
    public ChangeNumber currentChangeNumber() throws DatabaseException
    {
      try
      {
        DraftCNData data = currentData();
        return (data == null) ? null : data.getChangeNumber();
      }
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return null;
    }

    /**
     * Moves the cursor to the next record.
     * @return true when the cursor has been moved to a next record, false
     *         when there is no more record.
     * @throws DatabaseException In case of database problem.
     */
    public boolean next() throws DatabaseException
    {
      return cursor.getNext(key, entry, LockMode.DEFAULT)
          == OperationStatus.SUCCESS;
    }

    /**
     * Delete the record at the current cursor position.
     *
     * @throws DatabaseException In case of database problem.
     */
    public void delete() throws DatabaseException
    {
      cursor.delete();
    }
  }

  /**
   * Clears this change DB from the changes it contains.
   * Errors are logged, not propagated.
   *
   * @throws Exception Throws an exception it occurs.
   * @throws DatabaseException Throws a DatabaseException when it occurs.
   */
  public void clear()
      throws Exception, DatabaseException
  {
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
    try
    {
      String dbName = db.getDatabaseName();

      // Closing is requested by the Berkeley DB before truncate
      db.close();

      // Clears the changes
      dbenv.clearDb(dbName);

      // RE-create the db
      db = dbenv.getOrCreateDraftCNDb();
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
          e.getMessage() + " " +
          stackTraceToSingleLineString(e)));
      logError(mb.toMessage());
    }
    finally
    {
      // Relax the waiting users
      dbCloseLock.writeLock().unlock();
    }
  }
}
opends/src/server/org/opends/server/replication/server/DraftCNData.java
New file @@ -0,0 +1,173 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import java.io.UnsupportedEncodingException;

import org.opends.messages.Message;
import org.opends.server.replication.common.ChangeNumber;

import com.sleepycat.je.DatabaseEntry;

/**
 * SuperClass of DatabaseEntry used for data stored in the DraftCNDB.
 * <p>
 * A record is the UTF-8 encoding of
 * {@code value!serviceID!changeNumber}. Decoding splits on the first two
 * separators only, so the change number part may contain the separator
 * while the value must not.
 */
public class DraftCNData extends DatabaseEntry
{
  private static final String FIELD_SEPARATOR = "!";

  // Number of fields of an encoded record.
  private static final int FIELD_COUNT = 3;

  // Decoded fields; they stay null until decodeData() has been run.
  String value;
  String serviceID;
  ChangeNumber changeNumber;

  /**
   * Creates a record to be stored in the DraftCNDB.
   * Only the encoded form is built here; the fields are decoded from it on
   * the first call to a getter.
   *
   * @param draftCN The DraftCN key (not part of the encoded record).
   * @param value The value (cookie).
   * @param serviceID The serviceID (domain DN).
   * @param changeNumber The replication change number.
   *
   * @throws UnsupportedEncodingException When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public DraftCNData(int draftCN, String value, String serviceID,
      ChangeNumber changeNumber)
  throws UnsupportedEncodingException
  {
    String record = value
                   + FIELD_SEPARATOR + serviceID
                   + FIELD_SEPARATOR + changeNumber;
    // Let an encoding failure reach the caller, as declared, rather than
    // silently producing an empty record that would then be stored.
    this.setData(record.getBytes("UTF-8"));
  }

  /**
   * Creates a record to be stored in the DraftCNDB from the provided byte[].
   * @param data the provided byte[].
   * @throws Exception when the provided data cannot be decoded.
   */
  public DraftCNData(byte[] data) throws Exception
  {
    decodeData(data);
  }

  /**
   * Decode a record into fields.
   * @param data the provided byte array.
   * @throws Exception when a problem occurs, in particular when the record
   *         does not contain the three expected fields.
   */
  public void decodeData(byte[] data)
  throws Exception
  {
    String stringData;
    try
    {
      stringData = new String(data, "UTF-8");
    }
    catch (UnsupportedEncodingException e)
    {
      // should never happens
      // TODO: i18n
      throw new ReplicationDBException(Message.raw("need UTF-8 support"));
    }

    String[] str = stringData.split(FIELD_SEPARATOR, FIELD_COUNT);
    if (str.length < FIELD_COUNT)
    {
      // Report a malformed record explicitly rather than through an
      // ArrayIndexOutOfBoundsException.
      // TODO: i18n
      throw new ReplicationDBException(
          Message.raw("malformed DraftCNDB record"));
    }
    value = str[0];
    serviceID = str[1];
    changeNumber = new ChangeNumber(str[2]);
  }

  /**
   * Decodes the fields from the encoded data when not already done.
   * All the fields are set together by decodeData(), so testing one of them
   * is enough.
   */
  private void decodeIfNeeded() throws Exception
  {
    if (value == null)
    {
      this.decodeData(this.getData());
    }
  }

  /**
   * Getter for the value.
   * @return the value.
   * @throws Exception when a problem occurs.
   */
  public String getValue()
  throws Exception
  {
    decodeIfNeeded();
    return this.value;
  }

  /**
   * Getter for the service ID.
   * @return The serviceID.
   * @throws Exception when a problem occurs.
   */
  public String getServiceID()
  throws Exception
  {
    decodeIfNeeded();
    return this.serviceID;
  }

  /**
   * Getter for the replication change number.
   * @return the replication change number.
   * @throws Exception when a problem occurs.
   */
  public ChangeNumber getChangeNumber()
  throws Exception
  {
    decodeIfNeeded();
    return this.changeNumber;
  }

  /**
   * Provide a string representation of these data.
   * The fields are reported as they currently are: no decoding is triggered.
   * @return the string representation of these data.
   */
  @Override
  public String toString()
  {
    StringBuilder buffer = new StringBuilder();
    toString(buffer);
    return buffer.toString();
  }

  /**
   * Dump a string representation of these data into the provided buffer.
   * @param buffer the provided buffer.
   */
  public void toString(StringBuilder buffer)
  {
    buffer.append("DraftCNData : [value=").append(value);
    buffer.append("] [serviceID=").append(serviceID);
    buffer.append("] [changeNumber=").append(changeNumber).append("]");
  }
}
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
New file @@ -0,0 +1,548 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;

import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;

import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.server.DraftCNDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DeadlockException;

/**
 * This class is used for managing the DraftCN database of the
 * replicationServer, which associates to each draft change number a value,
 * a serviceID and a replication change number.
 * A dedicated thread periodically trims the records that are no longer
 * relevant.
 *
 * This class publish some monitoring information below cn=monitor.
 */
public class DraftCNDbHandler implements Runnable
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  // Key value reported when the database contains no record.
  static int NO_KEY = 0;

  private DraftCNDB db;
  private int firstkey = NO_KEY;
  private int lastkey = NO_KEY;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean trimDone = false;

  // A dedicated thread loops trim().
  // trim() : deletes from the DB a number of changes that are no longer
  // in the eligible part of their domain.
  private DirectoryThread thread = null;
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;

  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;

  /**
   * The trim age in milliseconds. Trimming is disabled when it is 0.
   */
  private long trimage;

  // Lock exposed through hasLock()/lock()/release().
  private ReentrantLock lock = new ReentrantLock();

  /**
   * Creates a new dbHandler associated to a given LDAP server.
   * The trimming thread is started and the monitor provider registered
   * before this constructor returns.
   *
   * @param replicationServer The ReplicationServer that creates this
   *                          dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @throws DatabaseException If a database problem happened
   */
  public DraftCNDbHandler(ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.trimage = replicationServer.getTrimage();

    // DB initialization
    db = new DraftCNDB(replicationServer, dbenv);
    firstkey = db.readFirstSeqnum();
    lastkey = db.readLastDraftCN();

    // Trimming thread
    thread = new DirectoryThread(this, "Replication DraftCN db ");
    thread.start();

    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(
        dbMonitor.getMonitorInstanceName());
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }

  /**
   * Add a record to the db managed by this db handler.
   *
   * @param key The key for this record in the db.
   * @param value The associated value.
   * @param serviceID The associated serviceID.
   * @param cn The associated replication change number.
   */
  public synchronized void add(int key, String value, String serviceID,
      ChangeNumber cn)
  {
    db.addEntry(key, value, serviceID, cn);

    if (debugEnabled())
      TRACER.debugInfo(
          "In DraftCNDbhandler.add, added: "
        + " key=" + key
        + " value=" + value
        + " serviceID=" + serviceID
        + " cn=" + cn);
  }

  /**
   * Get the first key, read from the database.
   * @return Returns the first key.
   */
  public int getFirstKey()
  {
    return db.readFirstSeqnum();
  }

  /**
   * Get the last key, read from the database.
   * @return Returns the last key.
   */
  public int getLastKey()
  {
    return db.readLastDraftCN();
  }

  /**
   * Get the number of changes.
   * @return Returns the number of changes.
   */
  public long count()
  {
    return db.count();
  }

  /**
   * Get a read cursor on the database from a provided key.
   * The cursor MUST be released after use.
   * @param key The provided key.
   * @return the new cursor, null when it could not be created.
   */
  public DraftCNDBCursor getReadCursor(int key)
  {
    try
    {
      return db.openReadCursor(key);
    }
    catch (Exception e)
    {
      return null;
    }
  }

  /**
   * Release a provided read cursor.
   * @param cursor The provided read cursor, may be null (for instance when
   *               getReadCursor() failed).
   */
  public void releaseReadCursor(DraftCNDBCursor cursor)
  {
    if (cursor == null)
    {
      return;
    }
    try
    {
      cursor.close();
    }
    catch (Exception ignored)
    {
      // Best effort: there is nothing more to do with a cursor that
      // cannot be closed.
    }
  }

  /**
   * Generate a new DraftCNDbIterator that allows to browse the db
   * managed by this dbHandler and starting at the position defined
   * by a given draft change number.
   *
   * @param startDraftCN The position where the iterator must start.
   *
   * @return a new DraftCNDbIterator that allows to browse the db
   *         managed by this dbHandler and starting at the position defined
   *         by a given draft change number.
   *
   * @throws DatabaseException if a database problem happened.
   * @throws Exception  If the iterator cannot be created from the
   *                    provided position.
   */
  public DraftCNDbIterator generateIterator(int startDraftCN)
      throws DatabaseException, Exception
  {
    return new DraftCNDbIterator(db, startDraftCN);
  }

  /**
   * Shutdown this dbHandler: stops the trimming thread, closes the database
   * and deregisters the monitor provider.
   */
  public void shutdown()
  {
    if (shutdown)
    {
      return;
    }

    shutdown = true;
    synchronized (this)
    {
      // Wake the trimming thread up so that it notices the shutdown.
      this.notifyAll();

      // The trimming thread cannot wait for its own termination: this
      // presumably happens when run() asks the replication server to shut
      // down after a trim failure and that request comes back here.
      if (Thread.currentThread() != thread)
      {
        while (!trimDone)
        {
          try
          {
            this.wait();
          }
          catch (Exception e)
          {
            // keep waiting for the trimming thread
          }
        }
      }
    }

    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(
        dbMonitor.getMonitorInstanceName());
  }

  /**
   * Run method for this class.
   * Trims the old records once per second until shutdown is requested or
   * trimming fails.
   */
  public void run()
  {
    while (!shutdown)
    {
      try
      {
        trim();

        synchronized (this)
        {
          try
          {
            this.wait(1000);
          }
          catch (InterruptedException e)
          {
            // woken up early: just go on with the next trim
          }
        }
      }
      catch (Exception end)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        if (replicationServer != null)
          replicationServer.shutdown();
        break;
      }
    }

    synchronized (this)
    {
      trimDone = true;
      this.notifyAll();
    }
  }

  /**
   * Trim old changes from this database.
   * A record is removed when its domain no longer exists or when its change
   * number is outside the eligible part of its domain.
   *
   * @throws DatabaseException In case of database problem.
   * @throws Exception In case of database problem.
   */
  public void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
      return;

    if (this.count() == 0)
      return;

    // NOTE(review): this counter bounds the number of records examined per
    // call to DEADLOCK_RETRIES; it does not count deadlock retries, and a
    // deadlock is only fatal when it hits the DEADLOCK_RETRIES-th record.
    // Behavior kept as is - confirm the intent.
    int tries = 0;
    DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      while (tries++ < DEADLOCK_RETRIES)
      {
        // let's traverse the DraftCNDb
        if (!cursor.next())
          break;

        // From the draftCNDb change record, get the domain and changeNumber
        String serviceID = cursor.currentServiceID();
        ChangeNumber cn = cursor.currentChangeNumber();

        ReplicationServerDomain domain =
            replicationServer.getReplicationServerDomain(serviceID, false);

        if (domain == null)
        {
          // the domain has been removed since the record was written in the
          // draftCNDb, thus it makes no sense to keep the record in the
          // draftCNDb.
          cursor.delete();
        }
        else
        {
          // let's get the eligible part of the domain
          ServerState startSS = domain.getStartState();
          ServerState endSS = domain.getEligibleState(
              replicationServer.getEligibleCN());
          ChangeNumber fcn = startSS.getMaxChangeNumber(cn.getServerId());
          ChangeNumber lcn = endSS.getMaxChangeNumber(cn.getServerId());

          // if the draftCNDb change record, is out of the eligible part
          // of the domain, then it can be removed.
          if (cn.older(fcn) || cn.newer(lcn))
          {
            cursor.delete();
          }
        }
      }
      cursor.close();
    }
    catch (DeadlockException e)
    {
      cursor.abort();
      if (tries == DEADLOCK_RETRIES)
      {
        // could not handle the Deadlock after DEADLOCK_RETRIES tries.
        // shutdown the ReplicationServer.
        shutdown = true;
        throw (e);
      }
    }
    catch (DatabaseException e)
    {
      // mark shutdown for this db so that we don't try again to
      // stop it from cursor.close() or methods called by cursor.close()
      shutdown = true;
      cursor.abort();
      throw (e);
    }
    catch (Exception e)
    {
      // Any other failure (for instance a record that cannot be decoded)
      // must also release the cursor: it holds a transaction and the
      // database close lock, which would otherwise block shutdown and clear.
      cursor.abort();
      throw (e);
    }
  }

  /**
   * This internal class is used to implement the Monitoring capabilities
   * of the dbHandler.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    private DbMonitorProvider()
    {
      super("ReplicationServer DraftCN Database");
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ArrayList<Attribute> getMonitorData()
    {
      ArrayList<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(Attributes.create("first-draft-changenumber",
          Integer.toString(db.readFirstSeqnum())));
      attributes.add(Attributes.create("last-draft-changenumber",
          Integer.toString(db.readLastDraftCN())));
      attributes.add(Attributes.create("count",
          Long.toString(count())));
      return attributes;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getMonitorInstanceName()
    {
      return "ReplicationServer DraftCN database ";
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long getUpdateInterval()
    {
      /* we don't want to do polling on this monitor */
      return 0;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException, InitializationException
    {
      // Nothing to do for now
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void updateMonitorData()
    {
      // As long as getUpdateInterval() returns 0, this will never get called
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    // firstkey and lastkey are the values read at creation or at the last
    // clear(), not the current content of the database.
    return("draftCNdb:" + " " + firstkey + " " + lastkey);
  }

  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
  }

  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * @throws DatabaseException When an exception occurs while removing the
   * changes from the DB.
   * @throws Exception When an exception occurs while accessing a resource
   * from the DB.
   */
  public void clear() throws DatabaseException, Exception
  {
    db.clear();
    firstkey = db.readFirstSeqnum();
    lastkey = db.readLastDraftCN();
  }

  /**
   * Tests if the current thread has the lock on this object.
   * @return True if the current thread has the lock.
   */
  public boolean hasLock()
  {
    return (lock.getHoldCount() > 0);
  }

  /**
   * Takes the lock on this object (blocking until lock can be acquired).
   * @throws java.lang.InterruptedException If interrupted.
   */
  public void lock() throws InterruptedException
  {
    lock.lockInterruptibly();
  }

  /**
   * Releases the lock on this object.
   */
  public void release()
  {
    lock.unlock();
  }

  /**
   * Get the value associated to a provided key.
   * @param key the provided key.
   * @return the associated value, null when none.
   */
  public String getValue(int key)
  {
    String value = null;
    DraftCNDBCursor draftCNDBCursor = null;
    try
    {
      draftCNDBCursor = db.openReadCursor(key);
      value = draftCNDBCursor.currentValue();
    }
    catch (Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo("In DraftCNDbHandler.getValue, read: " +
          " key=" + key + " value returned is null" +
          " first=" + db.readFirstSeqnum() +
          " last=" + db.readLastDraftCN() +
          " count=" + db.count() +
          " exception" + e + " " + e.getMessage());
      return null;
    }
    finally
    {
      if (draftCNDBCursor != null)
        draftCNDBCursor.close();
    }
    return value;
  }
}
opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
New file @@ -0,0 +1,178 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.opends.server.loggers.debug.DebugLogger.getTracer;

import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.server.DraftCNDB.*;
import org.opends.server.types.DebugLogLevel;

import com.sleepycat.je.DatabaseException;

/**
 * This class allows to iterate through the records of the draft change
 * number database (DraftCNDB), starting from a given draft change number.
 * It wraps a read cursor that must be released with
 * {@link #releaseCursor()}.
 */
public class DraftCNDbIterator
{
  private static final DebugTracer TRACER = getTracer();

  // Set to null once released; see releaseCursor().
  private DraftCNDBCursor draftCNDbCursor = null;

  /**
   * Creates a new iterator on the draft change number database.
   * All created iterator must be released by the caller using the
   * releaseCursor() method.
   *
   * @param db The db where the iterator must be created.
   * @param startDraftCN The draft CN used to open the read cursor.
   * @throws Exception If the db does not return a cursor for the
   *                   provided draft CN.
   * @throws DatabaseException If a database problem happened.
   */
  public DraftCNDbIterator(DraftCNDB db, int startDraftCN)
  throws Exception, DatabaseException
  {
    draftCNDbCursor = db.openReadCursor(startDraftCN);
    if (draftCNDbCursor == null)
    {
      throw new Exception("no new change");
    }
  }

  /**
   * Getter for the value field (external changelog cookie).
   * @return The value field (external changelog cookie), null when it
   *         could not be read (the failure is traced).
   */
  public String getValue()
  {
    try
    {
      return this.draftCNDbCursor.currentValue();
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return null;
    }
  }

  /**
   * Getter for the serviceID field.
   * @return The service ID, null when it could not be read (the failure
   *         is traced).
   */
  public String getServiceID()
  {
    try
    {
      return this.draftCNDbCursor.currentServiceID();
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return null;
    }
  }

  /**
   * Getter for the replication change number field.
   * @return The replication change number field, null when it could not be
   *         read (the failure is traced).
   */
  public ChangeNumber getChangeNumber()
  {
    try
    {
      return this.draftCNDbCursor.currentChangeNumber();
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return null;
    }
  }

  /**
   * Getter for the draftCN field.
   * Unlike the other getters this one is not guarded: it must not be called
   * once the cursor has been released.
   * @return The draft CN field.
   */
  public int getDraftCN()
  {
    return ((ReplicationDraftCNKey) this.draftCNDbCursor.key).getDraftCN();
  }

  /**
   * Skip to the next record of the database.
   * @return true if has next, false elsewhere
   * @throws Exception When exception raised.
   * @throws DatabaseException When database exception raised.
   */
  public boolean next()
  throws Exception, DatabaseException
  {
    return draftCNDbCursor.next();
  }

  /**
   * Release the resources and locks used by this Iterator.
   * This method must be called when the iterator is no longer used.
   * Failure to do it could cause DB deadlock.
   * Calling it more than once is harmless: the cursor is closed only once.
   */
  public void releaseCursor()
  {
    synchronized (this)
    {
      if (draftCNDbCursor != null)
      {
        draftCNDbCursor.close();
        draftCNDbCursor = null;
      }
    }
  }

  /**
   * Called by the Gc when the object is garbage collected
   * Release the cursor in case the iterator was badly used and releaseCursor
   * was never called.
   */
  @Override
  protected void finalize()
  {
    releaseCursor();
  }
}
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,12 +27,14 @@ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -41,7 +43,6 @@ import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.api.DirectoryThread; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; @@ -52,7 +53,7 @@ import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; import org.opends.server.util.TimeThread; import org.opends.server.util.ServerConstants; /** * This class defines a server handler, which handles all interaction with a @@ -61,128 +62,234 @@ public class ECLServerHandler extends ServerHandler { // Properties filled only if remote server is a RS private String serverAddressURL; // This is a string identifying the operation, provided by the client part // of the ECL, used to help interpretation of messages logged. String operationId; // Iterator on the draftCN database. private DraftCNDbIterator draftCNDbIter = null; boolean draftCompat = false; /** * CLDomainContext : contains the state properties for the search * currently being processed, by replication domain. * Specifies the last draft changer number (seqnum) requested. */ private class CLDomainContext public int lastDraftCN = 0; /** * Specifies whether the draft change number (seqnum) db has been read until * its end. 
*/ public boolean isEndOfDraftCNReached = false; /** * Specifies whether the current search has been requested to be persistent * or not. */ public short isPersistent; /** * Specifies the current search phase : INIT or PERSISTENT. */ public int searchPhase = INIT_PHASE; /** * Specifies the cookie contained in the request, specifying where * to start serving the ECL. */ public String startCookie; /** * Specifies the value of the cookie before the change currently processed * is returned. It is updated with the change number of the change * currently processed (thus becoming the "current" cookie just * before the change is returned. */ public MultiDomainServerState previousCookie = new MultiDomainServerState(); /** * Specifies the excluded DNs (like cn=admin, ...). */ public ArrayList<String> excludedServiceIDs = new ArrayList<String>(); /** * Eligible changeNumber - only changes older or equal to eligibleCN * are published in the ECL. */ public ChangeNumber eligibleCN = null; /** * Provides a string representation of this object. * @return the string representation. */ public String dumpState() { ReplicationServerDomain rsd; // the repl server domain boolean active; // is the domain still active MessageHandler mh; // the message handler associated UpdateMsg nextMsg; UpdateMsg nonElligiblemsg; return new String( this.getClass().getCanonicalName() + "[" + "[draftCompat=" + draftCompat + "] [persistent=" + isPersistent + "] [lastDraftCN=" + lastDraftCN + "] [isEndOfDraftCNReached=" + isEndOfDraftCNReached + "] [searchPhase=" + searchPhase + "] [startCookie=" + startCookie + "] [previousCookie=" + previousCookie + "]]"); } /** * Class that manages the 'by domain' state variables for the search being * currently processed on the ECL. * For example : * if search on 'cn=changelog' is being processed when 2 replicated domains * dc=us and dc=europe are configured, then there will be 2 DomainContext * used, one for ds=us, and one for dc=europe. 
*/ private class DomainContext { ReplicationServerDomain rsd; boolean active; // active when there are still changes // supposed eligible for the ECL. MessageHandler mh; // the message handler from which are read // the changes for this domain private UpdateMsg nextMsg; private UpdateMsg nextNonEligibleMsg; ServerState startState; ServerState currentState; ServerState stopState; /** * Add to the provider buffer a string representation of this object. * {@inheritDoc} */ public void toString(StringBuilder buffer, int i) @Override public String toString() { CLDomainContext xx = clDomCtxts[i]; StringBuilder buffer = new StringBuilder(); toString(buffer); return buffer.toString(); } /** * Provide a string representation of this object for debug purpose.. */ public void toString(StringBuilder buffer) { buffer.append( " clDomCtxts(" + i + ") [act=" + xx.active + " rsd=" + rsd + " nextMsg=" + nextMsg + "(" + "[ [active=" + active + "] [rsd=" + rsd + "] [nextMsg=" + nextMsg + "(" + (nextMsg != null? new Date(nextMsg.getChangeNumber().getTime()).toString():"") + ")" + " nextNonEligibleMsg=" + nonElligiblemsg + " startState=" + startState + " stopState= " + stopState + " currState= " + currentState + "]"); "] [nextNonEligibleMsg=" + nextNonEligibleMsg + "] [startState=" + startState + "] [stopState= " + stopState + "] [currentState= " + currentState + "]]"); } /** * Get the next message elligible regarding * the crossDomain elligible CN. Put it in the context table. * @param opid The operation id. 
*/ private void getNextEligibleMessageForDomain(String opid) { if (debugEnabled()) TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + " getNextEligibleMessageForDomain(" + opid+ ") " + "ctxt=" + toString()); assert(nextMsg == null); try { // Before get a new message from the domain, evaluate in priority // a message that has not been published to the ECL because it was // not eligible if (nextNonEligibleMsg != null) { boolean hasBecomeEligible = (nextNonEligibleMsg.getChangeNumber().getTime() <= eligibleCN.getTime()); if (debugEnabled()) TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + " getNextEligibleMessageForDomain(" + opid+ ") " + " stored nonEligibleMsg " + nextNonEligibleMsg + " has now become eligible regarding " + " the eligibleCN ("+ eligibleCN + " ):" + hasBecomeEligible); if (hasBecomeEligible) { // it is now elligible nextMsg = nextNonEligibleMsg; nextNonEligibleMsg = null; } else { // the oldest is still not elligible - let's wait next } } else { // Here comes a new message !!! 
// non blocking UpdateMsg newMsg = mh.getnextMessage(false); if (debugEnabled()) TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + " getNextEligibleMessageForDomain(" + opid+ ") " + " got new message : " + " serviceId=[" + mh.getServiceId() + "] [newMsg=" + newMsg + "]" + dumpState()); // in non blocking mode, return null when no more msg if (newMsg != null) { boolean isEligible = (newMsg.getChangeNumber().getTime() <= eligibleCN.getTime()); if (debugEnabled()) TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() + " getNextEligibleMessageForDomain(" + opid+ ") " + "newMsg isEligible=" + isEligible + " since " + "newMsg=[" + newMsg.getChangeNumber() + " " + new Date(newMsg.getChangeNumber().getTime()).toString() + "] eligibleCN=[" + eligibleCN + " " + new Date(eligibleCN.getTime()).toString()+"]" + dumpState()); if (isEligible) { nextMsg = newMsg; } else { nextNonEligibleMsg = newMsg; } } } } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } // The list of contexts by domain for the current search CLDomainContext[] clDomCtxts = new CLDomainContext[0]; // The global list of contexts by domain for the search currently processed. DomainContext[] domainCtxts = new DomainContext[0]; private void clDomCtxtsToString(String msg) private String clDomCtxtsToString(String msg) { StringBuilder buffer = new StringBuilder(); buffer.append(msg+"\n"); for (int i=0;i<clDomCtxts.length;i++) for (int i=0;i<domainCtxts.length;i++) { clDomCtxts[i].toString(buffer, i); domainCtxts[i].toString(buffer); buffer.append("\n"); } TRACER.debugInfo( "In " + this.getName() + " clDomCtxts: " + buffer.toString()); return buffer.toString(); } /** * Class that manages the state variables for the current search on the ECL. */ private class CLTraverseCtxt { /** * Specifies the next changer number (seqnum), -1 when not. */ public int nextSeqnum; /** * Specifies whether the current search has been requested to be persistent * or not. 
*/ public short isPersistent; /** * Specifies the last changer number (seqnum) requested. */ public int stopSeqnum; /** * Specifies whether the change number (seqnum) db has been read until * its end. */ public boolean endOfSeqnumdbReached = false; /** * Specifies the current search phase. * 1 = init * 2 = persistent */ public int searchPhase = 1; /** * Specifies the cookie contained in the request, specifying where * to start serving the ECL. */ public String generalizedStartState; /** * Specifies the current cookie value. */ public MultiDomainServerState currentCookie = new MultiDomainServerState(); /** * Specifies the excluded DNs. */ public ArrayList<String> excludedServiceIDs = new ArrayList<String>(); /** * Provides a string representation of this object. * @return the string representation. */ public String toString() { return new String( this.getClass().getCanonicalName() + ":[" + " nextSeqnum=" + nextSeqnum + " persistent=" + isPersistent + " stopSeqnum" + stopSeqnum + " endOfSeqnumdbReached=" + endOfSeqnumdbReached + " searchPhase=" + searchPhase + " generalizedStartState=" + generalizedStartState + "]"); } } // The context of the current search private CLTraverseCtxt cLSearchCtxt = new CLTraverseCtxt(); static int UNDEFINED_PHASE = 0; static int INIT_PHASE = 1; static int PERSISTENT_PHASE = 2; /** * Starts this handler based on a start message received from remote server. 
@@ -199,10 +306,6 @@ inECLStartMsg.getVersion()); generationId = inECLStartMsg.getGenerationId(); serverURL = inECLStartMsg.getServerURL(); int separator = serverURL.lastIndexOf(':'); serverAddressURL = session.getRemoteAddress() + ":" + serverURL.substring(separator + 1); setInitialServerState(inECLStartMsg.getServerState()); setSendWindowSize(inECLStartMsg.getWindowSize()); if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) @@ -211,8 +314,6 @@ // Only V2 protocol has the group id in repl server start message this.groupId = inECLStartMsg.getGroupId(); } // FIXME:ECL Any generationID must be removed, it makes no sense here. oldGenerationId = -100; } catch(Exception e) { @@ -243,7 +344,7 @@ replicationServer, rcvWindowSize); try { setServiceIdAndDomain("cn=changelog"); setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); } catch(DirectoryException de) { @@ -268,12 +369,12 @@ StartECLSessionMsg startECLSessionMsg) throws DirectoryException { // FIXME:ECL queueSize is hard coded to 1 else Handler hangs for some reason // queueSize is hard coded to 1 else super class hangs for some reason super(null, 1, replicationServerURL, replicationServerId, replicationServer, 0); try { setServiceIdAndDomain("cn=changelog"); setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); } catch(DirectoryException de) { @@ -369,115 +470,153 @@ /** * Initialize the handler from a provided cookie value. * @param providedGeneralizedStartState The provided cookie value. * @param crossDomainStartState The provided cookie value. * @throws DirectoryException When an error is raised. 
*/ public void initializeCLSearchFromGenState( String providedGeneralizedStartState) public void initializeCLSearchFromGenState(String crossDomainStartState) throws DirectoryException { this.cLSearchCtxt.nextSeqnum = -1; // will not generate seqnum initializeCLDomCtxts(providedGeneralizedStartState); initializeCLDomCtxts(crossDomainStartState); } /** * Initialize the handler from a provided draft first change number. * @param startDraftCN The provided draft first change number. * @throws DirectoryException When an error is raised. */ public void initializeCLSearchFromDraftCN(int startDraftCN) throws DirectoryException { String crossDomainStartState; draftCompat = true; DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler(); if (startDraftCN < 0) { // Request filter does not contain any firstDraftCN // So we'll generate from the beginning of what we have stored here. // Get the first DraftCN from DraftCNdb if (draftCNDb.count() == 0) { // db is empty isEndOfDraftCNReached = true; crossDomainStartState = null; } else { // get the generalizedServerState related to the start of the draftDb crossDomainStartState = draftCNDb.getValue(draftCNDb.getFirstKey()); // Get an iterator to traverse the draftCNDb try { draftCNDbIter = draftCNDb.generateIterator(draftCNDb.getFirstKey()); } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); if (draftCNDbIter != null) draftCNDbIter.releaseCursor(); throw new DirectoryException( ResultCode.OPERATIONS_ERROR, Message.raw(Category.SYNC, Severity.FATAL_ERROR,"Server Error.")); } } } else { // Request filter does contain a startDraftCN // Read the draftCNDb to see whether it contains startDraftCN crossDomainStartState = draftCNDb.getValue(startDraftCN); if (crossDomainStartState != null) { // startDraftCN is present in the draftCnDb // Get an iterator to traverse the draftCNDb try { draftCNDbIter = draftCNDb.generateIterator(draftCNDb.getFirstKey()); } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, 
e); if (draftCNDbIter != null) draftCNDbIter.releaseCursor(); throw new DirectoryException( ResultCode.OPERATIONS_ERROR, Message.raw(Category.SYNC, Severity.FATAL_ERROR,"Server Error.")); } } else { // startDraftCN provided in the request is not present in the draftCnDb // Is the provided startDraftCN <= the potential last DraftCNdb // Get the draftLimits (from the eligibleCN got at the beginning of // the operation. int[] limits = getECLDraftCNLimits(eligibleCN); if (startDraftCN<=limits[1]) { // startDraftCN is between first and last and has never been // returned yet crossDomainStartState = draftCNDb.getValue(draftCNDb.getLastKey()); // FIXME:ECL ... ok we'll start from the end of the draftCNDb BUT ... // this is NOT the request of the client !!!! } else { throw new DirectoryException( ResultCode.SUCCESS, Message.raw(Category.SYNC, Severity.INFORMATION,"Bad value provided for change number " + " Failed to match a replication state to "+startDraftCN)); } } } this.draftCompat = true; initializeCLDomCtxts(crossDomainStartState); } /** * Initialize the context for each domain. * @param providedGeneralizedStartState the provided generalized state * @param providedCookie the provided generalized state * @throws DirectoryException When an error occurs. 
*/ public void initializeCLDomCtxts(String providedGeneralizedStartState) public void initializeCLDomCtxts(String providedCookie) throws DirectoryException { HashMap<String,ServerState> startStates = new HashMap<String,ServerState>(); ReplicationServer rs = replicationServerDomain.getReplicationServer(); try { // Initialize start state for all running domains with empty state Iterator<ReplicationServerDomain> rsdk = rs.getCacheIterator(); if (rsdk != null) { while (rsdk.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdk.next(); // skip the changelog domain if (rsd == this.replicationServerDomain) continue; startStates.put(rsd.getBaseDn(), new ServerState()); } } // Overwrite start state from the cookie provided in the request if ((providedGeneralizedStartState != null) && (providedGeneralizedStartState.length()>0)) { String[] domains = providedGeneralizedStartState.split(";"); for (String domainState : domains) { // Split baseDN and serverState String[] fields = domainState.split(":"); // BaseDN - Check it String domainBaseDNReceived = fields[0]; if (!startStates.containsKey(domainBaseDNReceived)) throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( "unknown " + domainBaseDNReceived)); // ServerState ServerState domainServerState = new ServerState(); if (fields.length>1) { String strState = fields[1]; String[] strCN = strState.split(" "); for (String sr : strCN) { ChangeNumber fromChangeNumber = new ChangeNumber(sr); domainServerState.update(fromChangeNumber); } } startStates.put(domainBaseDNReceived, domainServerState); // FIXME: ECL first cookie value check // ECL For each of the provided state, it this state is older // than the older change stored in the replication changelog .... 
// then a purge occured since the time the cookie was published // it is recommended to do a full resync ReplicationServerDomain rsd = rs.getReplicationServerDomain(domainBaseDNReceived, false); ServerState domainStartState = rsd.getStartState(); if (!domainServerState.cover(domainStartState)) { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( "too old cookie provided " + providedGeneralizedStartState + " first acceptable change for " + rsd.getBaseDn() + " is " + rsd.getStartState())); } } } } catch(DirectoryException de) { throw de; } catch(Exception e) { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( "Exception raised: " + e.getMessage())); } // Parse the provided cookie and overwrite startState from it. if ((providedCookie != null) && (providedCookie.length()!=0)) startStates = MultiDomainServerState.splitGenStateToServerStates(providedCookie); try { // Now traverse all domains and build the initial changelog context Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator(); // Now traverse all domains and build all the initial contexts : // - the global one : dumpState() // - the domain by domain ones : domainCtxts Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator(); // Creates the table that will contain the real-time info by domain. 
clDomCtxts = new CLDomainContext[rs.getCacheSize()-1 -this.cLSearchCtxt.excludedServiceIDs.size()]; HashSet<DomainContext> tmpSet = new HashSet<DomainContext>(); int i =0; if (rsdi != null) { @@ -491,73 +630,87 @@ continue; // skip the excluded domains boolean excluded = false; for(String excludedServiceID : this.cLSearchCtxt.excludedServiceIDs) { if (excludedServiceID.equalsIgnoreCase(rsd.getBaseDn())) { excluded=true; break; } } if (excluded) if (isServiceIDExcluded(rsd.getBaseDn())) continue; // Creates the context record CLDomainContext newContext = new CLDomainContext(); newContext.active = true; newContext.rsd = rsd; // Creates the new domain context DomainContext newDomainCtxt = new DomainContext(); newDomainCtxt.active = true; newDomainCtxt.rsd = rsd; if (this.cLSearchCtxt.isPersistent == // Assign the start state for the domain if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) { newContext.startState = rsd.getCLElligibleState(); newDomainCtxt.startState = rsd.getEligibleState(eligibleCN); } else { newContext.startState = startStates.get(rsd.getBaseDn()); newContext.stopState = rsd.getCLElligibleState(); newDomainCtxt.startState = startStates.remove(rsd.getBaseDn()); if ((providedCookie==null)||(providedCookie.isEmpty())) newDomainCtxt.startState = new ServerState(); else if (newDomainCtxt.startState == null) throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( "missing " + rsd.getBaseDn())); newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN); } newContext.currentState = new ServerState(); newDomainCtxt.currentState = new ServerState(); // Creates an unconnected SH // Creates an unconnected SH for the domain MessageHandler mh = new MessageHandler(maxQueueSize, replicationServerURL, replicationServerId, replicationServer); // set initial state mh.setInitialServerState(newContext.startState); mh.setInitialServerState(newDomainCtxt.startState); // set serviceID and domain 
mh.setServiceIdAndDomain(rsd.getBaseDn()); // register into domain // register the unconnected into the domain rsd.registerHandler(mh); newContext.mh = mh; newDomainCtxt.mh = mh; previousCookie.update( newDomainCtxt.rsd.getBaseDn(), newDomainCtxt.startState); // store the new context clDomCtxts[i] = newContext; tmpSet.add(newDomainCtxt); i++; } } // the next record from the seqnumdb should be the one cLSearchCtxt.endOfSeqnumdbReached = false; cLSearchCtxt.generalizedStartState = providedGeneralizedStartState; // Initializes all domain with the next elligible message for (int j=0; j<clDomCtxts.length; j++) if (!startStates.isEmpty()) { this.getNextElligibleMessage(j); if (clDomCtxts[j].nextMsg == null) clDomCtxts[j].active = false; throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get( "unknown " + startStates.toString())); } domainCtxts = tmpSet.toArray(new DomainContext[0]); // the next record from the DraftCNdb should be the one startCookie = providedCookie; // Initializes all domain with the next(first) elligible message for (int j=0; j<domainCtxts.length; j++) { domainCtxts[j].getNextEligibleMessageForDomain(operationId); if (domainCtxts[j].nextMsg == null) domainCtxts[j].active = false; } } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); // FIXME:ECL do not publish internal exception plumb to the client throw new DirectoryException( ResultCode.OPERATIONS_ERROR, Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " + e.getLocalizedMessage()), e); e), e); } if (debugEnabled()) TRACER.debugInfo( " initializeCLDomCtxts ends with " + " " + dumpState()); } /** @@ -569,21 +722,22 @@ } /** * Shutdown this handler ServerHandler. * Shutdown this handler. 
*/ public void shutdown() { for (int i=0;i<clDomCtxts.length;i++) for (int i=0;i<domainCtxts.length;i++) { if (!clDomCtxts[i].rsd.unRegisterHandler(clDomCtxts[i].mh)) if (!domainCtxts[i].rsd.unRegisterHandler(domainCtxts[i].mh)) { TRACER.debugInfo(this +" shutdown() Internal error " + " when unregistering "+ clDomCtxts[i].mh); logError(Message.raw(Category.SYNC, Severity.NOTICE, this +" shutdown() - error when unregistering handler " + domainCtxts[i].mh)); } clDomCtxts[i].rsd.stopServer(clDomCtxts[i].mh); domainCtxts[i].rsd.stopServer(domainCtxts[i].mh); } super.shutdown(); clDomCtxts = null; domainCtxts = null; } /** @@ -629,7 +783,7 @@ attributes.add(Attributes.create("External-Changelog-Server", serverURL)); // FIXME:ECL No monitoring exist for ECL. // TODO:ECL No monitoring exist for ECL. return attributes; } /** @@ -640,8 +794,9 @@ { String localString; localString = "External changelog Server "; if (this.cLSearchCtxt==null) localString += serverId + " " + serverURL + " " + getServiceId(); if (this.serverId != 0) localString += serverId + " " + serverURL + " " + getServiceId() + " " + this.getOperationId(); else localString += this.getName(); return localString; @@ -652,18 +807,9 @@ */ public ServerStatus getStatus() { // FIXME:ECL Sould ECLServerHandler manage a ServerStatus ? return ServerStatus.INVALID_STATUS; } /** * Retrieves the Address URL for this server handler. * * @return The Address URL for this server handler, * in the form of an IP address and port separated by a colon. */ public String getServerAddressURL() { return serverAddressURL; // There is no other status possible for the ECL Server Handler to // be normally connected. return ServerStatus.NORMAL_STATUS; } /** * {@inheritDoc} @@ -684,29 +830,34 @@ { // this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ? this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ? 
this.setConsumerActive(true); this.cLSearchCtxt.searchPhase = 1; //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ? //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ? //this.setConsumerActive(true); this.operationId = startECLSessionMsg.getOperationId(); this.setName(this.getClass().getCanonicalName()+ " " + operationId); if (eligibleCNComputerThread==null) eligibleCNComputerThread = new ECLEligibleCNComputerThread(); cLSearchCtxt.isPersistent = startECLSessionMsg.isPersistent(); cLSearchCtxt.stopSeqnum = startECLSessionMsg.getLastDraftChangeNumber(); cLSearchCtxt.searchPhase = 1; cLSearchCtxt.currentCookie = new MultiDomainServerState( isPersistent = startECLSessionMsg.isPersistent(); lastDraftCN = startECLSessionMsg.getLastDraftChangeNumber(); searchPhase = INIT_PHASE; previousCookie = new MultiDomainServerState( startECLSessionMsg.getCrossDomainServerState()); cLSearchCtxt.excludedServiceIDs=startECLSessionMsg.getExcludedServiceIDs(); excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs(); replicationServer.disableEligibility(excludedServiceIDs); eligibleCN = replicationServer.getEligibleCN(); //-- if (startECLSessionMsg.getECLRequestType()==0) if (startECLSessionMsg.getECLRequestType()== StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE) { initializeCLSearchFromGenState( startECLSessionMsg.getCrossDomainServerState()); } else if (startECLSessionMsg.getECLRequestType()== StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER) { initializeCLSearchFromDraftCN( startECLSessionMsg.getFirstDraftChangeNumber()); } if (session != null) { @@ -735,21 +886,15 @@ // Resume the writer ((ECLServerWriter)writer).resumeWriter(); // FIXME:ECL Potential race condition if writer not yet resumed here // TODO:ECL Potential race condition if writer not yet resumed here } if (cLSearchCtxt.isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) { closePhase1(); 
closeInitPhase(); } /* TODO: Good Draft Compat //-- if (startCLMsg.getStartMode()==1) { initializeCLSearchFromProvidedSeqnum(startCLMsg.getSequenceNumber()); } /* TODO: From replication changenumber //-- if (startCLMsg.getStartMode()==2) { @@ -762,14 +907,14 @@ { // to get the CL first and last initializeCLDomCtxts(null); // from start ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN(); ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN(); try { // to get the CL first and last // last rely on the crossDomainElligibleCN thhus must have been // last rely on the crossDomainEligibleCN thhus must have been // computed before int[] limits = computeCLLimits(crossDomainElligibleCN); int[] limits = computeCLLimits(crossDomainEligibleCN); // Send the response CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]); session.publish(msg); @@ -793,11 +938,17 @@ } return; } Good Draft Compat */ */ // Store into domain registerIntoDomain(); if (debugEnabled()) TRACER.debugInfo( this.getName() + " initialized: " + " " + dumpState() + " " + " " + clDomCtxtsToString("")); } /** @@ -812,30 +963,12 @@ throws DirectoryException { boolean interrupted = true; ECLUpdateMsg msg = getnextUpdate(); ECLUpdateMsg msg = getNextECLUpdate(); // FIXME:ECL We should refactor so that a SH always have a session // TODO:ECL We should refactor so that a SH always have a session if (session == null) return msg; /* * When we remove a message from the queue we need to check if another * server is waiting in flow control because this queue was too long. * This check might cause a performance penalty an therefore it * is not done for every message removed but only every few messages. */ /** FIXME:ECL checkAllSaturation makes no sense for ECLServerHandler ? 
if (++saturationCount > 10) { saturationCount = 0; try { replicationServerDomain.checkAllSaturation(); } catch (IOException e) { } } */ boolean acquired = false; do { @@ -857,16 +990,17 @@ /** * Get the next message - non blocking - null when none. * * @param synchronous - not used - always non blocking. * @return the next message - null when none. * This method is currently not used but we don't want to keep the mother * class method since it make no sense for ECLServerHandler. * @param synchronous - not used * @return the next message */ protected UpdateMsg getnextMessage(boolean synchronous) { UpdateMsg msg = null; try { ECLUpdateMsg eclMsg = getnextUpdate(); ECLUpdateMsg eclMsg = getNextECLUpdate(); if (eclMsg!=null) msg = eclMsg.getUpdateMsg(); } @@ -878,60 +1012,24 @@ } /** * Get the next external changelog update. * * @return The ECL update, null when none. * @exception DirectoryException when any problem occurs. * Returns the next update message for the External Changelog (ECL). * @return the ECL update message, null when there aren't anymore. * @throws DirectoryException when an error occurs. */ protected ECLUpdateMsg getnextUpdate() public ECLUpdateMsg getNextECLUpdate() throws DirectoryException { return getGeneralizedNextECLUpdate(this.cLSearchCtxt); } ECLUpdateMsg oldestChange = null; /** * Computes the cross domain eligible message (non blocking). * Return null when search is covered */ private ECLUpdateMsg getGeneralizedNextECLUpdate(CLTraverseCtxt cLSearchCtxt) throws DirectoryException { ECLUpdateMsg theOldestChange = null; try { if (debugEnabled()) TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + "," + this + " getGeneralizedNextECLUpdate starts with ctxt=" + cLSearchCtxt); " getNextECLUpdate starts: " + dumpState()); if ((cLSearchCtxt.nextSeqnum != -1) && (!cLSearchCtxt.endOfSeqnumdbReached)) { /* TODO:ECL G Good changelog draft compat. 
// First time , initialise the cursor to traverse the seqnumdb if (seqnumDbReadIterator == null) { try { seqnumDbReadIterator = replicationServerDomain.getReplicationServer(). getSeqnumDbHandler().generateIterator(cLSearchCtxt.nextSeqnum); TRACER.debugInfo("getGeneralizedNextMessage(): " + " creates seqnumDbReadIterator from nextSeqnum=" + cLSearchCtxt.nextSeqnum + " 1rst=" + seqnumDbReadIterator.getSeqnum() + " CN=" + seqnumDbReadIterator.getChangeNumber() + cLSearchCtxt); } catch(Exception e) { cLSearchCtxt.endOfSeqnumdbReached = true; } } */ } try { // Search / no seqnum / not persistent // Search / no DraftCN / not persistent // ----------------------------------- // init: all domain are candidate // get one msg from each @@ -958,136 +1056,136 @@ // if one domain has no msg, still is candidate int iDom = 0; boolean nextclchange = true; while ((nextclchange) && (cLSearchCtxt.searchPhase==1)) boolean continueLooping = true; while ((continueLooping) && (searchPhase == INIT_PHASE)) { // Step 1 & 2 if (cLSearchCtxt.searchPhase==1) if (searchPhase == INIT_PHASE) { if (debugEnabled()) clDomCtxtsToString("In getGeneralizedNextMessage : " + "looking for the generalized oldest change"); // Normally we whould not loop .. except ... 
continueLooping = false; // Retrieves the index in the subx table of the // generalizedOldestChange iDom = getGeneralizedOldestChange(); iDom = getOldestChangeFromDomainCtxts(); // idomain != -1 means that we got one // generalized oldest change to process if (iDom==-1) // iDom == -1 means that there is no oldest change to process if (iDom == -1) { closePhase1(); closeInitPhase(); // signify end of phase 1 to the caller // signals end of phase 1 to the caller return null; } // idomain != -1 means that we got one // generalized oldest change to process String suffix = this.clDomCtxts[iDom].rsd.getBaseDn(); theOldestChange = new ECLUpdateMsg( (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg, null, // set later suffix); // Build the ECLUpdateMsg to be returned oldestChange = new ECLUpdateMsg( (LDAPUpdateMsg)domainCtxts[iDom].nextMsg, null, // cookie will be set later domainCtxts[iDom].rsd.getBaseDn(), 0); // draftChangeNumber may be set later domainCtxts[iDom].nextMsg = null; nextclchange = false; /* TODO:ECL G Good change log draft compat. if (cLSearchCtxt.nextSeqnum!=-1) if (draftCompat) { // Should either retrieve or generate a seqnum // we also need to check if the seqnumdb is acccurate reagrding // either retrieve a draftCN from the draftCNDb // or assign a new draftCN and store in the db DraftCNDbHandler draftCNDb=replicationServer.getDraftCNDbHandler(); // We also need to check if the draftCNdb is consistent with // the changelogdb. // if not, 2 potential reasons // -1 : purge from the changelog .. let's traverse the seqnumdb // -2 : changelog is late .. let's traverse the changelog // a/ : changelog has been purged (trim)let's traverse the draftCNDb // b/ : changelog is late .. 
let's traverse the changelogDb // The following loop allows to loop until being on the same cn // in the 2 dbs // replogcn : the oldest change from the changelog db ChangeNumber replogcn = theOldestChange.getChangeNumber(); DN replogReplDomDN = clDomCtxts[iDom].rsd.getBaseDn(); ChangeNumber cnFromChangelogDb = oldestChange.getUpdateMsg().getChangeNumber(); String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn(); while (true) { if (!cLSearchCtxt.endOfSeqnumdbReached) if (!isEndOfDraftCNReached) { // we did not reach yet the end of the seqnumdb // we did not reach yet the end of the DraftCNdb // seqnumcn : the next change from from the seqnum db ChangeNumber seqnumcn = seqnumDbReadIterator.getChangeNumber(); // the next change from the DraftCN db ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber(); String dnFromDraftCNDb = draftCNDbIter.getServiceID(); // are replogcn and seqnumcn should be the same change ? int cmp = replogcn.compareTo(seqnumcn); DN seqnumReplDomDN=DN.decode(seqnumDbReadIterator. getDomainDN()); // are replogcn and DraftCNcn should be the same change ? 
int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb); int areDNEqual = dnFromChangelogDb.compareTo(dnFromDraftCNDb); TRACER.debugInfo("seqnumgen: comparing the 2 db " + " changelogdb:" + replogReplDomDN + "=" + replogcn + " ts=" + new Date(replogcn.getTime()).toString() + "## seqnumdb:" + seqnumReplDomDN + "=" + seqnumcn + " ts=" + new Date(seqnumcn.getTime()).toString() + " sn older=" + seqnumcn.older(replogcn)); if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " comparing the 2 db DNs :" + dnFromChangelogDb + "?=" + cnFromChangelogDb + " timestamps:" + new Date(cnFromChangelogDb.getTime()) + " ?older" + new Date(cnFromDraftCNDb.getTime())); if ((replogReplDomDN.compareTo(seqnumReplDomDN)==0) && (cmp==0)) if ((areDNEqual==0) && (areCNEqual==0)) { // same domain and same CN => same change // assign the seqnum from the seqnumdb // to the change from the changelogdb // assign the DraftCN found to the change from the changelogdb if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " assigning draftCN=" + draftCNDbIter.getDraftCN() + " to change=" + oldestChange); TRACER.debugInfo("seqnumgen: assigning seqnum=" + seqnumDbReadIterator.getSeqnum() + " to change=" + theOldestChange); theOldestChange.setSeqnum(seqnumDbReadIterator.getSeqnum()); oldestChange.setDraftChangeNumber( draftCNDbIter.getDraftCN()); // prepare the next seqnum for the potential next change added // to the seqnumDb cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum() + 1; break; } else { // replogcn and seqnumcn are NOT the same change if (seqnumcn.older(replogcn)) // replogcn and DraftCNcn are NOT on the same change if (cnFromDraftCNDb.older(cnFromChangelogDb)) { // the change from the seqnumDb is older // the change from the DraftCNDb is older // that means that the change has been purged from the // changelog // changelogDb (and DraftCNdb not yet been trimed) try { // let's traverse the seqnumdb searching for the change // let's 
traverse the DraftCNdb searching for the change // found in the changelogDb. TRACER.debugInfo("seqnumgen: will skip " + seqnumcn + " and next from the seqnum"); cLSearchCtxt.endOfSeqnumdbReached = (seqnumDbReadIterator.next()==false); TRACER.debugInfo("seqnumgen: has nexted cr to " + " sn=" + seqnumDbReadIterator.getSeqnum() + " cn=" + seqnumDbReadIterator.getChangeNumber() + " and reached end " + " of seqnumdb:"+cLSearchCtxt.endOfSeqnumdbReached); if (cLSearchCtxt.endOfSeqnumdbReached) TRACER.debugInfo("getNextECLUpdate generating draftCN " + " will skip " + cnFromDraftCNDb + " and read next change from the DraftCNDb."); isEndOfDraftCNReached = (draftCNDbIter.next()==false); TRACER.debugInfo("getNextECLUpdate generating draftCN " + " has skiped to " + " sn=" + draftCNDbIter.getDraftCN() + " cn=" + draftCNDbIter.getChangeNumber() + " End of draftCNDb ?"+isEndOfDraftCNReached); if (isEndOfDraftCNReached) { // we are at the end of the seqnumdb in the append mode // store in seqnumdb the pair // seqnum of the cur change,state before this change) replicationServerDomain.addSeqnum( cLSearchCtxt.nextSeqnum, getGenState(), clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), theOldestChange.getChangeNumber()); theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); cLSearchCtxt.nextSeqnum++; // we are at the end of the DraftCNdb in the append mode // generate a new draftCN and assign to this change oldestChange.setDraftChangeNumber( replicationServer.getNewDraftCN()); // store in DraftCNdb the pair // (draftCN_of_the_cur_change, state_before_this_change) draftCNDb.add( oldestChange.getDraftChangeNumber(), previousCookie.toString(), oldestChange.getServiceId(), oldestChange.getUpdateMsg().getChangeNumber()); break; } else { // next change from seqnumdb cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum() + 1; // let's go to test this new change fro the DraftCNdb continue; } } @@ -1100,108 +1198,99 @@ // the change from the changelogDb is older // it should have been 
stored lately // let's continue to traverse the changelogdb TRACER.debugInfo("seqnumgen: will skip " + replogcn + " and next from the CL"); nextclchange = true; TRACER.debugInfo("getNextECLUpdate: will skip " + cnFromChangelogDb + " and read next from the regular changelog."); continueLooping = true; break; // TO BE CHECKED } } } else { // we are at the end of the seqnumdb in the append mode // store in seqnumdb the pair // (seqnum of the current change, state before this change) replicationServerDomain.addSeqnum( cLSearchCtxt.nextSeqnum, getGenState(), clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), theOldestChange.getChangeNumber()); theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); cLSearchCtxt.nextSeqnum++; // we are at the end of the DraftCNdb in the append mode // store in DraftCNdb the pair // (DraftCN of the current change, state before this change) oldestChange.setDraftChangeNumber( replicationServer.getNewDraftCN()); draftCNDb.add( oldestChange.getDraftChangeNumber(), this.previousCookie.toString(), domainCtxts[iDom].rsd.getBaseDn(), oldestChange.getUpdateMsg().getChangeNumber()); break; } } // while seqnum } // nextseqnum !- -1 */ } // while DraftCN } // here we have the right oldest change and in the seqnum case we // have its seqnum // here we have the right oldest change // and in the draft case, we have its draft changenumber // Set and test the domain of the oldestChange see if we reached // the end of the phase for this domain clDomCtxts[iDom].currentState.update( theOldestChange.getUpdateMsg().getChangeNumber()); domainCtxts[iDom].currentState.update( oldestChange.getUpdateMsg().getChangeNumber()); if (clDomCtxts[iDom].currentState.cover(clDomCtxts[iDom].stopState)) if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState)) { clDomCtxts[iDom].active = false; domainCtxts[iDom].active = false; } // Test the seqnum of the oldestChange see if we reached // the end of operation /* TODO:ECL G Good changelog draft compat. 
Not yet implemented if ((cLSearchCtxt.stopSeqnum>0) && (theOldestChange.getSeqnum()>=cLSearchCtxt.stopSeqnum)) { closePhase1(); // means end of phase 1 to the calling writer return null; } */ if (clDomCtxts[iDom].active) if (domainCtxts[iDom].active) { // populates the table with the next eligible msg from idomain // in non blocking mode, return null when no more eligible msg getNextElligibleMessage(iDom); domainCtxts[iDom].getNextEligibleMessageForDomain(operationId); } } // phase ==1 } // while (nextclchange) } // phase == INIT_PHASE } // while (...) if (cLSearchCtxt.searchPhase==2) if (searchPhase == PERSISTENT_PHASE) { clDomCtxtsToString("In getGeneralizedNextMessage (persistent): " + "looking for the generalized oldest change"); if (debugEnabled()) clDomCtxtsToString("In getNextECLUpdate (persistent): " + "looking for the generalized oldest change"); for (int ido=0; ido<clDomCtxts.length; ido++) for (int ido=0; ido<domainCtxts.length; ido++) { // get next msg getNextElligibleMessage(ido); domainCtxts[ido].getNextEligibleMessageForDomain(operationId); } // take the oldest one iDom = getGeneralizedOldestChange(); iDom = getOldestChangeFromDomainCtxts(); if (iDom != -1) { String suffix = this.clDomCtxts[iDom].rsd.getBaseDn(); String suffix = this.domainCtxts[iDom].rsd.getBaseDn(); theOldestChange = new ECLUpdateMsg( (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg, oldestChange = new ECLUpdateMsg( (LDAPUpdateMsg)domainCtxts[iDom].nextMsg, null, // set later suffix); suffix, 0); domainCtxts[iDom].nextMsg = null; // clean clDomCtxts[iDom].currentState.update( theOldestChange.getUpdateMsg().getChangeNumber()); domainCtxts[iDom].currentState.update( oldestChange.getUpdateMsg().getChangeNumber()); /* TODO:ECL G Good changelog draft compat. 
if (cLSearchCtxt.nextSeqnum!=-1) if (draftCompat) { // should generate seqnum // should generate DraftCN DraftCNDbHandler draftCNDb =replicationServer.getDraftCNDbHandler(); // store in seqnumdb the pair // (seqnum of the current change, state before this change) replicationServerDomain.addSeqnum( cLSearchCtxt.nextSeqnum, getGenState(), clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), theOldestChange.getChangeNumber()); theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); cLSearchCtxt.nextSeqnum++; oldestChange.setDraftChangeNumber( replicationServer.getNewDraftCN()); // store in DraftCNdb the pair // (DraftCN of the current change, state before this change) draftCNDb.add( oldestChange.getDraftChangeNumber(), this.previousCookie.toString(), domainCtxts[iDom].rsd.getBaseDn(), oldestChange.getUpdateMsg().getChangeNumber()); } */ } } } @@ -1214,39 +1303,48 @@ e); } if (theOldestChange != null) if (oldestChange != null) { if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + oldestChange.getUpdateMsg().getChangeNumber()); // Update the current state this.cLSearchCtxt.currentCookie.update( theOldestChange.getServiceId(), theOldestChange.getUpdateMsg().getChangeNumber()); previousCookie.update( oldestChange.getServiceId(), oldestChange.getUpdateMsg().getChangeNumber()); // Set the current value of global state in the returned message theOldestChange.setCookie(this.cLSearchCtxt.currentCookie); oldestChange.setCookie(previousCookie); if (debugEnabled()) TRACER.debugInfo("getNextECLUpdate returns result oldest change =" + oldestChange); } return theOldestChange; return oldestChange; } /** * Terminates the first (non persistent) phase of the search on the ECL. */ private void closePhase1() private void closeInitPhase() { // starvation of changelog messages // all domain have been unactived means are covered if (debugEnabled()) TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). 
getMonitorInstanceName() + "," + this + " closePhase1()" + " searchCtxt=" + cLSearchCtxt); getMonitorInstanceName() + "," + this + " closeInitPhase(): " + dumpState()); // go to persistent phase if one for (int i=0; i<clDomCtxts.length; i++) clDomCtxts[i].active = true; for (int i=0; i<domainCtxts.length; i++) domainCtxts[i].active = true; if (this.cLSearchCtxt.isPersistent != StartECLSessionMsg.NON_PERSISTENT) if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT) { // phase = 1 done AND persistent search => goto phase 2 cLSearchCtxt.searchPhase=2; // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE searchPhase = PERSISTENT_PHASE; if (writer ==null) { @@ -1257,269 +1355,53 @@ } else { // phase = 1 done AND !persistent search => reinit to phase 0 cLSearchCtxt.searchPhase=0; // INIT_PHASE is done AND search is not persistent => reinit searchPhase = UNDEFINED_PHASE; } /* TODO:ECL G Good changelog draft compat. if (seqnumDbReadIterator!=null) if (draftCNDbIter!=null) { // End of phase 1 => always release the seqnum iterator seqnumDbReadIterator.releaseCursor(); seqnumDbReadIterator = null; // End of INIT_PHASE => always release the iterator draftCNDbIter.releaseCursor(); draftCNDbIter = null; } */ } /** * Get the oldest change contained in the subx table. * The subx table should be populated before * @return the oldest change. * Get the index in the domainCtxt table of the domain with the oldest change. * @return the index of the domain with the oldest change, -1 when none. 
*/ private int getGeneralizedOldestChange() private int getOldestChangeFromDomainCtxts() { int oldest = -1; for (int i=0; i<clDomCtxts.length; i++) for (int i=0; i<domainCtxts.length; i++) { if ((clDomCtxts[i].active)) if ((domainCtxts[i].active)) { // on the first loop, oldest==-1 // .msg is null when the previous (non blocking) nextMessage did // not have any eligible msg to return if (clDomCtxts[i].nextMsg != null) if (domainCtxts[i].nextMsg != null) { if ((oldest==-1) || (clDomCtxts[i].nextMsg.compareTo(clDomCtxts[oldest].nextMsg)<0)) (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0)) { oldest = i; } } } } if (debugEnabled()) TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + "," + this + " getGeneralizedOldestChange() " + " returns " + ((oldest!=-1)?clDomCtxts[oldest].nextMsg:"")); getMonitorInstanceName() + "," + this + " getOldestChangeFromDomainCtxts() returns " + ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1")); return oldest; } /** * Get from the provided domain, the next message elligible regarding * the crossDomain elligible CN. Put it in the context table. * @param idomain the provided domain. */ private void getNextElligibleMessage(int idomain) { ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN(); try { if (clDomCtxts[idomain].nonElligiblemsg != null) { TRACER.debugInfo("getNextElligibleMessage tests if the already " + " stored nonElligibleMsg has becoem elligible regarding " + " the crossDomainElligibleCN ("+crossDomainElligibleCN + " ) " + clDomCtxts[idomain].nonElligiblemsg.getChangeNumber(). older(crossDomainElligibleCN)); // we already got the oldest msg and it was not elligible if (clDomCtxts[idomain].nonElligiblemsg.getChangeNumber(). 
older(crossDomainElligibleCN)) { // it is now elligible clDomCtxts[idomain].nextMsg = clDomCtxts[idomain].nonElligiblemsg; clDomCtxts[idomain].nonElligiblemsg = null; } else { // the oldest is still not elligible - let's wait next } } else { // non blocking UpdateMsg newMsg = clDomCtxts[idomain].mh.getnextMessage(false); if (debugEnabled()) TRACER.debugInfo(this + " getNextElligibleMessage got the next changelogmsg " + " from " + clDomCtxts[idomain].mh.getServiceId() + " newCLMsg=" + newMsg); clDomCtxts[idomain].nextMsg = clDomCtxts[idomain].nonElligiblemsg = null; // in non blocking mode, return null when no more msg if (newMsg != null) { /* TODO:ECL Take into account eligibility. TRACER.debugInfo("getNextElligibleMessage is " + newMsg.getChangeNumber() + new Date(newMsg.getChangeNumber().getTime()).toString() + " elligible " + newMsg.getChangeNumber().older(crossDomainElligibleCN)); if (newMsg.getChangeNumber().older(crossDomainElligibleCN)) { // is elligible clDomCtxts[idomain].nextMsg = newMsg; } else { // is not elligible clDomCtxts[idomain].nonElligiblemsg = newMsg; } */ clDomCtxts[idomain].nextMsg = newMsg; } } } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } /* */ ECLEligibleCNComputerThread eligibleCNComputerThread = null; ChangeNumber liveecn; private ChangeNumber computeCrossDomainElligibleCN() { return liveecn; } /** * This class specifies the thread that computes periodically * the cross domain eligible CN. */ private final class ECLEligibleCNComputerThread extends DirectoryThread { /** * The tracer object for the debug logger. 
*/ private boolean shutdown = false; private ECLEligibleCNComputerThread() { super("ECL eligible CN computer thread"); start(); } public void run() { while (shutdown == false) { try { synchronized (this) { liveecn = computeNewCrossDomainElligibleCN(); try { this.wait(1000); } catch (InterruptedException e) { } } } catch (Exception end) { break; } } } private ChangeNumber computeNewCrossDomainElligibleCN() { ChangeNumber computedCrossDomainElligibleCN = null; String s = "=> "; ReplicationServer rs = replicationServerDomain.getReplicationServer(); if (debugEnabled()) TRACER.debugInfo("ECLSH.computeNewCrossDomainElligibleCN() " + " periodic starts rs="+rs); Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator(); if (rsdi != null) { while (rsdi.hasNext()) { ReplicationServerDomain domain = rsdi.next(); if (domain.getBaseDn().equalsIgnoreCase("cn=changelog")) continue; ChangeNumber domainElligibleCN = computeEligibleCN(domain); if (domainElligibleCN==null) continue; if ((computedCrossDomainElligibleCN == null) || (domainElligibleCN.older(computedCrossDomainElligibleCN))) { computedCrossDomainElligibleCN = domainElligibleCN; } s += "\n DN:" + domain.getBaseDn() + "\t\t domainElligibleCN :" + domainElligibleCN + "/" + new Date(domainElligibleCN.getTime()).toString(); } } if (debugEnabled()) TRACER.debugInfo("SH.computeNewCrossDomainElligibleCN() periodic " + " ends with " + " the following domainElligibleCN for each domain :" + s + "\n thus CrossDomainElligibleCN=" + computedCrossDomainElligibleCN + " ts=" + new Date(computedCrossDomainElligibleCN.getTime()).toString()); return computedCrossDomainElligibleCN; } } /** * Compute the eligible CN. * @param rsd The provided replication server domain for which we want * to retrieve the eligible date. * @return null if the domain does not play in eligibility. 
*/ public ChangeNumber computeEligibleCN(ReplicationServerDomain rsd) { ChangeNumber elligibleCN = null; ServerState heartbeatState = rsd.getHeartbeatState(); if (heartbeatState==null) return null; // compute elligible CN ServerState hbState = heartbeatState.duplicate(); Iterator<Short> it = hbState.iterator(); while (it.hasNext()) { short sid = it.next(); ChangeNumber storedCN = hbState.getMaxChangeNumber(sid); // If the most recent UpdateMsg or CLHeartbeatMsg received is very old // then the server is considered down and not considered for eligibility if (TimeThread.getTime()-storedCN.getTime()>2000) { if (debugEnabled()) TRACER.debugInfo( "For RSD." + rsd.getBaseDn() + " Server " + sid + " is not considered for eligibility ... potentially down"); continue; } if ((elligibleCN == null) || (storedCN.older(elligibleCN))) { elligibleCN = storedCN; } } if (debugEnabled()) TRACER.debugInfo( "For RSD." + rsd.getBaseDn() + " ElligibleCN()=" + elligibleCN); return elligibleCN; } /** * Returns the client operation id. * @return The client operation id. */ @@ -1533,7 +1415,7 @@ * @return Whether the current search is persistent or not. */ public short isPersistent() { return this.cLSearchCtxt.isPersistent; return this.isPersistent; } /** @@ -1541,7 +1423,137 @@ * @return Whether the current search is persistent or not. */ public int getSearchPhase() { return this.cLSearchCtxt.searchPhase; return this.searchPhase; } /** * Refresh the eligibleCN by requesting the replication server. */ public void refreshEligibleCN() { eligibleCN = replicationServer.getEligibleCN(); } /* * Get first and last DraftCN * @param crossDomainEligibleCN * @return */ private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN) throws DirectoryException { /* The content of the DraftCNdb depends on the SEARCH operations done before * requesting the DraftCN. If no operations, DraftCNdb is empty. 
* The limits we want to get are the "potential" limits if a request was * done, the DraftCNdb is probably not complete to do that. * * The first DraftCN is : * - the first record from the DraftCNdb * - if none because DraftCNdb empty, * then * if no change in replchangelog then return 0 * else return 1 (DraftCN that WILL be returned to next search) * * The last DraftCN is : * - initialized with the last record from the DraftCNdb (0 if none) * and consider the genState associated * - to the last DraftCN, we add the count of updates in the replchangelog * FROM that genState TO the crossDomainEligibleCN * (this diff is done domain by domain) */ int firstDraftCN; int lastDraftCN; boolean DraftCNdbIsEmpty; DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler(); ReplicationServer rs = replicationServerDomain.getReplicationServer(); // Get the first DraftCN from the DraftCNdb firstDraftCN = draftCNDbH.getFirstKey(); HashMap<String,ServerState> domainsServerStateForLastSeqnum = null; if (firstDraftCN < 1) { DraftCNdbIsEmpty=true; firstDraftCN = 0; lastDraftCN = 0; } else { DraftCNdbIsEmpty=false; // Get the last DraftCN from the DraftCNdb lastDraftCN = draftCNDbH.getLastKey(); // Get the generalized state associated with the current last DraftCN // and initializes from it the startStates table String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN); if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0)) { domainsServerStateForLastSeqnum = MultiDomainServerState. 
splitGenStateToServerStates(lastSeqnumGenState); } } // Domain by domain Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator(); if (rsdi != null) { while (rsdi.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdi.next(); if (isServiceIDExcluded(rsd.getBaseDn())) continue; // for this domain, have the state in the replchangelog // where the last DraftCN update is ServerState domainServerStateForLastSeqnum; if ((domainsServerStateForLastSeqnum == null) || (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null)) { domainServerStateForLastSeqnum = new ServerState(); } else { domainServerStateForLastSeqnum = domainsServerStateForLastSeqnum.get(rsd.getBaseDn()); } // Count the number of (eligible) changes from this place // to the eligible CN (cross server) long ec = rsd.getEligibleCount( domainServerStateForLastSeqnum, crossDomainEligibleCN); // ... hum ... if ((ec>0) && (DraftCNdbIsEmpty==false)) ec--; // cumulates on domains lastDraftCN += ec; // DraftCN is empty and there are eligible updates in the repl changelog // then init first DraftCN if ((ec>0) && (firstDraftCN==0)) firstDraftCN = 1; } } return new int[]{firstDraftCN, lastDraftCN}; } private boolean isServiceIDExcluded(String serviceID) { // skip the excluded domains boolean excluded = false; for(String excludedServiceID : this.excludedServiceIDs) { if (excludedServiceID.equalsIgnoreCase(serviceID)) { excluded=true; break; } } return excluded; } } opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -66,6 +66,7 @@ private short protocolVersion = -1; private boolean suspended; private boolean shutdown; private PersistentSearch mypsearch; /** * Create a ServerWriter. @@ -80,7 +81,7 @@ { super(session, (short)-1, handler, replicationServerDomain); setName("Replication ECL Writer Thread for op:" + setName("Replication ECL Writer Thread for operation " + handler.getOperationId()); this.session = session; @@ -90,6 +91,21 @@ this.protocolVersion = handler.getProtocolVersion(); this.suspended = false; this.shutdown = false; // Look for the psearch object related to this operation , the one that // will ne notified with new entries to be returned. ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement( ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); for (PersistentSearch psearch : wfe.getPersistentSearches()) { if (psearch.getSearchOperation().toString().equals( handler.getOperationId())) { mypsearch = psearch; break; } } } /** @@ -139,18 +155,14 @@ } if (shutdown) { return; } // Not suspended doIt(); if (shutdown) { return; } suspendWriter(); } } @@ -186,7 +198,7 @@ } /** * Loop gettting changes from the domain and publishing them either to * Loop geting changes from the domain and publishing them either to * the provided session or to the ECL session interface. * @throws IOException when raised (connection closure) * @throws InterruptedException when raised @@ -213,6 +225,7 @@ { try { handler.refreshEligibleCN(); update = handler.takeECLUpdate(); } catch(DirectoryException de) @@ -273,7 +286,8 @@ throws IOException { if (debugEnabled()) TRACER.debugInfo(this + " publishes msg=[" + msg.toString() + "]"); TRACER.debugInfo(this.getName() + " publishes msg=[" + msg.toString() + "]"); if (session!=null) { @@ -281,17 +295,12 @@ } else { ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement( ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); // Notify persistent searches. 
for (PersistentSearch psearch : wfe.getPersistentSearches()) if (mypsearch != null) { try { Entry eclEntry = ECLSearchOperation.createEntryFromMsg(msg); psearch.processAdd(eclEntry, -1); mypsearch.processAdd(eclEntry, -1); } catch(Exception e) { @@ -299,6 +308,7 @@ ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() + " " + stackTraceToSingleLineString(e)); logError(errMessage); mypsearch.cancel(); } } } opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -87,7 +87,7 @@ public ECLUpdateMsg getNextUpdate() throws DirectoryException { return handler.getnextUpdate(); return handler.getNextECLUpdate(); } /** @@ -106,7 +106,7 @@ { MultiDomainServerState result = new MultiDomainServerState(); // Initialize start state for all running domains with empty state Iterator<ReplicationServerDomain> rsdk = this.rs.getCacheIterator(); Iterator<ReplicationServerDomain> rsdk = rs.getDomainIterator(); if (rsdk != null) { while (rsdk.hasNext()) @@ -116,7 +116,8 @@ if (rsd.getBaseDn().compareToIgnoreCase( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0) continue; result.update(rsd.getBaseDn(), rsd.getCLElligibleState()); result.update(rsd.getBaseDn(), rsd.getEligibleState( rs.getEligibleCN())); } } return result; opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -351,7 +351,7 @@ //This method only returns the number of actual change entries, the //domain and any baseDN entries are not counted. long retNum=0; Iterator<ReplicationServerDomain> rcachei = server.getCacheIterator(); Iterator<ReplicationServerDomain> rcachei = server.getDomainIterator(); if (rcachei != null) { while (rcachei.hasNext()) @@ -541,7 +541,7 @@ Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message); } Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator(); Iterator<ReplicationServerDomain> rsdi = server.getDomainIterator(); if (rsdi != null) { while (rsdi.hasNext()) @@ -1358,7 +1358,7 @@ } // Walk through all entries and send the ones that match. Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator(); Iterator<ReplicationServerDomain> rsdi = server.getDomainIterator(); if (rsdi != null) { while (rsdi.hasNext()) opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -534,4 +534,27 @@
      {}
    }
  }

  /**
   * Get or create a db to manage integer change number associated
   * to multidomain server state.
   * <p>
   * The database is opened under the fixed name "draftcndb", with creation
   * allowed and transactions enabled.
   * <p>
   * TODO:ECL how to manage compatibility of this db with new domains
   * added or removed ?
   *
   * @return the retrieved or created db.
   * @throws DatabaseException when a problem occurs.
   */
  public Database getOrCreateDraftCNDb() throws DatabaseException
  {
    String stringId = "draftcndb";

    // Opens the draft change number database under the name above.
    // Create it if it does not already exist.
    DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
    return db;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationDraftCNKey.java
New file @@ -0,0 +1,68 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import java.io.UnsupportedEncodingException;

import com.sleepycat.je.DatabaseEntry;

/**
 * Subclass of DatabaseEntry.
 * Useful to create ReplicationServer keys from draft change numbers: the
 * number is stored as its decimal representation, left-padded with zeros to
 * 16 characters and encoded in UTF-8.
 */
public class ReplicationDraftCNKey extends DatabaseEntry
{
  /** Character set used both to encode and to decode the key bytes. */
  private static final String KEY_CHARSET = "UTF-8";

  /** Source of the leading zeros; its length is the length of a key. */
  private static final String ZERO_PADDING = "0000000000000000";

  /**
   * Creates a new ReplicationDraftCNKey from the given draft ChangeNumber.
   * @param draftCN The draft change number to use.
   */
  public ReplicationDraftCNKey(int draftCN)
  {
    try
    {
      String s = String.valueOf(draftCN);
      // Left-pad with zeros up to the fixed key length.
      int a = ZERO_PADDING.length() - s.length();
      String sscn = ZERO_PADDING.substring(0, a) + s;
      this.setData(sscn.getBytes(KEY_CHARSET));
    }
    catch (UnsupportedEncodingException e)
    {
      // Should never happen, UTF-8 is always supported. Fail loudly rather
      // than leave a key that silently carries no data.
      throw new IllegalStateException(
          KEY_CHARSET + " encoding is not supported", e);
    }
  }

  /**
   * Getter for the draft change number associated with this key.
   * @return the draft change number associated with this key.
   * @throws NumberFormatException when the key data is not the decimal
   *         representation of an integer.
   */
  public int getDraftCN()
  {
    try
    {
      // Decode with the charset used by the constructor, not the platform
      // default one.
      String s = new String(this.getData(), KEY_CHARSET);
      return Integer.parseInt(s);
    }
    catch (UnsupportedEncodingException e)
    {
      // Should never happen, UTF-8 is always supported.
      throw new IllegalStateException(
          KEY_CHARSET + " encoding is not supported", e);
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server; @@ -62,12 +62,6 @@ { throw new Exception("no new change"); } if (this.next() == false) { cursor.close(); cursor = null; throw new Exception("no new change"); } } /** opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -67,6 +68,7 @@ import org.opends.server.core.networkgroups.NetworkGroup; import org.opends.server.loggers.LogLevel; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ExternalChangeLogSession; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplServerStartMsg; @@ -164,6 +166,13 @@ // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled private int degradedStatusThreshold = 5000; // The handler of the draft change numbers database, the database used to // store the relation between a draft change number ('seqnum') and the // associated cookie. private DraftCNDbHandler draftCNDbHandler; // The last value generated of the draft change number. private int lastGeneratedDraftCN = 0; /** * The tracer object for the debug logger. */ @@ -1154,7 +1163,7 @@ * Returns null if none. * @return the iterator. */ public Iterator<ReplicationServerDomain> getCacheIterator() public Iterator<ReplicationServerDomain> getDomainIterator() { if (!baseDNs.isEmpty()) return baseDNs.values().iterator(); @@ -1167,7 +1176,7 @@ */ public void clearDb() { Iterator<ReplicationServerDomain> rcachei = getCacheIterator(); Iterator<ReplicationServerDomain> rcachei = getDomainIterator(); if (rcachei != null) { while (rcachei.hasNext()) @@ -1445,4 +1454,134 @@ return false; } } private ArrayList<String> excludedServiceIDs; /** * Excluded a list of domain from eligibility computation. * @param excludedServiceIDs the provided list of serviceIDs excluded from * the computation of eligibleCN. 
*/ public void disableEligibility(ArrayList<String> excludedServiceIDs) { this.excludedServiceIDs = excludedServiceIDs; } /** * Returns the eligible CN cross domains - relies on the eligible CN from * each domain. * @return the cross domain eligible CN. */ public ChangeNumber getEligibleCN() { String debugLog = ""; // traverse the domains and get the eligible CN from each domain // store the oldest one as the cross domain eligible CN ChangeNumber eligibleCN = null; Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator(); if (rsdi != null) { while (rsdi.hasNext()) { ReplicationServerDomain domain = rsdi.next(); if (excludedServiceIDs.contains(domain.getBaseDn())) { continue; } ChangeNumber domainEligibleCN = domain.getEligibleCN(); String dates = ""; if (domainEligibleCN != null) { if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN))) { eligibleCN = domainEligibleCN; } dates = new Date(domainEligibleCN.getTime()).toString(); } debugLog += "[dn=" + domain.getBaseDn() + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]"; } } if (eligibleCN==null) { eligibleCN = new ChangeNumber(0,0,(short)0); } if (debugEnabled()) TRACER.debugInfo("In " + this + " getEligibleCN() ends with " + " the following domainEligibleCN for each domain :" + debugLog + " thus CrossDomainEligibleCN=" + eligibleCN + " ts=" + (eligibleCN!=null? new Date(eligibleCN.getTime()).toString(): null)); return eligibleCN; } /** * Get or create a handler on a Db on DraftCN for external changelog. * @return the handler. * @throws DirectoryException when needed. 
*/
  public synchronized DraftCNDbHandler getDraftCNDbHandler()
  throws DirectoryException
  {
    try
    {
      if (draftCNDbHandler == null)
      {
        // First access: create the handler, then resume the generation of
        // draft change numbers from the last key already stored.
        // ("new" never yields null, so no null test is needed here.)
        draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
        this.lastGeneratedDraftCN = getLastDraftChangeNumber();
      }
      return draftCNDbHandler;
    }
    catch (Exception e)
    {
      // Any failure is traced and reported to the caller as an
      // OPERATIONS_ERROR carrying the original cause.
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
          mb.toMessage(), e);
    }
  }

  /**
   * Get the value of the first draft change number, 0 when db is empty.
   * Also returns 0 when the draft CN handler has not been created yet.
   * @return the first value.
   */
  public int getFirstDraftChangeNumber()
  {
    if (draftCNDbHandler == null)
    {
      return 0;
    }
    return draftCNDbHandler.getFirstKey();
  }

  /**
   * Get the value of the last draft change number, 0 when db is empty.
   * Also returns 0 when the draft CN handler has not been created yet.
   * @return the last value.
   */
  public int getLastDraftChangeNumber()
  {
    if (draftCNDbHandler == null)
    {
      return 0;
    }
    return draftCNDbHandler.getLastKey();
  }

  /**
   * Generate a new Draft ChangeNumber, by incrementing the last generated
   * value.
   * @return The generated Draft ChangeNumber
   */
  public synchronized int getNewDraftCN()
  {
    return ++lastGeneratedDraftCN;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -63,6 +63,7 @@ import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; import org.opends.server.replication.protocol.ChangeStatusMsg; import org.opends.server.replication.protocol.ErrorMsg; import org.opends.server.replication.protocol.MonitorMsg; @@ -75,9 +76,11 @@ import org.opends.server.types.Attribute; import org.opends.server.types.AttributeBuilder; import org.opends.server.types.Attributes; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; import org.opends.server.util.TimeThread; import com.sleepycat.je.DatabaseException; /** @@ -173,6 +176,8 @@ // every n number of treated assured messages private int assuredTimeoutTimerPurgeCounter = 0; ServerState ctHeartbeatState = null; /** * Creates a new ReplicationServerDomain associated to the DN baseDn. 
* @@ -360,8 +365,7 @@ if ( (generationId>0) && (generationId != handler.getGenerationId()) ) { if (debugEnabled()) TRACER.debugInfo("In RS " + replicationServer.getServerId() + TRACER.debugInfo("In " + this.getName() + " for dn " + baseDn + ", update " + update.getChangeNumber().toString() + " will not be sent to replication server " + @@ -426,8 +430,7 @@ if (debugEnabled()) { if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) TRACER.debugInfo("In RS " + replicationServer.getServerId() + TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update " + update.getChangeNumber().toString() + " will not be sent to directory server " + @@ -1024,10 +1027,9 @@ { if (debugEnabled()) TRACER.debugInfo( "In RS " + this.replicationServer.getMonitorInstanceName() + " domain=" + this + " stopServer(SH)" + handler.getMonitorInstanceName() + " " + stackTraceToSingleLineString(new Exception())); "In " + this.replicationServer.getMonitorInstanceName() + " domain=" + this + " stopServer() on the server handler " + handler.getMonitorInstanceName()); /* * We must prevent deadlock on replication server domain lock, when for * instance this code is called from dying ServerReader but also dying @@ -1119,10 +1121,9 @@ { if (debugEnabled()) TRACER.debugInfo( "In RS " + this.replicationServer.getMonitorInstanceName() + " domain=" + this + " stopServer(MH)" + handler.getMonitorInstanceName() + " " + stackTraceToSingleLineString(new Exception())); "In " + this.replicationServer.getMonitorInstanceName() + " domain=" + this + " stopServer() on the message handler " + handler.getMonitorInstanceName()); /* * We must prevent deadlock on replication server domain lock, when for * instance this code is called from dying ServerReader but also dying @@ -1363,7 +1364,40 @@ try { return handler.generateIterator(changeNumber); ReplicationIterator it = handler.generateIterator(changeNumber); if (it.next()==false) { it.releaseCursor(); throw new Exception("no new change"); } return it; } catch (Exception e) 
{ return null; } } /** * Creates and returns an iterator. * When the iterator is not used anymore, the caller MUST call the * ReplicationIterator.releaseCursor() method to free the resources * and locks used by the ReplicationIterator. * * @param serverId Identifier of the server for which the iterator is created. * @param changeNumber Starting point for the iterator. * @return the created ReplicationIterator. Null when no DB is available * for the provided server Id. */ public ReplicationIterator getIterator(short serverId, ChangeNumber changeNumber) { DbHandler handler = sourceDbHandlers.get(serverId); if (handler == null) return null; try { ReplicationIterator it = handler.generateIterator(changeNumber); return it; } catch (Exception e) { return null; @@ -1955,12 +1989,10 @@ ResetGenerationIdMsg genIdMsg) { if (debugEnabled()) { TRACER.debugInfo( "In RS " + getReplicationServer().getServerId() + "In " + this + " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n" + genIdMsg); } try { @@ -1982,14 +2014,12 @@ { // Order to take a gen id we already have, just ignore if (debugEnabled()) { TRACER.debugInfo( "In RS " + getReplicationServer().getServerId() "In " + this + " Reset generation id requested for baseDn " + baseDn + " but generation id was already " + this.generationId + ":\n" + genIdMsg); } } // If we are the first replication server warned, // then forwards the reset message to the remote replication servers @@ -2002,7 +2032,7 @@ rsHandler.setGenerationId(newGenId); if (senderHandler.isDataServer()) { rsHandler.forwardGenerationIdToRS(genIdMsg); rsHandler.send(genIdMsg); } } catch (IOException e) { @@ -2158,7 +2188,7 @@ } /** * Clears the Db associated with that cache. * Clears the Db associated with that domain. 
*/ public void clearDbs() { @@ -2181,12 +2211,6 @@ } } stopDbHandlers(); if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDN=" + baseDn + " The source db handler has been cleared"); } try { @@ -2471,11 +2495,6 @@ */ public void receivesMonitorDataResponse(MonitorMsg msg) { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "Receiving " + msg + " from " + msg.getsenderID()); try { synchronized (monitorDataLock) @@ -2543,7 +2562,7 @@ { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "In " + this + " baseDn=" + baseDn + " Processed msg from " + msg.getsenderID() + " New monitor data: " + wrkMonitorData.toString()); @@ -2819,24 +2838,29 @@ * Return the state that contain for each server the time of eligibility. * @return the state. */ public ServerState getHeartbeatState() public ServerState getChangeTimeHeartbeatState() { // TODO:ECL Eligility must be supported return this.getDbServerState(); if (ctHeartbeatState == null) { ctHeartbeatState = this.getDbServerState().duplicate(); } return ctHeartbeatState; } /** * TODO: code cleaning - remove this method. * Computes the change number eligible to the ECL. * @return null if the domain does not play in eligibility. */ public ChangeNumber computeEligibleCN() public ChangeNumber computeEligibleCN2() { ChangeNumber elligibleCN = null; ServerState heartbeatState = getHeartbeatState(); ChangeNumber eligibleCN = null; ServerState heartbeatState = getChangeTimeHeartbeatState(); if (heartbeatState==null) return null; // compute elligible CN // compute eligible CN ServerState hbState = heartbeatState.duplicate(); Iterator<Short> it = hbState.iterator(); @@ -2850,70 +2874,87 @@ if (TimeThread.getTime()-storedCN.getTime()>2000) { if (debugEnabled()) TRACER.debugInfo( "For RSD." 
+ this.baseDn + " Server " + sid TRACER.debugInfo("In " + this.getName() + " Server " + sid + " is not considered for eligibility ... potentially down"); continue; } if ((elligibleCN == null) || (storedCN.older(elligibleCN))) if ((eligibleCN == null) || (storedCN.older(eligibleCN))) { elligibleCN = storedCN; eligibleCN = storedCN; } } if (debugEnabled()) TRACER.debugInfo( "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN); return elligibleCN; TRACER.debugInfo("In " + this.getName() + " computeEligibleCN() returns " + eligibleCN); return eligibleCN; } /** * Computes the eligible server state by minimizing the dbServerState and the * elligibleCN. * Computes the eligible server state for the domain. * Consists in taking the most recent change from the dbServerState and the * eligibleCN. * @param eligibleCN The provided eligibleCN. * @return The computed eligible server state. */ public ServerState getCLElligibleState() public ServerState getEligibleState(ChangeNumber eligibleCN) { // ChangeNumber elligibleCN = computeEligibleCN(); ServerState res = new ServerState(); ServerState dbState = this.getDbServerState(); res = dbState; ServerState result = new ServerState(); /* TODO:ECL Eligibility is not yet implemented Iterator<Short> it = dbState.iterator(); while (it.hasNext()) ServerState dbState = this.getDbServerState(); result = dbState.duplicate(); if (eligibleCN != null) { Short sid = it.next(); DbHandler h = sourceDbHandlers.get(sid); ChangeNumber dbCN = dbState.getMaxChangeNumber(sid); try Iterator<Short> it = dbState.iterator(); while (it.hasNext()) { if ((elligibleCN!=null)&&(elligibleCN.older(dbCN))) Short sid = it.next(); DbHandler h = sourceDbHandlers.get(sid); ChangeNumber dbCN = dbState.getMaxChangeNumber(sid); try { // some CN exist in the db newer than elligible CN ReplicationIterator ri = h.generateIterator(elligibleCN); ChangeNumber newCN = ri.getCurrentCN(); res.update(newCN); ri.releaseCursor(); if (eligibleCN.older(dbCN)) { // some CN exist 
in the db newer than eligible CN // let's get it ReplicationIterator ri = h.generateIterator(eligibleCN); try { if ((ri != null) && (ri.getChange()!=null)) { ChangeNumber newCN = ri.getChange().getChangeNumber(); result.update(newCN); } } finally { ri.releaseCursor(); ri = null; } } else { // no CN exist in the db newer than elligible CN result.update(dbCN); } } else catch(Exception e) { // no CN exist in the db newer than elligible CN res.update(dbCN); Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get( " " + stackTraceToSingleLineString(e)); logError(errMessage); TRACER.debugCaught(DebugLogLevel.ERROR, e); } } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } */ if (debugEnabled()) TRACER.debugInfo("In " + this.getName() + " getCLElligibleState returns:" + res); return res; TRACER.debugInfo("In " + this + " getEligibleState() result is " + result); return result; } /** @@ -2930,4 +2971,206 @@ } return domainStartState; } /** * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat * state. * For each DS, take the oldest CN from the changetime hearbeat state * and from the changelog db last CN. Can be null. * @return the eligible CN. */ public ChangeNumber getEligibleCN() { ChangeNumber eligibleCN = null; for (DbHandler db : sourceDbHandlers.values()) { // Consider this producer (DS/db). 
short sid = db.getServerId(); ChangeNumber changelogLastCN = db.getLastChange(); if (changelogLastCN != null) { if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN))) { eligibleCN = changelogLastCN; } } ChangeNumber heartbeatLastDN = getChangeTimeHeartbeatState().getMaxChangeNumber(sid); if ((heartbeatLastDN != null) && ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN)))) { eligibleCN = heartbeatLastDN; } } if (debugEnabled()) TRACER.debugInfo( "In " + this.getName() + " getEligibleCN() returns result =" + eligibleCN); return eligibleCN; } /** * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp) * value received, and forwarding the message to the other RSes. * @param senderHandler The handler for the server that sent the heartbeat. * @param msg The message to process. */ public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler, ChangeTimeHeartbeatMsg msg ) { try { // Acquire lock on domain (see more details in comment of start() method // of ServerHandler) lock(); } catch (InterruptedException ex) { // Try doing job anyway... } try { storeReceivedCTHeartbeat(msg.getChangeNumber()); // If we are the first replication server warned, // then forwards the reset message to the remote replication servers for (ReplicationServerHandler rsHandler : replicationServers.values()) { try { // After we'll have sent the message , the remote RS will adopt // the new genId if (senderHandler.isDataServer()) { rsHandler.send(msg); } } catch (IOException e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName())); stopServer(rsHandler); } } } finally { release(); } } /** * Store a change time value received from a data server. * @param cn The provided change time. */ public void storeReceivedCTHeartbeat(ChangeNumber cn) { // TODO:May be we can spare processing by only storing CN (timestamp) // instead of a server state. 
getChangeTimeHeartbeatState().update(cn); /* if (debugEnabled()) { Set<String> ss = ctHeartbeatState.toStringSet(); String dss = ""; for (String s : ss) { dss = dss + " \\ " + s; } TRACER.debugInfo("In " + this.getName() + " " + dss); } */ } /** * This methods count the changes, server by server : * - from a start point (cn taken from the provided startState) * - to an end point (the provided endCN). * @param startState The provided start server state. * @param endCN The provided end change number. * @return The number of changes between startState and endCN. */ public long getEligibleCount(ServerState startState, ChangeNumber endCN) { long res = 0; ReplicationIterator ri=null; // Parses the dbState of the domain , server by server ServerState dbState = this.getDbServerState(); Iterator<Short> it = dbState.iterator(); while (it.hasNext()) { // for each server Short sid = it.next(); DbHandler h = sourceDbHandlers.get(sid); try { // Set on the change related to the startState ChangeNumber startCN = null; try { ri = h.generateIterator(startState.getMaxChangeNumber(sid)); startCN = ri.getChange().getChangeNumber(); } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); // no change found (purge from CL) startCN = null; } finally { if (ri!=null) { ri.releaseCursor(); ri = null; } } if (startCN != null) { // Set on the change related to the endCN ChangeNumber upperCN; try { // Build a changenumber for this very server, with the timestamp // of the endCN ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid); ri = h.generateIterator(f); upperCN = ri.getChange().getChangeNumber(); } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); // no new change upperCN = h.getLastChange(); } finally { if (ri!=null) { ri.releaseCursor(); ri = null; } } long diff = upperCN.getSeqnum() - startCN.getSeqnum() + 1; res += diff; } // TODO:ECL We should compute if changenumber.seqnum has turned ! 
} catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } return res; } } opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -783,4 +783,19 @@
  {
    return serverAddressURL;
  }

  /**
   * Forwards a replication message to the peer server, by publishing it on
   * the session of this handler.
   * The peer is expected to be a replication server.
   *
   * @param msg The replication message to be sent.
   * @throws IOException When it occurs while sending the message.
   */
  public void forwardReplicationMsg(ReplicationMsg msg)
  throws IOException
  {
    session.publish(msg);
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -46,13 +46,13 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.ErrorMsg; import org.opends.server.replication.protocol.HeartbeatThread; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ResetGenerationIdMsg; import org.opends.server.replication.protocol.RoutableMsg; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.replication.protocol.StartMsg; @@ -389,14 +389,12 @@ } /** * Sends a message containing a generationId to a peer server. * The peer is expected to be a replication server. * * @param msg The GenerationIdMessage message to be sent. * Sends a message. * @param msg The message to be sent. * @throws IOException When it occurs while sending the message, * */ public void forwardGenerationIdToRS(ResetGenerationIdMsg msg) public void send(ReplicationMsg msg) throws IOException { session.publish(msg); @@ -1352,5 +1350,4 @@ inStartECLSessionMsg.toString()); } } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -52,6 +52,7 @@ import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ChangeStatusMsg; @@ -127,7 +128,8 @@ if (debugEnabled()) { TRACER.debugInfo(this.getName() + " receives " + msg); TRACER.debugInfo("In " + replicationServerDomain + " " + getName() + " receives " + msg); } if (msg instanceof AckMsg) @@ -282,6 +284,11 @@ { MonitorMsg replServerMonitorMsg = (MonitorMsg) msg; handler.process(replServerMonitorMsg); } else if (msg instanceof ChangeTimeHeartbeatMsg) { ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg; replicationServerDomain.processChangeTimeHeartbeatMsg(handler, cthbMsg); } else if (msg == null) { /* opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
New file @@ -0,0 +1,185 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;

import org.opends.server.api.DirectoryThread;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.util.TimeThread;

import java.io.IOException;

/**
 * This thread publishes a change time heartbeat message on a given protocol
 * session at regular intervals when there are no other replication messages
 * being published.
 * The heartbeat carries a ChangeNumber built from the current time and the
 * server id of the sender.
 */
public class CTHeartbeatPublisherThread extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  /**
   * For test purposes only to simulate loss of heartbeats.
   */
  static private boolean heartbeatsDisabled = false;

  /**
   * The session on which heartbeats are to be sent.
   */
  private final ProtocolSession session;

  /**
   * The time in milliseconds between heartbeats.
   */
  private final long heartbeatInterval;

  /**
   * The serverId of the sender domain, stored in each heartbeat sent.
   */
  private final short serverId;

  /**
   * Set this to stop the thread.
   * Written under shutdown_lock but also read by the main loop outside of
   * the lock, hence volatile.
   */
  private volatile boolean shutdown = false;

  /**
   * Monitor used to sleep between two heartbeats and to wake the thread up
   * when it is asked to stop.
   */
  private final Object shutdown_lock = new Object();

  /**
   * Create a heartbeat thread.
   * @param threadName The name of the heartbeat thread.
   * @param session The session on which heartbeats are to be sent.
   * @param heartbeatInterval The interval between heartbeats sent
   *                          (in milliseconds).
   * @param serverId The serverId of the sender domain.
   */
  public CTHeartbeatPublisherThread(String threadName, ProtocolSession session,
                  long heartbeatInterval, short serverId)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
    this.serverId = serverId;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void run()
  {
    try
    {
      if (debugEnabled())
      {
        TRACER.debugInfo(getName() + " is starting, interval is %d",
                  heartbeatInterval);
      }

      while (!shutdown)
      {
        long now = System.currentTimeMillis();

        // Only publish when the session has been silent for more than one
        // interval; the message is built only when it is actually sent.
        if ((now > session.getLastPublishTime() + heartbeatInterval)
            && !heartbeatsDisabled)
        {
          session.publish(new ChangeTimeHeartbeatMsg(
              new ChangeNumber(TimeThread.getTime(), 0, serverId)));
        }

        try
        {
          // Sleep until one interval after the last publication, or for a
          // full interval when that deadline is already behind us.
          long sleepTime = session.getLastPublishTime() +
              heartbeatInterval - now;
          if (sleepTime <= 0)
          {
            sleepTime = heartbeatInterval;
          }

          synchronized (shutdown_lock)
          {
            // Re-check under the lock so that a shutdown requested since
            // the loop test is not missed before going to sleep.
            if (!shutdown)
            {
              shutdown_lock.wait(sleepTime);
            }
          }
        }
        catch (InterruptedException e)
        {
          // Keep looping: the thread is only stopped through shutdown().
        }
      }
    }
    catch (IOException e)
    {
      if (debugEnabled())
      {
        TRACER.debugInfo(getName() + " could not send a heartbeat: " + e);
      }
      // This will be caught in another thread.
    }
    finally
    {
      if (debugEnabled())
      {
        TRACER.debugInfo(getName()+" is exiting.");
      }
    }
  }

  /**
   * Call this method to stop the thread.
   * This method only requests the stop and wakes the thread up if it is
   * sleeping; it does not wait for the thread to terminate.
   */
  public void shutdown()
  {
    synchronized (shutdown_lock)
    {
      shutdown = true;
      shutdown_lock.notifyAll();
    }
  }

  /**
   * For testing purposes only to simulate loss of heartbeats.
   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
   */
  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
  {
    CTHeartbeatPublisherThread.heartbeatsDisabled = heartbeatsDisabled;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -116,8 +116,8 @@ // performPhaseOneHandshake method. private String tmpReadableServerName = null; /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. * The expected duration in milliseconds between heartbeats received * from the replication server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; /** @@ -142,6 +142,16 @@ // Same group id poller thread private SameGroupIdPoller sameGroupIdPoller = null; /** * The thread that publishes messages to the RS containing the current * change time of this DS. */ private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null; /** * The expected period in milliseconds between these messages are sent * to the replication server. Zero means heartbeats are off. */ private long changeTimeHeartbeatSendInterval = 0; /* * Properties for the last topology info received from the network. */ @@ -159,24 +169,27 @@ * * @param replicationDomain The replication domain that is creating us. * @param state The ServerState that should be used by this broker * when negotiating the session with the replicationServer. * when negotiating the session with the replicationServer. * @param baseDn The base DN that should be used by this broker * when negotiating the session with the replicationServer. * when negotiating the session with the replicationServer. * @param serverId The server ID that should be used by this broker * when negotiating the session with the replicationServer. * when negotiating the session with the replicationServer. * @param window The size of the send and receive window to use. * @param heartbeatInterval The interval between heartbeats requested of the * replicationServer, or zero if no heartbeats are requested. * * @param generationId The generationId for the server associated to the * provided serverId and for the domain associated to the provided baseDN. 
* @param heartbeatInterval The interval (in ms) between heartbeats requested * from the replicationServer, or zero if no heartbeats are requested. * @param replSessionSecurity The session security configuration. * @param groupId The group id of our domain. * @param changeTimeHeartbeatInterval The interval (in ms) between Change * time heartbeats are sent to the RS, * or zero if no CN heartbeat shoud be sent. */ public ReplicationBroker(ReplicationDomain replicationDomain, ServerState state, String baseDn, short serverId, int window, long generationId, long heartbeatInterval, ReplSessionSecurity replSessionSecurity, byte groupId) ReplSessionSecurity replSessionSecurity, byte groupId, long changeTimeHeartbeatInterval) { this.domain = replicationDomain; this.baseDn = baseDn; @@ -190,6 +203,7 @@ this.maxRcvWindow = window; this.maxRcvWindow = window; this.halfRcvWindow = window /2; this.changeTimeHeartbeatSendInterval = changeTimeHeartbeatInterval; } /** @@ -392,7 +406,8 @@ // Stop any existing poller and heartbeat monitor from a previous session. stopSameGroupIdPoller(); stopHeartBeat(); stopRSHeartBeatMonitoring(); stopChangeTimeHeartBeatPublishing(); boolean newServerWithSameGroupId = false; synchronized (connectPhaseLock) @@ -508,7 +523,8 @@ logError(message); startSameGroupIdPoller(); } startHeartBeat(); startRSHeartBeatMonitoring(); startChangeTimeHeartBeatPublishing(); } else { // Detected new RS with our group id: log disconnection to @@ -1025,7 +1041,7 @@ // Send our Start Session StartECLSessionMsg startECLSessionMsg = null; startECLSessionMsg = new StartECLSessionMsg(); startECLSessionMsg.setOperationId(Short.toString(serverId)); startECLSessionMsg.setOperationId("-1"); session.publish(startECLSessionMsg); /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ? @@ -1428,7 +1444,7 @@ /** * Start the heartbeat monitor thread. */ private void startHeartBeat() private void startRSHeartBeatMonitoring() { // Start a heartbeat monitor thread. 
if (heartbeatInterval > 0) @@ -1467,7 +1483,7 @@ /** * Stop the heartbeat monitor thread. */ void stopHeartBeat() void stopRSHeartBeatMonitoring() { if (heartbeatMonitor != null) { @@ -1753,7 +1769,8 @@ + " domain " + baseDn); } stopSameGroupIdPoller(); stopHeartBeat(); stopRSHeartBeatMonitoring(); stopChangeTimeHeartBeatPublishing(); replicationServer = "stopped"; shutdown = true; connected = false; @@ -2156,4 +2173,38 @@ { return connectionError; } /** * Starts publishing to the RS the current timestamp used in this server. */ public void startChangeTimeHeartBeatPublishing() { // Start a CN heartbeat thread. if (changeTimeHeartbeatSendInterval > 0) { ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( "Replication CN Heartbeat Thread started for " + baseDn + " with " + getReplicationServer(), session, changeTimeHeartbeatSendInterval, serverId); ctHeartbeatPublisherThread.start(); } else { TRACER.debugInfo(this + " is not configured to send CN heartbeat interval"); } } /** * Stops publishing to the RS the current timestamp used in this server. */ public void stopChangeTimeHeartBeatPublishing() { if (ctHeartbeatPublisherThread != null) { ctHeartbeatPublisherThread.shutdown(); ctHeartbeatPublisherThread = null; } } } opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2292,13 +2292,16 @@ * @param heartbeatInterval The heartbeatInterval that should be used * to check the availability of the replication * servers. * @param changetimeHeartbeatInterval The interval used to send change * time heartbeat to the replication server. * * @throws ConfigException If the DirectoryServer configuration was * incorrect. */ public void startPublishService( Collection<String> replicationServers, int window, long heartbeatInterval) throws ConfigException long heartbeatInterval, long changetimeHeartbeatInterval) throws ConfigException { if (broker == null) { @@ -2311,7 +2314,8 @@ getGenerationID(), heartbeatInterval, new ReplSessionSecurity(), getGroupId()); getGroupId(), changetimeHeartbeatInterval); broker.start(replicationServers); opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
@@ -37,12 +37,12 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -81,7 +81,6 @@ import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; @@ -94,6 +93,7 @@ import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.FilterType; import org.opends.server.types.Modification; import org.opends.server.types.ObjectClass; import org.opends.server.types.Privilege; @@ -107,7 +107,6 @@ import org.opends.server.types.operation.SearchReferenceSearchOperation; import org.opends.server.util.Base64; import org.opends.server.util.ServerConstants; import org.opends.server.util.TimeThread; @@ -136,22 +135,12 @@ // The associated DN. private DN rootBaseDN; // The cookie received in the ECL request control coming along // with the request. MultiDomainServerState requestCookie = null; /** * The replication server in which the search on ECL is to be performed. */ protected ReplicationServer replicationServer; /** * Indicates whether we should actually process the search. This should * only be false if it's a persistent search with changesOnly=true. */ protected boolean changesOnly; /** * The client connection for the search operation. 
*/ protected ClientConnection clientConnection; @@ -177,6 +166,7 @@ private HashSet<String> supportedControls; // The set of supported features for this WE // TODO: any special feature to be implemented for an ECL search operation ? private HashSet<String> supportedFeatures; String privateDomainsBaseDN; @@ -202,11 +192,12 @@ ObjectClass topOC = DirectoryServer.getObjectClass(OC_TOP, true); eclObjectClasses.put(topOC, OC_TOP); ObjectClass eclEntryOC = DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true); true); eclObjectClasses.put(eclEntryOC, OC_CHANGELOG_ENTRY); // Define an empty sets for the supported controls and features. // FIXME:ECL Decide if ServerSideControl and VLV are supported supportedControls = new HashSet<String>(0); supportedControls.add(ServerConstants.OID_SERVER_SIDE_SORT_REQUEST_CONTROL); supportedControls.add(ServerConstants.OID_VLV_REQUEST_CONTROL); @@ -226,57 +217,79 @@ * if this operation should be cancelled */ public void processECLSearch(ECLWorkflowElement wfe) throws CanceledOperationException throws CanceledOperationException { boolean executePostOpPlugins = false; this.replicationServer = wfe.getReplicationServer(); clientConnection = getClientConnection(); // Get the plugin config manager that will be used for invoking plugins. PluginConfigManager pluginConfigManager = DirectoryServer.getPluginConfigManager(); changesOnly = false; // Check for a request to cancel this operation. checkIfCanceled(false); // Create a labeled block of code that we can break out of if a problem is // detected. searchProcessing: searchProcessing: { replicationServer = wfe.getReplicationServer(); clientConnection = getClientConnection(); startECLSessionMsg = new StartECLSessionMsg(); // Set default behavior as "from draft change number". // "from cookie" is set only when cookie is provided. 
startECLSessionMsg.setECLRequestType( StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER); // Set a string operationid that will help correlate any error message // logged for this operation with the 'real' client operation. startECLSessionMsg.setOperationId(this.toString()); // Set a list of excluded domains (also exclude 'cn=changelog' itself) ArrayList<String> excludedDomains = MultimasterReplication.getPrivateDomains(); if (!excludedDomains.contains(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); startECLSessionMsg.setExcludedDNs(excludedDomains); // Test existence of the RS - normally should always be here if (replicationServer == null) { setResultCode(ResultCode.OPERATIONS_ERROR); appendErrorMessage(ERR_SEARCH_BASE_DOESNT_EXIST.get( String.valueOf(baseDN))); break searchProcessing; } // Process the search base and filter to convert them from their raw forms // as provided by the client to the forms required for the rest of the // search processing. baseDN = getBaseDN(); filter = getFilter(); if ((baseDN == null) || (filter == null)){ break searchProcessing; } // Check to see if there are any controls in the request. If so, then // see if there is any special processing required. 
// Analyse controls - including the cookie control try { this.requestCookie = null; handleRequestControls(); if (this.requestCookie == null) { setResponseData(new DirectoryException( ResultCode.OPERATIONS_ERROR, Message.raw(Category.SYNC, Severity.FATAL_ERROR, "Cookie control expected"))); break searchProcessing; } } catch (DirectoryException de) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, de); } setResponseData(de); break searchProcessing; } // Process filter - extract draft change number (seqnum) conditions try { evaluateFilter(startECLSessionMsg, this.getFilter()); } catch (DirectoryException de) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, de); setResponseData(de); break searchProcessing; } @@ -287,7 +300,7 @@ // Invoke the pre-operation search plugins. executePostOpPlugins = true; PluginResult.PreOperation preOpResult = pluginConfigManager.invokePreOperationSearchPlugins(this); pluginConfigManager.invokePreOperationSearchPlugins(this); if (!preOpResult.continueProcessing()) { setResultCode(preOpResult.getResultCode()); @@ -297,26 +310,12 @@ break searchProcessing; } // Check for a request to cancel this operation. checkIfCanceled(false); // Test existence of the RS if (replicationServer == null) { setResultCode(ResultCode.OPERATIONS_ERROR); appendErrorMessage(ERR_SEARCH_BASE_DOESNT_EXIST.get( String.valueOf(baseDN))); break searchProcessing; } // We'll set the result code to "success". If a problem occurs, then it // will be overwritten. // Be optimistic by default. setResultCode(ResultCode.SUCCESS); // If there's a persistent search, then register it with the server. 
if (persistentSearch != null) { @@ -332,9 +331,7 @@ catch (DirectoryException de) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, de); } setResponseData(de); @@ -343,7 +340,6 @@ persistentSearch.cancel(); setSendResponse(true); } break searchProcessing; } catch (CanceledOperationException coe) @@ -353,31 +349,25 @@ persistentSearch.cancel(); setSendResponse(true); } throw coe; } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } setResultCode(DirectoryServer.getServerErrorResultCode()); appendErrorMessage(ERR_SEARCH_BACKEND_EXCEPTION.get( getExceptionMessage(e))); getExceptionMessage(e))); if (persistentSearch != null) { persistentSearch.cancel(); setSendResponse(true); } break searchProcessing; } } // Check for a request to cancel this operation. checkIfCanceled(false); @@ -385,7 +375,7 @@ if (executePostOpPlugins) { PluginResult.PostOperation postOpResult = pluginConfigManager.invokePostOperationSearchPlugins(this); pluginConfigManager.invokePostOperationSearchPlugins(this); if (!postOpResult.continueProcessing()) { setResultCode(postOpResult.getResultCode()); @@ -398,13 +388,13 @@ /** * Handles any controls contained in the request. * Handles any controls contained in the request - including the cookie ctrl. * * @throws DirectoryException If there is a problem with any of the request * controls. */ protected void handleRequestControls() throws DirectoryException throws DirectoryException { List<Control> requestControls = getRequestControls(); if ((requestControls != null) && (! requestControls.isEmpty())) @@ -414,22 +404,28 @@ Control c = requestControls.get(i); String oid = c.getOID(); if (! AccessControlConfigManager.getInstance(). 
getAccessControlHandler().isAllowed(baseDN, this, c)) getAccessControlHandler().isAllowed(baseDN, this, c)) { throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS, ERR_CONTROL_INSUFFICIENT_ACCESS_RIGHTS.get(oid)); ERR_CONTROL_INSUFFICIENT_ACCESS_RIGHTS.get(oid)); } if (oid.equals(OID_ECL_COOKIE_EXCHANGE_CONTROL)) { ExternalChangelogRequestControl eclControl = getRequestControl(ExternalChangelogRequestControl.DECODER); this.requestCookie = eclControl.getCookie(); MultiDomainServerState cookie = eclControl.getCookie(); if (cookie!=null) { startECLSessionMsg.setECLRequestType( StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE); startECLSessionMsg.setCrossDomainServerState(cookie.toString()); } } else if (oid.equals(OID_LDAP_ASSERTION)) { LDAPAssertionRequestControl assertControl = getRequestControl(LDAPAssertionRequestControl.DECODER); getRequestControl(LDAPAssertionRequestControl.DECODER); try { @@ -449,20 +445,20 @@ } throw new DirectoryException(de.getResultCode(), ERR_SEARCH_CANNOT_GET_ENTRY_FOR_ASSERTION.get( de.getMessageObject())); ERR_SEARCH_CANNOT_GET_ENTRY_FOR_ASSERTION.get( de.getMessageObject())); } if (entry == null) { throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_SEARCH_NO_SUCH_ENTRY_FOR_ASSERTION.get()); ERR_SEARCH_NO_SUCH_ENTRY_FOR_ASSERTION.get()); } if (! assertionFilter.matchesEntry(entry)) { throw new DirectoryException(ResultCode.ASSERTION_FAILED, ERR_SEARCH_ASSERTION_FAILED.get()); ERR_SEARCH_ASSERTION_FAILED.get()); } } catch (DirectoryException de) @@ -478,8 +474,8 @@ } throw new DirectoryException(ResultCode.PROTOCOL_ERROR, ERR_SEARCH_CANNOT_PROCESS_ASSERTION_FILTER.get( de.getMessageObject()), de); ERR_SEARCH_CANNOT_PROCESS_ASSERTION_FILTER.get( de.getMessageObject()), de); } } else if (oid.equals(OID_PROXIED_AUTH_V1)) @@ -489,11 +485,11 @@ if (! 
clientConnection.hasPrivilege(Privilege.PROXIED_AUTH, this)) { throw new DirectoryException(ResultCode.AUTHORIZATION_DENIED, ERR_PROXYAUTH_INSUFFICIENT_PRIVILEGES.get()); ERR_PROXYAUTH_INSUFFICIENT_PRIVILEGES.get()); } ProxiedAuthV1Control proxyControl = getRequestControl(ProxiedAuthV1Control.DECODER); getRequestControl(ProxiedAuthV1Control.DECODER); Entry authorizationEntry = proxyControl.getAuthorizationEntry(); setAuthorizationEntry(authorizationEntry); @@ -513,11 +509,11 @@ if (! clientConnection.hasPrivilege(Privilege.PROXIED_AUTH, this)) { throw new DirectoryException(ResultCode.AUTHORIZATION_DENIED, ERR_PROXYAUTH_INSUFFICIENT_PRIVILEGES.get()); ERR_PROXYAUTH_INSUFFICIENT_PRIVILEGES.get()); } ProxiedAuthV2Control proxyControl = getRequestControl(ProxiedAuthV2Control.DECODER); getRequestControl(ProxiedAuthV2Control.DECODER); Entry authorizationEntry = proxyControl.getAuthorizationEntry(); setAuthorizationEntry(authorizationEntry); @@ -536,15 +532,17 @@ getRequestControl(PersistentSearchControl.DECODER); persistentSearch = new PersistentSearch(this, psearchControl.getChangeTypes(), psearchControl.getReturnECs()); psearchControl.getChangeTypes(), psearchControl.getReturnECs()); // If we're only interested in changes, then we don't actually want // to process the search now. 
if (psearchControl.getChangesOnly()) { changesOnly = true; } if (!psearchControl.getChangesOnly()) startECLSessionMsg.setPersistent( StartECLSessionMsg.PERSISTENT); else startECLSessionMsg.setPersistent( StartECLSessionMsg.PERSISTENT_CHANGES_ONLY); } else if (oid.equals(OID_LDAP_SUBENTRIES)) { @@ -553,7 +551,7 @@ else if (oid.equals(OID_MATCHED_VALUES)) { MatchedValuesControl matchedValuesControl = getRequestControl(MatchedValuesControl.DECODER); getRequestControl(MatchedValuesControl.DECODER); setMatchedValuesControl(matchedValuesControl); } else if (oid.equals(OID_ACCOUNT_USABLE_CONTROL)) @@ -569,20 +567,19 @@ setVirtualAttributesOnly(true); } else if (oid.equals(OID_GET_EFFECTIVE_RIGHTS) && DirectoryServer.isSupportedControl(OID_GET_EFFECTIVE_RIGHTS)) DirectoryServer.isSupportedControl(OID_GET_EFFECTIVE_RIGHTS)) { // Do nothing here and let AciHandler deal with it. } // NYI -- Add support for additional controls. // TODO: Add support for additional controls, including VLV else if (c.isCritical()) { if ((replicationServer == null) || (! 
supportsControl(oid))) { throw new DirectoryException( ResultCode.UNAVAILABLE_CRITICAL_EXTENSION, ERR_SEARCH_UNSUPPORTED_CRITICAL_CONTROL.get(oid)); ResultCode.UNAVAILABLE_CRITICAL_EXTENSION, ERR_SEARCH_UNSUPPORTED_CRITICAL_CONTROL.get(oid)); } } } @@ -592,35 +589,12 @@ private void processSearch() throws DirectoryException, CanceledOperationException { startECLSessionMsg = new StartECLSessionMsg(); startECLSessionMsg.setECLRequestType( StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE); startECLSessionMsg.setChangeNumber( new ChangeNumber(TimeThread.getTime(),(short)0, (short)0)); startECLSessionMsg.setCrossDomainServerState(requestCookie.toString()); if (debugEnabled()) TRACER.debugInfo( " processSearch toString=[" + toString() + "] opid=[" + startECLSessionMsg.getOperationId() + "]"); if (persistentSearch==null) startECLSessionMsg.setPersistent(StartECLSessionMsg.NON_PERSISTENT); else if (!changesOnly) startECLSessionMsg.setPersistent(StartECLSessionMsg.PERSISTENT); else startECLSessionMsg.setPersistent( StartECLSessionMsg.PERSISTENT_CHANGES_ONLY); startECLSessionMsg.setFirstDraftChangeNumber(0); startECLSessionMsg.setLastDraftChangeNumber(0); // Help correlate with access log with the format: "conn=x op=y msgID=z" startECLSessionMsg.setOperationId( "conn="+String.valueOf(this.getConnectionID()) + " op="+String.valueOf(this.getOperationID()) + " msgID="+String.valueOf(getOperationID())); startECLSessionMsg.setExcludedDNs( MultimasterReplication.getPrivateDomains()); // Start session // Start a specific ECL session eclSession = replicationServer.createECLSession(startECLSessionMsg); if (!getScope().equals(SearchScope.SINGLE_LEVEL)) @@ -718,9 +692,9 @@ private boolean matchFilter(Entry entry) throws DirectoryException { boolean ms = entry.matchesBaseAndScope(getBaseDN(), getScope()); boolean mf = getFilter().matchesEntry(entry); return (ms && mf); boolean baseScopeMatch = entry.matchesBaseAndScope(getBaseDN(), getScope()); boolean filterMatch = 
getFilter().matchesEntry(entry); return (baseScopeMatch && filterMatch); } /** @@ -755,95 +729,99 @@ null, // real time current entry null, // real time attrs names null, // hist entry attributes -1, // TODO:ECL G Good changelog draft compat. addMsg.getSeqnum() eclmsg.getDraftChangeNumber(), "add"); } else if (msg instanceof ModifyMsg) { ModifyMsg modMsg = (ModifyMsg)msg; InternalClientConnection conn = InternalClientConnection.getRootConnection(); try if (msg instanceof ModifyMsg) { // Map the modMsg modifications to an LDIF string // for the 'changes' attribute of the CL entry ModifyOperation modifyOperation = (ModifyOperation)modMsg.createOperation(conn); String LDIFchanges = modToLDIF(modifyOperation.getModifications()); ModifyMsg modMsg = (ModifyMsg)msg; InternalClientConnection conn = InternalClientConnection.getRootConnection(); try { // Map the modMsg modifications to an LDIF string // for the 'changes' attribute of the CL entry ModifyOperation modifyOperation = (ModifyOperation)modMsg.createOperation(conn); String LDIFchanges = modToLDIF(modifyOperation.getModifications()); // TODO:ECL G Good changelog draft compat. 
Hist entry attributes // ArrayList<RawAttribute> attributes = modMsg.getEntryAttributes(); // TODO:ECL Hist entry attributes // ArrayList<RawAttribute> attributes = modMsg.getEntryAttributes(); clEntry = createChangelogEntry( eclmsg.getServiceId(), eclmsg.getCookie().toString(), DN.decode(modMsg.getDn()), modMsg.getChangeNumber(), LDIFchanges, modMsg.getUniqueId(), null, // real time current entry null, // real time attrs names null, // hist entry attributes eclmsg.getDraftChangeNumber(), "modify"); } catch(Exception e) { // Exceptions raised by createOperation for example throw new DirectoryException(ResultCode.OTHER, Message.raw(Category.SYNC, Severity.NOTICE, " Server fails to create entry: "),e); } } else if (msg instanceof ModifyDNMsg) { ModifyDNMsg modDNMsg = (ModifyDNMsg)msg; clEntry = createChangelogEntry( eclmsg.getServiceId(), eclmsg.getCookie().toString(), DN.decode(modMsg.getDn()), modMsg.getChangeNumber(), LDIFchanges, modMsg.getUniqueId(), DN.decode(modDNMsg.getDn()), modDNMsg.getChangeNumber(), null, modDNMsg.getUniqueId(), null, // real time current entry null, // real time attrs names null, // hist entry attributes -1, // TODO:ECL G Good changelog draft compat. 
modMsg.getSeqnum() "modify"); eclmsg.getDraftChangeNumber(), "modrdn"); Attribute a = Attributes.create("newrdn", modDNMsg.getNewRDN()); clEntry.addAttribute(a, null); Attribute b = Attributes.create("newsuperior", modDNMsg.getNewSuperior()); clEntry.addAttribute(b, null); Attribute c = Attributes.create("deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn())); clEntry.addAttribute(c, null); } catch(Exception e) else if (msg instanceof DeleteMsg) { // FIXME:ECL Handle error when createOperation raise DataFormatExceptin DeleteMsg delMsg = (DeleteMsg)msg; /* TODO:ECL Entry attributes for DEL op ArrayList<RawAttribute> rattributes = new ArrayList<RawAttribute>(); ArrayList<RawAttribute> rattributes = delMsg.getEntryAttributes(); // Map the entry attributes of the DelMsg to an LDIF string // for the 'deletedentryattributes' attribute of the CL entry String delAttrs = delMsgToLDIFString(rattributes); */ clEntry = createChangelogEntry( eclmsg.getServiceId(), eclmsg.getCookie().toString(), DN.decode(delMsg.getDn()), delMsg.getChangeNumber(), null, delMsg.getUniqueId(), null, null, null, //rattributes, eclmsg.getDraftChangeNumber(), "delete"); } } else if (msg instanceof ModifyDNMsg) { ModifyDNMsg modDNMsg = (ModifyDNMsg)msg; clEntry = createChangelogEntry( eclmsg.getServiceId(), eclmsg.getCookie().toString(), DN.decode(modDNMsg.getDn()), modDNMsg.getChangeNumber(), null, modDNMsg.getUniqueId(), null, // real time current entry null, // real time attrs names null, // hist entry attributes -1, // TODO:ECL G Good changelog draft compat. 
modDNMsg.getSeqnum() "modrdn"); Attribute a = Attributes.create("newrdn", modDNMsg.getNewRDN()); clEntry.addAttribute(a, null); Attribute b = Attributes.create("newsuperior", modDNMsg.getNewSuperior()); clEntry.addAttribute(b, null); Attribute c = Attributes.create("deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn())); clEntry.addAttribute(c, null); } else if (msg instanceof DeleteMsg) { DeleteMsg delMsg = (DeleteMsg)msg; ArrayList<RawAttribute> rattributes = new ArrayList<RawAttribute>(); /* TODO:ECL Entry attributes for DEL op ArrayList<RawAttribute> rattributes = delMsg.getEntryAttributes(); // Map the entry attributes of the DelMsg to an LDIF string // for the 'deletedentryattributes' attribute of the CL entry String delAttrs = delMsgToLDIFString(rattributes); */ clEntry = createChangelogEntry( eclmsg.getServiceId(), eclmsg.getCookie().toString(), DN.decode(delMsg.getDn()), delMsg.getChangeNumber(), null, delMsg.getUniqueId(), null, null, null, //rattributes, -1, // TODO:ECL G Good changelog draft compat. 
delMsg.getSeqnum() "delete"); } return clEntry; } @@ -863,20 +841,6 @@ HashMap<AttributeType,List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>(); // Add to the root entry the replication state of each domain // TODO:ECL Put in ECL root entry, the ServerState for each domain AttributeType descType = DirectoryServer.getAttributeType("description"); List<ReplicationDomain> supportedReplicationDomains = new ArrayList<ReplicationDomain>(); for (ReplicationDomain domain : supportedReplicationDomains) { // Crappy stuff to return the server state to the client LinkedList<Attribute> attrList = new LinkedList<Attribute>(); attrList.add(Attributes.create("description", domain.getServiceID() + "/" + domain.getServerState().toString())); userAttrs.put(descType, attrList); } Entry e = new Entry(this.rootBaseDN, oclasses, userAttrs, operationalAttrs); return e; @@ -916,10 +880,19 @@ String changetype) throws DirectoryException { String dnString = "cn="+ changeNumber +"," + serviceID + "," + String dnString = ""; if (draftChangenumber == 0) { // Draft uncompat mode dnString = "cn="+ changeNumber +"," + serviceID + "," + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT; } else { // Draft compat mode dnString = "cn="+ draftChangenumber + "," + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT; } HashMap<ObjectClass,String> oClasses = new LinkedHashMap<ObjectClass,String>(3); oClasses.putAll(eclObjectClasses); @@ -962,7 +935,7 @@ attrList = new ArrayList<Attribute>(1); attrList.add(a); uAttrs.put(a.getAttributeType(), attrList); */ */ // a = Attributes.create("changetype", changetype); @@ -1017,18 +990,21 @@ uAttrs.put(a.getAttributeType(), attrList); } a = Attributes.create("targetentryuuid", targetUUID); attrList = new ArrayList<Attribute>(1); attrList.add(a); operationalAttrs.put(a.getAttributeType(), attrList); if (draftChangenumber>0) if (targetUUID != null) { // compat mode a = Attributes.create("targetuniqueid", 
ECLSearchOperation.openDsToSunDseeNsUniqueId(targetUUID)); a = Attributes.create("targetentryuuid", targetUUID); attrList = new ArrayList<Attribute>(1); attrList.add(a); operationalAttrs.put(a.getAttributeType(), attrList); if (draftChangenumber>0) { // compat mode a = Attributes.create("targetuniqueid", ECLSearchOperation.openDsToSunDseeNsUniqueId(targetUUID)); attrList = new ArrayList<Attribute>(1); attrList.add(a); operationalAttrs.put(a.getAttributeType(), attrList); } } a = Attributes.create("changelogcookie", cookie); @@ -1061,7 +1037,7 @@ uAttrs.put(a.getAttributeType(), attrList); } } */ */ /* if (targetAttrNames != null) @@ -1082,7 +1058,7 @@ } } } */ */ /* TODO: Implement entry attributes historical values if (histEntryAttributes != null) { @@ -1105,7 +1081,7 @@ } } } */ */ // at the end build the CL entry to be returned Entry cle = new Entry( @@ -1315,7 +1291,7 @@ */ public CancelResult cancel(CancelRequest cancelRequest) { if (eclSession!=null) if (eclSession != null) { try { @@ -1327,20 +1303,16 @@ } /** * The unique identifier used in DSEE is named nsUniqueId and its format is * HHHHHHHH-HHHHHHHH-HHHHHHHH-HHHHHHHH where H is a hex digit. * An nsUniqueId value is for example 3970de28-08b311d9-8095b9bf-c4d9231c * The unique identifier used in OpenDS is named entryUUID. * Its value is for example entryUUID: 50dd9673-71e1-4478-b13c-dba387c4d7e1 * @param entryUid the OpenDS entry UID * @return the Dsee format for the entry UID */ * The unique identifier used in DSEE is named nsUniqueId and its format is * HHHHHHHH-HHHHHHHH-HHHHHHHH-HHHHHHHH where H is a hex digit. * An nsUniqueId value is for example 3970de28-08b311d9-8095b9bf-c4d9231c * The unique identifier used in OpenDS is named entryUUID. 
* Its value is for example entryUUID: 50dd9673-71e1-4478-b13c-dba387c4d7e1 * @param entryUid the OpenDS entry UID * @return the Dsee format for the entry UID */ private static String openDsToSunDseeNsUniqueId(String entryUid) { if (entryUid == null) return null; // the conversion from one unique identifier to an other is // a question of formating : the last "-" is placed StringBuffer buffer = new StringBuffer(entryUid); @@ -1355,5 +1327,106 @@ return buffer.toString(); } } /** * Traverse the provided search filter, looking for some conditions * on attributes that can be optimized in the ECL. * When found, populate the provided StartECLSessionMsg. * @param startCLmsg the startCLMsg to be populated. * @param sf the provided search filter. * @throws DirectoryException when an exception occurs. */ public static void evaluateFilter(StartECLSessionMsg startCLmsg, SearchFilter sf) throws DirectoryException { StartECLSessionMsg msg = evaluateFilter2(sf); startCLmsg.setFirstDraftChangeNumber(msg.getFirstDraftChangeNumber()); startCLmsg.setLastDraftChangeNumber(msg.getLastDraftChangeNumber()); startCLmsg.setChangeNumber(msg.getChangeNumber()); } private static StartECLSessionMsg evaluateFilter2(SearchFilter sf) throws DirectoryException { StartECLSessionMsg startCLmsg = new StartECLSessionMsg(); startCLmsg.setFirstDraftChangeNumber(-1); startCLmsg.setLastDraftChangeNumber(-1); startCLmsg.setChangeNumber(new ChangeNumber(0,0,(short)0)); // Here are the 3 elementary cases we know how to optimize if ((sf != null) && (sf.getFilterType() == FilterType.GREATER_OR_EQUAL) && (sf.getAttributeType() != null) && (sf.getAttributeType().getPrimaryName(). equalsIgnoreCase("changeNumber"))) { int sn = Integer.decode( sf.getAssertionValue().getNormalizedValue().toString()); startCLmsg.setFirstDraftChangeNumber(sn); return startCLmsg; } else if ((sf != null) && (sf.getFilterType() == FilterType.LESS_OR_EQUAL) && (sf.getAttributeType() != null) && (sf.getAttributeType().getPrimaryName(). 
equalsIgnoreCase("changeNumber"))) { int sn = Integer.decode( sf.getAssertionValue().getNormalizedValue().toString()); startCLmsg.setLastDraftChangeNumber(sn); return startCLmsg; } else if ((sf != null) && (sf.getFilterType() == FilterType.EQUALITY) && (sf.getAttributeType() != null) && (sf.getAttributeType().getPrimaryName(). equalsIgnoreCase("replicationcsn"))) { // == exact changenumber ChangeNumber cn = new ChangeNumber(sf.getAssertionValue().toString()); startCLmsg.setChangeNumber(cn); return startCLmsg; } else if ((sf != null) && (sf.getFilterType() == FilterType.EQUALITY) && (sf.getAttributeType() != null) && (sf.getAttributeType().getPrimaryName(). equalsIgnoreCase("changenumber"))) { int sn = Integer.decode( sf.getAssertionValue().getNormalizedValue().toString()); startCLmsg.setFirstDraftChangeNumber(sn); startCLmsg.setLastDraftChangeNumber(sn); return startCLmsg; } else if ((sf != null) && (sf.getFilterType() == FilterType.AND)) { // Here is the only binary operation we know how to optimize Collection<SearchFilter> comps = sf.getFilterComponents(); SearchFilter sfs[] = comps.toArray(new SearchFilter[0]); StartECLSessionMsg m1 = evaluateFilter2(sfs[0]); StartECLSessionMsg m2 = evaluateFilter2(sfs[1]); int l1 = m1.getLastDraftChangeNumber(); int l2 = m2.getLastDraftChangeNumber(); if (l1 == -1) startCLmsg.setLastDraftChangeNumber(l2); else if (l2 == -1) startCLmsg.setLastDraftChangeNumber(l1); else startCLmsg.setLastDraftChangeNumber(Math.min(l1,l2)); int f1 = m1.getFirstDraftChangeNumber(); int f2 = m2.getFirstDraftChangeNumber(); startCLmsg.setFirstDraftChangeNumber(Math.max(f1,f2)); return startCLmsg; } else { return startCLmsg; } } } opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLWorkflowElement.java
@@ -171,12 +171,12 @@ /** * Registers the provided persistent search operation with this * local backend workflow element so that it will be notified of any * workflow element so that it will be notified of any * add, delete, modify, or modify DN operations that are performed. * * @param persistentSearch * The persistent search operation to register with this * local backend workflow element. * workflow element. */ void registerPersistentSearch(PersistentSearch persistentSearch) { @@ -198,10 +198,10 @@ /** * Gets the list of persistent searches currently active against * this local backend workflow element. * this workflow element. * * @return The list of persistent searches currently active against * this local backend workflow element. * this workflow element. */ public List<PersistentSearch> getPersistentSearches() { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -82,6 +82,7 @@ import org.opends.server.protocols.ldap.SearchResultDoneProtocolOp; import org.opends.server.protocols.ldap.SearchResultEntryProtocolOp; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.DomainFakeCfg; @@ -95,7 +96,9 @@ import org.opends.server.replication.protocol.ModifyDnContext; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.DraftCNDbHandler; import org.opends.server.replication.server.ExternalChangeLogSessionImpl; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; @@ -118,15 +121,18 @@ import org.opends.server.types.ModificationType; import org.opends.server.types.RDN; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.util.LDIFWriter; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** * Tests for the replicationServer code. */ @@ -142,12 +148,16 @@ // The port of the replicationServer. 
private int replicationServerPort; public static final String TEST_ROOT_DN_STRING2 = "o=test2"; public static final String TEST_BACKEND_ID2 = "test2"; private static final String TEST_ROOT_DN_STRING2 = "o=test2"; private static final String TEST_BACKEND_ID2 = "test2"; // The LDAPStatistics object associated with the LDAP connection handler. protected LDAPStatistics ldapStatistics; private LDAPStatistics ldapStatistics; private ChangeNumber gblCN; List<Control> NO_CONTROL = null; /** * Set up the environment for performing the tests in this Class. * Replication @@ -181,7 +191,7 @@ replicationServer = new ReplicationServer(conf1);; debugInfo("configure", "ReplicationServer created"+replicationServer); } /** @@ -190,25 +200,47 @@ @Test(enabled=true) public void ECLReplicationServerTest() { ECLOnPrivateBackend();replicationServer.clearDb(); ECLRemoteEmpty();replicationServer.clearDb(); ECLDirectEmpty();replicationServer.clearDb(); ECLDirectAllOps();replicationServer.clearDb(); ECLEmpty();replicationServer.clearDb(); ECLAllOps();replicationServer.clearDb(); ECLRemoteNonEmpty();replicationServer.clearDb(); ECLDirectMsg();replicationServer.clearDb(); ECLDirectPsearch(true);replicationServer.clearDb(); ECLDirectPsearch(false);replicationServer.clearDb(); ECLPrivateDirectMsg();replicationServer.clearDb(); ECLTwoDomains();replicationServer.clearDb(); ECLPsearch(true, false);replicationServer.clearDb(); ECLPsearch(false, false);replicationServer.clearDb(); ECLSimulPsearches();replicationServer.clearDb(); // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned // TODO:ECL Test invalid DN in cookie returns UNWILLING + message // TODO:ECL Test notif control returned contains the cookie // TODO:ECL Test the attributes list and values returned in ECL entries // TODO:ECL Test search -s base, -s one ChangeTimeHeartbeatTest();replicationServer.clearDb(); ECLFilterTest(); ECLCompatEmpty(); 
ECLCompatBadSeqnum(); ECLCompatWriteReadAllOps(1); ECLCompatWriteReadAllOps(5); ECLCompatReadFrom(6); ECLCompatReadFromTo(5,7); ECLCompatTestLimits(1,8); ECLCompatPurge(); ECLCompatTestLimits(0,0); ECLPsearch(true, true);replicationServer.clearDb(); ECLPsearch(false, true); ECLFilterOnReplicationCsn();replicationServer.clearDb(); ECLSimulPsearches();replicationServer.clearDb(); } //======================================================= // From 3 remote ECL session, // Test DoneMsg is received from 1 suffix on each session. // Objectives // - Test that everything is ok with no changes // Procedure // - Does a SEARCH from 3 different remote ECL session, // - Verify DoneMsg is received on each session. private void ECLRemoteEmpty() { String tn = "ECLRemoteEmpty"; @@ -244,9 +276,11 @@ } while(!(msg instanceof DoneMsg)); assertTrue(msgc==1, "Ending " + tn + " with incorrect message type :" + "Ending " + tn + " with incorrect message number :" + msg.getClass().getCanonicalName()); assertTrue(msg instanceof DoneMsg); assertTrue(msg instanceof DoneMsg, "Ending " + tn + " with incorrect message type :" + msg.getClass().getCanonicalName()); // Test broker2 receives only Done msgc=0; @@ -257,8 +291,11 @@ } while(!(msg instanceof DoneMsg)); assertTrue(msgc==1, "Ending " + tn + " with incorrect message type :" + "Ending " + tn + " with incorrect message number :" + msg.getClass().getCanonicalName()); assertTrue(msg instanceof DoneMsg, "Ending " + tn + " with incorrect message type :" + msg.getClass().getCanonicalName()); // Test broker3 receives only Done msgc=0; @@ -269,26 +306,31 @@ } while(!(msg instanceof DoneMsg)); assertTrue(msgc==1, "Ending " + tn + " with incorrect message type :" + "Ending " + tn + " with incorrect message number :" + msg.getClass().getCanonicalName()); assertTrue(msg instanceof DoneMsg, "Ending " + tn + " with incorrect message type :" + msg.getClass().getCanonicalName()); server1.stop(); server2.stop(); server3.stop(); sleep(500); debugInfo(tn,
"Ending test successfully\n\n"); debugInfo(tn, "Ending test successfully\n\n"); } catch(Exception e) { debugInfo(tn, "Ending test with exception:" fail("Ending test " + tn + " with exception:" + stackTraceToSingleLineString(e)); assertTrue(e == null); } } //======================================================= // From 1 remote ECL session, // test simple update to be received from 2 suffixes // Objectives // - Test that everything id ok with changes on 2 suffixes // Procedure // - From 1 remote ECL session, // - Test simple update to be received from 2 suffixes private void ECLRemoteNonEmpty() { String tn = "ECLRemoteNonEmpty"; @@ -331,7 +373,7 @@ // wait for the server to take these changes into account sleep(500); // open ECL broker serverECL = openReplicationSession( DN.decode("cn=changelog"), (short)10, @@ -343,11 +385,11 @@ msg = serverECL.receive(); ECLUpdateMsg eclu = (ECLUpdateMsg)msg; UpdateMsg u = eclu.getUpdateMsg(); debugInfo(tn, "RESULT:" + u.getChangeNumber()); debugInfo(tn, "RESULT:" + u.getChangeNumber() + " " + eclu.getCookie()); assertTrue(u.getChangeNumber().equals(cn1), "RESULT:" + u.getChangeNumber()); assertTrue(eclu.getCookie().equalsTo(new MultiDomainServerState( "o=test:"+delMsg1.getChangeNumber()+";"))); "o=test:"+delMsg1.getChangeNumber()+";o=test2:;"))); // receive change 2 from suffix 2 msg = serverECL.receive(); eclu = (ECLUpdateMsg)msg; @@ -355,8 +397,8 @@ debugInfo(tn, "RESULT:" + u.getChangeNumber()); assertTrue(u.getChangeNumber().equals(cn2), "RESULT:" + u.getChangeNumber()); assertTrue(eclu.getCookie().equalsTo(new MultiDomainServerState( "o=test2:"+delMsg2.getChangeNumber()+";"+ "o=test:"+delMsg1.getChangeNumber()+";"))); "o=test2:"+delMsg2.getChangeNumber()+";"+ "o=test:"+delMsg1.getChangeNumber()+";"))); // receive Done msg = serverECL.receive(); @@ -372,35 +414,23 @@ } catch(Exception e) { debugInfo(tn, "Ending test with exception:" fail("Ending test " + tn + " with exception:" + stackTraceToSingleLineString(e)); 
assertTrue(e == null); } } /** * From embedded ECL (no remote session) * With empty RS, simple search should return only root entry. */ private void ECLDirectEmpty() private void ECLEmpty() { String tn = "ECLDirectEmpty"; String tn = "ECLEmpty"; debugInfo(tn, "Starting test\n\n"); try { // search on 'cn=changelog' InternalSearchOperation op = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, LDAPFilter.decode("(objectclass=*)")); // Error because no cookie assertEquals( op.getResultCode(), ResultCode.OPERATIONS_ERROR, op.getErrorMessage().toString()); // search on 'cn=changelog' InternalSearchOperation op2 = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ -409,7 +439,7 @@ false, LDAPFilter.decode("(objectclass=*)"), new LinkedHashSet<String>(0), getControls(""), createControls(""), null); // success @@ -423,13 +453,17 @@ } catch(LDAPException e) { debugInfo(tn, "Ending test with exception e=" fail("Ending test " + tn + " with exception e=" + stackTraceToSingleLineString(e)); assertTrue(e == null); } } private ArrayList<Control> getControls(String cookie) /** * Build a list of controls including the cookie provided. * @param cookie The provided cookie. * @return The built list of controls. */ private ArrayList<Control> createControls(String cookie) { ExternalChangelogRequestControl control = new ExternalChangelogRequestControl(true, @@ -439,7 +473,9 @@ return controls; } // Utility - creates an LDIFWriter to dump result entries /** * Utility - creates an LDIFWriter to dump result entries. 
*/ private static LDIFWriter getLDIFWriter() { LDIFWriter ldifWriter = null; @@ -470,46 +506,41 @@ assertNotNull(getEntry(entry.getDN(), 1000, true)); } //======================================================= // From embedded ECL // Search ECL with 1 domain, public then private backend private void ECLPrivateDirectMsg() private void ECLOnPrivateBackend() { ReplicationServer replServer = null; String tn = "ECLPrivateDirectMsg"; String tn = "ECLOnPrivateBackend"; debugInfo(tn, "Starting test"); try { // Initialize a second test backend // Initialize a second test backend o=test2, in addition to o=test // Configure replication on this backend // Add the root entry in the backend Backend backend2 = initializeTestBackend2(false); DN baseDn = DN.decode(TEST_ROOT_DN_STRING2); // configure and start replication of TEST_ROOT_DN_STRING on the server DN baseDn2 = DN.decode(TEST_ROOT_DN_STRING2); SortedSet<String> replServers = new TreeSet<String>(); replServers.add("localhost:"+replicationServerPort); DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, (short) 1602, replServers); LDAPReplicationDomain domain = MultimasterReplication.createNewDomain(domainConf); new DomainFakeCfg(baseDn2, (short) 1602, replServers); LDAPReplicationDomain domain2 = MultimasterReplication.createNewDomain(domainConf); SynchronizationProvider replicationPlugin = new MultimasterReplication(); replicationPlugin.completeSynchronizationProvider(); sleep(1000); Entry e = createEntry(baseDn); Entry e = createEntry(baseDn2); addEntry(e); sleep(1000); // Search on ECL from start on all suffixes String cookie = ""; ExternalChangelogRequestControl control = new ExternalChangelogRequestControl(true, new MultiDomainServerState("")); new MultiDomainServerState(cookie)); ArrayList<Control> controls = new ArrayList<Control>(0); controls.add(control); // search on 'cn=changelog' LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); debugInfo(tn, "Search with
cookie=" + cookie); sleep(2000); InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ -521,23 +552,27 @@ attributes, controls, null); // Expect SUCCESS and root entry returned assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); assertTrue(entries != null); if (entries != null) { for (SearchResultEntry resultEntry : entries) { debugInfo(tn, "Result private entry=" + resultEntry.toLDIFString()); // Expect debugInfo(tn, "Entry returned=" + resultEntry.toLDIFString()); } } assertEquals(entries.size(),1, "Entries number returned by search"); // Same with private backend domain.getBackend().setPrivateBackend(true); // // Set the backend private and do again a search on ECL that should // now not return the entry // domain2.getBackend().setPrivateBackend(true); debugInfo(tn, "Search with cookie=" + cookie); searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ -549,41 +584,37 @@ attributes, controls, null); // Expect success but no entry returned assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); entries = searchOp.getSearchEntries(); assertTrue(entries != null); assertTrue(entries.size()==0); if (replServer != null) replServer.remove(); if (domain != null) MultimasterReplication.deleteDomain(baseDn); // Cleaning if (domain2 != null) MultimasterReplication.deleteDomain(baseDn2); if (replicationPlugin != null) DirectoryServer.deregisterSynchronizationProvider(replicationPlugin); removeTestBackend2(backend2); } catch(Exception e) { debugInfo(tn, "Ending test ECLDirectMsg with exception:\n" fail("Ending test " + tn + " with exception:" + stackTraceToSingleLineString(e)); assertTrue(e == null); } debugInfo(tn, "Ending 
test successfully"); } //======================================================= // From embebbded ECL // Search ECL with 4 messages on 2 suffixes from 2 brokers private void ECLDirectMsg() private void ECLTwoDomains() { String tn = "ECLDirectMsg"; String tn = "ECLTwoDomains"; debugInfo(tn, "Starting test"); try { // Initialize a second test backend @@ -630,7 +661,7 @@ new DeleteMsg("uid="+tn+"4," + TEST_ROOT_DN_STRING, cn, tn+"uuid4"); s1test.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); sleep(500); sleep(1500); // Changes are : // s1 s2 @@ -638,29 +669,28 @@ // o=test2 msg2/msg2 String cookie= ""; debugInfo(tn, "STEP 1 - from empty cookie("+cookie+")"); // search on 'cn=changelog' LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); debugInfo(tn, "Search with cookie=" + cookie + "\""); InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetDN=*direct*)"), attributes, getControls(cookie), null); SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetDN=*"+tn+"*)"), attributes, createControls(cookie), null); // We expect SUCCESS and the 4 changes assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString()); cookie=""; LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); if (entries != null) { @@ -669,35 +699,30 @@ { debugInfo(tn, " RESULT entry returned:" + entry.toSingleLineString()); ldifWriter.writeEntry(entry); try if (i++==2) { if (i++==2) cookie = entry.getAttribute("changelogcookie").get(0).iterator().next().toString(); // Store the cookie returned with the 3rd ECL entry returned to use // it in the test below. 
cookie = entry.getAttribute("changelogcookie").get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS); // We expect the 4 changes assertEquals(searchOp.getSearchEntries().size(), 4); // Now start from last cookie and expect to get the last change // search on 'cn=changelog' // Now start from last cookie and expect to get ONLY the 4th change attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); debugInfo(tn, "STEP 2 - from cookie" + cookie); ExternalChangelogRequestControl control = new ExternalChangelogRequestControl(true, new MultiDomainServerState(cookie)); ArrayList<Control> controls = new ArrayList<Control>(0); controls.add(control); debugInfo(tn, "Search with cookie=" + cookie); searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ -705,18 +730,16 @@ 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetDN=*direct*)"), LDAPFilter.decode("(targetDN=*"+tn+"*)"), attributes, controls, null); // We expect SUCCESS and the 4th change assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); cookie= ""; entries = searchOp.getSearchEntries(); cookie=""; if (entries != null) { for (SearchResultEntry entry : entries) @@ -725,21 +748,18 @@ ldifWriter.writeEntry(entry); try { cookie = entry.getAttribute("changelogcookie").get(0).iterator().next().toString(); // Store the cookie returned with the 4rd ECL entry returned to use // it in the test below. 
cookie = entry.getAttribute("changelogcookie").get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS); // we expect msg4 assertEquals(searchOp.getSearchEntries().size(), 1); debugInfo(tn, "STEP 3 - from cookie" + cookie); // Now publishes a new change and search from the previous cookie ChangeNumber cn5 = new ChangeNumber(time++, ts++, s1test.getServerId()); delMsg = new DeleteMsg("uid="+tn+"5," + TEST_ROOT_DN_STRING, cn5, tn+"uuid5"); @@ -757,6 +777,7 @@ controls = new ArrayList<Control>(0); controls.add(control); debugInfo(tn, "Search with cookie=" + cookie + "\""); searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ -764,15 +785,13 @@ 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetDN=*direct*)"), LDAPFilter.decode("(targetDN=*"+tn+"*)"), attributes, controls, null); assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); cookie= ""; entries = searchOp.getSearchEntries(); if (entries != null) { @@ -782,20 +801,17 @@ ldifWriter.writeEntry(resultEntry); try { cookie = resultEntry.getAttribute("changelogcookie").get(0).iterator().next().toString(); cookie = resultEntry.getAttribute("changelogcookie").get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS); // we expect 1 entries : msg5 assertEquals(searchOp.getSearchEntries().size(), 1); cookie=""; debugInfo(tn, "STEP 4 - [filter:o=test cookie:" + cookie + "]"); control = new ExternalChangelogRequestControl(true, @@ -803,6 +819,8 @@ controls = new ArrayList<Control>(0); controls.add(control); debugInfo(tn, "Search with cookie=" + cookie + "\" and filter on domain=" + "(targetDN=*direct*,o=test)"); searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ 
-810,15 +828,15 @@ 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetDN=*direct*,o=test)"), LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"), attributes, controls, null); assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); entries = searchOp.getSearchEntries(); if (entries != null) { @@ -828,18 +846,20 @@ ldifWriter.writeEntry(resultEntry); try { cookie = resultEntry.getAttribute("changelogcookie").get(0).iterator().next().toString(); cookie = resultEntry.getAttribute("changelogcookie").get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS); // we expect msg1 + msg4 + msg5 assertEquals(searchOp.getSearchEntries().size(), 3); // // Test startState ("first cookie") of the ECL // // -- ReplicationBroker s1test2 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING2), (short) 1203, @@ -852,7 +872,6 @@ 1000, true); sleep(500); // Test startState ("first cookie") of the ECL time = TimeThread.getTime(); cn = new ChangeNumber(time++, ts++, s1test2.getServerId()); delMsg = @@ -887,8 +906,9 @@ assertTrue(startState.getMaxChangeNumber(s2test2.getServerId()).getSeqnum()==2); assertTrue(startState.getMaxChangeNumber(s1test2.getServerId()).getSeqnum()==6); // Test lastState ("last cookie") of the ECL // create an ECL sessionm and request lastCookie // // Test lastExternalChangelogCookie attribute of the ECL // ExternalChangeLogSessionImpl session = new ExternalChangeLogSessionImpl(replicationServer); MultiDomainServerState expectedLastCookie = @@ -911,13 +931,12 @@ false, // Types only LDAPFilter.decode("(objectclass=*)"), lastcookieattribute, null, NO_CONTROL, null); assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); cookie = ""; entries = searchOp.getSearchEntries(); if (entries != null) @@ -943,24 
+962,23 @@ s1test2.stop(); s2test.stop(); s2test2.stop(); removeTestBackend2(backend2); } catch(Exception e) { debugInfo(tn, "Ending test ECLDirectMsg with exception:\n" fail("Ending test " + tn + "with exception:\n" + stackTraceToSingleLineString(e)); assertTrue(e == null); } debugInfo(tn, "Ending test successfully"); } // simple update to be received private void ECLDirectAllOps() private void ECLAllOps() { String tn = "ECLDirectAllOps"; String tn = "ECLAllOps"; debugInfo(tn, "Starting test\n\n"); try { LDIFWriter ldifWriter = getLDIFWriter(); @@ -972,6 +990,12 @@ 1000, true); int ts = 1; // Creates broker on o=test2 ReplicationBroker server02 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING2), (short) 1202, 100, replicationServerPort, 1000, true); String user1entryUUID = "11111111-1111-1111-1111-111111111111"; String baseUUID = "22222222-2222-2222-2222-222222222222"; @@ -1000,6 +1024,15 @@ server01.publish(addMsg); debugInfo(tn, " publishes " + addMsg.getChangeNumber()); // Publish DEL /* ChangeNumber cn12 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1202); DeleteMsg delMsg2 = new DeleteMsg("uid="+tn+"12," + TEST_ROOT_DN_STRING2, cn12, tn+"uuid12"); server02.publish(delMsg2); debugInfo(tn, " publishes " + delMsg2.getChangeNumber()); */ // Publish MOD ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201); Attribute attr1 = Attributes.create("description", "new value"); @@ -1024,12 +1057,11 @@ ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp); server01.publish(modDNMsg); debugInfo(tn, " publishes " + modDNMsg.getChangeNumber()); sleep(600); sleep(1000); String cookie= ""; // search on 'cn=changelog' debugInfo(tn, "STEP 1 - from empty cookie("+cookie+")"); LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); @@ -1040,28 +1072,28 @@ ArrayList<Control> controls = new ArrayList<Control>(0); controls.add(control); debugInfo(tn, "Search with cookie=" + cookie + "\" filter=" + 
"(targetdn=*"+tn+"*,o=test)"); InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetdn=*"+tn+"*,o=test)"), attributes, controls, null); SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(targetdn=*"+tn+"*,o=test)"), attributes, controls, null); sleep(500); // test success assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString()); // test 4 entries returned LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); // 4 entries expected assertEquals(searchOp.getSearchEntries().size(), 4); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); if (entries != null) { int i=0; @@ -1073,16 +1105,20 @@ if (i==1) { // check the DEL entry has the right content assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase( "cn=" + cn1 + "," + TEST_ROOT_DN_STRING + ",cn=changelog")); checkValue(resultEntry,"replicationcsn",cn1.toString()); checkValue(resultEntry,"replicaidentifier","1201"); checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING); checkValue(resultEntry,"changetype","delete"); checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";"); checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;"); checkValue(resultEntry,"targetentryuuid",tn+"uuid1"); checkValue(resultEntry,"changenumber","-1"); checkValue(resultEntry,"changenumber","0"); } else if (i==2) { // check the ADD entry has the right content assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase( "cn=" + cn2 + "," + TEST_ROOT_DN_STRING + ",cn=changelog")); String expectedValue1 = "objectClass: domain\nobjectClass: top\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n\n"; String 
expectedValue2 = "entryUUID: 11111111-1111-1111-1111-111111111111\n" + @@ -1092,12 +1128,14 @@ checkValue(resultEntry,"replicaidentifier","1201"); checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING); checkValue(resultEntry,"changetype","add"); checkValue(resultEntry,"changelogcookie","o=test:"+cn2.toString()+";"); checkValue(resultEntry,"changelogcookie","o=test:"+cn2.toString()+";o=test2:;"); checkValue(resultEntry,"targetentryuuid",user1entryUUID); checkValue(resultEntry,"changenumber","-1"); checkValue(resultEntry,"changenumber","0"); } else if (i==3) { // check the MOD entry has the right content assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase( "cn=" + cn3 + "," + TEST_ROOT_DN_STRING + ",cn=changelog")); String expectedValue = "replace: description\n" + "description: new value\n-\n"; checkValue(resultEntry,"changes",expectedValue); @@ -1105,32 +1143,35 @@ checkValue(resultEntry,"replicaidentifier","1201"); checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING); checkValue(resultEntry,"changetype","modify"); checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";"); checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;"); checkValue(resultEntry,"targetentryuuid",tn+"uuid3"); checkValue(resultEntry,"changenumber","-1"); checkValue(resultEntry,"changenumber","0"); } else if (i==4) { // check the MODDN entry has the right content assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase( "cn=" + cn4 + "," + TEST_ROOT_DN_STRING + ",cn=changelog")); checkValue(resultEntry,"replicationcsn",cn4.toString()); checkValue(resultEntry,"replicaidentifier","1201"); checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING); checkValue(resultEntry,"changetype","modrdn"); checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";"); checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;"); 
checkValue(resultEntry,"targetentryuuid",tn+"uuid4"); checkValue(resultEntry,"newrdn","uid=ECLDirectAllOpsnew4"); checkValue(resultEntry,"newrdn","uid=ECLAllOpsnew4"); checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2); checkValue(resultEntry,"deleteoldrdn","true"); checkValue(resultEntry,"changenumber","-1"); checkValue(resultEntry,"changenumber","0"); } } } server01.stop(); if (server02 != null) server02.stop(); } catch(Exception e) { debugInfo(tn, "Ending test with exception:\n" fail("Ending test " + tn + " with exception:\n" + stackTraceToSingleLineString(e)); assertTrue(e == null); } debugInfo(tn, "Ending test with success"); } @@ -1140,13 +1181,13 @@ AttributeValue av = null; try { List<Attribute> attrs = entry.getAttribute(attrName); Attribute a = attrs.iterator().next(); av = a.iterator().next(); String encodedValue = av.toString(); List<Attribute> attrs = entry.getAttribute(attrName); Attribute a = attrs.iterator().next(); av = a.iterator().next(); String encodedValue = av.toString(); assertTrue(encodedValue.equalsIgnoreCase(expectedValue), "In entry " + entry + " attr <" + attrName + "> equals " + av + " instead of expected value " + expectedValue); "In entry " + entry + " attr <" + attrName + "> equals " + av + " instead of expected value " + expectedValue); } catch(Exception e) { @@ -1155,23 +1196,23 @@ av + " instead of expected value " + expectedValue); } } private static void checkPossibleValues(Entry entry, String attrName, String expectedValue1, String expectedValue2) { AttributeValue av = null; try { List<Attribute> attrs = entry.getAttribute(attrName); Attribute a = attrs.iterator().next(); av = a.iterator().next(); String encodedValue = av.toString(); List<Attribute> attrs = entry.getAttribute(attrName); Attribute a = attrs.iterator().next(); av = a.iterator().next(); String encodedValue = av.toString(); assertTrue( (encodedValue.equalsIgnoreCase(expectedValue1) || encodedValue.equalsIgnoreCase(expectedValue2)), "In entry " + entry + " 
attr <" + attrName + "> equals " + av + " instead of one of the expected values " + expectedValue1 + " or " + expectedValue2); "In entry " + entry + " attr <" + attrName + "> equals " + av + " instead of one of the expected values " + expectedValue1 + " or " + expectedValue2); } catch(Exception e) { @@ -1181,13 +1222,14 @@ + " or " + expectedValue2); } } /** * Test persistent search */ private void ECLDirectPsearch(boolean changesOnly) private void ECLPsearch(boolean changesOnly, boolean compatMode) { String tn = "ECLDirectPsearch_" + String.valueOf(changesOnly); String tn = "ECLPsearch_" + String.valueOf(changesOnly) + "_" + String.valueOf(compatMode); debugInfo(tn, "Starting test \n\n"); Socket s =null; @@ -1217,13 +1259,20 @@ // Produce update on this suffix ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201); DeleteMsg delMsg = new DeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, cn, tn+"uuid1"); new DeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, cn, "11111111-1112-1113-1114-111111111114"); debugInfo(tn, " publishing " + delMsg.getChangeNumber()); server01.publish(delMsg); this.sleep(500); // let's be sure the message is in the RS // Creates cookie control ArrayList<Control> controls = getControls(""); String cookie = ""; ArrayList<Control> controls = createControls(cookie); if (compatMode) { cookie = null; controls = new ArrayList<Control>(0); } // Creates psearch control HashSet<PersistentSearchChangeType> changeTypes = @@ -1250,6 +1299,7 @@ null); // Connects and bind debugInfo(tn, "Search with cookie=" + cookie + "\""); s = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r = new org.opends.server.tools.LDAPReader(s); LDAPWriter w = new LDAPWriter(s); @@ -1269,7 +1319,7 @@ long searchReferences = ldapStatistics.getSearchResultReferences(); long searchesDone = ldapStatistics.getSearchResultsDone(); debugInfo(tn, "Sending the PSearch request filter=(targetDN=*"+tn+"*,o=test)"); 
debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)"); LDAPMessage message; message = new LDAPMessage(2, searchRequest, controls); w.writeMessage(message); @@ -1281,21 +1331,24 @@ if (changesOnly == false) { // Wait for change 1 debugInfo(tn, "Waiting for : INIT search expected to return change 1"); debugInfo(tn, "Waiting for init search expected to return change 1"); searchEntries = 0; message = null; try { while ((searchEntries<1) && (message = r.readMessage()) != null) { debugInfo(tn, "First search returns " + debugInfo(tn, "Init search Result=" + message.getProtocolOpType() + message + " " + searchEntries); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); searchEntries++; // FIXME:ECL Double check 10 is really the valid value here. checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", (compatMode?"10":"0")); break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: @@ -1307,7 +1360,6 @@ assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); searchesDone++; break; } @@ -1315,23 +1367,24 @@ } catch(Exception e) { debugInfo(tn, "INIT search failed with e=" + stackTraceToSingleLineString(e)); fail("init search failed with e=" + stackTraceToSingleLineString(e)); } debugInfo(tn, "INIT search done with success. 
searchEntries=" + searchEntries + " searchesDone="+ searchesDone); + searchEntries + " #searchesDone="+ searchesDone); } // Produces change 2 cn = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201); String expectedDn = "uid=" + tn + "2," + TEST_ROOT_DN_STRING; delMsg = new DeleteMsg(expectedDn, cn, tn + "uuid2"); delMsg = new DeleteMsg(expectedDn, cn, "11111111-1112-1113-1114-111111111115"); debugInfo(tn, " publishing " + delMsg.getChangeNumber()); server01.publish(delMsg); this.gblCN = cn; this.sleep(1000); debugInfo(tn, delMsg.getChangeNumber() + " published , will wait for new entries (Persist)"); " published , psearch will now wait for new entries"); // wait for the 1 new entry searchEntries = 0; @@ -1340,7 +1393,7 @@ message = null; while ((searchEntries<1) && (message = r.readMessage()) != null) { debugInfo(tn, "2nd search returns " + debugInfo(tn, "psearch search Result=" + message.getProtocolOpType() + message); switch (message.getProtocolOpType()) { @@ -1364,7 +1417,7 @@ } } sleep(1000); // Check we received change 2 for (LDAPAttribute a : searchResultEntry.getAttributes()) { @@ -1391,16 +1444,172 @@ // When problem found, we have to re-enable this test. 
if (false) { // ACI step debugInfo(tn, "Starting ACI step"); s = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); r = new org.opends.server.tools.LDAPReader(s); w = new LDAPWriter(s); s.setSoTimeout(1500000); bindAsWhoEver(w, r, "toto", "tutu", LDAPResultCode.OPERATIONS_ERROR); searchRequest = // ACI step debugInfo(tn, "Starting ACI step"); s = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); r = new org.opends.server.tools.LDAPReader(s); w = new LDAPWriter(s); s.setSoTimeout(1500000); bindAsWhoEver(w, r, "toto", "tutu", LDAPResultCode.OPERATIONS_ERROR); searchRequest = new SearchRequestProtocolOp( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, Integer.MAX_VALUE, Integer.MAX_VALUE, false, LDAPFilter.decode("(targetDN=*directpsearch*,o=test)"), null); debugInfo(tn, "ACI test : sending search"); message = new LDAPMessage(2, searchRequest, createControls("")); w.writeMessage(message); searchesDone=0; searchEntries = 0; searchResultEntry = null; searchResultDone = null; while ((searchesDone==0) && (message = r.readMessage()) != null) { debugInfo(tn, "ACI test : message returned " + message.getProtocolOpType() + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); //assertTrue(false, "Unexpected entry returned in ACI test of " + tn + searchResultEntry); searchEntries++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: searchReferences++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals(searchResultDone.getResultCode(), ResultCode.SUCCESS.getIntValue()); // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); searchesDone++; break; } } // search should end with success assertTrue(searchesDone==1); // but returning no entry assertEquals(searchEntries,0, "Bad search entry# in ACI test of " + tn); 
} try { s.close(); } catch (Exception e) {}; sleep(1000); } catch(Exception e) { fail("Test " + tn + " fails with " + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ends test successfully"); } /** * Test parallel simultaneous psearch with different filters. */ private void ECLSimulPsearches() { String tn = "ECLSimulPsearches"; debugInfo(tn, "Starting test \n\n"); Socket s1, s2, s3 = null; boolean compatMode = false; boolean changesOnly = false; // create stats for (ConnectionHandler ch : DirectoryServer.getConnectionHandlers()) { if (ch instanceof LDAPConnectionHandler) { LDAPConnectionHandler lch = (LDAPConnectionHandler) ch; if (!lch.useSSL()) { ldapStatistics = lch.getStatTracker(); } } } assertNotNull(ldapStatistics); try { // Create broker on o=test ReplicationBroker server01 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); int ts = 1; // Create broker on o=test2 ReplicationBroker server02 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING2), (short) 1202, 100, replicationServerPort, 1000, true, EMPTY_DN_GENID); // Produce update 1 ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201); DeleteMsg delMsg1 = new DeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, cn1, "11111111-1111-1111-1111-111111111111"); debugInfo(tn, " publishing " + delMsg1); server01.publish(delMsg1); this.sleep(500); // let's be sure the message is in the RS // Produce update 2 ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1202); DeleteMsg delMsg2 = new DeleteMsg("uid=" + tn + "2," + TEST_ROOT_DN_STRING2, cn2, "22222222-2222-2222-2222-222222222222"); debugInfo(tn, " publishing " + delMsg2); server02.publish(delMsg2); this.sleep(500); // let's be sure the message is in the RS // Produce update 3 ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1202); DeleteMsg delMsg3 = new DeleteMsg("uid=" + tn + "3," + TEST_ROOT_DN_STRING2, cn3, 
"33333333-3333-3333-3333-333333333333"); debugInfo(tn, " publishing " + delMsg3); server02.publish(delMsg3); this.sleep(500); // let's be sure the message is in the RS // Creates cookie control String cookie = ""; ArrayList<Control> controls = createControls(cookie); if (compatMode) { cookie = null; controls = new ArrayList<Control>(0); } // Creates psearch control HashSet<PersistentSearchChangeType> changeTypes = new HashSet<PersistentSearchChangeType>(); changeTypes.add(PersistentSearchChangeType.ADD); changeTypes.add(PersistentSearchChangeType.DELETE); changeTypes.add(PersistentSearchChangeType.MODIFY); changeTypes.add(PersistentSearchChangeType.MODIFY_DN); boolean returnECs = true; PersistentSearchControl persSearchControl = new PersistentSearchControl( changeTypes, changesOnly, returnECs); controls.add(persSearchControl); LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); // Creates request 1 SearchRequestProtocolOp searchRequest1 = new SearchRequestProtocolOp( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, @@ -1408,26 +1617,255 @@ Integer.MAX_VALUE, Integer.MAX_VALUE, false, LDAPFilter.decode("(targetDN=*directpsearch*,o=test)"), null); LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"), attributes); debugInfo(tn, "ACI test : sending search"); message = new LDAPMessage(2, searchRequest, getControls("")); w.writeMessage(message); // Creates request 2 SearchRequestProtocolOp searchRequest2 = new SearchRequestProtocolOp( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, Integer.MAX_VALUE, Integer.MAX_VALUE, false, LDAPFilter.decode("(targetDN=*"+tn+"*,o=test2)"), attributes); searchesDone=0; // Creates request 3 SearchRequestProtocolOp searchRequest3 = new SearchRequestProtocolOp( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, Integer.MAX_VALUE, Integer.MAX_VALUE, false, 
LDAPFilter.decode("objectclass=*"), attributes); // Connects and bind s1 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r1 = new org.opends.server.tools.LDAPReader(s1); LDAPWriter w1 = new LDAPWriter(s1); s1.setSoTimeout(1500000); bindAsManager(w1, r1); // Connects and bind s2 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r2 = new org.opends.server.tools.LDAPReader(s2); LDAPWriter w2 = new LDAPWriter(s2); s2.setSoTimeout(1500000); bindAsManager(w2, r2); // Connects and bind s3 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r3 = new org.opends.server.tools.LDAPReader(s3); LDAPWriter w3 = new LDAPWriter(s3); s3.setSoTimeout(1500000); bindAsManager(w3, r3); // Since we are going to be watching the post-response count, we need to // wait for the server to become idle before kicking off the next request // to ensure that any remaining post-response processing from the previous // operation has completed. 
assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000)); InvocationCounterPlugin.resetAllCounters(); long searchRequests = ldapStatistics.getSearchRequests(); long searchEntries = ldapStatistics.getSearchResultEntries(); long searchReferences = ldapStatistics.getSearchResultReferences(); long searchesDone = ldapStatistics.getSearchResultsDone(); LDAPMessage message; message = new LDAPMessage(2, searchRequest1, controls); w1.writeMessage(message); this.sleep(500); message = new LDAPMessage(2, searchRequest2, controls); w2.writeMessage(message); this.sleep(500); message = new LDAPMessage(2, searchRequest3, controls); w3.writeMessage(message); this.sleep(500); SearchResultEntryProtocolOp searchResultEntry = null; SearchResultDoneProtocolOp searchResultDone = null; if (changesOnly == false) { debugInfo(tn, "Search1 Persistent filter="+searchRequest1.getFilter().toString() + " expected to return change " + cn1); searchEntries = 0; message = null; try { while ((searchEntries<1) && (message = r1.readMessage()) != null) { debugInfo(tn, "Search1 Result=" + message.getProtocolOpType() + " " + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); searchEntries++; if (searchEntries==1) { checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",cn1.toString()); checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", (compatMode?"10":"0")); } break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: searchReferences++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); searchesDone++; break; } } } catch(Exception e) { fail("Search1 failed with e=" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Search1 done with success. 
searchEntries=" + searchEntries + " #searchesDone="+ searchesDone); searchEntries = 0; message = null; try { debugInfo(tn, "Search 2 Persistent filter="+searchRequest2.getFilter().toString() + " expected to return change " + cn2 + " & " + cn3); while ((searchEntries<2) && (message = r2.readMessage()) != null) { debugInfo(tn, "Search 2 Result=" + message.getProtocolOpType() + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); searchEntries++; checkValue(searchResultEntry.toSearchResultEntry(),"changenumber", (compatMode?"10":"0")); break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: searchReferences++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); searchesDone++; break; } } } catch(Exception e) { fail("Search2 failed with e=" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Search2 done with success. 
searchEntries=" + searchEntries + " #searchesDone="+ searchesDone); searchEntries = 0; message = null; try { debugInfo(tn, "Search3 Persistent filter="+searchRequest3.getFilter().toString() + " expected to return change top + " + cn1 + " & " + cn2 + " & " + cn3); while ((searchEntries<4) && (message = r3.readMessage()) != null) { debugInfo(tn, "Search3 Result=" + message.getProtocolOpType() + " " + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); searchEntries++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: searchReferences++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); searchesDone++; break; } } } catch(Exception e) { fail("Search3 failed with e=" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Search3 done with success. 
searchEntries=" + searchEntries + " #searchesDone="+ searchesDone); } // Produces additional change ChangeNumber cn11 = new ChangeNumber(TimeThread.getTime(), 11, (short)1201); String expectedDn11 = "uid=" + tn + "11," + TEST_ROOT_DN_STRING; DeleteMsg delMsg11 = new DeleteMsg(expectedDn11, cn11, "44444444-4444-4444-4444-444444444444"); debugInfo(tn, " publishing " + delMsg11); server01.publish(delMsg11); this.sleep(500); debugInfo(tn, delMsg11.getChangeNumber() + " published additionally "); // Produces additional change ChangeNumber cn12 = new ChangeNumber(TimeThread.getTime(), 12, (short)1202); String expectedDn12 = "uid=" + tn + "12," + TEST_ROOT_DN_STRING2; DeleteMsg delMsg12 = new DeleteMsg(expectedDn12, cn12, "55555555-5555-5555-5555-555555555555"); debugInfo(tn, " publishing " + delMsg12 ); server02.publish(delMsg12); this.sleep(500); debugInfo(tn, delMsg12.getChangeNumber() + " published additionally "); // Produces additional change ChangeNumber cn13 = new ChangeNumber(TimeThread.getTime(), 13, (short)1202); String expectedDn13 = "uid=" + tn + "13," + TEST_ROOT_DN_STRING2; DeleteMsg delMsg13 = new DeleteMsg(expectedDn13, cn13, "66666666-6666-6666-6666-666666666666"); debugInfo(tn, " publishing " + delMsg13); server02.publish(delMsg13); this.sleep(500); debugInfo(tn, delMsg13.getChangeNumber() + " published additionally "); // wait 11 searchEntries = 0; searchResultEntry = null; searchResultDone = null; while ((searchesDone==0) && (message = r.readMessage()) != null) message = null; while ((searchEntries<1) && (message = r1.readMessage()) != null) { debugInfo(tn, "ACI test : message returned " + message.getProtocolOpType() + message); debugInfo(tn, "Search 11 Result=" + message.getProtocolOpType() + " " + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); //assertTrue(false, "Unexpected entry returned in ACI test of " + tn + searchResultEntry); 
searchEntries++; break; @@ -1437,27 +1875,141 @@ case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals(searchResultDone.getResultCode(), ResultCode.SUCCESS.getIntValue()); assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); searchesDone++; break; } } // search should end with success assertTrue(searchesDone==1); // but returning no entry assertEquals(searchEntries,0, "Bad search entry# in ACI test of " + tn); } sleep(1000); /* // Check we received change 11 for (LDAPAttribute a : searchResultEntry.getAttributes()) { if (a.getAttributeType().equalsIgnoreCase("targetDN")) { for (ByteString av : a.getValues()) { assertTrue(av.toString().equalsIgnoreCase(expectedDn11), "Entry returned by psearch11 is " + av.toString() + " when expected is " + expectedDn11); } } } */ debugInfo(tn, "Search 1 successfully receives additional changes"); try { s.close(); } catch (Exception e) {}; // wait 12 & 13 searchEntries = 0; searchResultEntry = null; searchResultDone = null; message = null; while ((searchEntries<2) && (message = r2.readMessage()) != null) { debugInfo(tn, "psearch search 12 Result=" + message.getProtocolOpType() + " " + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); searchEntries++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: searchReferences++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); searchesDone++; break; } } sleep(1000); /* // Check we received change 12 for (LDAPAttribute a : 
searchResultEntry.getAttributes()) { if (a.getAttributeType().equalsIgnoreCase("targetDN")) { for (ByteString av : a.getValues()) { assertTrue(av.toString().equalsIgnoreCase(expectedDn12), "Entry returned by psearch 12 is " + av.toString() + " when expected is " + expectedDn12); } } } */ debugInfo(tn, "Search 2 successfully receives additional changes"); // wait 11 & 12 & 13 searchEntries = 0; searchResultEntry = null; searchResultDone = null; message = null; while ((searchEntries<3) && (message = r3.readMessage()) != null) { debugInfo(tn, "psearch search 13 Result=" + message.getProtocolOpType() + " " + message); switch (message.getProtocolOpType()) { case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY: searchResultEntry = message.getSearchResultEntryProtocolOp(); searchEntries++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE: searchReferences++; break; case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE: searchResultDone = message.getSearchResultDoneProtocolOp(); assertEquals( searchResultDone.getResultCode(), ResultCode.SUCCESS, searchResultDone.getErrorMessage().toString()); // assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1); searchesDone++; break; } } sleep(1000); // Check we received change 13 for (LDAPAttribute a : searchResultEntry.getAttributes()) { if (a.getAttributeType().equalsIgnoreCase("targetDN")) { for (ByteString av : a.getValues()) { assertTrue(av.toString().equalsIgnoreCase(expectedDn13), "Entry returned by psearch 13 is " + av.toString() + " when expected is " + expectedDn13); } } } debugInfo(tn, "Search 3 successfully receives additional changes"); server01.stop(); server02.stop(); try { s1.close(); } catch (Exception e) {}; try { s2.close(); } catch (Exception e) {}; try { s3.close(); } catch (Exception e) {}; sleep(1000); } catch(Exception e) { assertTrue(e==null, stackTraceToSingleLineString(e)); fail("Test " + tn + " fails with " + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ends test successfuly"); debugInfo(tn, 
"Ends test successfully"); } // utility - bind as required @@ -1522,7 +2074,7 @@ TestCaseUtils.dsconfig( "delete-replication-server", "--provider-name", "Multimaster Synchronization"); */ */ replicationServer = null; } /** @@ -1549,7 +2101,7 @@ { logError(Message.raw(Category.SYNC, Severity.NOTICE, "** TEST " + tn + " ** " + s)); TRACER.debugInfo("** TEST " + tn + " ** " + s); //TRACER.debugInfo("** TEST " + tn + " ** " + s); } } @@ -1558,7 +2110,7 @@ */ private static Backend initializeTestBackend2(boolean createBaseEntry) throws IOException, InitializationException, ConfigException, DirectoryException DirectoryException { DN baseDN = DN.decode(TEST_ROOT_DN_STRING2); @@ -1590,11 +2142,849 @@ } return memoryBackend; } private static void removeTestBackend2(Backend backend) { MemoryBackend memoryBackend = (MemoryBackend)backend; memoryBackend.finalizeBackend(); DirectoryServer.deregisterBackend(memoryBackend); } //======================================================= private void ChangeTimeHeartbeatTest() { String tn = "ChangeTimeHeartbeatTest"; debugInfo(tn, "Starting test"); try { // Initialize a second test backend Backend backend2 = initializeTestBackend2(true); // -- ReplicationBroker s1test = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); ReplicationBroker s2test2 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING2), (short) 1202, 100, replicationServerPort, 1000, true, EMPTY_DN_GENID); sleep(500); // Produce updates long time = TimeThread.getTime(); int ts = 1; ChangeNumber cn = new ChangeNumber(time, ts++, s1test.getServerId()); DeleteMsg delMsg = new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn, tn+"uuid1"); s1test.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); cn = new ChangeNumber(time++, ts++, s2test2.getServerId()); delMsg = new DeleteMsg("uid="+tn+"2," + TEST_ROOT_DN_STRING2, cn, tn+"uuid2"); s2test2.publish(delMsg); debugInfo(tn, " publishes " + 
delMsg.getChangeNumber()); ChangeNumber cn3 = new ChangeNumber(time++, ts++, s2test2.getServerId()); delMsg = new DeleteMsg("uid="+tn+"3," + TEST_ROOT_DN_STRING2, cn3, tn+"uuid3"); s2test2.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); cn = new ChangeNumber(time++, ts++, s1test.getServerId()); delMsg = new DeleteMsg("uid="+tn+"4," + TEST_ROOT_DN_STRING, cn, tn+"uuid4"); s1test.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); sleep(500); // -- ReplicationBroker s1test2 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING2), (short) 1203, 100, replicationServerPort, 1000, true, EMPTY_DN_GENID); ReplicationBroker s2test = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1204, 100, replicationServerPort, 1000, true); sleep(500); // Test startState ("first cookie") of the ECL time = TimeThread.getTime(); cn = new ChangeNumber(time++, ts++, s1test2.getServerId()); delMsg = new DeleteMsg("uid="+tn+"6," + TEST_ROOT_DN_STRING2, cn, tn+"uuid6"); s1test2.publish(delMsg); cn = new ChangeNumber(time++, ts++, s2test.getServerId()); delMsg = new DeleteMsg("uid="+tn+"7," + TEST_ROOT_DN_STRING, cn, tn+"uuid7"); s2test.publish(delMsg); ChangeNumber cn8 = new ChangeNumber(time++, ts++, s1test2.getServerId()); delMsg = new DeleteMsg("uid="+tn+"8," + TEST_ROOT_DN_STRING2, cn8, tn+"uuid8"); s1test2.publish(delMsg); ChangeNumber cn9 = new ChangeNumber(time++, ts++, s2test.getServerId()); delMsg = new DeleteMsg("uid="+tn+"9," + TEST_ROOT_DN_STRING, cn9, tn+"uuid9"); s2test.publish(delMsg); sleep(500); ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false); rsd1.getDbServerState(); rsd1.getChangeTimeHeartbeatState(); debugInfo(tn, " DbServerState=" + rsd1.getDbServerState() + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState() + " eligibleCN=" + rsd1.getEligibleCN()); // FIXME:ECL Enable this test by adding an assert on the right value ReplicationServerDomain 
rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2, false); rsd2.getDbServerState(); rsd2.getChangeTimeHeartbeatState(); debugInfo(tn, " DbServerState=" + rsd2.getDbServerState() + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState() + " eligibleCN=" + rsd2.getEligibleCN()); // FIXME:ECL Enable this test by adding an assert on the right value s1test.stop(); s1test2.stop(); s2test.stop(); s2test2.stop(); removeTestBackend2(backend2); } catch(Exception e) { fail("Ending test " + tn + " with exception:" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test successfully"); } /** * From embedded ECL (no remote session) * With empty RS, simple search should return only root entry. */ private void ECLCompatEmpty() { String tn = "ECLCompatEmpty"; debugInfo(tn, "Starting test\n\n"); try { // search on 'cn=changelog' String filter = "(objectclass=*)"; debugInfo(tn, " Search: " + filter); InternalSearchOperation op = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(filter)); // success assertEquals( op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString()); // root entry returned assertEquals(op.getEntriesSent(), 1); debugInfo(tn, "Ending test successfully"); } catch(LDAPException e) { fail("Ending test " + tn + " with exception=" + stackTraceToSingleLineString(e)); } } private void ECLCompatWriteReadAllOps(int firstDraftChangeNumber) { String tn = "ECLCompatWriteReadAllOps/" + String.valueOf(firstDraftChangeNumber); debugInfo(tn, "Starting test\n\n"); try { LDIFWriter ldifWriter = getLDIFWriter(); // Creates broker on o=test ReplicationBroker server01 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); int ts = 1; String user1entryUUID = "11111111-1112-1113-1114-111111111115"; String baseUUID = "22222222-2222-2222-2222-222222222222"; // Publish DEL ChangeNumber cn1 = new 
ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
      // Four changes (DEL, ADD, MOD, MODDN) are published in this order
      // through the same broker; the per-position checks further down rely
      // on the search returning them in that same order.
      DeleteMsg delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());

      // Publish ADD
      // The ADD change number is stored in gblCN (not a local):
      // ECLCompatReadFrom compares the entry it reads against gblCN.
      gblCN = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
      String lentry = new String(
          "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: domain\n"
          + "entryUUID: "+user1entryUUID+"\n");
      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
      AddMsg addMsg = new AddMsg(
          gblCN,
          "uid="+tn+"2," + TEST_ROOT_DN_STRING,
          user1entryUUID,
          baseUUID,
          entry.getObjectClassAttribute(),
          entry.getAttributes(),
          new ArrayList<Attribute>());
      server01.publish(addMsg);
      debugInfo(tn, " publishes " + addMsg.getChangeNumber());

      // Publish MOD
      ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
      Attribute attr1 = Attributes.create("description", "new value");
      Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
      List<Modification> mods = new ArrayList<Modification>();
      mods.add(mod1);
      ModifyMsg modMsg = new ModifyMsg(cn3, DN
          .decode("uid="+tn+"3," + TEST_ROOT_DN_STRING), mods, user1entryUUID);
      server01.publish(modMsg);
      debugInfo(tn, " publishes " + modMsg.getChangeNumber());

      // Publish modDN
      ChangeNumber cn4 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
      ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
          DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
          RDN.decode("uid="+tn+"new4"), // new rdn
          true,  // deleteoldrdn
          DN.decode(TEST_ROOT_DN_STRING2)); // new superior
      op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn4, user1entryUUID,
          "newparentId"));
      LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
      ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
      server01.publish(modDNMsg);
      debugInfo(tn, " publishes " + modDNMsg.getChangeNumber());
      // Pause after publishing and before searching.
      // NOTE(review): presumably gives the replication server time to store
      // the four changes - confirm.
      sleep(1000);

      // search on 'cn=changelog'
      // First pass: select the four changes by target DN only.
      LinkedHashSet<String> attributes = new LinkedHashSet<String>();
      attributes.add("+");
      attributes.add("*");

      String filter = "(targetdn=*"+tn.toLowerCase()+"*,o=test)";
      debugInfo(tn, " Search: " + filter);
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode(filter),
            attributes,
            NO_CONTROL,
            null);
      sleep(500);

      // test success
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());

      // test 4 entries returned
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
      // 4 entries expected
      assertEquals(searchOp.getSearchEntries().size(), 4);
      if (entries != null)
      {
        // i is the 1-based position of the entry in the result list; it
        // selects which of the four published changes the entry is checked
        // against, with draft change number firstDraftChangeNumber + (i-1).
        int i=0;
        for (SearchResultEntry resultEntry : entries)
        {
          i++;
          debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString());
          ldifWriter.writeEntry(resultEntry);
          if (i==1)
          {
            // check the DEL entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+0)+",cn=changelog"));
            checkValue(resultEntry,"replicationcsn",cn1.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","delete");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
            checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
          } else if (i==2)
          {
            // check the ADD entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+1)+",cn=changelog"));
            // Two attribute orderings of the "changes" value are accepted.
            String expectedValue1 = "objectClass: domain\nobjectClass: top\n" +
            "entryUUID: "+user1entryUUID+"\n\n";
            String expectedValue2 = "entryUUID: "+user1entryUUID+"\n" +
            "objectClass: domain\nobjectClass: top\n\n";
            checkPossibleValues(resultEntry,"changes",expectedValue1, expectedValue2);
            checkValue(resultEntry,"replicationcsn",gblCN.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","add");
            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
          } else if (i==3)
          {
            // check the MOD entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+2)+",cn=changelog"));
            String expectedValue = "replace: description\n" +
            "description: new value\n-\n";
            checkValue(resultEntry,"changes",expectedValue);
            checkValue(resultEntry,"replicationcsn",cn3.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modify");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
          } else if (i==4)
          {
            // check the MODDN entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+3)+",cn=changelog"));
            checkValue(resultEntry,"replicationcsn",cn4.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modrdn");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"newrdn","uid="+tn+"new4");
            checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
            checkValue(resultEntry,"deleteoldrdn","true");
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+3));
          }
        }
      }
      // The broker is stopped here, before the second search is issued.
      server01.stop();

      // Second pass: same target DN filter, additionally restricted to the
      // draft change number range
      // [firstDraftChangeNumber, firstDraftChangeNumber+3].
      filter = "(&(targetdn=*"+tn.toLowerCase()+"*,o=test)(&(changenumber>="+
      firstDraftChangeNumber+")(changenumber<="+(firstDraftChangeNumber+3)+")))";
      debugInfo(tn, " Search: " + filter);
      searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode(filter),
            attributes,
            NO_CONTROL,
            null);
      sleep(500);

      // test success
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());

      entries = searchOp.getSearchEntries();

      if (entries != null)
      {
        // Same position-based verification as for the first search.
        int i=0;
        for (SearchResultEntry resultEntry : entries)
        {
          i++;
          debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString());
          ldifWriter.writeEntry(resultEntry);
          if (i==1)
          {
            // check the DEL entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+0)+",cn=changelog"));
            checkValue(resultEntry,"replicationcsn",cn1.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","delete");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
            checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
          } else if (i==2)
          {
            // check the ADD entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+1)+",cn=changelog"));
            // Two attribute orderings of the "changes" value are accepted.
            String expectedValue1 = "objectClass: domain\nobjectClass: top\n" +
            "entryUUID: "+user1entryUUID+"\n\n";
            String expectedValue2 = "entryUUID: "+user1entryUUID+"\n" +
            "objectClass: domain\nobjectClass: top\n\n";
            checkPossibleValues(resultEntry,"changes",expectedValue1, expectedValue2);
            checkValue(resultEntry,"replicationcsn",gblCN.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","add");
            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
          } else if (i==3)
          {
            // check the MOD entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+2)+",cn=changelog"));
            String expectedValue = "replace: description\n" +
            "description: new value\n-\n";
            checkValue(resultEntry,"changes",expectedValue);
            checkValue(resultEntry,"replicationcsn",cn3.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modify");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
          } else if (i==4)
          {
            // check the MODDN entry has the right content
            assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
                "cn="+String.valueOf(firstDraftChangeNumber+3)+",cn=changelog"));
            checkValue(resultEntry,"replicationcsn",cn4.toString());
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modrdn");
checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;"); checkValue(resultEntry,"targetentryuuid",user1entryUUID); checkValue(resultEntry,"newrdn","uid="+tn+"new4"); checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2); checkValue(resultEntry,"deleteoldrdn","true"); checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+3)); } } } assertEquals(searchOp.getSearchEntries().size(), 4); } catch(Exception e) { fail("Ending test " + tn + " with exception:" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); }

  /**
   * Read the ECL in compat mode for the single change whose draft
   * changenumber equals firstDraftChangeNumber, and check the content of
   * the returned entry.
   * <p>
   * NOTE(review): the expected DN ("cn=6,cn=changelog") and the expected
   * changenumber ("6") are hard-coded, so this check looks like it can only
   * pass when called with 6 -- confirm against the callers.
   *
   * @param firstDraftChangeNumber the draft changenumber used to build the
   *                               (changenumber=N) search filter.
   */
  private void ECLCompatReadFrom(int firstDraftChangeNumber)
  {
    String tn = "ECLCompatReadFrom/" + String.valueOf(firstDraftChangeNumber);
    debugInfo(tn, "Starting test\n\n");

    try
    {
      LDIFWriter ldifWriter = getLDIFWriter();

      // Creates broker on o=test
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
          100, replicationServerPort,
          1000, true);

      try
      {
        String user1entryUUID = "11111111-1112-1113-1114-111111111115";

        // search on 'cn=changelog'
        LinkedHashSet<String> attributes = new LinkedHashSet<String>();
        attributes.add("+");
        attributes.add("*");

        String filter = "(changenumber="+firstDraftChangeNumber+")";
        debugInfo(tn, " Search: " + filter);
        InternalSearchOperation searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        sleep(500);

        // test success
        assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
            searchOp.getErrorMessage().toString());

        // exactly one change is expected for an exact changenumber match
        LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
        assertEquals(entries.size(), 1);

        for (SearchResultEntry resultEntry : entries)
        {
          debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString());
          ldifWriter.writeEntry(resultEntry);

          // check the entry has the right content
          assertTrue(resultEntry.getDN().toNormalizedString().equalsIgnoreCase(
              "cn=6,cn=changelog"));
          checkValue(resultEntry,"replicationcsn",gblCN.toString());
          checkValue(resultEntry,"replicaidentifier","1201");
          checkValue(resultEntry,"changetype","add");
          checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
          checkValue(resultEntry,"targetentryuuid",user1entryUUID);
          checkValue(resultEntry,"changenumber","6");
        }
      }
      finally
      {
        // Always release the replication session, including when one of the
        // checks above fails, so that it does not leak into later tests.
        server01.stop();
      }
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + " with exception:\n"
          + stackTraceToSingleLineString(e));
    }
    debugInfo(tn, "Ending test with success");
  }

  /**
   * Read the ECL in compat mode from firstDraftChangeNumber and to
   * lastDraftChangeNumber, and check that one entry is returned for each
   * draft changenumber of the (inclusive) range.
   *
   * @param firstDraftChangeNumber lower bound used in the search filter.
   * @param lastDraftChangeNumber  upper bound used in the search filter.
   */
  private void ECLCompatReadFromTo(int firstDraftChangeNumber,
      int lastDraftChangeNumber)
  {
    String tn = "ECLCompatReadFromTo/" +
      String.valueOf(firstDraftChangeNumber) + "/" +
      String.valueOf(lastDraftChangeNumber);

    debugInfo(tn, "Starting test\n\n");

    try
    {
      // search on 'cn=changelog'
      LinkedHashSet<String> attributes = new LinkedHashSet<String>();
      attributes.add("+");
      attributes.add("*");

      String filter = "(&(changenumber>="+firstDraftChangeNumber+")(changenumber<="+lastDraftChangeNumber+"))";
      debugInfo(tn, " Search: " + filter);
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode(filter),
            attributes,
            NO_CONTROL,
            null);
      sleep(500);

      // test success
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());

      // the range is inclusive on both ends
      assertEquals(searchOp.getSearchEntries().size(),
          lastDraftChangeNumber-firstDraftChangeNumber+1);
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + " with exception:\n"
          + stackTraceToSingleLineString(e));
    }
    debugInfo(tn, "Ending test with success");
  }

  /**
   * Read the ECL in compat mode
providing an unknown draft changenumber. */ private void ECLCompatBadSeqnum() { String tn = "ECLCompatBadSeqnum"; debugInfo(tn, "Starting test\n\n"); try { // search on 'cn=changelog' LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); String filter = "(changenumber=1000)"; debugInfo(tn, " Search: " + filter); InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode(filter), attributes, NO_CONTROL, null); sleep(500); // test success and no entries returned assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString()); assertEquals(searchOp.getSearchEntries().size(), 0); } catch(Exception e) { fail("Ending test "+tn+" with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } /** * Read the ECL in compat mode providing an unknown draft changenumber. 
*/ private void ECLFilterOnReplicationCsn() { String tn = "ECLFilterOnReplicationCsn"; debugInfo(tn, "Starting test\n\n"); try { LDIFWriter ldifWriter = getLDIFWriter(); // search on 'cn=changelog' LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("+"); attributes.add("*"); String filter = "(replicationcsn="+this.gblCN+")"; debugInfo(tn, " Search: " + filter); InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf("cn=changelog"), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode(filter), attributes, NO_CONTROL, null); sleep(500); // test success and no entries returned assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString()); assertEquals(searchOp.getSearchEntries().size(), 1); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); assertEquals(searchOp.getSearchEntries().size(), 1); if (entries != null) { int i=0; for (SearchResultEntry resultEntry : entries) { i++; debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString()); ldifWriter.writeEntry(resultEntry); // check the DEL entry has the right content checkValue(resultEntry,"replicationcsn",gblCN.toString()); // TODO:ECL check values of the other attributes } } } catch(Exception e) { fail("Ending test "+tn+" with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } /** * Test that different values of filter are correctly decoded * to find if the search op on the ECL can be optimized * regarding the Draft changenumbers. 
*/ private void ECLFilterTest() { String tn = "ECLFilterTest"; debugInfo(tn, "Starting test\n\n"); try { StartECLSessionMsg startCLmsg = new StartECLSessionMsg(); // ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(objectclass=*)")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),-1); assertEquals(startCLmsg.getLastDraftChangeNumber(),-1); // ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(changenumber>=2)")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),2); assertEquals(startCLmsg.getLastDraftChangeNumber(),-1); // ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(&(changenumber>=2)(changenumber<=5))")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),2); assertEquals(startCLmsg.getLastDraftChangeNumber(),5); // try { ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(&(changenumber>=2)(changenumber<+5))")); assertTrue((startCLmsg.getFirstDraftChangeNumber()==1)); } catch(DirectoryException de) { assertTrue(de != null); } // ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(&(dc=x)(&(changenumber>=2)(changenumber<=5)))")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),2); assertEquals(startCLmsg.getLastDraftChangeNumber(),5); ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(&(&(changenumber>=3)(changenumber<=4))(&(|(dc=y)(dc=x))(&(changenumber>=2)(changenumber<=5))))")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),3); assertEquals(startCLmsg.getLastDraftChangeNumber(),4); // ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(|(objectclass=*)(&(changenumber>=2)(changenumber<=5)))")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),-1); assertEquals(startCLmsg.getLastDraftChangeNumber(),-1); // ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(changenumber=8)")); 
assertEquals(startCLmsg.getFirstDraftChangeNumber(),8); assertEquals(startCLmsg.getLastDraftChangeNumber(),8); // ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 1, 0); ChangeNumber changeNumber1 = gen.newChangeNumber(); ECLSearchOperation.evaluateFilter(startCLmsg, SearchFilter.createFilterFromString("(replicationcsn="+changeNumber1+")")); assertEquals(startCLmsg.getFirstDraftChangeNumber(),-1); assertEquals(startCLmsg.getLastDraftChangeNumber(),-1); assertEquals(startCLmsg.getChangeNumber(), changeNumber1); } catch(Exception e) { fail("Ending "+tn+" test with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } private void ECLCompatPurge() { String tn = "ECLCompatPurge"; debugInfo(tn, "Starting test\n\n"); try { DraftCNDbHandler draftdb = replicationServer.getDraftCNDbHandler(); assertEquals(draftdb.count(), 8); draftdb.setPurgeDelay(1000); // Now Purge the changelog db this.replicationServer.clearDb(); // Expect changes purged from the changelog db to be sometimes // also purged from the DraftCNDb. 
while(draftdb.count()>0) { debugInfo(tn, "draftdb.count="+draftdb.count()); sleep(200); } } catch(Exception e) { fail("Ending "+tn+" test with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } private void ECLCompatTestLimits(int expectedFirst, int expectedLast) { String tn = "ECLCompatTestLimits"; debugInfo(tn, "Starting test\n\n"); try { LDIFWriter ldifWriter = getLDIFWriter(); // search on 'cn=changelog' LinkedHashSet<String> attributes = new LinkedHashSet<String>(); attributes.add("*"); attributes.add("+"); debugInfo(tn, " Search: rootDSE"); InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf(""), SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(objectclass=*)"), attributes, NO_CONTROL, null); sleep(500); // test success and no entries returned assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString()); assertEquals(searchOp.getSearchEntries().size(), 1); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); assertEquals(searchOp.getSearchEntries().size(), 1); if (entries != null) { int i=0; for (SearchResultEntry resultEntry : entries) { i++; debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString()); ldifWriter.writeEntry(resultEntry); checkValue(resultEntry,"firstchangenumber", String.valueOf(expectedFirst)); checkValue(resultEntry,"lastchangenumber", String.valueOf(expectedLast)); } } } catch(Exception e) { fail("Ending "+tn+" test with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -221,7 +221,7 @@ ReplicationBroker broker = new ReplicationBroker(null, state, baseDn.toNormalizedString(), serverId, window_size, generationId, 100000, getReplSessionSecurity(), (byte)1); generationId, 100000, getReplSessionSecurity(), (byte)1, 500); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); @@ -381,7 +381,7 @@ { ReplicationBroker broker = new ReplicationBroker(null, state, baseDn.toNormalizedString(), serverId, window_size, generationId, 100000, getReplSessionSecurity(), (byte)1); 100000, getReplSessionSecurity(), (byte)1, 500); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); @@ -420,7 +420,7 @@ ReplicationBroker broker = new ReplicationBroker(null, state, baseDn.toNormalizedString(), serverId, window_size, generationId, 0, getReplSessionSecurity(), (byte)1); generationId, 0, getReplSessionSecurity(), (byte)1, 500); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -210,7 +210,10 @@ "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n" + // heartbeat = 10 min so no need to emulate heartbeat in fake RS: session // not closed by client "ds-cfg-heartbeat-interval: 600000ms\n"; "ds-cfg-heartbeat-interval: 600000ms\n" + // heartbeat = 10 min so no need to emulate heartbeat in fake RS: session // not closed by client "ds-cfg-changetime-heartbeat-interval: 0ms\n"; String configEntryLdif = null; switch (assuredMode) @@ -253,7 +256,8 @@ "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n" + // heartbeat = 10 min so no need to emulate heartbeat in fake RS: session // not closed by client "ds-cfg-heartbeat-interval: 600000ms\n"; "ds-cfg-heartbeat-interval: 600000ms\n" + "ds-cfg-changetime-heartbeat-interval: 0ms\n"; Entry domainCfgEntry = TestCaseUtils.entryFromLdifString(configEntryLdif); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -186,6 +186,14 @@
  /**
   * {@inheritDoc}
   * <p>
   * This implementation always returns 0.
   */
  public long getChangetimeHeartbeatInterval()
  {
    return 0;
  }

  /**
   * {@inheritDoc}
   */
  public long getMaxReceiveDelay()
  {
    return 0;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -685,7 +685,7 @@ { super(serviceID, serverID); generationID = generationId; startPublishService(replicationServers, window, heartbeatInterval); startPublishService(replicationServers, window, heartbeatInterval, 500); startListenService(); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -324,7 +324,7 @@ { ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN, dsId, 100, generationId, 0, new ReplSessionSecurity(null, null, null, true), (byte) 1); new ReplSessionSecurity(null, null, null, true), (byte) 1, 500); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + rs1Port); broker.start(servers); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -629,6 +629,7 @@ (short) 123, (short) 45); op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid")); DeleteMsg delmsg = new DeleteMsg(op); int draftcn = 21; String serviceId = "serviceid"; @@ -639,9 +640,10 @@ "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;"); // Constructor test ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, serviceId); ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, serviceId, draftcn); assertTrue(msg1.getCookie().equalsTo(cookie)); assertTrue(msg1.getServiceId().equalsIgnoreCase(serviceId)); assertTrue((msg1.getDraftChangeNumber()==draftcn)); DeleteMsg delmsg2 = (DeleteMsg)msg1.getUpdateMsg(); assertTrue(delmsg.compareTo(delmsg2)==0); @@ -651,6 +653,9 @@ assertTrue(msg2.getCookie().equalsTo(cookie)); assertTrue(msg2.getServiceId().equalsIgnoreCase(msg1.getServiceId())); assertTrue(msg2.getServiceId().equalsIgnoreCase(serviceId)); assertTrue(msg2.getDraftChangeNumber()==(msg1.getDraftChangeNumber())); assertTrue(msg2.getDraftChangeNumber()==draftcn); DeleteMsg delmsg1 = (DeleteMsg)msg1.getUpdateMsg(); delmsg2 = (DeleteMsg)msg2.getUpdateMsg(); assertTrue(delmsg2.compareTo(delmsg)==0); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -429,7 +429,7 @@ List<String> replicationServers = new ArrayList<String>(); replicationServers.add("localhost:" + rsPort); fakeReplicationDomain.startPublishService(replicationServers, window, 1000); fakeReplicationDomain.startPublishService(replicationServers, window, 1000, 500); if (startListen) fakeReplicationDomain.startListenService(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DraftCNDbHandlerTest.java
New file @@ -0,0 +1,299 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.io.File;
import java.net.ServerSocket;

import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.server.DraftCNDB.DraftCNDBCursor;
import org.testng.annotations.Test;

/**
 * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db :
 * - periodic trim
 * - call to clear method()
 */
public class DraftCNDbHandlerTest extends ReplicationTestCase
{
  /**
   * This test makes basic operations of a DraftCNDb :
   * - create the db
   * - add records
   * - read them with a cursor
   * - set a very short trim period
   * - wait for the db to be trimmed / here since the changes are not stored in
   *   the replication changelog, the draftCNDb will be cleared.
   * <p>
   * The wait for the trim is bounded: the test fails if the db is still not
   * empty once the maximum wait has elapsed.
   *
   * @throws Exception if any step of the test raises an unexpected error.
   */
  @Test()
  void testDraftCNDbHandlerTrim() throws Exception
  {
    File testRoot = null;
    ReplicationServer replicationServer = null;
    ReplicationDbEnv dbEnv = null;
    DraftCNDbHandler handler = null;
    try
    {
      TestCaseUtils.startServer();

      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int changelogPort = socket.getLocalPort();
      socket.close();

      // configure a ReplicationServer.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0,
        2, 0, 100, null);
      replicationServer = new ReplicationServer(conf);

      // create or clean a directory for the DraftCNDbHandler
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = buildRoot + File.separator + "build" + File.separator +
        "unit-tests" + File.separator + "DraftCNDbHandler";
      testRoot = new File(path);
      if (testRoot.exists())
      {
        TestCaseUtils.deleteDirectory(testRoot);
      }
      testRoot.mkdirs();

      dbEnv = new ReplicationDbEnv(path, replicationServer);

      handler = new DraftCNDbHandler(replicationServer, dbEnv);
      handler.setPurgeDelay(0);

      // Prepare data to be stored in the db
      int sn1 = 3;
      int sn2 = 4;
      int sn3 = 5;

      String value1 = "value1";
      String value2 = "value2";
      String value3 = "value3";

      String serviceID1 = "serviceID1";
      String serviceID2 = "serviceID2";
      String serviceID3 = "serviceID3";

      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 1, 0);
      ChangeNumber changeNumber1 = gen.newChangeNumber();
      ChangeNumber changeNumber2 = gen.newChangeNumber();
      ChangeNumber changeNumber3 = gen.newChangeNumber();

      // Add records
      handler.add(sn1, value1, serviceID1, changeNumber1);
      handler.add(sn2, value2, serviceID2, changeNumber2);
      handler.add(sn3, value3, serviceID3, changeNumber3);

      // The ChangeNumber should not get purged
      int firstkey = handler.getFirstKey();
      assertEquals(firstkey, sn1);
      assertEquals(handler.getLastKey(), sn3);

      // Walk the three records in key order with a read cursor
      DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
      assertEquals(dbc.currentChangeNumber(), changeNumber1);
      assertEquals(dbc.currentServiceID(), serviceID1);
      assertEquals(dbc.currentValue(), value1);
      assertTrue(dbc.toString().length() != 0);

      assertTrue(dbc.next());

      assertEquals(dbc.currentChangeNumber(), changeNumber2);
      assertEquals(dbc.currentServiceID(), serviceID2);
      assertEquals(dbc.currentValue(), value2);

      assertTrue(dbc.next());

      assertEquals(dbc.currentChangeNumber(), changeNumber3);
      assertEquals(dbc.currentServiceID(), serviceID3);
      assertEquals(dbc.currentValue(), value3);

      assertFalse(dbc.next());

      handler.releaseReadCursor(dbc);

      handler.setPurgeDelay(100);

      // Check the db is cleared. Poll with an upper bound so that a trim
      // that never happens fails the test rather than hanging it.
      final int pollIntervalMs = 200;
      final int maxWaitMs = 60000;
      int waitedMs = 0;
      while (handler.count() != 0 && waitedMs < maxWaitMs)
      {
        Thread.sleep(pollIntervalMs);
        waitedMs += pollIntervalMs;
      }
      assertEquals(handler.count(), 0,
          "DraftCNDb not trimmed after " + maxWaitMs + " ms");
      assertEquals(handler.getFirstKey(), 0);
      assertEquals(handler.getLastKey(), 0);
    }
    finally
    {
      if (handler != null)
        handler.shutdown();
      if (dbEnv != null)
        dbEnv.shutdown();
      if (replicationServer != null)
        replicationServer.remove();
      if (testRoot != null)
        TestCaseUtils.deleteDirectory(testRoot);
    }
  }

  /**
   * This test makes basic operations of a DraftCNDb and explicitly calls
   * the clear() method instead of waiting for the periodic trim to clear
   * it.
   * - create the db
   * - add records
   * - read them with a cursor
   * - clear the db.
   *
   * @throws Exception if any step of the test raises an unexpected error.
   */
  @Test()
  void testDraftCNDbHandlerClear() throws Exception
  {
    File testRoot = null;
    ReplicationServer replicationServer = null;
    ReplicationDbEnv dbEnv = null;
    DraftCNDbHandler handler = null;
    try
    {
      TestCaseUtils.startServer();

      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int changelogPort = socket.getLocalPort();
      socket.close();

      // configure a ReplicationServer.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0,
        2, 0, 100, null);
      replicationServer = new ReplicationServer(conf);

      // create or clean a directory for the DraftCNDbHandler
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = buildRoot + File.separator + "build" + File.separator +
        "unit-tests" + File.separator + "DraftCNDbHandler";
      testRoot = new File(path);
      if (testRoot.exists())
      {
        TestCaseUtils.deleteDirectory(testRoot);
      }
      testRoot.mkdirs();

      dbEnv = new ReplicationDbEnv(path, replicationServer);

      handler = new DraftCNDbHandler(replicationServer, dbEnv);
      handler.setPurgeDelay(0);

      // The db has just been created in a clean directory
      assertTrue(handler.count()==0);

      // Prepare data to be stored in the db
      int sn1 = 3;
      int sn2 = 4;
      int sn3 = 5;

      String value1 = "value1";
      String value2 = "value2";
      String value3 = "value3";

      String serviceID1 = "serviceID1";
      String serviceID2 = "serviceID2";
      String serviceID3 = "serviceID3";

      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 1, 0);
      ChangeNumber changeNumber1 = gen.newChangeNumber();
      ChangeNumber changeNumber2 = gen.newChangeNumber();
      ChangeNumber changeNumber3 = gen.newChangeNumber();

      // Add records
      handler.add(sn1, value1, serviceID1, changeNumber1);
      handler.add(sn2, value2, serviceID2, changeNumber2);
      handler.add(sn3, value3, serviceID3, changeNumber3);
      Thread.sleep(500);

      // Checks
      assertEquals(handler.getFirstKey(), sn1);
      assertEquals(handler.getLastKey(), sn3);

      assertEquals(handler.count(), 3, "Db count");

      assertEquals(handler.getValue(sn1),value1);
      assertEquals(handler.getValue(sn2),value2);
      assertEquals(handler.getValue(sn3),value3);

      // Iterate from each of the three keys up to the end of the db
      DraftCNDbIterator it = handler.generateIterator(sn1);
      assertEquals(it.getDraftCN(),sn1);
      assertTrue(it.next());
      assertEquals(it.getDraftCN(),sn2);
      assertTrue(it.next());
      assertEquals(it.getDraftCN(),sn3);
      assertFalse(it.next());
      it.releaseCursor();

      it = handler.generateIterator(sn2);
      assertEquals(it.getDraftCN(),sn2);
      assertTrue(it.next());
      assertEquals(it.getDraftCN(),sn3);
      assertFalse(it.next());
      it.releaseCursor();

      it = handler.generateIterator(sn3);
      assertEquals(it.getDraftCN(),sn3);
      assertFalse(it.next());
      it.releaseCursor();

      // Clear ...
      handler.clear();

      // Check the db is cleared.
      assertEquals(handler.getFirstKey(), 0);
      assertEquals(handler.getLastKey(), 0);
      assertEquals(handler.count(), 0, "Db count after clear");
    }
    finally
    {
      if (handler != null)
        handler.shutdown();
      if (dbEnv != null)
        dbEnv.shutdown();
      if (replicationServer != null)
        replicationServer.remove();
      if (testRoot != null)
        TestCaseUtils.deleteDirectory(testRoot);
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -67,7 +67,7 @@ BlockingQueue<UpdateMsg> queue) throws ConfigException { super(serviceID, serverID); startPublishService(replicationServers, window, heartbeatInterval); startPublishService(replicationServers, window, heartbeatInterval, 500); startListenService(); this.queue = queue; } @@ -83,7 +83,7 @@ int exportedEntryCount) throws ConfigException { super(serviceID, serverID); startPublishService(replicationServers, window, heartbeatInterval); startPublishService(replicationServers, window, heartbeatInterval, 500); startListenService(); this.exportString = exportString; this.importString = importString; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.service; @@ -63,7 +63,7 @@ BlockingQueue<UpdateMsg> queue) throws ConfigException { super(serviceID, serverID); startPublishService(replicationServers, window, heartbeatInterval); startPublishService(replicationServers, window, heartbeatInterval, 500); startListenService(); this.queue = queue; } @@ -78,7 +78,7 @@ StringBuilder importString) throws ConfigException { super(serviceID, serverID); startPublishService(replicationServers, window, heartbeatInterval); startPublishService(replicationServers, window, heartbeatInterval, 500); startListenService(); this.exportString = exportString; this.importString = importString;