from json import loads import db_handle from jsonwind import json_concat import mqtt_comm import time import threading global store_count #global variable, so no need to run sql_count global lock lock = threading.Lock() #thread functions are defined here, as well as global variables class Publisher(): def __init__(self,publish_flag,pub_arc_flag,topic,MQTT_client,qos,daemonTh,ThName,source,store_flag,order,n): global lock self.topic=topic; self.qos=qos self.source=source self.client_mqtt = MQTT_client self.publish_flag=publish_flag self.pub_arc_flag=pub_arc_flag self.daemon=daemonTh self.threadName= ThName self.source=source self.store_flag = store_flag self.order =order self.n = n if (daemonTh==True): self.t1=threading.Thread #function to assure that data has been published def getTopic(self): return self.topic def publish_and_wait(self,count,message): if (loads(message)["time"] is not ""): if (self.client_mqtt.connected): (r_code,msg_id)=self.client_mqtt.client.publish(self.topic,message,qos = self.qos) if(r_code==0 and self.source =='archive'): if(msg_id%500==0): print("Message",msg_id, "was delivered with success from the", self.source) return True if(r_code==0 and self.source is not'archive'): print("Message",msg_id, "was delivered with success from the", self.source) return True if(r_code is not 0 and self.store_flag): print("Error no:",r_code,"saving message to archive") self.write_line_db_sub(message) return False else: if(self.store_flag): self.write_line_db_sub(message) def publish(self,message=None): global store_count if (self.publish_flag ==True and self.pub_arc_flag == False): self.publish_and_wait(store_count,json_concat(message,self.topic)) if (self.publish_flag ==True and self.pub_arc_flag==True and self.daemon==True and store_count>2): self.publish_from_archive() if (self.publish_flag ==True and self.pub_arc_flag==True and self.daemon==False): self.check_publish_stored_files() #function to write one line to the DB def write_line_db_sub(self,msg_pack): global store_count global lock lock.acquire() data = loads(msg_pack); sqlite_db=db_handle.Sqlite3_DB(); sqlite_db.cursor.execute("INSERT into %s values (?, ?);"%(sqlite_db.tablename),[data["time"] , msg_pack]) sqlite_db.conn.commit(); store_count=store_count+1 if (store_count >1): print("Message ",store_count," stored"); sqlite_db.conn.close() lock.release() return; #function to publish from archive lines and delete them after def check_publish_stored_files(self): if(self.client_mqtt.connected): starter = time.time()#count time to execute the function global store_count sqlite_db=db_handle.Sqlite3_DB() if(store_count>0): if (self.order=='oldest'): print("SQL Archive has",store_count,"rows.") if (store_count >self.n): end= self.n; else: end= store_count i=0 if (self.order=='oldest'): # if the published files are gonna be the oldest (publish from archive) for row in sqlite_db.select_oldest(end): lock.acquire() if(self.publish_and_wait(store_count,json_concat(row[0],self.topic))): store_count = store_count-1 i=i+1 time.sleep(0.002) if (lock.locked()): lock.release() sqlite_db.remove_oldests_from_db(i) time_publish=time.time()-starter # print("############################################################") # print("time taken to publish",end,self.order," numbers was",time_publish) # print("############################################################") if (self.order=='newest'): # if the published files are gonna be the newest (publish actual data from sensor) end = 1 #only allows 1 line to be selected, since there's no sleep for row in sqlite_db.select_newest(end): if(self.publish_and_wait(store_count,json_concat(row[0],self.topic))): store_count = store_count-1 i=i+1 sqlite_db.remove_newest_from_db(i)#delete line sqlite_db.conn.commit() sqlite_db.conn.close() return #thread that runs when publishing from archive def publish_from_archive(self): ThNames= []; for thread in threading.enumerate(): ThNames.append(thread.name); if (self.threadName in ThNames): print("Thread",self.threadName,"is busy")#check if not empty else: self.t1 = threading.Thread(name =self.threadName,target=self.check_publish_stored_files, args=()) self.t1.setDaemon(True); self.t1.start() def write_line_db_sub(self,msg_pack): global store_count data = loads(msg_pack); sqlite_db=db_handle.Sqlite3_DB(); sqlite_db.cursor.execute("INSERT into %s values (?, ?);"%(sqlite_db.tablename),[data["time"] , msg_pack]) sqlite_db.conn.commit(); store_count=store_count+1 if (store_count>1): print("Message ",store_count," stored"); sqlite_db.conn.close() return;