Несколько геймпадов с Python Evdev?

Я нашел способ использовать геймпад на Python, используя модуль Evdev (см. ссылку в конце). В этом руководстве автор использует только один геймпад, но он также заявляет, что должна быть возможность использовать несколько геймпадов с кодом, основанным на следующем:

from evdev import InputDevice
from select import select
gamepad = InputDevice('/dev/input/event0')
while True:
    r,w,x = select([gamepad], [], [])
    for event in gamepad.read():
        print(event)

Select.select, похоже, ждет, пока не будет нажата кнопка, так что программа прерывается до тех пор, пока это не произойдет. Как я могу изменить код для использования нескольких геймпадов или для выполнения другого кода в ожидании нажатия кнопок? Или есть лучшая альтернатива использованию evdev в этом отношении?

http://ericgoebelbecker.com/2015/06/raspberry-pi-and-gamepad-programming-part-1-reading-the-device/


person bsteur    schedule 01.11.2017    source источник
comment
Просто совет: вы можете использовать SDL/SDL2, так как он имеет оболочку Python.   -  person rbaleksandar    schedule 01.11.2017
comment
Существует пример одновременного использования нескольких событийных устройств в документации.   -  person gvalkov    schedule 27.08.2018
comment
Вероятно, вам следует читать асинхронно, если вы не хотите блокировать. python-evdev. readthedocs.io/en/latest/   -  person Nick    schedule 03.06.2019


Ответы (2)


Как я могу изменить код для использования нескольких геймпадов или для выполнения другого кода в ожидании нажатия кнопок?

Ознакомьтесь с документацией для InputDevice.read.

read()
Чтение нескольких входных событий с устройства. Возвращает объект генератора, который дает InputEvent экземпляров. Повышает BlockingIOError, если в данный момент нет доступных событий.

Select будет блокироваться до тех пор, пока не будет доступно событие ввода. Вместо этого мы можем читать события, пока не получим BlockingIOError. А затем переходите к следующему геймпаду или выполняйте любую другую работу, которую необходимо выполнить в основном цикле.

Вы также можете использовать InputDevice.read_one

read_one()
Чтение и возврат одного входного события как экземпляра InputEvent.

Возвратите None, если нет ожидающих входных событий.

person nondebug    schedule 01.11.2017

У меня тоже была такая потребность (у меня два BT геймпада). Итак, я написал свой код, который делает:

  • Создает поток (threading.Thread()), который каждую секунду проверяет, подключены ли они к предустановленным путям событий /dev/input/event* (16 и 20 — значения по умолчанию, их можно изменить). Если он их не находит, он пробует следующее событие (например: 17/21). Если он находит один или оба, он соединяет их. Если нет, он продолжает проверять каждую секунду.
  • Для подключения более одного геймпада используется selector как документы говорят.

Вы можете увидеть мой код здесь, но я предлагаю вам перейти на мой Github для получения обновлений.

Click to expand
#!/usr/bin/env python3

import time, argparse, subprocess
try:
    from obs_api import client, consola, c2, c3, c, thread
except:
    from obs_api_no_obs import client, consola, c2, c3, c, thread
    import sys

from evdev import InputDevice, ecodes
from selectors import DefaultSelector, EVENT_READ

class Bt:
    def __init__(self):
        self.but = [307, 308, 305, 304, 315]
        self.gamepad1, self.gamepad2 = None, None
        self.selector = DefaultSelector()
        self.devices_list = list()
        self.devices_dict = dict()
        self.bt_on = True

    def bt_send_hat(self, path, que, val):
        client.send_message('/bt', [int(path[-2:]), que, val])
        c2(f'/bt, {int(path[-2:])}, {que}, {val}')
        if val == 0: self.devices_dict[path] = 'c'
        else:        self.devices_dict[path] = que

    def bt_send(self, path, que, val):
        client.send_message('/bt', [int(path[-2:]), que, val])
        c2(f'/bt, {int(path[-2:])}, {que}, {val}')

    def reconnect(self):
        device1 = '13:57:90:05:0E:31'
        device2 = '13:6E:0E:07:0E:31'
        ps1 = subprocess.Popen(['bluetoothctl', 'info', device1], stdout=subprocess.PIPE)
        ps2 = subprocess.Popen(['bluetoothctl', 'info', device2], stdout=subprocess.PIPE)
        stdout1 = subprocess.check_output(['grep', 'Connected'], stdin=ps1.stdout).decode("utf-8")
        stdout2 = subprocess.check_output(['grep', 'Connected'], stdin=ps2.stdout).decode("utf-8")
        
        if 'No' in stdout1 or 'no' in stdout1:
            subprocess.Popen(['bluetoothctl', 'connect', device1])
            c3(f'bluetoothctl connect {device1}')
        
        if 'No' in stdout2 or 'no' in stdout2:
            subprocess.Popen(['bluetoothctl', 'connect', device2])
            c3(f'bluetoothctl connect {device2}')

    def is_none(self, num, dev):
        gamepad = f'gamepad{num}'
        device = f'/dev/input/event{dev}'
        try:
            vars(self)[gamepad] = InputDevice(device)
            try:
                self.selector.unregister(vars(self)[gamepad])
            except:
                c3(f'Todavía no registrado {device}', c.azul)
            
            try:
                     self.selector.register(vars(self)[gamepad], EVENT_READ)
                     c3(f'Registrado {device}', c.cian)
            except:
                c3(f'{device} already registred', c.cian)

        except OSError as e:
            c3(f'No está conectado {device}')

            # Probando device + 1
            dev += 1
            device = f'/dev/input/event{dev}'
            try:
                vars(self)[gamepad] = InputDevice(device)
                try:     self.selector.unregister(vars(self)[gamepad])
                except:  c3(f'Todavía no registrado {device}', c.azul)
                try:
                        self.selector.register(vars(self)[gamepad], EVENT_READ)
                        c3(f'Registrado {device}', c.cian)
                except:  c3(f'{device} already registred', c.cian)
            except OSError as e:
                c3(f'Ni tampoco...     {device}')

    def check_devices(self):
        while self.bt_on:
            # Si no están cargados, los intenta cargar y registrarlos en selector
            if self.gamepad1 is None:
                self.is_none(1, self.devices_list[0])
            if self.gamepad2 is None:
                self.is_none(2, self.devices_list[1])
            time.sleep(1)

    def input_bt(self, gp1, gp2):
        self.devices_list = [gp1, gp2]
        self.devices_dict= {f'/dev/input/event{gp1}':'c',
                            f'/dev/input/event{gp2}':'c'}
        
        client.send_message('/bt_init', [gp1, gp2])

        thread(self.check_devices)
        
        time.sleep(2)
        while self.bt_on:
            
            # Si ninguno de los dos está cargado, vuelve a intentar conectarlos
            if self.gamepad1 is None and self.gamepad2 is None:
                c3('No está conectado ninguno')
                time.sleep(1)
                continue

            # Revisa la lista de selector, esperando que llegue algo
            for key, mask in self.selector.select():
                device = key.fileobj
                path   = key.fileobj.path
                
                # Intenta leer en device. Si salta error...
                try:
                    for event in device.read():
                        et, ec, ev = event.type, event.code, event.value
                        if et == ecodes.EV_ABS:
                            # Analogo
                            if ec == 1: self.bt_send(path, 'h', -ev)
                            if ec == 0: self.bt_send(path, 'v', -ev)
                            if   ec == 16 and ev == -1: self.bt_send_hat(path, 't', 1)
                            elif ec == 16 and ev ==  1: self.bt_send_hat(path, 'b', 1)
                            elif ec == 17 and ev == -1: self.bt_send_hat(path, 'r', 1)
                            elif ec == 17 and ev ==  1: self.bt_send_hat(path, 'l', 1)
                            if   ec == 1  and ev ==  0: self.bt_send_hat(path, 'r', 0)
                            if   ec == 1  and ev ==  0: self.bt_send_hat(path, 'l', 0)
                            if   ec == 0  and ev ==  0: self.bt_send_hat(path, 't', 0)
                            if   ec == 0  and ev ==  0: self.bt_send_hat(path, 'b', 0)
                            
                        if et == ecodes.EV_KEY:
                            if   ec == self.but[0]: self.bt_send(path, 0, ev)
                            elif ec == self.but[1]: self.bt_send(path, 1, ev)
                            elif ec == self.but[2]: self.bt_send(path, 2, ev)
                            elif ec == self.but[3]: self.bt_send(path, 3, ev)
                            elif ec == self.but[4]: self.bt_send(path, 4, ev)
                
                # ... es porque el gamepad se apagó. Lo cierra y lo desregistra de selector
                except OSError as e:
                    device.close()
                    c3('input_bt() - Except - Se apagó un gamepad')
                    
                    if path[-2:] == '16':
                        c3(f'¿Se apagó /dev/input/event{self.devices_list[0]}? Desregistrándolo...')
                        if self.gamepad1 != None:
                            self.selector.unregister(self.gamepad1)
                            self.gamepad1 = None
                    
                    if path[-2:] == '20':
                        c3(f'¿Se apagó /dev/input/event{self.devices_list[1]}? Desregistrándolo...')
                        if self.gamepad2 != None:
                            self.selector.unregister(self.gamepad2)
                            self.gamepad2 = None
            
            # c4('input_bt() Fin de WHILE')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--gp1', type=int, default=16,help='Gamepad 1')
    parser.add_argument('--gp2', type=int, default=20,help='Gamepad 2')
    args = parser.parse_args()
    
    bt = Bt()

    try:
        consola(f'"R": reconnect()', c.naranja)
        consola(f'"Q": quit', c.naranja)

        thread(bt.input_bt, [args.gp1, args.gp2])

        while True:
            tecla = input()
            if tecla == 'r':
                bt.reconnect()
            elif tecla == 'q':
                sys.exit()
    except KeyboardInterrupt:
        print(' Bye')

Кроме того, это модуль, который загружается, если вы не я ????.

Click to expand
#!/usr/bin/env python3

import random, threading

from pythonosc import udp_client
targetIp = "127.0.0.1"
targetPort = 10000
client = udp_client.SimpleUDPClient(targetIp, targetPort)
client.send_message("/init", 1)

class Gd:
    def __init__(self) -> None:
        self.gd = {'verbose': [True, True, True, True]}

globalDict = Gd()

class Color:
    def __init__(self):
        self.reset = '\x1b[0m'
        self.blanco = '\x1b[97m'
        self.negro = '\x1b[90m'
        self.rojo = '\x1b[91m'
        self.verde = '\x1b[92m'
        self.azul = '\x1b[94m'
        self.amarillo = '\x1b[93m'
        self.magenta = '\x1b[95m'
        self.magenta_bold = '\x1b[95;1m'
        self.azul_bold = '\x1b[94;1m'
        self.cian = '\x1b[96m'
        self.naranja = '\x1b[38;5;202m'
        self.violeta = '\x1b[38;5;129m'
        self.rosa = '\x1b[38;5;213m'
        self.ocre = '\x1b[38;5;172m'
        self.marron = '\x1b[38;5;52m'
        self.musgo = '\x1b[38;5;58m'
        self.error = '\x1b[93;41m'
        self.remoto = '\x1b[93;42m'
        self.debug = '\x1b[93;44m'
        self.lista_attrs = []
        self.attrs = self.__dict__
        
        for k, v in self.attrs.items():
            if k not in ['lista_attrs', 'attrs', 'random']:
                self.lista_attrs.append(v)
    
        self.random = random.choice(self.lista_attrs)
c = Color()

# Threading
def thread(function, args=[]):
    t = threading.Thread(
        target=function,
        args=(args),
        name=f'{function}({args})',
        daemon=True)
    t.start()

def c1(texto, color_texto=c.azul_bold):
    if globalDict.gd['verbose'][0]:
        texto = str(texto)
        print(color_texto, texto, c.reset)

def c2(texto, color_texto=c.azul):
    if globalDict.gd['verbose'][1]:
        texto = str(texto)
        print(color_texto, texto, c.reset)

def c3(texto, color_texto=c.cian):
    if globalDict.gd['verbose'][2]:
        texto = str(texto)
        print(color_texto, texto, c.reset)

def c4(texto, color_texto=c.rosa):
    if globalDict.gd['verbose'][3]:
        texto = str(texto)
        print(color_texto, texto, c.reset)

def consola(texto, color_texto=c.verde):
    texto = str(texto)
    print(color_texto, texto, c.reset)
person Mario Mey    schedule 08.07.2021