The infrastructure behind the Political Futures Tracker: Real-time annotation and indexing of Twitter streams
One of the principal targets of the Political Futures Tracker project has been to develop the infrastructure that allows us to monitor incoming data streams from Twitter, analyse the tweets and make the analysis results available for searching in near-real-time. This post describes the architecture we have created, and explains how we applied it to two different scenarios — long-term monitoring of tweets by parliamentary candidates (and responses to those tweets) throughout the election campaign, and short-term intensive monitoring of tweets with particular hashtags during the televised leaders' debates.
The live processing system is made up of several distinct components:
- The "collector" component receives tweets from Twitter via their streaming API and forwards them to a reliable messaging queue (JBoss HornetQ). It also saves the raw JSON of the tweets in backup files for later re-processing if necessary.
- The "processor" component consumes tweets from the message queue, processes them with the GATE analysis pipeline and sends the annotated documents to GATE Mímir for indexing.
- GATE Mímir receives the annotated tweets and indexes their text and annotation data, making it available for searching after a short (configurable) delay.
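In its simplest form, tweets flow from Twitter into the collector, through the message queue to the processor, and from there into Mímir for indexing.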
Each component is described in more detail below.
Collecting the data
Twitter offers a set of streaming APIs that deliver tweets to consumers in real time as they are posted. Of particular interest for our purposes is the statuses/filter API, which allows you to specify certain constraints and then delivers all tweets (up to a maximum of around 50 per second) that match those constraints. Various kinds of constraints are supported, but the two that are of interest to us are:
- "track" — a textual filter that delivers all tweets that mention specified keywords (typically hashtags)
- "follow" — a user ID filter that delivers all tweets by specified Twitter users, as well as any tweet that is a retweet of, or a reply to, a tweet by one of the specified users.
For the long-term monitoring use case we use the YourNextMP service to assemble a list of all known parliamentary candidates who have a Twitter account. We add to this list any former MPs who are not standing for re-election, plus official political party accounts such as @Conservatives, and accounts for prominent non-Westminster politicians (e.g. the SNP and Plaid Cymru leaders, who are members of the Scottish Parliament and the Welsh Assembly respectively), and "follow" this list of user IDs.
For the debates, we simply "track" relevant hashtags for each debate (#leadersdebate, #BBCdebate), plus more general hashtags relating to the election (#GE2015, #UKElection, etc.).
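To make the two filter types concrete, the following is a minimal sketch of how the "follow" and "track" constraints could be turned into request parameters for statuses/filter, where both are sent as comma-separated lists. In practice the Hosebird client builds this request for you; the class name, method names and user IDs below are hypothetical and exist only for illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: builds a form-encoded body for a statuses/filter request. */
class FilterRequestSketch {

    /** Joins values with commas, the list format used by "follow" and "track". */
    static String commaJoin(List<?> values) {
        return values.stream().map(v -> String.valueOf(v)).collect(Collectors.joining(","));
    }

    /** Encodes whichever constraints are non-empty as application/x-www-form-urlencoded. */
    static String formBody(List<Long> followUserIds, List<String> trackTerms) {
        Map<String, String> params = new LinkedHashMap<>();
        if (!followUserIds.isEmpty()) {
            params.put("follow", commaJoin(followUserIds));
        }
        if (!trackTerms.isEmpty()) {
            params.put("track", commaJoin(trackTerms));
        }
        return params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        // Debate scenario: hashtags only.
        System.out.println(formBody(List.of(), List.of("#leadersdebate", "#GE2015")));
        // Long-term scenario: user IDs only (these IDs are made up).
        System.out.println(formBody(List.of(1001L, 1002L), List.of()));
    }
}
```

The two scenarios differ only in which parameter is populated, which is why a single collector implementation can serve both.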
The collector component uses the Hosebird client, a Java library written by Twitter themselves to simplify access to the streaming API. The Hosebird library handles the complexity of authentication, long-lived HTTP connections, and backoff-and-retry behaviour when the connection drops for any reason, so the actual collector logic is very simple. When a tweet arrives on the stream, the collector parses the JSON to extract the tweet ID, then packages the JSON into a message and sends it to the message queue, tagged with its ID (for de-duplication purposes). In parallel, the collector writes the tweet JSON to a backup file, so it is preserved for future reference (for example, if we improve the analysis pipeline we may want to go back and re-process previously-collected tweets with the new pipeline).
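The per-tweet logic described above can be sketched roughly as follows. This is not the project's actual collector: `TweetQueue` is a hypothetical stand-in for the message queue producer, the ID is pulled out with a crude regular expression rather than a real JSON parser, and the backup is written before queueing (rather than in parallel) purely to keep the example short.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the message queue producer. */
interface TweetQueue {
    void send(String tweetId, String json);
}

class CollectorSketch {
    // Crude extraction of the first "id_str" value; a real collector would use a JSON parser.
    private static final Pattern ID = Pattern.compile("\"id_str\"\\s*:\\s*\"(\\d+)\"");

    private final TweetQueue queue;
    private final Path backupFile;

    CollectorSketch(TweetQueue queue, Path backupFile) {
        this.queue = queue;
        this.backupFile = backupFile;
    }

    static Optional<String> extractId(String json) {
        Matcher m = ID.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Called once for each JSON message received from the stream. */
    void onTweet(String json) throws IOException {
        // Append the raw JSON to the backup file, one tweet per line.
        Files.writeString(backupFile, json + System.lineSeparator(), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        // Forward to the queue, tagged with the tweet ID for de-duplication.
        Optional<String> id = extractId(json);
        if (id.isPresent()) {
            queue.send(id.get(), json);
        }
    }
}
```

Keeping the raw JSON untouched in the backup file is what makes later re-processing with an improved pipeline possible.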
On top of the core collector library, we add a simple web front-end to configure the collector with Twitter API credentials and details of which users and/or hashtags we want to follow.
Processing the tweets
The processor component is a simple standalone Java application built using the Spring Boot framework. Spring Boot handles the routine tasks, like parsing of command line arguments, configuration of logging, and management of the application lifecycle, and allows you to very quickly create applications based on the Spring Framework in a few lines of code and configuration. In particular, it has support for the Java Message Service (JMS), allowing us to create a message consumer application with a few Java annotations.
We use GATE's Spring support to load the GATE processing pipeline and inject it into the message listener created by Spring Boot. Command line parameters supply the locations of the GATE processing pipeline, the queue to pull messages from, and the Mímir index to receive the results.
Indexing the results
The processor sends its annotated tweets to a GATE Mímir indexing server. Mímir indexes the plain tweet text, structural metadata like sentence boundaries, hashtags and @mentions, and the semantic annotations detected by the analysis pipeline, such as topic mentions, sentiment expressions, and references to MPs from the previous parliament and candidates for election. We also index document-level metadata such as the tweet author and the timestamp of the tweet, rounded to a suitable level of granularity (the nearest hour for the long-term collection, the nearest minute for the high-intensity debate analysis). Mentions of candidates and former MPs are linked to a semantic knowledge base that provides additional information such as their party affiliation and which constituency they are standing in. The constituencies are in turn linked to higher-level geographic regions, allowing us to formulate complex queries such as:
Find all positive sentiment expressions about the "UK economy" theme in tweets written by Labour candidates for constituencies in Greater London.
By issuing a series of such queries for each broad theme, each party, each region, and so on, we can generate useful visualizations of the results.
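The timestamp granularity used for the document-level metadata can be illustrated with `java.time`. This sketch assumes the timestamp is truncated downwards to the start of the hour or minute; the actual pipeline may round differently, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

class TimestampGranularitySketch {

    /**
     * Reduces a tweet timestamp to the granularity used for indexing:
     * minutes for the high-intensity debates, hours for long-term monitoring.
     * Assumption: truncation (rounding down), not rounding to nearest.
     */
    static Instant bucket(Instant created, boolean debateMode) {
        return created.truncatedTo(debateMode ? ChronoUnit.MINUTES : ChronoUnit.HOURS);
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2015-04-02T20:17:45Z");
        System.out.println(bucket(created, true));  // minute-level bucket
        System.out.println(bucket(created, false)); // hour-level bucket
    }
}
```

Coarser buckets mean fewer distinct metadata values in the index, at the cost of less precise time-based queries.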
Mímir builds index structures from the annotated data in memory, and performs a "sync to disk" at regular intervals to make the indexed tweets available for searching. The interval between sync jobs determines how close to real-time the tweets become searchable. For the continuous processing of tweets by candidates, one sync per hour is sufficient, but for the debates, where we receive thousands of tweets per minute and want to visualise the results as quickly as possible, we sync at least once every five minutes.
Robustness and scalability
The architecture is deliberately loosely coupled: there is no direct dependency between the collector and processor components, because all communication is mediated through the message queue. The components can therefore be distributed across different machines for higher performance and/or robustness. If a processor fails, incoming tweets will simply stack up in the message queue and will be dealt with when the processor restarts.
If the throughput is higher than a single processor can sustain then we can scale out horizontally by starting up more processor instances, and the message queue will handle the sharing out of messages among consumers without duplication. For extremely high throughput, beyond that which a single Mímir can handle, each processor could post its annotated tweets to a separate Mímir index, with searches handled through a "federated" front-end index. However, this has not proved necessary in our tests: one Mímir can easily sustain 10,000 to 15,000 tweets per minute, far more than the Twitter streaming API is prepared to deliver.
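The way a queue shares messages among several processors can be illustrated with a small in-process analogy. This sketch uses a `BlockingQueue` and worker threads from the Java standard library in place of the real message broker and separate processor applications, so it shows only the pattern (each message is taken by exactly one consumer), not the actual deployment. All names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CompetingConsumersSketch {
    private static final String STOP = "__STOP__"; // poison pill telling a worker to exit

    /** Runs the given number of workers and returns how often each message was handled. */
    static Map<String, Integer> process(List<String> messages, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        for (int i = 0; i < workers; i++) {
            queue.add(STOP); // one pill per worker, queued after all real messages
        }
        ConcurrentMap<String, Integer> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    // take() hands each message to exactly one waiting worker.
                    for (String msg = queue.take(); !msg.equals(STOP); msg = queue.take()) {
                        handled.merge(msg, 1, Integer::sum); // stands in for "annotate and index"
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return handled;
    }
}
```

Adding workers changes nothing for the producer side, which is the property that lets processor instances be started or restarted independently of the collector.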
On the collector side, it is possible to run several collector instances on different machines, all delivering messages to the same queue. These could be clones, all configured to stream the same tweets (to guard against the failure of a single collector), or each collector could be set up to follow a different hashtag (to get around the rate limits Twitter imposes on a single streaming connection). Either way, the message queue takes care of filtering out duplicates so that each distinct tweet is only processed once. This was a factor in the choice of HornetQ as the message broker, as it has native support for duplicate message detection.
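The idea behind ID-based duplicate detection can be sketched with a bounded cache of recently seen tweet IDs. This is an illustration of the general technique only, not HornetQ's implementation; with HornetQ the broker does this work itself once each message carries a duplicate-detection ID (to our understanding, a string property named `_HQ_DUPL_ID`). The class name and cache size below are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DuplicateFilterSketch {
    private final Map<String, Boolean> seen;

    DuplicateFilterSketch(final int capacity) {
        // Insertion-ordered map that evicts the oldest ID once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true if the tweet ID has not been seen recently and should be processed. */
    synchronized boolean accept(String tweetId) {
        return seen.putIfAbsent(tweetId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch(1000);
        System.out.println(filter.accept("123")); // first delivery, from collector A
        System.out.println(filter.accept("123")); // same tweet again, from collector B
    }
}
```

Because the cache is bounded, a duplicate that arrives long after the original may slip through, so the cache needs to be large enough to cover the expected delay between cloned collectors.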