如果你有一些函数或者逻辑不想立即执行,希望延迟执行可以使用 Sched模块
Sched 模块是标准库,可用于创建机器人和其他监控和自动化应用程序。sched 模块实现了一个通用的事件调度程序,用于在特定时间运行任务。它提供了类似的工具,如Windows或Linux中的任务调度程序,但主要优点是与Python自己的sched模块平台差异可以忽略不计。
任务调度,在什么时间,执行什么函数,函数使用什么参数
在一个延迟或规定时间之后执行事件,需要采用enter()方法,包含4个参数:
- 间隔时间(具体值决定与delayfunc, 这里为秒)
- 优先级(两个事件在同一时间到达的情况)
- 调用的函数
- 函数参数
import sched, time
s = sched.scheduler(time.time, time.sleep) # 生成调度器
def print_time():
print ( "From print_time", time.time() )
def print_some_times():
print (time.time())
s.enter(5, 1, print_time, ())
# 加入调度事件
# 四个参数分别是:
# 间隔事件(具体值决定与delayfunc, 这里为秒);
# 优先级(两个事件在同一时间到达的情况);
# 触发的函数;
# 函数参数;
s.enter(10, 1, print_time, ())
# 运行
s.run()
print ( time.time() )
if __name__ == '__main__':
print_some_times()
详细的例子
# Python program for Creating
# an event scheduler
import sched
import time
# Creating an instance of the
# scheduler class
scheduler = sched.scheduler(time.time,
time.sleep)
调度程序实例提供的基本方法
- scheduler.enter():可以将事件安排在延迟后运行,也可以安排在特定时间运行。为了延迟安排它们,使用了方法。enter()
- 语法:scheduler.enter(delay, priority, action, argument=(), kwargs={})
- 参数: delay: 表示延迟时间
的数字 priority: 优先级值
action: 调用函数
argument:
参数的元组
import sched
import time
# instance is created
scheduler = sched.scheduler(time.time,
time.sleep)
# function to print time
# and name of the event
def print_event(name):
print('EVENT:', time.time(), name)
# printing starting time
print ('START:', time.time())
# first event with delay of
# 1 second
e1 = scheduler.enter(1, 1,
print_event, ('1 st', ))
# second event with delay of
# 2 seconds
e1 = scheduler.enter(2, 1,
print_event, (' 2nd', ))
# executing the events
scheduler.run()
START: 1580389814.152131
EVENT: 1580389815.1548214 1 st
EVENT: 1580389816.1533117 2nd
scheduler.enterabs()时间将事件添加到调度器的内部队列中,当调用调度器的方法时,调度器队列中的条目被逐个执行。enterabs()run()
语法:scheduler.enterabs(time, priority, action, argument=(), kwargs={})
参数: time: 必须执行
事件/操作的时间 priority: 事件操作的优先级: 构成事件的函数 argument: 事件函数的位置参数 kwargs :
# library imported
import sched
import time
# instance is created
scheduler = sched.scheduler(time.time,
time.sleep)
# function to print time and
# name of the event
def print_event(name):
print('EVENT:', time.time(), name)
# printing starting time
print ('START:', time.time())
# event x with delay of 1 second
# enters queue using enterabs method
e_x = scheduler.enterabs(time.time()+1, 1,
print_event,
argument = ("Event X", ));
# executing the events
scheduler.run()
START: 1580389960.5845037
EVENT: 1580389961.5875661 Event X
scheduler.cancel()从队列中删除事件。如果该事件不是队列中当前的事件,则此方法将引发 .ValueError
语法:scheduler.cancel(event)
参数: event:
应删除的事件
import sched
import time
# instance is created
scheduler = sched.scheduler(time.time,
time.sleep)
# function to print time and
# name of the event
def print_event(name):
print('EVENT:', time.time(), name)
# printing starting time
print ('START:', time.time())
# first event with delay
# of 1 second
e1 = scheduler.enter(1, 1,
print_event,
('1 st', ))
# second event with delay
# of 2 seconds
e2 = scheduler.enter(2, 1,
print_event,
(' 2nd', ))
# removing 1st event from
# the event queue
scheduler.cancel(e1)
# executing the events
scheduler.run()
START: 1580390119.54074
EVENT: 1580390121.5439944 2nd
scheduler.empty()如果事件队列为空,则返回 True。它不需要争论。
import sched
import time
# instance is created
scheduler = sched.scheduler(time.time,
time.sleep)
# function to print time
# and name of the event
def print_event(name):
print('EVENT:', time.time(), name)
# printing starting time
print ('START:', time.time())
# checking if event queue is
# empty or not
print(scheduler.empty())
# event entering into queue
e1 = scheduler.enter(2, 1,
print_event,
('1 st', ))
# checking if event queue is
# empty or not
print(scheduler.empty())
# executing the events
scheduler.run()
START: 1580390318.1343799
True
False
EVENT: 1580390320.136075 1 st
scheduler.queue 只读属性,按运行顺序返回即将发生的事件的列表。每个事件都显示为一个命名元组,其中包含以下字段:time、priority、action、argument、kwargs。
# library imported
import sched
import time
# instance is created
scheduler = sched.scheduler(time.time,
time.sleep)
# function to print time
# and name of the event
def print_event(name):
print('EVENT:', time.time(), name)
# printing starting time
print ('START:', time.time())
# event entering into queue
e1 = scheduler.enter(2, 1,
print_event,
('1 st', ))
# printing the details of
# upcoming events in event queue
print(scheduler.queue)
# executing the events
scheduler.run()
开始:1580390446.8743565
[事件(time=1580390448.8743565,优先级=1,操作=,参数=('1 st',),kwargs={})]
事件:1580390448.876916 1 st
如何使用绝对时间调试任务 enterabs
def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
"""Enter a new event in the queue at an absolute time.
Returns an ID for the event which can be used to remove it,
if necessary.
"""
if kwargs is _sentinel:
kwargs = {}
event = Event(time, priority, action, argument, kwargs)
with self._lock:
heapq.heappush(self._queue, event)
return event # The ID
相比如 enter的时间是一个相对时间这一点可以从API文档中的说明看到
def enter(self, delay, priority, action, argument=(), kwargs=_sentinel):
"""A variant that specifies the time as a relative time.
This is actually the more commonly used interface.
"""
time = self.timefunc() + delay
return self.enterabs(time, priority, action, argument, kwargs)
time = self.timefunc() + delay
接着调用 enterabs