worktree-test/main.py

173 lines
6.1 KiB
Python
Raw Permalink Normal View History

2021-03-03 15:39:43 +00:00
"""
Created on Tue May 5 15:37:56 2020
@author: nits
connection:
green and white = B
yellow and brown = A
"""
import windsensor
from jsonwind import json_wma,json_debug,json_concat
from json import loads
import sighand
import mqtt_comm
import time
import avg_values
import configfile
import db_handle
from threading import Timer
from os import listdir
from os.path import isfile, join
from os import remove
import threading
from threadf import Publisher
import threadf
# Shared archive row counter kept on the threadf module. This is only a
# placeholder: main() overwrites it with the value returned by
# Sqlite3_DB.sql_count() once the database has been opened.
threadf.store_count=0
# Target duration of one acquisition cycle, in seconds.
_CYCLE_SECONDS = 1.0
# Rows deleted per clean-up pass (1800 = half an hour of samples).
_PRUNE_BATCH_ROWS = 1800
# Number of samples per day (archive size in rows).
_SAMPLES_PER_DAY = 86400


def _connect_hosts(conf, host_count):
    """Create one MQTT client per ``Host<i>`` config section and connect them.

    Args:
        conf: Parsed configuration; ``conf.config["Host<i>"]`` must provide
            ``port``, ``client_name``, ``connect`` and ``name``.
        host_count: Number of ``Host<i>`` sections to read (1-based).

    Returns:
        List of ``mqtt_comm.MQTT_Client`` objects in section order.
    """
    hosts = []
    for index in range(1, host_count + 1):
        section = conf.config["Host" + str(index)]
        port = section.getint('port')
        ident = section.get('client_name')
        connect = section.getboolean('connect')
        host_name = section.get('name')
        hosts.append(mqtt_comm.MQTT_Client(host_name, ident, port, connect))
    for host in hosts:
        if host.connect:
            host.client_connect()
            # NOTE(review): nesting reconstructed from flattened source; the
            # network loop is assumed to start only for hosts that connect.
            host.client.loop_start()
    return hosts


def _create_publishers(conf, publisher_count, hosts, qos_mqtt):
    """Create one publisher per ``Publisher<i>`` config section.

    Args:
        conf: Parsed configuration holding the ``Publisher<i>`` sections.
        publisher_count: Number of ``Publisher<i>`` sections to read (1-based).
        hosts: MQTT clients; each publisher is linked to ``hosts[host_n - 1]``.
        qos_mqtt: QoS value handed to every publisher.

    Returns:
        List of ``threadf.Publisher`` objects in section order.
    """
    publishers = []
    for index in range(1, publisher_count + 1):
        section = conf.config["Publisher" + str(index)]
        pubflag = section.getboolean('pubflag')
        arcflag = section.getboolean('arc_flag')
        daemon_th = section.getboolean('daemonTh')
        st_flag = section.getboolean('store_flag')
        topic = section.get('topic')
        host_n = section.getint('host_n')
        th_name = section.get('threadName')
        source = section.get('source')
        order = section.get('order')
        n = section.getint('n')
        print("publisher", index, "daemon =", daemon_th, "archive=", arcflag, "publish=", pubflag)
        publishers.append(
            threadf.Publisher(
                pubflag, arcflag, topic, hosts[host_n - 1], qos_mqtt,
                daemon_th, th_name, source, st_flag, order, n,
            )
        )
    return publishers


def _open_archive():
    """Open the SQLite archive, retrying every 3 seconds until it succeeds."""
    while True:
        try:
            return db_handle.Sqlite3_DB()
        except db_handle.sqlite3.Error as er:
            # map(str, ...) keeps the report working for non-string args.
            print('SQLite error: %s' % (' '.join(map(str, er.args))))
            print("Exception class is: ", er.__class__)
            time.sleep(3)


def _prune_archive(archive, days):
    """Remove the oldest rows when the archive holds more than *days* of data."""
    if threadf.store_count > (days * _SAMPLES_PER_DAY + _PRUNE_BATCH_ROWS):
        started = time.time()
        print('Too many files on the archive, removing older than', days, 'days size')
        archive.remove_oldests_from_db(_PRUNE_BATCH_ROWS)
        threadf.store_count = threadf.store_count - _PRUNE_BATCH_ROWS
        elapsed = time.time() - started
        print("time taken to remove", _PRUNE_BATCH_ROWS, " lines from the old data was: ", elapsed)


def _sample_and_publish(wind, speed_mean, archive, publishers, days, debug_mode):
    """Read the sensor once, update the averages, archive and publish.

    Returns:
        The ``"time"`` field of the generated JSON packet; the caller treats
        an empty string as "sensor was not read" and retries without sleeping.
    """
    time_act = wind.readDateTime()
    windspeed_act = wind.readWindSpeed()
    wind_direct = wind.readWindDirection()
    wind_temp = wind.readTemp()

    speed_mean.insert_value_list2(windspeed_act)
    speed_mean.insert_value_list10(windspeed_act)
    speed_m2 = speed_mean.get_m2()
    speed_max2 = speed_mean.get_max2()
    speed_m10 = speed_mean.get_m10()
    speed_max10 = speed_mean.get_max10()

    _prune_archive(archive, days)

    json_pack = json_wma(
        time_act, windspeed_act, wind_direct, wind_temp,
        speed_m2, speed_max2, speed_m10, speed_max10,
    )
    time_json = loads(json_pack)["time"]
    if time_json != "":
        # The first publisher object is the one that writes the line to the DB.
        publishers[0].write_line_db_sub(json_pack)
        # NOTE(review): nesting reconstructed from flattened source; publishing
        # is assumed to be skipped, like archiving, when the time is empty.
        for pub in publishers:
            pub.publish(json_concat(json_pack, pub.topic))

    if debug_mode == 1:
        errorcounter = wind.errorcounter
        kompasswinkel_act = wind.readCompassAngle()
        # Print the debug information on the console.
        print(json_debug(time_act, windspeed_act, wind_direct, errorcounter, kompasswinkel_act))
    return time_json


def main():
    """Run the wind-sensor acquisition loop forever.

    Reads the configuration, creates the MQTT hosts and publishers, opens the
    SQLite archive and then, roughly once per second, reads the sensor,
    archives the sample and hands it to every publisher. When the generated
    packet has an empty ``"time"`` field the next read is attempted
    immediately so data is not lost.
    """
    conf = configfile.Configuration()
    # Kept referenced for the lifetime of main(); not used directly here.
    signal_handler = sighand.SignalHandler()  # noqa: F841
    wind = windsensor.WindSensor()
    speed_mean = avg_values.AverageValue()
    days = conf.days_archive
    debug_mode = conf.debug_mode
    qos_mqtt = conf.qos
    start = time.time()
    host_count = conf.numberOfHosts
    publisher_count = conf.numberOfPublishers

    print("Nr of hosts: ", host_count)
    print("Nr of publishers: ", publisher_count)
    hosts = _connect_hosts(conf, host_count)
    publishers = _create_publishers(conf, publisher_count, hosts, qos_mqtt)

    archive = _open_archive()
    threadf.store_count = archive.sql_count()
    print("Archive has INITIALLY ", threadf.store_count, " rows")

    time_json = ""
    while True:
        # Run a cycle once per second, or at once if the last read failed.
        if time.time() - start > _CYCLE_SECONDS or time_json == "":
            # Release the lock if it is currently held so that the main
            # routines can take it for the duration of the cycle.
            if threadf.lock.locked():
                threadf.lock.release()
            threadf.lock.acquire()
            start = time.time()
            try:
                time_json = _sample_and_publish(
                    wind, speed_mean, archive, publishers, days, debug_mode
                )
            finally:
                # Guarded release: only release if the lock is still held,
                # and do so even when the cycle raised.
                if threadf.lock.locked():
                    threadf.lock.release()

        # Skip the sleep after a failed read so the next reading comes faster.
        if time_json != "":
            remaining = _CYCLE_SECONDS - (time.time() - start)
            if remaining >= 0:
                time.sleep(remaining)
            else:
                # time.sleep() rejects negative values: the cycle overran.
                print("Cycle>1s lasted:", time.time() - start, "seconds")
# Start the acquisition loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()