如果你想定期运行Python脚本,最流行的选择是Crontab脚本,但Crontab有以下缺点:
1、执行二级任务不方便。 2. 当您有数百个计划任务需要运行时,管理Crontab 变得特别不方便。另一种选择是Celery,但Celery 配置起来比较麻烦。如果你只需要一个轻量级的调度工具,Celery 并不是正确的选择。
如果你想要一个尽可能简单易用、不需要外部依赖、最好涵盖Crontab所有基本功能的轻量级任务调度工具,那么Schedule模块是最好的选择。
使用它来安排任务只需要几行代码。请尝试使用它。
# Python实用指南
进口时间表
导入时间
默认作业():
print('我在工作.')
时间表.每(10).分钟.do(工作)
而True:
调度.run_pending()
睡眠时间(1)
上面的代码显示作业函数每10 分钟运行一次。这个非常简单方便。您需要做的就是引入一个调度模块并通过调用schedule.every(timenumber).time type.do(job)来公开一个重复任务。
发布的循环任务必须使用run_pending 函数检查它是否已运行,因此需要一个While 循环来不断轮询该函数。
了解有关日程模块的安装以及基本和高级用法的更多信息。
1. 准备工作
在开始之前,您需要确保Python 和pip 已成功安装在您的计算机上。如果没有,请访问这篇文章《超详细Python安装指南》进行安装。
(方案一)如果想使用Python进行数据分析,可以直接安装Anaconda。 Anaconda是Python数据分析和挖掘的好帮手,包含Python和pip。
(选项2)此外,我们建议使用VSCode编辑器,它有很多优点。 Python编程的最佳拍档——VSCode详细指南。
选择以下方法之一并输入命令安装依赖项:
1.Windows环境打开Cmd(开始-运行-CMD)。
2.MacOS环境打开终端(命令+空格进入终端)。
3.如果您使用的是VSCode编辑器或Pycharm,可以直接使用界面底部的终端。
点安装计划
2.基本使用
最基本的用法在文章开头已经讲解过了。以下是调度任务的更多示例。
# Python实用指南
进口时间表
导入时间
默认作业():
print('我在工作.')
# 每10分钟运行一次任务
时间表.每(10).分钟.do(工作)
# 每小时运行一次任务
时间表.每个().小时.do(工作)
# 每天执行10:30个任务
Schedule.every().day.at('10:30').do(工作)
# 每月运行一次任务
Schedule.every().monday.do(工作)
# 每周三13:15 分钟运行任务
Schedule.every().wednesday.at('13:15').do(作业)
# 每分钟17秒运行一次任务
Schedule.every().minut.at(':17').do(作业)
而True:
调度.run_pending()
睡眠时间(1)
如您所见,上面的示例涵盖了从几个月到几秒的配置。但是,如果您只想运行该任务一次,可以这样设置:
# Python实用指南
进口时间表
导入时间
def job_that_executes_once():
# 这里写的任务只会执行一次.
返回时间表。取消工作
Schedule.every().day.at('22:30').do(job_that_executes_once)
而True:
调度.run_pending()
睡眠时间(1)
传递参数
仅当有参数需要传递给作业执行时才需要执行此操作。
# Python实用指南
进口时间表
Defgreet(姓名):
print('你好', 姓名)
# do() 将附加参数传递给作业函数
Schedule.every(2).Seconds.do(问候语, name='Alice')
Schedule.every(4).seconds.do(问候语, name='Bob')
获取所有当前职位
如果您想获得当前所有工作:
# Python实用指南
进口时间表
绝对你好():
打印('你好世界')
Schedule.every().Second.do(你好)
all_jobs=Schedule.get_jobs()
取消所有作业
如果触发任何机制,则必须立即清除当前程序的所有作业。
# Python实用指南
进口时间表
Defgreet(姓名):
print('你好{}'.format(name))
Schedule.every().Second.do(问候语)
时间表.clear()
标签功能
当您设置作业时,您可以对其进行标记,以便于后续的作业管理。这允许您过滤作业和取消作业。
# Python实用指南
进口时间表
Defgreet(姓名):
print('你好{}'.format(name))
# .tag 标签
Schedule.every().day.do(greetings, 'Andrea').tag('每日任务', '朋友')
Schedule.every().hour.do(greet, 'John').tag('每小时任务', '朋友')
Schedule.every().hour.do(greet, 'Monica').tag('每小时任务', '客户')
Schedule.every().day.do(greetings, 'Derek').tag('每日任务', '客人')
#get_jobs(label):可以获取带有该标签的所有任务
朋友=schedule.get_jobs('朋友')
# 取消所有标记为daily-task 的任务
Schedule.clear('每日任务')
设定作业截止日期
如果您需要在特定时间之前完成工作,您可以执行以下操作:
# Python实用指南
进口时间表
从日期时间导入日期时间、时间增量、时间
默认作业():
打印('嘘')
# 每小时运行一次作业,并在18:30 之后停止
Schedule.every(1).hours.until('18:30').do(作业)
# 每小时运行一次作业(今天2030-01-01 18:33)
Schedule.every(1).hours.until('2030-01-01 18:33').do(作业)
# 每小时运行一次作业,8小时后停止
Schedule.every(1).hours.until(timedelta(hours=8)).do(作业)
# 每小时运行一次作业,并在11:32:42 之后停止
Schedule.every(1).time.until(时间(11,33,42)).do(作业)
# 每小时运行一次作业,并在2020 年5 月17 日之后停止11:36:20
Schedule.every(1).hours.until(日期时间(2020,5,17,11,36,20)).do(作业)
截止日期之后,作业将不会运行。
无论计划如何,立即运行所有作业
如果某个机制被触发,需要所有作业立即运行:可以调用schedule.run_all() :
# Python实用指南
进口时间表
def job_1():
打印('fuu')
def job_2():
打印('酒吧')
Schedule.every().monday.at('12:40').do(job_1)
Schedule.every().tuesday.at('16:40').do(job_2)
调度.run_all()
# 立即运行所有作业,每个作业之间间隔10 秒
Schedule.run_all(delay_seconds=10)
3. 高级使用
装修工安排工作
如果你觉得配置你的工作太冗长,你也可以使用装饰器模式。
# Python实用指南
从计划中导入间隔、重复、待执行
导入时间
# 这个装饰器的效果相当于schedule.every(10)。
@重复(每10分钟一次)
默认作业():
print('我是一个预定的工作')
而True:
运行挂起()
睡眠时间(1)
并行执行
默认情况下,计划按顺序运行所有作业。其背后的原因是,很难找到一个令所有人满意的并行执行模型。
但是,您可以通过在多个线程中运行每个作业来绕过此限制。
# Python实用指南
导入线程
导入时间
进口时间表
默认作业1():
print('正在线程%s 中运行' % threading.current_thread())
def job2():
print('正在线程%s 中运行' % threading.current_thread())
默认作业3():
print('正在线程%s 中运行' % threading.current_thread())
def run_threaded(job_func):
job_thread=threading.Thread(target=job_func)
job_thread.start()
Schedule.every(10).Seconds.do(run_threaded, job1)
Schedule.every(10).Seconds.do(run_threaded, job2)
Schedule.every(10).seconds.do(run_threaded, job3)
而True:
调度.run_pending()
睡眠时间(1)
记录
调度模块还支持日志记录,可以按如下方式使用:
# Python实用指南
进口时间表
导入日志
日志记录.basicConfig()
日程记录器=logging.getLogger('schedule')
# 日志级别为DEBUG
Schedule_logger.setLevel(level=logging.DEBUG)
默认作业():
print('你好,日志')
Schedule.every().Seconds.do(作业)
调度.run_all()
时间表.clear()
效果是:
DEBUG:schedule: 运行*所有* 1 个作业,它们之间有0 秒的延迟
DEBUG:schedule: 执行作业Job(interval=1,unit=seconds,do=job,args=(),kwargs={})
你好,日志
DEBUG:schedule: 删除“所有”作业
异常处理
时间表不会自动捕获异常。当异常发生时,直接抛出异常。这将中断所有后续作业,因此您需要捕获这些异常。
尽管您可以手动捕获,但不可预见的情况可能需要以编程方式自动捕获。为此,请添加一个装饰器。
# Python实用指南
导入功能工具
def catch_Exceptions(cancel_on_failure=False):
def catch_Exceptions_decorator(job_func):
@functools.wraps(job_func)
def 包装器(*args,**kwargs):
尝试:
返回job_func(*args, **kwargs)
不包括:
导入回溯
打印(traceback.format_exc())
如果cancel_on_failure:
返回时间表。取消工作
返回包装器
返回catch_Exceptions_decorator
@catch_Exceptions(cancel_on_failure=True)
def bad_task():
返回1/0
Schedule.every(5).Minute.do(坏任务)
这样,执行过程中bad_task 中发生的任何错误都会被catch_Exceptions 捕获。这对于保证计划任务的正常运行非常重要。
如果您喜欢今天的Python 实践教程,请继续关注我们。
版权声明:本文由今日头条转载,如有侵犯您的版权,请联系本站编辑删除。