mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
03.09.2006 83dd61651cb5d73c1a15dfcb7d217c0f272722d2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.backends.jeb;
 
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.types.Entry;
 
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
 
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.io.ByteArrayOutputStream;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.FileOutputStream;
import java.io.IOException;
 
/**
 * This class is used to create an attribute index for an import process.
 * It is used as follows.
 * <pre>
 * startProcessing();
 * processEntry(entry);
 * processEntry(entry);
 * ...
 * stopProcessing();
 * merge();
 * </pre>
 */
public class IndexBuilder
{
  /**
   * The import context.
   */
  private ImportContext importContext;
 
  /**
   * The index database.
   */
  private Index index;
 
  /**
   * The indexer to generate the index keys.
   */
  private Indexer indexer;
 
  /**
   * The write buffer.
   */
  ArrayList<IndexMod> buffer;
 
  /**
   * The write buffer size.
   */
  private int bufferSize;
 
  /**
   * Current output file number.
   */
  private int fileNumber = 0;
 
  /**
   * The index entry limit.
   */
  private int entryLimit;
 
  /**
   * A unique prefix for temporary files to prevent conflicts.
   */
  private String fileNamePrefix;
 
  /**
   * Indicates whether we are replacing existing data or not.
   */
  private boolean replaceExisting = false;
 
 
  private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream();
  private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream();
 
  private DataOutputStream addBytesDataStream;
  private DataOutputStream delBytesDataStream;
 
  /**
   * A file name filter to identify temporary files we have written.
   */
  private FilenameFilter filter = new FilenameFilter()
  {
    public boolean accept(File d, String name)
    {
      return name.startsWith(fileNamePrefix);
    }
  };
 
  /**
   * Construct an index builder.
   *
   * @param importContext The import context.
   * @param index The index database we are writing.
   * @param entryLimit The index entry limit.
   * @param bufferSize The amount of memory available for buffering.
   */
  public IndexBuilder(ImportContext importContext,
                      Index index, int entryLimit, long bufferSize)
  {
    this.importContext = importContext;
    this.index = index;
    this.indexer = index.indexer;
    this.entryLimit = entryLimit;
    this.bufferSize = (int)bufferSize/100;
    long tid = Thread.currentThread().getId();
    fileNamePrefix = importContext.getContainerName() + "_" +
         indexer.toString() + "_" + tid + "_";
    replaceExisting =
         importContext.getLDIFImportConfig().appendToExistingData() &&
         importContext.getLDIFImportConfig().replaceExistingEntries();
    addBytesDataStream = new DataOutputStream(addBytesStream);
    delBytesDataStream = new DataOutputStream(delBytesStream);
  }
 
  /**
   * This method must be called before this object can process any
   * entries. It cleans up any temporary files left over from a
   * previous import.
   */
  public void startProcessing()
  {
    // Clean up any work files left over from a previous run.
    File tempDir = new File(importContext.getConfig().getImportTempDirectory());
    File[] files = tempDir.listFiles(filter);
    if (files != null)
    {
      for (File f : files)
      {
        f.delete();
      }
    }
 
    buffer = new ArrayList<IndexMod>(bufferSize);
  }
 
  /**
   * Indicates that the index thread should process the provided entry.
   * @param oldEntry The existing contents of the entry, or null if this is
   * a new entry.
   * @param newEntry The new contents of the entry.
   * @param entryID The entry ID.
   * @throws DatabaseException If an error occurs in the JE database.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
       throws DatabaseException, IOException
  {
    Transaction txn = null;
 
    // Update the index for this entry.
    if (oldEntry != null)
    {
      // This is an entry being replaced.
      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
      Set<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();
 
      indexer.replaceEntry(txn, oldEntry, newEntry, addKeys, delKeys);
 
      for (ASN1OctetString k : delKeys)
      {
        removeID(k.value(), entryID);
      }
 
      for (ASN1OctetString k : addKeys)
      {
        insertID(k.value(), entryID);
      }
    }
    else
    {
      // This is a new entry.
      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
      indexer.indexEntry(txn, newEntry, addKeys);
      for (ASN1OctetString k : addKeys)
      {
        insertID(k.value(), entryID);
      }
    }
 
  }
 
 
 
  /**
   * Indicates that there will be no more updates.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  public void stopProcessing() throws IOException
  {
    flushBuffer();
  }
 
 
 
  /**
   * Get a statistic of the number of keys that reached the entry limit.
   *
   * @return The number of keys that reached the entry limit.
   */
  public int getEntryLimitExceededCount()
  {
    return index.getEntryLimitExceededCount();
  }
 
  /**
   * Record the insertion of an entry ID.
   * @param key The index key.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void insertID(byte[] key, EntryID entryID)
       throws IOException
  {
    if (buffer.size() >= bufferSize)
    {
      flushBuffer();
    }
 
    IndexMod kav = new IndexMod(key, entryID, false);
    buffer.add(kav);
  }
 
  /**
   * Record the deletion of an entry ID.
   * @param key The index key.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void removeID(byte[] key, EntryID entryID)
       throws IOException
  {
    if (buffer.size() >= bufferSize)
    {
      flushBuffer();
    }
 
    IndexMod kav = new IndexMod(key, entryID, true);
    buffer.add(kav);
  }
 
  /**
   * Called when the buffer is full. It first sorts the buffer using the same
   * key comparator used by the index database. Then it merges all the
   * IDs for the same key together and writes each key and its list of IDs
   * to an intermediate binary file.
   * A list of deleted IDs is only present if we are replacing existing entries.
   *
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void flushBuffer() throws IOException
  {
    if (buffer.size() == 0)
    {
      return;
    }
 
    // Keys must be sorted before we can merge duplicates.
    IndexModComparator comparator;
    if (replaceExisting)
    {
      // The entry IDs may be out of order.
      // We must sort by key and ID.
      comparator = new IndexModComparator(indexer.getComparator(), true);
    }
    else
    {
      // The entry IDs are all new and are therefore already ordered.
      // We just need to sort by key.
      comparator = new IndexModComparator(indexer.getComparator(), false);
    }
    Collections.sort(buffer, comparator);
 
    // Start a new file.
    fileNumber++;
    String fileName = fileNamePrefix + String.valueOf(fileNumber);
    File file = new File(importContext.getConfig().getImportTempDirectory(),
                         fileName);
    BufferedOutputStream bufferedStream =
         new BufferedOutputStream(new FileOutputStream(file));
    DataOutputStream dataStream = new DataOutputStream(bufferedStream);
 
    // Reset the byte array output streams but preserve the underlying arrays.
    addBytesStream.reset();
    delBytesStream.reset();
 
    try
    {
      byte[] currentKey = null;
      for (IndexMod key : buffer)
      {
        byte[] keyString = key.key;
        if (!Arrays.equals(keyString,currentKey))
        {
          if (currentKey != null)
          {
            dataStream.writeInt(currentKey.length);
            dataStream.write(currentKey);
            dataStream.writeInt(addBytesStream.size());
            addBytesStream.writeTo(dataStream);
            if (replaceExisting)
            {
              dataStream.writeInt(delBytesStream.size());
              delBytesStream.writeTo(dataStream);
            }
          }
 
          currentKey = keyString;
          addBytesStream.reset();
          delBytesStream.reset();
        }
 
        if (key.isDelete)
        {
          delBytesDataStream.writeLong(key.value.longValue());
        }
        else
        {
          addBytesDataStream.writeLong(key.value.longValue());
        }
 
      }
 
      if (currentKey != null)
      {
        dataStream.writeInt(currentKey.length);
        dataStream.write(currentKey);
        dataStream.writeInt(addBytesStream.size());
        addBytesStream.writeTo(dataStream);
        if (replaceExisting)
        {
          dataStream.writeInt(delBytesStream.size());
          delBytesStream.writeTo(dataStream);
        }
      }
 
      buffer = new ArrayList<IndexMod>(bufferSize);
    }
    finally
    {
      dataStream.close();
    }
  }
 
  /**
   * Get a string that identifies this index builder.
   *
   * @return A string that identifies this index builder.
   */
  public String toString()
  {
    return indexer.toString();
  }
}