
FIWARE.OpenSpecification.Data.BigData R5


Name: FIWARE.OpenSpecification.Data.BigData
Chapter: Data/Context Management
Catalogue-Link to Implementation: Cosmos
Owner: FIWARE Telefonica I+D, Francisco Romero Bueno


Preface

This document contains a self-contained open specification of a FIWARE Generic Enabler. Please consult as well the FIWARE Product Vision, the website at http://www.fiware.org and similar pages in order to understand the complete context of the FIWARE platform.


Copyright

Copyright © 2016 by Telefonica I+D

Legal Notice

Please check the following Legal Notice to understand the rights to use these specifications.

Overview

This document details the Architecture of the BigData Analysis Generic Enabler (GE).

Please note that this document is included as part of the Open Specification of the BigData Analysis GE.

Several implementations of this enabler (GEis) may exist, Cosmos being the reference implementation (GEri).

Big Data Analysis GE

The BigData Analysis GE is intended to provide means for analyzing both batch and stream data in order to obtain, in the end, insights on such data revealing new information that was previously hidden. Batch data is stored in advance, and latency is not extremely important when processing it. Stream data is received and processed in near real time, and the results are expected to be ready immediately, basically because this kind of data is not stored at all and the insights must be obtained on the fly. These two different aspects of the GE lead to the definition of two main blocks, the batch processing and the stream processing ones.

Additional blocks may complement the above ones, such as the block in charge of the historic persistence of context data, or the short-term historic block, a querying API for historical time series.

Please note that using all these blocks together is not mandatory; a GE implementation can be based on one, two, three or all of the blocks.

Batch processing block

The batch processing block aims to be a Big Data as a Service platform on top of OpenStack infrastructure. This platform is mainly in charge of providing Apache Hadoop clusters on demand, but it is not restricted to that: other cluster-based analysis solutions, such as Apache Spark, may be considered as candidate software to be installed within the cluster.

This part of the GE clearly differentiates between computing and storage services:

  • The computing service allows creating and configuring a coordinated cluster of machines in a fast way by means of a simple command line or REST API call. Additionally, the GE allows selecting a subset from a wide range of pre-configured processing and coordination technologies (Hadoop, Hive, Oozie, HBase, Sqoop, Pig, etc.).
  • The storage service allows feeding the clusters with a huge variety of data without any special adapter or configuration. The technology used within this GE allows for high throughput because no data translations are performed. Storing data in the storage service of the GE is as easy as issuing a command line or REST API call (see the sketch after this list).
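As an illustration of how simple the storage service is to use, the sketch below uploads a local file through a WebHDFS/HttpFS-style REST API, which is the kind of interface the Cosmos GEri exposes; the endpoint, port, user space and lack of authentication are assumptions that will differ between deployments (e.g. OAuth2 tokens may be required).

```python
# Minimal sketch: storing a file through a WebHDFS/HttpFS-style REST API.
# Endpoint, port and user are hypothetical; real deployments may require tokens,
# and a pure WebHDFS endpoint would first reply with a redirect to a Datanode.
import requests

STORAGE_ENDPOINT = "http://storage.example.com:14000/webhdfs/v1"  # assumed HttpFS endpoint
USER = "someuser"                                                 # assumed user space

def upload(local_path, remote_path):
    """Creates (or overwrites) remote_path with the content of local_path."""
    url = f"{STORAGE_ENDPOINT}/user/{USER}/{remote_path}"
    params = {"op": "CREATE", "user.name": USER, "overwrite": "true", "data": "true"}
    headers = {"Content-Type": "application/octet-stream"}
    with open(local_path, "rb") as f:
        response = requests.put(url, params=params, headers=headers, data=f)
    response.raise_for_status()

if __name__ == "__main__":
    upload("measurements.csv", "datasets/measurements.csv")
```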

Using one of the services (computing or storage) does not imply using the other. This way, data storage is independent of the data lifecycle associated with its processing.

In addition, this block will enrich the Hadoop ecosystem with specific tools allowing for the integration of Big Data with other GEs such as the Publish-Subscribe Context Broker or the Open Data Portal.

Stream processing block

The stream processing component is a tool which represents a significant change with respect to the current solutions in the area of real-time processing of information. This is achieved by means of:

  • The integration of a user interface which combines visual programming and the power provided in this field by a technology which, as we have seen, is Apache Storm. In this way users are offered a modular interface which is easy to use and adaptable to their needs, all in combination with technology with a high processing capacity, thereby allowing the creation of customized solutions to problems which may arise among users with very different profiles.
  • The creation of a community of users. The Stream processing component is the result of cooperation and knowledge, whereby the work of each of the users can be reused and individual efforts represent an investment for improving the tool. Thus, with each contribution, the number of modules available increases and, therefore, the chances of generating intelligence do so too. The Stream processing component provides the members of the community with the capacity to gather information from multiple sources, process it and enrich it in a dynamic and continuous manner. In this model, users can create their own algorithms in order to handle the information in the manner most suitable for their purposes, by means of the creation of customised topologies and fully exploiting all their processing capabilities to obtain the greatest value possible from the processed information.

There are several solutions on the market which allow real-time data processing with high capacities, e.g. Apache Storm, but to use these tools it is necessary to have fairly advanced technical knowledge and costly installations with dedicated machines to obtain good yields. One of the purposes of the Stream processing component, since its inception, has been to eliminate this initial complexity, which in many cases causes people with less technical knowledge, but great capabilities and interest in creating intelligence, to stop employing tools as powerful as Apache Storm.

To overcome this obstacle the stream processing component implements a new layer of abstraction over the Apache Storm technology, simplifying as much as possible the creation of processing elements and modules and combining them to form topologies in a simple and rapid way.

Historic persistence block

From a FIWARE perspective, one of the main sources of data for both the batch and stream processing blocks mentioned above is the Publish/Subscribe Context Broker GE. Since the data managed by the Context Broker is a snapshot of the current context, past context data is lost unless it is persisted as a historic somewhere else; in this case, HDFS.

This block uses the subscription mechanism of the Context Broker in order to receive a flow of context-related notifications that are stored in HDFS using some standard file format, e.g. CSV, JSON or Parquet.
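As an illustration only, the following sketch creates such a subscription using the NGSIv1 API of the Orion Context Broker; the hosts and ports are hypothetical, and the notification receiver is assumed to be a connector (a Cygnus-like agent) that appends each notification to HDFS.

```python
# Minimal sketch of subscribing to the Publish/Subscribe Context Broker (NGSIv1 style)
# so that context changes are notified to a connector persisting them in HDFS.
# Hosts, ports, entity and attribute names are hypothetical.
import json
import requests

ORION = "http://orion.example.com:1026"          # assumed Context Broker endpoint
CONNECTOR = "http://connector.example.com:5050"  # assumed HDFS connector endpoint

subscription = {
    "entities": [{"type": "Room", "isPattern": "true", "id": "Room.*"}],
    "attributes": ["temperature"],
    "reference": CONNECTOR + "/notify",          # where notifications are sent
    "duration": "P1M",                           # subscription lifetime
    "notifyConditions": [{"type": "ONCHANGE", "condValues": ["temperature"]}],
    "throttling": "PT5S",
}

response = requests.post(
    ORION + "/v1/subscribeContext",
    headers={"Content-Type": "application/json", "Accept": "application/json"},
    data=json.dumps(subscription),
)
print(response.json())
```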

Although the main purpose of this block is building historics in HDFS, other storages can be used as well, e.g. a NoSQL backend. In that case, many other storing strategies can be adopted, for instance, time series.

Short-term historic querying block

As previously said, the architecture of the Big Data GE may be enriched with a block in charge of persisting historic context data, mainly in HDFS, but also in other storages such as a NoSQL backend handling time series. Because of the characteristics of these storages, which make them different from HDFS (less capacity but faster data access), a specific block may be defined in charge of retrieving historical raw and aggregated time series information about the evolution in time of context data registered by a Publish/Subscribe Context Broker.

Intended audience

This document is mainly addressed to those service providers aiming to expose BigData Analysis GE services. For those service providers, data analysis is not a goal in itself; the goal is providing ways for others to perform such data analysis.

If you are a data scientist willing to get some insights on certain data, or a software engineer in charge of productizing an application based on a previous data scientist's analysis, this document may be of certain interest; but most probably you will want to visit the User and Programmer Guide of the Cosmos GEri, and/or go directly to the FIWARE Lab global instance of Cosmos, where you will find an already deployed infrastructure ready to be used through the different APIs.

Structure of the document

Apart from this overview, the Architecture description of the BigData Analysis GE contains four main sections. We start by listing the basic concepts any reader should be familiar with, including a review of the state of the art; these are the basis for the detailed architecture section. The architecture is complemented with a detailed specification of the main interactions between a user and the enabler, but interactions among the components are described as well. The document ends with the basic design principles governing this Architecture.

When it is part of the Open Specification, three more sections are added: the detailed specifications, containing basically API descriptions; an explanation of other technologies reused by this GE; and a final section about common terms and definitions.

Contact information

Use ask.fiware.org for general questions about the enabler, for instance, use cases and discussions on the architecture. Use it even for general questions about FIWARE, e.g. how many cities are using FIWARE, how can I join the accelerator program, etc.

Although the GE owners' contact information is given, please try to avoid emailing them personally unless they ask for it. In fact, if you send a private email you will probably receive an automatic response urging you to use ask.fiware.org instead. This is because using the mentioned channel creates a public knowledge base that can be useful for future users; a private email is just private and cannot be shared.

Basic concepts

Five categories of basic concepts are shown in this document:

  • Distributed storage.
  • Distributed batch processing.
  • Distributed stream processing.
  • Big Data as a Service.
  • Time series.

Distributed storage

If the computing capabilities allow for working with the data, the storage capabilities make that data available to the computing ones. Achieving that obviously requires a large amount of storage space, but it is also critical that the data is stored in an efficient way so that access times do not become prohibitive. Finally, the storage capabilities make no sense without data injection mechanisms.

Distributed storage systems

A distributed storage system is a network of computers where the information is stored in more than one node, so that each one of those nodes owns a certain part of the original information. It is very common for these distributed systems to apply a replication factor as well, i.e. each part of the original data is stored twice or more times in different nodes of the network. The goal is to ensure data recoverability when a node fails.

Distributed storage systems are crucial when dealing with Big Data environments because they grant the required storage volume and scalability. Normally, a centralized process manages the system, providing a unique view of the total storage independently of the number of nodes within the network and the information contained in each node. This unique view offers services for creating, modifying and deleting files. This is possible because certain meta-information is managed: a list of distributed files, where each file has another list of data blocks and the location of each block.

In Big Data environments, the de facto standard is the Hadoop Distributed File System (HDFS), the file system underneath the MapReduce engine of Hadoop. However, many other distributed storage systems are arising, compatible or not with Hadoop, such as the Cassandra File System (CFS) or the Google File System.

NoSQL storage systems

Coined in the late 1990s, the term NoSQL represents database storage technologies that eschew relational database systems such as Oracle or MySQL. NoSQL emerged from a need to overcome the limitations of the relational model when working with large quantities of data, typically in unstructured form. Initially, as the name might be interpreted, NoSQL was considered an opposition movement to using SQL-based storage systems. However, as SQL and NoSQL systems often co-exist and complement each other, the term "NoSQL" has morphed to mean "Not only SQL".

With a change in usage focus, new applications, in particular those for the web, are no longer read oriented; rather, they tend to be read/write if not write heavy. Traditional SQL-based systems struggle with this: when demand scales up, often enough the underlying data store cannot do the same without incurring downtime. These systems are based on the ACID ("Atomic, Consistent, Isolated, Durable") principle:

  • Atomic - either a transaction succeeds or not
  • Consistent - data needs to be in a consistent state
  • Isolated - one transaction cannot interfere with another
  • Durable - data persists once committed even after a restart or a power-loss

In systems that need to scale out it is not always possible to guarantee that the data being read is consistent or durable. For example, when shopping via the web for a particular item during times of high demand, say Christmas, it is more important that the web site remains responsive, so as not to dissuade customers, than that the inventory count for every item is kept up to date. Over time, item counts will get refreshed as more hardware is brought on stream to cope with the demand.

NoSQL systems are designed around Brewer's CAP Theorem, which says that if a distributed system wants Consistency, Availability and Partition tolerance, it can only pick two. Rather than striving for ACID compliance, NoSQL systems are said to aim for eventual consistency (BASE: Basically Available, Soft state, Eventual consistency), such that over time the data within the system becomes consistent via consolidation, in the same way accountants close their books at the end of an accounting period to provide an accurate state of accounts.

The different types of NoSQL database are:

  • Column Store - Data storage is orientated to the column rather than the row as it is with traditional DBMS engines, favouring aggregate operations on columns. These kinds of stores are typically used in data warehousing. Example implementations are Apache HBase, Google BigTable and Apache Cassandra (which, as already seen, also underpins a whole file system, CFS).
  • Key Value Store - A schema-less storage system where data is stored in key-value pairs. Data is accessed via a hash table using the unique key. Example implementations are Dynamo and Redis.
  • Document Store - Similar to Key Value storage, document storage works with semi-structured data that contains a collection of key-value pairs. Unlike key-value storage, these documents can contain child elements that store knowledge relevant to that particular document. Unlike in traditional DBMS engines, document orientated storage does not require that every document contain all the fields if no information is there for that particular document (see the sketch after this list). Example implementations are Apache CouchDB and MongoDB.
  • Graph Database - Using a graph structure, data about an entity is stored within a node, relationships between the nodes are defined in the edges that interconnect the nodes. This allows for lookups which utilize associative datasets as the information that relates to any given node is already present, eliminating the need to perform any joins. An example implementation is neo4j.
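To make the document store idea concrete, the following sketch uses MongoDB through the pymongo driver (host, database and collection names are hypothetical) to store two documents with different sets of fields in the same collection, something a fixed relational schema would not allow without altering the table.

```python
# Minimal sketch of schema-less document storage with MongoDB (pymongo driver).
# Host, database and collection names are hypothetical.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
sensors = client["demo"]["sensors"]

# Two documents in the same collection with different fields: the second one
# carries a nested child element ("location") that the first one lacks.
sensors.insert_one({"id": "Room1", "temperature": 26.5})
sensors.insert_one({
    "id": "Room2",
    "temperature": 22.1,
    "location": {"building": "HQ", "floor": 3},
})

# Lookup by key; missing fields are simply absent, not NULL columns.
print(sensors.find_one({"id": "Room2"}))
```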

Data lifecycle

The data to be analysed in Big Data environments may be of very different nature but, as already commented, it can always be classified into two groups: batch and streaming. The first group comprises big collections of data that must be stored in advance of the analysis, while the second kind of data is analysed on the fly, in real or almost real time.

This difference leads to adopting different ways of feeding the Big Data environment depending on the type of data being dealt with. Thus, in some cases it will be necessary to allow input ports for dynamic flows of data, while in other cases copying historic data from the source to the distributed storage will be enough. In any case, this phase is called the Data Injection phase and, among other things, it may imply the normalization, anonymization, encryption, filtering and compression of the data.

Next is the Data Ingestion phase. Within this phase it must be ensured that the necessary capacity to absorb the volume of data is available. Then, and only then, the data copying (if it makes sense) can be performed, either in raw or in a predefined format. Sometimes, as occurs with flows or events, an analytic study may be required within this phase, because such data is not usually stored.

After the data ingestion comes the Processing phase, involving a single analysis or an orchestrated chain of several ones. Certain elasticity must be granted if the computing needs increase during the processing task, and data protection (privacy, integrity, etc.) must be monitored as well.

Finally, the results are consumed in the last phase. This means the output data becomes new input data for other analyses, visualization tools, query systems, etc. The results may be stored in a different external repository in order to ensure their durability and allow for better access times.

Distributed batch processing

Computing capabilities (both processing power and available analysis technologies) are essential in every Big Data environment. On the one hand, the data volume is going to be large enough to require an efficient processing strategy (without being real time, the response time must be acceptable). On the other hand, we need efficacy when looking for insights (we need the best insights, not just any). Efficiency is achieved by means of the MapReduce paradigm, while efficacy is given by the analytic tools and the job chaining orchestration.

MapReduce

MapReduce (MR) is a paradigm evolved from functional programming and applied to distributed systems. It was presented in 2004 by Google. It is meant for processing problems whose solution can be expressed in commutative and associative functions.

In essence, MR offers an abstraction for processing large datasets on a set of machines configured as a cluster. With this abstraction, the platform can easily solve the synchronization problem, thus freeing the developer from thinking about that issue.

All data of these datasets is stored, processed and distributed in the form of key-value pairs, where both the key and the value can be of any data type.

From the field of functional programming, it is proven that any problem whose solution can be expressed in terms of commutative and associative functions can be expressed using two types of functions: map (also named map in the MR paradigm) and fold (named reduce in the MR paradigm). Any job can be expressed as a sequence of these functions. These functions have a restriction: they operate on some input data and produce a result without side effects, i.e. without modifying either the input data or any global state. This restriction is the key point allowing easy parallelization.

Given a list of elements, map takes as an argument a function f (that takes a single argument) and applies it to all the elements in the list, returning a list of results. The second step, fold, accumulates a new result by iterating through the elements in the result list. It takes three parameters: a base value, a list, and a function g. Typically, map and fold are used in combination. The output of one function is the input of the next one (as functional programming avoids state and mutable data, all the computation must progress by passing results from one function to the next one), and functions of this kind can be cascaded until the job is finished.
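The following short fragment, a plain Python illustration rather than anything MapReduce-specific, shows this functional view: map applies f independently to every element of a list, and fold (reduce in Python) combines the resulting list using a base value and a function g.

```python
# Functional map/fold illustration of the roots of MapReduce.
# f is applied independently to every element; g folds the results into one value.
from functools import reduce

data = [1, 2, 3, 4]

f = lambda x: x * x         # map function: no side effects, depends only on its input
g = lambda acc, x: acc + x  # fold function: combines the accumulator with the next element

mapped = list(map(f, data))    # [1, 4, 9, 16]
folded = reduce(g, mapped, 0)  # 30, starting from the base value 0

print(mapped, folded)
```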

In the map type of function, a user-specified computation is applied over all the input records in a dataset. As the result depends only on the input data, the task can be split among any number of instances (the mappers), each of them working on a subset of the input data, and can be distributed among any number of machines. These operations occur in parallel. Every key-value pair in the input data is processed, and each one can produce zero, one or multiple key-value pairs, with the same or different information. The mappers yield intermediate output that is then handed over to the reduce functions.

The reduce phase has the function of aggregating the results disseminated in the map phase. In order to do so in an efficient way, all the results from all the mappers are sorted by the key element of the key-value pair, and the operation is distributed among a number of instances (the reducers, also running in parallel among the available machines). The platform guarantees that all the key-value pairs with the same key are presented to the same reducer. This phase thus has the possibility to aggregate the information emitted in the map phase.

The job to be processed can be divided into any number of implementations of these two-phase cycles.
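The classic word-count example, written below as a plain-Python simulation of a single map/reduce cycle (no Hadoop involved, and therefore no real distribution), shows the role of each phase: the mapper emits (word, 1) pairs, the platform groups the intermediate pairs by key, and the reducer aggregates the values presented for each key.

```python
# Plain-Python simulation of one MapReduce cycle (word count); no Hadoop involved.
from collections import defaultdict

def mapper(record):
    # Emits (key, value) pairs; zero, one or many pairs per input record.
    for word in record.split():
        yield word.lower(), 1

def reducer(key, values):
    # Aggregates all the values observed for a single key.
    return key, sum(values)

documents = ["FIWARE Big Data", "big data analysis", "stream and batch data"]

# Map phase: every record is processed independently (hence parallelizable).
intermediate = [pair for doc in documents for pair in mapper(doc)]

# Shuffle/sort phase: the platform groups the intermediate pairs by key.
groups = defaultdict(list)
for key, value in intermediate:
    groups[key].append(value)

# Reduce phase: each key, with all its values, is presented to a single reducer.
results = dict(reducer(k, v) for k, v in groups.items())
print(results)  # {'fiware': 1, 'big': 2, 'data': 3, ...}
```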

Figure 1 - MapReduce paradigm

The platform provides the framework to execute these operations distributed in parallel on a number of CPUs. The only point of synchronization is at the output of the map phase, where all the key-value pairs must be available to be sorted and redistributed. This way, the developer only has to care about the implementation (according to the limitations of the paradigm) of the map and reduce functions, and the platform hides the complexity of data distribution and synchronization. Basically, the developer can access the combined resources (CPU, disk, memory) of the whole cluster in a transparent way. The utility of the paradigm arises when dealing with big data problems, where a single machine does not have enough memory to handle all the data, or its local disk would not be big and fast enough to cope with all of it.

This paradigm has had a number of different implementations: the one already presented by Google, which is patented; the open source project Apache Hadoop, which is the most prominent and widely used implementation; and a number of other implementations of the same concept, such as Sector/Sphere or Dryad, a framework for parallel computing developed by Microsoft which is a superset of MapReduce.

These implementations have been developed to solve a number of problems (task scheduling, scalability, fault tolerance...). One such problem is how to ensure that every task will have the input data available as soon as it is needed, without making network and disk input/output the system bottleneck (a difficulty inherent in big-data problems).

Most of these implementations (Google, Hadoop, Sphere, Dryad...) rely on a distributed file-system for data management.

Data files are split in large chunks (e.g. 64MB), and these chunks are stored and replicated on a number of data nodes. Tables keep track of how data files are split and where the replicas of each chunk reside. When scheduling a task, the distributed file system can be queried to determine the node that has the required data to fulfil the task. The node that has the data (or one nearby) is selected to execute the operation, reducing network traffic.

The main problem of this model is the increased latency. Data can be distributed and processed in a very large number of machines, and synchronization is provided by the platform in a transparent way to the developer. But this ease of use has a price: no reduce operation can start until all the map operations have finished and their results are placed on the distributed file system. These limitations increase the response time, and this response time restricts the type of problems to which a “standard” MR solution can be applied when time-critical responses are required.

Hadoop

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

Hadoop rests on three pillars:

  • Hadoop Common, which provides access to the supported file systems (HDFS and many others).
  • The engine for parallel processing of vast amounts of data (Hadoop MapReduce), based on its own resource manager and job scheduler (YARN).
  • Hadoop Distributed File System (HDFS), a distributed file system assuring high throughput when accessing the data.

Architecturally speaking, a distributed file system is at the bottom of the software stack. Please note that several compatible distributed file systems may be used, e.g. Amazon S3, CloudStore or FTP FileSystem, but here only HDFS will be explained. HDFS supports big data files by splitting them and distributing their blocks among all the Datanodes of the cluster. A special node called the Namenode is responsible for storing the indexes of each distributed file and, in general, for file system management (naming, deletions, creations, etc.).

In the first versions of Hadoop, above the HDFS layer sat the MapReduce layer, mainly a set of Tasktracker nodes running within the slaves, in charge of performing map and/or reduce tasks over the data stored in HDFS, with a special node called the Jobtracker within the master deciding which task and which data is assigned to each Tasktracker. Nevertheless, with the arrival of the so-called MapReduce2 or YARN, the Jobtracker functions, i.e. resource management and job scheduling, are assumed by the ResourceManager and split into the ApplicationsManager and the Scheduler components, respectively. On the slave side, the Tasktracker is replaced by the ApplicationMaster, one per running application. Now, the jobs are submitted to the ApplicationsManager, which negotiates a container for the specific ApplicationMaster. In its turn, the ApplicationMaster negotiates with the ResourceManager's Scheduler the computation resources necessary for accomplishing the analysis task. Finally, the ResourceManager's Scheduler commands the different NodeManagers running at the slaves in order to arbitrate the whole set of resources among all the applications in the system.

Figure 2 - MapReduce2/YARN architecture; source: hadoop.apache.org

Both in MapReduce1 and in YARN, the list of tasks to be collaboratively accomplished is extracted from a Java JAR file, built by the analyst using the Hadoop API and containing the desired MapReduce job in terms of Java sentences invoking the Hadoop development API. This Java JAR is also replicated several times in HDFS by the Jobtracker/ResourceManager so that it is highly available to the final interpreters of that Java code, the Tasktrackers/ApplicationMasters. Finally, a workstation or JobClient node is used to initially upload the JAR file to HDFS and invoke the Hadoop execution.

The above architecture may change depending on the performance requirements and/or the dimensions of the available cluster. If the cluster is very limited, it may be necessary to merge two or more roles into a single node, which penalizes the CPU performance but improves the transactions at the same time. Therefore, it is usual to find both the Tasktracker/ApplicationMaster and the Datanode coexisting in the same node, or the Jobtracker/ResourceManager running in the same machine as the Namenode. If the cluster is very limited, it is also possible to have a node acting simultaneously as Jobtracker/ResourceManager, Namenode and (limitedly) Tasktracker/ApplicationMaster and Datanode.

Once a job is finished, some resulting insights are generated. HDFS may store these results together with the input data, but this is not recommended due to bad access response times, and a NoSQL database is suggested for these purposes (see below).

Analytic tools

Although MapReduce is the core of batch processing, its interface/user experience is complex and only suitable for developers who know very well the different APIs of the chosen implementation (Java, Python, etc.). Hence the need arises for a new kind of tool abstracting the complexity of the MapReduce APIs and exposing easier interfaces.

SQL-like tools

The most popular analytic tools around MapReduce are those based on SQL. This is a widely extended, high-level and powerful database query language, and its adaptation to the Big Data world was bound to be a success. That was the case with Apache Hive, Apache Pig, Spark SQL (formerly Shark), Impala and many other analytic tools running on top of Hadoop.

The key to the success of these tools is, as said, that they abstract the MapReduce paradigm, which is still working but in a transparent way. So, when the user decides to retrieve certain information, for instance a specific column of the dataset, the SQL query is automatically transformed into some predefined MapReduce jobs; in the proposed example, a Map function iterates over all the rows in order to select the desired column, and the Reduce function joins all the individual values into a unique new column.

As can be inferred, the kind of datasets for which these analytic tools make sense are those whose information is stored in a structured fashion, i.e. datasets whose content may be easily transformed into SQL tables.
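For instance, assuming a Hive table named meteo already maps such a structured dataset (the table and column names are hypothetical), the query below computes a per-city average; it is launched here through the standard hive -e command-line option, and Hive transparently compiles it into the corresponding MapReduce jobs.

```python
# Minimal sketch: running a HiveQL query from Python through the standard
# "hive -e" command-line option. Table and column names are hypothetical.
import subprocess

query = """
SELECT city, AVG(temperature)
FROM meteo
GROUP BY city;
"""

# Hive turns the query into one or more MapReduce jobs behind the scenes.
subprocess.run(["hive", "-e", query], check=True)
```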

Other

There are many other alternatives to the SQL-like analytic tools. This is the case of the R adaptation for Hadoop; the machine learning tools included in Apache Mahout, a component of the Hadoop ecosystem; or the extensions allowing MapReduce jobs to be programmed in Python.

These and other tools are out of the scope of the Big Data GE and will not be explained nor detailed.

Coordination

No data analyst will be able to do a complex processing in a single step. Maybe the chosen tool does not provide all the required functionality; and even if it is suitable for performing the whole analysis, doing so confronts all the good development practices (modularity, reuse, etc.). But the main reason for not doing so is that complex analyses are commonly based on a sequence of steps where several tools may be involved: the output of one analysis is the input of another one, thus, until the first one finishes, the second one cannot be launched. The problem here is to detect when a job finishes and its output can be injected into the next job, and this is something that cannot be done by simply waiting, because it can last hours or even days. An orchestration mechanism for processes is therefore necessary. There are several alternatives for orchestration, mainly in widely used ecosystems such as Hadoop, where Apache Oozie is the main asset.

Distributed stream processing

Stream processing

Stream processing is a technical paradigm to process a large volume of unbounded sequences of tuples (streams) in real time. This defines stream processing as the ability to process a sequence of continuous data in real time, in other words, processing information as it is received, or at least within the shortest time possible.

Making a simile with a river, the source of the river is the origin of the data, the information flow is the river itself and a fisherman on the bank represents the processing.

There are many technologies and platforms which resolve many of the requirements of Batch processing, in other words, the processing of data stored or “at rest”. These include MapReduce and Apache Hadoop.

These technologies have made it possible to store and process data in previously unimaginable volumes. However, these systems do not work in real time. There is no way of converting Hadoop into a real-time processing system. In contrast, Apache Storm is designed for just this. If you are familiar with Hadoop, the following table will help you to understand the main differences between Hadoop and Storm:

Apache Hadoop | Apache Storm
Large but finite tasks | “Infinite” tasks, known as Topologies
Processes many data at the same time | Processes an infinite flow of data, one tuple at a time
High latency | Low latency

Storm

Introduction

As stated on the project website, Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple and can be used with any programming language.

Storm was created by Nathan Marz and the team from Backtype, a company which was acquired by Twitter in 2011. It became free software in late 2011, following the said acquisition, and formed part of the Apache Incubator from late 2013. Since September 2014 it has been a top-level project within the Apache Software Foundation. It is written predominantly in the Clojure language.

Apache Storm is a platform which allows the analysis of continuous data streams, as it reads the data in real time, processing them at the same moment. It has many use cases: real-time analysis, machine learning, continuous computing, distributed RPC, ETL, etc. Storm can be integrated with queue technologies and with the databases we currently use in our systems.

Apache Storm meets a series of requirements, such as those mentioned in the previous point, which make this technology powerful enough to become the spearhead of real-time processing:

  • Fast and scalable. Apache Storm is a distributed computing system, which means that it must be fast and scalable inasmuch as it is intended to maximize resources and distribute the load. For this purpose Storm makes use of parallelism, a message-passing mechanism using queues and an inter-thread messaging library, the LMAX Disruptor [1], which allows the distribution of the processes into different “workers” (we will see this concept later) which are distributed among the different nodes of the cluster, making the most of the resources.
  • Fault tolerance. Apache Storm works in a cluster operating model whereby each element works separately and there exists communication allowing us to keep a check on the status of each of them. The fault tolerance is provided by the separation of the roles and functions between each element. Although we will see this in greater detail later on, a description of each element of the Storm cluster is provided below by way of introduction in order to understand how Storm provides fault tolerance:
    • Nimbus: It is responsible for distributing the code of each Topology over the cluster nodes, assigning tasks to the nodes and monitoring any potential faults.
    • Zookeeper: Storm uses Zookeeper to coordinate the different services which make up the cluster.
    • Supervisor: It is responsible for collecting and processing the works assigned to the machine where it is located, using the Workers available in the said node.
    • Worker: The workers execute a full Topology or a portion of the Topology.
  • Guarantees messages will be processed. Storm guarantees that each message will be fully processed. This guarantee is not performed automatically; it is performed by using the mechanisms Storm provides to us by means of its ack framework.

Storm main concepts

To understand Apache Storm we need to understand some basic concepts. Concepts such as Topology, Stream, Spout and Bolt form part of the Storm operation.

Topology
Figure 3 - Storm topology

Storm implements Topologies. A topology is the design of an algorithm: the logic of a real-time application is packaged in the form of a topology. It is analogous to a Hadoop MapReduce job, with the difference that the latter ends whereas a topology is executed forever (or until the user halts it).

Topologies have the shape of DAGs (directed acyclic graphs), whereby each vertex of the graph, or node, is a Spout or a Bolt, connected to one another by Streams. All these concepts are described in detail below.

The topologies are defined by the Storm users according to the processing of the data they wish to perform. Initially, these data are “raw” when they are read by the Spouts. The Spouts perform the first transformation into tuples, according to the data model we are using for them in the topology, and they are then processed in the Bolts in accordance with the defined algorithm. In summary, the data enter, they are processed and the result is sent to the defined output.

Stream
Figure 4 - Storm stream

Storm uses the concept of “Stream”, a stream being regarded as an unlimited sequence of tuples in which each tuple is a “message” which is passed from node to node.

Each stream has an identifier (ID) and a configured set of fields which make up the tuples passing through it. The values of the tuples must be serializable. By default, they may contain integers, longs, shorts, bytes, strings, doubles, floats, booleans and byte arrays.

Spout

Spouts are sources of streams in a topology. They are responsible for “building” the tuples, usually from an external data source, and they emit them within the topology. Examples of Spouts are those which read queueing systems like Kafka and RabbitMQ and those which receive streaming data (e.g. via the Twitter API).

Bolt

All the processing in a topology is performed in Bolts. They can perform filtering, functions, aggregates, links, reading and sending data to databases, etc.

A Bolt is no more than a portion of code, ideally as simple as possible, which performs processing on the tuples it receives and emits them so that they continue in the topology.

Therefore, a Bolt performs the transformation of the information. To perform complex transformations, we can use several Bolts in the same topology and split the processing between them. The more modular the Bolts are, the more scalable the topologies will be and the more often we will be able to reuse the Bolts in different topologies.
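The fragment below is a framework-free sketch of the spout/bolt/tuple flow; it deliberately does not use the real Storm API (which is typically programmed in Java, or in other languages through Storm's multi-lang protocol), but it shows how a spout builds tuples from raw events and how each bolt applies one simple transformation before emitting the tuple to the next one.

```python
# Framework-free sketch of a spout -> bolt -> bolt flow; not the real Storm API.

def sensor_spout(raw_events):
    """Spout: builds tuples from an external (here, in-memory) data source."""
    for event in raw_events:
        sensor_id, value = event.split(",")
        yield (sensor_id, float(value))

def filter_bolt(tuples, threshold=25.0):
    """Bolt: filters tuples, emitting only the relevant ones."""
    for sensor_id, value in tuples:
        if value > threshold:
            yield (sensor_id, value)

def alert_bolt(tuples):
    """Bolt: final transformation, e.g. formatting an alert."""
    for sensor_id, value in tuples:
        yield f"ALERT {sensor_id}: {value}"

# Wiring the 'topology': the output stream of each element feeds the next one.
events = ["room1,22.5", "room2,27.3", "room3,30.1"]
for alert in alert_bolt(filter_bolt(sensor_spout(events))):
    print(alert)
```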

Tasks and workers
Figure 5 - Storm workers

Each Spout or Bolt executes one or more tasks in the cluster. Each task is an execution thread. We can configure the parallelism of each module (Spout or Bolt) individually.

The topologies are executed by one or more “worker” processes. Each worker is a Java virtual machine (JVM) and executes a subset of the tasks of the topology. For example, if the sum of all the parallelisms of the modules of a topology is 20 and 4 workers are assigned to it, each worker will execute 5 tasks. Storm attempts to divide the number of tasks proportionally among all the workers.

In Figure 5 we can see a worker representing a topology with three modules (Spouts and Bolts). Two of them (1 and 2) have parallelism one (a single task) and the third has parallelism two (two tasks).

Cluster

A Storm cluster is similar to a Hadoop cluster. While a Hadoop cluster executes “MapReduce” tasks, Storm executes topologies. These two forms of execution are very different, especially because the MapReduce tasks are finite and the topologies are executed infinitely (or until the user manually halts them).

Figure 6 - Storm supervision

There are two types of nodes in a Storm cluster: the master node and the “worker” nodes. The master node executes a service called Nimbus, similar to Hadoop’s JobTracker. This service is responsible for distributing the code through the cluster, assigning tasks to the worker nodes and monitoring if there are faults.

Each worker node executes a service called Supervisor. This is responsible for dealing with the task assignment requests coming from Nimbus and for starting and halting worker processes based on what Nimbus assigns to it.

The coordination between Nimbus and the Supervisors is performed by means of a Zookeeper node. In addition, the Nimbus service has rapid fault recovery and does not keep statuses; the status is stored in Zookeeper or in the local disk. This means you can kill the Nimbus process or the Supervisors and they will begin their execution again as if nothing has happened. This design makes the Storm clusters incredibly stable.

To sum up:

  • Nimbus: This is the master node, responsible for distributing the code through the cluster. It launches the workers, monitors the processing and reassigns the workers if necessary.
  • Zookeeper: This coordinates the Storm cluster.
  • Supervisor: This starts and halts the execution of the workers, depending on the orders from the Nimbus node (it communicates with Nimbus through the Zookeeper).

Big Data as a Service

Big Data services are complex to deploy and maintain, and in addition a large infrastructure is usually required. These are the reasons why data scientists, just willing to get some insights on certain data, and software engineers, in charge of productizing applications based on a previous data scientist analysis, rely more and more on specialized BigData service providers.

Specifically, these kinds of Internet services provide:

  • Simplicity. The technology is ready to be used; there is no need to learn how to deploy it.
  • Scalability. The clusters may be as large as required, and if the initial resources become insufficient then more resources can be dynamically added.
  • Reliability. This is directly inherited from the distributed storage concept itself, which is redundant and fault tolerant by design.
  • Security. Having a private computing cluster means that resources do not have to be shared among several clients or tenants.
  • Flexibility. This kind of service usually allows for personalizing the clusters, adding as many components as desired from the ecosystem.
  • Storage. Usually the input data, and even the analysis results, are stored within another large storage based on cloud technologies as well.
  • Economy. These kinds of services tend to be very economical since they are based on commodity hardware and open source software.

Since Hadoop is the de facto standard in the industry and has come a long way, most BigData as a Service platforms are about the on-demand request of Hadoop clusters. Nevertheless, other tools such as Spark have seen the light in recent times, increasing the interest in this kind of on-demand clusters.

Hadoop as a service (HaaS)

On the one hand, there are several commercial instances of HaaS:

On the other hand, OpenStack's Sahara is the most relevant effort among the open source community at creating a complete HaaS software. As stated on Sahara's web page, its key features are:

  • It is designed as an OpenStack component.
  • Sahara is managed through a REST API with a UI available as part of the OpenStack Dashboard.
  • It supports different Hadoop distributions and integrates with vendor-specific management tools, such as Apache Ambari or Cloudera Management Console.
  • It uses predefined templates of Hadoop configurations with the ability to modify parameters.
  • There is a file system implementation for Swift, so that it can be used as a permanent big data storage and a source of data for Hadoop.

Spark as a Service (SpaaS)

As in the case of HaaS, several SpaaS initiatives are arising, especially:

From the open source point of view, the efforts of the OpenStack Sahara community to move from a Hadoop as a Service platform to a multi-purpose BigData as a Service one have been rewarded with the publication of a plugin for Spark (among others).

Time series

As a matter of fact, the short-term historic is a time series database, that is, a database specialized in managing series of data points listed in time order. As a time series system, the short-term historic manages two basic concepts:

  • Resolution or aggregation period: the time period by which the aggregated time series information is grouped. The resolution can be interpreted as the “zoom level” at which the aggregated data is consumed. Possible valid resolution values supported by the short-term historic are: month, day, hour, minute and second.
  • Origin: for a certain resolution, it is the origin of time for which the aggregated time series context information applies. For example, for a resolution of minutes, a valid origin value could be 2015-03-01T13:00:00.000Z, meaning the 13th hour of March 1st, 2015. The origin is stored using UTC time to avoid locale issues.
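As a small illustration of both concepts (this is not the actual short-term historic API), the snippet below truncates a raw UTC timestamp to its origin at minute, hour and day resolution, which is the grouping key by which the aggregated series would be indexed.

```python
# Illustration of the resolution/origin concepts (not the short-term historic API).
from datetime import datetime, timezone

sample = datetime(2015, 3, 1, 13, 47, 21, 500000, tzinfo=timezone.utc)

# The origin at a given resolution is the timestamp truncated to the period start.
origin_minute = sample.replace(second=0, microsecond=0)
origin_hour = sample.replace(minute=0, second=0, microsecond=0)
origin_day = sample.replace(hour=0, minute=0, second=0, microsecond=0)

print(origin_minute.isoformat())  # 2015-03-01T13:47:00+00:00
print(origin_hour.isoformat())    # 2015-03-01T13:00:00+00:00
print(origin_day.isoformat())     # 2015-03-01T00:00:00+00:00
```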

Currently available solutions to use time series databases are:

  • InfluxDB, as a reference of an innovative and relatively new time series database.
  • OpenTSDB, as a reference of a more established, distributed and scalable time series database.
  • MongoDB, as a reference of a NoSQL database.

Let's see an analysis of the above three alternatives:

  • General maturity. InfluxDB: Low (1b) (2); OpenTSDB: High; MongoDB: High.
  • Dependencies. InfluxDB: LevelDB (3a), RocksDB, HyperLevelDB, LMDB (3b); OpenTSDB: HBase (Bigtable data model) (4b), Cassandra in the roadmap (35c).
  • Scalability. InfluxDB: Low (1a); OpenTSDB: High (4a) (35d); MongoDB: Medium (24).
  • HA. OpenTSDB: multiple TSDaemons (4a) (35a) (35b), HA inherited from the Hadoop world; MongoDB: replica sets (28), sharded clusters (29).
  • I/O performance. OpenTSDB: High (4c) (35e) (35f); MongoDB: performance highly dependent on the schema, i.e. the data to be reported (17), and complex if the data input cadence is not constant (30) (33) (34).
  • I/O APIs. InfluxDB: HTTP, Javascript, Java, Ruby, Python, Node.js; external dashboard integration with Grafana (9) and influga (10). OpenTSDB: HTTP (11), RPC, Java API; external dashboard integration with Grafana (9). MongoDB: HTTP (18), Javascript, Java, Python, Ruby, Node.js, C, C++.
  • Binary I/O. InfluxDB: collectd protocol (5), additional binary protocol support in the roadmap; OpenTSDB: direct “blob” loading; MongoDB: MongoDB Wire Protocol (19), new Meta Driver spec in the roadmap (20).
  • I/O security. InfluxDB: in the roadmap (basic auth); OpenTSDB: authentication and authorization at HBase level, ACLs can also be used, depends on the underlying Hadoop security; MongoDB: authentication (25), role-based authorization (26), auditing (27), encryption of communications (SSL) and storage (28).
  • Query language. InfluxDB: SQL-like, supporting continuous queries; OpenTSDB: proprietary (7); MongoDB: programmatic, JSON-oriented.
  • Functions. InfluxDB: min, max, mean, mode, median, percentile; OpenTSDB: sum, min, max, average, stdev, zimsum, mimmin, mimmax (8); MongoDB: aggregation pipeline (15), mongo-metrics (21).
  • Custom user-defined functions. InfluxDB: in the roadmap; MongoDB: Map-Reduce support (31).
  • Auto-deletion of data. InfluxDB: in the roadmap (TTL); MongoDB: TTL support (32).
  • PubSub. InfluxDB: in the roadmap; MongoDB: basic support via tailable cursors (16).
  • Monitoring. InfluxDB: do it yourself (14); OpenTSDB: Nagios; MongoDB: MongoDB Management Service plus monitoring utilities and database commands.
  • Other. InfluxDB: tags and column indexes in the roadmap (6); OpenTSDB: tags/metadata (13), trees for hierarchical organization (12), KairosDB is a fork which uses Cassandra (23).
  • Who is using it? InfluxDB: (2); OpenTSDB: ebay.com, pinterest.com, ticketmaster.com, tumblr.com, box.com... (22); MongoDB (for time series analysis): Silver Spring Networks, EnerNOC, Square, Server Density, Skyline Innovations (Nextility).

Generic architecture

Batch processing block

Figure 7 - Big Data architecture (shared computing cluster version)
Figure 8 - Big Data architecture (private computing clusters version)

As previously said, the batch processing block is part of an enabler aimed at providing BigData analysis means to data scientists, freeing them from deploying any kind of infrastructure or software and letting them focus on the analysis. So, the core idea of the GE is the on-demand provision of infrastructure supporting the desired analysis software; once used, this infrastructure is released so that future users have the opportunity to request resources as well.

On the one hand, the infrastructure usually comprises a cluster of machines since BigData is commonly processed in a distributed fashion. Machines can be physical or virtual; a mix of both types is also possible.

On the other hand, regarding the analysis software, we consider the following tools as candidates to be installed on top of the provisioned cluster:

Regarding the on-demand provision of computing facilities, a manager must be deployed in charge of scheduling the usage of the available resources in terms of computing power and time. Please take into account that requesting computing resources may mean several different things:

  • A dedicated private entire cluster is deployed on top of the available infrastructure, running the desired technology.
  • A single already deployed shared cluster is partitioned among several users through any mechanism available within the running technology.
  • An intermediate solution, consisting of a limited pool of already deployed shared clusters that are assigned to the users over time.

Within the enabler, computing and storage are independent concepts. Thus, in addition to the above computing means, the proposed architecture also includes a permanent and unlimited cluster for data storage purposes; since the demanded analysis clusters are temporary, in the sense that once used they are deleted and the resources released, the input and, most importantly, the output data must be saved somewhere stable.

Both storage and computing clusters are glued together by a GUI or a CLI, a means for provisioning user accounts and giving access to the features for creating on-demand clusters, using the analysis tools, etc.

Far beyond the storage and computing platform, an ecosystem arises when the above solution is completed/complemented with certain management and utility tools:

  • A tool feeding the permanent and unlimited storage cluster with context data coming from the Publish-Subscribe Context Broker GE; this builds historical views of the context data.
  • A tool for designing compositions of chained MapReduce jobs based on general building blocks.
  • Hadoop extensions for using external data.
  • Open Data Portal GE extensions allowing using the storage services of the BigData Analysis GE for Open Big Data.
  • A PEP Proxy together with an OAuth2 Tokens Generator if wanting to implement OAuth2-based authentication and authorization in the REST APIs the enabler may have.

Permanent storage service

Being a distributed storage, the main candidate for implementing it is HDFS; nevertheless, any other distributed storage may be used as long as its content can be used by the Big Data platform implementing the computing service of the enabler.

Apart from the storage facilities themselves, the storage service must include a services node, a single entry point to the storage resources hiding the details of the rest of the infrastructure. This services node will be accessed by the users as the endpoint for I/O operations (either a REST API server, or the host where any I/O command-line tool is run, or both).

The above mentioned I/O operations must include at least the following ones:

  • Write/in operations:
    • Creating a data container, such as a file or an object, with initial content.
    • Appending new data to an already existent container.
    • Deleting an already existent container.
    • Grouping data containers, such as a folder or a collection.
    • Changing ownership of data containers and groups of containers.
    • Changing permissions of data containers and groups of containers.
  • Read/out operations:
    • Listing the content of a group of data containers.
    • Detailing the ownership, permissions and path of a data container.
    • Reading a data container.

Computing resources manager

This is the component in charge of managing the available computing resources, which may be relatively limited depending on the particular deployment. In order to satisfy the demands of the users requesting dedicated resources, the manager must take into account not only how many resources are available, but for how much time they can be used. Thus, this is a two-dimensional problem to deal with; some tool may help in discovering available computing slots and reserving them.

In any case, requests will be parameterized by the following (a hypothetical request payload is sketched after the list):

  • The amount of computing power.
  • The time this computing power will be used.
  • The kind of Big Data technology to be deployed.
  • A list of any other software, from either the particular technology ecosystem or the BigData GE ecosystem, aimed to be deployed.
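A request carrying those four parameters could look like the following document; this is a purely hypothetical payload meant to illustrate the parameterization, not the API of any concrete resources manager implementation.

```python
# Purely hypothetical cluster request illustrating the four parameters above;
# it does not correspond to the API of any concrete computing resources manager.
import json

cluster_request = {
    "computing_power": {"nodes": 10, "vcpus_per_node": 4, "ram_gb_per_node": 16},
    "reservation": {"start": "2016-06-01T08:00:00Z", "hours": 72},
    "technology": "hadoop",
    "extra_software": ["hive", "oozie", "spark"],
}

print(json.dumps(cluster_request, indent=2))
```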

On-demand computing service

Being a distributed computing cluster, the technology used depends on the user's demands:

  • If aiming to process batch data through the MapReduce paradigm, then Apache Hadoop is the definitive choice; in addition, querying tools such as Hive or Pig may be used for a simpler approach.
  • If the user desires in-memory data processing, oriented to the chaining of several transformations and actions, then Apache Spark must be deployed. It also provides relative real-time capabilities, in the sense that certain queries can be done on the stream of data.
  • Being quite similar to Spark, Apache Flink is addressed when iterative or cyclic transformations on the data are desired. Near real-time analysis is also allowed (again, through queries against the current stream of data).

Apart from the computing facilities themselves, the computing service must include a services node, a single entry point to the computing resources hiding the details of the rest of the infrastructure. This services node will be accessed by the users as the host where analysis jobs are locally uploaded and run, but also as the endpoint where the execution of analysis jobs is remotely submitted.

Ecosystem

New Hadoop inputs

Figure 9 - Integration with other enablers

HDFS is the native storage for Hadoop; thus, Hadoop is natively designed for reading and processing big data files located in this file system, and there exist libraries in charge of this. Nevertheless, these Hadoop libraries can be extended so that many other file systems, and even non-file-system storages, can be used as input for the MapReduce jobs.

Within Hadoop itself this is the case of MySQL and Oracle databases, which can be interfaced by MapReduce programs in order to retrieve records within tables. Apart from this, another interesting enrichment regarding alternative sources of data is the Swift patch for Hadoop made by OpenStack's Sahara.

Behind this idea are the concepts of input splits and record reading. When Hadoop takes a source of data, it first tries to compose a set of input splits, each one of them containing a set of records. These splits are not the real data but pointers to it. This splitting phase is also used by Hadoop for figuring out the number of mappers and reducers required to accomplish the analysis. Then, a record reader specialized in interpreting these splits and getting the real data, split by split, is created, so that the mappers access the data in a sequential, unordered and distributed way.
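A conceptual, non-Hadoop sketch of the two roles follows: a splitter produces pointers to ranges of a non-HDFS source, and a record reader materializes the records of one split on demand. A real extension would implement Hadoop's Java InputFormat and RecordReader classes instead.

```python
# Conceptual sketch of input splits and record reading over a non-HDFS source
# (a plain list stands in for, e.g., an Object Storage container).
# A real Hadoop extension would implement the Java InputFormat/RecordReader classes.

def compute_splits(total_records, records_per_split):
    """Splits are only pointers (ranges), not the data itself."""
    return [(start, min(start + records_per_split, total_records))
            for start in range(0, total_records, records_per_split)]

def record_reader(source, split):
    """Materializes the real records of one split, read sequentially."""
    start, end = split
    for index in range(start, end):
        yield source[index]

source = [f"record-{i}" for i in range(10)]  # stand-in for the external storage
splits = compute_splits(len(source), records_per_split=4)

# Each mapper would receive one split and iterate over it through a record reader.
for split in splits:
    print(split, list(record_reader(source, split)))
```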

That being said, this kind of extension may be created for several other FIWARE GEs that make sense as sources of non-HDFS input data:

  • Object Storage GE.
  • Open Data Portal GE.

Of course, the same extensions allowing for inputting non-HDFS data may allow for outputting the data resulting from the analysis.

Another obvious source of data for the BigData Analysis GE is the information within the Publish/Subscribe Context Broker. Nevertheless, in this case the integration with the BigData Analysis GE is not so obvious. This kind of enabler deals with real-time context data, specifically the last value regarding a certain context; thus, if the history of a certain context is to be analyzed, then that information must be periodically queried (brokers) or notified (publish/subscribe tools) by/to an external subsystem.

Such a subsystem is not in charge of hosting the historic; it simply works as a connector between the brokers or publish/subscribe managers and the final large storage. The main candidate for such a final large storage is the permanent storage of the BigData Analysis GE.

General purpose composable analyzing blocks

Data processing usually comprises the same conceptual transformations (filtering, truncation, adding a key, map-only...) and computations (average, sum, minimum or maximum, reduce-only...). It makes no sense to waste time building these pieces of software over and over again; it would be more useful to have them available in a library of general purpose analyzing blocks, allowing many users to reuse them over time. In fact, a much more powerful feature of such a library is the composable nature of the blocks, allowing the creation of really complex analyses.

A step in this direction has been taken by Apache Spark with its concepts of Resilient Distributed Datasets, transformations and actions. Something similar can be added to Hadoop, distinguishing the following categories of blocks:

  • Transformations and computations. On the one hand, there are transformation jobs, i.e. MapReduce jobs that apply to most of the data that can be managed in a data science project. For instance, there is a transformation job about filtering lines in the input data based on a regular expression (a minimal sketch of such a block is shown after this list); another example could be a job allowing for joining two datasets into a single one. On the other hand, there are computation jobs in charge of returning a value which is a function of the input data. Examples of this kind of operation could be retrieving the number of data lines, or performing some aggregation operation on the dataset, such as summing it (obviously, if the dataset contains numerical information). The output value can be appended to an output dataset, either as it is returned by the computation job or by prepending a key value.
  • NGSI and non-NGSI. Of special interest are NGSI-specific jobs that are only valid for NGSI-like datasets. Usually, these jobs have a generic counterpart, but the specific format of NGSI-like data (a JSON document with several custom fields) makes it very hard to obtain the desired transformation or computation with the generic version. For instance, computing the average of a certain entity's attribute would require at least a map job on all the NGSI data lines in order to obtain the desired value, and then the average job itself; nevertheless, there exists a single NGSI job dealing with such a computation. Another example could be filtering by entity type, which is a transformation: by using the generic version, a complex regular expression must be used, covering not only the entity type value itself but also the search for the field name containing the entity type; the NGSI version only requires a regular expression for the entity type.
  • Flat and key-valued. Finally, there are (transformation, NGSI or not) jobs that add keys as part of their functionality, while other (transformation, NGSI or not) jobs do not add keys at all. There are also other (computation, NGSI or not) jobs allowing a custom key to be added to the output they generate.
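
A minimal sketch of one of these general purpose blocks follows, written with the Hadoop Java MapReduce API and assuming a hypothetical configuration key, filter.regex, that is not part of any FIWARE catalogue; it implements the map-only "filter lines by regular expression" transformation mentioned above:

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical map-only "filter by regular expression" building block.
public class RegexFilterMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private Pattern pattern;

    @Override
    protected void setup(Context context) {
        // "filter.regex" is an illustrative configuration key, not part of any FIWARE API.
        pattern = Pattern.compile(context.getConfiguration().get("filter.regex", ".*"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit only the lines matching the configured regular expression.
        if (pattern.matcher(value.toString()).find()) {
            context.write(NullWritable.get(), value);
        }
    }
}

Composability comes from chaining blocks like this one: the output dataset of one job becomes the input dataset of the next.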

Big Data as Open Data

Open Data tools such as CKAN are able to store datasets of relatively large size and make them available to the users in an open way; or, if the datasets are not meant to be public, they can be acquired if offered through the open data platform. However, those tools cannot host big data files, since they usually rely on their own server for storage purposes, typically by deploying a database or a specific file system.

An HDFS-based permanent storage cluster like the one proposed in this GE could be used by these Open Data portals and platforms in order to support real big data files, offering the same functionality in terms of making them public, or at least making it possible to acquire these large datasets through the platform, as usual.

User environment

Of course, the user may interact with the different REST APIs and other interfaces exposed by the BigData GE through specific clients. Nevertheless, a more user-friendly environment may be exposed by means of a GUI or at least a CLI. Such a component must encapsulate the following features of the enabler:

  • User account management: creation, deletion, storage userspaces, quota extensions, etc.
  • I/O operations on the permanent stored data, including creation, read, update and deletion (CRUD model).
  • Computing resources requesting and releasing, both in terms of amount of resources and how much time those resources may be used.
  • Composition of analysis jobs, by using a library of predefined building blocks.
  • Adding user-defined building blocks to the public library.
  • Computing resources usage, by running either simple or composed jobs.

Stream processing block

Components

Figure 10 - Stream processing architecture

We can say that the Stream processing component internally consists of three main architectural elements, namely its graphical interface, the Apache Storm technology and the API responsible for the communication between these two elements.

Below appears a detailed explanation of each of the components shown in Figure 10 and which make up the Stream processing component:

Editor

This is the interface from which the topologies are created using the “Drag & Drop” method over the surface of the Canvas. Three different parts can be distinguished in the editor:

  • Tool area: This contains the different modules which the user has added to his or her toolbox and which can be incorporated into the topology by simply selecting the module in question and dragging it to the Canvas area.
  • Canvas: This is the work area upon which the topology is constructed, incorporating the different modules and allowing them to connect to each other. It is also in this area where the topology will be edited in its different versions.
  • Context information: Two main sections can be distinguished within this area: “Properties”, where a name is given to the topology and, optionally, a description of it is added; and the “Minimap”, which shows an overview of the topology.

API

The API can be found in the Backend and allows us to automate the management of the Apache Storm cluster. Thus, the following actions are performed by means of this API:

  • Launching topologies.
  • Halting topologies.
  • Updating topologies.
  • Adding new modules.
  • Managing the dependencies on the Java and Python libraries which the modules need in order to be executed correctly.

Apache Storm Cluster

The Stream processing component uses a standard Apache Storm cluster; as a result, integration with an existing cluster, possible on many occasions as long as the versions are compatible, is simple.

Data model

One of the requirements when developing a collaborative system such as the Stream processing component is to ensure that the different modules which comprise it communicate in the same language, so that they can understand each other without any cause for confusion. For this reason, the Stream processing component uses a data model consisting of a single data structure with different entries depending on the information being worked on at each moment, instead of the data model used by Apache Storm, which allows multiple data types to be used. This approach allows users to share and reuse the modules within the platform, and allows the API to manage the JSON in a simple and practical manner: new fields can be added to the different modules, deleted, or checked for existence via the API.

We can define each of the modules existing in the Stream processing component as:

  • Spout: This collects the information from an external source. It typically reads from a queue system like Kestrel, RabbitMQ or Kafka, but a Spout can also generate its own data stream by reading from sources which have a Streaming API such as Twitter.
  • Bolt: This is responsible for processing the input data it receives from a Spout or another Bolt and emitting new output data. Most of the computing logic is performed in the Bolts, such as functions, filters, data links, communication with databases, etc. (a minimal Bolt sketch is shown after this list).
  • Drain: This processes all the data which reach it and sends them to an external service, usually to store information on a database, to represent the information on a Dashboard or just to send it to a Log system.
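
As a minimal sketch of a Bolt working on the single JSON-based data structure described above (class and field names are illustrative, and depending on the Apache Storm version the packages may be backtype.storm.* instead of org.apache.storm.*):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Illustrative Bolt: forwards only those JSON documents containing a given keyword.
// The single tuple field "json" carries the shared data structure.
public class KeywordFilterBolt extends BaseBasicBolt {

    private final String keyword;

    public KeywordFilterBolt(String keyword) {
        this.keyword = keyword;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String json = input.getStringByField("json");
        // A real module would parse the JSON document and add, delete or check
        // fields through the platform API; here a plain substring test is enough.
        if (json.contains(keyword)) {
            collector.emit(new Values(json));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("json"));
    }
}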

Moreover, a NoSQL database such as MongoDB is used for performing the user management and storing information concerning the status of the topologies.

Historic persistence block

Event and agent concepts

An event is a unit of data that flows through an agent. Internally, an event is composed of a payload carrying the data itself, and a set of headers or metadata about the data.

An agent is a process that moves data flows from a source to a destination. The event flows traverse the agent, from an input component to an output component; in the most basic construct, input and output are connected by means of a queue.

Basic agent architecture

The basic agent architecture adopts as many flows/constructs as there are persistence elements, i.e. one for HDFS, another one for MongoDB, etc.

For each one of these flows, an HTTP source able to understand the input data and translate it into events is used. Nevertheless, this basic approach requires that each source receives its own event flow. This is not a problem if the architect clearly defines which flows must end in an HDFS storage, or in a CartoDB storage, for instance. But what happens if the same event must be stored in HDFS and CartoDB at the same time? In this case, the constructs are modified so that all of them share the same HTTP source; then, the notified event is replicated into each queue connected to the source, as illustrated by the sketch below.
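
The following plain Java sketch, not tied to any concrete agent framework, illustrates this replication: the single HTTP-fed source copies every notified event into all the queues connected to it, one per final destination (all names are hypothetical):

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of a replicating source: every event is put in every queue,
// so the same event can end up, for instance, both in HDFS and in CartoDB.
public class ReplicatingSource {

    // An event: a payload carrying the data plus a set of headers (metadata).
    public static final class Event {
        final byte[] payload;
        final Map<String, String> headers;

        Event(byte[] payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    private final List<BlockingQueue<Event>> queues;

    public ReplicatingSource(List<BlockingQueue<Event>> queues) {
        this.queues = queues;
    }

    // Called for each notification received and translated by the HTTP source.
    public void onEvent(Event event) throws InterruptedException {
        for (BlockingQueue<Event> queue : queues) {
            queue.put(event);
        }
    }
}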

Regarding the queues, they can be implemented in memory, since memory is fast. Nevertheless, if a reliable queue is the aim, persistent queues must be used: JDBC databases, files, etc.

Finally, this is the list of sources and destinations suitable for being supported by this block:

  • Destinations for a NGSI source:
    • HDFS.
    • MySQL.
    • CKAN.
    • MongoDB, including the Short-term historic persistence.
    • PostgreSQL.
    • CartoDB.
    • DynamoDB.
    • Kafka.
  • Destinations for a Twitter source:
    • HDFS.

Advanced architectures

All the advanced architectures arise when trying to improve the performance of the basic one. As seen above, the basic architecture is about an input component putting events into a single queue from which a single output component consumes those events. This can clearly be moved to a configuration with multiple output components running in parallel; there are several possibilities:

Multiple outputs, single queue

You can simply add more outputs consuming events from the same single queue. This configuration theoretically increases the processing capabilities on the output side, but usually shows an important drawback, especially if the events are consumed by the outputs very fast: the outputs have to compete for the single queue. Thus, sometimes you can find that adding more outputs in this way simply makes the system slower than a single output configuration. This configuration is only recommended when the outputs require a lot of time to process a single event, ensuring few collisions when accessing the queue.

Multiple outputs, multiple queues

The above mentioned drawback can be solved by configuring a queue for each output, avoiding the competition for the single queue.

Multiple outputs, multiple queues, queues in parallel

In order to achieve this, the default dispatching mechanism, i.e. creating a copy of the input event to be put in all the queues connected to the input, must be replaced with a dispatching mechanism able to create a copy of the input event for each final destination, and then to multiplex the event, in a round-robin-like fashion, among all the queues related to that final destination, as sketched below.
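
A minimal sketch of such a dispatcher, again in plain Java and not tied to any concrete framework (all names are hypothetical):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Round-robin dispatcher: each event copy created for a given final destination
// is multiplexed to exactly one of the parallel queues serving that destination.
public class RoundRobinDispatcher<E> {

    private final List<BlockingQueue<E>> queuesForDestination;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobinDispatcher(List<BlockingQueue<E>> queuesForDestination) {
        this.queuesForDestination = queuesForDestination;
    }

    public void dispatch(E eventCopy) throws InterruptedException {
        // Pick the next queue in a circular fashion instead of copying the event
        // to all of them (which is what the default replicating dispatcher does).
        int index = (int) (counter.getAndIncrement() % queuesForDestination.size());
        queuesForDestination.get(index).put(eventCopy);
    }
}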

Short-term historic querying block

Figure 11 - Short-term historic architecture

The architecture of the short-term historic component is fairly basic: it is composed of a REST API in charge of retrieving data from the database (as previously mentioned, we opted for a NoSQL database, but other options are possible), as depicted in Figure 11.

Regarding the architecture of the short-term historic, the most important information is related to the exposed APIs and the interactions between the short-term historic and external components, more concretely the Publish/Subscribe Context Broker and the historic persistence components. This information is detailed in the following references and can be used as the starting point for developing alternative but compatible implementations of the short-term historic component:

  • Retrieval of raw and aggregated time series context information.
  • Removal of raw and aggregated time series context information.

Main Interactions

Providing input data to the permanent storage

The permanent storage of the enabler provides interfaces for uploading user data that can be later processed by the different computing tools. Since the usage of Hadoop is mandatory, and some other tools such as Spark are able to use data from Hadoop, HDFS seems to be a good candidate for implementing the permanent storage; nevertheless, other technologies could be used if the proposed write-like operations are supported. HDFS supports those operations, and many others, through WebHDFS, the native REST API from Hadoop for HDFS file system management. Any other technology used should provide a REST API supporting the above mentioned methods as well.

Any REST API provides means for managing the data from outside the cluster. Nevertheless, from a machine within the cluster it may be possible to perform the same operations from the command line; if using HDFS, the File System Shell is available.

Of course, any GUI or CLI wrapping the REST API methods may be used to upload big data to the permanent storage.
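
As an illustration of the write-like operations mentioned above, the following sketch uploads a small piece of data through the WebHDFS interface by means of the standard Hadoop FileSystem client; host, port, user and paths are placeholders, and the default WebHDFS port may differ between Hadoop versions:

import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsUpload {

    public static void main(String[] args) throws Exception {
        // webhdfs://<namenode>:<port> addresses the WebHDFS REST API of the cluster.
        FileSystem fs = FileSystem.get(
                URI.create("webhdfs://namenode.example.org:50070"), new Configuration(), "user1");
        // Write-like operation: create a new file in the user's storage space.
        try (OutputStream out = fs.create(new Path("/user/user1/input/data.txt"))) {
            out.write("first line of user data\n".getBytes("UTF-8"));
        }
        fs.close();
    }
}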

Retrieving output data from the permanent storage

This interaction adds the read-like operations to the REST API, the command-line binary, the GUI and the CLI.

Requesting computing resources

Computing resources requests, made by any user willing to have a dedicated part of the available infrastructure, will be done through a REST API, the GUI or the CLI.

Releasing computing resources

The REST API, the GUI or the CLI will be used to release computing resources allocated upon user request.

Submitting and running analysis

Once computing resources have been requested and provisioned, they can be used to analyze the data stored by the user in the permanent storage. In order to do that, it is necessary to develop an analysis application in a client machine, suited to the requested Big Data technology (for instance, a MapReduce job if Hadoop was requested, or an RDD specification and a set of transformations and actions in the case of using Spark).

Once the application has been developed, it can be submitted to the computing environment. This step is achieved by copying the application to the computing services node.
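
For instance, assuming Hadoop was requested and reusing the hypothetical RegexFilterMapper sketched earlier in this document, the driver run from the services node could look as follows (paths and the configuration key are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver submitting a map-only filtering analysis to the cluster.
public class FilterJobDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("filter.regex", "ERROR");          // illustrative filtering criterion

        Job job = Job.getInstance(conf, "regex-filter");
        job.setJarByClass(FilterJobDriver.class);
        job.setMapperClass(RegexFilterMapper.class);
        job.setNumReduceTasks(0);                   // map-only transformation
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/user/user1/input"));
        FileOutputFormat.setOutputPath(job, new Path("/user/user1/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}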

Querying for data

Both Hadoop and Spark provide means of querying for data.

Hadoop does it through query languages such as the Hive Query Language or Pig Latin. These languages are used by specific clients connecting to the servers running in the computing services node; nevertheless, it is also possible to locally run CLIs from the services node in a command-line fashion.
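
As an illustration, a remote client could query a table through the JDBC interface of HiveServer2 as follows; host, port, credentials, table and column names are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch of a SQL-like query against Hive running in the computing services node.
public class HiveQueryExample {

    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                 "jdbc:hive2://services-node.example.org:10000/default", "user1", "");
             Statement stmt = con.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT entityType, COUNT(*) FROM context_data GROUP BY entityType")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
            }
        }
    }
}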

Spark allows a pseudo real-time inspection of the data as it is being received. Of course, data within RDDs can also be inspected by performing certain transformations and actions implementing the extraction of the desired data.
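
A minimal sketch of such an inspection with the Spark Java API, where the input path and the filtering criterion are illustrative:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Transformations build the desired view of the data; actions materialize it.
public class RddInspection {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("rdd-inspection");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.textFile("hdfs:///user/user1/input/data.txt");
            // Transformation: keep only the lines containing the word "error".
            JavaRDD<String> errors = lines.filter(line -> line.contains("error"));
            // Actions: count the matching lines and take a small sample of them.
            long count = errors.count();
            List<String> sample = errors.take(10);
            System.out.println(count + " matching lines, e.g. " + sample);
        }
    }
}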

In case Flink clusters are deployed, they also allow real-time inspection of the received data.

Basic Design Principles

The basic design principles of the BigData Analysis GE are:

  • To hide the complexity behind the process of creating Big Data environments, where some software packages must be appropriately configured so that they work in a coordinated fashion.
  • To offer a wide set of processing and querying technologies to be installed in the environment. The GE exposes a catalogue, which can grow in an easy and open way.
  • To focus the efforts on extracting insights and value-added information. To achieve that, there are components allowing for the design of complex SQL-like queries, the reuse of binaries and the composition of processing sequences thanks to an easy and intuitive web-based interface.

These basic principles make the BigData Analysis GE show the following differentiating features when compared with similar solutions:

  • Ability to satisfy the demand for several Big Data technologies: Given the starting point of building a HaaS platform, the GE is designed to easily evolve into a SpaaS platform, for instance.
  • Capability to deploy and configure heterogeneous technologies: It is possible to create environments where different technologies, focused on solving complementary parts of the same problem, can coexist. This way, it is possible to combine batch processing with real-time processing, storage and SQL-like querying. In other words, the BigData GE allows for creating data processing architectures.
  • Data lifecycle end-to-end view: The GE covers all the phases of the data lifecycle, such as data injection, batch processing, real-time processing, persistent storage, and insights consumption either through an API or by means of SQL queries.
  • The existence of a CLI/GUI homogenizing the way the user interacts with the enabler: the user may forget about the details of the different interfaces and focus on the data storage and processing.
  • Component reuse and analysis composition: the capability for GE users to reuse and combine different pieces of code, scripts or queries makes it an open environment, allowing greater potential for innovation thanks to the quick sharing of components and quick experimentation.

Detailed Specifications

Depending on the block and the specific functionality desired for the block, several APIs arise:

Re-utilised Technologies/Specifications

As enumerated throughout the document, the candidate technologies to be re-utilised are:

Other technologies, not mentioned in this specification but very promising with regard to the cluster deployment process, are:

  • Openstack, for virtualized infrastructure deployment. In fact, this is the virtualization technology within the core of FIWARE.
  • Sahara, the Openstack component for the automated deployment of Hadoop (and Spark) clusters.

Regarding specifications, all the available REST APIs within the above technologies are candidates to be re-utilised, especially:

Other non REST APIs are:

  • Hadoop API, for programmatically dealing with MapReduce execution, HDFS I/O, etc.
  • JDBC, used by Hive server.
  • MongoDB drivers.

Terms and definitions

This section comprises a summary of terms and definitions introduced during the previous sections. It intends to establish a vocabulary that will help to carry out discussions internally and with third parties (e.g., Use Case projects in the EU FP7 Future Internet PPP). For a summary of terms and definitions managed at overall FIWARE level, please refer to FIWARE Global Terms and Definitions.

  • Data refers to information that is produced, generated, collected or observed and that may be relevant for processing, carrying out further analysis and knowledge extraction. Data in FIWARE has an associated data type and value. FIWARE will support a set of built-in basic data types similar to those existing in most programming languages. Values linked to basic data types supported in FIWARE are referred to as basic data values. As an example, basic data values like ‘2’, ‘7’ or ‘365’ belong to the integer basic data type.
  • A data element refers to data whose value is defined as consisting of a sequence of one or more <name, type, value> triplets referred to as data element attributes, where the type and value of each attribute is either mapped to a basic data type and a basic data value or mapped to the data type and value of another data element.
  • Context in FIWARE is represented through context elements. A context element extends the concept of data element by associating an EntityId and EntityType to it, uniquely identifying the entity (which in turn may map to a group of entities) in the FIWARE system to which the context element information refers. In addition, there may be some attributes, as well as meta-data associated to attributes, that we may define as mandatory for context elements as compared to data elements. Context elements are typically created containing the value of attributes characterizing a given entity at a given moment. As an example, a context element may contain values of some of the attributes “last measured temperature”, “square meters” and “wall color” associated to a room in a building. Note that there might be many different context elements referring to the same entity in a system, each containing the value of a different set of attributes. This allows different applications to handle different context elements for the same entity, each containing only those attributes of that entity relevant to the corresponding application. It also allows representing updates on sets of attributes linked to a given entity: each of these updates can actually take the form of a context element and contain only the value of those attributes that have changed.
  • An event is an occurrence within a particular system or domain; it is something that has happened, or is contemplated as having happened, in that domain. Events typically lead to the creation of some data or context element describing or representing the events, thus allowing them to be processed. As an example, a sensor device may be measuring the temperature and pressure of a given boiler, sending a context element every five minutes associated to that entity (the boiler) that includes the value of these two attributes (temperature and pressure). The creation and sending of the context element is an event, i.e., what has occurred. Since the data/context elements that are generated linked to an event are the way events get visible in a computing system, it is common to refer to these data/context elements simply as "events".
  • A data event refers to an event leading to creation of a data element.
  • A context event refers to an event leading to creation of a context element.
  • An event object is used to mean a programming entity that represents an event in a computing system [EPIA], like event-aware GEs. Event objects allow operations to be performed on events, also known as event processing. Event objects are defined as a data element (or a context element) representing an event, to which a number of standard event object properties (similar to a header) are associated internally. These standard event object properties support certain event processing functions.