NB-IoT App System Demo Implementation With Python

The Narrowband Internet of Things, also known as NB-IoT, is coming, and it is used for machine-to-machine communication. Typical examples are meter-reading devices for electricity, gas, or water consumption. These devices are often stationary, so they do not need optimized handover. They usually transfer only a small amount of data, and that data is not even delay sensitive. However, the number of these devices may become very large, up to several orders of magnitude more than traditional devices. An NB-IoT network consists of terminal devices, the radio access network, the core network, the IoT server platform, and user applications. This article focuses on the IoT server platform and user applications.

System Overview

The details of the service operators' network elements are ignored here. I just view this system from the application's perspective. The whole application system has five subsystems: the IoT device part, the data receiving server, the web server, the database, and the browser part.

The IoT device part contains sensors, a microcontroller unit, and the IoT module. Of course, devices that run an operating system, such as a Raspberry Pi or an Android device, would also work, but I just keep it simple: the control unit has no operating system, and the IoT module is used for network access. I will leave this part for another article, and a fake client that sends UDP datagrams will be used for testing. So there is no OS and there are no software updates, and I won't have to consider NAT hole punching problems.

The data receiving server should be accessible through the Internet. MySQL is used as the database, and the web server is nginx. Actually the web part is a standalone project with too much non-technical business logic, so I will cover it in a separate article.

The browser part can be an iOS app, an Android app, a browser on a mobile device or laptop, or even a WeChat program. I am not interested in this part, and it will be quite easy to implement.

The Data Collection Server

The messages are sent to this server. This server should have a mirror backup for redundancy. If there are performance issues, a load sharing mechanism can be introduced, and the database can be moved to a separate machine if necessary.

Message Reception

IOTMsgReception fetches an incoming message from the socket, converts it from bytes to a string, and appends it to the message queue. The message queue is a typical FIFO structure without a size limit, and it is thread safe.

import socket
import threading


class IOTMsgReception(threading.Thread):
    """Receive UDP datagrams and put the decoded text on a shared queue."""

    def __init__(self, identifier, name, queue, ipaddr, port, buffersize):
        threading.Thread.__init__(self)
        self.identifier = identifier
        self.buffersize = buffersize
        self.name = name
        self.queue = queue
        # UDP socket bound to the configured address and port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind((ipaddr, port))

    def __del__(self):
        self.close()

    def get(self):
        # Block until one datagram arrives; the sender address is not used
        data, addr = self.sock.recvfrom(self.buffersize)
        # Replace undecodable bytes so a malformed datagram cannot kill the thread
        return data.decode("utf-8", errors="replace")

    def close(self):
        self.sock.close()

    def run(self):
        while True:
            msg = self.get()
            self.queue.put(msg)
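
The receive, decode, and enqueue path can be tried out without any device by sending a datagram over the loopback interface. This is only a minimal sketch under my own assumptions: `receive_once` and `loopback_demo` are hypothetical helper names, and port 0 simply asks the operating system for any free port.

```python
import queue
import socket


def receive_once(sock, msg_queue, buffersize):
    """Read one datagram, decode it, and append it to the queue."""
    data, _addr = sock.recvfrom(buffersize)
    msg_queue.put(data.decode("utf-8", errors="replace"))


def loopback_demo(payload=b"DLAA00:25", buffersize=50):
    """Send one datagram to a local receiver and return what was queued."""
    msg_queue = queue.Queue()
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Port 0 lets the OS pick a free port (test setup, not the real config)
        receiver.bind(("127.0.0.1", 0))
        receiver.settimeout(2.0)
        sender.sendto(payload, receiver.getsockname())
        receive_once(receiver, msg_queue, buffersize)
    finally:
        sender.close()
        receiver.close()
    return msg_queue.get_nowait()
```

Calling `loopback_demo()` should hand back the same text that was sent, which makes it a quick check that the decoding and the queue hand-off behave as expected before the real server is started.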

Message Processor

The IOTMsgProcessor threads take messages from the message queue in parallel, and each processor holds a permanent connection to the database. Once a message is processed, it is written to the database.

import threading
import pymysql
from configparser import ConfigParser


class IOTMsgProcessor(threading.Thread):
    """Take messages off the shared queue and store them in MySQL."""

    def __init__(self, identifier, name, queue):
        threading.Thread.__init__(self)
        self.identifier = identifier
        self.name = name
        self.queue = queue

        # Global Configurations
        self.config = ConfigParser()
        self.config.read("server.config")

        # Each processor keeps its own permanent database connection
        self.db = None
        try:
            self.db = pymysql.connect(host=self.config.get("db", "host"),
                                      port=self.config.getint("db", "port"),
                                      user=self.config.get("db", "user"),
                                      password=self.config.get("db", "pass"),
                                      database=self.config.get("db", "name"),
                                      charset=self.config.get("db", "char"))
            self.cursor = self.db.cursor()
        except pymysql.MySQLError:
            print("Fail to connect to database")
            # Without a connection the thread cannot do any work, so fail early
            raise

    def __del__(self):
        # The connection is missing if connect() failed
        if self.db is not None:
            self.db.close()

    def run(self):
        while True:
            line = self.queue.get()
            self.proc(line)

    def proc(self, data_to_save):
        # Expected message format: "<HardwareID>:<Data>"
        elements = str(data_to_save).split(":")
        if len(elements) < 2:
            print("Malformed message dropped: " + repr(data_to_save))
            return

        # Escape the untrusted values before they are pasted into the SQL text
        hardware_id = self.db.escape_string(elements[0])
        data = self.db.escape_string(elements[1])

        # Access Database
        searchsql = self.replace_params(self.config.get("sql", "search"), "@PARAM@", hardware_id)
        updatesql = self.replace_params(self.config.get("sql", "update"), "@PARAM@", data, hardware_id)
        insertsql = self.replace_params(self.config.get("sql", "insert"), "@PARAM@", hardware_id, data)

        self.cursor.execute(searchsql)

        # Update the row if the device is already known, otherwise insert it
        sql = updatesql if self.cursor.fetchone() else insertsql
        try:
            self.cursor.execute(sql)
            self.db.commit()
        except pymysql.MySQLError:
            self.db.rollback()

    def replace_params(self, line, mark, *params):
        # Replace the marks one at a time, left to right, in parameter order
        for param in params:
            line = str(line).replace(mark, param, 1)

        return line
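
An alternative to pasting values into the SQL text is to let the database driver bind them through placeholders. The sketch below shows the same search, then update or insert flow. To keep it runnable without a MySQL server, the standard-library `sqlite3` module stands in for MySQL here. The two-column table schema, `parse_message`, `save_reading`, and `demo` are my assumptions for illustration. With pymysql the placeholder would be `%s` instead of `?`.

```python
import sqlite3


def parse_message(raw):
    """Split "<HardwareID>:<Data>" into a pair, or return None if malformed."""
    hardware_id, sep, data = str(raw).partition(":")
    if not sep or not hardware_id:
        return None
    return hardware_id, data


def save_reading(conn, hardware_id, data):
    """Update the row for a known device, otherwise insert a new one."""
    cur = conn.cursor()
    # Placeholders keep the values out of the SQL text itself
    cur.execute("SELECT 1 FROM IOTHardwareInUse WHERE HardwareID = ?", (hardware_id,))
    if cur.fetchone():
        cur.execute("UPDATE IOTHardwareInUse SET Data = ? WHERE HardwareID = ?",
                    (data, hardware_id))
    else:
        cur.execute("INSERT INTO IOTHardwareInUse (HardwareID, Data) VALUES (?, ?)",
                    (hardware_id, data))
    conn.commit()


def demo():
    # In-memory SQLite database standing in for the MySQL table (assumed schema)
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE IOTHardwareInUse (HardwareID TEXT PRIMARY KEY, Data TEXT)")
    for raw in ["DLAA00:25", "SYBB01:31", "DLAA00:27", "garbage"]:
        parsed = parse_message(raw)
        if parsed is not None:
            save_reading(conn, *parsed)
    rows = conn.execute(
        "SELECT HardwareID, Data FROM IOTHardwareInUse ORDER BY HardwareID").fetchall()
    conn.close()
    return rows
```

In this sketch the second message for `DLAA00` overwrites the first one and the malformed line is skipped, so only the latest reading per device is kept.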

The Main Function

The main function creates the queue, the reception thread, and the processor threads, and then blocks while it waits to join all the threads.

from configparser import ConfigParser
from queue import Queue

from iot_reception import IOTMsgReception
from iot_processor import IOTMsgProcessor


def main():
    # Global Configurations
    config = ConfigParser()
    config.read("server.config")
    threads_list = []

    # Message Queue (unbounded, thread-safe FIFO)
    msg_queue = Queue()

    # Message Reception Thread
    thread = IOTMsgReception("ThreadIDReception",
                             "ThreadReception",
                             msg_queue,
                             config.get("server", "ipaddress"),
                             config.getint("server", "udpport"),
                             config.getint("server", "buffersize"))
    threads_list.append(thread)
    thread.start()

    # Message Processing Threads
    for index in range(0, config.getint("concurrent", "process_thread_number")):
        thread = IOTMsgProcessor("ThreadIDProcessor" + str(index),
                                 "ThreadProcessor" + str(index),
                                 msg_queue)
        threads_list.append(thread)
        thread.start()

    # Join all the threads (they loop forever, so this blocks until the process is stopped)
    for thread in threads_list:
        thread.join()


if __name__ == "__main__":
    main()
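
Because every thread loops forever, the joins above never return. If a clean shutdown is wanted, one common pattern is to push a sentinel value onto the queue for each worker. The following is a minimal sketch of that idea and not part of the server above: `STOP`, `worker`, and `run_pool` are hypothetical names, and the upper-casing is just a placeholder for real processing.

```python
import threading
from queue import Queue

STOP = None  # hypothetical sentinel: one per worker tells it to exit


def worker(msg_queue, results, lock):
    """Consume messages until the sentinel arrives."""
    while True:
        msg = msg_queue.get()
        if msg is STOP:
            break
        with lock:
            results.append(msg.upper())  # placeholder for real processing


def run_pool(messages, worker_count=3):
    """Process all messages with a small thread pool, then shut it down."""
    msg_queue = Queue()
    results = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(msg_queue, results, lock))
               for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for msg in messages:
        msg_queue.put(msg)
    # One sentinel per worker, queued after the real messages
    for _ in threads:
        msg_queue.put(STOP)
    for thread in threads:
        thread.join()
    return sorted(results)
```

Since the queue is FIFO, the sentinels are only reached after every real message has been taken, so each worker exits exactly once and the joins return.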

Configuration File

[db]
host=172.17.237.168
port=3306
user=xxxxxx
pass=xxxxxx
name=NbIOT
char=utf8

[server]
ipaddress=
udpport=9999
buffersize=50

[concurrent]
process_thread_number=10

[sql]
search=SELECT * FROM NbIOT.IOTHardwareInUse where HardwareID='@PARAM@'
update=UPDATE `NbIOT`.`IOTHardwareInUse` SET `Data`='@PARAM@' WHERE `HardwareID`='@PARAM@'
insert=INSERT INTO `NbIOT`.`IOTHardwareInUse` (`HardwareID`, `Data`) VALUES ('@PARAM@', '@PARAM@')
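
To see how these values reach the code, here is a small sketch that parses the `[server]` and `[concurrent]` sections from a string instead of from `server.config`. `SAMPLE` and `load_server_settings` are names I made up for the illustration.

```python
from configparser import ConfigParser

SAMPLE = """
[server]
ipaddress=
udpport=9999
buffersize=50

[concurrent]
process_thread_number=10
"""


def load_server_settings(text):
    """Parse the [server] and [concurrent] sections into a plain dict."""
    config = ConfigParser()
    config.read_string(text)
    return {
        # An empty value comes back as an empty string
        "ipaddress": config.get("server", "ipaddress"),
        "udpport": config.getint("server", "udpport"),
        "buffersize": config.getint("server", "buffersize"),
        "workers": config.getint("concurrent", "process_thread_number"),
    }
```

The empty `ipaddress` is read as an empty string, which makes an IPv4 socket bind to all local interfaces. With `buffersize=50`, `recvfrom` returns at most 50 bytes per datagram, so longer messages would not arrive intact.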

The Client Simulation For Test

Pack a UDP datagram and send it out. I was trying to fake the source IP address to make the data look more real, but because of NAT, the source IP address has already been changed when the datagram arrives at the server. That is all right for a simple test, so I'd like to just keep it like this.

import random

from kamene.all import *
from kamene.layers.inet import IP, UDP

# IP Layer
# Actually it makes no sense to change the source ip addr, since there's NAT.
srcip = "11.11.111.111"
dstip = "47.95.194.185"
iplayer = IP(src=srcip, dst=dstip)

# UDP Layer
srcport = random.randint(1024, 65535)
dstport = 9999
udplayer = UDP(sport=srcport, dport=dstport)

# APP Layer data
position = ["DL", "SY", "BJ", "SH", "TK"]
district = ["AA", "BB", "CC", "DD", "EE"]
identify = ["00", "01", "02", "03", "04"]
data_min = 10
data_max = 40

# Send 5 times; each message looks like "DLAA00:25"
# Sending raw packets usually requires root privileges
for i in range(0, 5):
    prefix = random.choice(position) + random.choice(district) + random.choice(identify)
    suffix = str(random.randint(data_min, data_max))
    message = prefix + ":" + suffix
    send(iplayer / udplayer / message)
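
Since the faked source address does not survive NAT anyway, the same test traffic can also be produced with a plain UDP socket from the standard library, which needs neither kamene nor raw-socket privileges. This is a minimal sketch under that assumption. `build_message` and `send_messages` are hypothetical helper names, and the host and port are whatever the server is configured with.

```python
import random
import socket

POSITION = ["DL", "SY", "BJ", "SH", "TK"]
DISTRICT = ["AA", "BB", "CC", "DD", "EE"]
IDENTIFY = ["00", "01", "02", "03", "04"]


def build_message(rng, data_min=10, data_max=40):
    """Build one "<HardwareID>:<Data>" test message."""
    prefix = rng.choice(POSITION) + rng.choice(DISTRICT) + rng.choice(IDENTIFY)
    return prefix + ":" + str(rng.randint(data_min, data_max))


def send_messages(host, port, count=5, seed=None):
    """Send `count` messages over a normal UDP socket and return them."""
    rng = random.Random(seed)
    sent = []
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for _ in range(count):
            message = build_message(rng)
            sock.sendto(message.encode("utf-8"), (host, port))
            sent.append(message)
    return sent
```

For example, `send_messages("127.0.0.1", 9999)` would send five messages to a server running locally on the port from the configuration file. Passing a `seed` makes the generated messages repeatable between test runs.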