Towards Lab IoT Part Two

In order to test communication we'll set up two clients, each representing an instrument, along with a server which acts as the MQTT broker, database and visualization. The three machines are all running ubuntu under VirtualBox.
In this case client1 sends measurement1 data which resets to a random value between 20 and 50 when it receives a signal from trigger. client2 sends measurement2 data which resets to 20 when it receives a signal from trigger. The controller monitors measurement1 and signals the trigger when measurement1 is above 50. The controller also updates influxDB with data from measurement1 and measurement2. client1 and client2 represent the two clients, running instruments and the server is controller. Codes for each are below.

controller (virtual machine 1 - ip address = 10.0.2.15)

 1## controller
 2## 
 3## this is the controller code that runs in the background
 4## it subscribes to measurement1 and when that value
 5## reaches the threshold of 50 it publishes a trigger
 6## event that client 1 and client 2 both respond to
 7##
 8## It also doubles up as the subscriber for the influxDB
 9## database which is code that can be run in a separate
10## thread or program
11## 
12## publish: trigger1
13## subscribe: measurement1
14## response: if measurement1 > 50 then publish trigger 
15 
16import paho.mqtt.client as mqtt
17from influxdb import InfluxDBClient
18import datetime
19import logging
20import socket
21import time
22 
23MQTT_BROKER = '10.0.2.15'
24HOSTNAME = socket.gethostname()
25IPADDR = socket.gethostbyname(HOSTNAME)
26INSTRUMENT_ID = 'controller'
27INFLUX_CLIENT = '10.0.2.15'
28INFLUX_DATABASE = 'lab1'
29 
30# influxDB
31influx_client = InfluxDBClient(INFLUX_CLIENT, database = INFLUX_DATABASE)
32 
33## define logging file
34FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
35logging.basicConfig(filename = '/home/harvey/controller.log', format = FORMAT, datefmt = '%m/%d/%Y %I:%M:%S %p', level = logging.DEBUG)
36d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
37 
38# Define the callback to handle CONNACK from the broker, if the connection created normal, the value of rc is 0
39def on_connect(client, userdata, flags, rc):
40    logging.info("Connection returned with result code:" + str(rc), extra = d)
41 
42# Define the callback to hande publish from broker
43def on_message(client, userdata, msg):
44    # log the response
45    logging.info("Received message, topic:" + msg.topic + "  payload:" + str(msg.payload), extra = d)
46    
47    # push to influxDB
48    current_time = datetime.datetime.utcnow().isoformat()
49    topic = msg.topic.split('/')
50    measurement_type = topic[2]
51    json_body = [
52        {
53            "measurement": measurement_type,
54            "tags": {},
55            "time": current_time,
56            "fields": {
57                "value": int(msg.payload)
58            }
59        }
60    ]
61    influx_client.write_points(json_body)
62 
63    # fire the trigger if measurement1 > 50
64    if (msg.topic == 'sensor/expt1/measurement1'):
65        if (int(msg.payload) > 50):
66            client.publish("sensor/trigger/client1", payload = str(msg.payload))
67 
68# Callback handles disconnection, log the rc value
69def on_disconnect(client, userdata, rc):
70    logging.info("Disconnection returned with result code:" + str(rc), extra = d)
71 
72def main():
73 
74    # Create an instance of `Client`
75    client = mqtt.Client()
76    client.on_connect = on_connect
77    client.on_disconnect= on_disconnect
78    client.on_message = on_message
79 
80    # Connect to broker
81    client.connect(MQTT_BROKER, 1883, 60)
82 
83    # Subscribe to all topics and start the loop
84    client.subscribe("sensor/+/+", 0)
85    client.loop_forever()
86 
87if __name__ == '__main__':
88    logging.info('Starting Broadcast from ' + INSTRUMENT_ID, extra = d)
89    main()

client1 (virtual machine 2 - ip address = 10.0.2.4)

 1## client 1 communications
 2## 
 3## this is an MQTT client code that runs in the background
 4## of client 1
 5## publish: measurement1
 6## subscribe: trigger1
 7## response: if trigger1 then reset measurement1 to a 
 8## random value between 20 and 50
 9 
10import paho.mqtt.client as mqtt
11import logging
12import socket
13import random
14import time
15 
16MQTT_BROKER = '10.0.2.15'
17HOSTNAME = socket.gethostname()
18IPADDR = socket.gethostbyname(HOSTNAME)
19INSTRUMENT_ID = 'machine 1'
20 
21## define logging file
22FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
23logging.basicConfig(filename = '/home/harvey/client1.log', format = FORMAT, datefmt = '%m/%d/%Y %I:%M:%S %p', level = logging.DEBUG)
24d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
25 
26# Define the callback to handle CONNACK from the broker, if the connection created normal, the value of rc is 0
27def on_connect(client, userdata, flags, rc):
28    logging.info("Connection returned with result code:" + str(rc), extra = d)
29 
30# Define the callback to hande publish from broker
31def on_message(client, userdata, msg):
32    global measurement
33    logging.info("Received message, topic:" + msg.topic + "  payload:" + str(msg.payload), extra = d)
34    if (msg.topic == 'sensor/trigger/client1'):
35        measurement = random.randrange(20, 50)
36        logging.info("Trigger 1: measurement reset to " + str(measurement), extra = d)
37 
38# Callback handles disconnection, log the rc value
39def on_disconnect(client, userdata, rc):
40    logging.info("Disconnection returned with result code:" + str(rc), extra = d)
41 
42def main():
43 
44    # Create an instance of `Client`
45    client = mqtt.Client()
46    client.on_connect = on_connect
47    client.on_disconnect= on_disconnect
48    client.on_message = on_message
49 
50    # Connect to broker
51    client.connect(MQTT_BROKER, 1883, 60)
52 
53    # initial value of measurement
54    global measurement
55    measurement = 0
56 
57    ## start MQTT client loop
58    client.loop_start()
59 
60    # Subscribe to a topic
61    client.subscribe("sensor/trigger/client1", 0)
62 
63    ## continuously update measurement value every second
64    ## simulates instrument operation
65    while True:
66        client.publish("sensor/expt1/measurement1", payload = measurement)
67        time.sleep(1)
68        measurement += 1
69 
70    # Disconnection
71    time.sleep(1) # wait till all messages are processed
72    client.loop_stop()
73    client.disconnect()
74 
75if __name__ == '__main__':
76    logging.info('Starting Broadcast from ' + INSTRUMENT_ID, extra = d)
77    main()

client2 (virtual machine 3 - ip address = 10.0.2.5)

 1## client 2 communications
 2## 
 3## this is an MQTT client code that runs in the background
 4## of client 2
 5## publish: measurement2
 6## subscribe: trigger1
 7## response: if trigger1 then reset measurement2 to 20 
 8 
 9import paho.mqtt.client as mqtt
10import logging
11import socket
12import time
13 
14MQTT_BROKER = '10.0.2.15'
15HOSTNAME = socket.gethostname()
16IPADDR = socket.gethostbyname(HOSTNAME)
17INSTRUMENT_ID = 'machine 2'
18 
19## define logging file
20FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
21logging.basicConfig(filename = '/home/harvey/client2.log', format = FORMAT, datefmt = '%m/%d/%Y %I:%M:%S %p', level = logging.DEBUG)
22d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
23 
24# Define the callback to handle CONNACK from the broker, if the connection created normal, the value of rc is 0
25def on_connect(client, userdata, flags, rc):
26    logging.info("Connection returned with result code:" + str(rc), extra = d)
27 
28# Define the callback to hande publish from broker
29def on_message(client, userdata, msg):
30    global measurement
31    logging.info("Received message, topic:" + msg.topic + "  payload:" + str(msg.payload), extra = d)
32    if (msg.topic == 'sensor/trigger/client1'):
33        measurement = 20
34        logging.info("Trigger 1: measurement reset to " + str(measurement), extra = d)
35 
36# Callback handles disconnection, log the rc value
37def on_disconnect(client, userdata, rc):
38    logging.info("Disconnection returned with result code:" + str(rc), extra = d)
39 
40def main():
41 
42    # Create an instance of `Client`
43    client = mqtt.Client()
44    client.on_connect = on_connect
45    client.on_disconnect= on_disconnect
46    client.on_message = on_message
47 
48    # Connect to broker
49    client.connect(MQTT_BROKER, 1883, 60)
50 
51    ## start MQTT client loop
52    client.loop_start()
53 
54    # Subscribe to a topic
55    client.subscribe("sensor/trigger/client1", 0)
56 
57    # initial value of measurement
58    global measurement
59    measurement = 0
60 
61    ## continuously update measurement value every second
62    ## simulates instrument operation
63    while True:
64        client.publish("sensor/expt1/measurement2", payload = measurement)
65        time.sleep(1)
66        measurement += 1
67 
68    # Disconnection
69    time.sleep(1) # wait till all messages are processed
70    client.loop_stop()
71    client.disconnect()
72 
73if __name__ == '__main__':
74    logging.info('Starting Broadcast from ' + INSTRUMENT_ID, extra = d)
75    main()

Visualization

Running all three codes leads to continuous output which updates every second. measurement1 and measurement2 are stored in the InfluxDB under database lab1 which can be visualized using grafana (default = localhost, port 3000).
measurement1 and measurement2 increment every second. When measurement1 reaches 50 it resets itself to a random value between 20 and 50 and resets measurement2 to 20.