| | |
| | | import java.util.Collection; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | |
| | |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DN; |
| | |
| | | |
| | | private String localURL = "null"; |
| | | private boolean shutdown = false; |
| | | private short replicationServerId; |
| | | private ReplicationDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | |
| | | private boolean connectedInTopology = false; |
| | | private final Object connectedInTopologyLock = new Object(); |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | // Timeout (in milliseconds) when waiting for acknowledgments |
| | | private long assuredTimeout = 1000; |
| | | |
| | | // Group id |
| | | private byte groupId = (byte)1; |
| | | |
| | | // Number of pending changes for a DS, considered as threshold value to put |
| | | // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled |
| | | private int degradedStatusThreshold = 5000; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | |
| | | super("Replication Server" + configuration.getReplicationPort()); |
| | | |
| | | replicationPort = configuration.getReplicationPort(); |
| | | replicationServerId = (short) configuration.getReplicationServerId(); |
| | | serverId = (short) configuration.getReplicationServerId(); |
| | | replicationServers = configuration.getReplicationServer(); |
| | | if (replicationServers == null) |
| | | replicationServers = new ArrayList<String>(); |
| | |
| | | Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()); |
| | | throw new ConfigException(msg, e); |
| | | } |
| | | groupId = (byte)configuration.getGroupId(); |
| | | assuredTimeout = configuration.getAssuredTimeout(); |
| | | degradedStatusThreshold = configuration.getDegradedStatusThreshold(); |
| | | |
| | | replSessionSecurity = new ReplSessionSecurity(configuration); |
| | | initialize(replicationServerId, replicationPort); |
| | | initialize(replicationPort); |
| | | configuration.addChangeListener(this); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | |
| | |
| | | try |
| | | { |
| | | newSocket = listenSocket.accept(); |
| | | newSocket.setReceiveBufferSize(1000000); |
| | | newSocket.setTcpNoDelay(true); |
| | | newSocket.setKeepAlive(true); |
| | | ProtocolSession session = |
| | | replSessionSecurity.createServerSession(newSocket); |
| | | replSessionSecurity.createServerSession(newSocket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | if (session == null) // Error, go back to accept |
| | | continue; |
| | | ServerHandler handler = new ServerHandler(session, queueSize); |
| | |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | ServerHandler handler = new ServerHandler( |
| | | replSessionSecurity.createClientSession(serverURL, socket), |
| | | replSessionSecurity.createClientSession(serverURL, socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT), |
| | | queueSize); |
| | | handler.start(baseDn, serverId, this.serverURL, rcvWindow, |
| | | sslEncryption, this); |
| | |
| | | /** |
| | | * initialization function for the replicationServer. |
| | | * |
| | | * @param changelogId The unique identifier for this replicationServer. |
| | | * @param changelogPort The port on which the replicationServer should |
| | | * listen. |
| | | * |
| | | */ |
| | | private void initialize(short changelogId, int changelogPort) |
| | | private void initialize(int changelogPort) |
| | | { |
| | | shutdown = false; |
| | | |
| | |
| | | this); |
| | | |
| | | /* |
| | | * create replicationServer replicationServerDomain |
| | | */ |
| | | serverId = changelogId; |
| | | |
| | | /* |
| | | * Open replicationServer socket |
| | | */ |
| | | String localhostname = InetAddress.getLocalHost().getHostName(); |
| | |
| | | serverURL = localhostname + ":" + String.valueOf(changelogPort); |
| | | localURL = localAdddress + ":" + String.valueOf(changelogPort); |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.setReceiveBufferSize(1000000); |
| | | listenSocket.bind(new InetSocketAddress(changelogPort)); |
| | | |
| | | /* |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates connect threads"); |
| | | " creates connect thread"); |
| | | connectThread = |
| | | new ReplicationServerConnectThread("Replication Server Connect", this); |
| | | new ReplicationServerConnectThread("Replication Server Connect " + |
| | | serverId , this); |
| | | connectThread.start(); |
| | | |
| | | // FIXME : Is it better to have the time to receive the ReplServerInfo |
| | |
| | | try { Thread.sleep(300);} catch(Exception e) {} |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates listen threads"); |
| | | " creates listen thread"); |
| | | |
| | | listenThread = |
| | | new ReplicationServerListenThread("Replication Server Listener", this); |
| | | new ReplicationServerListenThread("Replication Server Listener " + |
| | | serverId , this); |
| | | listenThread.start(); |
| | | |
| | | if (debugEnabled()) |
| | |
| | | * DN given in parameter. |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler muste be created. |
| | | * @param baseDn The DN for which the dbHandler must be created. |
| | | * @return The new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | |
| | | * @param configuration The configuration to check. |
| | | * @param unacceptableReasons When the configuration is not acceptable, this |
| | | * table is used to return the reasons why this |
| | | * configuration is not acceptbale. |
| | | * configuration is not acceptable. |
| | | * |
| | | * @return true if the configuration is acceptable, false otherwise. |
| | | */ |
| | |
| | | serverURL = localhostname + ":" + String.valueOf(replicationPort); |
| | | localURL = localAdddress + ":" + String.valueOf(replicationPort); |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.setReceiveBufferSize(1000000); |
| | | listenSocket.bind(new InetSocketAddress(replicationPort)); |
| | | |
| | | listenThread = |
| | |
| | | } |
| | | } |
| | | |
| | | // Update threshold value for status analyzers (stop them if requested |
| | | // value is 0) |
| | | if (degradedStatusThreshold != configuration.getDegradedStatusThreshold()) |
| | | { |
| | | int oldThresholdValue = degradedStatusThreshold; |
| | | degradedStatusThreshold = configuration.getDegradedStatusThreshold(); |
| | | for(ReplicationServerDomain rsd : baseDNs.values()) |
| | | { |
| | | if (degradedStatusThreshold == 0) |
| | | { |
| | | // Requested to stop analyzers |
| | | rsd.stopStatusAnalyzer(); |
| | | } else if (rsd.isRunningStatusAnalyzer()) |
| | | { |
| | | // Update the threshold value for this running analyzer |
| | | rsd.updateStatusAnalyzer(degradedStatusThreshold); |
| | | } else if (oldThresholdValue == 0) |
| | | { |
| | | // Requested to start analyzers with provided threshold value |
| | | if (rsd.getConnectedDSs().size() > 0) |
| | | rsd.startStatusAnalyzer(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | if ((configuration.getReplicationDBDirectory() != null) && |
| | | (!dbDirname.equals(configuration.getReplicationDBDirectory()))) |
| | | { |
| | |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "Replication Server " + this.replicationPort + " " |
| | | + replicationServerId; |
| | | + serverId; |
| | | } |
| | | |
| | | /** |
| | |
| | | * publish the server id and the port number. |
| | | */ |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(new Attribute("replication server id", |
| | | attributes.add(Attributes.create("replication server id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("replication server port", |
| | | attributes.add(Attributes.create("replication server port", |
| | | String.valueOf(replicationPort))); |
| | | |
| | | /* |
| | | * Add all the base DNs that are known by this replication server. |
| | | */ |
| | | AttributeType baseType= |
| | | DirectoryServer.getAttributeType("base-dn", true); |
| | | LinkedHashSet<AttributeValue> baseValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | AttributeBuilder builder = new AttributeBuilder("base-dn"); |
| | | for (DN base : baseDNs.keySet()) |
| | | { |
| | | baseValues.add(new AttributeValue(baseType, base. toString())); |
| | | builder.add(base.toString()); |
| | | } |
| | | |
| | | Attribute bases = new Attribute(baseType, "base-dn", baseValues); |
| | | attributes.add(bases); |
| | | attributes.add(builder.toAttribute()); |
| | | |
| | | // Publish to monitor the generation ID by replicationServerDomain |
| | | AttributeType generationIdType= |
| | | DirectoryServer.getAttributeType("base-dn-generation-id", true); |
| | | LinkedHashSet<AttributeValue> generationIdValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | builder = new AttributeBuilder("base-dn-generation-id"); |
| | | for (DN base : baseDNs.keySet()) |
| | | { |
| | | long generationId=-1; |
| | |
| | | getReplicationServerDomain(base, false); |
| | | if (replicationServerDomain != null) |
| | | generationId = replicationServerDomain.getGenerationId(); |
| | | generationIdValues.add(new AttributeValue(generationIdType, |
| | | base.toString() + " " + generationId)); |
| | | builder.add(base.toString() + " " + generationId); |
| | | } |
| | | Attribute generationIds = new Attribute(generationIdType, "generation-id", |
| | | generationIdValues); |
| | | attributes.add(generationIds); |
| | | attributes.add(builder.toAttribute()); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN)) |
| | | if (DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN)) |
| | | { |
| | | // Delete the replication backend |
| | | DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN, |
| | |
| | | boolean successful) |
| | | { |
| | | if (backend.getBackendID().equals(backendId)) |
| | | initialize(this.replicationServerId, this.replicationPort); |
| | | initialize(this.replicationPort); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the timeout used in assured mode when waiting for |
| | | * acknowledgments. |
| | | * |
| | | * @return The assured mode timeout, in milliseconds. |
| | | */ |
| | | public long getAssuredTimeout() |
| | | { |
| | | return this.assuredTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Returns the group id of this replication server. |
| | | * |
| | | * @return The replication server group id. |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return this.groupId; |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of pending changes for a DS that is used as the |
| | | * threshold to put that DS in DEGRADED_STATUS. A value of 0 means the |
| | | * status analyzer is disabled. |
| | | * |
| | | * @return The threshold value for the status analyzer. |
| | | */ |
| | | public int getDegradedStatusThreshold() |
| | | { |
| | | return this.degradedStatusThreshold; |
| | | } |
| | | |
| | | /** |
| | | * Compute the list of replication servers that are not any |
| | | * more connected to this Replication Server and stop the |
| | | * corresponding handlers. |