opends/src/messages/messages/replication.properties
@@ -243,3 +243,6 @@ for domain %s with replication server %s - local data generation is %s \ - replication server data generation is %s - This may be only temporary \ or require a full resynchronization NOTICE_HEARTBEAT_FAILURE_97=%s is closing the session \ because it could not detect a heartbeat opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
@@ -27,6 +27,8 @@ package org.opends.server.replication.plugin; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -114,11 +116,7 @@ if (now > lastReceiveTime + 2 * heartbeatInterval) { // Heartbeat is well overdue so the server is assumed to be dead. if (debugEnabled()) { TRACER.debugInfo("Heartbeat monitor is closing the broker " + "session because it could not detect a heartbeat."); } logError(NOTE_HEARTBEAT_FAILURE.get(this.currentThread().getName())); session.close(); break; } opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -45,7 +45,7 @@ long numEntries; // The current number of entries exported long numExportedEntries; private long numExportedEntries; String entryBuffer = ""; /** @@ -92,11 +92,11 @@ entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex); // Send the entry if ((numEntries>0) && (numExportedEntries > numEntries)) if ((numEntries>0) && (getNumExportedEntries() > numEntries)) { // This outputstream has reached the total number // of entries to export. return; throw(new IOException()); } domain.exportLDIFEntry(entryBuffer); numExportedEntries++; @@ -114,4 +114,12 @@ } } } /** * Return the number of exported entries. * @return the numExportedEntries */ public long getNumExportedEntries() { return numExportedEntries; } } opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -524,8 +524,9 @@ if (heartbeatInterval > 0) { heartbeatMonitor = new HeartbeatMonitor("Replication Heartbeat Monitor", session, heartbeatInterval); new HeartbeatMonitor("Replication Heartbeat Monitor on " + baseDn + " with " + getReplicationServer(), session, heartbeatInterval); heartbeatMonitor.start(); } } opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -283,6 +283,7 @@ * @param count The value with which to initialize the counters. */ public void initImportExportCounters(long count) throws DirectoryException { entryCount = count; entryLeftCount = count; @@ -307,6 +308,7 @@ * an import or export. */ public void updateCounters() throws DirectoryException { entryLeftCount--; @@ -344,7 +346,7 @@ public ReplicationDomain(ReplicationDomainCfg configuration) throws ConfigException { super("replication flush"); super("replicationDomain_" + configuration.getBaseDN()); // Read the configuration parameters. replicationServers = configuration.getReplicationServer(); @@ -2536,7 +2538,10 @@ msg = broker.receive(); if (debugEnabled()) TRACER.debugInfo("Import: EntryBytes received " + msg); TRACER.debugInfo( " sid:" + this.serverId + " base DN:" + this.baseDN + " Import EntryBytes received " + msg); if (msg == null) { // The server is in the shutdown process @@ -2750,11 +2755,20 @@ } catch (DirectoryException de) { Message message = if ((ieContext != null) && (ieContext.checksumOutput) && (ros.getNumExportedEntries() >= ieContext.entryCount)) { // This is the normal end when computing the generationId // We can interrupt the export only by an IOException } else { Message message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); logError(message); throw new DirectoryException( ResultCode.OTHER, message, null); logError(message); throw new DirectoryException( ResultCode.OTHER, message, null); } } catch (Exception e) { @@ -2843,7 +2857,14 @@ serverId, ieContext.exportTarget, lDIFEntry.getBytes()); broker.publish(entryMessage); } ieContext.updateCounters(); try { ieContext.updateCounters(); } catch (DirectoryException de) { throw new IOException(de); } } /** @@ -2857,7 +2878,8 @@ public void initializeFromRemote(short source, Task initTask) throws DirectoryException { // TRACER.debugInfo("Entering initializeFromRemote"); if (debugEnabled()) TRACER.debugInfo("Entering initializeFromRemote"); acquireIEContext(); 
ieContext.initializeTask = initTask; @@ -2881,7 +2903,6 @@ public short decodeSource(String sourceString) throws DirectoryException { TRACER.debugInfo("Entering decodeSource"); short source = 0; Throwable cause = null; try @@ -3140,7 +3161,9 @@ // Process import backend.importLDIF(importConfig); TRACER.debugInfo("The import has ended successfully."); if (debugEnabled()) TRACER.debugInfo("The import has ended successfully on " + this.baseDN); stateSavingDisabled = false; } opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
@@ -100,4 +100,17 @@ { return this.senderID; } /** * Returns a string representation of the message. * * @return the string representation of this message. */ public String toString() { return "["+ this.getClass().getCanonicalName() + " sender=" + this.senderID + " destination=" + this.destination + "]"; } } opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -98,7 +98,8 @@ { if (debugEnabled()) { TRACER.debugVerbose("Closing SocketSession."); TRACER.debugInfo("Closing SocketSession." + Thread.currentThread().getStackTrace()); } if (plainSocket != null && !plainSocket.isClosed()) { opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -617,10 +617,14 @@ { if (!senderHandler.isReplicationServer()) { // Send to all replicationServers for (ServerHandler destinationHandler : replicationServers.values()) // Send to all replication servers with at least one remote // server connected for (ServerHandler rsh : replicationServers.values()) { servers.add(destinationHandler); if (!rsh.getRemoteLDAPServers().isEmpty()) { servers.add(rsh); } } } @@ -651,6 +655,8 @@ { for (ServerHandler h : replicationServers.values()) { // Send to all replication servers with at least one remote // server connected if (h.isRemoteLDAPServer(msg.getDestination())) { servers.add(h); @@ -696,13 +702,16 @@ { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); mb.append(" unreachable server ID=" + msg.getDestination()); mb.append(" unroutable message =" + msg); mb.append(" In Replication Server=" + this.replicationServer. getMonitorInstanceName()); mb.append(" domain =" + this.baseDn); mb.append(" unroutable message =" + msg.toString()); mb.append(" routing table is empty"); ErrorMessage errMsg = new ErrorMessage( this.replicationServer.getServerId(), msg.getsenderID(), mb.toMessage()); logError(mb.toMessage()); try { senderHandler.send(errMsg); opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -32,26 +32,20 @@ import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.messages.TaskMessages; import org.opends.messages.Message; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.replication.plugin.ReplicationDomain; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; /** @@ -122,8 +116,7 @@ String targetString = TaskUtils.getSingleValueString(attrList); target = domain.decodeTarget(targetString); createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0); createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0); setTotal(0); } /** @@ -153,77 +146,27 @@ } /** * Create attribute to store entry counters. * @param name The name of the attribute. * @param value The value to store for that attribute. */ protected void createCounterAttribute(String name, long value) { AttributeType type; LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); Entry taskEntry = getTaskEntry(); try { type = getAttributeType(name, true); values.add(new AttributeValue(type, new ASN1OctetString(String.valueOf(value)))); ArrayList<Attribute> attrList = new ArrayList<Attribute>(1); attrList.add(new Attribute(type, name,values)); taskEntry.putAttribute(type, attrList); } finally { // taskScheduler.unlockEntry(taskEntryDN, lock); } } /** * Set the total number of entries expected to be exported. * @param total The total number of entries. 
* @throws DirectoryException when a problem occurs */ public void setTotal(long total) public void setTotal(long total) throws DirectoryException { this.total = total; try { updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total); updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0); } catch(Exception e) {} replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT, String.valueOf(total)); replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE, String.valueOf(0)); } /** * Set the total number of entries still to be exported. * @param left The total number of entries to be exported. * @throws DirectoryException when a problem occurs */ public void setLeft(long left) public void setLeft(long left) throws DirectoryException { this.left = left; try { updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left); updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left); } catch(Exception e) {} } /** * Update an attribute for this task. * @param name The name of the attribute. * @param value The value. * @throws DirectoryException When an error occurs. */ protected void updateAttribute(String name, long value) throws DirectoryException { Entry taskEntry = getTaskEntry(); ArrayList<Modification> modifications = new ArrayList<Modification>(); modifications.add(new Modification(ModificationType.REPLACE, new Attribute(name, String.valueOf(value)))); taskEntry.applyModifications(modifications); replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT, String.valueOf(left)); replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE,String.valueOf(total-left)); } }