| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import static org.forgerock.opendj.ldap.ResultCode.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.common.AssuredMode.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | |
| | | import java.io.OutputStream; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.Arrays; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.TimeoutException; |
| | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.forgerock.opendj.server.config.meta.ReplicationDomainCfgDefn.AssuredType; |
| | | import org.forgerock.opendj.server.config.server.ReplicationDomainCfg; |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | | import org.opends.server.tasks.InitializeTask; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | |
| | | /** This class contains the context related to an import or export launched on the domain. */ |
| | | protected class ImportExportContext |
| | | { |
| | | /** The private task that initiated the operation. */ |
| | |
| | | private final Set<Integer> failureList = new HashSet<>(0); |
| | | |
| | | /** |
| | | * Flow control during initialization: map of each remote serverId to its counter of |
| | | * messages received. |
| | | */ |
| | | private final Map<Integer, Integer> ackVals = new HashMap<>(); |
| | | /** ServerId of the slowest server (the one with the smallest non null counter). */ |
| | |
| | | logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck); |
| | | } |
| | | |
| | | this.ackVals.put(serverId, numAck); |
| | | ackVals.put(serverId, numAck); |
| | | |
| | | // Recompute the slowest server, i.e. the one that returned the smallest ack value. |
| | | slowestServerId = serverId; |
| | | for (Integer sid : importExportContext.get().ackVals.keySet()) |
| | | int minMsgReceived = ackVals.get(serverId); |
| | | for (Entry<Integer, Integer> mapEntry : ackVals.entrySet()) |
| | | { |
| | | if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) |
| | | int nbMsgReceived = mapEntry.getValue(); |
| | | if (nbMsgReceived < minMsgReceived) |
| | | { |
| | | slowestServerId = sid; |
| | | slowestServerId = mapEntry.getKey(); |
| | | minMsgReceived = nbMsgReceived; |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get(); |
| | | throw new DirectoryException(resultCode, message, e); |
| | | throw new DirectoryException(ResultCode.OTHER, ERR_INVALID_EXPORT_TARGET.get(), e); |
| | | } |
| | | } |
| | | |
| | |
| | | * @throws DirectoryException If it was not possible to publish the |
| | | * Initialization message to the Topology. |
| | | */ |
| | | public void initializeRemote(int target, Task initTask) throws DirectoryException |
| | | { |
| | |   // Delegate to the four-argument overload, supplying getServerId() and |
| | |   // getInitWindow() for the arguments this convenience form does not take. |
| | |   initializeRemote(target, getServerId(), initTask, getInitWindow()); |
| | | } |
| | |
| | | - to update the task with the server(s) where this test failed |
| | | */ |
| | | |
| | | Map<Integer, DSInfo> replicaInfos = getReplicaInfos(); |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | if (replicaInfos.isEmpty()) |
| | | { |
| | | throw new DirectoryException(UNWILLING_TO_PERFORM, |
| | | ERR_FULL_UPDATE_NO_REMOTES.get(getBaseDN(), getServerId())); |
| | | } |
| | | |
| | | logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL, |
| | | countEntries(), getBaseDN(), getServerId()); |
| | | |
| | | ieCtx.startList.addAll(getReplicaInfos().keySet()); |
| | | ieCtx.startList.addAll(replicaInfos.keySet()); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | for (DSInfo dsi : replicaInfos.values()) |
| | | { |
| | | if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | DSInfo dsi = getDsInfoOrNull(replicaInfos.values(), serverToInitialize); |
| | | if (dsi == null) |
| | | { |
| | | throw new DirectoryException(UNWILLING_TO_PERFORM, |
| | | ERR_FULL_UPDATE_MISSING_REMOTE.get(getBaseDN(), getServerId(), serverToInitialize)); |
| | | } |
| | | |
| | | logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(), |
| | | getBaseDN(), getServerId(), serverToInitialize); |
| | | |
| | | ieCtx.startList.add(serverToInitialize); |
| | | |
| | | // We manage the list of servers with which a flow control can be enabled |
| | | for (DSInfo dsi : getReplicaInfos().values()) |
| | | { |
| | | if (dsi.getDsId() == serverToInitialize && |
| | | dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | ieCtx.setAckVal(dsi.getDsId(), 0); |
| | | } |
| | | } |
| | | } |
| | | |
| | | DirectoryException exportRootException = null; |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private DSInfo getDsInfoOrNull(Collection<DSInfo> replicaInfos, int serverToInitialize) |
| | | { |
| | | for (DSInfo dsi : replicaInfos) |
| | | { |
| | | if (dsi.getDsId() == serverToInitialize |
| | | && dsi.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | return dsi; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * For all remote servers in the start list: |
| | | * - wait it has finished the import and present the expected generationID, |