Есть ли в Python функции, похожие на setInterval()
в JavaScript?
Спасибо
Есть ли в Python функции, похожие на setInterval()
в JavaScript?
Спасибо
Возможно, это правильный фрагмент, который вы искали:
import threading
def set_interval(func, sec):
def func_wrapper():
set_interval(func, sec)
func()
t = threading.Timer(sec, func_wrapper)
t.start()
return t
t.cancel()
? Вроде игнорирует.
- person imrek; 03.04.2017
Это версия, в которой вы можете начать и остановиться. Это не блокировка. Также нет сбоя, так как ошибка времени выполнения не добавляется (важно для длительного выполнения с очень коротким интервалом, например, для звука)
import time, threading
StartTime=time.time()
def action() :
print('action ! -> time : {:.1f}s'.format(time.time()-StartTime))
class setInterval :
def __init__(self,interval,action) :
self.interval=interval
self.action=action
self.stopEvent=threading.Event()
thread=threading.Thread(target=self.__setInterval)
thread.start()
def __setInterval(self) :
nextTime=time.time()+self.interval
while not self.stopEvent.wait(nextTime-time.time()) :
nextTime+=self.interval
self.action()
def cancel(self) :
self.stopEvent.set()
# start action every 0.6s
inter=setInterval(0.6,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-StartTime))
# will stop interval in 5s
t=threading.Timer(5,inter.cancel)
t.start()
Выход:
just after setInterval -> time : 0.0s
action ! -> time : 0.6s
action ! -> time : 1.2s
action ! -> time : 1.8s
action ! -> time : 2.4s
action ! -> time : 3.0s
action ! -> time : 3.6s
action ! -> time : 4.2s
action ! -> time : 4.8s
setInterval(1, func1)
и setInterval(1, func2)
, можно ли оптимизировать, чтобы несколько функций находились в одной строке? как я могу вместо этого использовать async
сопрограммы?
- person abe; 27.05.2020
inter1=setInterval(0.6,action1)
inter2=setInterval(2,action2)
- person doom; 17.11.2020
Просто сделайте это красиво и просто.
import threading
def setInterval(func,time):
e = threading.Event()
while not e.wait(time):
func()
def foo():
print "hello"
# using
setInterval(foo,5)
# output:
hello
hello
.
.
.
РЕДАКТИРОВАТЬ: этот код не блокирует
import threading
class ThreadJob(threading.Thread):
def __init__(self,callback,event,interval):
'''runs the callback function after interval seconds
:param callback: callback function to invoke
:param event: external event for controlling the update operation
:param interval: time in seconds after which are required to fire the callback
:type callback: function
:type interval: int
'''
self.callback = callback
self.event = event
self.interval = interval
super(ThreadJob,self).__init__()
def run(self):
while not self.event.wait(self.interval):
self.callback()
event = threading.Event()
def foo():
print "hello"
k = ThreadJob(foo,event,2)
k.start()
print "It is non-blocking"
func()
; кроме того, почему не просто time.sleep()
?
- person ivan_pozdeev; 02.06.2018
Немного измените ответ Nailxx, и вы получили ответ!
from threading import Timer
def hello():
print "hello, world"
Timer(30.0, hello).start()
Timer(30.0, hello).start() # after 30 seconds, "hello, world" will be printed
Timer
в одной строке!
- person Serge Stroobandt; 10.12.2017
Модуль sched
предоставляет эти возможности для общего кода Python. Однако, как следует из документации, если ваш код является многопоточным, может иметь смысл использовать вместо этого класс threading.Timer
.
Я думаю, это то, что вам нужно:
#timertest.py
import sched, time
def dostuff():
print "stuff is being done!"
s.enter(3, 1, dostuff, ())
s = sched.scheduler(time.time, time.sleep)
s.enter(3, 1, dostuff, ())
s.run()
Если вы добавите еще одну запись в планировщик в конце повторяющегося метода, он просто продолжит работу.
apscheduler
с (time.time, time.sleep, timezone='UTC')
, чтобы избежать ошибок, связанных с переходом на летнее время (DST).
- person Serge Stroobandt; 10.12.2017
Я использую sched для создания setInterval
функции суть
import functools
import sched, time
s = sched.scheduler(time.time, time.sleep)
def setInterval(sec):
def decorator(func):
@functools.wraps(func)
def wrapper(*argv, **kw):
setInterval(sec)(func)
func(*argv, **kw)
s.enter(sec, 1, wrapper, ())
return wrapper
s.run()
return decorator
@setInterval(sec=3)
def testInterval():
print ("test Interval ")
testInterval()
Простые утилиты setInterval
from threading import Timer
def setInterval(timer, task):
isStop = task()
if not isStop:
Timer(timer, setInterval, [timer, task]).start()
def hello():
print "do something"
return False # return True if you want to stop
if __name__ == "__main__":
setInterval(2.0, hello) # every 2 seconds, "do something" will be printed
В приведенных выше решениях, если возникает ситуация, когда программа завершает работу, нет гарантии, что она завершится корректно, всегда рекомендуется закрывать программу с помощью мягкого уничтожения, и у большинства из них не было функции остановки, я нашел хорошую статью на носителе, написанном Sankalp, который решает обе эти проблемы (запускать периодические задачи в python) обратитесь к прикрепленной ссылке, чтобы получить более подробное представление. В приведенном ниже примере библиотека с именем signal используется для отслеживания убийства - мягкого или жесткого убийства.
import threading, time, signal
from datetime import timedelta
WAIT_TIME_SECONDS = 1
class ProgramKilled(Exception):
pass
def foo():
print time.ctime()
def signal_handler(signum, frame):
raise ProgramKilled
class Job(threading.Thread):
def __init__(self, interval, execute, *args, **kwargs):
threading.Thread.__init__(self)
self.daemon = False
self.stopped = threading.Event()
self.interval = interval
self.execute = execute
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)
if __name__ == "__main__":
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
job.start()
while True:
try:
time.sleep(1)
except ProgramKilled:
print "Program killed: running cleanup code"
job.stop()
break
#output
#Tue Oct 16 17:47:51 2018
#Tue Oct 16 17:47:52 2018
#Tue Oct 16 17:47:53 2018
#^CProgram killed: running cleanup code
Вышеупомянутый метод не совсем помог мне, так как мне нужно было отменить интервал. Я превратил функцию в класс и придумал следующее:
class setInterval():
def __init__(self, func, sec):
def func_wrapper():
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
func()
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
def cancel(self):
self.t.cancel()
В последнее время у меня такая же проблема, как и у вас. И я нахожу эти решения:
1. вы можете использовать библиотеку: threading.Time (здесь есть введение выше)
2. вы можете использовать библиотеку: sched (это тоже есть введение выше) em >
3. вы можете использовать библиотеку: Advanced Python Scheduler (рекомендуется)
Некоторые ответы выше, в которых используются func_wrapper
и threading.Timer
, действительно работают, за исключением того, что он порождает новый поток каждый раз, когда вызывается интервал, что вызывает проблемы с памятью.
Базовый пример ниже примерно реализует аналогичный механизм, помещая интервал в отдельный поток. Он спит с заданным интервалом. Перед тем, как перейти к коду, вам следует знать о некоторых ограничениях:
JavaScript является однопоточным, поэтому, когда функция внутри setInterval
запускается, ничто другое не будет работать одновременно (за исключением рабочего потока, но давайте поговорим об общем случае использования setInterval
. Таким образом, потоки безопасны. Но здесь, в этой реализации, вы может столкнуться с условиями гонки, если не используется threading.rLock
.
В приведенной ниже реализации используется time.sleep
для имитации интервалов, но при добавлении времени выполнения func
общее время для этого интервала может оказаться больше, чем вы ожидаете. Поэтому, в зависимости от вариантов использования, вы можете захотеть «меньше спать» (за вычетом времени, затраченного на вызов func
)
Я только грубо протестировал это, и вам определенно не следует использовать глобальные переменные, как я, не стесняйтесь настраивать их так, чтобы они соответствовали вашей системе.
Хватит говорить, вот код:
# Python 2.7
import threading
import time
class Interval(object):
def __init__(self):
self.daemon_alive = True
self.thread = None # keep a reference to the thread so that we can "join"
def ticktock(self, interval, func):
while self.daemon_alive:
time.sleep(interval)
func()
num = 0
def print_num():
global num
num += 1
print 'num + 1 = ', num
def print_negative_num():
global num
print '-num = ', num * -1
intervals = {} # keep track of intervals
g_id_counter = 0 # roughly generate ids for intervals
def set_interval(interval, func):
global g_id_counter
interval_obj = Interval()
# Put this interval on a new thread
t = threading.Thread(target=interval_obj.ticktock, args=(interval, func))
t.setDaemon(True)
interval_obj.thread = t
t.start()
# Register this interval so that we can clear it later
# using roughly generated id
interval_id = g_id_counter
g_id_counter += 1
intervals[interval_id] = interval_obj
# return interval id like it does in JavaScript
return interval_id
def clear_interval(interval_id):
# terminate this interval's while loop
intervals[interval_id].daemon_alive = False
# kill the thread
intervals[interval_id].thread.join()
# pop out the interval from registry for reusing
intervals.pop(interval_id)
if __name__ == '__main__':
num_interval = set_interval(1, print_num)
neg_interval = set_interval(3, print_negative_num)
time.sleep(10) # Sleep 10 seconds on main thread to let interval run
clear_interval(num_interval)
clear_interval(neg_interval)
print "- Are intervals all cleared?"
time.sleep(3) # check if both intervals are stopped (not printing)
print "- Yup, time to get beers"
Ожидаемый результат:
num + 1 = 1
num + 1 = 2
-num = -2
num + 1 = 3
num + 1 = 4
num + 1 = 5
-num = -5
num + 1 = 6
num + 1 = 7
num + 1 = 8
-num = -8
num + 1 = 9
num + 1 = 10
-num = -10
Are intervals all cleared?
Yup, time to get beers
Мой модуль Python 3 jsinterval.py
будет вам полезен! Вот:
"""
Threaded intervals and timeouts from JavaScript
"""
import threading, sys
__all__ = ['TIMEOUTS', 'INTERVALS', 'setInterval', 'clearInterval', 'setTimeout', 'clearTimeout']
TIMEOUTS = {}
INTERVALS = {}
last_timeout_id = 0
last_interval_id = 0
class Timeout:
"""Class for all timeouts."""
def __init__(self, func, timeout):
global last_timeout_id
last_timeout_id += 1
self.timeout_id = last_timeout_id
TIMEOUTS[str(self.timeout_id)] = self
self.func = func
self.timeout = timeout
self.threadname = 'Timeout #%s' %self.timeout_id
def run(self):
func = self.func
delx = self.__del__
def func_wrapper():
func()
delx()
self.t = threading.Timer(self.timeout/1000, func_wrapper)
self.t.name = self.threadname
self.t.start()
def __repr__(self):
return '<JS Timeout set for %s seconds, launching function %s on timeout reached>' %(self.timeout, repr(self.func))
def __del__(self):
self.t.cancel()
class Interval:
"""Class for all intervals."""
def __init__(self, func, interval):
global last_interval_id
self.interval_id = last_interval_id
INTERVALS[str(self.interval_id)] = self
last_interval_id += 1
self.func = func
self.interval = interval
self.threadname = 'Interval #%s' %self.interval_id
def run(self):
func = self.func
interval = self.interval
def func_wrapper():
timeout = Timeout(func_wrapper, interval)
self.timeout = timeout
timeout.run()
func()
self.t = threading.Timer(self.interval/1000, func_wrapper)
self.t.name = self.threadname
self.t.run()
def __repr__(self):
return '<JS Interval, repeating function %s with interval %s>' %(repr(self.func), self.interval)
def __del__(self):
self.timeout.__del__()
def setInterval(func, interval):
"""
Create a JS Interval: func is the function to repeat, interval is the interval (in ms)
of executing the function.
"""
temp = Interval(func, interval)
temp.run()
idx = int(temp.interval_id)
del temp
return idx
def clearInterval(interval_id):
try:
INTERVALS[str(interval_id)].__del__()
del INTERVALS[str(interval_id)]
except KeyError:
sys.stderr.write('No such interval "Interval #%s"\n' %interval_id)
def setTimeout(func, timeout):
"""
Create a JS Timeout: func is the function to timeout, timeout is the timeout (in ms)
of executing the function.
"""
temp = Timeout(func, timeout)
temp.run()
idx = int(temp.timeout_id)
del temp
return idx
def clearTimeout(timeout_id):
try:
TIMEOUTS[str(timeout_id)].__del__()
del TIMEOUTS[str(timeout_id)]
except KeyError:
sys.stderr.write('No such timeout "Timeout #%s"\n' %timeout_id)
РЕДАКТИРОВАНИЕ КОДА. Исправлена утечка памяти (обнаружена @benjaminz). Теперь ВСЕ потоки очищаются по окончании. Почему происходит эта утечка? Это происходит из-за неявных (или даже явных) ссылок. В моем случае TIMEOUTS
и INTERVALS
. Таймауты самоочищаются автоматически (после этого патча), потому что они используют оболочку функции, которая вызывает функцию, а затем самоуничтожается. Но как это происходит? Объекты нельзя удалить из памяти, пока не будут удалены все ссылки или не будет использован gc
модуль. Объяснение: нет возможности создавать (в моем коде) нежелательные ссылки на таймауты / интервалы. У них есть только ОДИН реферер: _6 _ / _ 7_ dicts. И, когда они прерваны или завершены (без прерывания могут завершиться только тайм-ауты), они удаляют единственную существующую ссылку на себя: соответствующий им элемент dict. Классы идеально инкапсулируются с использованием __all__
, поэтому нет места для утечек памяти.
Вот решение с малым дрейфом времени, которое использует поток для периодической сигнализации объекта Event. Run () потока почти ничего не делает в ожидании тайм-аута; отсюда небольшой временной дрейф.
# Example of low drift (time) periodic execution of a function.
import threading
import time
# Thread that sets 'flag' after 'timeout'
class timerThread (threading.Thread):
def __init__(self , timeout , flag):
threading.Thread.__init__(self)
self.timeout = timeout
self.stopFlag = False
self.event = threading.Event()
self.flag = flag
# Low drift run(); there is only the 'if'
# and 'set' methods between waits.
def run(self):
while not self.event.wait(self.timeout):
if self.stopFlag:
break
self.flag.set()
def stop(self):
stopFlag = True
self.event.set()
# Data.
printCnt = 0
# Flag to print.
printFlag = threading.Event()
# Create and start the timer thread.
printThread = timerThread(3 , printFlag)
printThread.start()
# Loop to wait for flag and print time.
while True:
global printCnt
# Wait for flag.
printFlag.wait()
# Flag must be manually cleared.
printFlag.clear()
print(time.time())
printCnt += 1
if printCnt == 3:
break;
# Stop the thread and exit.
printThread.stop()
printThread.join()
print('Done')
Большинство приведенных выше ответов не закрывают поток должным образом. При использовании записной книжки Jupyter я заметил, что когда было отправлено явное прерывание, потоки все еще работали и, что еще хуже, они продолжали умножаться, начиная с 1 потока, 2, 4 и т. Д. Мой метод ниже основан на ответе @doom, но чисто обрабатывает прерывания, запуская бесконечный цикл в основном потоке для прослушивания событий SIGINT и SIGTERM
Не стесняйтесь предлагать улучшения
import time
import threading
import signal
# Record the time for the purposes of demonstration
start_time=time.time()
class ProgramKilled(Exception):
"""
An instance of this custom exception class will be thrown everytime we get an SIGTERM or SIGINT
"""
pass
# Raise the custom exception whenever SIGINT or SIGTERM is triggered
def signal_handler(signum, frame):
raise ProgramKilled
# This function serves as the callback triggered on every run of our IntervalThread
def action() :
print('action ! -> time : {:.1f}s'.format(time.time()-start_time))
# https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval
class IntervalThread(threading.Thread) :
def __init__(self,interval,action, *args, **kwargs) :
super(IntervalThread, self).__init__()
self.interval=interval
self.action=action
self.stopEvent=threading.Event()
self.start()
def run(self) :
nextTime=time.time()+self.interval
while not self.stopEvent.wait(nextTime-time.time()) :
nextTime+=self.interval
self.action()
def cancel(self) :
self.stopEvent.set()
def main():
# Handle SIGINT and SIFTERM with the help of the callback function
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# start action every 1s
inter=IntervalThread(1,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-start_time))
# will stop interval in 500s
t=threading.Timer(500,inter.cancel)
t.start()
# https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
while True:
try:
time.sleep(1)
except ProgramKilled:
print("Program killed: running cleanup code")
inter.cancel()
break
if __name__ == "__main__":
main()
засыпать, пока не начнется следующий интервал длины seconds
: (не одновременно)
def sleep_until_next_interval(self, seconds):
now = time.time()
fall_asleep = seconds - now % seconds
time.sleep(fall_asleep)
while True:
sleep_until_next_interval(10) # 10 seconds - worktime
# work here
просто и без дрейфа.
Я написал свой код, чтобы сделать очень очень гибкий setInterval в python. Держи:
import threading
class AlreadyRunning(Exception):
pass
class IntervalNotValid(Exception):
pass
class setInterval():
def __init__(this, func=None, sec=None, args=[]):
this.running = False
this.func = func # the function to be run
this.sec = sec # interval in second
this.Return = None # The returned data
this.args = args
this.runOnce = None # asociated with run_once() method
this.runOnceArgs = None # asociated with run_once() method
if (func is not None and sec is not None):
this.running = True
if (not callable(func)):
raise TypeError("non-callable object is given")
if (not isinstance(sec, int) and not isinstance(sec, float)):
raise TypeError("A non-numeric object is given")
this.TIMER = threading.Timer(this.sec, this.loop)
this.TIMER.start()
def start(this):
if (not this.running):
if (not this.isValid()):
raise IntervalNotValid("The function and/or the " +
"interval hasn't provided or invalid.")
this.running = True
this.TIMER = threading.Timer(this.sec, this.loop)
this.TIMER.start()
else:
raise AlreadyRunning("Tried to run an already run interval")
def stop(this):
this.running = False
def isValid(this):
if (not callable(this.func)):
return False
cond1 = not isinstance(this.sec, int)
cond2 = not isinstance(this.sec, float)
if (cond1 and cond2):
return False
return True
def loop(this):
if (this.running):
this.TIMER = threading.Timer(this.sec, this.loop)
this.TIMER.start()
function_, Args_ = this.func, this.args
if (this.runOnce is not None): # someone has provide the run_once
runOnce, this.runOnce = this.runOnce, None
result = runOnce(*(this.runOnceArgs))
this.runOnceArgs = None
# if and only if the result is False. not accept "None"
# nor zero.
if (result is False):
return # cancel the interval right now
this.Return = function_(*Args_)
def change_interval(this, sec):
cond1 = not isinstance(sec, int)
cond2 = not isinstance(sec, float)
if (cond1 and cond2):
raise TypeError("A non-numeric object is given")
# prevent error when providing interval to a blueprint
if (this.running):
this.TIMER.cancel()
this.sec = sec
# prevent error when providing interval to a blueprint
# if the function hasn't provided yet
if (this.running):
this.TIMER = threading.Timer(this.sec, this.loop)
this.TIMER.start()
def change_next_interval(this, sec):
if (not isinstance(sec, int) and not isinstance(sec, float)):
raise TypeError("A non-numeric object is given")
this.sec = sec
def change_func(this, func, args=[]):
if (not callable(func)):
raise TypeError("non-callable object is given")
this.func = func
this.args = args
def run_once(this, func, args=[]):
this.runOnce = func
this.runOnceArgs = args
def get_return(this):
return this.Return
Вы можете получить множество функций и гибкость. Выполнение этого кода не заморозит ваш код, вы можете изменить интервал во время выполнения, вы можете изменить функцию во время выполнения, вы можете передавать аргументы, вы можете получить возвращаемый объект из своей функции и многое другое. Вы тоже можете делать свои трюки!
вот очень простой и базовый пример его использования:
import time
def interval(name="world"):
print(f"Hello {name}!")
# function named interval will be called every two seconds
# output: "Hello world!"
interval1 = setInterval(interval, 2)
# function named interval will be called every 1.5 seconds
# output: "Hello Jane!"
interval2 = setInterval(interval, 1.5, ["Jane"])
time.sleep(5) #stop all intervals after 5 seconds
interval1.stop()
interval2.stop()
Посмотрите мой проект Github, чтобы увидеть больше примеров и следить за следующими обновлениями: D https://github.com/Hzzkygcs/setInterval-python
Вот что-нибудь простое:
import time
delay = 10 # Seconds
def setInterval():
print('I print in intervals!')
time.sleep(delay)
setInterval()
В Python все работает по-другому: вам нужно либо sleep()
(если вы хотите заблокировать текущий поток), либо запустить новый поток. См. http://docs.python.org/library/threading.html.
from threading import Timer
def hello():
print "hello, world"
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
Timer(your_interval, your_function_to_call).start()
- person nkrkv; 23.04.2010
my_timer.stop()
- person nkrkv; 23.04.2010
setInterval()
декоратор или без потоков с использованием Tkinter, Gtk, циклов событий Twisted - person jfs   schedule 24.05.2013call_repeatedly(interval, function, *args)
- person jfs   schedule 20.03.2014