Проблема кластера узлов с использованием Socket.io и Redis

Хорошо, у меня есть экспресс-API, где у меня также есть socket.io, работающий для получения/отправки событий в реальном времени... все работает просто отлично. Мне нужно кластеризовать мое приложение. Я настроил все на основе приведенного ниже кода. Я раскручиваю воркеры, они получают коннекты и все работает, кроме того, что теперь я не могу "взорвать" все коннекты socket.io. Вот установка (взято из этого):

var express = require('express'),
    cluster = require('cluster'),
    net = require('net'),
    sio = require('socket.io'),
    sio_redis = require('socket.io-redis');

var port = 3000,
    num_processes = require('os').cpus().length;

if (cluster.isMaster) {
    // This stores our workers. We need to keep them to be able to reference
    // them based on source IP address. It's also useful for auto-restart,
    // for example.
    var workers = [];

    // Helper function for spawning worker at index 'i'.
    var spawn = function(i) {
        workers[i] = cluster.fork();

        // Optional: Restart worker on exit
        workers[i].on('exit', function(worker, code, signal) {
            console.log('respawning worker', i);
            spawn(i);
        });
    };

    // Spawn workers.
    for (var i = 0; i < num_processes; i++) {
        spawn(i);
    }

    // Helper function for getting a worker index based on IP address.
    // This is a hot path so it should be really fast. The way it works
    // is by converting the IP address to a number by removing the dots,
    // then compressing it to the number of slots we have.
    //
    // Compared against "real" hashing (from the sticky-session code) and
    // "real" IP number conversion, this function is on par in terms of
    // worker index distribution only much faster.
    var workerIndex = function (ip, len) {
    var _ip = ip.split(/['.'|':']/),
        arr = [];

    for (el in _ip) {
        if (_ip[el] == '') {
            arr.push(0);
        }
        else {
            arr.push(parseInt(_ip[el], 16));
        }
    }

    return Number(arr.join('')) % len;
}

    // Create the outside facing server listening on our port.
    var server = net.createServer({ pauseOnConnect: true }, function(connection) {
        // We received a connection and need to pass it to the appropriate
        // worker. Get the worker for this connection's source IP and pass
        // it the connection.
        var worker = workers[worker_index(connection.remoteAddress, num_processes)];
        worker.send('sticky-session:connection', connection);
    }).listen(port);
} else {
    // Note we don't use a port here because the master listens on it for us.
    var app = new express();

    // Here you might use middleware, attach routes, etc.

    // Don't expose our internal server to the outside.
    var server = app.listen(0, 'localhost'),
        io = sio(server);

    // Tell Socket.IO to use the redis adapter. By default, the redis
    // server is assumed to be on localhost:6379. You don't have to
    // specify them explicitly unless you want to change them.
    io.adapter(sio_redis({ host: 'localhost', port: 6379 }));

    // Here you might use Socket.IO middleware for authorization etc.

    // Listen to messages sent from the master. Ignore everything else.
    process.on('message', function(message, connection) {
        if (message !== 'sticky-session:connection') {
            return;
        }

        // Emulate a connection event on the server by emitting the
        // event with the connection the master sent us.
        server.emit('connection', connection);

        connection.resume();
    });
}

Итак, я подключаюсь с разных машин для проверки параллелизма, рабочие делают свое дело, и все хорошо, но когда я получаю соединение ввода-вывода, я регистрирую ОБЩЕЕ количество «подключенных», и это всегда 1 на экземпляр. Мне нужен способ сказать

allClusterForks.emit(stuff)

Я получаю соединение с правильным рабочим идентификатором, но «ВСЕ СОЕДИНЕНИЯ» всегда возвращает 1.

io.on('connection', function(socket) {
    console.log('Connected to worker %s', process.pid);
    console.log("Adapter ROOMS %s ", io.sockets.adapter.rooms);
    console.log("Adapter SIDS %s ", io.sockets.adapter.sids);
    console.log("SOCKETS CONNECTED %s ", Object.keys(io.sockets.connected).length);
});

Я вижу подписку/отмену подписки, поступающую с помощью Redis MONITOR

1454701383.188231 [0 127.0.0.1:63150] "subscribe" "socket.io#/#gXJscUUuVQGzsYJfAAAA#"
1454701419.130100 [0 127.0.0.1:63167] "subscribe" "socket.io#/#geYSvYSd5zASi7egAAAA#"
1454701433.842727 [0 127.0.0.1:63167] "unsubscribe" "socket.io#/#geYSvYSd5zASi7egAAAA#"
1454701444.630427 [0 127.0.0.1:63150] "unsubscribe" "socket.io#/#gXJscUUuVQGzsYJfAAAA#"

Это соединения с двух разных машин, я ожидаю, что с помощью адаптера socket io redis эти подписки будут поступать на одно и то же соединение redis, но они разные.

Я просто что-то пропустил? Для этого существует удивительное отсутствие документации/статей, которые не являются либо полностью устаревшими/неправильными/двусмысленными.

РЕДАКТИРОВАТЬ: Node v5.3.0 Redis v3.0.6 Socket.io v1.3.7


person Rob Bennet    schedule 05.02.2016    source источник


Ответы (1)


Итак, если кто-то столкнется с этим, я понял, что на самом деле «просмотр» количества подключенных сокетов в процессах - это не вещь, а трансляция или передача им. Так что я в основном просто "тестировал" без всякой причины. Все работает как положено. Я БУДУ переписывать адаптер socket.io-redis, чтобы он позволял проверять количество процессов.

Несколько лет назад был запрос на включение, чтобы реализовать поддержку того, что я пытался сделать. https://github.com/socketio/socket.io-redis/pull/15 и я мог бы попытаться очистить это и повторно отправить.

person Rob Bennet    schedule 05.02.2016