量化策略
- 机器学习模型
- 随机森林:输入一组特征(或称为属性、变量),随机选择一部分特征进行构建一个决策树,每棵树给出一个类别预测,对于分类问题,每棵树给出一个类别预测,最终的预测结果是通过多数投票确定的;对于回归问题,则是所有树预测结果的平均值。
- 神经网络:构建输入层、隐藏层和输出层组成的神经网络,每个层由一个或多个激活函数(如ReLU、sigmoid、tanh等)组成神经元,激活函数用来学习和模拟复杂的函数映射。输入数据在网络中从输入层经过每一层的神经元处理,直到输出层。在训练过程中,网络使用反向传播算法来计算损失函数关于每个参数的梯度,并使用梯度下降或其他优化算法来更新参数。通过多次迭代训练,不断调整权重和偏置,直到模型在训练数据上达到满意的性能。
- 统计模型
- 回归分析: 包含线性回归、非线性回归、逻辑回归(因子回归)
- 时间序列分析
- ARIMA: 由自回归(AR)、差分(I)和移动平均(MA)三部分组成。自回归部分捕捉时间序列的自相关关系,使用过去时刻的观测值来预测当前时刻的值。自回归模型的阶数为p。差分是将非平稳时间序列转换为平稳时间序列的关键步骤。差分的次数d表示为了使序列平稳需要进行多少次差分。移动平均部分关注的是自回归模型中的误差项的累加,反映了在t时刻的目标值与前t-1~q个误差值之间存在的线性关系,其中q为移动平均项数。
系统架构
- 高可用性设计
- 负载均衡:将工作负载(如网络流量或请求)均匀分配到多个服务器或计算资源上的技术,以优化资源使用、最大化吞吐量、最小化响应时间,并避免任何单一点过载。
- 冗余服务:系统中部署额外的服务实例,以确保主服务发生故障时,备用服务可以接管工作,从而保证服务的连续性。
- 数据库复制:将一个数据库中的数据实时或准实时地复制到另一个数据库的过程,以实现数据的冗余和备份。
- 故障转移机制:当系统的主要组件发生故障时,自动将工作负载转移到备用组件的过程,以确保服务的连续性和数据的完整性。
- 架构模式
- 分层架构模式:将系统分解为多个层次,每一层都有特定的职责,例如表示层、业务逻辑层、数据访问层和数据层。
- 微服务架构模式:将应用程序构建为一系列小型、独立的服务,每个服务运行在自己的进程中,并通过轻量级的通信机制(通常是HTTP RESTful API)进行交互
- 事件驱动架构模式:系统组件之间的交互是基于事件的发布和订阅,而不是直接的函数调用
- 代码执行效率和准确性
- 可以通过单元测试、代码审查、性能分析工具(如cProfile)来确保效率和准确性。
pr = cProfile.Profile()
pr.enable()
my_function()
pr.disable()
pr.dump_stats('profile_output.prof')
import pstats
p = pstats.Stats('profile_output.prof')
p.sort_stats('cumulative').print_stats(10) # 显示累积时间最长的10个函数
# 或者
p.sort_stats('time').print_stats(10) # 显示内部时间最长的10个函数
cProfile
的输出结果包括多个列,以下是一些常见的列:
ncalls
:函数被调用的次数。tottime
:在函数内部花费的总时间(不包括调用其他函数的时间)。percall
:每次调用函数的平均时间。cumtime
:包括函数调用其他函数在内的总时间。percall
:包括调用其他函数在内的每次调用的平均时间。filename:lineno(function)
:函数所在的文件和行号。
性能调优
包括代码层面的优化(如算法优化、数据结构选择)、系统层面的优化(如内存管理、垃圾回收调优)。
- 内存管理、垃圾回收调优:
Python使用引用计数来跟踪每个对象的引用数量。当一个对象的引用计数达到零时,它将被垃圾回收。 Python还使用内存池来管理对象的分配和释放,以减少内存碎片和提高性能。Python使用一个私有的垃圾收集器来处理循环引用,这个收集器主要关注包含容器对象(如列表、字典、类实例等)的循环引用。Python提供了一个gc
模块,允许你控制垃圾收集器的行为,包括启动垃圾收集、获取当前内存使用情况和设置阈值。可以通过调整gc
模块中的gc.set_threshold()
函数来控制垃圾收集器的行为,该函数接受三个参数:gc.set_threshold(threshold0, threshold1, threshold2)
,这些参数决定了何时启动垃圾收集。
- 对象创建优化:使用
局部变量
而不是全局变量
,或者使用生成器
而不是列表推导式
来处理大型数据集 - 数据类型优化:使用
tuple
代替list
,或者使用set
代替dict
来存储无序集合。使用array.array
或numpy
数组来存储大量数值数据。 - 对于类实例,使用
__slots__
可以节省内存,因为它防止了实例字典的创建
内存泄漏诊断
- memory_profiler
memory_profiler
可帮助开发者找到可能的内存泄漏点,即那些分配了但未被及时释放的内存。
# myscript.py
from memory_profiler import profile
@profile
def my_func():
a = [1] * (10 ** 6)
b = [2] * (2 * 10 ** 7)
del b
return a
if __name__ == "__main__":
my_func()
python -m memory_profiler myscript.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
4 53.9 MiB 53.9 MiB 1 @profile
5 def my_func():
6 61.5 MiB 7.6 MiB 1 a = [1] * (10 ** 6)
7 214.1 MiB 152.6 MiB 1 b = [2] * (2 * 10 ** 7)
8 61.5 MiB -152.6 MiB 1 del b
9 61.5 MiB 0.0 MiB 1 return a
- tracemalloc
tracemalloc
可以追踪 Python 分配的内存块,并提供对象分配内存时的 traceback 信息
import tracemalloc
tracemalloc.start()
# ... 运行你的应用程序 ...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 ]")
for stat in top_stats[:10]:
print(stat)
这段代码首先启动了内存跟踪,然后获取了一个内存快照,并按行号统计了内存分配情况,最后打印出了内存分配最多的前10个文件的统计信息
并发编程
- GIL影响
- Python的全局解释器锁(Global Interpreter Lock,简称GIL)限制了在多核处理器上,同一时间只有一个线程可以执行。GIL的主要作用是保护Python的内存管理机制,因为CPython(Python的官方实现)的内存管理并不是线程安全的。GIL确保了在任何时候只有一个线程可以执行Python代码,从而避免了多线程中的竞态条件。
- 对于I/O密集型任务,GIL的影响较小,因为线程在等待I/O操作时会释放GIL,允许其他线程运行
- 对于CPU密集型任务,GIL的影响较大,因为线程在执行计算时会持有GIL,限制了多核处理器的并行计算能力。
- 可以通过使用多进程来绕过GIL,因为每个进程有自己的Python解释器和内存空间,所以不受GIL的限制
- 使用C扩展或Cython等工具编写性能关键部分的代码,这些代码可以释放GIL或完全在C层面运行,从而实现真正的并行计算。
- 可以考虑使用Jython(运行在Java虚拟机上)或IronPython(运行在.NET框架上)等其他Python实现,它们没有GIL的限制
- 多线程和多进程
- 多线程是指在同一个进程中并行运行多个线程的技术。同一进程下的线程可以共享进程的资源,如内存、文件句柄等。线程间的上下文切换比进程间的上下文切换开销小,因为它们共享相同的内存空间。适用于I/O密集型任务,如Web服务器、文件处理等,因为线程可以在等待I/O操作时释放GIL,允许其他线程运行。
- 多进程是指创建多个进程,每个进程有自己的内存空间和系统资源。进程间的通信需要通过特定的机制,如管道、信号、共享内存等。每个进程可以在不同的CPU核心上同时运行,实现真正的并行处理。适用于CPU密集型任务,如科学计算、图像处理等,因为多进程可以实现真正的并行计算,绕过GIL的限制。
- 协程
- 协程是一种程序组件,协程通常由程序自身控制挂起和恢复执行,而不是由操作系统内核管理,非常适合处理I/O密集型任务。协程的创建和销毁开销远小于线程,可以在单个线程内创建成千上万个协程。
- 协程之间需要协作,主动让出控制权,才能实现多任务并发。
- 使用
async
定义异步函数,使用await
挂起当前协程,让出控制权。
- 异步
- 异步编程是一种编程范式,允许程序在等待异步操作完成时继续执行其他任务,而不是被动等待。
asyncio
库提供了事件循环和异步I/O操作的支持。- 使用
async
定义异步函数,使用await
挂起当前协程,让出控制权。
网络通信
原始socket通信、HTTP通信、Web 框架、WebSocket 通信、RPC等
import requests
response = requests.get('https://www.example.com')
print(response.text)
分布式系统
- 分布式数据库
- MongoDB,一个基于文档的NoSQL数据库。
- Redis,一个基于内存的键值存储数据库。Redis 提供了 RDB 和 AOF 两种持久化方式,确保数据的安全性。
RDB
:在指定的时间间隔内生成数据集的时间点快照。AOF
:记录每次写操作命令,重启时重播这些命令来恢复数据。 - Hadoop,一个基于分布式文档的NoSQL数据库。
- 分布式缓存
- Redis, 支持设置键的过期时间,这使得缓存数据可以自动从存储中删除,避免内存占用过多。
- Memcached,一个简单内存键值内存对象存储系统,自动过期功能实现缓存
import redis
# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 设置键值对
r.set('foo', 'bar')
# 获取键值
value = r.get('foo')
print(value) # 输出:b'bar'
# 设置键 'foo' 10 秒后过期
r.set('foo', 'bar', ex=10)
- 缓存穿透:查询不存在的数据,导致请求直接打到数据库。可以通过布隆过滤器或设置空值来解决。
- 缓存雪崩:大量缓存同时过期,导致大量请求同时打到数据库。可以通过设置不同的过期时间来避免。
- 缓存击穿:一个缓存 key 到期的同时,有大量请求到达,导致数据库压力突增。可以通过加锁或使用延迟双删策略来解决。
- 分布式锁
Redis 可以用于实现分布式锁,以保证操作的原子性
# 尝试获取锁
if r.setnx('lock:key', 'value'):
# 获取锁成功,执行业务逻辑
# ...
# 释放锁
r.delete('lock:key')
- 消息队列
- queue模块,Python标准库中的
queue
模块提供了多生产者和消费者模式下的队列实现,特别适合多线程时的消息交换 - Kafka,基于发布-订阅模式,消息以 topic 为单位进行组织和存储。Kafka由
Producer(生产者)
、Consumer(消费者)
、Broker(Kafka 集群中的一台服务器就是一个 broker)
、Topic(主题
、ZooKeeper(管理 Kafka 集群的元数据和分布式协调)
组成 - Celery,基于分布式消息传递,通常用于异步执行耗时的任务,如发送邮件、处理视频转码、执行数据库迁移等。
# celery_task.py
import celery
import time
backend='redis://127.0.0.1:6379/1'
broker='redis://127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成"%name)
return "ok"
# produce_task.py
from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id)
`celery worker` `-``A celery_app_task` `-``l info`
# result.py
from celery.result import AsyncResult
from celery_task import cel
async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
web 框架
-
Django,提供了一站式的解决方案,包括ORM、管理后台、路由系统、模板引擎等。Django遵循MVC(模型-视图-控制器)设计模式,处理HTTP请求的过程涉及几个关键组件:请求(Request)、中间件(Middleware)、视图(View)、模板(Template)和响应(Response)。Django通过WSGI与Web服务器(如Gunicorn、uWSGI)通信,WSGI是Django应用程序的入口点,它将请求传递给Django处理,并将响应返回给客户端。
-
FastAPI,基于标准Python类型提示,支持异步请求处理。FastAPI 使用基于路径和方法的路由系统来确定哪个路径操作函数(即视图函数)应该处理这个请求。FastAPI 允许使用装饰器来定义路由和相应的 HTTP 方法。一旦路由确定,FastAPI 将请求传递给相应的路径操作函数。FastAPI通过ASGI与Web服务器(如Uvicorn、Daphne)通信,ASGI支持异步请求处理,使得FastAPI可以处理大量的并发连接
-
WSGI 和 ASGI 区别
- WSGI是为同步Web应用设计的,它遵循一个请求一个响应的模型。在处理请求时,WSGI服务器会阻塞直到请求处理完成并返回响应。WSGI适合于CPU密集型任务或I/O等待时间较短的应用。
- ASGI是为了异步Web应用设计的,使用事件循环来管理异步任务。允许服务器在不阻塞的情况下处理大量并发连接。ASGI特别适合于I/O密集型应用,如实时通信、API网关等。