This commit is contained in:
Markus Bauer 2021-05-18 12:17:02 +02:00
parent 8d3284f261
commit 86f0569201
1 changed files with 116 additions and 11 deletions

View File

@ -1,7 +1,90 @@
import queue import queue
import sse import sse
import json
from exceptions import TooManyConnections from exceptions import TooManyConnections
class Sse(object):
def __init__(self, default_retry=2000):
self._buffer = []
self.set_retry(default_retry)
def set_retry(self, num):
"""
Set distinct retry timeout instead the default
value.
"""
self._retry = num
self._buffer.append("retry: {0}\n".format(self._retry))
def set_event_id(self, event_id):
if event_id:
self._buffer.append("id: {0}\n".format(event_id))
else:
# Reset event id
self._buffer.append("id\n")
def reset_event_id(self):
"""
Send a reset event id.
"""
self.set_event_id(None)
def _parse_text(self, text, encoding):
# parse text if is list, tuple or set instance
if isinstance(text, (list, tuple, set)):
for item in text:
if isinstance(item, bytes):
item = item.decode(encoding)
for subitem in item.splitlines():
yield subitem
else:
if isinstance(text, bytes):
text = text.decode(encoding)
for item in text.splitlines():
yield item
def add_message(self, event, text, encoding='utf-8'):
"""
Add messaget with eventname to the buffer.
:param str event: event name
:param str/list text: event content. Must be a str or list of str
:param bool split: splits str content by lines. default(true)
"""
self._buffer.append("event: {0}\n".format(event))
for text_item in self._parse_text(text, encoding):
self._buffer.append("data: {0}\n".format(text_item))
self._buffer.append("\n")
def __str__(self):
return self.__unicode__()
def __unicode__(self):
return "".join(self._buffer)
def flush(self):
"""
Reset the internal buffer to initial state.
"""
self._buffer = []
def __iter__(self):
return self
def __next__(self):
print("Next in SSE")
print(self._buffer)
for item in self._buffer:
return item
self.flush()
class SseStream(): class SseStream():
__queues = {} __queues = {}
__maxConnections = 10 __maxConnections = 10
@ -49,15 +132,19 @@ class SseStream():
print( "Stream object is deleted") print( "Stream object is deleted")
def __iter__(self): def __iter__(self):
print("Iter called")
return self return self
def __next__(self): def __next__(self):
""" Method which is called regular to send out the event """ """ Method which is called regular to send out the event """
print("Next called")
print(f'Queue Size: {self._queue.qsize()}')
while True: while True:
msg = self.queue.get(block=True, timeout=None) msg = self._queue.get(block=True, timeout=None)
if isinstance( msg, sse.Sse ): if isinstance( msg, Sse ):
for data in msg: for data in msg:
yield data print(data)
return data.encode('utf-8')
def addMessage(self, event_name, data, id = 0): def addMessage(self, event_name, data, id = 0):
""" """
@ -71,14 +158,14 @@ class SseStream():
Datat to be send Datat to be send
id: int id: int
id of the event, if set to 0 no ID will be sent """ id of the event, if set to 0 no ID will be sent """
event = sse.Sse() event = Sse()
if id > 0: # if id > 0:
event.set_event_id(id) # event.set_event_id(id)
else: # else:
event.reset_event_id() # event.reset_event_id()
event.add_message( event_name, data ) event.add_message( event_name, data )
self._queue.put( event )
self._queue.put(event)
@ -86,8 +173,26 @@ class SseStream():
def main(): def main():
SseStream.setMaxConnections(5) SseStream.setMaxConnections(5)
SseStream.setQueueSize(10) SseStream.setQueueSize(25)
sse = SseStream("Test") print("Main...")
teststream = SseStream("Test")
for i in range(1, 10):
teststream.addMessage( "test", f'Message Nr: {i}', i )
from flask import Flask, current_app, Blueprint
sse = Blueprint('sse', __name__)
@sse.route('')
def stream():
return current_app.response_class( teststream, mimetype='text/event-stream' )
app = Flask(__name__)
app.debug = True
app.register_blueprint(sse, url_prefix='/stream')
app.run()
if __name__ == "__main__": if __name__ == "__main__":
main() main()