| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.server.ReplicationServer.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.math.BigDecimal; |
| | | import java.math.MathContext; |
| | | import java.math.RoundingMode; |
| | | import java.net.ConnectException; |
| | | import java.net.InetAddress; |
| | | import java.net.InetSocketAddress; |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.net.UnknownHostException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.net.*; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.MutableBoolean; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.server.ReplicationServer.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * The broker for Multi-master Replication. |
| | | */ |
| | |
| | | private volatile Collection<String> replicationServerUrls; |
| | | private volatile boolean connected = false; |
| | | /** |
| | | * String reported under cn=monitor when there is no connected RS. |
| | | * String reported under CSN=monitor when there is no connected RS. |
| | | */ |
| | | public final static String NO_CONNECTED_SERVER = "Not connected"; |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | |
| | | * @param groupId The group id of our domain. |
| | | * @param changeTimeHeartbeatInterval The interval (in ms) at which Change |
| | | * time heartbeats are sent to the RS, |
| | | * or zero if no CN heartbeat should be sent. |
| | | * or zero if no CSN heartbeat should be sent. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, String baseDn, int serverID2, int window, |
| | |
| | | /** |
| | | * Now apply the choice based on the weight to the best servers list |
| | | */ |
| | | if (bestServers.size() > 1) |
| | | if (bestServers.size() == 1) |
| | | { |
| | | return bestServers.values().iterator().next(); |
| | | } |
| | | |
| | | if (firstConnection) |
| | | { |
| | | // We are not connected to a server yet |
| | |
| | | return computeBestServerForWeight(bestServers, rsServerId, |
| | | localServerId); |
| | | } |
| | | } else |
| | | { |
| | | return bestServers.values().iterator().next(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | Map<Integer, ReplicationServerInfo> moreUpToDateServers = |
| | | new HashMap<Integer, ReplicationServerInfo>(); |
| | | |
| | | // Extract the change number of the latest change generated by the local |
| | | // server |
| | | ChangeNumber myChangeNumber = localState.getChangeNumber(localServerId); |
| | | if (myChangeNumber == null) |
| | | // Extract the CSN of the latest change generated by the local server |
| | | CSN myCSN = localState.getCSN(localServerId); |
| | | if (myCSN == null) |
| | | { |
| | | myChangeNumber = new ChangeNumber(0, 0, localServerId); |
| | | myCSN = new CSN(0, 0, localServerId); |
| | | } |
| | | |
| | | /** |
| | |
| | | * if for instance we failed and restarted, having sent some changes to the |
| | | * RS but without having time to store our own state) regarding our own |
| | | * server id. If some servers are more up to date, prefer this list but take |
| | | * only the latest change number. |
| | | * only the latest CSN. |
| | | */ |
| | | ChangeNumber latestRsChangeNumber = null; |
| | | CSN latestRsCSN = null; |
| | | for (Integer rsId : bestServers.keySet()) |
| | | { |
| | | ReplicationServerInfo replicationServerInfo = bestServers.get(rsId); |
| | | ServerState rsState = replicationServerInfo.getServerState(); |
| | | ChangeNumber rsChangeNumber = rsState.getChangeNumber(localServerId); |
| | | if (rsChangeNumber == null) |
| | | CSN rsCSN = rsState.getCSN(localServerId); |
| | | if (rsCSN == null) |
| | | { |
| | | rsChangeNumber = new ChangeNumber(0, 0, localServerId); |
| | | rsCSN = new CSN(0, 0, localServerId); |
| | | } |
| | | |
| | | // Does this replication server have the latest local change? |
| | | if (myChangeNumber.olderOrEqual(rsChangeNumber)) |
| | | if (myCSN.olderOrEqual(rsCSN)) |
| | | { |
| | | if (myChangeNumber.equals(rsChangeNumber)) |
| | | if (myCSN.equals(rsCSN)) |
| | | { |
| | | // This replication server has exactly the latest change from the |
| | | // local server |
| | |
| | | { |
| | | // This replication server is even more up to date than the local |
| | | // server |
| | | if (latestRsChangeNumber == null) |
| | | if (latestRsCSN == null) |
| | | { |
| | | // Initialize the latest change number |
| | | latestRsChangeNumber = rsChangeNumber; |
| | | // Initialize the latest CSN |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | if (rsChangeNumber.newerOrEquals(latestRsChangeNumber)) |
| | | if (rsCSN.newerOrEquals(latestRsCSN)) |
| | | { |
| | | if (rsChangeNumber.equals(latestRsChangeNumber)) |
| | | if (rsCSN.equals(latestRsCSN)) |
| | | { |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | } else |
| | |
| | | // new RS |
| | | moreUpToDateServers.clear(); |
| | | moreUpToDateServers.put(rsId, replicationServerInfo); |
| | | latestRsChangeNumber = rsChangeNumber; |
| | | latestRsCSN = rsCSN; |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | // Prefer servers more up to date than local server |
| | | return moreUpToDateServers; |
| | | } else |
| | | { |
| | | return upToDateServers; |
| | | } |
| | | return upToDateServers; |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | public void startChangeTimeHeartBeatPublishing() |
| | | { |
| | | // Start a CN heartbeat thread. |
| | | // Start a CSN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | { |
| | | String threadName = "Replica DS(" |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(this |
| | | + " is not configured to send CN heartbeat interval"); |
| | | + " is not configured to send CSN heartbeat interval"); |
| | | } |
| | | } |
| | | |