LuceneDataStoreImpl.java
001 package gate.persist;
002 
003 import gate.Corpus;
004 import gate.DataStore;
005 import gate.Document;
006 import gate.Factory;
007 import gate.FeatureMap;
008 import gate.Gate;
009 import gate.LanguageResource;
010 import gate.Resource;
011 import gate.corpora.SerialCorpusImpl;
012 import gate.creole.ResourceInstantiationException;
013 import gate.creole.annic.Constants;
014 import gate.creole.annic.Hit;
015 import gate.creole.annic.IndexException;
016 import gate.creole.annic.Indexer;
017 import gate.creole.annic.SearchException;
018 import gate.creole.annic.SearchableDataStore;
019 import gate.creole.annic.Searcher;
020 import gate.creole.annic.lucene.LuceneIndexer;
021 import gate.creole.annic.lucene.LuceneSearcher;
022 import gate.event.CorpusEvent;
023 import gate.event.CorpusListener;
024 import gate.event.CreoleEvent;
025 import gate.event.CreoleListener;
026 import gate.util.Files;
027 import gate.util.GateRuntimeException;
028 import gate.util.Strings;
029 import gate.util.persistence.PersistenceManager;
030 
031 import java.io.BufferedReader;
032 import java.io.File;
033 import java.io.FileOutputStream;
034 import java.io.FileReader;
035 import java.io.IOException;
036 import java.io.OutputStreamWriter;
037 import java.lang.ref.ReferenceQueue;
038 import java.lang.ref.SoftReference;
039 import java.net.URL;
040 import java.util.ArrayList;
041 import java.util.Collection;
042 import java.util.HashMap;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.concurrent.ConcurrentHashMap;
047 import java.util.concurrent.ConcurrentMap;
048 import java.util.concurrent.Executors;
049 import java.util.concurrent.ScheduledThreadPoolExecutor;
050 import java.util.concurrent.TimeUnit;
051 import java.util.concurrent.atomic.AtomicBoolean;
052 
053 import org.apache.commons.io.IOUtils;
054 
055 public class LuceneDataStoreImpl extends SerialDataStore implements
056                                                         SearchableDataStore,
057                                                         CorpusListener,
058                                                         CreoleListener {
059 
060   /**
061    * serial version UID
062    */
063   private static final long serialVersionUID = 3618696392336421680L;
064 
065   /**
066    * To store canonical lock objects for each LR ID.
067    */
068   protected Map<Object, LabelledSoftReference> lockObjects =
069           new HashMap<Object, LabelledSoftReference>();
070 
071   /**
072    * Reference queue with which the soft references in the lockObjects
073    * map will be registered.
074    */
075   protected ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
076 
077   /**
078    * Indicates if the datastore is being closed.
079    */
080   protected boolean dataStoreClosing = false;
081 
082   /**
083    * Executor to run the indexing tasks
084    */
085   protected ScheduledThreadPoolExecutor executor;
086 
087   /**
088    * Map keeping track of the most recent indexing task for each LR ID.
089    */
090   protected ConcurrentMap<Object, IndexingTask> currentTasks =
091           new ConcurrentHashMap<Object, IndexingTask>();
092 
093   /**
094    * Number of milliseconds we should wait after a sync before
095    * attempting to re-index a document. If sync is called again for the
096    * same document within this time then the timer for the re-indexing
097    * task is reset. Thus if several changes to the same document are
098    * made in quick succession it will only be re-indexed once. On the
099    * other hand, if the delay is set too long the document may never be
100    * indexed until the data store is closed. The default delay is 1000
101    * (one second).
102    */
103   protected long indexDelay = 1000L;
104 
105   /**
106    * Indexer to be used for indexing documents
107    */
108   protected Indexer indexer;
109 
110   /**
111    * Index Parameters
112    */
113   protected Map<String,Object> indexParameters;
114 
115   /**
116    * URL of the index
117    */
118   protected URL indexURL;
119 
120   /**
121    * Searcher to be used for searching the indexed documents
122    */
123   protected Searcher searcher;
124 
125   /**
126    * This is where we store the search parameters
127    */
128   protected Map<String,Object> searchParameters;
129 
130   /** Close the data store. */
131   @Override
132   public void close() throws PersistenceException {
133     // stop listening to Creole events
134     Gate.getCreoleRegister().removeCreoleListener(this);
135     // shut down the executor. We submit the shutdown request
136     // as a zero-delay task rather than calling shutdown directly,
137     // in order to interrupt any timed wait currently in progress.
138     executor.execute(new Runnable() {
139       @Override
140       public void run() {
141         executor.shutdown();
142       }
143     });
144     try {
145       // allow up to two minutes for indexing to finish
146       executor.awaitTermination(120, TimeUnit.SECONDS);
147     catch(InterruptedException e) {
148       // propagate the interruption
149       Thread.currentThread().interrupt();
150     }
151 
152     // At this point, any in-progress indexing tasks have
153     // finished. We now process any tasks that were queued
154     // but not run, running them in the current thread.
155     Collection<IndexingTask> queuedTasks = currentTasks.values();
156     // copy the tasks into an array to avoid concurrent
157     // modification issues, as IndexingTask.run modifies
158     // the currentTasks map
159     IndexingTask[] queuedTasksArray =
160             queuedTasks.toArray(new IndexingTask[queuedTasks.size()]);
161     for(IndexingTask task : queuedTasksArray) {
162       task.run();
163     }
164 
165     super.close();
166   // close()
167 
168   /** Open a connection to the data store. */
169   @Override
170   public void open() throws PersistenceException {
171     super.open();
172     /*
173      * check if the storage directory is a valid serial datastore if we
174      * want to support old style: String versionInVersionFile = "1.0";
175      * (but this means it will open *any* directory)
176      */
177     BufferedReader isr = null;
178     try {
179       isr = new BufferedReader(new FileReader(getVersionFile()));
180       currentProtocolVersion = isr.readLine();
181       String indexDirRelativePath = isr.readLine();
182 
183       if(indexDirRelativePath != null
184               && indexDirRelativePath.trim().length() 1) {
185         URL storageDirURL = storageDir.toURI().toURL();
186         URL theIndexURL = new URL(storageDirURL, indexDirRelativePath);
187         // check if index directory exists
188         File indexDir = Files.fileFromURL(theIndexURL);
189         if(!indexDir.exists()) {
190           throw new PersistenceException("Index directory "
191                   + indexDirRelativePath
192                   " could not be found for datastore at " + storageDirURL);
193         }
194 
195         indexURL = theIndexURL;
196         this.indexer = new LuceneIndexer(indexURL);
197         this.searcher = new LuceneSearcher();
198         ((LuceneSearcher)this.searcher).setLuceneDatastore(this);
199       }
200     catch(IOException e) {
201       throw new PersistenceException("Invalid storage directory: " + e);
202     finally {
203       IOUtils.closeQuietly(isr);
204     }
205     
206     if(!isValidProtocolVersion(currentProtocolVersion))
207       throw new PersistenceException("Invalid protocol version number: "
208               + currentProtocolVersion);
209 
210     // Lets create a separate indexer thread which keeps running in the
211     // background
212     executor =
213             new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
214     // set up the executor so it does not execute delayed indexing tasks
215     // that are still waiting when it is shut down. We run these tasks
216     // immediately at shutdown time rather than waiting.
217     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
218     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
219     
220     // start listening to Creole events
221     Gate.getCreoleRegister().addCreoleListener(this);
222   }
223 
224   /**
225    * Obtain the lock object on which we must synchronize when loading or
226    * saving the LR with the given ID.
227    */
228   private Object lockObjectForID(Object id) {
229     synchronized(lockObjects) {
230       processRefQueue();
231       Object lock = null;
232       if(lockObjects.containsKey(id)) {
233         lock = lockObjects.get(id).get();
234       }
235       if(lock == null) {
236         lockObjects.remove(id);
237         lock = new Object();
238         LabelledSoftReference ref = new LabelledSoftReference(lock);
239         ref.label = id;
240         lockObjects.put(id, ref);
241       }
242 
243       return lock;
244     }
245   }
246 
247   /**
248    * Cleans up the lockObjects map by removing any entries whose
249    * SoftReference values have been cleared by the garbage collector.
250    */
251   private void processRefQueue() {
252     LabelledSoftReference ref = null;
253     while((ref = LabelledSoftReference.class.cast(refQueue.poll())) != null) {
254       // check that the queued ref hasn't already been replaced in the
255       // map
256       if(lockObjects.get(ref.label== ref) {
257         lockObjects.remove(ref.label);
258       }
259     }
260   }
261 
262   /**
263    * Submits the given LR ID for indexing. The task is delayed by 5
264    * seconds, so multiple updates to the same LR in close succession do
265    * not un-necessarily trigger multiple re-indexing passes.
266    */
267   protected void queueForIndexing(Object lrID) {
268     IndexingTask existingTask = currentTasks.get(lrID);
269     if(existingTask != null) {
270       existingTask.disable();
271     }
272 
273     IndexingTask newTask = new IndexingTask(lrID);
274     currentTasks.put(lrID, newTask);
275     // set the LR to be indexed after the configured delay
276     executor.schedule(newTask, indexDelay, TimeUnit.MILLISECONDS);
277   }
278 
279   /**
280    * Delete a resource from the data store.
281    */
282   @Override
283   public void delete(String lrClassName, Object lrPersistenceId)
284           throws PersistenceException {
285 
286     IndexingTask task = currentTasks.get(lrPersistenceId);
287     if(task != null) {
288       task.disable();
289     }
290 
291     // and we delete it from the datastore
292     // we obtained the lock on this - in order to avoid clashing between
293     // the object being loaded by the indexer thread and the thread that
294     // deletes it
295     Object lock = lockObjectForID(lrPersistenceId);
296     synchronized(lock) {
297       super.delete(lrClassName, lrPersistenceId);
298     }
299     lock = null;
300 
301     /*
302      * lets first find out if the deleted resource is a corpus. Deleting
303      * a corpus does not require deleting all its member documents but
304      * we need to remove the reference of corpus from all its underlying
305      * documents in index
306      */
307     try {
308       if(Corpus.class.isAssignableFrom(Class.forName(lrClassName, true,
309               Gate.getClassLoader()))) {
310         /*
311          * we would issue a search query to obtain all documents which
312          * belong to his corpus and set them as referring to null
313          * instead of refering to the given corpus
314          */
315         Map<String, Object> parameters = new HashMap<String, Object>();
316         parameters.put(Constants.INDEX_LOCATION_URL, indexURL);
317         parameters.put(Constants.CORPUS_ID, lrPersistenceId.toString());
318         try {
319           boolean success = getSearcher().search("nothing", parameters);
320           if(!successreturn;
321 
322           Hit[] hits = getSearcher().next(-1);
323           if(hits == null || hits.length == 0) {
324             // do nothing
325             return;
326           }
327 
328           for(int i = 0; i < hits.length; i++) {
329             String docID = hits[i].getDocumentID();
330             queueForIndexing(docID);
331           }
332         catch(SearchException se) {
333           throw new PersistenceException(se);
334         }
335         return;
336       }
337     catch(ClassNotFoundException cnfe) {
338       // don't do anything
339     }
340 
341     // we want to delete this document from the Index as well
342     ArrayList<Object> removed = new ArrayList<Object>();
343     removed.add(lrPersistenceId);
344     try {
345       synchronized(indexer) {
346         this.indexer.remove(removed);
347       }
348     catch(IndexException ie) {
349       throw new PersistenceException(ie);
350     }
351   }
352 
353   @Override
354   public LanguageResource getLr(String lrClassName, Object lrPersistenceId)
355           throws PersistenceException, SecurityException {
356     LanguageResource lr = super.getLr(lrClassName, lrPersistenceId);
357     if(lr instanceof Corpus) {
358       ((Corpus)lr).addCorpusListener(this);
359     }
360     return lr;
361   }
362 
363   /**
364    * Save: synchonise the in-memory image of the LR with the persistent
365    * image.
366    */
367   @Override
368   public void sync(LanguageResource lrthrows PersistenceException {
369     if(lr.getLRPersistenceId() != null) {
370       // lock the LR ID so we don't write to the file while an
371       // indexer task is reading it
372       Object lock = lockObjectForID(lr.getLRPersistenceId());
373       synchronized(lock) {
374         
375         // we load the copy of this LR and check if any modification were done
376         // if so, it should be reindexed or else it should not be synced again.
377         LanguageResource copy = null;
378         try {
379           copy =
380                   getLr(lr.getClass().getName(), lr.getLRPersistenceId());
381 
382           // we check it only if it is an instance of Document
383           if(copy instanceof Document && lr instanceof Document) {
384             Document cDoc = (Document)copy;
385             Document lrDoc = (Document)lr;
386             boolean sameDocs = false;
387             
388             // we only check content and annotation sets
389             // as that's what matters from the annic perspective
390             if(cDoc.getContent().equals(lrDoc.getContent())) {
391               if(cDoc.getAnnotations().equals(lrDoc.getAnnotations())) {
392                 if(cDoc.getNamedAnnotationSets().equals(
393                         lrDoc.getNamedAnnotationSets())) {
394                   boolean allSetsSame = true;
395                   for(String key : cDoc.getNamedAnnotationSets().keySet()) {
396                     if(!cDoc.getAnnotations(key).equals(lrDoc.getAnnotations(key))) {
397                       allSetsSame = false;
398                       break;
399                     }
400                   }
401                   if(allSetsSame) {
402                     sameDocs = true;
403                   }
404                 }
405               }
406             }
407 
408             
409             if(sameDocs) {
410               lock = null;
411               return;
412             }
413           }
414         catch(SecurityException e) {
415           e.printStackTrace();
416         finally {
417           
418           // delete the copy of this LR
419           if(copy != null) {
420             Factory.deleteResource(copy);
421           }
422         }
423         
424         super.sync(lr);
425       }
426       lock = null;
427     else {
428       super.sync(lr);
429     }
430 
431 
432 
433     if(lr instanceof Document) {
434       queueForIndexing(lr.getLRPersistenceId());
435     }
436   }
437 
438   /**
439    * Sets the Indexer to be used for indexing Datastore
440    */
441   @Override
442   public void setIndexer(Indexer indexer, Map<String,Object> indexParameters)
443           throws IndexException {
444 
445     this.indexer = indexer;
446     this.indexParameters = indexParameters;
447     this.indexURL = (URL)this.indexParameters.get(Constants.INDEX_LOCATION_URL);
448     this.indexer.createIndex(this.indexParameters);
449 
450     // dump the version file
451     try {
452       File versionFile = getVersionFile();
453       OutputStreamWriter osw =
454               new OutputStreamWriter(new FileOutputStream(versionFile));
455       osw.write(versionNumber + Strings.getNl());
456       String indexDirRelativePath =
457               PersistenceManager.getRelativePath(storageDir.toURI().toURL(),
458                       indexURL);
459       osw.write(indexDirRelativePath);
460       osw.close();
461     catch(IOException e) {
462       throw new IndexException("couldn't write version file: " + e);
463     }
464   }
465 
466   @Override
467   public Indexer getIndexer() {
468     return this.indexer;
469   }
470 
471   @Override
472   public void setSearcher(Searcher searcherthrows SearchException {
473     this.searcher = searcher;
474     if(this.searcher instanceof LuceneSearcher) {
475       ((LuceneSearcher)this.searcher).setLuceneDatastore(this);
476     }
477   }
478 
479   @Override
480   public Searcher getSearcher() {
481     return this.searcher;
482   }
483 
484   /**
485    * Sets the delay in milliseconds that we should wait after a sync
486    * before attempting to re-index a document. If sync is called again
487    * for the same document within this time then the timer for the
488    * re-indexing task is reset. Thus if several changes to the same
489    * document are made in quick succession it will only be re-indexed
490    * once. On the other hand, if the delay is set too long the document
491    * may never be indexed until the data store is closed. The default
492    * delay is 1000ms (one second), which should be appropriate for usage
493    * in the GATE GUI.
494    */
495   public void setIndexDelay(long indexDelay) {
496     this.indexDelay = indexDelay;
497   }
498 
499   public long getIndexDelay() {
500     return indexDelay;
501   }
502 
503   /**
504    * Search the datastore
505    */
506   @Override
507   public boolean search(String query, Map<String,Object> searchParameters)
508           throws SearchException {
509     return this.searcher.search(query, searchParameters);
510   }
511 
512   /**
513    * Returns the next numberOfPatterns
514    
515    @param numberOfPatterns
516    @return null if no patterns found
517    */
518   @Override
519   public Hit[] next(int numberOfPatternsthrows SearchException {
520     return this.searcher.next(numberOfPatterns);
521   }
522 
523   // Corpus Events
524   /**
525    * This method is invoked whenever a document is removed from a corpus
526    */
527   @Override
528   public void documentRemoved(CorpusEvent ce) {
529     Object docLRID = ce.getDocumentLRID();
530 
531     /*
532      * we need to remove this document from the index
533      */
534     if(docLRID != null) {
535       ArrayList<Object> removed = new ArrayList<Object>();
536       removed.add(docLRID);
537       try {
538         synchronized(indexer) {
539           indexer.remove(removed);
540         }
541       catch(IndexException ie) {
542         throw new GateRuntimeException(ie);
543       }
544       // queueForIndexing(docLRID);
545     }
546   }
547 
548   /**
549    * This method is invoked whenever a document is added to a particular
550    * corpus
551    */
552   @Override
553   public void documentAdded(CorpusEvent ce) {
554     /*
555      * we don't want to do anything here, because the sync is
556      * automatically called when a document is added to a corpus which
557      * is part of the the datastore
558      */
559   }
560 
561   /*
562    * (non-Javadoc)
563    
564    * @see
565    * gate.event.CreoleListener#datastoreClosed(gate.event.CreoleEvent)
566    */
567   @Override
568   public void datastoreClosed(CreoleEvent e) {
569   }
570 
571   /*
572    * (non-Javadoc)
573    
574    * @see
575    * gate.event.CreoleListener#datastoreCreated(gate.event.CreoleEvent)
576    */
577   @Override
578   public void datastoreCreated(CreoleEvent e) {
579   }
580 
581   /*
582    * (non-Javadoc)
583    
584    * @see
585    * gate.event.CreoleListener#datastoreOpened(gate.event.CreoleEvent)
586    */
587   @Override
588   public void datastoreOpened(CreoleEvent e) {
589   }
590 
591   /*
592    * (non-Javadoc)
593    
594    * @see
595    * gate.event.CreoleListener#resourceLoaded(gate.event.CreoleEvent)
596    */
597   @Override
598   public void resourceLoaded(CreoleEvent e) {
599   }
600 
601   /*
602    * (non-Javadoc)
603    
604    * @see gate.event.CreoleListener#resourceRenamed(gate.Resource,
605    * java.lang.String, java.lang.String)
606    */
607   @Override
608   public void resourceRenamed(Resource resource, String oldName, String newName) {
609   }
610 
611   /*
612    * (non-Javadoc)
613    
614    * @see
615    * gate.event.CreoleListener#resourceUnloaded(gate.event.CreoleEvent)
616    */
617   @Override
618   public void resourceUnloaded(CreoleEvent e) {
619     // if the resource being close is one of our corpora. we need to
620     // remove
621     // the corpus listener associated with it
622     Resource res = e.getResource();
623     if(res instanceof Corpus) {
624       ((Corpus)res).removeCorpusListener(this);
625     }
626   }
627 
628   protected class IndexingTask implements Runnable {
629     private AtomicBoolean disabled = new AtomicBoolean(false);
630 
631     private Object lrID;
632 
633     public IndexingTask(Object lrID) {
634       this.lrID = lrID;
635     }
636 
637     public void disable() {
638       disabled.set(true);
639     }
640 
641     @Override
642     public void run() {
643       // remove this task from the currentTasks map if it has not been
644       // superseded by a later task
645       currentTasks.remove(lrID, this);
646       // only run the rest of the process if this task has not been
647       // disabled (because a newer task for the same LR was scheduled).
648       // We set the disabled flag at this point so the same task cannot
649       // be run twice.
650       if(disabled.compareAndSet(false, true)) {
651         Document doc = null;
652         // read the document from datastore
653         FeatureMap features = Factory.newFeatureMap();
654         features.put(DataStore.LR_ID_FEATURE_NAME, lrID);
655         features.put(DataStore.DATASTORE_FEATURE_NAME, LuceneDataStoreImpl.this);
656         FeatureMap hidefeatures = Factory.newFeatureMap();
657         Gate.setHiddenAttribute(hidefeatures, true);
658         try {
659           // lock the LR ID so we don't try and read a file
660           // which is in the process of being written
661           Object lock = lockObjectForID(lrID);
662           synchronized(lock) {
663             doc =
664                     (Document)Factory
665                             .createResource("gate.corpora.DocumentImpl",
666                                     features, hidefeatures);
667           }
668           lock = null;
669         catch(ResourceInstantiationException rie) {
670           // this means the LR ID was null
671           doc = null;
672         }
673 
674         // if the document is not null,
675         // proceed to indexing it
676         if(doc != null) {
677 
678           /*
679            * we need to reindex this document in order to synchronize it
680            * lets first remove it from the index
681            */
682           ArrayList<Object> removed = new ArrayList<Object>();
683           removed.add(lrID);
684           try {
685             synchronized(indexer) {
686               indexer.remove(removed);
687             }
688           catch(IndexException ie) {
689             throw new GateRuntimeException(ie);
690           }
691 
692           // and add it back
693           ArrayList<Document> added = new ArrayList<Document>();
694           added.add(doc);
695 
696           try {
697             String corpusPID = null;
698 
699             /*
700              * we need to find out the corpus which this document
701              * belongs to one easy way is to check all instances of
702              * serial corpus loaded in memory
703              */
704             List<LanguageResource> scs =
705                     Gate.getCreoleRegister().getLrInstances(
706                             SerialCorpusImpl.class.getName());
707             if(scs != null) {
708               /*
709                * we need to check which corpus the deleted class
710                * belonged to
711                */
712               Iterator<LanguageResource> iter = scs.iterator();
713               while(iter.hasNext()) {
714                 SerialCorpusImpl sci = (SerialCorpusImpl)iter.next();
715                 if(sci != null) {
716                   if(sci.contains(doc)) {
717                     corpusPID = sci.getLRPersistenceId().toString();
718                     break;
719                   }
720                 }
721               }
722             }
723 
724             /*
725              * it is also possible that the document is loaded from
726              * datastore without being loaded from the corpus (e.g.
727              * using getLR(...) method of datastore) in this case the
728              * relevant corpus won't exist in memory
729              */
730             if(corpusPID == null) {
731               List<String> corpusPIDs = getLrIds(SerialCorpusImpl.class.getName());
732               if(corpusPIDs != null) {
733                 for(int i = 0; i < corpusPIDs.size(); i++) {
734                   Object corpusID = corpusPIDs.get(i);
735 
736                   SerialCorpusImpl corpusLR = null;
737                   // we will have to load this corpus
738                   FeatureMap params = Factory.newFeatureMap();
739                   params.put(DataStore.DATASTORE_FEATURE_NAME,
740                           LuceneDataStoreImpl.this);
741                   params.put(DataStore.LR_ID_FEATURE_NAME, corpusID);
742                   hidefeatures = Factory.newFeatureMap();
743                   Gate.setHiddenAttribute(hidefeatures, true);
744                   Object lock = lockObjectForID(corpusID);
745                   synchronized(lock) {
746                     corpusLR =
747                             (SerialCorpusImpl)Factory.createResource(
748                                     SerialCorpusImpl.class.getCanonicalName(),
749                                     params, hidefeatures);
750                   }
751                   lock = null;
752 
753                   if(corpusLR != null) {
754                     if(corpusLR.contains(doc)) {
755                       corpusPID = corpusLR.getLRPersistenceId().toString();
756                     }
757                     Factory.deleteResource(corpusLR);
758                     if(corpusPID != nullbreak;
759                   }
760                 }
761               }
762             }
763 
764             synchronized(indexer) {
765               indexer.add(corpusPID, added);
766             }
767 
768             Factory.deleteResource(doc);
769           catch(Exception ie) {
770             ie.printStackTrace();
771           }
772         }
773       }
774     }
775 
776   }
777 
778   /**
779    * Soft reference with an associated label.
780    */
781   private class LabelledSoftReference extends SoftReference<Object> {
782     Object label;
783 
784     public LabelledSoftReference(Object referent) {
785       super(referent);
786     }
787   }
788 }