RealtimeCorpusController.java
001 /*
002  *  Copyright (c) 1995-2012, The University of Sheffield. See the file
003  *  COPYRIGHT.txt in the software or at http://gate.ac.uk/gate/COPYRIGHT.txt
004  *
005  *  This file is part of GATE (see http://gate.ac.uk/), and is free
006  *  software, licenced under the GNU Library General Public License,
007  *  Version 2, June 1991 (in the distribution as file licence.html,
008  *  and also available at http://gate.ac.uk/gate/licence.html).
009  *
010  *  Valentin Tablan 08/05/2008
011  *
012  *  $Id: RealtimeCorpusController.java 17613 2014-03-10 09:14:31Z markagreenwood $
013  *
014  */
015 
016 package gate.creole;
017 
018 import gate.Document;
019 import gate.Executable;
020 import gate.Factory;
021 import gate.LanguageAnalyser;
022 import gate.Resource;
023 import gate.creole.metadata.CreoleParameter;
024 import gate.creole.metadata.CreoleResource;
025 import gate.creole.metadata.Optional;
026 import gate.util.Err;
027 import gate.util.Out;
028 import gate.util.profile.Profiler;
029 
030 import java.util.HashMap;
031 import java.util.concurrent.Callable;
032 import java.util.concurrent.ExecutorService;
033 import java.util.concurrent.Executors;
034 import java.util.concurrent.Future;
035 import java.util.concurrent.ThreadFactory;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.TimeoutException;
038 
039 import org.apache.log4j.Logger;
040 
041 /**
042  * A custom GATE controller that interrupts the execution over a document when a
043  * specified amount of time has elapsed. It also ignores all errors/exceptions 
044  * that may occur during execution and simply carries on with the next document
045  * when that happens.
046  */
047 @CreoleResource(name = "Real-Time Corpus Pipeline",
048     comment = "A serial controller for PR pipelines over corpora which "
049         "limits the run time of each PR.",
050     icon = "application-realtime",
051     helpURL = "http://gate.ac.uk/userguide/sec:creole-model:applications")
052 public class RealtimeCorpusController extends SerialAnalyserController {
053   
054   private static final long serialVersionUID = -676170588997880008L;
055 
056   private final static boolean DEBUG = false;
057   
058   /**
059    * Shared logger object.
060    */
061   private static final Logger logger = Logger.getLogger(
062           RealtimeCorpusController.class);
063 
064   /** Profiler to track PR execute time */
065   protected Profiler prof;
066   protected HashMap<String,Long> timeMap;
067   
068   /**
069    * An executor service used to execute the PRs over the document .
070    */
071   protected ExecutorService threadSource;
072   
073   /**
074    * The tread currently running the document processing.
075    */
076   protected volatile Thread currentWorkingThread;
077   
078   protected volatile boolean threadDying;
079   
080   public RealtimeCorpusController(){
081     super();
082     if(DEBUG) {
083       prof = new Profiler();
084       prof.enableGCCalling(false);
085       prof.printToSystemOut(true);
086       timeMap = new HashMap<String,Long>();
087     }
088   }
089   
090   protected class DocRunner implements Callable<Object>{
091     
092     public DocRunner(Document document) {
093       this.document = document;
094     }
095     
096     @Override
097     public Object call() throws Exception {
098       try {
099         // save a reference to the executor thread
100         currentWorkingThread = Thread.currentThread();
101         // run the system over the current document
102         // set the doc and corpus
103         for(int j = 0; j < prList.size(); j++) {
104           ((LanguageAnalyser)prList.get(j)).setDocument(document);
105           ((LanguageAnalyser)prList.get(j)).setCorpus(corpus);
106         }
107         interrupted = false;
108         // execute the PRs
109         // check all the PRs have the right parameters
110         checkParameters();
111         if(DEBUG) {
112           prof.initRun("Execute controller [" + getName() "]");
113         }
114 
115         // execute all PRs in sequence
116         interrupted = false;
117         for(int j = 0; j < prList.size(); j++) {
118           if(isInterrupted())
119             throw new ExecutionInterruptedException("The execution of the "
120                     + getName() " application has been abruptly interrupted!");
121 
122           if(Thread.currentThread().isInterrupted()) {
123             Err.println("Execution on document " + document.getName()
124                     " has been stopped");
125             break;
126           }
127 
128           try {
129             runComponent(j);
130           catch(ThreadDeath td) {
131             // we got stopped
132             throw td;
133           catch(Throwable e) {
134             if(threadDying){
135               // we're in the process of stopping 
136               Err.println("Execution on document " + document.getName()
137                   " has been stopped");
138               // stop running the rest of the PRs
139               break;
140             else {
141               // the thread was not in the process of being stopped: 
142               // actual exception during processing: throw upwards
143               throw e;
144             }
145           }
146           if(DEBUG) {
147             prof.checkPoint("~Execute PR ["
148                     + prList.get(j).getName() "]");
149             Long timeOfPR = timeMap.get(prList.get(j).getName());
150             if(timeOfPR == null)
151               timeMap.put(prList.get(j).getName(),
152                       new Long(prof.getLastDuration()));
153             else timeMap.put(prList.get(j).getName(),
154                     new Long(timeOfPR.longValue() + prof.getLastDuration()));
155             Out.println("Time taken so far by "
156                     + prList.get(j).getName()
157                     ": "
158                     + timeMap.get(prList.get(j).getName()));
159           }
160         }
161       }
162       catch(ThreadDeath td) {
163         // special case as we need to re-throw this one
164         Err.prln("Execution on document " + document.getName()
165                 " has been stopped");
166         throw (td);
167       }
168       catch(Throwable cause) {
169         if(suppressExceptions) {
170           logger.info("Execution on document " + document.getName()
171                   " has caused an error (ignored):\n=========================", cause);
172           logger.info("=========================\nError ignored...\n");
173         else  {
174           if(cause instanceof Exception) {
175             throw (Exception)cause;
176           else {
177             throw new Exception(cause);
178           }        
179         }
180       }
181       finally {
182         // remove the reference to the thread, as we're now done
183         currentWorkingThread = null;
184         // unset the doc and corpus
185         for(int j = 0; j < prList.size(); j++) {
186           ((LanguageAnalyser)prList.get(j)).setDocument(null);
187           ((LanguageAnalyser)prList.get(j)).setCorpus(null);
188         }
189 
190         if(DEBUG) {
191           prof.checkPoint("Execute controller [" + getName() "] finished");
192         }
193       }
194       return null;
195     }
196     private Document document;
197   }
198   
199   @Override
200   public void cleanup() {
201     threadSource.shutdownNow();
202     super.cleanup();
203   }
204 
205   Long actualTimeout;
206   Long actualGraceful;
207   
208   @Override
209   public Resource init() throws ResourceInstantiationException {
210     // the actual time limits used are the ones set as init parameters by default
211     // but if the apropriate property is set the value from the property is 
212     // used instead:
213     String propTimeout = System.getProperty("gate.creole.RealtimeCorpusController.timeout");
214     if(propTimeout != null) {
215       actualTimeout = Long.parseLong(propTimeout);
216     else {
217       actualTimeout = timeout;  
218     }
219     String propGraceful = System.getProperty("gate.creole.RealtimeCorpusController.graceful");
220     if(propGraceful != null) {
221       actualGraceful = Long.parseLong(propGraceful);
222     else {
223       actualGraceful = graceful;
224     }
225     // we normally require 2 threads: one to execute the PRs and another one to
226     // to execute the job stoppers. More threads are created as required.  We
227     // use a custom ThreadFactory that returns daemon threads so we don't block
228     // GATE from exiting if this controller has not been properly disposed of.
229     threadSource = Executors.newSingleThreadExecutor(new ThreadFactory() {
230       private ThreadFactory dtf = Executors.defaultThreadFactory();
231       @Override
232       public Thread newThread(Runnable r) {
233         Thread t = dtf.newThread(r);
234         t.setDaemon(true);
235         return t;
236       }
237     });
238     return super.init();
239   }
240 
241   /** Run the Processing Resources in sequence. */
242   @Override
243   @SuppressWarnings("deprecation")
244   public void executeImpl() throws ExecutionException{
245     interrupted = false;
246     String haveTimeout = null;
247     if(corpus == nullthrow new ExecutionException(
248       "(SerialAnalyserController) \"" + getName() "\":\n" +
249       "The corpus supplied for execution was null!");
250     //iterate through the documents in the corpus
251     for(int i = 0; i < corpus.size(); i++){
252       if(isInterrupted()) throw new ExecutionInterruptedException(
253         "The execution of the " + getName() +
254         " application has been abruptly interrupted!");
255 
256       boolean docWasLoaded = corpus.isDocumentLoaded(i);
257       Document doc = corpus.get(i);
258       // start the execution, in the separate thread
259       threadDying = false;
260       Future<?> docRunnerFuture = threadSource.submit(new DocRunner(doc));
261       // how long have we already waited 
262       long waitSoFar = 0;
263       // check if we should use graceful stop first 
264       if (actualGraceful != -&& (actualTimeout == -|| actualGraceful < actualTimeout )) {
265         try {
266           docRunnerFuture.get(actualGraceful, TimeUnit.MILLISECONDS);
267         catch(TimeoutException e) {
268           // we waited the graceful period, and the task did not finish
269           // -> interrupt the job (nicely)
270           threadDying = true;
271           waitSoFar += actualGraceful;
272           haveTimeout = "Execution timeout, attempting to gracefully stop worker thread...";
273           logger.info(haveTimeout);
274           // interrupt the working thread - we can't cancel the future as
275           // that would cause future get() calls to fail immediately with
276           // a CancellationException
277           Thread t = currentWorkingThread;
278           if(t != null) {
279             t.interrupt();
280           }
281           for(int j = 0; j < prList.size(); j++){
282             ((Executable)prList.get(j)).interrupt();
283           }
284           // next check scheduled for 
285           // - half-time between graceful and timeout, or
286           // - graceful-and-a-half (if no timeout)
287           long waitTime = (actualTimeout != -1
288                           (actualTimeout - actualGraceful
289                           (actualGraceful / 2);
290           try {
291             docRunnerFuture.get(waitTime, TimeUnit.MILLISECONDS);
292           catch(TimeoutException e1) {
293             // the mid point has been reached: try nullify
294             threadDying = true;
295             waitSoFar += waitTime;
296             haveTimeout = "Execution timeout, attempting to induce exception in order to stop worker thread...";
297             logger.info(haveTimeout);
298             for(int j = 0; j < prList.size(); j++){
299               ((LanguageAnalyser)prList.get(j)).setDocument(null);
300               ((LanguageAnalyser)prList.get(j)).setCorpus(null);
301             }            
302           catch(InterruptedException e1) {
303             // the current thread (not the execution thread!) was interrupted
304             // throw it forward
305             Thread.currentThread().interrupt();
306           catch(java.util.concurrent.ExecutionException e2) {
307             throw new ExecutionException(e2);
308           }
309         catch(java.util.concurrent.ExecutionException e) {
310           throw new ExecutionException(e);
311         catch(InterruptedException e) {
312           // the current thread (not the execution thread!) was interrupted
313           // throw it forward
314           Thread.currentThread().interrupt();
315         }
316       }
317       // wait before we call stop()
318       if(actualTimeout != -1) {
319         long waitTime = actualTimeout - waitSoFar;
320         if(waitTime > 0) {
321           try {
322             docRunnerFuture.get(waitTime, TimeUnit.MILLISECONDS);
323           catch(TimeoutException e) {
324             // we're out of time: stop the thread
325             threadDying = true;
326             haveTimeout = "Execution timeout, worker thread will be forcibly terminated!";
327             logger.info(haveTimeout);
328             // using a volatile variable instead of synchronisation
329             Thread theThread = currentWorkingThread;
330             if(theThread != null) {
331               theThread.stop();
332               try {
333                 // and wait for it to actually die
334                 docRunnerFuture.get();
335               catch(InterruptedException e2) {
336                 // current thread has been interrupted: 
337                 Thread.currentThread().interrupt();
338               catch(java.util.concurrent.ExecutionException ee) {
339                 if(ee.getCause() instanceof ThreadDeath) {
340                   // we have just caused this  
341                 else {
342                   logger.error("Real Time Controller Malfunction", ee);
343                   haveTimeout = "Real Time Controller Malfunction: "+ee.getMessage();
344                 }
345               }
346             }
347           catch(InterruptedException e) {
348             // the current thread (not the execution thread!) was interrupted
349             // throw it forward
350             Thread.currentThread().interrupt();
351           catch(java.util.concurrent.ExecutionException e) {
352             throw new ExecutionException(e);
353           }
354         else {
355           // stop now!
356           threadDying = true;
357           haveTimeout = "Execution timeout, worker thread will be forcibly terminated!";
358           logger.info(haveTimeout);
359           // using a volatile variable instead of synchronisation
360           Thread theThread = currentWorkingThread;
361           if(theThread != null){
362             theThread.stop();
363             try {
364               // and wait for it to actually die
365               docRunnerFuture.get();
366             catch(InterruptedException e) {
367               // current thread has been interrupted: 
368               Thread.currentThread().interrupt();
369             catch(java.util.concurrent.ExecutionException ee) {
370               if(ee.getCause() instanceof ThreadDeath) {
371                 // we have just caused this  
372               else {
373                 logger.error("Real Time Controller Malfunction", ee);
374                 haveTimeout = "Real Time Controller Malfunction: "+ee.getMessage();
375               }
376             }
377           }
378         }
379       }
380       
381       String docName = doc.getName();
382       // at this point we finished execution (one way or another)
383       if(!docWasLoaded){
384         //trigger saving
385         getCorpus().unloadDocument(doc);
386         //close the previously unloaded Doc
387         Factory.deleteResource(doc);
388       }
389       if(!suppressExceptions && haveTimeout != null) {
390         throw new ExecutionException("Execution timeout occurred");
391       }
392       // global progress bar depends on this status message firing at the end
393       // of processing for each document.
394       fireStatusChanged("Finished running " + getName() " on document " +
395           docName);
396     }
397   }
398   
399   
400   /**
401    * The timeout in milliseconds before execution on a document is 
402    * forcibly stopped (forcibly stopping execution may result in memory leaks 
403    * and/or unexpected behaviour).   
404    */
405   protected Long timeout;
406 
407   /**
408    * Gets the timeout in milliseconds before execution on a document is 
409    * forcibly stopped (forcibly stopping execution may result in memory leaks 
410    * and/or unexpected behaviour).
411    */
412   public Long getTimeout() {
413     return timeout;
414   }
415   
416   
417   /**
418    * Sets the timeout in milliseconds before execution on a document is 
419    * forcibly stopped (forcibly stopping execution may result in memory leaks 
420    * and/or unexpected behaviour).
421    @param timeout in milliseconds before execution is forcibly stopped
422    */
423   @CreoleParameter(defaultValue = "60000",
424       comment = "Timeout in milliseconds before execution on a document is forcibly stopped (forcibly stopping execution may result in memory leaks and/or unexpected behaviour)")
425   public void setTimeout(Long timeout) {
426     this.timeout = timeout;
427   }
428   
429   /**
430    * The timeout in milliseconds before execution on a document is 
431    * gracefully stopped. Defaults to -1 which disables this functionality and 
432    * relies, as previously, on forcibly stopping execution.
433    */
434   protected Long graceful;
435 
436   /**
437    * Gets the timeout in milliseconds before execution on a document is 
438    * gracefully stopped. Defaults to -1 which disables this functionality and 
439    * relies, as previously, on forcibly stopping execution.
440    */
441   public Long getGracefulTimeout() {
442     return graceful;
443   }
444 
445   /**
446    * Sets the timeout in milliseconds before execution on a document is 
447    * gracefully stopped. Defaults to -1 which disables this functionality and 
448    * relies, as previously, on forcibly stopping execution.
449    @param graceful timeout in milliseconds before execution is gracefully stopped
450    */
451   @CreoleParameter(defaultValue = "-1",
452       comment = "Timeout in milliseconds before execution on a document is gracefully stopped. Defaults to -1 which disables this functionality and relies, as previously, on forcibly stoping execution.")
453   public void setGracefulTimeout(Long graceful) {
454     this.graceful = graceful;
455   }
456   
457   /**
458    * If true, suppresses all exceptions. If false, passes all exceptions, including
459    * exceptions indicating a timeout, on to the caller.
460    */
461   @Optional
462   @CreoleParameter(defaultValue = "true",           
463     comment = "Should all exceptions be suppressed and just a message be written to standard logger.info?")
464   public void setSuppressExceptions(Boolean yesno) {
465     suppressExceptions = yesno;
466   }
467   
468   public Boolean getSuppressExceptions() {
469     return suppressExceptions;
470   }
471   
472   protected boolean suppressExceptions = true;
473 
474 }