DolphinDB 的响应式状态引擎(Reactive State Engine),接收一个在历史数据上经过验证的 DolphinDB 因子代码,应用于实时行情数据,输出实时因子值,实现了高频因子流批一体的解决方案。具体教程请参考DolphinDB 响应式状态引擎介绍

如上“快照数据的因子计算”节中基于快照历史行情数据计算的时间加权订单斜率、成交价加权净委买比例、十档委买增额和十档买卖委托均价线性回归斜率因子表达式,将这些因子直接代入响应式状态引擎,以实现对流数据的实时因子计算。

在基于历史数据批量计算加权平均订单失衡率因子的表达式中,在进行因子标准化处理时,使用了迭代的算法(当前标准差很小时,使用上一标准化的因子值),这类因子在流式计算时需要改为流式引擎计算支持的表达式。而其它的高频因子计算中没有涉及迭代算法,可以直接代入流式表达式进行实时因子计算。

十档平均委卖订单斜率因子流式计算实现如下:

@state

def wavgSOIRStream(bidQty,askQty,lag=20){

Imbalance_=rowWavg((bidQty-askQty)\(bidQty+askQty),

10 9 8 7 6 5 4 3 2 1)

Imbalance= ffill(Imbalance_).nullFill(0)

mean = mavg(prev(Imbalance), (lag-1), 2)

std = mstdp(prev(Imbalance) *1000000, (lag-1),2) \ 1000000

factorValue = conditionalIterate(std >= 0.0000001,

(Imbalance - mean) \ std, cumlastNot)

return ffill(factorValue).nullFill(0)

}把批计算中的 iif(std >= 0.0000001,(Imbalance - mean) \ std, NULL) 改为 conditionalIterate(std >= 0.0000001,(Imbalance - mean) \ std, cumlastNot)。

conditionalIterate 函数只适用于响应式状态引擎,通过条件迭代实现因子中的递归逻辑。假设该函数计算结果对应输出表的列为 factor,且迭代仅基于前一个值,对于第 k 条记录(k = 0, 1, 2 …),其计算逻辑为:

cond[k] == true:factor[k] = trueValuecond[k] == false:factor[k] = falseIterFunc(factor)[k-1]基于Level-2 行情快照数据的高频因子流式计算脚本如下:

metrics = array(ANY, 5)

metrics[0]=

metrics[1] =

metrics[2] =

metrics[3] =

metrics[4] =

share streamTable(1:0, `SecurityID`DateTime`BidPrice`OfferPrice`BidOrderQty`OfferOrderQty`TotalValueTrade`TotalVolumeTrade, [STRING,TIMESTAMP,DOUBLE[],DOUBLE[],DOUBLE[],DOUBLE[],INT,INT]) as Streamdata

result = table(1000:0, `SecurityID`DateTime`TimeWeightedOrderSlope`Level10_InferPriceTrend`TraPriceWeightedNetBuyQuoteVolumeRatio`height_Imbalance, [STRING,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE])

rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=Streamdata, outputTable=result, keyColumn="SecurityID")

subscribeTable(tableName=`Streamdata, actionName="factors", handler=tableInsert{rse})

snapshotTB=select* from loadTable("dfs://TSDB_snapshot","snapshot") where date(TradeTime)=2022.04.14 and SecurityID in [`600000,`000001]

data1=select SecurityID,TradeTime as DateTime,BidPrice,OfferPrice,BidOrderQty,OfferOrderQty,TotalValueTrade,TotalVolumeTrade from snapshotTB

Streamdata.append!(data1)

流式计算输出结果展示:


华硕R454LJ笔记本电脑的性能与特点(性能卓越,轻薄便携的理想选择)
不甘心的意思