Bulk DLMS Smart Meter data push to AWS NLB using S3, Lambda & SQS.
An article discussing the architecture and Python code for pushing static DLMS smart meter reads to a HES (Head End System) and into a Postgres RDS via SQS and Lambda functions.
At a recent company, I was part of a team building a HES (Head End System) for smart electric meters, and the goal was to read 15-minute load profile data from 10 million simulated meters within 15 minutes. A lofty goal?
Yes, absolutely.
I was in charge of the AWS infrastructure within the team, since everyone else was focused on application code, and I had recently earned an AWS Cloud Practitioner certification that I wanted to put to use. As my first task, I set out to create a simulator that could send large volumes of DLMS data, as if many smart meters were transmitting simultaneously, to the NLB (Network Load Balancer) and on into the AWS VPC that hosted our HES.
I split the task into two parts. The first was to create sequential meter IDs for the number of simulated meters in our population, generate daily midnight-timestamped meter reads, and populate the load profile for these meters in DynamoDB tables; this part of the simulator can run every day via an Amazon EventBridge cron rule. The idea was to generate random daily reads and daily load profile reads for our whole population of meters in the simulator tables. The second part was to convert these daily reads and the load profile data into DLMS data before sending it to the NLB.
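Part one is still a work in progress (more on that at the end of this article), but just to make it concrete, here is a minimal sketch of what the daily generation step could look like. The table name, attribute names, and value ranges are my own illustrative assumptions, not the final schema:

import random
from datetime import datetime, timezone
from decimal import Decimal

import boto3

dynamodb = boto3.resource('dynamodb')
# Hypothetical table name and schema, for illustration only
table = dynamodb.Table('SimMeterDailyReads')

def generate_daily_reads(num_meters=100):
    # One random midnight read per simulated meter
    midnight = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    with table.batch_writer() as batch:
        for i in range(1, num_meters + 1):
            batch.put_item(Item={
                'meterId': f'MTR{i:08d}',               # sequential meter ids
                'readTimestamp': midnight.isoformat(),  # daily midnight timestamp
                'kwh': Decimal(str(round(random.uniform(0.5, 5.0), 3))),  # random daily kWh
            })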
The rest of this article describes the second part of the simulator.
Here is the architecture diagram of the push simulator
The first Lambda function, ‘insert_pushdata_into_sqs’, shown in the architecture diagram, reads the CSV file from the S3 bucket, parses each line, picks out just the second field (fields[1]), which is the core DLMS message, and adds it to a standard SQS queue.
The CSV file called ‘dlms.csv’:
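Each row has the meter id in the first column and the raw DLMS push payload in the second; that second field is what the first Lambda picks out. Schematically (placeholder values, not real DLMS frames):

MTR00000001,<hex-encoded DLMS push frame>
MTR00000002,<hex-encoded DLMS push frame>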
File uploaded to S3 bucket:
The Python code for the Lambda function insert_pushdata_into_sqs:
import json
import boto3
import logging
import csv
import time
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
from botocore.exceptions import ClientError

# S3 Parameters
s3_client = boto3.client("s3")
S3_BUCKET_NAME = 'bucket_name'     # add your bucket name here
S3_BUCKET_PREFIX = 'filename.csv'  # add your filename here

# SQS Parameters
sqs_client = boto3.client('sqs')
QueueName = '<NameofQueue>'  # add your queue name here
QueueUrl = '<queue_url>'     # add your queue url here

def lambda_handler(event, context):
    # CSV file parameters
    cnt = 0
    csvfile = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=S3_BUCKET_PREFIX)
    csvcontent = csvfile['Body'].read().split(b'\n')
    # Look up the SQS queue URL
    sqs_queue_url = sqs_client.get_queue_url(QueueName=QueueName)['QueueUrl']
    # Process each row
    for row in csvcontent:
        # increment count
        cnt = cnt + 1
        # decode bytes to string, then split the row on commas
        row = row.decode()
        fields = row.split(',')
        if len(fields) > 1:
            meterId = fields[0]
            dlms_msg = fields[1]
            print(dlms_msg)
            # Now add each dlms msg to the SQS queue
            try:
                msg = sqs_client.send_message(QueueUrl=sqs_queue_url,
                                              MessageBody=json.dumps(dlms_msg))
            except ClientError as e:
                logging.error(e)
    # Print the end message with a local time stamp
    utc_time = datetime.now(tzlocal())              # Lambda runs in UTC
    local_time = utc_time + timedelta(minutes=330)  # add 5 hrs 30 min (330 minutes) to get IST
    print(f"==> {cnt-1} Messages Sent at {local_time} ...")
    return {
        'statusCode': 200,
        'body': json.dumps('Reading from CSV Completed & Inserted into SQS Queue = SimDLMSMessages!')
    }
The code for the second Lambda function, insert_dlms_msgs_into_nlb:
# Fire this Lambda for each message in the SQS queue so that we can mimic parallel DLMS messages from various meters simultaneously
import json
import boto3
import logging
import csv
import socket
import time
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
from botocore.exceptions import ClientError

# SQS Parameters
sqs_client = boto3.client('sqs')
QueueName = '<NameofQueue>'
QueueUrl = '<queue_url>'

# NLB Settings
HOST = 'x.xxx.x.xxx'  # add the NLB elastic IP here
PORT = 8085

def lambda_handler(event, context):
    # Process each SQS record delivered in the event
    for record in event['Records']:
        payload = record["body"]
        print(f"===> Message Payload is {str(payload)}")
        # Connect and open a socket
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((HOST, PORT))
        print(f"===> Connected to {HOST} on Port {PORT} ...")
        # Send the message to the NLB
        s.sendall(payload.encode())
        # print the local time when the msg was sent
        utc_time = datetime.now(tzlocal())              # Lambda runs in UTC
        local_time = utc_time + timedelta(minutes=330)  # add 5 hrs 30 min (330 minutes) to get IST
        print(f"===> Message Sent at {local_time} ...")
        s.close()
A few things to take care of in the code above:
- Make sure the host and port of the NLB are defined correctly. I used the ‘Hercules’ utility (its TCP Client tab) on a Windows machine to check that the port on the NLB was open; you can also use a quick Python check like the sketch after this list. The DNS name of the NLB did not work for me, so I used the elastic IP address attached to the NLB. Debugging this issue took me two days.
- Use Python’s socket library to set up a TCP stream. Make sure you pass ‘socket.AF_INET, socket.SOCK_STREAM’, or it will not open a TCP connection.
- Use s.sendall to send all of the data in one call.
- If the server behind the listener does not send an acknowledgement back to the client, do not block waiting for a response with s.recv.
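If you do not have Hercules handy, a few lines of Python are enough to check that the NLB is reachable on the chosen port (the IP and port below are placeholders):

import socket

HOST = 'x.xxx.x.xxx'  # NLB elastic IP (placeholder)
PORT = 8085

# Try to open a TCP connection and report the result
try:
    with socket.create_connection((HOST, PORT), timeout=5):
        print(f"Port {PORT} on {HOST} is open")
except OSError as e:
    print(f"Could not connect to {HOST}:{PORT} -> {e}")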
Since the second Lambda function has to be triggered by the SQS queue, that trigger is set up as shown below:
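If you prefer to script that trigger rather than configure it in the console, an SQS event source mapping can be created with boto3 along these lines (the queue ARN below is a placeholder, and a batch size of 1 is my assumption so that each message gets its own invocation):

import boto3

lambda_client = boto3.client('lambda')

# Map the SQS queue to the Lambda so that incoming messages invoke the function
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:<region>:<account-id>:SimDLMSMessages',  # placeholder ARN
    FunctionName='insert_dlms_msgs_into_nlb',
    BatchSize=1,  # assumed: one message per invocation
    Enabled=True,
)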
Results:
Run the first Lambda function manually by clicking the ‘Test’ button. The results are shown below: the seven (7) rows and their DLMS payloads are posted to the SQS queue.
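If you would rather kick it off from a script than from the console’s Test button, a plain boto3 invocation works as well (assuming the function name used above):

import boto3

lambda_client = boto3.client('lambda')

# Invoke the CSV-to-SQS Lambda once, synchronously, with an empty test event
response = lambda_client.invoke(
    FunctionName='insert_pushdata_into_sqs',
    InvocationType='RequestResponse',
    Payload=b'{}',
)
print(response['Payload'].read().decode())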
Since the second Lambda function is triggered by SQS, use CloudWatch Logs and its log streams to capture its output.
And finally, the log files from the two (2) listeners in the target group attached to the NLB show that these messages were received:
Next Steps:
- Increase the number of records in the CSV file to 1k, 5k, 10k, 50k, 100k, and eventually 1 million (see the generator sketch after this list).
- Check whether any socket errors appear when that many messages are sent to the NLB simultaneously.
- Enable auto-scaling of the EC2 listeners so that new EC2 instances are launched as the load increases.
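For the first of those steps, a small script like this can generate an arbitrarily large test CSV; the payload column is a placeholder, not a real DLMS frame:

import csv

NUM_ROWS = 10_000  # bump this up to 50k, 100k, 1M as the load test grows

with open('dlms.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for i in range(1, NUM_ROWS + 1):
        writer.writerow([f'MTR{i:08d}', f'<dlms-push-frame-{i}>'])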
Stay tuned for:
- The first part of this simulator, which I am still working on: it will generate random daily kWh and daily load profile data, convert them to DLMS format, and save them to DynamoDB and then to the CSV file.
- The scalable HES code, which will ingest at least 1 million DLMS messages, parse them, and insert them into an Aurora PostgreSQL RDS database.
(Disclaimer: These are code snippets I have written and architecture drawings I have drawn to explain the concept. Please tweak the code to your specific needs before using it. These are my views only and are not representative of any company I am currently working for or have worked for in the past.)
Please visit my portfolio site for contact details.
Thanks for reading.