Halo
发布于 2024-11-05 / 79 阅读 / 0 评论 / 0 点赞

python 量化架构

量化策略

  1. 机器学习模型
  • 随机森林:输入一组特征(或称为属性、变量),随机选择一部分特征进行构建一个决策树,每棵树给出一个类别预测,对于分类问题,每棵树给出一个类别预测,最终的预测结果是通过多数投票确定的;对于回归问题,则是所有树预测结果的平均值。
  • 神经网络:构建输入层、隐藏层和输出层组成的神经网络,每个层由一个或多个激活函数(如ReLU、sigmoid、tanh等)组成神经元,激活函数用来学习和模拟复杂的函数映射。输入数据在网络中从输入层经过每一层的神经元处理,直到输出层。在训练过程中,网络使用反向传播算法来计算损失函数关于每个参数的梯度,并使用梯度下降或其他优化算法来更新参数。通过多次迭代训练,不断调整权重和偏置,直到模型在训练数据上达到满意的性能。
  1. 统计模型
  • 回归分析: 包含线性回归、非线性回归、逻辑回归(因子回归)
  1. 时间序列分析
  • ARIMA: 由自回归(AR)、差分(I)和移动平均(MA)三部分组成。自回归部分捕捉时间序列的自相关关系,使用过去时刻的观测值来预测当前时刻的值。自回归模型的阶数为p。差分是将非平稳时间序列转换为平稳时间序列的关键步骤。差分的次数d表示为了使序列平稳需要进行多少次差分。移动平均部分关注的是自回归模型中的误差项的累加,反映了在t时刻的目标值与前t-1~q个误差值之间存在的线性关系,其中q为移动平均项数。

系统架构

  1. 高可用性设计
  • 负载均衡:将工作负载(如网络流量或请求)均匀分配到多个服务器或计算资源上的技术,以优化资源使用、最大化吞吐量、最小化响应时间,并避免任何单一点过载。
  • 冗余服务:系统中部署额外的服务实例,以确保主服务发生故障时,备用服务可以接管工作,从而保证服务的连续性。
  • 数据库复制:将一个数据库中的数据实时或准实时地复制到另一个数据库的过程,以实现数据的冗余和备份。
  • 故障转移机制:当系统的主要组件发生故障时,自动将工作负载转移到备用组件的过程,以确保服务的连续性和数据的完整性。
  1. 架构模式
  • 分层架构模式:将系统分解为多个层次,每一层都有特定的职责,例如表示层、业务逻辑层、数据访问层和数据层。
  • 微服务架构模式:将应用程序构建为一系列小型、独立的服务,每个服务运行在自己的进程中,并通过轻量级的通信机制(通常是HTTP RESTful API)进行交互
  • 事件驱动架构模式:系统组件之间的交互是基于事件的发布和订阅,而不是直接的函数调用
  1. 代码执行效率和准确性
  • 可以通过单元测试、代码审查、性能分析工具(如cProfile)来确保效率和准确性。
pr = cProfile.Profile()
pr.enable()
my_function()
pr.disable()
pr.dump_stats('profile_output.prof')
import pstats
p = pstats.Stats('profile_output.prof')
p.sort_stats('cumulative').print_stats(10)  # 显示累积时间最长的10个函数
# 或者
p.sort_stats('time').print_stats(10)  # 显示内部时间最长的10个函数

cProfile 的输出结果包括多个列,以下是一些常见的列:

  • ncalls:函数被调用的次数。
  • tottime:在函数内部花费的总时间(不包括调用其他函数的时间)。
  • percall:每次调用函数的平均时间。
  • cumtime:包括函数调用其他函数在内的总时间。
  • percall:包括调用其他函数在内的每次调用的平均时间。
  • filename:lineno(function):函数所在的文件和行号。

性能调优

包括代码层面的优化(如算法优化、数据结构选择)、系统层面的优化(如内存管理、垃圾回收调优)。

  1. 内存管理、垃圾回收调优:
    Python使用引用计数来跟踪每个对象的引用数量。当一个对象的引用计数达到零时,它将被垃圾回收。 Python还使用内存池来管理对象的分配和释放,以减少内存碎片和提高性能。Python使用一个私有的垃圾收集器来处理循环引用,这个收集器主要关注包含容器对象(如列表、字典、类实例等)的循环引用。Python提供了一个gc模块,允许你控制垃圾收集器的行为,包括启动垃圾收集、获取当前内存使用情况和设置阈值。可以通过调整gc模块中的gc.set_threshold()函数来控制垃圾收集器的行为,该函数接受三个参数:gc.set_threshold(threshold0, threshold1, threshold2),这些参数决定了何时启动垃圾收集。
  • 对象创建优化:使用局部变量而不是全局变量,或者使用生成器而不是列表推导式来处理大型数据集
  • 数据类型优化:使用tuple代替list,或者使用set代替dict来存储无序集合。使用array.arraynumpy数组来存储大量数值数据。
  • 对于类实例,使用__slots__可以节省内存,因为它防止了实例字典的创建

内存泄漏诊断

  1. memory_profiler
    memory_profiler 可帮助开发者找到可能的内存泄漏点,即那些分配了但未被及时释放的内存。
# myscript.py
from memory_profiler import profile

@profile
def my_func():
    a = [1] * (10 ** 6)
    b = [2] * (2 * 10 ** 7)
    del b
	return a

if __name__ == "__main__":
    my_func()
python -m memory_profiler myscript.py
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     4     53.9 MiB     53.9 MiB           1   @profile
     5                                         def my_func():
     6     61.5 MiB      7.6 MiB           1       a = [1] * (10 ** 6)
     7    214.1 MiB    152.6 MiB           1       b = [2] * (2 * 10 ** 7)
     8     61.5 MiB   -152.6 MiB           1       del b
     9     61.5 MiB      0.0 MiB           1       return a
  1. tracemalloc
    tracemalloc 可以追踪 Python 分配的内存块,并提供对象分配内存时的 traceback 信息
import tracemalloc
tracemalloc.start()
# ... 运行你的应用程序 ...
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 ]")
for stat in top_stats[:10]:
    print(stat)

这段代码首先启动了内存跟踪,然后获取了一个内存快照,并按行号统计了内存分配情况,最后打印出了内存分配最多的前10个文件的统计信息

并发编程

  1. GIL影响
  • Python的全局解释器锁(Global Interpreter Lock,简称GIL)限制了在多核处理器上,同一时间只有一个线程可以执行。GIL的主要作用是保护Python的内存管理机制,因为CPython(Python的官方实现)的内存管理并不是线程安全的。GIL确保了在任何时候只有一个线程可以执行Python代码,从而避免了多线程中的竞态条件。
  • 对于I/O密集型任务,GIL的影响较小,因为线程在等待I/O操作时会释放GIL,允许其他线程运行
  • 对于CPU密集型任务,GIL的影响较大,因为线程在执行计算时会持有GIL,限制了多核处理器的并行计算能力。
  • 可以通过使用多进程来绕过GIL,因为每个进程有自己的Python解释器和内存空间,所以不受GIL的限制
  • 使用C扩展或Cython等工具编写性能关键部分的代码,这些代码可以释放GIL或完全在C层面运行,从而实现真正的并行计算。
  • 可以考虑使用Jython(运行在Java虚拟机上)或IronPython(运行在.NET框架上)等其他Python实现,它们没有GIL的限制
  1. 多线程和多进程
  • 多线程是指在同一个进程中并行运行多个线程的技术。同一进程下的线程可以共享进程的资源,如内存、文件句柄等。线程间的上下文切换比进程间的上下文切换开销小,因为它们共享相同的内存空间。适用于I/O密集型任务,如Web服务器、文件处理等,因为线程可以在等待I/O操作时释放GIL,允许其他线程运行。
  • 多进程是指创建多个进程,每个进程有自己的内存空间和系统资源。进程间的通信需要通过特定的机制,如管道、信号、共享内存等。每个进程可以在不同的CPU核心上同时运行,实现真正的并行处理。适用于CPU密集型任务,如科学计算、图像处理等,因为多进程可以实现真正的并行计算,绕过GIL的限制。
  1. 协程
  • 协程是一种程序组件,协程通常由程序自身控制挂起和恢复执行,而不是由操作系统内核管理,非常适合处理I/O密集型任务。协程的创建和销毁开销远小于线程,可以在单个线程内创建成千上万个协程。
  • 协程之间需要协作,主动让出控制权,才能实现多任务并发。
  • 使用async定义异步函数,使用await挂起当前协程,让出控制权。
  1. 异步
  • 异步编程是一种编程范式,允许程序在等待异步操作完成时继续执行其他任务,而不是被动等待。
  • asyncio库提供了事件循环和异步I/O操作的支持。
  • 使用async定义异步函数,使用await挂起当前协程,让出控制权。

网络通信

原始socket通信、HTTP通信、Web 框架、WebSocket 通信、RPC等

import requests

response = requests.get('https://www.example.com')
print(response.text)

分布式系统

  1. 分布式数据库
  • MongoDB,一个基于文档的NoSQL数据库。
  • Redis,一个基于内存的键值存储数据库。Redis 提供了 RDB 和 AOF 两种持久化方式,确保数据的安全性。RDB:在指定的时间间隔内生成数据集的时间点快照。AOF:记录每次写操作命令,重启时重播这些命令来恢复数据。
  • Hadoop,一个基于分布式文档的NoSQL数据库。
  1. 分布式缓存
  • Redis, 支持设置键的过期时间,这使得缓存数据可以自动从存储中删除,避免内存占用过多。
  • Memcached,一个简单内存键值内存对象存储系统,自动过期功能实现缓存
import redis

# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 设置键值对
r.set('foo', 'bar')

# 获取键值
value = r.get('foo')
print(value)  # 输出:b'bar'

# 设置键 'foo' 10 秒后过期
r.set('foo', 'bar', ex=10)
  • 缓存穿透:查询不存在的数据,导致请求直接打到数据库。可以通过布隆过滤器或设置空值来解决。
  • 缓存雪崩:大量缓存同时过期,导致大量请求同时打到数据库。可以通过设置不同的过期时间来避免。
  • 缓存击穿:一个缓存 key 到期的同时,有大量请求到达,导致数据库压力突增。可以通过加锁或使用延迟双删策略来解决。
  1. 分布式锁
    Redis 可以用于实现分布式锁,以保证操作的原子性
# 尝试获取锁
if r.setnx('lock:key', 'value'):
    # 获取锁成功,执行业务逻辑
    # ...

# 释放锁
r.delete('lock:key')
  1. 消息队列
  • queue模块,Python标准库中的queue模块提供了多生产者和消费者模式下的队列实现,特别适合多线程时的消息交换
  • Kafka,基于发布-订阅模式,消息以 topic 为单位进行组织和存储。Kafka由Producer(生产者)Consumer(消费者)Broker(Kafka 集群中的一台服务器就是一个 broker)Topic(主题ZooKeeper(管理 Kafka 集群的元数据和分布式协调)组成
  • Celery,基于分布式消息传递,通常用于异步执行耗时的任务,如发送邮件、处理视频转码、执行数据库迁移等。
# celery_task.py
import celery
import time
backend='redis://127.0.0.1:6379/1'
broker='redis://127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):
    print("向%s发送邮件..."%name)
    time.sleep(5)
    print("向%s发送邮件完成"%name)
    return "ok"
# produce_task.py
from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id)
`celery worker` `-``A celery_app_task` `-``l info`
# result.py
from celery.result import AsyncResult
from celery_task import cel

async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

web 框架

  1. Django,提供了一站式的解决方案,包括ORM、管理后台、路由系统、模板引擎等。Django遵循MVC(模型-视图-控制器)设计模式,处理HTTP请求的过程涉及几个关键组件:请求(Request)、中间件(Middleware)、视图(View)、模板(Template)和响应(Response)。Django通过WSGI与Web服务器(如Gunicorn、uWSGI)通信,WSGI是Django应用程序的入口点,它将请求传递给Django处理,并将响应返回给客户端。

  2. FastAPI,基于标准Python类型提示,支持异步请求处理。FastAPI 使用基于路径和方法的路由系统来确定哪个路径操作函数(即视图函数)应该处理这个请求。FastAPI 允许使用装饰器来定义路由和相应的 HTTP 方法。一旦路由确定,FastAPI 将请求传递给相应的路径操作函数。FastAPI通过ASGI与Web服务器(如Uvicorn、Daphne)通信,ASGI支持异步请求处理,使得FastAPI可以处理大量的并发连接

  3. WSGI 和 ASGI 区别

  • WSGI是为同步Web应用设计的,它遵循一个请求一个响应的模型。在处理请求时,WSGI服务器会阻塞直到请求处理完成并返回响应。WSGI适合于CPU密集型任务或I/O等待时间较短的应用。
  • ASGI是为了异步Web应用设计的,使用事件循环来管理异步任务。允许服务器在不阻塞的情况下处理大量并发连接。ASGI特别适合于I/O密集型应用,如实时通信、API网关等。

评论