We use proprietary and third party's cookies to improve your experience and our services, identifying your Internet Browsing preferences on our website; develop analytic activities and display advertising based on your preferences. If you keep browsing, you accept its use. You can get more information on our Cookie Policy
Cookies Policy
FIWARE.OpenSpecification.Data.BigData R3 - FIWARE Forge Wiki

FIWARE.OpenSpecification.Data.BigData R3

From FIWARE Forge Wiki

Jump to: navigation, search
Name FIWARE.OpenSpecification.Data.BigData
Chapter Data/Context Management,
Catalogue-Link to Implementation BigData Analysis
Owner FI-WARE Telefonica I+D, Andreu Urruela/Grant Croker



Within this document you find a self-contained open specification of a FIWARE generic enabler, please consult as well the FIWARE Product Vision, the website on http://www.fiware.org and similar pages in order to understand the complete context of the FIWARE platform.

FIWARE WIKI editorial remark:
This page corresponds to Release 3 of FIWARE. The latest version associated to the latest Release is linked from FIWARE Architecture


Copyright © 2013 by Telefonica I+D

Legal Notice

Please check the following Legal Notice to understand the rights to use these specifications.


Big Data

Big Data makes reference to the processing of vast amounts of data, being this data either stored in advance (what is called Big Data Batch Processing), either received in real time (Big Data Streaming Processing). In both cases the goal is to get insights, revealing new information that was hidden in the original data. The result of this new data discovery is automatically added to the initial volume.

Latency is not important when processing batches, but the results must be ready in a reasonable time (hours or days). Nevertheless, the processing of real-time data requires the results to be ready inmediately, basically because this kind of data is not stored at all and the insights must be taken on the fly.

Big Data GE

The Big Data Generic Enabler (GE) provides computing services for the batch processing and almost real time processing in addition to a distributed storage service. The goal is to explore the real value of the big volumes of data.

The computing service of the Big Data GE allows creating and configuring a coordinated cluster of machines in a fast way by means a simple command line or REST API call. Additionally, the GE allows for selecting a subset from a wide range of processing and coordination pre-configured technologies (Hadoop, Hive, Oozie, HBase, Sqoop, PIG, etc.).

The storage service of the GE allows feeding the clusters with a huge variety of data without any special adapter or configuration. The technology used within this GE allows for high throughout due to data translations are not performed. To store data in the storage service of the GE is as easy as using a command line or REST API call.

Using any of the services (computing or storage) does not imply to use the other. This way, the data storage is independent from the data lifecycle associated to its processing.

Target audience

The focus of this GE is in the data (big data) storage and analysis, that is, developers will only have to deal with the problem they want to resolve without worrying about the parallelization/distribution or size/scalability of the problem. In the batch processing case, this means that the enabler should be able to scale with the size of the data-set and the complexity of the applied algorithms. On the other hand, in the stream mode, the enabler has to scale with both input rate and the size of the continuous updated analytics. Note that other GEs in FI-WARE are more focused on real-time response of a continuous stream of events not making emphasis in the big-data consideration.

Example scenario

Imagine you are receiving a high volume stream of data that contains, amongst other things, a customer reference number (IMSI), a terminal ID (IMEI) and the ID of the cell tower they are currently connected to (CellID). As each mobile terminal moves throughout an operators area of coverage the stream will contain new entries with the IMSI, IMEI and CellID as they change between cell towers. This data stream can be joined / matched with the actual location (latitude, longitude) of the cell tower to determine the approximate location of a given subscriber or terminal. This information is then stored in the GE, creating a profile for the subscribers that identifies where they live and work. This information can then be joined with an analysis of the movements of mobile phones that can help to determine the time at which traffic jams are likely to happen in each of the roads. These insights can be then further used to notify people who are traveling what is the best route between two points, depending on the time of the day and day of the week.

Basic concepts

As already said, two are the basic capabilities of any Big Data environment: computing and storage.


Computing capabilities (both processing power and available analysis technologies) are essential in every Big Data environment. On the one hand, the data volume is going to be large enough for having an efficient processing strategy (without being real time, the response time must be acceptable). On the other hand, we need efficacy when looking for insights (we need the better insights, not whatever ones). Efficiency is achieved by means of the MapReduce paradigm, while efficacy is given by the analytic tools and the jobs chaining orchestration.


MapReduce (MR) is a paradigm evolved from functional programming and applied to distributed systems. It was presented in 2004 by Google [ BDA1 ]. It is meant for processing problems which solution can be expressed in commutative and associative functions.

In essence, MR offers an abstraction for processing large datasets on a set of machines, configured in a cluster. With this abstraction, the platform can easily solve the synchronization problem, freeing the developer thus of thinking about that issue.

All data of these datasets is stored, processed and distributed in the form of key-value pairs, where both the key and the value can be of any data type.

From the field of functional programming, it is proved that any problem which solution can be expressed in terms of commutative and associative functions, can be expressed in two types of functions: map (named also map in the MR paradigm) and fold (named reduce in the MR paradigm). Any job can be expressed as a sequence of these functions. These functions have a restriction: they operate on some input data, and produce a result without side effects, i.e. without modifying neither the input data nor any global state. This restriction is the key point to allow an easy parallelization.

Given a list of elements, map takes as an argument a function f (that takes a single argument) and applies it to all elements in a list (the top part of the Figure BDA-1), returning a list or results. The second step, fold, accumulates a new result by iterating through the elements in the result list. It takes three parameters: a base value, a list, and a function, g. Typically, map and fold are used in combination. The output of one function is the input of the next one (as functional programming avoids state and mutable data, all the computation must progress by passing results from one function to the next one), and this type of functions can be cascaded until finishing the job.

In the map type of function, a user-specified computation is applied over all input records in a dataset. As the result depends only on the input data, the task can be split among any number of instances (the mappers), each of them working on a subset of the input data, and can be distributed among any number of machines. These operations occur in parallel. Every key-value pair in the input data is processed, and they can produce none, one or multiple key-value pairs, with the same or different information. They yield intermediate output that is then dumped to the reduce functions.

The reduce phase has the function to aggregate the results disseminated in the map phase. In order to do so in an efficient way, all the results from all the mappers are sorted by the key element of the key-value pair, and the operation is distributed among a number of instances (the reducers, also running in parallel among the available machines). The platform guarantees that all the key-value pairs with the same key are presented to the same reducer. This phase has so the possibility to aggregate the information emitted in the map phase.

The job to be processed can be divided in any number of implementations of these two-phase cycles.

The platform provides the framework to execute these operations distributed in parallel in a number of CPUs. The only point of synchronization is at the output of the map phase, were all the key-values must be available to be sorted and redistributed. This way, the developer has only to care about the implementation (according to the limitations of the paradigm) of the map and reduce functions, and the platform hides the complexity of data distribution and synchronization. Basically, the developer can access the combined resources (CPU, disk, memory) of the whole cluster, in a transparent way. The utility of the paradigm arises when dealing with big data problems, where a single machine has not enough memory to handle all the data, or its local disk would not be big and fast enough to cope with all the data.

This paradigm has had a number of different implementations: the already presented by Google, with a patent [ BDA2 ] , the open source project Apache Hadoop [ BDA3 ], that is the most prominent and widely used implementation, and a number of implementations of the same concept: Sector/Sphere [ BDA4 ][ BDA5 ] Microsoft has also developed a framework for parallel computing, Dryad [ BDA6 ], which is a superset of MapReduce.

These implementations have been developed to solve a number of problems (task scheduling, scalability, fault tolerance...). One such problem is how to ensure that every task will have the input data available as soon as it is needed, without making network and disk input/output the system bottleneck (a difficulty inherent in big-data problems).

Most of these implementations (Google, Hadoop, Sphere, Dryad...) rely on a distributed file-system [ BDA7 ][ BDA8 ] for data management.

Data files are split in large chunks (e.g. 64MB), and these chunks are stored and replicated to a number of data nodes. Tables keep track on how data files are split and where the replica for each chunk resides. When scheduling a task, the distributed file system can be queried to determine the node that has the required data to fulfill the task. The node that has the data (or one nearby) is selected to execute the operation, reducing network traffic.

The main problem of this model is the increased latency. Data can be distributed and processed in a very large number of machines, and synchronization is provided by the platform in a transparent way to the developer. But this ease of use has a price: no reduce operation can start until all the map operations have finished and their results are placed on the distributed file-system. These limitations increase the response time, and this response time limits the type of solutions where a “standard” MR solution can be applied when requiring time-critical responses.

Analytic tools

Despite of MapReduce is the core of batch processing, its interface/user experience is complex and only suitable for developers knowing very well the different APIs of the chosen implementation (Java, Python, etc). Arises then the need for a new kind of tools abstracting the complexity of the MapReduce APIs and exposing easier interfaces.

SQL-like tools

The more popular analytic tools aroud MapReduce are those based on SQL. This is a wide extended, high level and powerful database query language, and its adaptation to the Big Data world had to be a success. And that was the case with Hive [ BDA9 ], Pig [ BDA10 ], Shark [ BDA22 ], Impala [ BDA12 ] and may other analytic tools running on top of Hadoop.

The key of the sucess of these tools is, as said, to abstract the MapReduce paradigm, which is still working but in a transparent way. So, when the user decides to retrieve certain information, for instance a specific column of the dataset, the SQL query is automatically transformed into some predefined MapReduce jobs; in the proposed example, a Map function will iterate on all the raws in order to select the desired column, and the Reduce function joins all the individuals values in a unique and new column.

As can be inferred, the kind of datasets where these analytic tools have sense are those whose information is stored in a structured fashion, i.e. datasets whose content may be easily transformed into SQL tables.


There are many other alternatives to the SQL-like analytic tools. It is the case of the R adaptation for Hadoop [ BDA13 ]; the machine learning tools included in MAhout [ BDA14 ], a component of the Hadoop ecosystem; or the extensions allowing for programming in Python the MapReduce jobs [ BDA15 ].

These and other tools are out of the scope of the Big Data GE and will not be explained nor detailed.


No data analyst will be able to do a complex processing in a single step. Maybe the chosen tools do not provide all the required functionalities; and in the case the tool is suitable for performing all the analysis, this confronts all the good developing practices (modularity, reusing, etc.). But the main reason for not doing so is that the complex analysis are commonly based in a sequence of steps where several tools may be involved: the output of an analysis will be the input of another one, thus, until the first one does not finish, the second one will not be launched. The problem here is to monitor when a job finishes and its putput can be injected to the next job, and this is something cannot be done by simply waiting because it can last hours even days. It seems to be necessary an orchestration mechanism of processes. There are several alternatives for orchestration, mainly in widely used ecosystems such as Hadoop, where Oozie is the main asset [ BDA16 ].


If the computing capabilities allow for working with the data, the storage ones make possible the data can be available for the computing capabilities. For achieving that, obviously it is necessary to have a big amount of storage space, but it is also critical the data is stored in an efficient way in order the access time does not become traumatic. Finally, the storage capacilities do not have sense without data injection mechanisms.

Distributed storage systems

A distributed storage system is a network of computers where the information is stored in more than one node, in order each one of those nodes owns certain part of the original information. It is very common these distributed systems apply a replication factor as well, i.e. each part of the original data is store twice o more times in different nodes of the network. The goal is to ensure the data recoverabilty when a node fails.

The distributed storage systems are crucial when dealing with Big Data environments because the grant the required storage volume and the scalabity. Normally, a centralized process manages the system, providing a unique view of the total storage independently of the number of nodes within the network and the information contained in each node. This unique view offers services for creating, modifying and deleting files. This is possible because certain meta-information is managed: a list of distributed files, containing each file another list of data blocks and the location of each block.

In Big Data environments, the de facto standard is the Hadoop Distributed File System (HDFS) [ BDA8 ], the file system under the MapReduce engine of Hadoop. However, many other distributed storage systems are arising, compatible or not with Hadoop, such as Cassandra File System (CFS) [ BDA17 ] or Google File System [ BDA7 ].

NoSQL storage systems

Coined in the late 90's the term NoSQL represents database storage technologies that eschew relational database storage systems such as Oracle or MySQL. NoSQL emerged from a need to overcome the limitations of the relational model when working with large quantities of data, typically in unstructured form. Initially, as a reaction to these limitations, NoSQL was considered, as the name might be interpreted to be an opposition movement to using SQL based storage systems. However as it's seen that SQL and NoSQL systems often co-exist and complement each other the term "NoSQL" has morphed to mean "Not only SQL".

With a change in usage focus, new applications, in particular those for the web, are no longer read orientated rather they are tending to read/write if not write heavy. Traditional SQL based systems struggle with this when demand scales up often enough the underlying data store cannot do the same, without incurring downtime. These systems are based on the ACID ("Atomic, Consistent, Isolated, Durable") principle:

  • Atomic - either a transaction succeeds or not
  • Consistent - data needs to be in a consistent state
  • Isolated - one transaction cannot interfere with another
  • Durable - data persists once committed even after a restart or a power-loss

In systems that need to scale out it's not always possible to guarantee that the data being read is consistent or durable. For example when shopping during times of high demand, say Christmas, via the web for a particular item, it is more important that the web site remains responsive, so as not to dissuade customers, rather than the inventory count for every item is kept up to date. Over time item counts will get refreshed as more hardware is brought on stream to be able to cope with the demand.

NoSQL systems are designed around on Brewers CAP Theorem [ BDA27 ][ BDA28 ], that says if a distributed system wants Consistency, Availability and Partition Tolerance, it can only pick two. Rather than NoSQL striving for ACID compliance, NoSQL systems are said to aim for eventual consistency (BASE - Basic Availability, Soft and Eventual Consistency [ BDA29 ]). Such that over time the data within the system becomes consistent via consolidation, in the same way accountants close their books at the end of an accounting period to provide an accurate state of accounts.

The different types of NoSQL database are:

  • Column Store - Data storage is orientated to the column rather than the row as it is with traditional DBMS engines, favouring aggregate operations on columns. These kinds of stores are typically used in data warehousing. Example implementations are Hadoop HBase [ BDA18 ], Google BigTable [ BDA19 ] and Apache Cassandra [ BDA17 ] (which in addition, as already seen, is a whole file system).
  • Key Value Store - A schema-less storage system where data is stored in key-value pairs. Data is accessed via a hash table using the unique key. Example implementations are Dynamo [ BDA23 ] and Redis [ BDA20 ].
  • Document Store - Similar to Key Value storage, document storage works with semi-structured data that contain a collection of key-value pairs. Unlike key-value storage these documents can contain child elements that store relevant knowledge to that particular document. Unlike in traditional DBMS engines, document orientated storage does not require that every document contain all the fields if no information is there for that particular document. Example implementations are CounchDB [ BDA24 ] and MongoDB [ BDA25 ].
  • Graph Database - Using a graph structure, data about an entity is stored within a node, relationships between the nodes are defined in the edges that interconnect the nodes. This allows for lookups which utilize associative datasets as the information that relates to any given node is already present, eliminating the need to perform any joins. An example implementation is neo4j [ BDA26 ].

Data lifecycle

The data to be analyzed in Big Data environments may be of very different nature, but as already commented, they can always be classified into two groups: batch and streaming. The first ones are big collections of data that must be stored in advance to the analysis, while the second kind of data is analyzed on the fly, in real or almost real time.

When dealing with one type of data or the other this difference leads to adopt different ways for feeding the Big Data environment. Thus, in some cases will be necessary to allow input ports for dynamic flows of data, while in other cases the copying historics from the source to the distributed storage will be enough. In any case, this phase is called Data Injection phase, and amont other things, it may imply the normalization, anonymization, encryption, filtering and compression of the data.

Next is the Data Ingestion Phase. Within this phase it must be ensured the necessary capability to absorb the volume of data is available. Then, and only then, the data copying (if it has sense) can be performed, either in raw of a predefined format. Sometimes, as occurs with the flows or events, it may be required an analytic study in the phase, due to such data in not usually stored.

After the data ingestion arises the Processing Phase, involving a unique analysis or an orchestrated chain of several ones. Certain elasticity must be granted if the computing capabilities increase during the processing task, and the data protection (privacy, integrity, etc.) must be monitored as well.

Finally, the results are consumed in the last phase. This means the output data becomes new input data for new other analysis, visualization tools, query systems, etc. The results may be stored in a different external repository in order to ensure their perdurability and allow for better access times.


Reference architecture: Apache Hadoop

As will be seen in the next section, the Big Data GE architecture expands the basic architecture of Apache Hadoop. Thus it is necessary to briefly explain it before.

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

Three are the pillars of Hadoop:

  • Hadoop Common, which provides access to the supported file systems (HDFS and many others).
  • The engine for parallel processing of vast amounts of data (Hadoop MapReduce), based on a proprietary resource manager and job scheduler (Hadoop YARN).
  • Hadoop Distributed File System (HDFS), a distributed file system assuring high througput when accessing the data.

Architecturally speaking, a distributed file system is on the bottom of the software stack. Please observe several compatible distributed file systems may be used, e.g. Amazon S3, CloudStore or FTP FileSystem, but here only HDFS will be explained. HDFS supports big data files by spliting them and distributing their blocks among all the Datanodes of the cluster. A special node called the Namenode is responsible for storing the indexes for each distributed file and, in general, for file system management (naming, deletions, creations, etc.).

Over the HDFS is the MapReduce layer, mainly a set of Tasktracker nodes in charge of performing map and/or reduce tasks over the data stored in the HDFS, being a special node called the Jobtracker who decides which task and which data is assigned to each Tasktracker. The list of tasks to be collaboratively accomplished is extracted from a Java JAR file, built by the analyst using the Hadoop API [ BDA36 ] and containing the desired MapReduce job in terms of Java sentences invocating the Hadoop development API. This Java JAR is also replicated several times in the HDFS by the Jobtracker in order it is highly available for the final interpreters of that Java code, the Tasktrackers. Finally, a workstation or JobClient node is used to initially upload the JAR file to the HDFS and invoke the Hadoop execution.

The above architecture may change depending on the performance requirements and/or the dimensions of the available cluster. If the cluster is very limited, it may be necessary to merge two or more nodes in a single one, which penalizes the CPU performance, but improves the transactions at the same time. Therefore, it is usual to find both Tasktraker and Datanode coexisting in the same node, or having the Jobtraker running in the same machine than the Namenode. If the cluster is very limited, it is also possible to have a node acting both as Jobtraker, Namenode and (limitedly) Tasktracker and Datanode.

Once a job is finished, some resulting insights are generated. The HDFS may store these results together to the input data, but it is not recommended due to bad access response times, and a NoSQL database is suggested for these purposes (see below).

Big Data GE Architecture

The Big Data GE is based on a Master Node executing management sofware and acting as a frontend for the final users. Usually, these users will be applications that, using the administration API, will request the creation of storage and/or computing clusters.

On the one hand, the creation of a cluster implies the creation of a Head Node, in charge of the storage management (it does not store anything), the computing management (it does not compute anything) and other specific services (analysis tools, data injectors...). On the other hand, one or more Slave Nodes are created as well, being their purpose the execution of analysis tasks and real storage of the data.


File:cosmos_architecture.png|700px|thumb|center|Figure BDA-1 - Big Data GE architecture]]

As can be seen in the above figure, both the Head Node and the Slave Nodes imitate the Namenode and Jobtracker combination, and the Datanode and Tasktracker combination, respectively. Indeed, those Hadoop services can be used for configuring the Big Data GE Head and Slave nodes, but many other solutions could be used as well:

  • Cassandra File System (CFS) [ BDA17 ], resulting in a configuration where the Hadoop MapReduce engine continues working on top of CFS.
  • To install a NoSQL distributed database such as HBase [ BDA18 ](key-pair schema) on top of HDFS is another option.
  • MapReduce can be also substitud/improved by using top level analytic tools such as Cascading [ BDA21 ], or in-memory analysis systems following the MapReduce paradigm but providing almost real-time results.
  • On top of everything, as previously said, a bunch of querying tools is available for the user, e.g. Hive [ BDA9 ], Impala [ BDA12 ] or Shark [ BDA22 ].

The combination of modules from the Hadoop ecosystem that can be installed in the Big Data GE are multiple, and they will depend on the needs the user has. The following figure tries to depict all the software stack that could be installed in the GE:

Figure BDA-2 - Big Data GE software stack

Next sections will go into the details about each one of the nodes, their functionality and the components implementing them. The pre-defined cluster for persistent storage will also be shown.

User environment

The user environment is about the application or set of application in charge of interacting with the Master Node regarding the creation of a Big Data cluster, the MapReduce jobs launching, etc. These application may be legacy (e.g. some REST manager), proprietary (using any of the REST libraries) or predefined such as the CLI and the Studio.


It is an easy command-line-based application that hides the details of the adminsitration REST API managed by the Master Node. Thus, through intuitive command such as "create a cluster", "upload this file", etc. the user is able to interact with the whole platform.

Since the CLI only interacts with the administration REST API, it is not able to execute analysis of any type. Therefore, once the cluster has been created, it is necessary to use the appropriate interfaces exposed by the analysis tool itself (the "hadoop" command if trying to do MapREduce, SQL-like queries if dealing with Hive, etc.).


The Studio is a Graphical User Interface allowing for workflow definitions, specifying in each step of the workflow whcih kind of analysis must be done (MapReduce job, SQL query, Python scripts, proprietary code...) and how the outputs of the different analysis steps are connected to the inputs of new analysis steps. Once the workflow is defined, it is executed in a cluster specifically created for it.

Typically, the orchestration of jobs is done through Oozie [ BDA16 ], and this component also runs under the Studio. The graphical workflows are translated into Oozie workflows, and once an Oozie server has been installed in the cluster, these configurations are loaded by suing the Oozie APIs.

Nevertheless, a component like this one can also perform other interesting functionalities, such as:

  • Host an historical log of workflow executions.
  • Host a collection of reusable software.
  • Work as an audit platform, due the behaviour of any workflow remains the same all along the time.

Master node

The master Node acts as the endpoint of the Big Data platfom. It is the node the user will go in order to create storage clusters and/or computing clusters.

Administration API

The administration API exposes all the operations it is possible to perform in the platform, these ones:

  • Cluster creation, parameterized by the cluster identifier, the size of the cluster and the Hadoop ecosystem services that must be installed.
  • Cluster deleteion, given its identifier.
  • Get the cluster details, given its identifier.
  • List all the available services that can be installed in a cluster.
  • Get the details regarding the Head Node of a cluster.
  • Get your user profile details.
  • Modify your user profile details.

Service Manager

It interprets all the operation of the administration API related to the creation and deletion of clusters. Depending on the operation, the Insfrastructure Manager allocates/frees storage and/or computing capabilities, and the software is deployed/undeployed as well.

Infrastructure Abstraction Layer

This component independizes the requests done by the Service Manager to the different Infrastructure Managers (the platform may be able to deploy clusters on top of many infrastructure types).


Ambari [ BDA31 ] is used to make Hadoop management (provisioning, managing, and monitoring clusters) simpler.

Studio server

This is the server side of the Studio described above.

Head node

The Head Node hosts all the services offered by the cluster, both the services exposed to the users and the services exposed for internal purposes.

Storage (management)

Regarding storage, the Head Node exposes an internal service nased on the Namenode from Hadoop. The Datanodes, whcih run in the slave nodes (see next sections), will connect with this service in order to receive instructions about I/O. On top of the Namenode service may run, if configured, the HBase service. HBase, conceptually, is a Big Table containing a great volume on key-value paors, but internally is based on the Hadoop distributed file system (HDFS).

An alternative to HDFS is Cassandra. Nevertheless, Cassandra still needs the Namenode service from Hadoop.

Data injection


WebHDFS [ BDA34 ] is the native REST API from Hadoop, and it is exposed for HDFS management purposes. It works by receiving I/O requests which are managed by the HDFS Namenode, and this service returns a redirection pointing to the especific Datanode where the data can be finally read or write.


It exists an alternative to WebHDFS, HttpFS [ BDA35 ], which implements the same API but whose behaviour is a bit different.

When using WebHDFS, both the node running the Namenode (Head Node) and each node running the Datanode (Slave Nodes) must have a public IP address (or their FQDN must be resovled into a public IP address). This is beacuse both the Namenode and the Datanodes (pointed by the redirections) must be reachables. This issue implies the usage of a huge number of public IP addresses that is not necessary: HttpFS acts as a gateway, and instead redirecting to the Slave Nodes, redirects to the Head Node itself (which in the end internally performs the desired operation with the Datanode).

Processing (management)

The tracking of the batch analysis is done through the Jobtracker service from Hadoop. Theslave nodes, running the Tasktracker service from Hadoop as well, will connect to this Head Node in order to receive instructions about which Map tasks and/or Reduce tasks (the same node can run both types of tasks) must be executed.

Cascading, a framework for the development, execution and synchronization of MapReduce jobs in Java, can be also installed in the Head Node. The libraries from Cascading highly simplify the Hadoop API. The processing is performed in almost real time thanks to Spark (smilar to Hadoop, but memory-based). Anyway, Spark is able to access the HDFS data for processing purposes.

Analytic tools

The Big Data GE provides SQL-like tools for analysis purposes. Examples are Hive, Impala, Shark. The envioned used tools are Hive and Pig.


The orchestration in the Big Data GE is done through Oozie. The Head Node runs this service, which is accessible by means of a REST API.

Slave nodes


The slave nodes run the Datanode service from Hadoop, needed to build the HDFS of the GE.


The slave nodes run the Tasktracker service from Hadoop, needed in the MapReduce processing.

Cluster for persistent storage (Infinity)

In addition to the processing clusters, whcih are created and destroyed on demand by the users, the platform has a default cluster used for persistent storage. It is available for all the users of the GE, even those not desiring to create processing clusters, and the data injection tools are the already described.

The goal of this cluster is, as said, to offer a persistent storage service, due to all the processing clusters die in the end. The user is responsible for deciding which data is stored in the persistent storage, and viceversa, which persistently stored data is going to be used by the processing cluster. Anyway, the data stored in this special cluster are protected from other users.

This persistent storage does not provide any processing capabilities.

Main Interactions

Providing input data to Infinity

Infinity is the permanent storage cluster of the GE, based on HDFS. As already explained, this is because the computing clusters have a clear lifecycle: they are created, used for certain computations and finally they are destroyed. Thus all the data to be analyzed must be initially uploaded to Infinity and used from that location.

They way the data is uploaded is done through:

  • WebHDFS [ BDA30 ], the native REST API from Hadoop for HDFS filesystem management.
  • Cosmos CLI, a piece of software intended to wrap a WebHDFS client and exposing a "command line" style interface to the user.

In previous releases of Cosmos there was the possibility to upload data by using a HDFS-based SFTP server, but this is a deprecated option.

Retrieving output data from Infinity

The same than in the above case applies when retrieving data from Infinity. Both WebHDFS and the CLI can be used for such purposes.

Again, the usage of the HDFS-based SFTP server is deprecated.

Creating a computing cluster

The creation of computing clusters will be achieved by using:

  • The administration API of the GE, which will provide in a REST API style a specific operation for this purpose.
  • The CLI, which will wrap a client using the above REST operation and exposing a "command line" style interface to the user.

The creation of a computing client will imply:

  • The creation of a Head node containing a private Namenode, Jobtracker and private services for Oozie, Hive and Pig, in addition to a WebHDFS server for the private cluster.
  • The creation of a certain number of Slave nodes, containing each one of them private Datanodes and Tasktrackers.

Each private computing cluster will have access to the user-related space in Infinity's HDFS.

Uploading and running MapReduce jobs

Once a private cluster has been created, it can be used to analyze the data stored by the user in Infinity. In order to do that, it will be necessary to develop a MapReduce application in a client machine, following the guidelines described at the User and Programmer Guide of this GE.

Once the MapReduce application has been developed, it can be uploaded to the private cluster. This step is achieved by copying the application to the Head node, and there it must be copied to the private HDFS by using standard Hadoop copying commands. MapReduce applications are run by commanding Hadoop from a shell opened in the Head node of the private cluster.

In order to do the uploading and running of the applications it is necesarry the user logs into the Head node. The running part can be avoided if Oozie is used. Oozie is an analysis tool designed for scheduling MapReduce applications, Hive and Pig tasks, and shell scripts.

Querying for data (Hive or Pig)

Data is queryed from the client side by using Hive Query Language [ BDA32 ] or Pig Language [ BDA33 ]. These languages are talked by specific clients connecting to the servers running in the private cluster (within the Head node). Hive or Pig are SQL-like languages which implement pre-defined MapReduce tasks in order to select, count, limit, order, etc. the data. These MapReduce tasks run in the private cluster as any other MapReduce job does, and the data is accessed from Infinity (permanent storage cluser) as any other MapReduce job does; for this purpose, Infinity must allow the creation of Hive and Pig tables.

Basic Design Principles

The basic desing principles of the Big Data GE are:

  • To hide the complexy behind the process of creating Big Data environments, where some software packages must be appropriately configured in order they work in a coordinated fashion.
  • To offer a wide set of processing and querying technologies for being installed in the environment. The GE exposes a catalogue, which can grow up in an easy and opened way.
  • To focus the efforts on extracting insights and value added information. To achieve that, there are components allowing for complex SQL-like queries design, reusing binaries and to compose processing sequences thanks to an easy and intuitive web-based interface.

These basic principles make the Big Data GE shows the following differentiating features when compared with similar solutions:

  • Independency of the resource provision technology: The GE is independent of the chosen infrastructure management system, i.e. the environments may be created in the cloud or directly on bare metal.
  • The Studio allows for components reusing and analysis compositions: the capability for reusing and combine different pieces of code, scripts or queries the different users of the GE create mades it becomes an opened environment, allowing a greater potential innovation due to the quick sharing of components and quick experimentation.
  • The GE has the capability to deploy and configure heterogeneous technologies: it is possible to create environments where different technologies, focused on solving complementary parts of the same problem, can coexist. This way, it is possible to combine the batch processing with the real time processing, the storage and the SQL-like querying. I.e. the Big Data GE allows creating data processing architectures.
  • Data lifecycle end-to-end view: The GE covers all the phases of the data lifecycle, such as data injection, alarm programming, batch processing, real time processing, persistent storage, insights consumption through an API or the possibility to access them by means of SQL queries.


BDA1 Google's MapReduce
BDA2 MapReduce US patent by Google
BDA3 Apache Hadoop
BDA4 Sector
BDA5 Sphere
BDA6 Dryad
BDA7 Google File System
BDA8 Apache HDFS
BDA9 Apache Hive
BDA10 Apache Pig
BDA11 Apache Spark
BDA12 Impala
BDA13 Enabling R on Hadoop
BDA14 Apache Mahout
BDA15 Dumbo
BDA16 Apache Oozie
BDA17 Apache Cassandra
BDA18 Apache HBase
BDA19 Google Big Table
BDA20 Redis
BDA21 Cascading
BDA22 Shark
BDA23 Amazon Dynamo
BDA24 Apache CouchDB
BDA25 MongoDB
BDA26 Neo4j
BDA27 CAP Theorem
BDA28 Brewer’s Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services
BDA29 Eventually Consistent - Dr. Werner Vogels
BDA31 Apache Ambari
BDA32 HiveQL
BDA33 Pig Latin
BDA35 HttpFS
BDA36 Hadoop API

Detailed Specifications

Following is a list of Open Specifications linked to this Generic Enabler. Specifications labeled as "PRELIMINARY" are considered stable but subject to minor changes derived from lessons learned during last interactions of the development of a first reference implementation planned for the current Major Release of FI-WARE. Specifications labeled as "DRAFT" are planned for future Major Releases of FI-WARE but they are provided for the sake of future users.

Open API Specifications

Re-utilised Technologies/Specifications

FIWARE WIKI editorial remark:
TBD: It will provide sufficient information on and references to the technologies being (re-)utilised - e.g. if you base your GE descriptions on particular APIs.

Terms and definitions

This section comprises a summary of terms and definitions introduced during the previous sections. It intends to establish a vocabulary that will be help to carry out discussions internally and with third parties (e.g., Use Case projects in the EU FP7 Future Internet PPP). For a summary of terms and definitions managed at overall FI-WARE level, please refer to FIWARE Global Terms and Definitions

  • Data refers to information that is produced, generated, collected or observed that may be relevant for processing, carrying out further analysis and knowledge extraction. Data in FIWARE has associated a data type and avalue. FIWARE will support a set of built-in basic data types similar to those existing in most programming languages. Values linked to basic data types supported in FIWARE are referred as basic data values. As an example, basic data values like ‘2’, ‘7’ or ‘365’ belong to the integer basic data type.
  • A data element refers to data whose value is defined as consisting of a sequence of one or more <name, type, value> triplets referred as data element attributes, where the type and value of each attribute is either mapped to a basic data type and a basic data value or mapped to the data type and value of another data element.
  • Context in FIWARE is represented through context elements. A context element extends the concept of data element by associating an EntityId and EntityType to it, uniquely identifying the entity (which in turn may map to a group of entities) in the FIWARE system to which the context element information refers. In addition, there may be some attributes as well as meta-data associated to attributes that we may define as mandatory for context elements as compared to data elements. Context elements are typically created containing the value of attributes characterizing a given entity at a given moment. As an example, a context element may contain values of some of the attributes “last measured temperature”, “square meters” and “wall color” associated to a room in a building. Note that there might be many different context elements referring to the same entity in a system, each containing the value of a different set of attributes. This allows that different applications handle different context elements for the same entity, each containing only those attributes of that entity relevant to the corresponding application. It will also allow representing updates on set of attributes linked to a given entity: each of these updates can actually take the form of a context element and contain only the value of those attributes that have changed.
  • An event is an occurrence within a particular system or domain; it is something that has happened, or is contemplated as having happened in that domain. Events typically lead to creation of some data or context element describing or representing the events, thus allowing them to processed. As an example, a sensor device may be measuring the temperature and pressure of a given boiler, sending a context element every five minutes associated to that entity (the boiler) that includes the value of these to attributes (temperature and pressure). The creation and sending of the context element is an event, i.e., what has occurred. Since the data/context elements that are generated linked to an event are the way events get visible in a computing system, it is common to refer to these data/context elements simply as "events".
  • A data event refers to an event leading to creation of a data element.
  • A context event refers to an event leading to creation of a context element.
  • An event object is used to mean a programming entity that represents an event in a computing system [EPIA] like event-aware GEs. Event objects allow to perform operations on event, also known as event processing. Event objects are defined as a data element (or a context element) representing an event to which a number of standard event object properties (similar to a header) are associated internally. These standard event object properties support certain event processing functions.
Personal tools
Create a book