IndexWriter.java
001 package gate.creole.annic.apache.lucene.index;
002 
003 /**
004  * Copyright 2004 The Apache Software Foundation
005  *
006  * Licensed under the Apache License, Version 2.0 (the "License");
007  * you may not use this file except in compliance with the License.
008  * You may obtain a copy of the License at
009  *
010  *     http://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing, software
013  * distributed under the License is distributed on an "AS IS" BASIS,
014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015  * See the License for the specific language governing permissions and
016  * limitations under the License.
017  */
018 
019 import java.io.IOException;
020 import java.io.File;
021 import java.io.PrintStream;
022 import java.util.Vector;
023 
024 import gate.creole.annic.apache.lucene.store.Directory;
025 import gate.creole.annic.apache.lucene.store.RAMDirectory;
026 import gate.creole.annic.apache.lucene.store.FSDirectory;
027 import gate.creole.annic.apache.lucene.store.Lock;
028 import gate.creole.annic.apache.lucene.store.InputStream;
029 import gate.creole.annic.apache.lucene.store.OutputStream;
030 import gate.creole.annic.apache.lucene.search.Similarity;
031 import gate.creole.annic.apache.lucene.document.Document;
032 import gate.creole.annic.apache.lucene.analysis.Analyzer;
033 
034 
035 /**
036   An IndexWriter creates and maintains an index.
037 
038   The third argument to the <a href="#IndexWriter"><b>constructor</b></a>
039   determines whether a new index is created, or whether an existing index is
040   opened for the addition of new documents.
041 
042   In either case, documents are added with the <a
043   href="#addDocument"><b>addDocument</b></a> method.  When finished adding
044   documents, <a href="#close"><b>close</b></a> should be called.
045 
046   If an index will not have more documents added for a while and optimal search
047   performance is desired, then the <a href="#optimize"><b>optimize</b></a>
048   method should be called before the index is closed.
049   */
050 @SuppressWarnings({"rawtypes","unchecked"})
051 public class IndexWriter {
052 
053   /**
054    * Default value is 1000.  Use <code>gate.creole.annic.apache.lucene.writeLockTimeout</code>
055    * system property to override.
056    */
057   public static final long WRITE_LOCK_TIMEOUT =
058     Integer.parseInt(System.getProperty("gate.creole.annic.apache.lucene.writeLockTimeout",
059       "1000"));
060 
061   /**
062    * Default value is 10000.  Use <code>gate.creole.annic.apache.lucene.commitLockTimeout</code>
063    * system property to override.
064    */
065   public static final long COMMIT_LOCK_TIMEOUT =
066     Integer.parseInt(System.getProperty("gate.creole.annic.apache.lucene.commitLockTimeout",
067       "10000"));
068 
069   public static final String WRITE_LOCK_NAME = "write.lock";
070   public static final String COMMIT_LOCK_NAME = "commit.lock";
071 
072   /**
073    * Default value is 10.  Use <code>gate.creole.annic.apache.lucene.mergeFactor</code>
074    * system property to override.
075    */
076   public static final int DEFAULT_MERGE_FACTOR =
077     Integer.parseInt(System.getProperty("gate.creole.annic.apache.lucene.mergeFactor",
078       "10"));
079 
080   /**
081    * Default value is 10.  Use <code>gate.creole.annic.apache.lucene.minMergeDocs</code>
082    * system property to override.
083    */
084   public static final int DEFAULT_MIN_MERGE_DOCS =
085     Integer.parseInt(System.getProperty("gate.creole.annic.apache.lucene.minMergeDocs",
086       "10"));
087 
088   /**
089    * Default value is {@link Integer#MAX_VALUE}.
090    * Use <code>gate.creole.annic.apache.lucene.maxMergeDocs</code> system property to override.
091    */
092   public static final int DEFAULT_MAX_MERGE_DOCS =
093     Integer.parseInt(System.getProperty("gate.creole.annic.apache.lucene.maxMergeDocs",
094       String.valueOf(Integer.MAX_VALUE)));
095 
096   /**
097    * Default value is 10000.  Use <code>gate.creole.annic.apache.lucene.maxFieldLength</code>
098    * system property to override.
099    */
100   public static final int DEFAULT_MAX_FIELD_LENGTH =
101     Integer.parseInt(System.getProperty("gate.creole.annic.apache.lucene.maxFieldLength",
102       "300000"));
103 
104 
105   private Directory directory;  // where this index resides
106   private Analyzer analyzer;    // how to analyze text
107 
108   private Similarity similarity = Similarity.getDefault()// how to normalize
109 
110   private SegmentInfos segmentInfos = new SegmentInfos()// the segments
111   private final Directory ramDirectory = new RAMDirectory()// for temp segs
112 
113   private Lock writeLock;
114 
115   /** Use compound file setting. Defaults to true, minimizing the number of
116    * files used.  Setting this to false may improve indexing performance, but
117    * may also cause file handle problems.
118    */
119   private boolean useCompoundFile = true;
120 
121   private boolean closeDir;
122 
123   /** Setting to turn on usage of a compound file. When on, multiple files
124    *  for each segment are merged into a single file once the segment creation
125    *  is finished. This is done regardless of what directory is in use.
126    */
127   public boolean getUseCompoundFile() {
128     return useCompoundFile;
129   }
130 
131   /** Setting to turn on usage of a compound file. When on, multiple files
132    *  for each segment are merged into a single file once the segment creation
133    *  is finished. This is done regardless of what directory is in use.
134    */
135   public void setUseCompoundFile(boolean value) {
136     useCompoundFile = value;
137   }
138 
139 
140     /** Expert: Set the Similarity implementation used by this IndexWriter.
141    *
142    @see Similarity#setDefault(Similarity)
143    */
144   public void setSimilarity(Similarity similarity) {
145     this.similarity = similarity;
146   }
147 
148   /** Expert: Return the Similarity implementation used by this IndexWriter.
149    *
150    <p>This defaults to the current value of {@link Similarity#getDefault()}.
151    */
152   public Similarity getSimilarity() {
153     return this.similarity;
154   }
155 
156   /**
157    * Constructs an IndexWriter for the index in <code>path</code>.
158    * Text will be analyzed with <code>a</code>.  If <code>create</code>
159    * is true, then a new, empty index will be created in
160    <code>path</code>, replacing the index already there, if any.
161    *
162    @param path the path to the index directory
163    @param a the analyzer to use
164    @param create <code>true</code> to create the index or overwrite
165    *  the existing one; <code>false</code> to append to the existing
166    *  index
167    @throws IOException if the directory cannot be read/written to, or
168    *  if it does not exist, and <code>create</code> is
169    *  <code>false</code>
170    */
171   public IndexWriter(String path, Analyzer a, boolean create)
172        throws IOException {
173     this(FSDirectory.getDirectory(path, create), a, create, true);
174   }
175 
176   /**
177    * Constructs an IndexWriter for the index in <code>path</code>.
178    * Text will be analyzed with <code>a</code>.  If <code>create</code>
179    * is true, then a new, empty index will be created in
180    <code>path</code>, replacing the index already there, if any.
181    *
182    @param path the path to the index directory
183    @param a the analyzer to use
184    @param create <code>true</code> to create the index or overwrite
185    *  the existing one; <code>false</code> to append to the existing
186    *  index
187    @throws IOException if the directory cannot be read/written to, or
188    *  if it does not exist, and <code>create</code> is
189    *  <code>false</code>
190    */
191   public IndexWriter(File path, Analyzer a, boolean create)
192        throws IOException {
193     this(FSDirectory.getDirectory(path, create), a, create, true);
194   }
195 
196   /**
197    * Constructs an IndexWriter for the index in <code>d</code>.
198    * Text will be analyzed with <code>a</code>.  If <code>create</code>
199    * is true, then a new, empty index will be created in
200    <code>d</code>, replacing the index already there, if any.
201    *
202    @param d the index directory
203    @param a the analyzer to use
204    @param create <code>true</code> to create the index or overwrite
205    *  the existing one; <code>false</code> to append to the existing
206    *  index
207    @throws IOException if the directory cannot be read/written to, or
208    *  if it does not exist, and <code>create</code> is
209    *  <code>false</code>
210    */
211   public IndexWriter(Directory d, Analyzer a, boolean create)
212        throws IOException {
213     this(d, a, create, false);
214   }
215 
216   private IndexWriter(Directory d, Analyzer a, final boolean create, boolean closeDir)
217     throws IOException {
218       this.closeDir = closeDir;
219       directory = d;
220       analyzer = a;
221 
222       Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME);
223       if (!writeLock.obtain(WRITE_LOCK_TIMEOUT)) // obtain write lock
224         throw new IOException("Index locked for write: " + writeLock);
225       this.writeLock = writeLock;                   // save it
226 
227       synchronized (directory) {        // in- & inter-process sync
228         new Lock.With(directory.makeLock(IndexWriter.COMMIT_LOCK_NAME), COMMIT_LOCK_TIMEOUT) {
229             @Override
230             public Object doBody() throws IOException {
231               if (create)
232                 segmentInfos.write(directory);
233               else
234                 segmentInfos.read(directory);
235               return null;
236             }
237           }.run();
238       }
239   }
240 
241   /** Flushes all changes to an index and closes all associated files. */
242   public synchronized void close() throws IOException {
243     flushRamSegments();
244     ramDirectory.close();
245     writeLock.release();                          // release write lock
246     writeLock = null;
247     if(closeDir)
248       directory.close();
249   }
250 
251   /** Release the write lock, if needed. */
252   @Override
253   protected void finalize() throws IOException {
254     if (writeLock != null) {
255       writeLock.release();                        // release write lock
256       writeLock = null;
257     }
258   }
259 
260   /** Returns the analyzer used by this index. */
261   public Analyzer getAnalyzer() {
262       return analyzer;
263   }
264 
265 
266   /** Returns the number of documents currently in this index. */
267   public synchronized int docCount() {
268     int count = 0;
269     for (int i = 0; i < segmentInfos.size(); i++) {
270       SegmentInfo si = segmentInfos.info(i);
271       count += si.docCount;
272     }
273     return count;
274   }
275 
276   /**
277    * The maximum number of terms that will be indexed for a single field in a
278    * document.  This limits the amount of memory required for indexing, so that
279    * collections with very large files will not crash the indexing process by
280    * running out of memory.<p/>
281    * Note that this effectively truncates large documents, excluding from the
282    * index terms that occur further in the document.  If you know your source
283    * documents are large, be sure to set this value high enough to accomodate
284    * the expected size.  If you set it to Integer.MAX_VALUE, then the only limit
285    * is your memory, but you should anticipate an OutOfMemoryError.<p/>
286    * By default, no more than 10,000 terms will be indexed for a field.
287   */
288   public int maxFieldLength = DEFAULT_MAX_FIELD_LENGTH;
289 
290   /**
291    * Adds a document to this index.  If the document contains more than
292    {@link #maxFieldLength} terms for a given field, the remainder are
293    * discarded.
294    */
295   public void addDocument(Document docthrows IOException {
296     addDocument(doc, analyzer);
297   }
298 
299   /**
300    * Adds a document to this index, using the provided analyzer instead of the
301    * value of {@link #getAnalyzer()}.  If the document contains more than
302    {@link #maxFieldLength} terms for a given field, the remainder are
303    * discarded.
304    */
305   public void addDocument(Document doc, Analyzer analyzerthrows IOException {
306     DocumentWriter dw =
307       new DocumentWriter(ramDirectory, analyzer, similarity, maxFieldLength);
308     String segmentName = newSegmentName();
309     dw.addDocument(segmentName, doc);
310     synchronized (this) {
311       segmentInfos.addElement(new SegmentInfo(segmentName, 1, ramDirectory));
312       maybeMergeSegments();
313     }
314   }
315 
316   final int getSegmentsCounter(){
317     return segmentInfos.counter;
318   }
319 
320   private final synchronized String newSegmentName() {
321     return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
322   }
323 
324   /** Determines how often segment indices are merged by addDocument().  With
325    * smaller values, less RAM is used while indexing, and searches on
326    * unoptimized indices are faster, but indexing speed is slower.  With larger
327    * values, more RAM is used during indexing, and while searches on unoptimized
328    * indices are slower, indexing is faster.  Thus larger values (> 10) are best
329    * for batch index creation, and smaller values (< 10) for indices that are
330    * interactively maintained.
331    *
332    <p>This must never be less than 2.  The default value is 10.*/
333   public int mergeFactor = DEFAULT_MERGE_FACTOR;
334 
335   /** Determines the minimal number of documents required before the buffered
336    * in-memory documents are merging and a new Segment is created.
337    * Since Documents are merged in a {@link gate.creole.annic.apache.lucene.store.RAMDirectory},
338    * large value gives faster indexing.  At the same time, mergeFactor limits
339    * the number of files open in a FSDirectory.
340    *
341    <p> The default value is 10.*/
342   public int minMergeDocs = DEFAULT_MIN_MERGE_DOCS;
343 
344 
345   /** Determines the largest number of documents ever merged by addDocument().
346    * Small values (e.g., less than 10,000) are best for interactive indexing,
347    * as this limits the length of pauses while indexing to a few seconds.
348    * Larger values are best for batched indexing and speedier searches.
349    *
350    <p>The default value is {@link Integer#MAX_VALUE}. */
351   public int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
352 
353   /** If non-null, information about merges will be printed to this. */
354   public PrintStream infoStream = null;
355 
356   /** Merges all segments together into a single segment, optimizing an index
357       for search. */
358   public synchronized void optimize() throws IOException {
359     flushRamSegments();
360     while (segmentInfos.size() ||
361            (segmentInfos.size() == &&
362             (SegmentReader.hasDeletions(segmentInfos.info(0)) ||
363              segmentInfos.info(0).dir != directory ||
364              (useCompoundFile &&
365               (!SegmentReader.usesCompoundFile(segmentInfos.info(0)) ||
366                 SegmentReader.hasSeparateNorms(segmentInfos.info(0))))))) {
367       int minSegment = segmentInfos.size() - mergeFactor;
368       mergeSegments(minSegment < : minSegment);
369     }
370   }
371 
372   /** Merges all segments from an array of indexes into this index.
373    *
374    <p>This may be used to parallelize batch indexing.  A large document
375    * collection can be broken into sub-collections.  Each sub-collection can be
376    * indexed in parallel, on a different thread, process or machine.  The
377    * complete index can then be created by merging sub-collection indexes
378    * with this method.
379    *
380    <p>After this completes, the index is optimized. */
381   public synchronized void addIndexes(Directory[] dirs)
382       throws IOException {
383     optimize();            // start with zero or 1 seg
384     for (int i = 0; i < dirs.length; i++) {
385       SegmentInfos sis = new SegmentInfos();    // read infos from dir
386       sis.read(dirs[i]);
387       for (int j = 0; j < sis.size(); j++) {
388         segmentInfos.addElement(sis.info(j));    // add each info
389       }
390     }
391     optimize();            // final cleanup
392   }
393 
394   /** Merges the provided indexes into this index.
395    <p>After this completes, the index is optimized. </p>
396    <p>The provided IndexReaders are not closed.</p>
397    */
398   public synchronized void addIndexes(IndexReader[] readers)
399     throws IOException {
400 
401     optimize();            // start with zero or 1 seg
402 
403     String mergedName = newSegmentName();
404     SegmentMerger merger = new SegmentMerger(directory, mergedName, false);
405 
406     if (segmentInfos.size() == 1)                 // add existing index, if any
407       merger.add(new SegmentReader(segmentInfos.info(0)));
408 
409     for (int i = 0; i < readers.length; i++)      // add new indexes
410       merger.add(readers[i]);
411 
412     int docCount = merger.merge();                // merge 'em
413 
414     segmentInfos.setSize(0);                      // pop old infos & add new
415     segmentInfos.addElement(new SegmentInfo(mergedName, docCount, directory));
416 
417     synchronized (directory) {        // in- & inter-process sync
418       new Lock.With(directory.makeLock("commit.lock"), COMMIT_LOCK_TIMEOUT) {
419     @Override
420     public Object doBody() throws IOException {
421       segmentInfos.write(directory);    // commit changes
422       return null;
423     }
424   }.run();
425     }
426   }
427 
428   /** Merges all RAM-resident segments. */
429   private final void flushRamSegments() throws IOException {
430     int minSegment = segmentInfos.size()-1;
431     int docCount = 0;
432     while (minSegment >= &&
433            (segmentInfos.info(minSegment)).dir == ramDirectory) {
434       docCount += segmentInfos.info(minSegment).docCount;
435       minSegment--;
436     }
437     if (minSegment < ||        // add one FS segment?
438         (docCount + segmentInfos.info(minSegment).docCount> mergeFactor ||
439         !(segmentInfos.info(segmentInfos.size()-1).dir == ramDirectory))
440       minSegment++;
441     if (minSegment >= segmentInfos.size())
442       return;            // none to merge
443     mergeSegments(minSegment);
444   }
445 
446   /** Incremental segment merger.  */
447   private final void maybeMergeSegments() throws IOException {
448     long targetMergeDocs = minMergeDocs;
449     while (targetMergeDocs <= maxMergeDocs) {
450       // find segments smaller than current target size
451       int minSegment = segmentInfos.size();
452       int mergeDocs = 0;
453       while (--minSegment >= 0) {
454         SegmentInfo si = segmentInfos.info(minSegment);
455         if (si.docCount >= targetMergeDocs)
456           break;
457         mergeDocs += si.docCount;
458       }
459 
460       if (mergeDocs >= targetMergeDocs)      // found a merge to do
461         mergeSegments(minSegment+1);
462       else
463         break;
464 
465       targetMergeDocs *= mergeFactor;      // increase target size
466     }
467   }
468 
469   /** Pops segments off of segmentInfos stack down to minSegment, merges them,
470     and pushes the merged index onto the top of the segmentInfos stack. */
471   private final void mergeSegments(int minSegment)
472       throws IOException {
473     String mergedName = newSegmentName();
474     if (infoStream != nullinfoStream.print("merging segments");
475     SegmentMerger merger =
476         new SegmentMerger(directory, mergedName, useCompoundFile);
477 
478     final Vector segmentsToDelete = new Vector();
479     for (int i = minSegment; i < segmentInfos.size(); i++) {
480       SegmentInfo si = segmentInfos.info(i);
481       if (infoStream != null)
482         infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
483       IndexReader reader = new SegmentReader(si);
484       merger.add(reader);
485       if ((reader.directory() == this.directory|| // if we own the directory
486           (reader.directory() == this.ramDirectory))
487         segmentsToDelete.addElement(reader);   // queue segment for deletion
488     }
489 
490     int mergedDocCount = merger.merge();
491 
492     if (infoStream != null) {
493       infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
494     }
495 
496     segmentInfos.setSize(minSegment);          // pop old infos & add new
497     segmentInfos.addElement(new SegmentInfo(mergedName, mergedDocCount,
498                                             directory));
499 
500     // close readers before we attempt to delete now-obsolete segments
501     merger.closeReaders();
502 
503     synchronized (directory) {                 // in- & inter-process sync
504       new Lock.With(directory.makeLock(IndexWriter.COMMIT_LOCK_NAME), COMMIT_LOCK_TIMEOUT) {
505           @Override
506           public Object doBody() throws IOException {
507             segmentInfos.write(directory);     // commit before deleting
508             deleteSegments(segmentsToDelete);  // delete now-unused segments
509             return null;
510           }
511         }.run();
512     }
513 
514   }
515 
516   /* Some operating systems (e.g. Windows) don't permit a file to be deleted
517      while it is opened for read (e.g. by another process or thread).  So we
518      assume that when a delete fails it is because the file is open in another
519      process, and queue the file for subsequent deletion. */
520 
521   private final void deleteSegments(Vector segmentsthrows IOException {
522     Vector deletable = new Vector();
523 
524     deleteFiles(readDeleteableFiles(), deletable)// try to delete deleteable
525 
526     for (int i = 0; i < segments.size(); i++) {
527       SegmentReader reader = (SegmentReader)segments.elementAt(i);
528       if (reader.directory() == this.directory)
529   deleteFiles(reader.files(), deletable);    // try to delete our files
530       else
531   deleteFiles(reader.files(), reader.directory())// delete other files
532     }
533 
534     writeDeleteableFiles(deletable);      // note files we can't delete
535   }
536 
537   private final void deleteFiles(Vector files, Directory directory)
538        throws IOException {
539     for (int i = 0; i < files.size(); i++) {
540     directory.deleteFile((String)files.elementAt(i));
541     }
542   }
543 
544   private final void deleteFiles(Vector files, Vector deletable)
545        throws IOException {
546     for (int i = 0; i < files.size(); i++) {
547       String file = (String)files.elementAt(i);
548       try {
549         directory.deleteFile(file);      // try to delete each file
550       catch (IOException e) {        // if delete fails
551         if (directory.fileExists(file)) {
552           if (infoStream != null)
553             infoStream.println(e.getMessage() "; Will re-try later.");
554           deletable.addElement(file);      // add to deletable
555         }
556       }
557     }
558   }
559 
560   private final Vector readDeleteableFiles() throws IOException {
561     Vector result = new Vector();
562     if (!directory.fileExists("deletable"))
563       return result;
564 
565     InputStream input = directory.openFile("deletable");
566     try {
567       for (int i = input.readInt(); i > 0; i--)    // read file names
568         result.addElement(input.readString());
569     finally {
570       input.close();
571     }
572     return result;
573   }
574 
575   private final void writeDeleteableFiles(Vector filesthrows IOException {
576     OutputStream output = directory.createFile("deleteable.new");
577     try {
578       output.writeInt(files.size());
579       for (int i = 0; i < files.size(); i++)
580         output.writeString((String)files.elementAt(i));
581     finally {
582       output.close();
583     }
584     directory.renameFile("deleteable.new""deletable");
585   }
586 }