IoT cloud platform with EnMasse and Apache Spark using Red Hat OpenShift

The cloud platform runs on Red Hat OpenShift, which is used to deploy both EnMasse and Apache Spark. The sources of the project are available on GitHub:


OpenShift

OpenShift is a container application platform from Red Hat for container-based software deployment and management. It is a supported distribution of Kubernetes that adds Docker containers and DevOps tools for accelerated application development. OpenShift can run locally as a single-node cluster inside a VM using Minishift, which requires a hypervisor to start the virtual machine on which the OpenShift cluster is provisioned. The full installation documentation may be found here:
You need at least 6 GB of RAM for your Minishift instance, since we run both EnMasse and Spark on the local OpenShift cluster:

minishift start --cpus 2 --memory 6144

Once this command completes, the OpenShift cluster should be ready to use.


EnMasse

EnMasse is an open source messaging platform with a focus on scalability and performance. It can run on your own infrastructure or in the cloud, simplifies the deployment of messaging infrastructure, and promotes open standards such as AMQP and MQTT, with support for other protocols planned. To deploy EnMasse to OpenShift, download the latest release and unpack it:

tar xvf enmasse-0.13.2.tgz

The release bundle contains OpenShift templates as well as a deployment script for deploying EnMasse.

Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools, including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing. To deploy Spark to OpenShift you can use Oshinko, a project that covers several individual applications, all focused on deploying and managing Apache Spark clusters on Red Hat OpenShift and OpenShift Origin. You can find the full installation documentation here:

First, install all the Oshinko resources into your project:

oc create -f

Second, start the Oshinko Web UI application:

oc new-app --template=oshinko-webui

To process the temperature messages from the gateway, I create a JavaReceiverInputDStream using AMQPUtils.createStream:

JavaReceiverInputDStream<DeviceTemperature> receiveStream =
  AMQPUtils.createStream(ssc, host, port,
    Option.apply(username), Option.apply(password), temperatureAddress,
    message -> {

      Section section = message.getBody();
      if (section instanceof AmqpValue) {
        Object value = ((AmqpValue) section).getValue();
        // body carried as an AMQP value: convert it to a JSON string
        DeviceTemperature deviceTemperature =
          DeviceTemperature.fromJson(value.toString());
        return new Some<>(deviceTemperature);
      } else if (section instanceof Data) {
        Binary data = ((Data) section).getValue();
        DeviceTemperature deviceTemperature =
          DeviceTemperature.fromJson(new String(data.getArray(), "UTF-8"));
        return new Some<>(deviceTemperature);
      } else {
        // unsupported body section: emit nothing
        return null;
      }

    }, StorageLevel.MEMORY_ONLY());
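DeviceTemperature.fromJson belongs to the project sources, so its exact behavior is not shown here. As a rough sketch, assuming a flat JSON payload such as {"address": ..., "temperature": ...}, a minimal parser could look like this (the class name, field names, and JSON shape are all illustrative, not the project's actual code):

```java
// Minimal sketch of what a DeviceTemperature.fromJson-style parser could do.
// The real class lives in the project sources; the JSON shape here is assumed.
public class DeviceTemperatureSketch {

    final String address;
    final int temperature;

    DeviceTemperatureSketch(String address, int temperature) {
        this.address = address;
        this.temperature = temperature;
    }

    // naive field extraction, good enough for a flat one-level JSON object
    static DeviceTemperatureSketch fromJson(String json) {
        String address = json.replaceAll(".*\"address\"\\s*:\\s*\"([^\"]*)\".*", "$1");
        String temp = json.replaceAll(".*\"temperature\"\\s*:\\s*(\\d+).*", "$1");
        return new DeviceTemperatureSketch(address, Integer.parseInt(temp));
    }

    public static void main(String[] args) {
        DeviceTemperatureSketch dt =
            DeviceTemperatureSketch.fromJson("{\"address\":\"boiler\",\"temperature\":21}");
        System.out.println(dt.address + " " + dt.temperature); // prints "boiler 21"
    }
}
```

A real implementation would use a proper JSON library; the point is only that both AMQP body types above end up as a JSON string handed to the same factory method.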

To reduce the stream, I evaluate the average temperature over a time window:

// reducing the pair stream by key (device-id) for getting average temperature value
JavaPairDStream<String, Integer> avgTemperaturesByDevice = temperaturesByDevice
  .mapValues((value) -> new Integer[] { value, 1 })
  .reduceByKeyAndWindow(
    (x, y) -> new Integer[] { x[0] + y[0], x[1] + y[1] },
    new Duration(5000), new Duration(5000))
  .mapValues((z) -> z[0] / z[1]);
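The (value, 1) pairing above is what makes the windowed average work: each reading becomes a (sum, count) pair, pairs merge associatively (a requirement of reduceByKeyAndWindow), and a final division yields the average. The same logic in plain Java, outside Spark:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the (sum, count) accumulator used in the stream:
// the merge step is associative, which is what reduceByKeyAndWindow requires.
public class WindowAverageSketch {

    static Integer[] toPair(Integer value) {
        return new Integer[] { value, 1 };           // (sum, count)
    }

    static Integer[] merge(Integer[] x, Integer[] y) {
        return new Integer[] { x[0] + y[0], x[1] + y[1] };
    }

    static int average(List<Integer> window) {
        Integer[] acc = window.stream()
            .map(WindowAverageSketch::toPair)
            .reduce(new Integer[] { 0, 0 }, WindowAverageSketch::merge);
        return acc[0] / acc[1];                      // integer division, as in the stream
    }

    public static void main(String[] args) {
        // one 5-second window of readings from a single device
        System.out.println(average(Arrays.asList(20, 21, 22, 23))); // prints 21
    }
}
```

Note that the division is integer division, so the averages are truncated to whole degrees; mapping to a Double pair stream would preserve the fractional part.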

To forecast the temperature, I use a linear regression model with 7 features: previous boiler temperature, previous weather temperature, previous weather humidity, previous weather wind speed, current weather temperature, current weather humidity, and current weather wind speed.

model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(7));
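MLlib does not enforce any meaning on the vector positions, so training and prediction must build the 7-element vector identically. A sketch of one possible layout (the names and ordering here are assumptions for illustration, not the project's actual code):

```java
// Illustrative layout of the 7-element feature vector; the ordering is an
// assumption, but whatever ordering is chosen must be identical for the
// LabeledPoint (training) and the Vector passed to predictOn (prediction).
public class FeatureVectorSketch {

    static double[] features(double prevBoilerTemp,
                             double prevWeatherTemp,
                             double prevWeatherHumidity,
                             double prevWeatherWindSpeed,
                             double curWeatherTemp,
                             double curWeatherHumidity,
                             double curWeatherWindSpeed) {
        return new double[] {
            prevBoilerTemp,
            prevWeatherTemp, prevWeatherHumidity, prevWeatherWindSpeed,
            curWeatherTemp, curWeatherHumidity, curWeatherWindSpeed
        };
    }

    public static void main(String[] args) {
        double[] x = features(45.0, 12.0, 0.8, 3.5, 14.0, 0.7, 2.0);
        System.out.println(x.length); // prints 7, matching Vectors.zeros(7)
    }
}
```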

When data arrive in a streaming fashion, it is useful to fit regression models online, updating the parameters of the model as new data arrives. spark.mllib currently supports streaming linear regression using ordinary least squares. The fitting is similar to that performed offline, except fitting occurs on each batch of data, so that the model continually updates to reflect the data from the stream.
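The per-batch update boils down to stochastic gradient descent on the squared error. A plain-Java sketch of a single step, w <- w + stepSize * (y - w.x) * x, independent of Spark (StreamingLinearRegressionWithSGD applies this kind of update per batch, with additional batching and step-size details):

```java
// Plain-Java sketch of one stochastic gradient descent step for least squares.
public class SgdStepSketch {

    static double dot(double[] w, double[] x) {
        double s = 0.0;
        for (int i = 0; i < w.length; i++) s += w[i] * x[i];
        return s;
    }

    static void update(double[] w, double[] x, double y, double stepSize) {
        double error = y - dot(w, x);            // residual of the current model
        for (int i = 0; i < w.length; i++) {
            w[i] += stepSize * error * x[i];     // move weights against the error
        }
    }

    public static void main(String[] args) {
        double[] w = new double[] { 0.0, 0.0 };  // like Vectors.zeros(2)
        // repeated updates on the same example drive the prediction toward y = 5.0
        for (int i = 0; i < 100; i++) update(w, new double[] { 1.0, 2.0 }, 5.0, 0.1);
        System.out.println(dot(w, new double[] { 1.0, 2.0 }));
    }
}
```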

// build a labeled point for each average temperature (the stream source is
// assumed to be the avgTemperaturesByDevice pair stream; some features elided)
JavaDStream<LabeledPoint> trainingData = avgTemperaturesByDevice.map(x -> {
  CurrentWeather cwd = owm.currentWeatherByCityId(owmCityId);

  return new LabeledPoint((double) x._2, Vectors.dense(new double[] {
    /* ... previous boiler and weather features ... */
    cwd.getWindData().getSpeed() }));
});

// register the stream for training
model.trainOn(trainingData);
Then you may use the model to predict the temperature:

JavaDStream<Vector> testData = avgTemperaturesByDevice.map(x -> {
  // cwd provides the current weather features (elided below)
  CurrentWeather cwd = owm.currentWeatherByCityId(owmCityId);
  HourlyWeatherForecast hwf = owm.hourlyWeatherForecastByCityId(owmCityId);
  return Vectors.dense(new double[] { x._2,
    /* ... remaining current and forecast weather features ... */
    hwf.getDataList().get(0).getWindData().getSpeed() });
});

JavaDStream<Double> forecastTemperatures = model.predictOn(testData);

