一、印度金融市场数据特点
印度作为全球增长最快的主要经济体之一,其金融市场具有以下显著特征:
- 双交易所体系:国家证券交易所(NSE)和孟买证券交易所(BSE)
- 高流动性品种:Nifty 50指数成分股、银行股等
- 独特交易机制:T+2结算制度,上午9:15至下午3:30交易时间(IST)
- 丰富IPO市场:2023年印度IPO数量位居全球前列
二、环境配置与基础对接
1. API密钥获取与配置
# 配置StockTV API
API_KEY = "your_api_key_here" # 通过官网或客服获取
BASE_URL = "https://api.stocktv.top"# 印度市场特定参数
INDIA_COUNTRY_ID = 14 # 印度国家代码
NSE_EXCHANGE_ID = 46 # NSE交易所代码
BSE_EXCHANGE_ID = 74 # BSE交易所代码
2. 安装必要库
pip install requests websocket-client pandas plotly
三、印度K线数据专业对接
1. 多周期K线获取接口
import pandas as pddef get_india_kline(symbol, exchange, interval="15m"):"""获取印度股票K线数据:param symbol: 股票代码(如RELIANCE):param exchange: 交易所(NSE/BSE):param interval: 时间间隔(1m/5m/15m/1h/1d)"""url = f"{BASE_URL}/stock/kline"params = {"symbol": symbol,"exchange": exchange,"interval": interval,"countryId": INDIA_COUNTRY_ID,"key": API_KEY}response = requests.get(url, params=params)data = response.json()# 转换为Pandas DataFramedf = pd.DataFrame(data['data'])df['time'] = pd.to_datetime(df['time'], unit='ms') # 转换印度时区(IST)df['time'] = df['time'].dt.tz_localize('UTC').dt.tz_convert('Asia/Kolkata')return df# 获取Reliance Industries的15分钟K线(NSE)
reliance_kline = get_india_kline("RELIANCE", "NSE", "15m")
2. 专业级K线可视化
import plotly.graph_objects as godef plot_advanced_kline(df):fig = go.Figure(data=[go.Candlestick(x=df['time'],open=df['open'],high=df['high'],low=df['low'],close=df['close'],increasing_line_color='green',decreasing_line_color='red')])fig.update_layout(title='印度股票K线图',xaxis_title='印度标准时间(IST)',yaxis_title='价格(INR)',xaxis_rangeslider_visible=False,template="plotly_dark")# 添加成交量柱状图fig.add_trace(go.Bar(x=df['time'],y=df['volume'],name='成交量',marker_color='rgba(100, 100, 255, 0.6)',yaxis='y2'))fig.update_layout(yaxis2=dict(title='成交量',overlaying='y',side='right'))fig.show()plot_advanced_kline(reliance_kline)
四、印度市场实时数据对接
1. WebSocket实时行情订阅
import websocket
import json
import threadingclass IndiaMarketData:def __init__(self):self.symbol_map = {} # 存储symbol到股票名称的映射def on_message(self, ws, message):data = json.loads(message)# 处理实时行情更新if data.get('type') == 'stock':symbol = data['symbol']print(f"实时行情 {self.symbol_map.get(symbol, symbol)}: "f"最新价 {data['last']} 成交量 {data['volume']}")# 处理指数更新elif data.get('type') == 'index':print(f"指数更新 {data['name']}: {data['last']} ({data['chgPct']}%)")def subscribe_symbols(self, ws):# 订阅Nifty 50成分股(示例)nifty_stocks = ["RELIANCE", "TCS", "HDFCBANK", "INFY"]for symbol in nifty_stocks:self.symbol_map[symbol] = get_stock_name(symbol)# 订阅请求subscribe_msg = {"action": "subscribe","countryId": INDIA_COUNTRY_ID,"symbols": nifty_stocks,"indices": ["NSEI"] # Nifty 50指数}ws.send(json.dumps(subscribe_msg))def start(self):ws = websocket.WebSocketApp(f"wss://ws-api.stocktv.top/connect?key={API_KEY}",on_message=self.on_message,on_open=lambda ws: self.subscribe_symbols(ws))# 启动WebSocket连接wst = threading.Thread(target=ws.run_forever)wst.start()# 辅助函数:获取股票名称
def get_stock_name(symbol):url = f"{BASE_URL}/stock/queryStocks"params = {"symbol": symbol,"countryId": INDIA_COUNTRY_ID,"key": API_KEY}response = requests.get(url, params=params)return response.json()['data'][0]['name']# 启动实时数据服务
india_data = IndiaMarketData()
india_data.start()
2. 实时数据存储方案
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetimeBase = declarative_base()class RealTimeData(Base):__tablename__ = 'india_realtime_data'id = Column(Integer, primary_key=True)symbol = Column(String(20))exchange = Column(String(10))last_price = Column(Float)volume = Column(Integer)timestamp = Column(DateTime)created_at = Column(DateTime, default=datetime.utcnow)# 初始化数据库连接
engine = create_engine('sqlite:///india_market.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)def save_realtime_data(data):session = Session()try:record = RealTimeData(symbol=data['symbol'],exchange=data.get('exchange', 'NSE'),last_price=data['last'],volume=data['volume'],timestamp=datetime.fromtimestamp(data['timestamp']))session.add(record)session.commit()except Exception as e:print(f"保存数据失败: {e}")session.rollback()finally:session.close()# 在on_message回调中调用
# save_realtime_data(data)
五、印度IPO新股数据深度对接
1. 获取IPO日历与详情
def get_india_ipo_list(status="upcoming"):"""获取印度IPO列表:param status: upcoming(即将上市)/recent(近期上市)"""url = f"{BASE_URL}/stock/getIpo"params = {"countryId": INDIA_COUNTRY_ID,"status": status,"key": API_KEY}response = requests.get(url, params=params)return response.json()# 获取即将上市的IPO
upcoming_ipos = get_india_ipo_list("upcoming")
print("即将上市的IPO:")
for ipo in upcoming_ipos['data'][:5]:print(f"{ipo['company']} ({ipo['symbol']}) - 发行价: ₹{ipo['ipoPrice']}")# 获取近期上市的IPO表现
recent_ipos = get_india_ipo_list("recent")
print("\n近期IPO表现:")
for ipo in recent_ipos['data'][:5]:change = (ipo['last'] - ipo['ipoPrice']) / ipo['ipoPrice'] * 100print(f"{ipo['company']}: 发行价 ₹{ipo['ipoPrice']} → 当前 ₹{ipo['last']} ({change:.2f}%)")
2. IPO数据分析与可视化
import plotly.express as pxdef analyze_ipo_performance():# 获取过去6个月的IPO数据ipos = get_india_ipo_list("recent")['data']df = pd.DataFrame(ipos)# 计算首日/首周涨跌幅df['listing_gain'] = (df['listingPrice'] - df['ipoPrice']) / df['ipoPrice'] * 100df['current_gain'] = (df['last'] - df['ipoPrice']) / df['ipoPrice'] * 100# 绘制散点图fig = px.scatter(df, x='listing_gain', y='current_gain', hover_data=['company', 'symbol'],title="印度IPO表现分析",labels={'listing_gain':'首日涨幅(%)', 'current_gain':'当前涨幅(%)'})# 添加参考线fig.add_hline(y=0, line_dash="dash")fig.add_vline(x=0, line_dash="dash")fig.show()return dfipo_analysis = analyze_ipo_performance()
六、生产环境最佳实践
1. 错误处理与重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10),before_sleep=lambda retry_state: logger.warning(f"重试 {retry_state.attempt_number} 次,原因: {retry_state.outcome.exception()}"))
def safe_api_call(url, params):try:response = requests.get(url, params=params, timeout=10)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:logger.error(f"API请求失败: {e}")raise
2. 性能优化方案
import redis
from functools import lru_cache# 初始化Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)@lru_cache(maxsize=100)
def get_stock_info(symbol):"""缓存股票基本信息"""cache_key = f"stock:{symbol}:info"cached = r.get(cache_key)if cached:return json.loads(cached)url = f"{BASE_URL}/stock/queryStocks"params = {"symbol": symbol,"countryId": INDIA_COUNTRY_ID,"key": API_KEY}data = safe_api_call(url, params)r.setex(cache_key, 3600, json.dumps(data)) # 缓存1小时return data# 批量获取K线数据优化
def batch_get_kline(symbols, interval):"""批量获取K线数据,减少API调用次数"""results = {}with ThreadPoolExecutor(max_workers=5) as executor:future_to_symbol = {executor.submit(get_india_kline, sym, "NSE", interval): sym for sym in symbols}for future in as_completed(future_to_symbol):symbol = future_to_symbol[future]try:results[symbol] = future.result()except Exception as e:logger.error(f"获取{symbol}数据失败: {e}")return results
七、总结与资源
关键要点回顾
- K线数据:支持多周期获取,专业级可视化方案
- 实时行情:WebSocket低延迟连接,支持NSE/BSE双交易所
- IPO数据:完整的新股上市日历与表现追踪
扩展资源
- 印度证券交易委员会(SEBI)官网
- NSE官方数据文档
- StockTV完整API文档
特别提示:印度市场有特殊的节假日安排和交易规则,建议在实现中考虑:
- 处理IST时区转换(UTC+5:30)
- 关注SEBI监管政策变化
- 对IPO锁定期等特殊规则进行额外处理