Как реализовать push-сервер во фреймворке Flask?

Я пытаюсь создать небольшой сайт с функцией push-уведомления на сервер в микро-веб-фреймворке Flask, но я не знал, есть ли фреймворк для непосредственной работы.

Я использовал Juggernaut, но, похоже, он не работает с redis-py в текущей версии, а Juggernaut недавно устарел.

Есть ли у кого-нибудь предложения по моему делу?


person little-eyes    schedule 01.09.2012    source источник


Ответы (4)


Взгляните на События, отправленные сервером. Server-Sent Events - это API-интерфейс браузера, который позволяет вам держать открытым сокет для вашего сервера, подписываясь на поток обновлений. Для получения дополнительной информации прочтите сообщение Алекса Маккоу (автора Juggernaut) о почему он убивает джаггернаута и почему более простые события, отправленные сервером, в большинстве случаев являются лучшим инструментом для работы, чем веб-узлы.

Протокол действительно прост. Просто добавьте в ответ mimetype text/event-stream. Браузер будет поддерживать соединение открытым и прислушиваться к обновлениям. Событие, отправленное с сервера, представляет собой строку текста, начинающуюся с data: и следующей за ним новой строки.

data: this is a simple message
<blank line>

Если вы хотите обмениваться структурированными данными, просто выгрузите свои данные как json и отправьте json по сети.

Преимущество заключается в том, что вы можете использовать SSE во Flask без необходимости в дополнительном сервере. На github есть простой пример приложения чата, который использует redis в качестве базы данных pub / sub.

def event_stream():
    pubsub = red.pubsub()
    pubsub.subscribe('chat')
    for message in pubsub.listen():
        print message
        yield 'data: %s\n\n' % message['data']


@app.route('/post', methods=['POST'])
def post():
    message = flask.request.form['message']
    user = flask.session.get('user', 'anonymous')
    now = datetime.datetime.now().replace(microsecond=0).time()
    red.publish('chat', u'[%s] %s: %s' % (now.isoformat(), user, message))


@app.route('/stream')
def stream():
    return flask.Response(event_stream(),
                          mimetype="text/event-stream")

Вам не нужно использовать gunicron для запуска примера приложения. Просто убедитесь, что вы используете потоки при запуске приложения, потому что в противном случае соединение SSE заблокирует ваш сервер разработки:

if __name__ == '__main__':
    app.debug = True
    app.run(threaded=True)

На стороне клиента вам просто нужна функция обработчика Javascript, которая будет вызываться при отправке нового сообщения с сервера.

var source = new EventSource('/stream');
source.onmessage = function (event) {
     alert(event.data);
};

Отправленные сервером события поддерживаются последними версиями браузеров Firefox, Chrome и Safari. Internet Explorer еще не поддерживает отправленные сервером события, но ожидается, что он будет поддерживать их в версии 10. Есть два рекомендуемых полифилла для поддержки старых браузеров.

person Peter Hoffmann    schedule 02.09.2012
comment
Привет @PeterSmith, я пробовал этот подход, однако предупреждение (event.data) никогда не появляется. Я запускаю свое приложение Flask через порт 8000 и нажимаю на порт 8001. Поэтому я помещаю var source = new EventSource ('localhost : 8001 / push '); а в приложении Flask есть страница, на которой пользователь может что-то опубликовать. Сообщение транслируется и принимается всеми другими пользователями. У тебя есть идеи? - person little-eyes; 04.09.2012
comment
Почему вы запускаете push на другой порт? Одна из причин использования SSE заключается в том, что он работает в вашем приложении через обычный http. как вы запускали свои флеш-приложения? через сервер разработки? Вы добавили thread = True? Какой браузер вы используете? - person Peter Hoffmann; 04.09.2012
comment
Привет @PeterHoffmann, я использую торнадо с Flask. Возможно ли, что я соединю экземпляр приложения и экземпляр толкателя в торнадо? Скажите другой хендлер? Стоит ли еще ставить многопоточность? - person little-eyes; 04.09.2012
comment
Есть ли один поток для каждого подключенного клиента для обработки своего потока событий? Это похоже на длинный опрос. Если да, это не масштабируется. Для меня вопрос о маленьких глазках имеет смысл. - person chmike; 10.12.2012
comment
Это будет масштабироваться при использовании gevent + monkeypatch - person Ilmo Euro; 16.04.2013
comment
Как вы действительно можете быть уверены, что Flask закрывает соединение? Если я часто перезагружаю страницу, я получаю кучу устаревших соединений, и когда я нажимаю Ctrl-C, приложение фляги по-прежнему обслуживает запросы, потому что соединения открыты: - / - person flexd; 22.05.2013
comment
Внимание, у меня был включен ProfilerMiddleware, и он не работал, поэтому не используйте его. - person Adam; 16.09.2017
comment
@PeterHoffmann - знаете ли вы, как обрабатывать отключенного клиента, как указано в коде, на который вы ссылаетесь? Вроде как обрабатывается автоматически, но я не уверен. - person Shuzheng; 01.02.2021

Redis избыточен: используйте события, отправленные сервером (SSE)

Поздно к вечеринке (как обычно), но, IMHO, использование Redis может быть излишним.

Пока вы работаете в Python + Flask, рассмотрите возможность использования функций генератора, как описано в отличная статья Панисуана Джо Часинга. Суть этого:

В вашем клиенте index.html

var targetContainer = document.getElementById("target_div");
var eventSource = new EventSource("/stream")
  eventSource.onmessage = function(e) {
  targetContainer.innerHTML = e.data;
};
...
<div id="target_div">Watch this space...</div>

На вашем сервере Flask:

def get_message():
    '''this could be any function that blocks until data is ready'''
    time.sleep(1.0)
    s = time.ctime(time.time())
    return s

@app.route('/')
def root():
    return render_template('index.html')

@app.route('/stream')
def stream():
    def eventStream():
        while True:
            # wait for source data to be available, then push it
            yield 'data: {}\n\n'.format(get_message())
    return Response(eventStream(), mimetype="text/event-stream")
person fearless_fool    schedule 22.08.2018
comment
Когда я пытаюсь использовать спящий режим в функции get_message, поток не достигает браузера. Без сна он работает нормально, но я не хочу, чтобы столько сообщений за 1 секунду. Итак, у вас есть идеи, почему это не работает, когда в функции есть сон? - person Deepak Banka; 03.07.2019
comment
@DeepakBanka: Код сработал у меня. Можете ли вы поместить print () в get_message (), чтобы убедиться, что он вызывается так, как вы ожидаете? И использовать инструменты разработчика вашего браузера для отслеживания трафика между сервером и браузером? Это может дать некоторую информацию ... - person fearless_fool; 04.07.2019
comment
@fearleaa_fool есть опасения, что синхронные ограничения python помешают этому масштабироваться. Я знаю, что в django аналогичная интеграция sse / websockets требует отдельных асинхронных серверов. - person JZL003; 12.01.2020
comment
@ JZL003: Я использовал Flask только для микросервисов с ограниченным количеством клиентов. Я передам слово тому, кто работал с серверами Flask в приложениях с высокой степенью параллелизма ... - person fearless_fool; 13.01.2020

В продолжение ответа @ peter-hoffmann я написал расширение Flask специально для обработки событий, отправленных сервером. . Он называется Flask-SSE, и это доступно на PyPI. Чтобы установить его, запустите:

$ pip install flask-sse

Вы можете использовать это так:

from flask import Flask
from flask_sse import sse

app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/send')
def send_message():
    sse.publish({"message": "Hello!"}, type='greeting')
    return "Message sent!"

А для подключения к потоку событий из Javascript он работает так:

var source = new EventSource("{{ url_for('sse.stream') }}");
source.addEventListener('greeting', function(event) {
    var data = JSON.parse(event.data);
    // do what you want with this data
}, false);

Документация доступна на ReadTheDocs. Обратите внимание, что вам понадобится работающий Redis сервер для обработки pub / sub.

person singingwolfboy    schedule 19.04.2016
comment
@ManuelGodoy: /stream - person singingwolfboy; 30.11.2019
comment
@ManuelGodoy В прошлом году я бросил библиотеку, потому что не мог этого понять. Спасибо за вопрос. - person vincent; 20.05.2020

Как участник https://github.com/WolfgangFahl/pyFlaskBootstrap4, я столкнулся с той же потребностью и создал план фляги для событий, отправленных сервером, который не зависит от redis.

Это решение основано на других ответах, которые были даны здесь в прошлом.

https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/fb4/sse_bp.py имеет исходный код (см. также sse_bp.py ниже).

Модульные тесты можно найти на https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/tests/test_sse.py

Идея состоит в том, что вы можете использовать разные режимы для создания потока SSE:

  • предоставляя функцию
  • предоставив генератор
  • с помощью вспомогательного класса PubSub
  • используя вспомогательный класс PubSub и одновременно используя pydispatch.

По состоянию на 12 февраля 2021 года это альфа-код, которым я все же хочу поделиться. Прокомментируйте, пожалуйста, здесь или как проблемы в проекте.

На http://fb4demo.bitplan.com/events есть демонстрация и описание примера. использовать, например, для индикатора выполнения или отображения времени по адресу: http://wiki.bitplan.com/index.php/PyFlaskBootstrap4#Server_Sent_Events

пример клиентского javascript / html кода

<div id="event_div">Watch this space...</div>
<script>
    function fillContainerFromSSE(id,url) {
        var targetContainer = document.getElementById(id);
        var eventSource = new EventSource(url)
        eventSource.onmessage = function(e) {
            targetContainer.innerHTML = e.data;
        };
    };
    fillContainerFromSSE("event_div","/eventfeed");
</script>

пример кода на стороне сервера


def getTimeEvent(self):
        '''
        get the next time stamp
        '''
        time.sleep(1.0)
        s=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        return s   

def eventFeed(self):
        '''
        create a Server Sent Event Feed
        '''
        sse=self.sseBluePrint
        # stream from the given function
        return sse.streamFunc(self.getTimeEvent)

sse_bp.py

'''
Created on 2021-02-06
@author: wf
'''
from flask import Blueprint, Response, request, abort,stream_with_context
from queue import Queue
from pydispatch import dispatcher
import logging

class SSE_BluePrint(object):
    '''
    a blueprint for server side events 
    '''
    def __init__(self,app,name:str,template_folder:str=None,debug=False,withContext=False):
        '''
        Constructor
        '''
        self.name=name
        self.debug=debug
        self.withContext=False
        if template_folder is not None:
            self.template_folder=template_folder
        else:
            self.template_folder='templates'    
        self.blueprint=Blueprint(name,__name__,template_folder=self.template_folder)
        self.app=app
        app.register_blueprint(self.blueprint)
        
        @self.app.route('/sse/<channel>')
        def subscribe(channel):
            def events():
                PubSub.subscribe(channel)
            self.stream(events)
                
    def streamSSE(self,ssegenerator): 
        '''
        stream the Server Sent Events for the given SSE generator
        '''  
        response=None
        if self.withContext:
            if request.headers.get('accept') == 'text/event-stream':
                response=Response(stream_with_context(ssegenerator), content_type='text/event-stream')
            else:
                response=abort(404)    
        else:
            response= Response(ssegenerator, content_type='text/event-stream')
        return response
        
    def streamGen(self,gen):
        '''
        stream the results of the given generator
        '''
        ssegen=self.generateSSE(gen)
        return self.streamSSE(ssegen)   
            
    def streamFunc(self,func,limit=-1):
        '''
        stream a generator based on the given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            an SSE Response stream
        '''
        gen=self.generate(func,limit)
        return self.streamGen(gen)
                
    def generate(self,func,limit=-1):
        '''
        create a SSE generator from a given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            a generator for the function
        '''   
        count=0
        while limit==-1 or count<limit:
            # wait for source data to be available, then push it
            count+=1
            result=func()
            yield result
        
    def generateSSE(self,gen):
        for result in gen:
            yield 'data: {}\n\n'.format(result)
            
    def enableDebug(self,debug:bool):
        '''
        set my debugging
        
        Args:
            debug(bool): True if debugging should be switched on
        '''
        self.debug=debug
        if self.debug:
            logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03d %(levelname)s:\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
            
    def publish(self, message:str, channel:str='sse', debug=False):
        """
        Publish data as a server-sent event.
        
        Args:
            message(str): the message to send
            channel(str): If you want to direct different events to different
                clients, you may specify a channel for this event to go to.
                Only clients listening to the same channel will receive this event.
                Defaults to "sse".
            debug(bool): if True  enable debugging
        """
        return PubSub.publish(channel=channel, message=message,debug=debug)

    def subscribe(self,channel,limit=-1,debug=False):
        def stream():
            for message in PubSub.subscribe(channel,limit,debug=debug):
                yield str(message)
                
        return self.streamGen(stream)
    
class PubSub:
    '''
    redis pubsub duck replacement
    '''
    pubSubByChannel={}
    
    def __init__(self,channel:str='sse',maxsize:int=15, debug=False,dispatch=False):
        '''
        Args:
            channel(string): the channel name
            maxsize(int): the maximum size of the queue
            debug(bool): whether debugging should be switched on
            dispatch(bool): if true use the pydispatch library - otherwise only a queue
        '''
        self.channel=channel
        self.queue=Queue(maxsize=maxsize)
        self.debug=debug
        self.receiveCount=0
        self.dispatch=False
        if dispatch:
            dispatcher.connect(self.receive,signal=channel,sender=dispatcher.Any)
        
    @staticmethod
    def reinit():
        '''
        reinitialize the pubSubByChannel dict
        '''
        PubSub.pubSubByChannel={}
        
    @staticmethod
    def forChannel(channel):    
        '''
        return a PubSub for the given channel
        
        Args:
            channel(str): the id of the channel
        Returns:
            PubSub: the PubSub for the given channel
        '''
        if channel in PubSub.pubSubByChannel:
            pubsub=PubSub.pubSubByChannel[channel]
        else:
            pubsub=PubSub(channel)
            PubSub.pubSubByChannel[channel]=pubsub
        return pubsub
    
    @staticmethod    
    def publish(channel:str,message:str,debug=False):
        '''
        publish a message via the given channel
        
        Args:
            channel(str): the id of the channel to use
            message(str): the message to publish/send
        Returns:
            PubSub: the pub sub for the channel
            
        '''
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        pubsub.send(message)
        return pubsub
        
    @staticmethod    
    def subscribe(channel,limit=-1,debug=False): 
        '''
        subscribe to the given channel
        
        Args:
            channel(str): the id of the channel to use
            limit(int): limit the maximum amount of messages to be received        
            debug(bool): if True debugging info is printed
        '''  
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        return pubsub.listen(limit)
    
    def send(self,message):
        '''
        send the given message
        '''
        sender=object();
        if self.dispatch:
            dispatcher.send(signal=self.channel,sender=sender,msg=message)
        else:
            self.receive(sender,message)
        
    def receive(self,sender,message):
        '''
        receive a message
        '''
        if sender is not None:
            self.receiveCount+=1;
            if self.debug:
                logging.debug("received %d:%s" % (self.receiveCount,message))
            self.queue.put(message)
        
    def listen(self,limit=-1):
        '''
        listen to my channel
        
        this is a generator for the queue content of received messages
        
        Args:
            limit(int): limit the maximum amount of messages to be received
        
        Return:
            generator: received messages to be yielded
        '''
        if limit>0 and self.receiveCount>limit:
            return
        yield self.queue.get()
    
    def unsubscribe(self):
        '''
        unsubscribe me
        '''
        if self.dispatch:
            dispatcher.disconnect(self.receive, signal=self.channel)
        pass
person Wolfgang Fahl    schedule 12.02.2021