opends/src/server/org/opends/server/replication/plugin/Historical.java
@@ -82,15 +82,9 @@ public static final String HISTORICAL = "ds-synch-historical"; /** * The AttributeType associated to the attribute used to store * hitorical information. * The name of the entryuuid attribute. */ public static final AttributeType historicalAttrType = DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME); static final String ENTRYUIDNAME = "entryuuid"; static final AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME); /* @@ -177,7 +171,7 @@ Modification mod; mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); modifiedEntry.removeAttribute(historicalAttrType); modifiedEntry.removeAttribute(attr.getAttributeType()); modifiedEntry.addAttribute(attr, null); } @@ -194,7 +188,7 @@ private AttributeInfo getAttrInfo(Modification mod) { Attribute modAttr = mod.getAttribute(); if (modAttr.getAttributeType().equals(Historical.historicalAttrType)) if (isHistoricalAttribute(modAttr)) { // Don't keep historical information for the attribute that is // used to store the historical information. @@ -229,6 +223,8 @@ */ public Attribute encode() { AttributeType historicalAttrType = DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME); LinkedHashSet<AttributeValue> hist = new LinkedHashSet<AttributeValue>(); for (Map.Entry<AttributeType, AttrInfoWithOptions> entryWithOptions : @@ -344,7 +340,7 @@ */ public static Historical load(Entry entry) { List<Attribute> hist = entry.getAttribute(historicalAttrType); List<Attribute> hist = getHistoricalAttr(entry); Historical histObj = new Historical(); AttributeType lastAttrType = null; Set<String> lastOptions = new HashSet<String>(); @@ -441,7 +437,7 @@ { TreeMap<ChangeNumber, FakeOperation> operations = new TreeMap<ChangeNumber, FakeOperation>(); List<Attribute> attrs = entry.getOperationalAttribute(historicalAttrType); List<Attribute> attrs = getHistoricalAttr(entry); if (attrs != null) { for (Attribute attr : attrs) @@ -478,6 +474,19 @@ } /** * Get the Attribute used to store the historical information from * the given Entry. * * @param entry The entry containing the historical information. * * @return The Attribute used to store the historical information. */ public static List<Attribute> getHistoricalAttr(Entry entry) { return entry.getAttribute(HISTORICALATTRIBUTENAME); } /** * Get the entry unique Id in String form. * * @param entry The entry for which the unique id should be returned. @@ -487,6 +496,8 @@ public static String getEntryUuid(Entry entry) { String uuidString = null; AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME); List<Attribute> uuidAttrs = entry.getOperationalAttribute(entryuuidAttrType); if (uuidAttrs != null) @@ -513,6 +524,8 @@ { String uuidString = null; Map<AttributeType, List<Attribute>> attrs = op.getOperationalAttributes(); AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME); List<Attribute> uuidAttrs = attrs.get(entryuuidAttrType); if (uuidAttrs != null) @@ -526,5 +539,20 @@ } return uuidString; } /** * Check if a given attribute is an attribute used to store historical * information. * * @param attr The attribute that needs to be checked. * * @return a boolean indicating if the given attribute is * used to store historical information. */ public static boolean isHistoricalAttribute(Attribute attr) { AttributeType attrType = attr.getAttributeType(); return attrType.getNameOrOID().equals(Historical.HISTORICALATTRIBUTENAME); } } opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -50,6 +50,8 @@ private ReplicationDomain listener; private boolean shutdown = false; private boolean done = false; /** * Constructor for the ListenerThread. @@ -76,14 +78,13 @@ public void run() { UpdateMessage msg; boolean done = false; if (debugEnabled()) { TRACER.debugInfo("Replication Listener thread starting."); } while (!done) while (shutdown == false) { try { @@ -91,7 +92,8 @@ { listener.replay(msg); } done = true; if (msg == null) shutdown = true; } catch (Exception e) { /* @@ -104,9 +106,27 @@ logError(message); } } done = true; if (debugEnabled()) { TRACER.debugInfo("Replication Listener thread stopping."); } } /** * Wait for the completion of this thread. */ public void waitForShutdown() { try { while (done == false) { Thread.sleep(50); } } catch (InterruptedException e) { // exit the loop if this thread is interrupted. } } } opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -192,6 +192,7 @@ MultimasterSynchronizationProviderCfg configuration) throws ConfigException { domains.clear(); replicationServerListener = new ReplicationServerListener(configuration); // Register as an add and delete listener with the root configuration so we @@ -435,6 +436,7 @@ { domain.shutdown(); } domains.clear(); // shutdown the ReplicationServer Service if necessary if (replicationServerListener != null) opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -245,6 +245,12 @@ private DN configDn; /** * A boolean indicating if the thread used to save the persistentServerState * is terminated. */ private boolean done = false; /** * This class contain the context related to an import or export * launched on the domain. */ @@ -1207,6 +1213,8 @@ { } } state.save(); done = true; } /** @@ -1216,6 +1224,10 @@ */ private void createListeners() { synchronized (synchroThreads) { if (!shutdown) { synchroThreads.clear(); for (int i=0; i<listenerThreadNumber; i++) { @@ -1224,20 +1236,26 @@ synchroThreads.add(myThread); } } } } /** * Shutdown this ReplicationDomain. */ public void shutdown() { // stop the flush thread shutdown = true; synchronized (synchroThreads) { // stop the listener threads for (ListenerThread thread : synchroThreads) { thread.shutdown(); } } // stop the flush thread shutdown = true; synchronized (this) { this.notify(); @@ -1253,7 +1271,19 @@ // wait for the listener thread to stop for (ListenerThread thread : synchroThreads) { thread.shutdown(); thread.waitForShutdown(); } // wait for completion of the persistentServerState thread. try { while (!done) { Thread.sleep(50); } } catch (InterruptedException e) { // stop waiting when interrupted. } } @@ -2249,7 +2279,7 @@ */ public long computeGenerationId() throws DirectoryException { Backend backend = this.retrievesBackend(baseDN); Backend backend = retrievesBackend(baseDN); long bec = backend.getEntryCount(); this.acquireIEContext(); ieContext.checksumOutput = true; @@ -3049,7 +3079,7 @@ LDIFImportConfig importConfig = null; DirectoryException de = null; Backend backend = this.retrievesBackend(baseDN); Backend backend = retrievesBackend(baseDN); if (!backend.supportsLDIFImport()) { opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -215,7 +215,8 @@ continue; } } if (!attr.getAttributeType().equals(Historical.historicalAttrType)) if (!Historical.isHistoricalAttribute(attr)) { LDAPModification ldapmod = new LDAPModification( mod.getModificationType(), new LDAPAttribute(mod.getAttribute())); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -26,8 +26,6 @@ */ package org.opends.server.replication; import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES; import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; @@ -40,25 +38,22 @@ import java.io.File; import java.net.ServerSocket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.net.SocketTimeoutException; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.core.AddOperation; import org.opends.server.core.AddOperationBasis; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.plugin.ReplicationDomain; @@ -71,7 +66,6 @@ import org.opends.server.replication.protocol.SocketSession; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.tasks.LdifFileWriter; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; @@ -79,8 +73,6 @@ import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchScope; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -1028,7 +1020,7 @@ // At this moment, root entry of the domain has been removed so // genId is no more in the database ... but it has still the old // value in memory. int found = testEntriesInDb(); testEntriesInDb(); replDomain.loadGenerationId(); debugInfo("Successfully ending " + testCase); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -52,7 +52,6 @@ import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.core.AddOperation; import org.opends.server.core.AddOperationBasis; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -36,10 +36,10 @@ import java.util.ArrayList; import java.util.List; import org.opends.server.TestCaseUtils; import org.opends.messages.Message; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.core.AddOperationBasis; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; @@ -49,7 +49,6 @@ import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.plugin.ReplicationDomain; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplicationMessage; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -793,6 +793,7 @@ gen.newChangeNumber(), user1entrysecondUUID); broker.publish(delMsg); resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false); resultEntry = getEntry(personWithSecondUniqueID.getDN(), 10000, false); // check that the delete operation has been applied assertNull(resultEntry, @@ -1171,9 +1172,19 @@ broker.publish(modDnMsg); // unfortunately it is difficult to check that the operation // did not do anything. // The only thing we can check is that resolved naminf conflict counter // The only thing we can check is that resolved naming conflict counter // has correctly been incremented. assertEquals(getMonitorDelta(), 1); int count = 0; while ((count<2000) && getMonitorDelta() == 0) { // it is possible that the update has not yet been applied // wait a short time and try again. Thread.sleep(100); count++; } // if the monitor counter did not get incremented after 200sec // then something got wrong. assertTrue(count < 200); // Check that there was no administrative alert generated // because the conflict has been automatically resolved. opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -171,7 +171,7 @@ DN dn = DN.decode("uid=user.1,o=test"); Entry entry = DirectoryServer.getEntry(dn); List<Attribute> attrs = entry.getAttribute(Historical.historicalAttrType); List<Attribute> attrs = Historical.getHistoricalAttr(entry); Attribute before = attrs.get(0); // Check that encoding and decoding preserves the history information. opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
@@ -108,8 +108,7 @@ // try a new modify operation on the base entry. op = conn.processModify(baseDn, generatemods("description", "test")); // chek that the operation was successful. // check that the update failed. // check that the operation was successfull. assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getAdditionalLogMessage().toString()); } @@ -119,9 +118,12 @@ MultimasterReplication.deleteDomain(baseDn); if (replicationPlugin != null) { replicationPlugin.finalizeSynchronizationProvider(); DirectoryServer.deregisterSynchronizationProvider(replicationPlugin); } } } /** * Clean the database and replace with a single entry. opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java
@@ -601,6 +601,10 @@ */ private Entry initializeEntry() throws DirectoryException { AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType( Historical.ENTRYUIDNAME); /* * Objectclass and DN do not have any impact on the modifty conflict * resolution for the description attribute. Always use the same values @@ -622,10 +626,10 @@ // Create the att values list LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>( 1); values.add(new AttributeValue(Historical.entryuuidAttrType, values.add(new AttributeValue(entryuuidAttrType, new ASN1OctetString(uuid.toString()))); ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1); Attribute uuidAttr = new Attribute(Historical.entryuuidAttrType, Attribute uuidAttr = new Attribute(entryuuidAttrType, "entryUUID", values); uuidList.add(uuidAttr); @@ -635,7 +639,7 @@ Map<AttributeType, List<Attribute>> operationalAttributes = entry .getOperationalAttributes(); operationalAttributes.put(Historical.entryuuidAttrType, uuidList); operationalAttributes.put(entryuuidAttrType, uuidList); return entry; } @@ -645,6 +649,8 @@ private void testHistoricalAndFake( Historical hist, Entry entry) { AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType(Historical.ENTRYUIDNAME); // Get the historical uuid associated to the entry // (the one that needs to be tested) @@ -652,7 +658,7 @@ // Get the Entry uuid in String format List<Attribute> uuidAttrs = entry .getOperationalAttribute(Historical.entryuuidAttrType); .getOperationalAttribute(entryuuidAttrType); uuidAttrs.get(0).getValues().iterator().next().toString(); if (uuidAttrs != null) @@ -730,6 +736,10 @@ private List<Modification> replayModify( Entry entry, Historical hist, Modification mod, int date) { AttributeType historicalAttrType = DirectoryServer.getSchema().getAttributeType( Historical.HISTORICALATTRIBUTENAME); InternalClientConnection connection = InternalClientConnection.getRootConnection(); ChangeNumber t = new ChangeNumber(date, (short) 0, (short) 0); @@ -763,7 +773,7 @@ * works by encoding decoding and checking that the result is the same * as the initial value. */ entry.removeAttribute(Historical.historicalAttrType); entry.removeAttribute(historicalAttrType); entry.addAttribute(hist.encode(), null); Historical hist2 = Historical.load(entry); assertEquals(hist2.encode().toString(), hist.encode().toString()); @@ -793,6 +803,9 @@ private void testHistorical( Historical hist, LocalBackendAddOperation addOp) { AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType( Historical.ENTRYUIDNAME); // Get the historical uuid associated to the entry // (the one that needs to be tested) @@ -800,7 +813,7 @@ // Get the op uuid in String format List<Attribute> uuidAttrs = addOp.getOperationalAttributes().get( Historical.entryuuidAttrType); entryuuidAttrType); uuidAttrs.get(0).getValues().iterator().next().toString(); if (uuidAttrs != null) opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -49,8 +49,11 @@ @Test() public void replServerApplyChangeTest() throws Exception { ReplicationServer replicationServer = null; TestCaseUtils.startServer(); try { // find two free ports for the replication Server port ServerSocket socket1 = TestCaseUtils.bindFreePort(); int replicationServerPort = socket1.getLocalPort(); @@ -63,7 +66,7 @@ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration( replicationServerPort, null, 0, 1, 0, 0, null); ReplicationServer replicationServer = new ReplicationServer(conf); replicationServer = new ReplicationServer(conf); // Most of the configuration change are trivial to apply. // The interesting change is the change of the replication server port. @@ -84,4 +87,9 @@ // broker did connect successfully. assertTrue(broker.getCurrentSendWindow() != 0); } finally { replicationServer.shutdown(); } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -26,7 +26,6 @@ */ package org.opends.server.replication.server; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT; @@ -41,7 +40,6 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -49,9 +47,6 @@ import java.util.TreeSet; import java.util.UUID; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.core.DirectoryServer; @@ -75,7 +70,6 @@ import org.opends.server.replication.protocol.ReplServerStartMessage; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.ResetGenerationId; import org.opends.server.replication.protocol.ServerStartMessage; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage;