From e5921752ef2a5e48fe915f080a8c7c28d0c55e5a Mon Sep 17 00:00:00 2001 From: Markus Bauer Date: Mon, 27 Mar 2023 23:18:29 +0200 Subject: [PATCH] Improve server, move folder --- src/{ => server}/SimpleSseSever.py | 110 +++++++++++++++++------------ src/{ => server}/exceptions.py | 0 2 files changed, 66 insertions(+), 44 deletions(-) rename src/{ => server}/SimpleSseSever.py (77%) rename src/{ => server}/exceptions.py (100%) diff --git a/src/SimpleSseSever.py b/src/server/SimpleSseSever.py similarity index 77% rename from src/SimpleSseSever.py rename to src/server/SimpleSseSever.py index 5877026..1e5b681 100644 --- a/src/SimpleSseSever.py +++ b/src/server/SimpleSseSever.py @@ -1,10 +1,13 @@ import queue +import uuid from exceptions import TooManyConnections class Sse(object): - def __init__(self, default_retry=2000): + def __init__(self, id=None, retry=2000): + if id: + self.set_event_id( id ) self._buffer = [] - self.set_retry(default_retry) + self._retry = retry def set_retry(self, num): """ @@ -77,7 +80,6 @@ class Sse(object): yield item self.flush() - class SseStream(): __queues = {} __maxConnections = 10 @@ -103,35 +105,8 @@ class SseStream(): The number of stored messages in the queue, set to <=0 for unlimited size ( default value is 50 ) """ SseStream.__queue_size = queueSize - def __init__(self, id): - """ Init a new stream - --- - Paremeters: - ------ - id: string - The unique id of the client - """ - if len( self.__queues ) >= self.__maxConnections: - raise TooManyConnections - - if not self.__queues.get( id ): - self._queue = queue.Queue( self.__queue_size ) - self.__queues[ id ] = self._queue - - print(f"Queue size: {self.__queue_size}; Connections: { len( self.__queues ) }/{self.__maxConnections}; ID: {id}") - - def __del__(self): - """ Deregister client from queue - clean up """ - print( "Stream object is deleted") - - def __iter__(self): - while True: - msg = self._queue.get(block=True, timeout=None) - if isinstance( msg, Sse ): - for data in msg: - yield data.encode('utf-8') - - def addMessage(self, event_name, data, id = 0): + @staticmethod + def addMessage(event_name, data, id = 0): """ Adds a event to the queue so it can be sent out --- @@ -149,33 +124,80 @@ class SseStream(): else: event.reset_event_id() event.add_message( event_name, data ) - self._queue.put( event ) + for k, v in SseStream.__queues.items(): + v.put( event ) + def __init__(self, id:uuid): + """ Init a new stream + --- + Paremeters: + id: uuid + ------ + """ + self._id = id + print() + print() + print(f'New SSE Stream with ID: {id}') + if len( self.__queues ) >= self.__maxConnections: + raise TooManyConnections + if not self.__queues.get( id ): + self._queue = queue.Queue( self.__queue_size ) + self.__queues[ id ] = self._queue + print( self.__queues ) + + print(f"Queue size: {self.__queue_size}; Connections: { len( self.__queues ) }/{self.__maxConnections}; ID: {id}") + print() + print() + + def __del__(self): + """ Deregister client from queue - clean up """ + try: + del self.__queues[ self._id ] + print( f"Stream object {self._id} is deleted") + except KeyError: + print( f"Stream object {self._id} not found") + pass + + def __iter__(self): + while True: + msg = self._queue.get(block=True, timeout=None) + print(msg) + if isinstance( msg, Sse ): + for data in msg: + yield data.encode('utf-8') def main(): SseStream.setMaxConnections(5) SseStream.setQueueSize(25) - print("Main...") - teststream = SseStream("Test") + print("Starting main...") - for i in range(1, 10): - teststream.addMessage( "test", f'Message Nr: {i}', i ) - from flask import Flask, current_app, Blueprint + import threading + def filler(): + counter = 0 + import time + while True: + counter += 1 + time.sleep(5) + SseStream.addMessage("Ping", str(time.time()), id = counter) - sse = Blueprint('sse', __name__) - - @sse.route('') - def stream(): - return current_app.response_class( teststream, mimetype='text/event-stream' ) + t = threading.Thread( target=filler ) + t.start() + from flask import Flask, current_app app = Flask(__name__) app.debug = True - app.register_blueprint(sse, url_prefix='/stream') + + @app.route('/stream') + def stream(): + return current_app.response_class( SseStream(uuid.uuid4()), mimetype='text/event-stream' ) + @app.route('/') + def index(): + return "OK" app.run() diff --git a/src/exceptions.py b/src/server/exceptions.py similarity index 100% rename from src/exceptions.py rename to src/server/exceptions.py