// Page navigation text captured with this listing: "Log in", "Help", "Print".
// Path: Home / releases / gate-8.1-build5169-ALL / plugins / Twitter / src / gate / corpora / twitter / TweetStreamIterator.java
 
/*
 *  Copyright (c) 1995-2014, The University of Sheffield. See the file
 *  COPYRIGHT.txt in the software or at http://gate.ac.uk/gate/COPYRIGHT.txt
 *
 *  This file is part of GATE (see http://gate.ac.uk/), and is free
 *  software, licenced under the GNU Library General Public License,
 *  Version 2, June 1991 (in the distribution as file licence.html,
 *  and also available at http://gate.ac.uk/gate/licence.html).
 *  
 *  $Id: TweetStreamIterator.java 18420 2014-10-30 19:26:45Z ian_roberts $
 */
package gate.corpora.twitter;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.log4j.Logger;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;


/**
 * Iterates over the tweets found in a JSON source, which may be a string
 * or an (optionally gzipped) input stream.
 *
 * <p>The source may be a sequence of JSON objects or a single top-level
 * JSON array of objects.  Each object is either a tweet in its own right
 * or a search result container (an object that has both the
 * {@link #SEARCH_KEY} and {@link #STATUS_KEY} members), in which case the
 * tweets are the elements of its {@link #STATUS_KEY} array.</p>
 *
 * <p><b>Note:</b> {@link #next()} can return {@code null}; see
 * {@link #hasNext()} for the details.  Callers must check for this.</p>
 *
 * <p>Call {@link #close()} when finished in order to release the
 * underlying JSON parser.</p>
 */
public class TweetStreamIterator implements Iterator<Tweet>, Closeable {

  // Borrowed from gcp IOConstants
  public static final String SEARCH_KEY = "search_metadata";
  public static final String STATUS_KEY = "statuses";

  private static final Logger logger = Logger.getLogger(TweetStreamIterator.class.getName());

  /** Parser over the raw JSON source; closed by {@link #close()}. */
  private final JsonParser jsonParser;
  /** Iterator over the top-level JSON objects of the source. */
  private final MappingIterator<JsonNode> iterator;
  private final List<String> contentKeys;
  private final List<String> featureKeys;
  private final boolean handleEntities;
  /** True while the current search result container has unread statuses. */
  private boolean nested;
  /** Statuses of the current search result container, or null if none. */
  private Iterator<JsonNode> nestedStatuses;

  /**
   * Creates an iterator over the tweets in a JSON string, with entity
   * handling switched on.
   *
   * @param json the JSON text to read
   * @param contentKeys passed through to {@code Tweet.readTweet}
   * @param featureKeys passed through to {@code Tweet.readTweet}
   * @throws JsonParseException if the start of the JSON is malformed
   * @throws IOException if the JSON cannot be read
   */
  public TweetStreamIterator(String json, List<String> contentKeys,
          List<String> featureKeys) throws JsonParseException, IOException {
    this(json, contentKeys, featureKeys, true);
  }


  /**
   * Creates an iterator over the tweets in a JSON string.
   *
   * @param json the JSON text to read
   * @param contentKeys passed through to {@code Tweet.readTweet}
   * @param featureKeys passed through to {@code Tweet.readTweet}
   * @param handleEntities passed through to {@code Tweet.readTweet}
   * @throws JsonParseException if the start of the JSON is malformed
   * @throws IOException if the JSON cannot be read
   */
  public TweetStreamIterator(String json, List<String> contentKeys,
          List<String> featureKeys, boolean handleEntities) throws JsonParseException, IOException {
    this.contentKeys = contentKeys;
    this.featureKeys = featureKeys;
    this.handleEntities = handleEntities;
    ObjectMapper mapper = new ObjectMapper();
    this.jsonParser = mapper.getFactory().createParser(json);
    this.iterator = openIterator(mapper, this.jsonParser);
  }

  /**
   * Creates an iterator over the tweets in an input stream, with entity
   * handling switched on.
   *
   * @param input the stream to read JSON from
   * @param contentKeys passed through to {@code Tweet.readTweet}
   * @param featureKeys passed through to {@code Tweet.readTweet}
   * @param gzip true if the stream is gzip-compressed
   * @throws JsonParseException if the start of the JSON is malformed
   * @throws IOException if the stream cannot be read
   */
  public TweetStreamIterator(InputStream input, List<String> contentKeys,
          List<String> featureKeys, boolean gzip) throws JsonParseException, IOException {
    this(input, contentKeys, featureKeys, gzip, true);
  }

  /**
   * Creates an iterator over the tweets in an input stream.
   *
   * <p>If construction fails after the stream has been wrapped, the
   * stream is closed before the exception propagates, because the caller
   * never receives an object on which it could call {@link #close()}.</p>
   *
   * @param input the stream to read JSON from
   * @param contentKeys passed through to {@code Tweet.readTweet}
   * @param featureKeys passed through to {@code Tweet.readTweet}
   * @param gzip true if the stream is gzip-compressed
   * @param handleEntities passed through to {@code Tweet.readTweet}
   * @throws JsonParseException if the start of the JSON is malformed
   * @throws IOException if the stream cannot be read
   */
  public TweetStreamIterator(InputStream input, List<String> contentKeys,
          List<String> featureKeys, boolean gzip, boolean handleEntities)
                  throws JsonParseException, IOException {
    this.contentKeys = contentKeys;
    this.featureKeys = featureKeys;
    this.handleEntities = handleEntities;

    // Following borrowed from gcp JSONStreamingInputHandler
    ObjectMapper mapper = new ObjectMapper();
    this.jsonParser = openParser(mapper, input, gzip);
    this.iterator = openIterator(mapper, this.jsonParser);
  }

  /**
   * Creates a parser over the given stream, decompressing it first when
   * requested.  The (possibly wrapped) stream is closed if the parser
   * cannot be created.
   */
  private static JsonParser openParser(ObjectMapper mapper, InputStream input,
          boolean gzip) throws IOException {
    InputStream workingInput = gzip ? new GZIPInputStream(input) : input;
    boolean opened = false;
    try {
      JsonParser parser = mapper.getFactory().createParser(workingInput)
              .enable(Feature.AUTO_CLOSE_SOURCE);
      opened = true;
      return parser;
    }
    finally {
      if (!opened) {
        closeQuietly(workingInput);
      }
    }
  }

  /**
   * Positions the parser on the first value and wraps it in an iterator
   * over the top-level JSON objects.  The parser is closed if this fails.
   */
  private static MappingIterator<JsonNode> openIterator(ObjectMapper mapper,
          JsonParser parser) throws IOException {
    boolean opened = false;
    try {
      // If the first token in the stream is the start of an array ("[")
      // then assume the stream as a whole is an array of objects.
      // To handle this, simply clear the token - the MappingIterator
      // returned by readValues will cope with the rest in either form.
      if (parser.nextToken() == JsonToken.START_ARRAY) {
        parser.clearCurrentToken();
      }
      MappingIterator<JsonNode> values = mapper.readValues(parser, JsonNode.class);
      opened = true;
      return values;
    }
    finally {
      if (!opened) {
        closeQuietly(parser);
      }
    }
  }

  /**
   * Closes a resource on a failure path, logging rather than throwing so
   * that the exception already in flight is not masked.
   */
  private static void closeQuietly(Closeable resource) {
    try {
      resource.close();
    }
    catch (IOException e) {
      logger.debug("Failed to close resource after an earlier error", e);
    }
  }

  /**
   * Reports whether there <em>could</em> be more tweets in the source.
   *
   * <p>Suppressing empty documents in Population.populateCorpus is tricky,
   * so this method returns true if there could be more tweets, and
   * {@link #next()} returns null if there are none in the current main
   * JsonNode; populateCorpus has to test for null.</p>
   *
   * @return true if another top-level JSON object remains, or if the
   *         current search result container has unread statuses
   */
  @Override
  public boolean hasNext() {
    // Belt & braces: this.nested should suffice for the second operand.
    return this.iterator.hasNext() || hasNestedStatus();
  }

  /** True if the current search result container has an unread status. */
  private boolean hasNestedStatus() {
    return this.nested && (this.nestedStatuses != null) && this.nestedStatuses.hasNext();
  }


  /**
   * Returns the next tweet, or {@code null} when none is available from
   * this step.
   *
   * <p>{@code null} is returned when the top-level object just read is a
   * search result container (its statuses are delivered by subsequent
   * calls), when the source is exhausted, or when an {@link IOException}
   * occurs while reading the tweet (the error is logged).</p>
   *
   * @return the next tweet, or null as described above
   */
  @Override
  public Tweet next() {
    Tweet result = null;
    try {
      if (hasNestedStatus()) {
        result = Tweet.readTweet(this.nestedStatuses.next(), contentKeys, featureKeys, handleEntities);
        // Clear the nested flag once the last item in the statuses
        // value's list has been used, so that the next call to next()
        // will drop to the else if clause.
        this.nested = this.nestedStatuses.hasNext();
      }
      else if (this.iterator.hasNext()) {
        JsonNode node = this.iterator.next();

        if (isSearchResultList(node)) {
          // Set the nested flag according as there is anything left
          // in the statuses value array (which could be empty).  No
          // tweet is returned for the container itself.
          this.nestedStatuses = getStatuses(node).iterator();
          this.nested = this.nestedStatuses.hasNext();
        }
        else {
          this.nested = false;
          this.nestedStatuses = null;
          result = Tweet.readTweet(node, contentKeys, featureKeys, handleEntities);
        }
      }
    }
    catch (IOException e) {
      logger.warn("Internal error in TweetStreamIterator", e);
    }
    return result;
  }


  /**
   * Always fails: tweets cannot be removed from the source.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remove() {
    throw new UnsupportedOperationException("The TweetStream is read-only.");
  }


  /**
   * Closes the value iterator and the underlying JSON parser.  The parser
   * is closed even if closing the iterator fails.
   *
   * @throws IOException if either close operation fails
   */
  @Override
  public void close() throws IOException {
    try {
      iterator.close();
    }
    finally {
      jsonParser.close();
    }
  }


  /**
   * Tests whether a JSON node is a search result container, that is,
   * whether it has both the {@link #SEARCH_KEY} and {@link #STATUS_KEY}
   * members.
   *
   * @param json the node to test
   * @return true if both members are present
   */
  public static boolean isSearchResultList(JsonNode json) {
    return json.has(SEARCH_KEY) && json.has(STATUS_KEY);
  }


  /**
   * Returns the {@link #STATUS_KEY} array of a search result container.
   *
   * @param json the container node
   * @return the array of statuses
   * @throws IOException if the {@link #STATUS_KEY} member is missing or is
   *         not an array
   */
  public static ArrayNode getStatuses(JsonNode json) throws IOException {
    JsonNode statusList = json.get(STATUS_KEY);
    if (! (statusList instanceof ArrayNode)) {
      throw new IOException("Bad tweet format: value of 'statuses' is not an array!");
    }
    return (ArrayNode) statusList;
  }


  /**
   * Tests whether a JSON node can yield at least one tweet.  A node that
   * is not a search result container always counts as non-empty; a
   * container counts as non-empty only if its statuses array has elements.
   * A container whose statuses value is not an array is logged and treated
   * as empty.
   *
   * @param json the node to test
   * @return true if the node is a plain tweet or a non-empty container
   */
  public static boolean nonEmpty(JsonNode json) {
    if (!isSearchResultList(json)) {
      return true;
    }
    try {
      return getStatuses(json).size() > 0;
    }
    catch (IOException e) {
      logger.warn("Internal error in TweetStreamIterator", e);
      return false;
    }
  }


}