最近比较忙,没时间看股票,为了能在预期价格买入,我专门写了个监听程序,当股票价格达到预期价格时,将会推送消息到微信,我使用的是自建企业微信,所以不公开此部分代码,改成你自己的就行了。
我是个佛系股民,所以适合这种方式,如果你需要经常操作,不建议使用。
python源码如下,使用3.7版本,股票数据使用的XX财经,不建议使用baostock,数据不及时,建议5分钟执行一次。
复制
import aiohttp,asyncio,json,time,re,os,datetime class StockListen: def __init__(self): #定义需要监听的股票代码列表 self.stock_list = ['1.600050','1.601988','1.601288','1.601939'] #定义预期价格列表 self.expect_price = [6.6,3.0,2.7,5] #定义数据接口api地址 self.api_url = 'https://push2his.eastmoney.com/api/qt/stock/kline/get' #定义推送地址 self.wechat_url='你的推送地址' #初始化异步请求对象 self.session = aiohttp.ClientSession() #定义本地记录文件路径stock.conf self.stock_conf = 'stock.conf' async def get_data(self,stock_code): #定义请求头 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36', 'referer':'https://quote.eastmoney.com' } #获取当前时间戳 now_time = int(time.time()*1000) #定义请求参数 params = { 'cb': 'jQuery351010027202936442903_1692684817016', 'secid': stock_code, 'ut': 'fa5fd1943c7b386f172d6893dbfba10b', 'fields1': 'f1,f2,f3,f4,f5,f6', 'fields2': 'f51,f52,f53,f54,f55', 'klt': '101', 'fqt': '1', 'end': '20500101', 'lmt': '1', '_': now_time } #发起异步请求 async with self.session.get(self.api_url, params=params, headers=headers) as resp: #获取响应内容 data = await resp.text() #正则匹配提取jQuery351010027202936442903_1692684817016()中的内容 data = re.findall('jQuery351010027202936442903_1692684817016\((.*)\);', data)[0] #将数据转换为json格式 data_json = json.loads(data) if data_json['data'] is not None: return data_json['data'] else: return None #企业微信异步推送,这里可以换成你自己的 async def send_wechat(self,title,content): #准备请求数据 curtime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) data = "你的数据" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36', 'Content-Type': 'application/json' } #发送请求 async with self.session.post(url=self.wechat_url,data=json.dumps(data),headers=headers) as response: #获取响应数据 response = await response.text() #返回响应数据 return response #遍历股票代码列表 async def get_stock_code(self): #遍历股票代码列表self.stock_list并通过索引下标获取对应价格self.expect_price for i in range(len(self.stock_list)): time.sleep(5) #获取股票代码 stock_code = self.stock_list[i] #获取股票价格 expect_price = self.expect_price[i] #通过get_data方法获得股票数据 data = await self.get_data(stock_code) if data is None: return {"code":100,"msg":"数据获取失败"} #判断股票价格是否低于预期 print(data['klines'][0]) #,分割数据 klines = data['klines'][0].split(',') if float(klines[2]) < float(expect_price): #判断今日是否已经推送过 lastrecord = await self.get_stock_push_time(stock_code) if lastrecord is None: #通过send_wechat方法发送消息 response = await self.send_wechat(data['name']+stock_code,klines[2]) await self.record_stock_push_time(stock_code) print(response) else: current_time = datetime.datetime.now().strftime("%Y-%m-%d") if lastrecord != current_time: #通过send_wechat方法发送消息 response = await self.send_wechat(data['name']+stock_code,klines[2]) await self.record_stock_push_time(stock_code) print(response) else: print("今日已推送") else: #如果高于预期则不发送消息 print(stock_code+'价格未达到预期') async def get_stock_push_time(self,stock_code): # 检查文件是否存在 if os.path.exists("stock.conf"): with open("stock.conf", "r") as file: lines = file.readlines() for line in lines: if line.startswith(stock_code + ","): return line.split(",")[1].strip() return None #记录推送时间到本地文件stock.conf async def record_stock_push_time(self,stock_code): current_time = datetime.datetime.now().strftime("%Y-%m-%d") record = f"{stock_code},{current_time}" # 检查文件是否存在 if os.path.exists("stock.conf"): with open("stock.conf", "r") as file: lines = file.readlines() else: lines = [] # 更新股票推送时间 found_stock = False for i in range(len(lines)): if lines[i].startswith(stock_code + ","): lines[i] = record + "\n" found_stock = True break # 如果股票代码不存在,则添加新记录 if not found_stock: lines.append(record + "\n") # 将记录写入文件 with open("stock.conf", "w") as file: file.writelines(lines) #单例测试 if __name__ == '__main__': async def ansyc_test(): #获取股票代码 stock_code = '1.600050' #实例化对象 eastmoney_stock_data =StockListen() #发起异步请求 data=await eastmoney_stock_data.get_stock_code() await eastmoney_stock_data.session.close() return data result=asyncio.get_event_loop().run_until_complete(ansyc_test()) print(result)
将上面的代码放在服务器上,再配合宝塔面板等定时任务,即可实现股票购买价位推送。
评论 (0)