import queue import uuid from exceptions import TooManyConnections class Sse(object): def __init__(self, id=None, retry=2000): if id: self.set_event_id( id ) self._buffer = [] self._retry = retry def set_retry(self, num): """ Set distinct retry timeout instead the default value. """ self._retry = num self._buffer.append("retry: {0}\n".format(self._retry)) def set_event_id(self, event_id): if event_id: self._buffer.append("id: {0}\n".format(event_id)) else: # Reset event id self._buffer.append("id\n") def reset_event_id(self): """ Send a reset event id. """ self.set_event_id(None) def _parse_text(self, text, encoding): # parse text if is list, tuple or set instance if isinstance(text, (list, tuple, set)): for item in text: if isinstance(item, bytes): item = item.decode(encoding) for subitem in item.splitlines(): yield subitem else: if isinstance(text, bytes): text = text.decode(encoding) for item in text.splitlines(): yield item def add_message(self, event, text, encoding='utf-8'): """ Add messaget with eventname to the buffer. :param str event: event name :param str/list text: event content. Must be a str or list of str :param bool split: splits str content by lines. default(true) """ self._buffer.append("event: {0}\n".format(event)) for text_item in self._parse_text(text, encoding): self._buffer.append("data: {0}\n".format(text_item)) self._buffer.append("\n") def __str__(self): return self.__unicode__() def __unicode__(self): return "".join(self._buffer) def flush(self): """ Reset the internal buffer to initial state. """ self._buffer = [] def __iter__(self): for item in self._buffer: yield item self.flush() class SseStream(): __queues = {} __maxConnections = 10 __queue_size = 50 @staticmethod def setMaxConnections(maxConnections = 10): """ Set maximum number of concurrent connections --- Paremeters: ------ maxConnections: int, optional The number of maximum concurrent connections ( default value is 10) """ SseStream.__maxConnections = maxConnections @staticmethod def setQueueSize(queueSize = 50): """ Set the queue size for stored messages --- Parementers ------ queueSize: int, optional The number of stored messages in the queue, set to <=0 for unlimited size ( default value is 50 ) """ SseStream.__queue_size = queueSize @staticmethod def addMessage(event_name, data, id = 0): """ Adds a event to the queue so it can be sent out --- Parameters: ------ event_name: str Name of the event data: string or list of strings Datat to be send id: int id of the event, if set to 0 no ID will be sent """ event = Sse() if id > 0: event.set_event_id(id) else: event.reset_event_id() event.add_message( event_name, data ) for k, v in SseStream.__queues.items(): v.put( event ) def __init__(self, id:uuid): """ Init a new stream --- Paremeters: id: uuid ------ """ self._id = id print() print() print(f'New SSE Stream with ID: {id}') if len( self.__queues ) >= self.__maxConnections: raise TooManyConnections if not self.__queues.get( id ): self._queue = queue.Queue( self.__queue_size ) self.__queues[ id ] = self._queue def __del__(self): """ Deregister client from queue - clean up """ try: del self.__queues[ self._id ] except KeyError: pass def __iter__(self): while True: msg = self._queue.get(block=True, timeout=None) if isinstance( msg, Sse ): for data in msg: yield data.encode('utf-8') def getStatus(self): status = {} status['currentConnections'] = len( self.__queues ) status['maxConnections'] = self.__maxConnections status['id'] = str(self._id) return status def main(): SseStream.setMaxConnections(5) SseStream.setQueueSize(25) print("Starting main...") import threading def filler(): counter = 0 import time while True: counter += 1 time.sleep(0.75) SseStream.addMessage("Ping", str(time.time()), id = counter) t = threading.Thread( target=filler ) t.start() from flask import Flask, current_app, request app = Flask(__name__) app.debug = True import json @app.route('/stream') def stream(): if request.accept_mimetypes['text/event-stream'] == 1: stream = SseStream(uuid.uuid4()) SseStream.addMessage("Status", json.dumps( stream.getStatus() ) ) return current_app.response_class( stream, mimetype='text/event-stream' ) else: return '''
Below a list of messages