opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -89,7 +89,7 @@ } else { entryBuffer = new String(b, startOfEntryIndex, len); entryBuffer = new String(b, startOfEntryIndex, bytesToRead); break; } } opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -705,12 +705,8 @@ } catch(DirectoryException de) { // Returns an error message to notify the sender int msgID = de.getMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); broker.publish(errorMsg); // An error message has been sent to the peer // Nothing more to do locally } } else if (msg instanceof InitializeTargetMessage) @@ -1900,7 +1896,7 @@ ((InitializeTask)ieContext.initializeTask). setState(ieContext.updateTaskCompletionState(),ieContext.exception); ieContext = null; releaseIEContext(); } } } @@ -2184,7 +2180,7 @@ try { source = Integer.decode(sourceString).shortValue(); if (source >= -1) if ((source >= -1) && (source != serverId)) { // TODO Verifies serverID is in the domain // We shold check here that this is a server implied @@ -2302,34 +2298,52 @@ public void initializeTarget(short target, short requestorID, Task initTask) throws DirectoryException { // FIXME Temporary workaround - will probably be fixed when implementing // dynamic config retrievesBackendInfos(this.baseDN); acquireIEContext(); ieContext.exportTarget = target; ieContext.initializeTask = initTask; ieContext.initTaskCounters(backend.getEntryCount()); if (initTask != null) { ieContext.initializeTask = initTask; ieContext.initTaskCounters(backend.getEntryCount()); } // Send start message // Send start message to the peer InitializeTargetMessage initializeMessage = new InitializeTargetMessage( baseDN, serverId, ieContext.exportTarget, requestorID, ieContext.entryLeftCount); backend.getEntryCount()); log("SD : publishes " + initializeMessage + " for #entries=" + ieContext.entryCount); " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount); broker.publish(initializeMessage); // make an export and send entries exportBackend(); // Successfull termnation DoneMessage doneMsg = new DoneMessage(serverId, initializeMessage.getDestination()); broker.publish(doneMsg); if (ieContext != null) try { ieContext.updateTaskCompletionState(); ieContext = null; exportBackend(); // Notify the peer of the success DoneMessage doneMsg = new DoneMessage(serverId, initializeMessage.getDestination()); broker.publish(doneMsg); releaseIEContext(); } catch(DirectoryException de) { // Notify the peer of the failure int msgID = de.getMessageID(); ErrorMessage errorMsg = new ErrorMessage(target, msgID, de.getMessage()); broker.publish(errorMsg); releaseIEContext(); throw(de); } } @@ -2470,7 +2484,8 @@ * * @param baseDN The baseDN of the domain to retrieve * @return The domain retrieved * @throws DirectoryException When an error occured. * @throws DirectoryException When an error occured or no domain * match the provided baseDN. */ public static ReplicationDomain retrievesReplicationDomain(DN baseDN) throws DirectoryException @@ -2495,12 +2510,8 @@ MultimasterReplication.findDomain(baseDN, null); if (sdomain == null) { int msgID = MSGID_NO_MATCHING_DOMAIN; String message = getMessage(msgID) + " " + baseDN; throw new DirectoryException(ResultCode.OTHER, message, msgID); break; } if (replicationDomain != null) { // Should never happen @@ -2511,6 +2522,14 @@ } replicationDomain = sdomain; } if (replicationDomain == null) { int msgID = MSGID_NO_MATCHING_DOMAIN; String message = getMessage(msgID) + " " + baseDN; throw new DirectoryException(ResultCode.OTHER, message, msgID); } return replicationDomain; } opendj-sdk/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -119,7 +119,7 @@ message, msgID); } domain=ReplicationDomain.retrievesReplicationDomain(domainDN); domain = ReplicationDomain.retrievesReplicationDomain(domainDN); attrList = taskEntry.getAttribute(typeSourceScope); String sourceString = TaskUtils.getSingleValueString(attrList); @@ -139,7 +139,7 @@ TRACER.debugInfo("InitializeTask is starting domain: %s source:%d", domain.getBaseDN(), source); } initState = getTaskState(); // RUNNING initState = getTaskState(); try { // launch the import @@ -170,7 +170,8 @@ if (debugEnabled()) { TRACER.debugInfo("InitializeTask is ending with state:%d", initState); TRACER.debugInfo("InitializeTask is ending with state:%s", initState.toString()); } return initState; } @@ -195,10 +196,7 @@ } if (debugEnabled()) { logError(ErrorLogCategory.TASK, ErrorLogSeverity.SEVERE_ERROR, "setState: "+newState, 1); TRACER.debugInfo("InitializeTask/setState: ", newState); TRACER.debugInfo("InitializeTask/setState: %s", newState); } initState = newState; synchronized (initState) opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -275,6 +275,13 @@ " Expected entries :" + updatedEntries.length); } /** * Add a task to the configuration of the current running DS. * @param taskEntry The task to add. * @param expectedResult The expected result code for the ADD. * @param errorMessageID The expected error messageID when the expected * result code is not SUCCESS */ private void addTask(Entry taskEntry, ResultCode expectedResult, int errorMessageID) { @@ -762,7 +769,7 @@ // suffix synchronized String synchroServerStringDN = synchroPluginStringDN; String synchroServerLdif = "dn: cn=example, cn=domains" + synchroServerStringDN + "\n" "dn: cn=example, cn=domains," + synchroServerStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-provider-config\n" + "cn: example\n" @@ -1262,6 +1269,7 @@ // Creates config to synchronize suffix connectServer1ToChangelog(changelog1ID); // Test 1 Entry taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", @@ -1277,6 +1285,19 @@ waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN); // Test 2 taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: "+baseDn, "ds-task-initialize-replica-server-id: " + server1ID); addTask(taskInit, ResultCode.OTHER, MSGID_INVALID_IMPORT_SOURCE); if (sd != null) { log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());