PTSQ 量化的基本步骤为:
1、给模型插入 QDQ 节点
- 1、自动插入
  使用 quant_modules.initialize() 自动插入量化节点
- 2、手动插入
  使用 quant_modules.initialize() 初始化量化操作,或使用 QuantDescriptor() 自定义初始化量化操作,
  然后编写代码为模型插入量化节点
2、数据标定校准
- 1、收集统计信息
  给模型喂数据,收集每个层的输入输出信息
- 2、计算量化参数
  根据统计信息,计算动态范围 range 和 scale,保存在 QDQ 节点中
3、敏感层分析
- 1、只开启(或关闭)某一层的量化,然后进行精度对比;如果精度影响过大,则关闭这一层的量化,前向计算时使用 fp16。
- 2、对各层的评估结果按精度排序,列出对精度影响最大的若干层(下面的代码会打印前 10 个)。
4、导出模型
- 1、将 quant_nn.TensorQuantizer.use_fb_fake_quant 属性设置为 True
- 2、使用 torch.onnx.export() 导出 ONNX 模型
YOLOv7 PTQ 量化代码:
import os
import yaml
import test
import torch
import collections
from pathlib import Path
from models.yolo import Model
from pytorch_quantization import calib
from absl import logging as quant_logging
from utils.datasets import create_dataloader
from pytorch_quantization import quant_modules
from pytorch_quantization import nn as quant_nn
from pytorch_quantization.tensor_quant import QuantDescriptor
from pytorch_quantization.nn.modules import _utils as quant_nn_utils


def load_yolov7_model(weight, device='cpu', cfg="cfg/training/yolov7.yaml", nc=20):
    """Build a YOLOv7 ``Model`` and load its weights from a checkpoint.

    Args:
        weight: Path to a checkpoint whose ``'model'`` entry holds the module.
        device: Device used both as ``map_location`` and as the model's device.
        cfg: Model definition file handed to ``Model``.
        nc: Number of classes handed to ``Model``.

    Returns:
        The constructed model with the checkpoint's state dict loaded.
    """
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    ckpt = torch.load(weight, map_location=device)
    model = Model(cfg, ch=3, nc=nc).to(device)
    state_dict = ckpt['model'].float().state_dict()
    # strict=False: keys that do not match the model are skipped, not raised.
    model.load_state_dict(state_dict, strict=False)
    return model


def prepare_val_dataset(cocodir, batch_size=32):
    """Create the validation dataloader from ``{cocodir}/val2017.txt``.

    Args:
        cocodir: Dataset root directory containing ``val2017.txt``.
        batch_size: Batch size handed to ``create_dataloader``.

    Returns:
        The first element of the ``create_dataloader`` result (the dataloader).
    """
    # create_dataloader is given a minimal options object with ``single_cls``.
    opt = collections.namedtuple("Opt", "single_cls")(False)
    dataloader = create_dataloader(
        f"{cocodir}/val2017.txt",
        imgsz=640,
        batch_size=batch_size,
        opt=opt,
        augment=False,
        hyp=None,
        rect=True,
        cache=False,
        stride=32,
        pad=0.5,
        image_weights=False,
    )[0]
    return dataloader


def prepare_train_dataset(cocodir, batch_size=4):
    """Create the training dataloader from ``{cocodir}/train2017.txt``.

    Hyper-parameters are read from ``data/hyp.scratch.p5.yaml`` and
    augmentation is switched on.

    Args:
        cocodir: Dataset root directory containing ``train2017.txt``.
        batch_size: Batch size handed to ``create_dataloader``.

    Returns:
        The first element of the ``create_dataloader`` result (the dataloader).
    """
    with open("data/hyp.scratch.p5.yaml") as f:
        hyp = yaml.load(f, Loader=yaml.SafeLoader)

    opt = collections.namedtuple("Opt", "single_cls")(False)
    dataloader = create_dataloader(
        f"{cocodir}/train2017.txt",
        imgsz=640,
        batch_size=batch_size,
        opt=opt,
        augment=True,
        hyp=hyp,
        rect=True,
        cache=False,
        stride=32,
        pad=0,
        image_weights=False,
    )[0]
    return dataloader


# Input calibrator: Max ==> Histogram
def initialize():
    """Make histogram calibration the default for input quantizers.

    Applies one ``QuantDescriptor(calib_method='histogram')`` as the default
    input descriptor of ``QuantConv2d``, ``QuantMaxPool2d`` and
    ``QuantLinear``, and lowers the quantization logger verbosity to ERROR.
    """
    quant_desc_input = QuantDescriptor(calib_method='histogram')
    for quant_cls in (quant_nn.QuantConv2d, quant_nn.QuantMaxPool2d, quant_nn.QuantLinear):
        quant_cls.set_default_quant_desc_input(quant_desc_input)
    quant_logging.set_verbosity(quant_logging.ERROR)


def prepare_model(weight, device):
    """Load the model in eval/float mode with fused layers, ready for QDQ insertion.

    Args:
        weight: Checkpoint path forwarded to ``load_yolov7_model``.
        device: Device forwarded to ``load_yolov7_model``.

    Returns:
        The loaded, fused model.
    """
    # The automatic quant_modules.initialize() patching is intentionally not
    # used here; only the default descriptors are configured.
    initialize()
    model = load_yolov7_model(weight, device)
    model.float()
    model.eval()
    with torch.no_grad():
        model.fuse()  # merge conv + bn layers for speed
    return model


# Core helper: converts a standard PyTorch module instance into its quantized version.
def tranfer_torch_to_quantization(nn_instance, quant_module):
    """Create a ``quant_module`` instance that reuses ``nn_instance``'s state.

    The quantized object is allocated without running its constructor, every
    instance attribute of ``nn_instance`` is copied onto it, and then its
    quantizers are initialised from the class default descriptors.

    Args:
        nn_instance: The existing module whose attributes are copied.
        quant_module: The quantized class to instantiate.

    Returns:
        The new ``quant_module`` instance.
    """
    quant_instances = quant_module.__new__(quant_module)

    # Copy attributes (parameters, buffers, sub-modules, config) by reference.
    for k, val in vars(nn_instance).items():
        setattr(quant_instances, k, val)

    if isinstance(quant_instances, quant_nn_utils.QuantInputMixin):
        # Input-only modules have no weight descriptor, so request only the
        # input descriptor; asking for both would fail on the missing
        # ``default_quant_desc_weight`` attribute.
        quant_desc_input = quant_nn_utils.pop_quant_desc_in_kwargs(
            quant_module, input_only=True
        )
        quant_instances.init_quantizer(quant_desc_input)
        # Set the calibrator's _torch_hist flag (speeds up calibration).
        if isinstance(quant_instances._input_quantizer._calibrator, calib.HistogramCalibrator):
            quant_instances._input_quantizer._calibrator._torch_hist = True
    else:
        quant_desc_input, quant_desc_weight = quant_nn_utils.pop_quant_desc_in_kwargs(
            quant_module
        )
        quant_instances.init_quantizer(quant_desc_input, quant_desc_weight)
        if isinstance(quant_instances._input_quantizer._calibrator, calib.HistogramCalibrator):
            quant_instances._input_quantizer._calibrator._torch_hist = True
            quant_instances._weight_quantizer._calibrator._torch_hist = True

    return quant_instances


# Recursively walks all sub-modules of the model, looking for modules that
# can be replaced by a quantized version.
def torch_module_find_quant_module(model, module_list, prefix=''):
    """Recursively replace sub-modules that have a quantized counterpart.

    Args:
        model: Module whose direct and nested children are inspected in place.
        module_list: Mapping from ``id(module_class)`` to the quantized class
            that should replace instances of that class.
        prefix: Dotted path of ``model`` inside the root module; only passed
            along the recursion.
    """
    # Snapshot the children so assigning replacements cannot disturb iteration.
    for name, submodule in list(model._modules.items()):
        if submodule is None:
            # A child slot may be registered as None; nothing to inspect.
            continue

        path = name if prefix == '' else prefix + '.' + name
        # Children are handled first, then the sub-module itself.
        torch_module_find_quant_module(submodule, module_list, prefix=path)

        submodule_id = id(type(submodule))
        if submodule_id in module_list:
            model._modules[name] = tranfer_torch_to_quantization(
                submodule, module_list[submodule_id]
            )


# Entry point for manual QDQ insertion: builds the replacement mapping for the
# model and calls the recursive function to find and replace its modules.
def replace_to_quantization_model(model):
    """Replace every supported module in ``model`` with its quantized version.

    The mapping is built from ``quant_modules._DEFAULT_QUANT_MAP`` and keyed by
    ``id`` of the original class (e.g. ``torch.nn.modules.conv.Conv1d``).

    Args:
        model: Module modified in place.
    """
    module_list = {
        id(getattr(entry.orig_mod, entry.mod_name)): entry.replace_mod
        for entry in quant_modules._DEFAULT_QUANT_MAP
    }
    torch_module_find_quant_module(model, module_list)


def evaluate_coco(model, loader, save_dir='', conf_thres=0.001, iou_thres=0.65):
    """Evaluate ``model`` through ``test.test`` and return one metric.

    Args:
        model: Model to evaluate.
        loader: Dataloader handed to ``test.test``.
        save_dir: Output location; its parent directory is created if needed.
        conf_thres: Confidence threshold handed to ``test.test``.
        iou_thres: IoU threshold handed to ``test.test``.

    Returns:
        Element ``[0][3]`` of the ``test.test`` result.
        NOTE(review): presumably mAP@0.5:0.95 — confirm against ``test.test``.
    """
    parent_dir = os.path.dirname(save_dir) if save_dir else ""
    if parent_dir != "":
        os.makedirs(parent_dir, exist_ok=True)

    results = test.test(
        "data/voc.yaml",
        save_dir=Path(save_dir),
        conf_thres=conf_thres,
        iou_thres=iou_thres,
        model=model,
        dataloader=loader,
        is_coco=True,
        plots=False,
        half_precision=True,
        save_json=False,
    )
    return results[0][3]


def collect_stats(model, data_loader, device, num_batch=10):
    """Run calibration batches through the model to gather statistics.

    Quantizers that own a calibrator are switched to calibration mode for the
    forward passes and back to quantization mode afterwards; quantizers
    without a calibrator are disabled during collection and re-enabled after.

    Args:
        model: Model containing ``TensorQuantizer`` modules.
        data_loader: Iterable of batches; element 0 of each batch is the image
            tensor, which is scaled by ``1/255`` before the forward pass.
        device: Device the images are moved to.
        num_batch: Maximum number of batches to feed.
    """
    model.eval()

    # Switch calibrators on.
    for name, module in model.named_modules():
        if isinstance(module, quant_nn.TensorQuantizer):
            if module._calibrator is not None:
                module.disable_quant()
                module.enable_calib()
            else:
                module.disable()

    with torch.no_grad():
        for i, datas in enumerate(data_loader):
            # Check before the forward pass so exactly ``num_batch`` batches
            # run (checking afterwards fed one extra batch).
            if i >= num_batch:
                break
            imgs = datas[0].to(device, non_blocking=True).float() / 255.0
            model(imgs)

    # Switch calibrators off.
    for name, module in model.named_modules():
        if isinstance(module, quant_nn.TensorQuantizer):
            if module._calibrator is not None:
                module.enable_quant()
                module.disable_calib()
            else:
                module.enable()
def compute_amax(model, *, device=None, **kwargs):
    """Load the calibrated ``amax`` into every quantizer that has a calibrator.

    Args:
        model: Model containing ``TensorQuantizer`` modules.
        device: Device the loaded ``_amax`` tensors are moved to. When omitted,
            the device of the model's first parameter is used; if the model
            has no parameters the tensors are left where they are.
        **kwargs: Forwarded to ``load_calib_amax`` for calibrators other than
            ``MaxCalibrator`` (e.g. ``method='mse'``).
    """
    if device is None:
        # Previously this read an undefined name and only worked through a
        # script-level global; derive the target from the model instead.
        first_param = next(model.parameters(), None)
        if first_param is not None:
            device = first_param.device

    for name, module in model.named_modules():
        if not isinstance(module, quant_nn.TensorQuantizer):
            continue
        if module._calibrator is None:
            continue

        if isinstance(module._calibrator, calib.MaxCalibrator):
            module.load_calib_amax()
        else:
            module.load_calib_amax(**kwargs)

        if device is not None:
            module._amax = module._amax.to(device)
def calibrate_model(model, dataloader, device):
    """Calibrate the quantizers of ``model`` using ``dataloader``.

    Args:
        model: Model with quantizer modules already inserted.
        dataloader: Source of calibration batches.
        device: Device the calibration images are moved to.
    """
    # Collect forward-pass statistics.
    collect_stats(model, dataloader, device)
    # Derive the dynamic range (amax, and from it the scale) with method 'mse'.
    compute_amax(model, method='mse')


# Check whether a layer is a quantized layer.
def have_quantizer(layer):
    """Return True if ``layer`` or any of its sub-modules is a ``TensorQuantizer``."""
    return any(
        isinstance(module, quant_nn.TensorQuantizer)
        for name, module in layer.named_modules()
    )


class disable_quantization:
    """Turn quantization off for every ``TensorQuantizer`` inside a model.

    Usable directly through ``apply()`` or as a context manager that disables
    the quantizers on entry and re-enables them on exit.
    """

    def __init__(self, model):
        self.model = model

    def apply(self, disabled=True):
        """Set ``_disabled`` on every quantizer found in the model."""
        for name, module in self.model.named_modules():
            if isinstance(module, quant_nn.TensorQuantizer):
                module._disabled = disabled

    def __enter__(self):
        self.apply(disabled=True)
        # Return the manager so ``with ... as ctx`` binds a usable object.
        return self

    def __exit__(self, *args, **kwargs):
        self.apply(disabled=False)


# Re-enable quantization.
class enable_quantization:
    """Turn quantization on for every ``TensorQuantizer`` inside a model.

    Usable directly through ``apply()`` or as a context manager that enables
    the quantizers on entry and disables them again on exit.
    """

    def __init__(self, model):
        self.model = model

    def apply(self, enabled=True):
        """Set ``_disabled`` to the inverse of ``enabled`` on every quantizer."""
        for name, module in self.model.named_modules():
            if isinstance(module, quant_nn.TensorQuantizer):
                module._disabled = not enabled

    def __enter__(self):
        self.apply(enabled=True)
        return self

    def __exit__(self, *args, **kwargs):
        self.apply(enabled=False)


import json
class SummaryTools:
    """Accumulate result records and mirror them to a JSON file."""

    def __init__(self, file):
        self.file = file
        self.data = []

    def append(self, item):
        """Add ``item`` and rewrite the whole JSON file with the current data."""
        self.data.append(item)
        # Context manager so the handle is closed after every write.
        with open(self.file, "w") as fh:
            json.dump(self.data, fh, indent=4)


def sensitive_analysis(model, loader):
    """Measure the accuracy obtained when each layer's quantization is off.

    For every entry of ``model.model`` that contains a quantizer, the layer's
    quantizers are disabled, the model is evaluated, and the result is stored
    in ``senstive_analysis.json``. The ten highest-scoring entries are printed
    at the end.

    Args:
        model: Quantized model exposing its layers as ``model.model``.
        loader: Dataloader used for every evaluation.
    """
    save_file = "senstive_analysis.json"
    summary = SummaryTools(save_file)

    print("Sensitive analysis by each layer...")
    for i, layer in enumerate(model.model):
        if not have_quantizer(layer):
            print(f"ignore model.{i} because it is {type(layer)}")
            continue

        # Disable this layer's quantizers only while evaluating; the context
        # manager re-enables them even if the evaluation raises.
        with disable_quantization(layer):
            ap = evaluate_coco(model, loader)

        summary.append([ap, f"model.{i}"])
        print(f"layer {i} ap: {ap}")

    # Highest score first: disabling these layers changed accuracy the most.
    ranked = sorted(summary.data, key=lambda x: x[0], reverse=True)
    print("Sensitive Summary")
    for n, (ap, name) in enumerate(ranked[:10]):
        print(f"Top{n}: Using fp16 {name}, ap = {ap:.5f}")


def export_model(model, save_file, input_dummy, device, dynamic_batch=True):
    """Export ``model`` to ONNX (opset 13).

    Args:
        model: Model to export; switched to eval mode first.
        save_file: Destination ONNX path.
        input_dummy: Example input used for tracing.
        device: Unused; kept for interface compatibility.
        dynamic_batch: When True, axis 0 of input and output is marked dynamic.
    """
    dynamic_axes = None
    if dynamic_batch:
        dynamic_axes = {'input': {0: 'batch'}, 'output': {0: 'batch'}}

    model.eval()
    with torch.no_grad():
        torch.onnx.export(
            model,
            input_dummy,
            save_file,
            opset_version=13,
            input_names=['input'],
            output_names=['output'],
            dynamic_axes=dynamic_axes,
        )


def export_ptq(model, save_file, input_dummy, device, dynamic_batch=True):
    """Export a calibrated model to ONNX with fake-quant operators enabled.

    ``TensorQuantizer.use_fb_fake_quant`` is set to True for the export and
    reset to False afterwards, including when the export fails.

    Args:
        model: Quantized model to export.
        save_file: Destination ONNX path.
        input_dummy: Example input used for tracing.
        device: Unused; kept for interface compatibility.
        dynamic_batch: When True, axis 0 of input and output is marked dynamic.
    """
    quant_nn.TensorQuantizer.use_fb_fake_quant = True
    try:
        export_model(model, save_file, input_dummy, device, dynamic_batch=dynamic_batch)
    finally:
        quant_nn.TensorQuantizer.use_fb_fake_quant = False


if __name__ == "__main__":
    weight = "yolov7_last.pt"
    # Kept as a module-level name; other code in this file may read it.
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print(device)

    # Load the data.
    print("Evalute Dataset...")
    cocodir = "/home/xx/dataSet/VOCdevkit/COCO2007/"
    val_dataloader = prepare_val_dataset(cocodir)
    train_dataloader = prepare_train_dataset(cocodir)

    quant = True
    if not quant:
        # Load the original (non-quantized) model.
        model = load_yolov7_model(weight, device = 'cpu')
        model.float()
        model.eval()
        with torch.no_grad():
            model.fuse()  # merge conv + bn layers for speed

        # Validate the original model.
        print("Evalute Origin...")
        ap = evaluate_coco(model, val_dataloader)

        inputs = torch.randn(1, 3, 640, 640)
        export_model(model, 'yolov7_fp32.onnx', inputs, device, dynamic_batch=True)
    else:
        # Initialise quantization defaults and load the fused model.
        model = prepare_model(weight, device)

        # Build the fake-quantized model (manual initialize, manual QDQ insertion).
        replace_to_quantization_model(model)

        # Calibrate.
        print("calibrate_model")
        calibrate_model(model, train_dataloader, device)

        # Validate the PTQ model.
        print("Evaluate PTQ...")
        ptq_ap = evaluate_coco(model, val_dataloader)

        # Sensitive-layer analysis.
        sensitive_analysis(model, val_dataloader)

        # Export. The dummy input follows the selected device instead of a
        # hard-coded 'cuda', so the CPU fallback also works.
        inputs = torch.randn(1, 3, 640, 640, device=device)
        export_ptq(model, 'yolov7_ptq_int8.onnx', inputs, device, dynamic_batch=True)