Towards Lab IoT Part Two
To test communication we'll set up two clients, each representing an instrument, along with a server that hosts the MQTT broker, the database and the visualization. All three machines run Ubuntu under VirtualBox.
In this case client1 sends measurement1 data, which resets to a random value between 20 and 50 when client1 receives a signal from the trigger. client2 sends measurement2 data, which resets to 20 when client2 receives a signal from the same trigger. The controller monitors measurement1 and fires the trigger when measurement1 is above 50. The controller also updates InfluxDB with data from measurement1 and measurement2. client1 and client2 are the two clients running instruments, and the server is the controller. The code for each is below.
controller (virtual machine 1 - ip address = 10.0.2.15)
## controller
##
## This is the controller code that runs in the background.
## It subscribes to measurement1 and, when that value
## exceeds the threshold of 50, it publishes a trigger
## event that client 1 and client 2 both respond to.
##
## It also doubles up as the subscriber for the InfluxDB
## database, which is code that can be run in a separate
## thread or program.
##
## publish: trigger1 (topic sensor/trigger/client1)
## subscribe: measurement1 (via the wildcard topic sensor/+/+)
## response: if measurement1 > 50 then publish trigger
15
16import paho.mqtt.client as mqtt
17from influxdb import InfluxDBClient
18import datetime
19import logging
20import socket
21import time
22
# Identity of this program and addresses of the services it talks to
INSTRUMENT_ID = 'controller'
MQTT_BROKER = '10.0.2.15'
INFLUX_CLIENT = '10.0.2.15'
INFLUX_DATABASE = 'lab1'

# Local host name and the IP address it resolves to (used in log records)
HOSTNAME = socket.gethostname()
IPADDR = socket.gethostbyname(HOSTNAME)

# InfluxDB connection that on_message writes the readings to
influx_client = InfluxDBClient(INFLUX_CLIENT, database=INFLUX_DATABASE)

# Logging set-up: the format references `clientip` and `instrumentid`,
# so every logging call has to pass `extra=d`
FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
logging.basicConfig(
    filename='/home/harvey/controller.log',
    format=FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.DEBUG,
)
d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
37
def on_connect(client, userdata, flags, rc):
    """Handle the CONNACK from the broker: log it and (re)subscribe.

    Args:
        client: the MQTT client instance this callback is attached to.
        userdata: private user data set on the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the connection was created
            normally.
    """
    logging.info("Connection returned with result code:" + str(rc), extra=d)

    # Subscribe on every successful connection, not only once at start-up:
    # the network loop reconnects automatically after a drop, and without
    # this the controller would stop receiving readings after a reconnect.
    if rc == 0:
        client.subscribe("sensor/+/+", 0)
41
def on_message(client, userdata, msg):
    """Handle a message published by the broker.

    Logs the message, writes its integer value to InfluxDB under a
    measurement named after the third topic level, and publishes the
    trigger when measurement1 is above 50.

    Args:
        client: the MQTT client instance; also used to publish the trigger.
        userdata: private user data set on the client (unused).
        msg: the received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # log the response
    logging.info("Received message, topic:" + msg.topic + " payload:" + str(msg.payload), extra=d)

    # Parse the payload once. A payload that is not an integer must not
    # raise inside the MQTT network loop, so it is logged and skipped.
    try:
        value = int(msg.payload)
    except ValueError:
        logging.warning("Ignoring non-integer payload on topic:" + msg.topic, extra=d)
        return

    # push to influxDB
    current_time = datetime.datetime.utcnow().isoformat()
    topic = msg.topic.split('/')
    measurement_type = topic[2]
    json_body = [
        {
            "measurement": measurement_type,
            "tags": {},
            "time": current_time,
            "fields": {
                "value": value
            }
        }
    ]
    influx_client.write_points(json_body)

    # fire the trigger if measurement1 > 50
    if msg.topic == 'sensor/expt1/measurement1' and value > 50:
        # Publish the number itself: str() of a bytes payload would send the
        # literal text b'51', which this controller then receives back
        # through its own "sensor/+/+" subscription and cannot parse.
        client.publish("sensor/trigger/client1", payload=str(value))
67
def on_disconnect(client, userdata, rc):
    """Log the result code reported when the client disconnects."""
    disconnect_message = "Disconnection returned with result code:" + str(rc)
    logging.info(disconnect_message, extra=d)
71
def main():
    """Connect to the broker, subscribe to the sensor topics and loop forever."""
    mqtt_client = mqtt.Client()

    # Attach the callbacks before connecting
    mqtt_client.on_message = on_message
    mqtt_client.on_connect = on_connect
    mqtt_client.on_disconnect = on_disconnect

    # Broker address, port 1883, keep-alive 60
    mqtt_client.connect(MQTT_BROKER, 1883, 60)

    # "sensor/+/+" matches every three-level topic under sensor/
    mqtt_client.subscribe("sensor/+/+", 0)

    # Blocks here, dispatching incoming messages to the callbacks
    mqtt_client.loop_forever()
86
if __name__ == '__main__':
    # Record start-up in the log, then hand over to the MQTT loop
    startup_message = 'Starting Broadcast from ' + INSTRUMENT_ID
    logging.info(startup_message, extra=d)
    main()
client1 (virtual machine 2 - ip address = 10.0.2.4)
## client 1 communications
##
## This is the MQTT client code that runs in the background
## of client 1.
## publish: measurement1 (topic sensor/expt1/measurement1)
## subscribe: trigger1 (topic sensor/trigger/client1)
## response: if trigger1 then reset measurement1 to a
## random value between 20 and 50
9
10import paho.mqtt.client as mqtt
11import logging
12import socket
13import random
14import time
15
# Identity of this instrument and address of the broker it reports to
INSTRUMENT_ID = 'machine 1'
MQTT_BROKER = '10.0.2.15'

# Local host name and the IP address it resolves to (used in log records)
HOSTNAME = socket.gethostname()
IPADDR = socket.gethostbyname(HOSTNAME)

# Logging set-up: the format references `clientip` and `instrumentid`,
# so every logging call has to pass `extra=d`
FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
logging.basicConfig(
    filename='/home/harvey/client1.log',
    format=FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.DEBUG,
)
d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
25
def on_connect(client, userdata, flags, rc):
    """Handle the CONNACK from the broker: log it and (re)subscribe.

    Args:
        client: the MQTT client instance this callback is attached to.
        userdata: private user data set on the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the connection was created
            normally.
    """
    logging.info("Connection returned with result code:" + str(rc), extra=d)

    # Subscribe on every successful connection, not only once at start-up:
    # the background loop reconnects automatically after a drop, and without
    # this the instrument would never see another trigger after a reconnect.
    if rc == 0:
        client.subscribe("sensor/trigger/client1", 0)
29
def on_message(client, userdata, msg):
    """Log every message from the broker and reset on a trigger.

    A message on ``sensor/trigger/client1`` sets the module-level
    ``measurement`` to ``random.randrange(20, 50)``.
    """
    global measurement
    received = "Received message, topic:" + msg.topic + " payload:" + str(msg.payload)
    logging.info(received, extra=d)

    # Anything other than the trigger topic is only logged
    if msg.topic != 'sensor/trigger/client1':
        return

    measurement = random.randrange(20, 50)
    logging.info("Trigger 1: measurement reset to " + str(measurement), extra=d)
37
def on_disconnect(client, userdata, rc):
    """Log the result code reported when the client disconnects."""
    disconnect_message = "Disconnection returned with result code:" + str(rc)
    logging.info(disconnect_message, extra=d)
41
def main():
    """Simulate instrument 1: publish an incrementing measurement1 every second.

    Connects to the broker, starts the background MQTT loop, subscribes to
    the trigger topic and then publishes the module-level ``measurement``
    once per second, incrementing it after each publish. ``on_message``
    resets ``measurement`` when a trigger arrives.
    """
    global measurement

    # Create an instance of `Client`
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to broker
    client.connect(MQTT_BROKER, 1883, 60)

    # initial value of measurement
    measurement = 0

    ## start MQTT client loop
    client.loop_start()

    # Subscribe to a topic
    client.subscribe("sensor/trigger/client1", 0)

    ## continuously update measurement value every second
    ## simulates instrument operation
    try:
        while True:
            client.publish("sensor/expt1/measurement1", payload=measurement)
            time.sleep(1)
            measurement += 1
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the simulation
        logging.info('Stopping Broadcast from ' + INSTRUMENT_ID, extra=d)
    finally:
        # Disconnection. This used to sit after the infinite loop and could
        # never run; `finally` makes sure the loop thread is stopped and the
        # client disconnects however the loop ends.
        time.sleep(1)  # wait till all messages are processed
        client.loop_stop()
        client.disconnect()
74
if __name__ == '__main__':
    # Record start-up in the log, then start the instrument simulation
    startup_message = 'Starting Broadcast from ' + INSTRUMENT_ID
    logging.info(startup_message, extra=d)
    main()
client2 (virtual machine 3 - ip address = 10.0.2.5)
## client 2 communications
##
## This is the MQTT client code that runs in the background
## of client 2.
## publish: measurement2 (topic sensor/expt1/measurement2)
## subscribe: trigger1 (topic sensor/trigger/client1)
## response: if trigger1 then reset measurement2 to 20
8
9import paho.mqtt.client as mqtt
10import logging
11import socket
12import time
13
# Identity of this instrument and address of the broker it reports to
INSTRUMENT_ID = 'machine 2'
MQTT_BROKER = '10.0.2.15'

# Local host name and the IP address it resolves to (used in log records)
HOSTNAME = socket.gethostname()
IPADDR = socket.gethostbyname(HOSTNAME)

# Logging set-up: the format references `clientip` and `instrumentid`,
# so every logging call has to pass `extra=d`
FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
logging.basicConfig(
    filename='/home/harvey/client2.log',
    format=FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.DEBUG,
)
d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
23
def on_connect(client, userdata, flags, rc):
    """Handle the CONNACK from the broker: log it and (re)subscribe.

    Args:
        client: the MQTT client instance this callback is attached to.
        userdata: private user data set on the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the connection was created
            normally.
    """
    logging.info("Connection returned with result code:" + str(rc), extra=d)

    # Subscribe on every successful connection, not only once at start-up:
    # the background loop reconnects automatically after a drop, and without
    # this the instrument would never see another trigger after a reconnect.
    if rc == 0:
        client.subscribe("sensor/trigger/client1", 0)
27
def on_message(client, userdata, msg):
    """Log every message from the broker and reset on a trigger.

    A message on ``sensor/trigger/client1`` sets the module-level
    ``measurement`` back to 20.
    """
    global measurement
    received = "Received message, topic:" + msg.topic + " payload:" + str(msg.payload)
    logging.info(received, extra=d)

    # Anything other than the trigger topic is only logged
    if msg.topic != 'sensor/trigger/client1':
        return

    measurement = 20
    logging.info("Trigger 1: measurement reset to " + str(measurement), extra=d)
35
def on_disconnect(client, userdata, rc):
    """Log the result code reported when the client disconnects."""
    disconnect_message = "Disconnection returned with result code:" + str(rc)
    logging.info(disconnect_message, extra=d)
39
def main():
    """Simulate instrument 2: publish an incrementing measurement2 every second.

    Connects to the broker, starts the background MQTT loop, subscribes to
    the trigger topic and then publishes the module-level ``measurement``
    once per second, incrementing it after each publish. ``on_message``
    resets ``measurement`` to 20 when a trigger arrives.
    """
    global measurement

    # Create an instance of `Client`
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to broker
    client.connect(MQTT_BROKER, 1883, 60)

    # initial value of measurement; set before the loop starts so that it
    # cannot overwrite a reset made by on_message
    measurement = 0

    ## start MQTT client loop
    client.loop_start()

    # Subscribe to a topic
    client.subscribe("sensor/trigger/client1", 0)

    ## continuously update measurement value every second
    ## simulates instrument operation
    try:
        while True:
            client.publish("sensor/expt1/measurement2", payload=measurement)
            time.sleep(1)
            measurement += 1
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the simulation
        logging.info('Stopping Broadcast from ' + INSTRUMENT_ID, extra=d)
    finally:
        # Disconnection. This used to sit after the infinite loop and could
        # never run; `finally` makes sure the loop thread is stopped and the
        # client disconnects however the loop ends.
        time.sleep(1)  # wait till all messages are processed
        client.loop_stop()
        client.disconnect()
72
if __name__ == '__main__':
    # Record start-up in the log, then start the instrument simulation
    startup_message = 'Starting Broadcast from ' + INSTRUMENT_ID
    logging.info(startup_message, extra=d)
    main()
Visualization
Running all three programs produces continuous output that updates every second. measurement1 and measurement2 are stored in InfluxDB under the database lab1, which can be visualized using Grafana (default = localhost, port 3000).
measurement1 and measurement2 increment every second. When measurement1 exceeds 50, the controller fires the trigger, which resets measurement1 to a random value between 20 and 50 and resets measurement2 to 20.