| | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | |
| | | * <li>then check it's not null</li> |
| | | * <li>then close all inside</li> |
| | | * </ol> |
| | | * When creating a JEReplicaDB, synchronize on the domainMap to avoid |
| | | * When creating a replicaDB, synchronize on the domainMap to avoid |
| | | * concurrent shutdown. |
| | | */ |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> |
| | | domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private ReplicationServerCfg config; |
| | | private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs = |
| | | new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>(); |
| | | /** |
| | | * \@GuardedBy("itself") |
| | | */ |
| | | private final Map<DN, List<DomainDBCursor>> registeredDomainCursors = |
| | | new HashMap<DN, List<DomainDBCursor>>(); |
| | | private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = |
| | | new CopyOnWriteArrayList<MultiDomainDBCursor>(); |
| | | private ReplicationDbEnv replicationEnv; |
| | | private final ReplicationServerCfg config; |
| | | private final File dbDirectory; |
| | | |
| | | /** |
| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR = |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new DBCursor<UpdateMsg>() |
| | | { |
| | | |
| | |
| | | }; |
| | | |
| | | /** |
| | | * Builds an instance of this class. |
| | | * Creates a new changelog DB. |
| | | * |
| | | * @param replicationServer |
| | | * the local replication server. |
| | |
| | | * @throws ConfigException |
| | | * if a problem occurs opening the supplied directory |
| | | */ |
| | | public JEChangelogDB(ReplicationServer replicationServer, |
| | | ReplicationServerCfg config) throws ConfigException |
| | | public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) |
| | | throws ConfigException |
| | | { |
| | | this.config = config; |
| | | this.replicationServer = replicationServer; |
| | | this.dbDirectory = makeDir(config.getReplicationDBDirectory()); |
| | | } |
| | | |
| | | private File makeDir(String dbDirName) throws ConfigException |
| | | private File makeDir(final String dbDirName) throws ConfigException |
| | | { |
| | | // Check that this path exists or create it. |
| | | final File dbDirectory = getFileForPath(dbDirName); |
| | |
| | | { |
| | | logger.traceException(e); |
| | | |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); |
| | | mb.append(e.getLocalizedMessage()); |
| | | mb.append(" "); |
| | | mb.append(dbDirectory); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e); |
| | | final LocalizableMessageBuilder mb = new LocalizableMessageBuilder( |
| | | e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory)); |
| | | throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); |
| | | } |
| | | } |
| | | |
| | | private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN) |
| | | private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN) |
| | | { |
| | | final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); |
| | | if (domainMap != null) |
| | |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | private JEReplicaDB getReplicaDB(DN baseDN, int serverId) |
| | | private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId) |
| | | { |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | | |
| | | /** |
| | | * Provision resources for the specified serverId in the specified replication |
| | | * domain. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain where to add the serverId |
| | | * @param serverId |
| | | * the server Id to add to the replication domain |
| | | * @throws ChangelogException |
| | | * If a database error happened. |
| | | */ |
| | | private void commission(DN baseDN, int serverId, ReplicationServer rs) |
| | | throws ChangelogException |
| | | { |
| | | getOrCreateReplicaDB(baseDN, serverId, rs); |
| | | } |
| | | |
| | | /** |
| | | * Returns a {@link JEReplicaDB}, possibly creating it. |
| | | * |
| | | * @param baseDN |
| | |
| | | * the serverId for which to create a ReplicaDB |
| | | * @param server |
| | | * the ReplicationServer |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it had |
| | | * to be created |
| | | * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created |
| | | * @throws ChangelogException |
| | | * if a problem occurred with the database |
| | | */ |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN, |
| | | int serverId, ReplicationServer server) throws ChangelogException |
| | | Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId, |
| | | final ReplicationServer server) throws ChangelogException |
| | | { |
| | | while (!shutdown.get()) |
| | | { |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = |
| | | getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = |
| | | getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN); |
| | | final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); |
| | | if (result != null) |
| | | { |
| | | final Boolean dbWasCreated = result.getSecond(); |
| | | if (dbWasCreated) |
| | | { // new replicaDB => update all cursors with it |
| | | // registeredDomainCursors is a plain HashMap guarded by its own monitor: |
| | | // copy the cursors registered for this domain while holding that monitor, |
| | | // then notify the copies outside of it so no cursor code runs under the lock. |
| | | final List<DomainDBCursor> cursors; |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> registered = registeredDomainCursors.get(baseDN); |
| | | cursors = registered != null |
| | | ? new ArrayList<DomainDBCursor>(registered) |
| | | : Collections.<DomainDBCursor> emptyList(); |
| | | } |
| | | for (DomainDBCursor cursor : cursors) |
| | | { |
| | | cursor.addReplicaDB(serverId, null); |
| | | } |
| | | } |
| | | |
| | | return result; |
| | | } |
| | | } |
| | | throw new ChangelogException( |
| | | ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); |
| | | } |
| | | |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap( |
| | | DN baseDN) |
| | | private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN) |
| | | { |
| | | // happy path: the domainMap already exists |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = |
| | | domainToReplicaDBs.get(baseDN); |
| | | final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); |
| | | if (currentValue != null) |
| | | { |
| | | return currentValue; |
| | |
| | | // unlucky, the domainMap does not exist: take the hit and create the |
| | | // newValue, even though the same could be done concurrently by another |
| | | // thread |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = |
| | | new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = |
| | | domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>(); |
| | | final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); |
| | | if (previousValue != null) |
| | | { |
| | | // there was already a value associated to the key, let's use it |
| | | return previousValue; |
| | | } |
| | | |
| | | if (MultimasterReplication.isECLEnabledDomain(baseDN)) |
| | | { |
| | | // we just created a new domain => update all cursors |
| | | for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) |
| | | { |
| | | cursor.addDomain(baseDN, null); |
| | | } |
| | | } |
| | | return newValue; |
| | | } |
| | | |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB( |
| | | final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId, |
| | | DN baseDN, ReplicationServer server) throws ChangelogException |
| | | private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap, |
| | | final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException |
| | | { |
| | | // happy path: the JEReplicaDB already exists |
| | | // happy path: the replicaDB already exists |
| | | JEReplicaDB currentValue = domainMap.get(serverId); |
| | | if (currentValue != null) |
| | | { |
| | | return Pair.of(currentValue, false); |
| | | } |
| | | |
| | | // unlucky, the JEReplicaDB does not exist: take the hit and synchronize |
| | | // unlucky, the replicaDB does not exist: take the hit and synchronize |
| | | // on the domainMap to create a new ReplicaDB |
| | | synchronized (domainMap) |
| | | { |
| | |
| | | // The domainMap could have been concurrently removed because |
| | | // 1) a shutdown was initiated or 2) an initialize was called. |
| | | // Return will allow the code to: |
| | | // 1) shutdown properly or 2) lazily recreate the JEReplicaDB |
| | | // 1) shutdown properly or 2) lazily recreate the replicaDB |
| | | return null; |
| | | } |
| | | |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv); |
| | | final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv); |
| | | domainMap.put(serverId, newDB); |
| | | return Pair.of(newDB, true); |
| | | } |
| | |
| | | try |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = dbEnv.getChangelogState(); |
| | | replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | |
| | | { |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | | commission(entry.getKey(), serverId, replicationServer); |
| | | getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void shutdownCNIndexDB() throws ChangelogException |
| | | private void shutdownChangeNumberIndexDB() throws ChangelogException |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | if (dbEnv != null) |
| | | if (replicationEnv != null) |
| | | { |
| | | // wait for shutdown of the threads holding cursors |
| | | try |
| | |
| | | // do nothing: we are already shutting down |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | | replicationEnv.shutdown(); |
| | | } |
| | | |
| | | if (firstException != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears all content from the changelog database, but leaves its directory on |
| | | * the filesystem. |
| | | * Clears all records from the changelog (does not remove the changelog itself). |
| | | * |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * If an error occurs when clearing the changelog. |
| | | */ |
| | | public void clearDB() throws ChangelogException |
| | | { |
| | |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | | shutdownChangeNumberIndexDB(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // 3- clear the changelogstate DB |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDN); |
| | | replicationEnv.clearGenerationId(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | { |
| | | if (computeChangeNumber) |
| | | { |
| | | startIndexer(dbEnv.getChangelogState()); |
| | | startIndexer(replicationEnv.getChangelogState()); |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | replicaDBCursor.next(); |
| | | final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | return cursor; |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | { |
| | | final ServerState domainState = offlineReplicas.getServerState(baseDN); |
| | | if (domainState != null) |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | for (CSN offlineCSN : domainState) |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | cursors = new ArrayList<DomainDBCursor>(); |
| | | registeredDomainCursors.put(baseDN, cursors); |
| | | } |
| | | cursors.add(cursor); |
| | | return cursor; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Looks up the CSN held for the supplied replica in the offline replicas state |
| | | * exposed by the changelog state of the replication environment. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain of the replica |
| | | * @param serverId |
| | | * the serverId of the replica |
| | | * @param startAfterCSN |
| | | * the CSN to compare the offline CSN against, may be null |
| | | * @return the offline CSN when one is found and startAfterCSN is either null |
| | | * or older than it, null otherwise |
| | | */ |
| | | private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | final ChangelogState state = replicationEnv.getChangelogState(); |
| | | final CSN candidate = state.getOfflineReplicas().getCSN(baseDN, serverId); |
| | | if (candidate == null) |
| | | { |
| | | return null; |
| | | } |
| | | if (startAfterCSN != null && !startAfterCSN.isOlderThan(candidate)) |
| | | { |
| | | // the supplied CSN is not older than the offline CSN: nothing to return |
| | | return null; |
| | | } |
| | | return candidate; |
| | | } |
| | |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | { |
| | | JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | return replicaDB.generateCursorFrom(startAfterCSN); |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | | } |
| | | return EMPTY_CURSOR; |
| | | return EMPTY_CURSOR_REPLICA_DB; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | public void unregisterCursor(final DBCursor<?> cursor) |
| | | { |
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | updateMsg.getCSN().getServerId(), replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | if (cursor instanceof MultiDomainDBCursor) |
| | | { |
| | | registeredMultiDomainCursors.remove(cursor); |
| | | } |
| | | else if (cursor instanceof DomainDBCursor) |
| | | { |
| | | final DomainDBCursor domainCursor = (DomainDBCursor) cursor; |
| | | // registeredDomainCursors is guarded by its own monitor (the same one taken |
| | | // when a domain cursor is registered), so lock on that map here and not on |
| | | // the unrelated registeredMultiDomainCursors list. |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); |
| | | if (cursors != null) |
| | | { |
| | | cursors.remove(cursor); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, |
| | | csn.getServerId(), replicationServer); |
| | | final JEReplicaDB replicaDB = pair.getFirst(); |
| | | replicaDB.add(updateMsg); |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.publishUpdateMsg(baseDN, updateMsg); |
| | | } |
| | | return wasCreated; |
| | | return pair.getSecond(); // replica DB was created |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | @Override |
| | | public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException |
| | | { |
| | | dbEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | replicationEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |