first commit

This commit is contained in:
liao 2025-11-17 13:32:48 +08:00
commit 4431252e30
32 changed files with 1073 additions and 0 deletions

13
Dockerfile Normal file
View File

@ -0,0 +1,13 @@
FROM python:3.11
WORKDIR /app
COPY . /app
RUN pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip3 config set install.trusted-host mirrors.aliyun.com
RUN pip3 install -r requirements.txt

1
README.md Normal file
View File

@ -0,0 +1 @@
量化因子评价系统

19
build.bat Normal file
View File

@ -0,0 +1,19 @@
cd venv/Scripts
call activate
cd ../../
IF EXIST "dist" (
echo "Directory exist."
) else (
md dist
echo "Directory exist created"
)
copy main_pro.py .\dist\
copy config.toml .\dist\config.toml
call nuitka --mingw64 --show-progress --product-version=1.0.0 --standalone --output-dir=dist --onefile --windows-icon-from-ico=icon.ico --remove-output --output-filename=start.exe main_pro.py
pause

6
build.sh Normal file
View File

@ -0,0 +1,6 @@
. venv/bin/activate
cd ../../
nuitka --mingw64 --show-progress --standalone --product-version=1.0.0 --output-dir=dist --onefile --remove-output main.py
cp main.py dist/main.py
cp config.toml dist/config.toml

40
default-config.toml Normal file
View File

@ -0,0 +1,40 @@
# 服务基础配置
[base]
# 服务端口
port=3000
reload=true
# 进程数
workers=1
[project]
# 登录有效期,单位:天, 默认为十天
tokenTIme=0.1
[mysql.DB199]
host="192.168.18.199"
port=3306
username="root"
password="Chlry#$.8"
database="db_gp_cj"
[mysql.DB150]
host="192.168.16.150"
port=3306
username="root"
password="Chlry$%.8pattern"
database="factordb_mysql"
[redis.base]
ip="192.168.18.208"
port=6379
username=""
password="wlkj2018"
db=13
[log]
# 是否开启debug
debug=false

20
main_dev.py Normal file
View File

@ -0,0 +1,20 @@
#coding=utf-8
# 开发
import uvicorn
from src.core.app import startApp
(app, config) = startApp()
if __name__ == "__main__":
config = config.get("base")
if config:
# ,workers=config.get("workers")
uvicorn.run(app="main_dev:app",reload=True,port=config.get("port"),workers=3 )
else:
uvicorn.run(app="main_dev:app",reload=True)

18
main_pro.py Normal file
View File

@ -0,0 +1,18 @@
#coding=utf-8
# 打包运行
import uvicorn
from src.core.app import startApp
(app, config) = startApp()
if __name__ == "__main__":
config = config.get("base")
if config:
# ,workers=config.get("workers")
uvicorn.run(app="main_pro:app",port=config.get("port"),workers=3 )
else:
uvicorn.run(app="main_pro:app")

BIN
requirements.txt Normal file

Binary file not shown.

1
run.bat Normal file
View File

@ -0,0 +1 @@
python main_dev.py

View File

View File

@ -0,0 +1,18 @@
from pydantic import BaseModel,Field, field_validator,validator
import re
class UserData(BaseModel):
account: str = Field(min_length=5,max_length=12,title="账号", default="admin")
password: str = '123456'
# 密码复杂度验证
@field_validator("password")
@classmethod
def validator_password(cls, value):
# if re.match(r'^.*(?=.{6,})(?=.*\d)(?=.*[a-z]|[A-Z]{1,}).*$', value): # 真正需要再用
if len(value) >= 6:
return value
else:
raise ValueError('请输入至少包一位字母和数字的密码')
# return respone.success(code=-1, msg="请输入至少包一位字母和数字")

View File

@ -0,0 +1,12 @@
#coding=utf-8
from fastapi import Request
from datetime import datetime
from src.utils.respone import success
async def get_timestamp(request: Request):
"""
获取服务端时间戳
"""
timestamp = int(datetime.now().timestamp() * 1000) # 毫秒时间戳
return success(data={"timestamp": timestamp}, msg="获取成功")

View File

@ -0,0 +1,90 @@
from typing import List
from fastapi import Request
from src.utils.respone import fail, success
def _extract_first_value(result):
if isinstance(result, (list, tuple)) and result:
return result[0]
return result
async def mysql_ping(request: Request, name: str):
mysql_clients = getattr(request.app, "mysql", {})
if not mysql_clients:
return fail(msg="MySQL 未初始化")
client = mysql_clients.get(name)
if client is None:
return fail(msg=f"未找到名为 {name} 的 MySQL 数据源")
try:
result = await client.query("SELECT 1")
return success(
data={
"name": name,
"status": "ok" if result else "empty",
"result": _extract_first_value(result),
}
)
except Exception as exc:
return fail(
msg="MySQL 连接失败",
ext={"name": name, "detail": str(exc)},
)
async def mysql_overview(request: Request):
mysql_clients = getattr(request.app, "mysql", {})
if not mysql_clients:
return fail(msg="MySQL 未初始化")
overview: List[dict] = []
for name, client in mysql_clients.items():
try:
result = await client.query("SELECT 1")
overview.append(
{
"name": name,
"status": "ok" if result else "empty",
"result": _extract_first_value(result),
}
)
except Exception as exc:
overview.append(
{
"name": name,
"status": "error",
"error": str(exc),
}
)
return success(data=overview)
async def redis_ping(request: Request, name: str):
redis_clients = getattr(request.app, "redis", {})
if not redis_clients:
return fail(msg="Redis 未初始化")
client = redis_clients.get(name)
if client is None:
return fail(msg=f"未找到名为 {name} 的 Redis 数据源")
try:
pong = await client.ping()
return success(
data={
"name": name,
"status": "ok" if pong else "error",
"result": pong,
}
)
except Exception as exc:
return fail(
msg="Redis 连接失败",
ext={"name": name, "detail": str(exc)},
)

View File

@ -0,0 +1,32 @@
from logging import log
from fastapi import APIRouter,Request
from src.controller.base.data_type import UserData
from src.utils.respone import success,fail
from src.utils.token import createToken,vertify_is_login
from src.core.life import REDIS
users_router = APIRouter()
async def route_name(request: Request, userData: UserData):
# res = await Users.filter(account=userData.account).first()
for k in request:
print(k)
baseRedis = request["redis"]["base"]
await baseRedis.set("long.tests.authww", '踢啊n' )
redisstr = await baseRedis.get("long.tests.authww")
print(redisstr.decode('utf8'))
res=None
if(res == None):
return fail(msg="该用户不存在")
if(res.password != userData.password):
return fail(msg="密码错误")
token = createToken({"userId": res.userId, "account": res.account})
# await REDIS["base"].hset("long.tests.auth", res.userId, token)
print(vertify_is_login("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjEsImFjY291bnQiOiJhZG1pbiIsImV4cCI6MTcyNTE4NDgxOX0.Aa_Hb_pGDpCVrZdJrDzkBJYJMmAYBDuzymuliHN4_bQ"))
return success(data=token)

35
src/core/app.py Normal file
View File

@ -0,0 +1,35 @@
import logging
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from src.utils.config import read_config
from src.router import register_router
from src.middleware import register_middleware
from src.core.life import lifespan
from src.middleware.exception import register_exception
from src.core.log import log
def startApp():
app = FastAPI(lifespan=lifespan)
# 静态文件服务
app.mount("/static", StaticFiles(directory="static"), name="static")
# 中间件
register_middleware(app)
# 注册路由
register_router(app)
# 注册错误捕获
register_exception(app)
config = read_config()
# 开启日志显示
if config and config.get("log") and config.get("log")["debug"]:
logging.basicConfig(level=logging.DEBUG)
return (app,config)

84
src/core/life.py Normal file
View File

@ -0,0 +1,84 @@
# 生命周期
#coding=utf-8
from doctest import debug
from math import erf, inf
from uu import Error
from fastapi import FastAPI
from contextlib import asynccontextmanager
from src.utils.config import read_config
# from tortoise.contrib.fastapi import register_tortoise
# from aioredis import create_redis_pool, Redis
import aioredis
from influxdb import InfluxDBClient
from src.core.mysql_core import init_mysql
# 导入定时任务
from src.scheduler import scheduler
# 生命周期函数,监听服务启动完毕和服务结束
REDIS = {}
MYSQL = {}
INFLUXDB={}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
# 将配置文件的配置挂在在app上
config = read_config()
app.state.config = config
# redis 配置
# try:
redisConfig = config["redis"]
mysqlConfig = config["mysql"]
print(redisConfig)
try:
# 连接redis
for redisk in redisConfig:
redis = redisConfig.get(redisk)
REDIS[redisk] = await aioredis.from_url(
f'redis://{redis["ip"]}',
username=redis["username"],
password=redis["password"],
port=redis["port"],
db=redis["db"]
)
print(f"\033[1;32m========= redis-{redisk} connect success !!!=======\033[1;32m")
app.redis = REDIS
# 连接mysql(OK)
for mysql in mysqlConfig:
sql = mysqlConfig.get(mysql)
MYSQL[mysql] = await init_mysql(host=sql["host"],
port=sql["port"],
user=sql["username"],
password=sql["password"],
db=sql["database"], debug=True)
print(f"\033[1;32m========= mysql-{mysql} connect success !!!=======\033[1;32m")
app.mysql = MYSQL
except IOError:
print("连接Redis失败", IOError)
print("\033[1;32m========= app ready success !!!=======\033[1;32m")
# 启动定时任务
scheduler.start()
print("任务队开始")
yield # 在 `yield` 之前,将在应用程序启动之前执行;而 `yield` 之后的部分将在应用程序完成之后执行。
# 关闭定时任务
scheduler.shutdown()
print("任务队结束")
print("生命到头")

38
src/core/log.py Normal file
View File

@ -0,0 +1,38 @@
from os import name
from loguru import logger
class Log:
log=None
errLog=None
sysLog=None
traceLog=None
def __init__(self):
# 日志输出规范
logger_format = "{time:YYYY-MM-DD HH:mm:ss,SSS} [{thread}] - {message}"
# 日志分割时间
rotation = "00:00"
# 日志保留时间, 空格一定要有
retention="7 days"
# filter = lambda record: record
logger.add('logs/{time:YYYY-MM-DD}/err_{time:YYYY-MM-DD}.log', rotation=rotation, retention=retention, format=logger_format, filter=lambda record: record["extra"].get("name") == "err")
logger.add('logs/{time:YYYY-MM-DD}/sys_{time:YYYY-MM-DD}.log', rotation=rotation, retention=retention, format=logger_format, filter=lambda record: record["extra"].get("name") == "sys")
logger.add('logs/{time:YYYY-MM-DD}/trace_{time:YYYY-MM-DD}.log', rotation=rotation, retention=retention, format=logger_format,filter=lambda record: record["extra"].get("name") == "trace")
self.errLog = logger.bind(name="err")
self.sysLog = logger.bind(name="sys")
self.traceLog = logger.bind(name="trace")
def err(self, msg):
self.errLog.error(msg)
def sys(self, msg):
self.sysLog.info(msg)
def trace(self, msg):
self.traceLog.info(msg)
log = Log()

160
src/core/mysql_core.py Normal file
View File

@ -0,0 +1,160 @@
#coding=utf-8
from ast import If
from enum import Flag
from tkinter import NO, SW
from typing import Any, Union
from uu import Error
from xmlrpc.client import Boolean
import aiomysql
import re
async def init_mysql(
host: str,
port: Union[int, str],
user: str,
password: str=None,
db: str=None,
debug = False
):
mysql = Mysql(
host=host,
port=port,
user=user,
password=password,
db=db,
debug= debug
)
await mysql.connect()
return mysql
class Obj:
replacement: dict=None
whereStr: str=None
class ReplacementError(Error):
pass
"""
数据类型字符添加如字符串增加
"""
def data_type_chart(data):
if isinstance(data, str) :
return f"'{data}'"
return data
def str_to_sql(sql, replacement)->str:
pattern = r'\(\s*(.*?)\s*\)' # 去掉括号中间的空格
sql = re.sub(pattern, r'(\1)', sql)
sqlList = sql.split(" ")
arr = []
for data in sqlList:
if re.search(r'=:', data):
# A=:B
data = data.split("=:")
key = data[1]
if (key in replacement) == False:
raise Error(f"清检查replacement->{key}")
replacement[key] = data_type_chart(replacement[key])
data = f'{data[0]}={replacement[key]}'
elif re.search(r'^\(\:', data):
# a IN/in (:)
key = data.replace('(:', '').replace(")", '')
value = ''
if (key in replacement) == False:
raise Error(f"清检查replacement->{data}")
for v in replacement[key]:
v = data_type_chart(v)
value += f',{v}'
data = f"({value[1:]})"
arr.append(data)
sql = ' '.join(arr)
return sql
class Mysql:
client: Any = None
def __init__(
self,
host: str,
port: Union[int, str],
user: str,
password: str=None,
db: str=None,
debug = False
):
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.host = host
self.host = host
self.debug = debug
# 连接数据库
async def connect(self):
try:
conn = await aiomysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, db=self.db)
self.client = await conn.cursor()
except Error as err:
# except aiomysql.OperationalError as err:
# 连接失败
print(f"\033[1;31m连接mysqlip:'{self.host}',db:'{self.db}'失败ERROR: {err}\033[0m")
raise aiomysql.OperationalError("连接失败")
# 单个查询
async def query(self, sql: str=None, obj: Obj=None):
if sql is None or len(sql) == 0:
return None
replacement = obj.get("replacement") if isinstance(obj, dict) else None
# 带替换标签,但没有给替换值
if re.search(r"\s*:\s*", sql) and replacement is None:
raise Error("缺少replacement")
sql = str_to_sql(sql, replacement)
where_str = obj.get("whereStr") if isinstance(obj, dict) else None
if where_str:
sql += str_to_sql(where_str, replacement)
if self.debug:
print(sql)
await self.client.execute(sql)
return await self.client.fetchone()
# 多个查询
async def queryList(self, sql: str=None, obj: Obj=None):
if sql is None or len(sql) == 0:
return None
replacement = obj.get("replacement") if isinstance(obj, dict) else None
# 带替换标签,但没有给替换值
if re.search(r"\s*:\s*", sql) and replacement is None:
raise Error("缺少replacement")
sql = str_to_sql(sql, replacement)
if self.debug:
print(sql)
await self.client.execute(sql)
return await self.client.fetchall()

View File

@ -0,0 +1,17 @@
from .authMiddleware import AuthMiddleware
from fastapi import Request,FastAPI,status
from starlette.middleware.cors import CORSMiddleware
# 注册中间件
def register_middleware(app: FastAPI):
# cors解决跨域
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_headers=["*"]
)
# 健全
app.add_middleware(AuthMiddleware)

View File

@ -0,0 +1,54 @@
#coding=utf-8
from fastapi import Request,FastAPI,status
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
import re
from fastapi import HTTPException
from starlette.responses import Response
from starlette.types import ASGIApp
from src.utils.respone import fail
from src.core.life import MYSQL, REDIS
# 权限验证
class AuthMiddleware(BaseHTTPMiddleware):
#
def __init__(self, app):
super().__init__(app)
#
async def dispatch(self, request: Request, call_next) -> Response:
# 将数据库和redis相关挂载到requst上
request.scope["redis"] = REDIS
request.scope["mysql"] = MYSQL
api = str(request.url).replace(str(request.base_url), '')
path = api.split("?")[0]
whitelist_prefixes = ("api/users/login", "api/diagnostics/", "api/demo/")
# api过滤
if (len(path) == 0 or (len(path) > 0 and re.match(r'api/', path))) and not any(
path.startswith(prefix) for prefix in whitelist_prefixes
):
# 不在忽略内的接口需要进行token验证
print("===未登录====", api)
return fail(status_code=status.HTTP_401_UNAUTHORIZED, msg="请登录")
request.scope["userInfo"] = "天才"
# for it in request:
# print(it)
response = await call_next(request)
response.status_code = status.HTTP_200_OK
print(">>>>", )
# 字段验证出错响应修改
# if response.status_code == 422:
# return JSONResponse(
# status_code=status.HTTP_200_OK,
# content=jsonable_encoder({'masg': '权限不够'}))
return response

View File

@ -0,0 +1,35 @@
# 统一异常捕获处理
#coding=utf-8
from fastapi import FastAPI, Request, status
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from src.utils.respone import fail,success
import re
def register_exception(app: FastAPI):
"""
重写捕获字段验证报错
"""
@app.exception_handler(RequestValidationError)
def handler_try_Validation(request: Request, err: RequestValidationError):
print("err.body==", err.errors())
msg = err.errors()[0].get("msg")
if msg == "field required":
msg = "请求失败,缺少必填项!"
elif msg == "value is not a valid list":
print(err.errors())
msg = f"类型错误,提交参数应该为列表!"
elif msg == "value is not a valid int":
msg = f"类型错误,提交参数应该为整数!"
elif msg == "value could not be parsed to a boolean":
msg = f"类型错误,提交参数应该为布尔值!"
elif msg == "Input should be a valid list":
msg = f"类型错误,输入应该是一个有效的列表!"
elif re.match(r'Value error', msg):
arr = re.match(r'Value error', msg)
startIndex = arr.span()[1] + 1 # 加1是为了去掉逗号
msg = msg[startIndex:].strip() # strip()去掉字符串左右空白
return fail(msg)

7
src/router/__init__.py Normal file
View File

@ -0,0 +1,7 @@
#coding=utf-8
from fastapi import FastAPI
from src.router.base import baseApi
# 注册路由
def register_router(app:FastAPI):
app.include_router(baseApi)

34
src/router/base.py Normal file
View File

@ -0,0 +1,34 @@
#coding=utf-8
from fastapi import APIRouter
from fastapi.responses import FileResponse
from src.controller.base import diagnostics, users, demo
baseApi = APIRouter()
baseApi.post('/api/users/login', tags=["用户管理"], summary="登录")(users.route_name)
baseApi.get(
'/api/diagnostics/mysql',
tags=["系统诊断"],
summary="MySQL 连接概览",
)(diagnostics.mysql_overview)
baseApi.get(
'/api/diagnostics/mysql/{name}',
tags=["系统诊断"],
summary="检测单个 MySQL 数据源",
)(diagnostics.mysql_ping)
baseApi.get(
'/api/diagnostics/redis/{name}',
tags=["系统诊断"],
summary="检测单个 Redis 数据源",
)(diagnostics.redis_ping)
baseApi.get(
'/api/demo/timestamp',
tags=["Demo"],
summary="获取服务端时间戳",
)(demo.get_timestamp)
# Demo 页面路由
@baseApi.get('/demo', tags=["Demo"], summary="Demo 页面")
@baseApi.get('/', tags=["Demo"], summary="首页")
async def demo_page():
return FileResponse('static/demo.html')

33
src/scheduler/__init__.py Normal file
View File

@ -0,0 +1,33 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta
scheduler = AsyncIOScheduler()
from .everyOneMinute import everyOneMinute
# 每分钟
"""
--------------cron------------
year (int|str) 4-digit year
month (int|str) month (1-12)
day (int|str) day of month (1-31)
week (int|str) ISO week (1-53)
day_of_week (int|str) number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) hour (0-23)
minute (int|str) minute (0-59)
second (int|str) second (0-59)
start_date - 指定开始时间 (datetime|str) starting point for the interval calculation
"""
# 注册定时任务
scheduler.add_job(everyOneMinute , "cron", hour='*/12')

View File

@ -0,0 +1,18 @@
'''
每分钟执行一次
'''
from datetime import datetime, timedelta
def everyOneMinute():
print(f"The current time is {datetime.now()}")
print("较久")

32
src/utils/config.py Normal file
View File

@ -0,0 +1,32 @@
# 配置文件数据操作
#coding=utf-8
import os
import toml
_config_cache = None
def _load_config():
config_path = "config.toml"
if os.path.exists(config_path):
return toml.load(config_path)
return toml.load("default-config.toml")
def read_config(key=None):
global _config_cache
try:
if _config_cache is None:
_config_cache = _load_config()
if key is not None:
return _config_cache.get(key)
return _config_cache
except IOError as e:
print("配置读取失败,请检查是否有配置文件或者配置是否正确", e)
return None

41
src/utils/respone.py Normal file
View File

@ -0,0 +1,41 @@
#coding=utf-8
from fastapi import Request,FastAPI,status
from fastapi.encoders import jsonable_encoder
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import JSONResponse
from typing import Any,Dict,Optional
"""
# 成功响应规范
# data : Any 响应主题数据
# msg : str|None 请求消息
# code : int|1 1为成功-1或0为失败
"""
def success(data: Any=None, msg: Optional[str]="请求成功", code=1, ext: Any=None, status_code=status.HTTP_200_OK):
return JSONResponse(
status_code=status_code,
content=jsonable_encoder({
'data': data,
"msg": msg,
"code": code,
"ext": ext
})
)
"""
# 成功响应规范
# data : Any 响应主题数据
# msg : str|None 请求消息
# code : int|1 1为成功-1或0为失败
"""
def fail(msg: Optional[str]="请求失败", code=-1, ext: Any=None, status_code=status.HTTP_200_OK ):
return JSONResponse(
status_code=status_code,
content=jsonable_encoder({
'data': None,
"msg": msg,
"code": code,
"ext": ext
})
)

61
src/utils/token.py Normal file
View File

@ -0,0 +1,61 @@
#coding=utf-8
from ast import If
from typing import Any
import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from datetime import datetime, timedelta,timezone
from src.utils.config import read_config
# 密钥用于签名和验证JWT
SECRET_KEY = 'fastapi_longyu12'
# 令牌过期时间 默认十天
EXP = datetime.now(timezone.utc)+timedelta(days=1)
project = read_config("project")
if project.get("tokenTIme"):
EXP = datetime.now(timezone.utc)+ timedelta(days=project.get("tokenTIme"))
"""
生成 token 令牌
@payload: Any 存在token里面的信息
"""
def createToken(payload: Any):
payload["exp"] = EXP
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256' )
return token
def vertify_is_login(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"], verify=False)
print(int(payload.get("exp")) , int((datetime.now(timezone.utc)).timestamp()))
if int(payload.get("exp")) < int((datetime.now(timezone.utc)).timestamp()):
print("登录已过期")
return False
else:
return True
except jwt.exceptions.ExpiredSignatureError:
print("登录已过期")
return False
except (jwt.exceptions.InvalidSignatureError, jwt.exceptions.DecodeError):
print("无效凭证")
return False
# def getTokenInfo():
# 载荷即JWT中包含的信息
# payload = {
# 'user_id': 123,
# 'username': 'john_doe',
# 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1)
# }
# # 生成JWT
# token = jwt.encode(payload, secret_key, algorithm='HS256')
# print('Generated JWT:', token)

52
static/css/demo.css Normal file
View File

@ -0,0 +1,52 @@
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
background: white;
min-height: 100vh;
display: flex;
justify-content: center;
align-items: flex-start;
padding-top: 50px;
}
.container {
width: 100%;
max-width: 800px;
padding: 20px;
}
h1 {
text-align: center;
color: #333;
margin-bottom: 40px;
font-size: 28px;
}
.timestamp {
background: #f5f5f5;
padding: 20px;
border-radius: 8px;
text-align: center;
}
.timestamp > div:first-child {
color: #666;
margin-bottom: 10px;
}
.timestamp-value {
color: #333;
font-weight: bold;
font-size: 18px;
}
.loading {
color: #999;
font-style: italic;
}

21
static/demo.html Normal file
View File

@ -0,0 +1,21 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>量化因子评价系统 - Demo</title>
<link rel="stylesheet" href="/static/css/demo.css">
</head>
<body>
<div class="container">
<h1>量化因子评价系统</h1>
<div class="timestamp">
<div>服务端时间戳:</div>
<div class="timestamp-value" id="timestamp">
<span class="loading">加载中...</span>
</div>
</div>
</div>
<script src="/static/js/demo.js"></script>
</body>
</html>

18
static/js/demo.js Normal file
View File

@ -0,0 +1,18 @@
async function fetchTimestamp() {
try {
const response = await fetch('/api/demo/timestamp');
const data = await response.json();
if (data.code === 1) {
document.getElementById('timestamp').textContent = data.data.timestamp;
} else {
document.getElementById('timestamp').textContent = '获取失败: ' + data.msg;
}
} catch (error) {
document.getElementById('timestamp').textContent = '请求失败: ' + error.message;
}
}
// 页面加载时获取时间戳
fetchTimestamp();

63
笔记.md Normal file
View File

@ -0,0 +1,63 @@
not
in- 检查字段的值是否位于传递列表中
not_in
gte- 大于或等于传递的值
gt- 大于传递的值
lte- 低于或等于传递的值
lt- 低于传递值
range- 介于和给定两个值之间
isnull- 字段为空
not_isnull- 字段不为空
contains- 字段包含指定的子字符串
icontains- 不区分大小写contains
startswith- 如果字段以值开头
istartswith- 不区分大小写startswith
endswith- 如果字段以值结尾
iendswith- 不区分大小写endswith
iequals- 区分大小写等于
source_field :自定义数据库对应字段名称
generated :是否映射到数据库,
pk 是否为主键
null
default 可以为值或可调用对象
unique
index
description 描述功能,数据库注释用
# 换源
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
pip config set install.trusted-host mirrors.aliyun.com
# 安装aioredis报错duplicate base class TimeoutError
解决方法https://blog.csdn.net/ViniJack/article/details/131809573
# 创建虚拟环境
```
python -m venv venv
```
# 进入虚拟环境
```
venv/Scripts/activate
```
# 生成类似 packpage.json的文件
```python
pip freeze > requirements.txt
```
## 安装requirements.txt依赖
```python
pip install -r requirements.txt
```
++++++++++++++++Linux++++++++++++++++++++++
#创建虚拟环境
python3 -m venv venv
. venv/bin/activate