| | |
| | | import java.net.*; |
| | | import java.util.*; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*; |
| | | import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; |
| | |
| | | import org.opends.server.replication.server.changelog.je.JEChangelogDB; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | new HashMap<DN, ReplicationServerDomain>(); |
| | | |
| | | private final ChangelogDB changelogDB; |
| | | private volatile boolean shutdown = false; |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | private boolean stopListen = false; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | private final ReplSessionSecurity replSessionSecurity; |
| | | |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private static String eclWorkflowID = |
| | | private static final String eclWorkflowID = |
| | | "External Changelog Workflow ID"; |
| | | private ECLWorkflowElement eclwe; |
| | | private AtomicReference<WorkflowImpl> eclWorkflowImpl = |
| | | private final AtomicReference<WorkflowImpl> eclWorkflowImpl = |
| | | new AtomicReference<WorkflowImpl>(); |
| | | |
| | | /** |
| | | * This is required for unit testing, so that we can keep track of all the |
| | | * replication servers which are running in the VM. |
| | | */ |
| | | private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>(); |
| | | private static final Set<Integer> localPorts = |
| | | new CopyOnWriteArraySet<Integer>(); |
| | | |
| | | // Monitors for synchronizing domain creation with the connect thread. |
| | | private final Object domainTicketLock = new Object(); |
| | |
| | | * Holds the list of all replication servers instantiated in this VM. |
| | | * This allows to perform clean up of the RS databases in unit tests. |
| | | */ |
| | | private static List<ReplicationServer> allInstances = |
| | | private static final List<ReplicationServer> allInstances = |
| | | new ArrayList<ReplicationServer>(); |
| | | |
| | | /** |
| | |
| | | * ports from other replication servers or from LDAP servers |
| | | * and spawn further thread responsible for handling those connections |
| | | */ |
| | | |
| | | void runListen() |
| | | { |
| | | Message listenMsg = NOTE_REPLICATION_SERVER_LISTENING.get( |
| | | logError(NOTE_REPLICATION_SERVER_LISTENING.get( |
| | | getServerId(), |
| | | listenSocket.getInetAddress().getHostAddress(), |
| | | listenSocket.getLocalPort()); |
| | | logError(listenMsg); |
| | | listenSocket.getLocalPort())); |
| | | |
| | | while (!shutdown && !stopListen) |
| | | while (!shutdown.get() && !stopListen) |
| | | { |
| | | // Wait on the replicationServer port. |
| | | // Read incoming messages and create LDAP or ReplicationServer listener |
| | | // and Publisher. |
| | | |
| | | try |
| | | { |
| | | Session session; |
| | |
| | | session = replSessionSecurity.createServerSession(newSocket, |
| | | timeoutMS); |
| | | if (session == null) // Error, go back to accept |
| | | { |
| | | continue; |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // If problems happen during the SSL handshake, it is necessary |
| | | // to close the socket to free the associated resources. |
| | | if (newSocket != null) |
| | | { |
| | | newSocket.close(); |
| | | } |
| | | continue; |
| | | } |
| | | |
| | |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | if (!shutdown) |
| | | if (!shutdown.get()) |
| | | { |
| | | logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage())); |
| | | } |
| | |
| | | { |
| | | synchronized (connectThreadLock) |
| | | { |
| | | while (!shutdown) |
| | | while (!shutdown.get()) |
| | | { |
| | | HostPort localAddress = HostPort.localAddress(getReplicationPort()); |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | |
| | | boolean sslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RS " + getMonitorInstanceName() + " connects to " |
| | | + remoteServerAddress); |
| | | } |
| | | |
| | | Socket socket = new Socket(); |
| | | Session session = null; |
| | |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | close(session); |
| | | close(socket); |
| | | } |
| | |
| | | */ |
| | | private void initialize() |
| | | { |
| | | shutdown = false; |
| | | shutdown.set(false); |
| | | |
| | | try |
| | | { |
| | |
| | | |
| | | // creates working threads: we must first connect, then start to listen. |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates connect thread"); |
| | | { |
| | | TRACER.debugInfo("RS " + getMonitorInstanceName() |
| | | + " creates connect thread"); |
| | | } |
| | | connectThread = new ReplicationServerConnectThread(this); |
| | | connectThread.start(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates listen thread"); |
| | | { |
| | | TRACER.debugInfo("RS " + getMonitorInstanceName() |
| | | + " creates listen thread"); |
| | | } |
| | | |
| | | listenThread = new ReplicationServerListenThread(this); |
| | | listenThread.start(); |
| | |
| | | eclwe = new ECLWorkflowElement(this); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " successfully initialized"); |
| | | { |
| | | TRACER.debugInfo("RS " + getMonitorInstanceName() |
| | | + " successfully initialized"); |
| | | } |
| | | } catch (UnknownHostException e) |
| | | { |
| | | logError(ERR_UNKNOWN_HOSTNAME.get()); |
| | |
| | | /** |
| | | * Waits for connections to this ReplicationServer. |
| | | */ |
| | | public void waitConnections() |
| | | void waitConnections() |
| | | { |
| | | // Acquire a domain ticket and wait for a complete cycle of the connect |
| | | // thread. |
| | |
| | | // Wait until the connect thread has processed next connect phase. |
| | | synchronized (domainTicketLock) |
| | | { |
| | | while (myDomainTicket > domainTicket && !shutdown) |
| | | while (myDomainTicket > domainTicket && !shutdown.get()) |
| | | { |
| | | try |
| | | { |
| | |
| | | { |
| | | localPorts.remove(getReplicationPort()); |
| | | |
| | | if (shutdown) |
| | | if (!shutdown.compareAndSet(false, true)) |
| | | { |
| | | return; |
| | | |
| | | shutdown = true; |
| | | } |
| | | |
| | | // shutdown the connect thread |
| | | if (connectThread != null) |
| | |
| | | connectThread.interrupt(); |
| | | } |
| | | |
| | | // shutdown the listener thread |
| | | try |
| | | { |
| | | if (listenSocket != null) |
| | | { |
| | | listenSocket.close(); |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | // replication Server service is closing anyway. |
| | | } |
| | | |
| | | // shutdown the listen thread |
| | | StaticUtils.close(listenSocket); |
| | | if (listenThread != null) |
| | | { |
| | | listenThread.interrupt(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ReplicationServerCfg configuration) |
| | |
| | | catch (ChangelogException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | resultCode = ResultCode.OPERATIONS_ERROR; |
| | | } |
| | | } |
| | |
| | | catch (IOException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | logError(ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString())); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | logError(ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString())); |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean isConfigurationChangeAcceptable( |
| | | ReplicationServerCfg configuration, List<Message> unacceptableReasons) |
| | |
| | | */ |
| | | public long getGenerationId(DN baseDN) |
| | | { |
| | | ReplicationServerDomain rsd = getReplicationServerDomain(baseDN); |
| | | if (rsd!=null) |
| | | return rsd.getGenerationId(); |
| | | return -1; |
| | | final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN); |
| | | return rsd != null ? rsd.getGenerationId() : -1; |
| | | } |
| | | |
| | | /** |
| | |
| | | public void remove() |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing"); |
| | | |
| | | } |
| | | shutdown(); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | if (serversToDisconnect.isEmpty()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | for (ReplicationServerDomain domain: getReplicationServerDomains()) |
| | | { |