| | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.AssuredMode.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | |
| | | * Current status for this replicated domain. |
| | | */ |
| | | protected ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** The configuration of the replication domain. */ |
| | | protected volatile ReplicationDomainCfg config; |
| | |
| | | */ |
| | | private void receiveChangeStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + getBaseDN() + |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("Replication domain " + getBaseDN() + |
| | | " received change status message:\n" + csMsg); |
| | | |
| | | ServerStatus reqStatus = csMsg.getRequestedStatus(); |
| | |
| | | case BAD_GEN_ID_STATUS: |
| | | break; |
| | | default: |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " + |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("updateDomainForNewStatus: unexpected status: " + |
| | | status); |
| | | } |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | if (debugEnabled() && !(msg instanceof HeartbeatMsg)) |
| | | if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg)) |
| | | { |
| | | TRACER.debugVerbose("LocalizableMessage received <" + msg + ">"); |
| | | logger.trace("LocalizableMessage received <" + msg + ">"); |
| | | } |
| | | |
| | | if (msg instanceof AckMsg) |
| | |
| | | A remote error during the import will be received in the |
| | | receiveEntryBytes() method. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] processErrorMsg:" + getServerId() + |
| | | " baseDN: " + getBaseDN() + |
| | | " Error Msg received: " + errorMsg); |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] starting " + getName()); |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] starting " + getName()); |
| | | try |
| | | { |
| | | initializeRemote(serverIdToInitialize, serverIdToInitialize, null, |
| | |
| | | */ |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] ending " + getName()); |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] ending " + getName()); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void setAckVal(int serverId, int numAck) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck); |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck); |
| | | |
| | | this.ackVals.put(serverId, numAck); |
| | | |
| | |
| | | */ |
| | | public int getSlowestServer() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] getSlowestServer" + slowestServerId |
| | | + " " + this.ackVals.get(slowestServerId)); |
| | | |
| | | return this.slowestServerId; |
| | |
| | | exportRootException = ieEx != null ? ieEx : exportException; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName() |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName() |
| | | + " export ends with " + " connected=" + broker.isConnected() |
| | | + " exportRootException=" + exportRootException); |
| | | |
| | |
| | | { |
| | | // We are still disconnected, so we wait for the listener thread |
| | | // to reconnect - wait 10s |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] Exporter wait for reconnection by the listener thread"); |
| | | int att=0; |
| | | while (!broker.shuttingDown() && !broker.isConnected() |
| | |
| | | final Set<Integer> replicasWeAreWaitingFor = |
| | | new HashSet<Integer>(ieCtx.startList); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); |
| | | |
| | | int waitResultAttempt = 0; |
| | |
| | | done = true; |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] wait for start dsId " + dsi.getDsId() |
| | | + " " + dsi.getStatus() |
| | | + " " + dsi.getGenerationId() |
| | |
| | | |
| | | ieCtx.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] wait for start ends with " + ieCtx.failureList); |
| | | } |
| | | |
| | |
| | | final Set<Integer> replicasWeAreWaitingFor = |
| | | new HashSet<Integer>(ieCtx.startList); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); |
| | | |
| | | /* |
| | |
| | | |
| | | ieCtx.failureList.addAll(replicasWeAreWaitingFor); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace( |
| | | "[IE] wait for end ends with " + ieCtx.failureList); |
| | | } |
| | | |
| | |
| | | // potential disconnection of the exporter. |
| | | msg = broker.receive(false, false, true); |
| | | |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] In " |
| | | logger.trace("[IE] In " |
| | | + broker.getReplicationMonitorInstanceName() |
| | | + ", receiveEntryBytes " + msg); |
| | | } |
| | |
| | | final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt); |
| | | broker.publish(amsg, false); |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] In " |
| | | logger.trace("[IE] In " |
| | | + broker.getReplicationMonitorInstanceName() |
| | | + ", publish InitializeRcvAckMsg" + amsg); |
| | | } |
| | |
| | | public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) |
| | | throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] Entering exportLDIFEntry entry=" + |
| | | Arrays.toString(lDIFEntry)); |
| | | |
| | | // build the message |
| | |
| | | int ourLastExportedCnt = ieCtx.msgCnt; |
| | | int slowestCnt = ieCtx.ackVals.get(slowestServerId); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " + |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] Entering exportLDIFEntry waiting " + |
| | | " our=" + ourLastExportedCnt + " slowest=" + slowestCnt); |
| | | |
| | | if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting"); |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] Entering exportLDIFEntry waiting"); |
| | | |
| | | // our export is too far beyond the slowest importer - let's wait |
| | | try { Thread.sleep(100); } |
| | |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] slowest got to us => stop waiting"); |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] slowest got to us => stop waiting"); |
| | | break; |
| | | } |
| | | } // Waiting the slowest loop |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" |
| | | if (logger.isTraceEnabled()) |
| | | logger.trace("[IE] Entering exportLDIFEntry pub entry=" |
| | | + Arrays.toString(lDIFEntry)); |
| | | |
| | | boolean sent = broker.publish(entryMessage, false); |
| | |
| | | public void initializeFromRemote(int source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this); |
| | | logger.trace("[IE] Entering initializeFromRemote for " + this); |
| | | } |
| | | |
| | | LocalizableMessage errMsg = !broker.isConnected() |
| | |
| | | { |
| | | InitializeTask initFromTask = null; |
| | | |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Entering initialize - domain=" + this); |
| | | logger.trace("[IE] Entering initialize - domain=" + this); |
| | | } |
| | | |
| | | int source = initTargetMsgReceived.getSenderID(); |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | logger.trace("[IE] Domain=" + this |
| | | + " ends import with exception=" + ieCtx.getException() |
| | | + " connected=" + broker.isConnected()); |
| | | } |
| | |
| | | // =================== |
| | | // No new attempt case |
| | | |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("[IE] Domain=" + this |
| | | logger.trace("[IE] Domain=" + this |
| | | + " ends initialization with exception=" + ieCtx.getException() |
| | | + " connected=" + broker.isConnected() |
| | | + " task=" + initFromTask |
| | |
| | | } |
| | | |
| | | status = newStatus; |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication domain " + getBaseDN() |
| | | logger.trace("Replication domain " + getBaseDN() |
| | | + " new status is: " + status); |
| | | } |
| | | |
| | |
| | | public void resetGenerationId(Long generationIdNewValue) |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("Server id " + getServerId() + " and domain " |
| | | logger.trace("Server id " + getServerId() + " and domain " |
| | | + getBaseDN() + " resetGenerationId " + generationIdNewValue); |
| | | } |
| | | |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication Listener thread starting."); |
| | | logger.trace("Replication Listener thread starting."); |
| | | } |
| | | |
| | | // Loop processing any incoming update messages. |
| | |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication Listener thread stopping."); |
| | | logger.trace("Replication Listener thread stopping."); |
| | | } |
| | | } |
| | | }, threadName); |
| | |
| | | msg.wait(10); |
| | | } catch (InterruptedException e) |
| | | { |
| | | if (debugEnabled()) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | TRACER.debugInfo("waitForAck method interrupted for replication " + |
| | | logger.trace("waitForAck method interrupted for replication " + |
| | | "baseDN: " + getBaseDN()); |
| | | } |
| | | break; |