¿Cómo ejecutar repetidamente una función cada x segundos?

Resuelto davidmytton asked hace 15 años • 23 respuestas

Quiero ejecutar repetidamente una función en Python cada 60 segundos para siempre (como NSTimer en Objective C o setTimeout en JS). Este código se ejecutará como un demonio y es efectivamente como llamar al script de Python cada minuto usando un cron, pero sin necesidad de que el usuario lo configure.

En esta pregunta sobre un cron implementado en Python , la solución parece efectivamente simplemente dormir() durante x segundos. No necesito una funcionalidad tan avanzada, así que quizás algo como esto funcione.

while True:
    # Code executed here
    time.sleep(60)

¿Hay algún problema previsible con este código?

davidmytton avatar Jan 24 '09 04:01 davidmytton
Aceptado

Si su programa aún no tiene un bucle de eventos, use el módulo programado , que implementa un programador de eventos de propósito general.

import sched, time

def do_something(scheduler): 
    # schedule the next call first
    scheduler.enter(60, 1, do_something, (scheduler,))
    print("Doing stuff...")
    # then do your stuff

my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()

Si ya está utilizando una biblioteca de bucles de eventos como asyncio, trio, tkinter, PyQt5, gobject, kivyy muchas otras, simplemente programe la tarea utilizando los métodos de su biblioteca de bucles de eventos existente.

nosklo avatar Jan 23 '2009 21:01 nosklo

Bloquee su bucle de tiempo en el reloj del sistema de esta manera:

import time
starttime = time.monotonic()
while True:
    print("tick")
    time.sleep(60.0 - ((time.monotonic() - starttime) % 60.0))

Utilice un reloj 'monótono' para funcionar correctamente; time() se ajusta mediante cambios de hora solar/legal, sincronización ntp, etc.

Dave Rove avatar Aug 11 '2014 20:08 Dave Rove

Si desea una forma sin bloqueo de ejecutar su función periódicamente, en lugar de un bucle infinito de bloqueo, usaría un temporizador subproceso. De esta manera, su código puede seguir ejecutándose y realizar otras tareas y aún así llamar a su función cada n segundos. Utilizo mucho esta técnica para imprimir información de progreso en tareas largas que requieren un uso intensivo de CPU/Disco/Red.

Aquí está el código que publiqué en una pregunta similar, con control start() y stop():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Uso:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Características:

  • Solo biblioteca estándar, sin dependencias externas
  • start()y stop()es seguro llamar varias veces incluso si el cronómetro ya se ha iniciado o detenido
  • La función a llamar puede tener argumentos posicionales y con nombre.
  • Puede cambiar intervalen cualquier momento, será efectivo después de la próxima ejecución. ¡ Lo mismo para argse kwargsincluso function!
MestreLion avatar Jul 11 '2016 22:07 MestreLion