diff --git a/src/SimpleSseSever.py b/src/SimpleSseSever.py index b82d076..2924bc4 100644 --- a/src/SimpleSseSever.py +++ b/src/SimpleSseSever.py @@ -1,7 +1,90 @@ import queue import sse +import json from exceptions import TooManyConnections +class Sse(object): + def __init__(self, default_retry=2000): + self._buffer = [] + self.set_retry(default_retry) + + def set_retry(self, num): + """ + Set distinct retry timeout instead the default + value. + """ + self._retry = num + self._buffer.append("retry: {0}\n".format(self._retry)) + + def set_event_id(self, event_id): + if event_id: + self._buffer.append("id: {0}\n".format(event_id)) + else: + # Reset event id + self._buffer.append("id\n") + + def reset_event_id(self): + """ + Send a reset event id. + """ + self.set_event_id(None) + + def _parse_text(self, text, encoding): + # parse text if is list, tuple or set instance + if isinstance(text, (list, tuple, set)): + for item in text: + if isinstance(item, bytes): + item = item.decode(encoding) + + for subitem in item.splitlines(): + yield subitem + + else: + if isinstance(text, bytes): + text = text.decode(encoding) + + for item in text.splitlines(): + yield item + + def add_message(self, event, text, encoding='utf-8'): + """ + Add messaget with eventname to the buffer. + + :param str event: event name + :param str/list text: event content. Must be a str or list of str + :param bool split: splits str content by lines. default(true) + """ + + self._buffer.append("event: {0}\n".format(event)) + + for text_item in self._parse_text(text, encoding): + self._buffer.append("data: {0}\n".format(text_item)) + + self._buffer.append("\n") + + def __str__(self): + return self.__unicode__() + + def __unicode__(self): + return "".join(self._buffer) + + def flush(self): + """ + Reset the internal buffer to initial state. + """ + self._buffer = [] + + def __iter__(self): + return self + + def __next__(self): + print("Next in SSE") + print(self._buffer) + for item in self._buffer: + return item + self.flush() + + class SseStream(): __queues = {} __maxConnections = 10 @@ -49,15 +132,19 @@ class SseStream(): print( "Stream object is deleted") def __iter__(self): + print("Iter called") return self def __next__(self): """ Method which is called regular to send out the event """ + print("Next called") + print(f'Queue Size: {self._queue.qsize()}') while True: - msg = self.queue.get(block=True, timeout=None) - if isinstance( msg, sse.Sse ): + msg = self._queue.get(block=True, timeout=None) + if isinstance( msg, Sse ): for data in msg: - yield data + print(data) + return data.encode('utf-8') def addMessage(self, event_name, data, id = 0): """ @@ -71,14 +158,14 @@ class SseStream(): Datat to be send id: int id of the event, if set to 0 no ID will be sent """ - event = sse.Sse() - if id > 0: - event.set_event_id(id) - else: - event.reset_event_id() + event = Sse() + # if id > 0: + # event.set_event_id(id) + # else: + # event.reset_event_id() event.add_message( event_name, data ) + self._queue.put( event ) - self._queue.put(event) @@ -86,8 +173,26 @@ class SseStream(): def main(): SseStream.setMaxConnections(5) - SseStream.setQueueSize(10) - sse = SseStream("Test") + SseStream.setQueueSize(25) + print("Main...") + teststream = SseStream("Test") + + for i in range(1, 10): + teststream.addMessage( "test", f'Message Nr: {i}', i ) + + from flask import Flask, current_app, Blueprint + + sse = Blueprint('sse', __name__) + + @sse.route('') + def stream(): + return current_app.response_class( teststream, mimetype='text/event-stream' ) + + app = Flask(__name__) + app.debug = True + app.register_blueprint(sse, url_prefix='/stream') + app.run() + if __name__ == "__main__": main() \ No newline at end of file