2021-05-16 20:48:34 +00:00
|
|
|
import queue
|
2023-03-27 21:18:29 +00:00
|
|
|
import uuid
|
2021-05-16 20:48:34 +00:00
|
|
|
from exceptions import TooManyConnections
|
|
|
|
|
|
2021-05-18 10:17:02 +00:00
|
|
|
class Sse(object):
|
2023-03-27 21:18:29 +00:00
|
|
|
def __init__(self, id=None, retry=2000):
|
|
|
|
|
if id:
|
|
|
|
|
self.set_event_id( id )
|
2021-05-18 10:17:02 +00:00
|
|
|
self._buffer = []
|
2023-03-27 21:18:29 +00:00
|
|
|
self._retry = retry
|
2021-05-18 10:17:02 +00:00
|
|
|
|
|
|
|
|
def set_retry(self, num):
|
|
|
|
|
"""
|
|
|
|
|
Set distinct retry timeout instead the default
|
|
|
|
|
value.
|
|
|
|
|
"""
|
|
|
|
|
self._retry = num
|
|
|
|
|
self._buffer.append("retry: {0}\n".format(self._retry))
|
|
|
|
|
|
|
|
|
|
def set_event_id(self, event_id):
|
|
|
|
|
if event_id:
|
|
|
|
|
self._buffer.append("id: {0}\n".format(event_id))
|
|
|
|
|
else:
|
|
|
|
|
# Reset event id
|
|
|
|
|
self._buffer.append("id\n")
|
|
|
|
|
|
|
|
|
|
def reset_event_id(self):
|
|
|
|
|
"""
|
|
|
|
|
Send a reset event id.
|
|
|
|
|
"""
|
|
|
|
|
self.set_event_id(None)
|
|
|
|
|
|
|
|
|
|
def _parse_text(self, text, encoding):
|
|
|
|
|
# parse text if is list, tuple or set instance
|
|
|
|
|
if isinstance(text, (list, tuple, set)):
|
|
|
|
|
for item in text:
|
|
|
|
|
if isinstance(item, bytes):
|
|
|
|
|
item = item.decode(encoding)
|
|
|
|
|
|
|
|
|
|
for subitem in item.splitlines():
|
|
|
|
|
yield subitem
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
if isinstance(text, bytes):
|
|
|
|
|
text = text.decode(encoding)
|
|
|
|
|
|
|
|
|
|
for item in text.splitlines():
|
|
|
|
|
yield item
|
|
|
|
|
|
|
|
|
|
def add_message(self, event, text, encoding='utf-8'):
|
|
|
|
|
"""
|
|
|
|
|
Add messaget with eventname to the buffer.
|
|
|
|
|
|
|
|
|
|
:param str event: event name
|
|
|
|
|
:param str/list text: event content. Must be a str or list of str
|
|
|
|
|
:param bool split: splits str content by lines. default(true)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
self._buffer.append("event: {0}\n".format(event))
|
|
|
|
|
|
|
|
|
|
for text_item in self._parse_text(text, encoding):
|
|
|
|
|
self._buffer.append("data: {0}\n".format(text_item))
|
|
|
|
|
|
|
|
|
|
self._buffer.append("\n")
|
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
return self.__unicode__()
|
|
|
|
|
|
|
|
|
|
def __unicode__(self):
|
|
|
|
|
return "".join(self._buffer)
|
|
|
|
|
|
|
|
|
|
def flush(self):
|
|
|
|
|
"""
|
|
|
|
|
Reset the internal buffer to initial state.
|
|
|
|
|
"""
|
|
|
|
|
self._buffer = []
|
|
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
|
for item in self._buffer:
|
2021-05-18 10:31:11 +00:00
|
|
|
yield item
|
2021-05-18 10:17:02 +00:00
|
|
|
self.flush()
|
|
|
|
|
|
2021-05-16 20:48:34 +00:00
|
|
|
class SseStream():
|
|
|
|
|
__queues = {}
|
|
|
|
|
__maxConnections = 10
|
|
|
|
|
__queue_size = 50
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def setMaxConnections(maxConnections = 10):
|
|
|
|
|
""" Set maximum number of concurrent connections
|
|
|
|
|
---
|
|
|
|
|
Paremeters:
|
|
|
|
|
------
|
|
|
|
|
maxConnections: int, optional
|
|
|
|
|
The number of maximum concurrent connections ( default value is 10) """
|
|
|
|
|
SseStream.__maxConnections = maxConnections
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def setQueueSize(queueSize = 50):
|
|
|
|
|
""" Set the queue size for stored messages
|
|
|
|
|
---
|
|
|
|
|
Parementers
|
|
|
|
|
------
|
|
|
|
|
queueSize: int, optional
|
|
|
|
|
The number of stored messages in the queue, set to <=0 for unlimited size ( default value is 50 ) """
|
|
|
|
|
SseStream.__queue_size = queueSize
|
|
|
|
|
|
2023-03-27 21:18:29 +00:00
|
|
|
@staticmethod
|
|
|
|
|
def addMessage(event_name, data, id = 0):
|
|
|
|
|
"""
|
|
|
|
|
Adds a event to the queue so it can be sent out
|
|
|
|
|
---
|
|
|
|
|
Parameters:
|
|
|
|
|
------
|
|
|
|
|
event_name: str
|
|
|
|
|
Name of the event
|
|
|
|
|
data: string or list of strings
|
|
|
|
|
Datat to be send
|
|
|
|
|
id: int
|
|
|
|
|
id of the event, if set to 0 no ID will be sent """
|
|
|
|
|
event = Sse()
|
|
|
|
|
if id > 0:
|
|
|
|
|
event.set_event_id(id)
|
|
|
|
|
else:
|
|
|
|
|
event.reset_event_id()
|
|
|
|
|
event.add_message( event_name, data )
|
|
|
|
|
|
|
|
|
|
for k, v in SseStream.__queues.items():
|
|
|
|
|
v.put( event )
|
|
|
|
|
|
|
|
|
|
def __init__(self, id:uuid):
|
2021-05-16 20:48:34 +00:00
|
|
|
""" Init a new stream
|
|
|
|
|
---
|
|
|
|
|
Paremeters:
|
2023-03-27 21:18:29 +00:00
|
|
|
id: uuid
|
2021-05-16 20:48:34 +00:00
|
|
|
------
|
|
|
|
|
"""
|
2023-03-27 21:18:29 +00:00
|
|
|
self._id = id
|
|
|
|
|
|
|
|
|
|
print()
|
|
|
|
|
print()
|
|
|
|
|
print(f'New SSE Stream with ID: {id}')
|
2021-05-16 20:48:34 +00:00
|
|
|
if len( self.__queues ) >= self.__maxConnections:
|
|
|
|
|
raise TooManyConnections
|
|
|
|
|
|
|
|
|
|
if not self.__queues.get( id ):
|
|
|
|
|
self._queue = queue.Queue( self.__queue_size )
|
|
|
|
|
self.__queues[ id ] = self._queue
|
2023-03-27 21:18:29 +00:00
|
|
|
print( self.__queues )
|
2021-05-16 20:48:34 +00:00
|
|
|
|
|
|
|
|
print(f"Queue size: {self.__queue_size}; Connections: { len( self.__queues ) }/{self.__maxConnections}; ID: {id}")
|
2023-03-27 21:18:29 +00:00
|
|
|
print()
|
|
|
|
|
print()
|
2021-05-16 20:48:34 +00:00
|
|
|
|
|
|
|
|
def __del__(self):
|
|
|
|
|
""" Deregister client from queue - clean up """
|
2023-03-27 21:18:29 +00:00
|
|
|
try:
|
|
|
|
|
del self.__queues[ self._id ]
|
|
|
|
|
print( f"Stream object {self._id} is deleted")
|
|
|
|
|
except KeyError:
|
|
|
|
|
print( f"Stream object {self._id} not found")
|
|
|
|
|
pass
|
2021-05-16 20:48:34 +00:00
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
|
while True:
|
2021-05-18 10:17:02 +00:00
|
|
|
msg = self._queue.get(block=True, timeout=None)
|
2023-03-27 21:18:29 +00:00
|
|
|
print(msg)
|
2021-05-18 10:17:02 +00:00
|
|
|
if isinstance( msg, Sse ):
|
2021-05-16 20:48:34 +00:00
|
|
|
for data in msg:
|
2021-05-18 10:31:11 +00:00
|
|
|
yield data.encode('utf-8')
|
2021-05-16 20:48:34 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
SseStream.setMaxConnections(5)
|
2021-05-18 10:17:02 +00:00
|
|
|
SseStream.setQueueSize(25)
|
2023-03-27 21:18:29 +00:00
|
|
|
print("Starting main...")
|
2021-05-18 10:17:02 +00:00
|
|
|
|
|
|
|
|
|
2023-03-27 21:18:29 +00:00
|
|
|
import threading
|
|
|
|
|
def filler():
|
|
|
|
|
counter = 0
|
|
|
|
|
import time
|
|
|
|
|
while True:
|
|
|
|
|
counter += 1
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
SseStream.addMessage("Ping", str(time.time()), id = counter)
|
2021-05-18 10:17:02 +00:00
|
|
|
|
2023-03-27 21:18:29 +00:00
|
|
|
t = threading.Thread( target=filler )
|
|
|
|
|
t.start()
|
2021-05-18 10:17:02 +00:00
|
|
|
|
2023-03-27 21:18:29 +00:00
|
|
|
from flask import Flask, current_app
|
2021-05-18 10:17:02 +00:00
|
|
|
app = Flask(__name__)
|
|
|
|
|
app.debug = True
|
2023-03-27 21:18:29 +00:00
|
|
|
|
|
|
|
|
@app.route('/stream')
|
|
|
|
|
def stream():
|
|
|
|
|
return current_app.response_class( SseStream(uuid.uuid4()), mimetype='text/event-stream' )
|
|
|
|
|
@app.route('/')
|
|
|
|
|
def index():
|
|
|
|
|
return "OK"
|
2021-05-18 10:17:02 +00:00
|
|
|
app.run()
|
|
|
|
|
|
2021-05-16 20:48:34 +00:00
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
main()
|