Как обновить значение tcp-сервера pymodbus в соответствии с сообщением, подписанным zmq?

Я новичок. Мой текущий проект заключается в том, что когда текущий конец решит запустить службу Modbus, я создам процесс для службы Modbus. Затем значение получается в родительском процессе, через ZeroMQ PUB/SUB для передачи значения, теперь я хочу обновить значение регистра modbus в сервисном процессе modbus.

Я попробовал метод, упомянутый pymodbus, предоставленный updating_server.py и twisted.internet.task.LoopingCall() для обновления значения регистра, но это сделает невозможным подключение клиента к моему серверу. Я не знаю, почему?

Используйте LoopingCall() для установки сервера, журнал при подключении клиента.

/home/xiaohe/Pictures/Selection_050.png

Затем я попытался поставить и загрузку, и startTCPserver в асинхронный цикл, но обновление вводилось только в первый раз после запуска, а затем не вводилось.

В настоящее время я использую LoopingCall() для обработки обновлений, но я не думаю, что это хороший способ.

Это код, которым я инициализировал PUB и все теги, которые могут читать тег.

from loop import cycle
import asyncio
from multiprocessing import Process
from persistence import models as pmodels
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
from zmq.asyncio import Context
from common import logging
from server.modbustcp import i3ot_tcp as sertcp
import common.config as cfg
import communication.admin as ca
import json
import os
import signal
from datetime import datetime
from server.opcuaserver import i3ot_opc as seropc

async def main():
    future = []
    task = []
    global readers, readers_old, task_flag
    logger.debug("connecting to database and create table.")
    pmodels.connect_create()
    logger.debug("init read all address to create loop task.")
    cycle.init_readers(readers)
    ctx = Context()
    publisher = ctx.socket(zmq.PUB)
    logger.debug("init publish [%s].", addrs)
    publisher.bind(addrs)
    readers_old = readers.copy()
    for reader in readers:
        task.append(asyncio.ensure_future(
            cycle.run_readers(readers[reader], publisher)))
    if not len(task):
        task_flag = True
    logger.debug("task length [%s - %s].", len(task), task)
    opcua_server = LocalServer(seropc.opc_server, "opcua")
    future = [
        start_get_all_address(),
        start_api(),
        create_address_loop(publisher, task),
        modbus_server(),
        opcua_server.run()
    ]
    logger.debug("run loop...")
    await asyncio.gather(*future)

asyncio.run(main(), debug=False)

Это нужно, чтобы получить значение тега устройства и опубликовать его.

async def run_readers(reader, publisher):
    while True:
        await reader.run(publisher)


class DataReader:
    def __init__(self, freq, clients):
        self._addresses = []
        self._frequency = freq
        self._stop_signal = False
        self._clients = clients
        self.signature = sign_data_reader(self._addresses)

    async def run(self, publisher):
        while not self._stop_signal:
            for addr in self._addresses:
                await addr.read()
                data = {
                    "type": "value",
                    "data": addr._final_value
                }
                publisher.send_pyobj(data)
                if addr._status:
                    if addr.alarm_log:
                        return_alarm_log = pbasic.get_log_by_time(addr.alarm_log['date'])
                        if return_alarm_log:
                            data = {
                                "type": "alarm",
                                "data": return_alarm_log
                            }
                            publisher.send_pyobj(data)
                    self.data_send(addr)
                    logger.debug("run send data")
            await asyncio.sleep(int(self._frequency))

    def stop(self):
        self._stop_signal = True

импорт сервера Modbus

from common import logging
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
import common.config as cfg
import struct
import os
import signal
from datetime import datetime
from twisted.internet.task import LoopingCall
def updating_writer(a):
    logger.info("in updates of modbus tcp server.")
    context = a[0]
    # while True:
    if check_pid(os.getppid()) is False:
        os.kill(os.getpid(), signal.SIGKILL)
    url = ("ipc://{}" .format(cfg.get('ipc', 'pubsub')))
    logger.debug("connecting to [%s].", url)
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect(url)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    slave_id = 0x00
    msg = subscriber.recv_pyobj()
    logger.debug("updates.")
    if msg['data']['data_type'] in modbus_server_type and msg['type'] == 'value':
        addr = pservice.get_mbaddress_to_write_value(msg['data']['id'])
        if addr:
            logger.debug(
                "local address and length [%s - %s].",
                addr['local_address'], addr['length'])
            values = get_value_by_type(msg['data']['data_type'], msg['data']['final'])
            logger.debug("modbus server updates values [%s].", values)
            register = get_register(addr['type'])
            logger.debug(
                "register [%d] local address [%d] and value [%s].",
                register, addr['local_address'], values)
            context[slave_id].setValues(register, addr['local_address'], values)
        # time.sleep(1)
def tcp_server(pid):
    logger.info("Get server configure and device's tags.")
    st = datetime.now()
    data = get_servie_and_all_tags()
    if data:
        logger.debug("register address space.")
        register_address_space(data)
    else:
        logger.debug("no data to create address space.")

    length = register_number()
    store = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0] * length),
        co=ModbusSequentialDataBlock(0, [0] * length),
        hr=ModbusSequentialDataBlock(0, [0] * length),
        ir=ModbusSequentialDataBlock(0, [0] * length)
    )
    context = ModbusServerContext(slaves=store, single=True)

    identity = ModbusDeviceIdentification()
    identity.VendorName = 'pymodbus'
    identity.ProductCode = 'PM'
    identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
    identity.ProductName = 'pymodbus Server'
    identity.ModelName = 'pymodbus Server'
    identity.MajorMinorRevision = '2.2.0'

    # ----------------------------------------------------------------------- #
    # set loop call and run server
    # ----------------------------------------------------------------------- #

    try:
        logger.debug("thread start.")
        loop = LoopingCall(updating_writer, (context, ))
        loop.start(1, now=False)

        # process = Process(target=updating_writer, args=(context, os.getpid(),))
        # process.start()

        address = (data['tcp_ip'], int(data['tcp_port']))
        nt = datetime.now() - st
        logger.info("modbus tcp server begin has used [%s] s.", nt.seconds)
        pservice.write_server_status_by_type('modbus', 'running')
        StartTcpServer(context, identity=identity, address=address)
    except Exception as e:
        logger.debug("modbus server start error [%s].", e)
        pservice.write_server_status_by_type('modbus', 'closed')

Это код, который я создал для процесса Modbus.

def process_stop(p_to_stop):
    global ptcp_flag
    pid = p_to_stop.pid
    os.kill(pid, signal.SIGKILL)
    logger.debug("process has closed.")
    ptcp_flag = False


def ptcp_create():
    global ptcp_flag
    pid = os.getpid()
    logger.debug("sentry pid [%s].", pid)
    ptcp = Process(target=sertcp.tcp_server, args=(pid,))
    ptcp_flag = True
    return ptcp


async def modbus_server():
    logger.debug("get mosbuc server's status.")
    global ptcp_flag
    name = 'modbus'
    while True:
        ser = pservice.get_server_status_by_name(name)
        if ser['enabled']:
            if ser['tcp_status'] == 'closed' or ser['tcp_status'] == 'running':
                tags = pbasic.get_tag_by_name(name)
                if len(tags):
                    if ptcp_flag is False:
                        logger.debug("[%s] status [%s].", ser['tcp_name'], ptcp_flag)
                        ptcp = ptcp_create()
                        ptcp.start()
                    else:
                        logger.debug("modbus server is running ...")
                else:
                    logger.debug("no address to create [%s] server.", ser['tcp_name'])
                    pservice.write_server_status_by_type(name, "closed")
            else:
                logger.debug("[%s] server is running ...", name)
        else:
            if ptcp_flag:
                process_stop(ptcp)
                logger.debug("[%s] has been closed.", ser['tcp_name'])
                pservice.write_server_status_by_type(name, "closed")
            logger.debug("[%s] server not allowed to running.", name)
        await asyncio.sleep(5)

Это команда, которую запускает Docker.

/usr/bin/docker run --privileged --network host --name scout-sentry -v /etc/scout.cfg:/etc/scout.cfg -v /var/run:/var/run -v /sys:/sys -v /dev/mem:/dev/mem -v /var/lib/scout:/data --rm shulian/scout-sentry

Это файл конфигурации Docker /etc/scout.cfg.

[scout]
mode=product

[logging]
level=DEBUG

[db]
path=/data

[ipc]
cs=/var/run/scout-cs.sock
pubsub=/var/run/pubsub.sock

Я хочу иметь возможность запускать функцию обновления значения Modbus, когда от ZeroMQ приходит сообщение, и оно будет обновляться правильно.


person fish    schedule 28.08.2019    source источник
comment
Ваша проблема не сформулирована с помощью MCVE-кода и, помимо неполного кода, она невоспроизводима, поэтому с ней трудно помочь. Предоставляются ли вашему процессу права записи в целевом каталоге config-setup, где транспортный класс ipc:// собирается установить соединение ZeroMQ? Является ли целевое расположение AccessNode управляемым конфигурацией абсолютным путем /some/loc/in/fs-tree/with/write-rights или @-абстрактным путем? Он уже существует до вызова?   -  person user3666197    schedule 28.08.2019
comment
@ user3666197 Спасибо за ответ! Мой проект работает в докере, и соответствующие файлы отображаются в контейнере. Определено, что PUB/SUB zmq может обмениваться данными, а значение регистра хранения, считанное на картинке, является обновлением zmq SUB.   -  person fish    schedule 28.08.2019
comment
Вы уверены, что в вашем Docker-контейнере есть конфигурация для передачи межплатформенного взаимодействия с использованием транспортного класса ipc://? Не могли бы вы проверить ту же идею с транспортным классом tcp://, используемым в первую очередь для проверки сквозной видимости?   -  person user3666197    schedule 28.08.2019
comment
Я уверен, что смог нормально общаться.   -  person fish    schedule 29.08.2019
comment
Я уверен, что до сих пор не видел ни одной PUB-стороны для правильного общения. MCVE-код неполный (также отсутствует явная копия всех импортов - например, from twisted.internet.task import LoopingCall). Сообщество StackOverflow основано на отличной формулировке вопросов, чтобы другие могли воспроизвести вашу проблему и решить ее, не тратя огромное количество времени на поиск и повторный сбор фрагментов информации, которые отсутствовали в формулировке проблемы.   -  person user3666197    schedule 29.08.2019
comment
Мне очень жаль. Все импорты и PUB были обновлены. PUB находится в разделе родительского процесса, потому что PUB и SUB успешно общались, поэтому я его не выставлял. Конфигурация сервера Modbus считывается из базы данных, а адрес Modbus тега — это адрес регистра, автоматически выделяемый в зависимости от того, добавлен ли он в сервер Modbus.   -  person fish    schedule 29.08.2019
comment
Я правильно понял, что PUB-AccessNode(s?) и SUB-AccessNode находятся на одной стороне горизонта абстракции контейнера Docker?   -  person user3666197    schedule 29.08.2019
comment
Я не уверен, что понял, что вы сказали, но я поставил команду запуска docker и файл конфигурации, который использовал. Спасибо.   -  person fish    schedule 29.08.2019


Ответы (1)


Начнем изнутри наружу.

В : ...это сделает невозможным подключение клиента к моему серверу. Я не знаю, почему?

ZeroMQ — это интеллектуальное промежуточное ПО для обмена сообщениями/сигналами без посредников или, что еще лучше, платформа для интеллектуального обмена сообщениями. Если вы чувствуете, что не так хорошо знакомы с искусством Zen-of-Zero, как с архитектурой ZeroMQ, вы можете начать с Принципы ZeroMQ менее чем за пять секунд прежде чем углубляться в подробности.


Основа :

Архетип масштабируемой формальной коммуникации, заимствованный из ZeroMQ PUB/SUB, не требует нулевой стоимости.

Это означает, что каждая настройка инфраструктуры (как на стороне PUB, так и на стороне SUB) занимает довольно значительное время, и никто не может быть уверен, когда настройка AccessNode приведет к состоянию RTO. Таким образом, сторона SUB (как предложено выше) должна быть либо постоянной сущностью, либо пользователь не должен рассчитывать на то, что он сделает ее RTO в нулевое время после восстановления twisted.internet.task.LoopingCall().

Предпочтительный способ: создайте экземпляр своего (полу-)постоянного zmq.Context(), настройте его так, чтобы он обслуживал <aContextInstance>.socket( zmq.PUB ) по мере необходимости. экзосистема подвергает ваш код (белый список и безопасное определение размера и защита ресурсов являются наиболее вероятными кандидатами, но детали связаны с вашим доменом приложения и рисками, с которыми вы готовы столкнуться, будучи готовыми справиться с ними).

ZeroMQ настоятельно не рекомендует совместно использовать (zero-sharing) <aContextInstance>.socket()-экземпляры, однако zmq.Context()-экземпляр можно совместно использовать/повторно использовать (см. ZeroMQ Principles... ) / передается более чем одному потоку ( если нужно ).

Все методы <aSocketInstance>{.bind()|.connect()}- являются дорогостоящими, поэтому постарайтесь настроить точки доступа к инфраструктуре и их надлежащий способ обработки ошибок, прежде чем пытаться использовать опосредованные ими службы связи.

Каждый <aSocketInstance>.setsockopt( zmq.SUBSCRIBE, ... ) дорогой в том смысле, что он может принимать (в зависимости от (локальной/удаленной) версии) форму нелокального, распределенного поведения - локальная сторона "устанавливает" подписку, а удаленная сторона должен быть «информирован» о таком изменении состояния и «реализует» операции в соответствии с фактическим (распространяемым) состоянием. В то время как в более ранних версиях все сообщения отправлялись со стороны PUB, а все SUB-стороны заполнялись такими данными и оставлялись для «фильтрации», которая будут перемещены во внутреннюю очередь на локальной стороне, более новые версии «реализуют» Тематический фильтр на стороне PUB, что еще больше увеличивает задержку установки нового метода работы в действии.


Далее идет modus-operandi: как <aSocketInstance>.recv() получает результаты:

В своем состоянии API по умолчанию .recv()-методы блокируют, потенциально бесконечно блокируют, если не приходят сообщения.

Решение: избегайте блокирующих форм вызова <aSocket>.recv()-методов ZeroMQ, всегда используя их zmq.NOBLOCK-режимы, или, скорее, проверяйте наличие или отсутствие любых ожидаемых сообщений с помощью доступных <aSocket>.poll( zmq.POLLIN, <timeout> )-методов с нулевым или контролируемые тайм-ауты. Это делает вас мастером, который принимает решение о потоке выполнения кода. Не делая этого, вы сознательно позволяете своему коду зависеть от внешней последовательности (или отсутствия) событий, а ваша архитектура склонна к ужасным проблемам с обработкой бесконечных состояний блокировки (или потенциально неустранимых распределенных блокировок поведения многих агентов или блокировок). )

Избегайте неконтролируемого скрещивания циклов событий — например, передачи циклов, управляемых ZeroMQ, во внешний обработчик, похожий на «обратный вызов», или кодовых блоков, украшенных async, где стек (не)блокирующей логики может разрушить первоначальную идею. просто переводя систему в неразрешимое состояние, когда события пропускают ожидаемую последовательность событий, а живые блокировки не подлежат восстановлению или происходит только первый проход.

Объединение asyncio-кода с twisted-LoopingCall()- и async/await-декорированным кодом + ZeroMQ блокировка .recv()-s является либо Частич-оф-Филлигрейн-Точного-Искусства-Истинно-Дзен-Мастера , или верный билет в ад - при всем уважении к Искусству-Истинно-Дзен-Мастеров :о)

Итак, да, необходимо комплексное мышление — добро пожаловать в мир распределенных вычислений!

person user3666197    schedule 29.08.2019