| | |
| | | { |
| | | private Environment dbEnvironment; |
| | | private Database changelogStateDb; |
| | | /** |
| | | * The current changelogState. This is in-memory version of what is inside the |
| | | * on-disk changelogStateDB. It improves performances in case the |
| | | * changelogState is read often. |
| | | * |
| | | * @GuardedBy("stateLock") |
| | | */ |
| | | private final ChangelogState changelogState; |
| | | /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */ |
| | | private final Object stateLock = new Object(); |
| | | private final List<Database> allDbs = new CopyOnWriteArrayList<Database>(); |
| | | private ReplicationServer replicationServer; |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | |
| | | * of all the servers that have been seen in the past. |
| | | */ |
| | | changelogStateDb = openDatabase("changelogstate"); |
| | | changelogState = readOnDiskChangelogState(); |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Read and return the list of known servers from the database. |
| | | * Return the current changelog state. |
| | | * |
| | | * @return the current {@link ChangelogState} |
| | | */ |
| | | public ChangelogState getChangelogState() |
| | | { |
| | | return changelogState; |
| | | } |
| | | |
| | | /** |
| | | * Read and return the changelog state from the database. |
| | | * |
| | | * @return the {@link ChangelogState} read from the changelogState DB |
| | | * @throws ChangelogException |
| | | * if a database problem occurs |
| | | */ |
| | | public ChangelogState readChangelogState() throws ChangelogException |
| | | protected ChangelogState readOnDiskChangelogState() throws ChangelogException |
| | | { |
| | | return decodeChangelogState(readWholeState()); |
| | | } |
| | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | synchronized (stateLock) |
| | | { |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | changelogState.addServerIdToDomain(serverId, baseDN); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | changelogState.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | return replicaDB; |
| | | } |
| | | catch (RuntimeException e) |
| | |
| | | */ |
| | | public void clearGenerationId(DN baseDN) throws ChangelogException |
| | | { |
| | | final int unusedGenId = 0; |
| | | deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | final int unusedGenId = 0; |
| | | deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | changelogState.setDomainGenerationId(baseDN, unusedGenId); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | changelogState.setDomainGenerationId(baseDN, -1); |
| | | } |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, |
| | |
| | | public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | changelogState.addOfflineReplica(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), |
| | | "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); |
| | | synchronized (stateLock) |
| | | { |
| | | deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), |
| | | "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); |
| | | changelogState.removeOfflineReplica(baseDN, serverId); |
| | | } |
| | | } |
| | | |
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry, |