python 绘制蜡烛图

发布时间:2023年12月28日

1.用途:绘制期货数据蜡烛图;具体应用于数据分析。
2.功能:
2.1.实现数据平移
    方法1:单击鼠标后移动的别处释放
    方法2:鼠标压下平移
    方法3:左右箭头键平移
2.2.缩放
    方法1:滚轮缩放
    方法2:上下箭头键缩放
2.3.十字光标:显示光标处日期价格

3.缺点:数据太多时耗时多,十字光标只能显示在主图,附图不能显示。有高手请指教。
4.数据:日线数据(当然其他周期也可)
5.具体实现:
    TCursor单光标
    Candle 蜡烛图类
6.应用:
    candle = Candle()
    candle.plot(df)

7.备注:
v0.2:2023/12/27
Candle修复当光标移动到当前数据外报错;删除当光标移动到数据外自动平移;
提取出数据含当前数据作为gloabl字典
TCursor修改全局参数;

v0.3:2023/12/27
增加ratioButton;主要用来自动切换不同周期,如日线,周线,月线,季线,年线;
调整x轴刻度间隔。

原数据:只用到date,open?? ?high?? ?low?? ?close?? ?volume? ? 我的数据有62000行

date	codemon	varmon	open	high	low	close	volume	amount	position	settle	前结算	main	var
1994/9/15	A	A	2220	2224	2220	2220	0		3689	0		main	A
1994/9/16	A	A	2228	2228	2210	2210	0		3574	0		main	A
2004/11/3	A0501	A0501	2644	2647	2630	2636	51756	136508.12	137943			not	A
2004/11/4	A0501	A0501	2627	2630	2615	2623	44481	116643.29	136166			not	A
2004/11/5	A0501	A0501	2602	2622	2601	2621	42961	112022.78	136262			not	A
2004/11/8	A0501	A0501	2629	2640	2617	2629	57325	150813.26	140600			not	A
2004/11/9	A0501	A0501	2632	2640	2628	2634	32965	86836.74	139376			not	A
2004/11/10	A0501	A0501	2644	2652	2637	2643	61681	163124	133836			not	A
2004/11/11	A0501	A0501	2650	2660	2612	2635	71765	189398.07	134233			not	A
2023/7/28	A2405	A2405	4875	4877	4857	4865	272	1323.46	2160			not	A
2023/7/31	A2405	A2405	4857	4871	4830	4859	491	2380.46	2293			not	A
2023/8/1	A2405	A2405	4852	4868	4838	4850	272	1319.17	2407			not	A
2023/8/2	A2405	A2405	4852	4856	4832	4837	397	1923.4	2600			not	A
2023/8/3	A2405	A2405	4826	4849	4814	4840	345	1666.15	2678			not	A
2023/8/4	A2405	A2405	4849	4878	4849	4865	702	3416.92	2579			not	A
2023/8/7	A2405	A2405	4876	4930	4843	4915	924	4522.95	2388			not	A
2023/8/8	A2405	A2405	4907	4917	4888	4898	429	2101.7	2427			not	A
2023/8/9	A2405	A2405	4899	4999	4889	4996	1154	5724.85	2328			not	A

版本v0.3

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.3
@Emial:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
# import talib
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor
from matplotlib.widgets import RadioButtons

plt.rcParams['font.sans-serif'] = ['simHei']  # 以黑体显示中文
plt.rcParams['axes.unicode_minus'] = False  # 解决保存图像符号“-”显示为放块的问题

DATA = dict(data=None, freq='', df=None, n=0, cur_data=None,
            cur_n=0, cur_start=None, cur_end=None, cur_x=-1)


class _TMultiCursor(object):
    """
    一个用于多个子图(横排或者竖排)的十字星光标,可以在多个子图上同时出现
    single=0表示仅仅一个子图显示水平线,所有子图显示垂直线,用于竖排的子图
    single=1表示仅仅一个子图显示垂直线,所有子图显示水平线,用于横排的子图
    注意:为了能让光标响应事件处理,必须保持对它的引用(比如有个变量保存)
    用法:
    import matplotlib.pyplot as plt
    import numpy as np
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2*np.pi*t))
    ax2.plot(t, np.sin(4*np.pi*t))
    cursor = TMultiCursor(fig.canvas, (ax1, ax2), single=0, color='w', lw=0.5)
    plt.show()

    #单子图上面的十字星光标
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2 * np.pi * t))
    ax2.plot(t, np.sin(4 * np.pi * t))

    cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
    plt.show()

    #多子图上面同时出现十字星光标
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2*np.pi*t))
    ax2.plot(t, np.sin(4*np.pi*t))

    # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
    cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
        horizOn=True, vertOn=True, color='r', lw=0.5)
    plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(
            0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError(
                'Unrecognized single value: ' +
                str(single) +
                ', must be 0 or 1')

        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)

        self.background = None
        self.needclear = False

        lineprops['animated'] = True  # for blt
        self.lines = [
            [ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
            [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]
        ]

        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return

        self.needclear = True

        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)

                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)

                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)

        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))

            self.canvas.restore_region(self.background)

        for lines in self.lines:
            for line in lines:
                if line.get_visible():
                    line.axes.draw_artist(line)

        self.canvas.blit()


class TCursor:
    """单子图更快的光标"""

    def __init__(self, ax):
        self.ax = ax
        self.background = None

        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')

        # 轴坐标中的文本位置
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(
                self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()

        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            if DATA['cur_data'] is None:
                return
            idx = int(event.xdata)
            # 光标超出范围平移数据,不在显示文本
            if idx < 0 or idx >= DATA['cur_n']:
                return

            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)

            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])

            _text = '\n'.join((
                r'open: %.1f' % DATA['cur_data']["open"].iloc[idx],
                r'high: %.1f' % DATA['cur_data']["high"].iloc[idx],
                r'low %.1f' % DATA['cur_data']["low"].iloc[idx],
                r'close: %.1f' % DATA['cur_data']["close"].iloc[idx],
                r'date:%s' % DATA['cur_data'].index[idx].strftime('%Y%m%d')))
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)


class Candle:

    @classmethod
    def to_freq(cls, df, freq='W-FRI', cols=None):
        # type:(pd.DataFrame,str,list)->pd.DataFrame
        """
        用途:转换数据频率
        参数:
        freq:str
            *S秒;*T,min;*H小时
            *B工作日;*D日历日
            *W-FRI周频(星期五)
            *M月末;*SM 半月结束(15日和月末)
            *Q季末
            *A(S)-Jun 年度,锚定于6月底
            *A,*Y 年终频率
        cols:list-like =None要返回的列名
        """
        if cols is None:
            cols = ['date', 'name', 'open', 'high', 'low', 'close', 'volume']
            cols = [v for v in cols if v in df.columns]
        df = df[cols]
        rdf = pd.DataFrame()
        if 'name' in cols:
            rdf['name'] = df['name'].resample(freq).first()
        rdf['open'] = df['open'].resample(freq).first()
        rdf['high'] = df['high'].resample(freq).max()
        rdf['low'] = df['low'].resample(freq).min()
        rdf['close'] = df['close'].resample(freq).last()
        if 'date' in cols:
            rdf['start'] = df['date'].resample(freq).first()
            rdf['end'] = df['date'].resample(freq).last()
        if 'volume' in cols:
            rdf['volume'] = df['volume'].resample(
                freq).last()  # sum()这一月的每天成交量的和
        # g1.resample('W', on='date').first()
        return rdf

    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # 中文字体
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLliti',
            '花纹琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB 2313': 'Kaiti_GB2313',
            '仿宋_GB2313': 'FangSong_GB2313',
            '仿宋': 'FangSong'
        }

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(
            up='r', down='g', edge='inherit',
            wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(
            marketcolors=my_color, figcolor=color, gridcolor=color)

    def set_title(self, title='kline '):
        freqs = {'d': 'day', 'w': 'week', 'm': 'mon', 'q': 'quarter', 'y': 'year'}
        title += freqs.get(DATA['freq'], 'other')
        self.fig.suptitle(title)

    def _init_(self):
        def set_fig():
            font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                    'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
            fs, fc = (12, 8), (0.82, 0.83, 0.85)
            self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)
            # self.fig.text(0.50, 0.94, 'kline day', **font)

        def set_axis():
            axkline = self.fig.add_axes([0.06, 0.25, 0.90, 0.70])
            axvol = self.fig.add_axes([0.06, 0.15, 0.90, 0.10], sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}

            axcolor = 'lightgoldenrodyellow'
            self.axradio = self.fig.add_axes(
                [0.01, 0.50, 0.05, 0.20], facecolor='none')
            # 设置边框颜色透明度
            names = ['bottom', 'top', 'left', 'right']
            for name in names:
                self.axradio.spines[name].set(alpha=0.1, color='g')

            self.radio = RadioButtons(
                self.axradio, ('d', 'w', 'm', 'q', 'y'))  # facecolor='none'
            for _text in self.radio.labels:
                _text.set(alpha=0.5, color='g')
            self.radio.on_clicked(self.radiobtn_on_clicked)

        set_fig()
        set_axis()

        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # 拖动-鼠标按下移动到新位置后释放
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # 鼠标按下拖动
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # 放大缩小
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)

        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.cursor = None
        self.btnpressed = False
        self._init_()

    def _plot_(self):
        """ 根据最新的参数,重新绘制整个图表"""

        # def get_ma(cols=[5, 10]):
        #     def op(x): return talib.MA(DATA['cur_data']['close'], timeperiod=x)
        #     d = {('MA%s' % v): op(v) for v in cols}
        #     return pd.DataFrame(d)

        def get_locator():
            if DATA['freq'] == 'y':
                return 1
            elif DATA['freq'] == 'q':
                return 3
            elif DATA['cur_n'] <= 90:
                return 5
            else:
                return int(DATA['cur_n'] / 18)

        def format_date(x, pos):
            if x < 0 or x > len(DATA['cur_data'].index) - 1:
                return ''
            return DATA['cur_data'].index[int(x)]

        self._set_curdata_()

        # data = get_ma(cols=[5, 10])
        # ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        fmt = tk.FuncFormatter(format_date)
        self.axes['axkline'].xaxis.set_major_formatter(fmt)
        val = get_locator()
        self.axes['axkline'].xaxis.set_major_locator(tk.MultipleLocator(val))

        # 绘制图表
        mpf.plot(DATA['cur_data'],
                 ax=self.axes['axkline'],
                 volume=self.axes['axvol'],
                 mav=[5, 10],
                 # addplot=ap,
                 type='candle',
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=60, warn_too_much_data=DATA['n'])
        self.set_title()
        plt.show()


    def plot(self, df):
        """ 根据最新的参数,重新绘制整个图表"""

        def set_idx_startend():
            if DATA['cur_start'] is None or DATA['cur_end'] is None:
                if DATA['n'] <= 200:
                    DATA['cur_start'] = 0
                    DATA['cur_end'] = DATA['n']
                elif 200 < DATA['n']:
                    DATA['cur_start'] = 100
                    DATA['cur_end'] = DATA['cur_start'] + 100
            if DATA['cur_start'] >= DATA['cur_end']:
                DATA['cur_start'] = 0

        self._set_data_(DATA['freq'], df)
        set_idx_startend()
        self._plot_()

    @classmethod
    def _set_data_(cls, freq, df=None):
        if df is not None:
            DATA['data'] = df
        freqs = {'w': 'W-FRI', 'm': 'M', 'q': 'Q', 'y': 'Y'}
        if freq not in freqs:
            DATA['df'] = DATA['data']
        else:
            DATA['df'] = cls.to_freq(DATA['data'], freqs[freq])
        DATA['n'] = len(DATA['df'])

    def radiobtn_on_clicked(self, label):
        DATA['freq'] = label
        self._set_data_(DATA['freq'])
        self._clear_()
        self._plot_()

    @classmethod
    def _set_curdata_(cls, start=None, end=None):
        if start is None:
            start = DATA['cur_start']
        if end is None:
            end = DATA['cur_end']

        end = DATA['n'] if end > DATA['n'] else end
        start = 0 if (start < 0 or start >= end) else start
        if start == end:
            start, end = 0, DATA['n']
        DATA['cur_start'], DATA['cur_end'] = start, end
        DATA['cur_data'] = DATA['df'].iloc[start: end]
        DATA['cur_n'] = len(DATA['cur_data'])

    def enter_axes(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect(
                'motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)

        event.canvas.draw_idle()

    def leave_axes(self, event):
        if event.inaxes == self.axes['axkline']:
            self.cursor = None
            event.canvas.draw_idle()
            self._clear_()
            self._plot_()

    def on_butpress(self, event):
        # 在松开鼠标后才真正移动K线,需记录鼠标按下时的坐标,并标记目前鼠标已按下
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        if event.button != 1:
            return
        if event.xdata is None:
            DATA['cur_x'] = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            DATA['cur_x'] = int(event.xdata)

    # 鼠标拖动:-鼠标按下移动到新位置后释放
    def on_butrelease(self, event):
        # 松开鼠标拖动到新位置与之前鼠标按下时的位置做差求得距离,
        # 在显示90天的基础上修改起止日期,重新获取数据,对子图和画布都做清理后重新显示
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        event.canvas.draw_idle()

        if event.xdata is not None:
            diff = int(event.xdata) - DATA['cur_x']
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
            self._clear_()
            self._plot_()
            self.btnpressed = False
            DATA['cur_x'] = int(event.xdata)
        else:
            DATA['cur_x'] = -1

    def on_motion(self, event):  # 鼠标按下拖动,鼠标不按下平移显示光标
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return

        if not self.btnpressed:  # 显示光标
            if event.xdata is None:
                DATA['cur_x'] = -1
                return

            idx = int(event.xdata)
            # 光标超出范围平移数据,不在显示文本
            if idx < 0 or idx >= DATA['cur_n']:
                return
                # if idx < 0:
                #     start = DATA['cur_start'] - 1
                #     end = DATA['cur_end'] - 1
                # else:  # idx >= DATA['cur_n']:
                #     start = DATA['cur_start'] + 1
                #     end = DATA['cur_end'] + 1
                # self.cursor = None
                # self._set_curdata_(start, end)
                # self._clear_()
                # self._plot_()
            else:  # 显示光标文本
                if DATA['cur_start'] < 0:
                    DATA['cur_start'] = 0
                if DATA['cur_end'] > DATA['n']:
                    DATA['cur_end'] = DATA['n']
                self._set_curdata_()
                if self.cursor is None:
                    self.cursor = TCursor(self.axes['axkline'])
                    self.fig.canvas.mpl_connect(
                        'motion_notify_event', self.cursor.on_mouse_move)
                    self.fig.canvas.mpl_connect(
                        'draw_event', self.cursor.on_draw)
        else:
            # 设定平移的左右界限,如果平移后超出界限,则不再平移
            self.cursor = None
            if event.xdata is not None:
                dx = int(event.xdata - DATA['cur_x'])
                start = DATA['cur_start'] - dx
                end = DATA['cur_end'] - dx
                self._set_curdata_(start, end)
                self._clear_()
                self._plot_()

    # 设置默认显示90天的数据,当放大缩小时,每次在当前基础上变化10 %,
    # 然后用新的起止日期获取数据,清理当前画布后重新显示新数据。
    def on_scroll(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        diff = int(DATA['cur_n'] * 0.1 / 2)
        if event.button == 'down':
            start = DATA['cur_start'] + diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
        elif event.button == 'up':
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] + diff
            self._set_curdata_(start, end)

        self._clear_()
        self._plot_()

    def _clear_(self):
        self.axes['axkline'].clear()
        self.axes['axvol'].clear()
        # for ax in self.axes.values():
        #     ax.cla()
        #     # plt.clf()

    # 键盘按下处理
    def on_key_press(self, event):
        self.cursor = None

        if event.key in ['up', 'down', 'left', 'right']:
            diff = int(DATA['cur_n'] * 0.1 / 2)
            diff = 1 if diff == 0 else diff
            start, end = 0, 0
            if event.key == 'up':  # 缩放
                start = DATA['cur_start'] - diff
                end = DATA['cur_end'] + diff
            elif event.key == 'down':  # 缩放
                start = DATA['cur_start'] + diff
                end = DATA['cur_end'] - diff
            elif event.key == 'left':  # 平移
                start = DATA['cur_start'] - 1
                end = DATA['cur_end'] - 1
            elif event.key == 'right':  # 平移
                start = DATA['cur_start'] + 1
                end = DATA['cur_end'] + 1

            self._set_curdata_(start, end)
            self.axes['axkline'].clear()
            self.axes['axvol'].clear()
            self._plot_()


if __name__ == '__main__':
    # 读取示例数据
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')
    candle = Candle()
    candle.plot(df)

版本v0.2

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.2
@Emial:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
import talib
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor
from matplotlib.widgets import RadioButtons


plt.rcParams['font.sans-serif'] = ['simHei']  # 以黑体显示中文
plt.rcParams['axes.unicode_minus'] = False  # 解决保存图像符号“-”显示为放块的问题


DATA = dict(data=None,freq='',df=None, n=0, cur_data=None,
            cur_n=0,cur_start=None, cur_end=None, cur_x=-1)


class _TMultiCursor(object):
    """
    一个用于多个子图(横排或者竖排)的十字星光标,可以在多个子图上同时出现
    single=0表示仅仅一个子图显示水平线,所有子图显示垂直线,用于竖排的子图
    single=1表示仅仅一个子图显示垂直线,所有子图显示水平线,用于横排的子图
    注意:为了能让光标响应事件处理,必须保持对它的引用(比如有个变量保存)
    用法:
    import matplotlib.pyplot as plt
    import numpy as np
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2*np.pi*t))
    ax2.plot(t, np.sin(4*np.pi*t))
    cursor = TMultiCursor(fig.canvas, (ax1, ax2), single=0, color='w', lw=0.5)
    plt.show()

    #单子图上面的十字星光标
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2 * np.pi * t))
    ax2.plot(t, np.sin(4 * np.pi * t))

    cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
    plt.show()

    #多子图上面同时出现十字星光标
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2*np.pi*t))
    ax2.plot(t, np.sin(4*np.pi*t))

    # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
    cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
        horizOn=True, vertOn=True, color='r', lw=0.5)
    plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(
            0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError(
                'Unrecognized single value: ' +
                str(single) +
                ', must be 0 or 1')

        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)

        self.background = None
        self.needclear = False

        lineprops['animated'] = True  # for blt
        self.lines = [
            [ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
            [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]
        ]

        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return

        self.needclear = True

        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)

                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)

                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)

        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))

            self.canvas.restore_region(self.background)

        for lines in self.lines:
            for line in lines:
                if line.get_visible():
                    line.axes.draw_artist(line)

        self.canvas.blit()


class TCursor:
    """单子图更快的光标"""

    def __init__(self, ax):
        self.ax = ax
        self.background = None

        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')

        # 轴坐标中的文本位置
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(
                self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()

        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            if DATA['cur_data'] is None:
                return
            idx = int(event.xdata)
            # 光标超出范围平移数据,不在显示文本
            if idx < 0 or idx >= DATA['cur_n']:
                return

            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)

            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])

            _text = '\n'.join((
                r'open: %.1f' % DATA['cur_data']["open"].iloc[idx],
                r'high: %.1f' % DATA['cur_data']["high"].iloc[idx],
                r'low %.1f' % DATA['cur_data']["low"].iloc[idx],
                r'close: %.1f' % DATA['cur_data']["close"].iloc[idx],
                r'date:%s' % DATA['cur_data'].index[idx].strftime('%Y%m%d')))
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)


class Candle:

    @classmethod
    def to_freq(cls, df, freq='W-FRI', cols=None):
        # type:(pd.DataFrame,str,list)->pd.DataFrame
        """
        用途:转换数据频率
        参数:
        freq:str
            *S秒;*T,min;*H小时
            *B工作日;*D日历日
            *W-FRI周频(星期五)
            *M月末;*SM 半月结束(15日和月末)
            *Q季末
            *A(S)-Jun 年度,锚定于6月底
            *A,*Y 年终频率
        cols:list-like =None要返回的列名
        """
        if cols is None:
            cols = ['date', 'name', 'open', 'high', 'low', 'close', 'volume']
            cols = [v for v in cols if v in df.columns]
        df = df[cols]
        rdf = pd.DataFrame()
        if 'name' in cols:
            rdf['name'] = df['name'].resample(freq).first()
        rdf['open'] = df['open'].resample(freq).first()
        rdf['high'] = df['high'].resample(freq).max()
        rdf['low'] = df['low'].resample(freq).min()
        rdf['close'] = df['close'].resample(freq).last()
        if 'date' in cols:
            rdf['start'] = df['date'].resample(freq).first()
            rdf['end'] = df['date'].resample(freq).last()
        if 'volume' in cols:
            rdf['volume'] = df['volume'].resample(freq).last()  # sum()这一月的每天成交量的和
        # g1.resample('W', on='date').first()
        return rdf

    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # 中文字体
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLliti',
            '花纹琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB 2313': 'Kaiti_GB2313',
            '仿宋_GB2313': 'FangSong_GB2313',
            '仿宋': 'FangSong'
        }

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(
            up='r', down='g', edge='inherit',
            wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(
            marketcolors=my_color, figcolor=color, gridcolor=color)

    def _init_(self):
        def set_fig():
            font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                    'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
            fs, fc = (12, 8), (0.82, 0.83, 0.85)
            self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)
            # self.fig.text(0.50, 0.94, 'kline day', **font)
            self.fig.suptitle('kline day')

        def set_axis():
            axkline = self.fig.add_axes([0.06, 0.25, 0.90, 0.70])
            axvol = self.fig.add_axes([0.06, 0.15, 0.90, 0.10], sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}

            axcolor = 'lightgoldenrodyellow'
            self.axradio = self.fig.add_axes(
                [0.01, 0.50, 0.05, 0.20], facecolor='none')
            self.radio = RadioButtons(
                self.axradio, ('d', 'w', 'm', 'q', 'y'))  # facecolor='none'
            for _text in self.radio.labels:
                _text.set(alpha=0.5, color='g')
            self.radio.on_clicked(self.radiobtn_on_clicked)

        set_fig()
        set_axis()

        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # 拖动-鼠标按下移动到新位置后释放
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # 鼠标按下拖动
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # 放大缩小
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)

        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.cursor = None
        self.btnpressed = False
        self._init_()

    def _plot_(self):
        """ 根据最新的参数,重新绘制整个图表"""
        def get_ma(cols=[5, 10]):
            def op(x): return talib.MA(DATA['cur_data']['close'], timeperiod=x)
            d = {('MA%s' % v): op(v) for v in cols}
            return pd.DataFrame(d)

        def format_date(x, pos):
            if x < 0 or x > len(DATA['cur_data'].index) - 1:
                return ''
            return DATA['cur_data'].index[int(x)]

        self._update_curdata_()
        data = get_ma(cols=[5, 10])
        ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        self.axes['axkline'].xaxis.set_major_formatter(
            tk.FuncFormatter(format_date))
        self.axes['axkline'].xaxis.set_major_locator(tk.MultipleLocator(
            5 if DATA['cur_n'] <= 90 else int(DATA['cur_n'] / 18)))

        # 绘制图表
        mpf.plot(DATA['cur_data'],
                 ax=self.axes['axkline'],
                 volume=self.axes['axvol'],
                 addplot=ap,
                 type='candle',
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=45, warn_too_much_data=DATA['n'])
        plt.show()

    def plot(self, df):
        """ 根据最新的参数,重新绘制整个图表"""

        def set_idx_startend():
            if DATA['cur_start'] is None or DATA['cur_end'] is None:
                if DATA['n'] <= 200:
                    DATA['cur_start'] = 0
                    DATA['cur_end'] = DATA['n']
                elif 200 < DATA['n']:
                    DATA['cur_start'] = 100
                    DATA['cur_end'] = DATA['cur_start'] + 100

        self._set_data_(DATA['freq'],df)
        set_idx_startend()
        self._plot_()

    @classmethod
    def _set_data_(cls,freq,df=None):
        if df is not None:
            DATA['data'] = df
        freqs = {'w':'W-FRI','m':'M','q':'Q','y':'Y'}
        if freq not in freqs:
            DATA['df'] = DATA['data']
        else:
            DATA['df'] = cls.to_freq(DATA['data'],freqs[freq])
        DATA['n'] = len(DATA['df'])

    def radiobtn_on_clicked(self, label):
        pass
        # DATA['freq']=label
        # self._set_data_(DATA['freq'])
        # self._plot_()

    @classmethod
    def _update_curdata_(cls):
        DATA['cur_data'] = DATA['df'].iloc[DATA['cur_start']: DATA['cur_end']]
        DATA['cur_n'] = len(DATA['cur_data'])

    @classmethod
    def _set_curdata_(cls, start, end):
        start = 0 if start < 0 else start
        end = DATA['n'] if end > DATA['n'] else end
        DATA['cur_start'] = start
        DATA['cur_end'] = end
        DATA['cur_data'] = DATA['df'].iloc[start: end]
        DATA['cur_n'] = len(DATA['cur_data'])

    def enter_axes(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect(
                'motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)

        event.canvas.draw_idle()

    def leave_axes(self, event):
        if event.inaxes == self.axes['axkline']:
            self.cursor = None
            event.canvas.draw_idle()
            self._clear_()
            self._plot_()

    def on_butpress(self, event):
        # 在松开鼠标后才真正移动K线,需记录鼠标按下时的坐标,并标记目前鼠标已按下
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        if event.button != 1:
            return
        if event.xdata is None:
            DATA['cur_x'] = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            DATA['cur_x'] = int(event.xdata)

    # 鼠标拖动:-鼠标按下移动到新位置后释放
    def on_butrelease(self, event):
        # 松开鼠标拖动到新位置与之前鼠标按下时的位置做差求得距离,
        # 在显示90天的基础上修改起止日期,重新获取数据,对子图和画布都做清理后重新显示
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        event.canvas.draw_idle()

        if event.xdata is not None:
            diff = int(event.xdata) - DATA['cur_x']
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
            self._clear_()
            self._plot_()
            self.btnpressed = False
            DATA['cur_x'] = int(event.xdata)
        else:
            DATA['cur_x'] = -1

    def on_motion(self, event):  # 鼠标按下拖动,鼠标不按下平移显示光标
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return

        if not self.btnpressed:  # 显示光标
            if event.xdata is None:
                DATA['cur_x'] = -1
                return

            idx = int(event.xdata)
            # 光标超出范围平移数据,不在显示文本
            if idx < 0 or idx >= DATA['cur_n']:
                return
                # if idx < 0:
                #     start = DATA['cur_start'] - 1
                #     end = DATA['cur_end'] - 1
                # else:  # idx >= DATA['cur_n']:
                #     start = DATA['cur_start'] + 1
                #     end = DATA['cur_end'] + 1
                # self.cursor = None
                # self._set_curdata_(start, end)
                # self._clear_()
                # self._plot_()
            else:  # 显示光标文本
                if DATA['cur_start'] < 0:
                    DATA['cur_start'] = 0
                if DATA['cur_end'] > DATA['n']:
                    DATA['cur_end'] = DATA['n']
                self._update_curdata_()
                if self.cursor is None:
                    self.cursor = TCursor(self.axes['axkline'])
                    self.fig.canvas.mpl_connect(
                        'motion_notify_event', self.cursor.on_mouse_move)
                    self.fig.canvas.mpl_connect(
                        'draw_event', self.cursor.on_draw)
        else:
            # 设定平移的左右界限,如果平移后超出界限,则不再平移
            self.cursor = None
            if event.xdata is not None:
                dx = int(event.xdata - DATA['cur_x'])
                start = DATA['cur_start'] - dx
                end = DATA['cur_end'] - dx
                self._set_curdata_(start, end)
                self._clear_()
                self._plot_()

    # 设置默认显示90天的数据,当放大缩小时,每次在当前基础上变化10 %,
    # 然后用新的起止日期获取数据,清理当前画布后重新显示新数据。

    def on_scroll(self, event):
        if event.inaxes is None or event.inaxes != self.axes['axkline']:
            return
        self.cursor = None
        diff = int(DATA['cur_n'] * 0.1 / 2)
        if event.button == 'down':
            start = DATA['cur_start'] + diff
            end = DATA['cur_end'] - diff
            self._set_curdata_(start, end)
        elif event.button == 'up':
            start = DATA['cur_start'] - diff
            end = DATA['cur_end'] + diff
            self._set_curdata_(start, end)

        self._clear_()
        self._plot_()

    def _clear_(self):
        self.axes['axkline'].clear()
        self.axes['axvol'].clear()
        # for ax in self.axes.values():
        #     ax.cla()
        #     # plt.clf()

    # 键盘按下处理
    def on_key_press(self, event):
        self.cursor = None

        if event.key in ['up', 'down', 'left', 'right']:
            diff = int(DATA['cur_n'] * 0.1 / 2)
            diff = 1 if diff == 0 else diff
            start, end = 0, 0
            if event.key == 'up':  # 缩放
                start = DATA['cur_start'] - diff
                end = DATA['cur_end'] + diff
            elif event.key == 'down':  # 缩放
                start = DATA['cur_start'] + diff
                end = DATA['cur_end'] - diff
            elif event.key == 'left':  # 平移
                start = DATA['cur_start'] - 1
                end = DATA['cur_end'] - 1
            elif event.key == 'right':  # 平移
                start = DATA['cur_start'] + 1
                end = DATA['cur_end'] + 1

            self._set_curdata_(start, end)
            self.axes['axkline'].clear()
            self.axes['axvol'].clear()
            self._plot_()


if __name__ == '__main__':
    # 读取示例数据
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')
    # df1=Candle.to_freq(df)
    # print(df1)

    candle = Candle()
    candle.plot(df)

版本v0.1?

# -*- coding: utf-8 -*-
"""
@Project:TcyQuant
@File:   candle.py
@Auth:   tcy
@Date:   2023/12/21 19:50
@Desc:
@Ver :   0.0.1
@Emial:  3615693665@qq.com
@City:   China Shanghai Songjiang Yexie
"""
import talib
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib import ticker as tk
from matplotlib.widgets import Cursor, MultiCursor


plt.rcParams['font.sans-serif'] = ['simHei']  # 以黑体显示中文
plt.rcParams['axes.unicode_minus'] = False  # 解决保存图像符号“-”显示为放块的问题
CURDATA = None


class _TMultiCursor(object):
    """
    一个用于多个子图(横排或者竖排)的十字星光标,可以在多个子图上同时出现
    single=0表示仅仅一个子图显示水平线,所有子图显示垂直线,用于竖排的子图
    single=1表示仅仅一个子图显示垂直线,所有子图显示水平线,用于横排的子图
    注意:为了能让光标响应事件处理,必须保持对它的引用(比如有个变量保存)
    用法:
    import matplotlib.pyplot as plt
    import numpy as np
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2*np.pi*t))
    ax2.plot(t, np.sin(4*np.pi*t))
    cursor = TMultiCursor(fig.canvas, (ax1, ax2), single=0, color='w', lw=0.5)
    plt.show()

    #单子图上面的十字星光标
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2 * np.pi * t))
    ax2.plot(t, np.sin(4 * np.pi * t))

    cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
    plt.show()

    #多子图上面同时出现十字星光标
    fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
    t = np.arange(0.0, 2.0, 0.01)
    ax1.plot(t, np.sin(2*np.pi*t))
    ax2.plot(t, np.sin(4*np.pi*t))

    # cursor = Cursor(ax1, useblit=True, color='r', lw=0.5)
    cursor = MultiCursor(fig.canvas, (ax1, ax2), useblit=True,
        horizOn=True, vertOn=True, color='r', lw=0.5)
    plt.show()
    """

    def __init__(self, fig, axes, single=0, **lineprops):
        self.canvas = fig.canvas
        self.axes = axes
        self.single = single
        self.text = self.axes[0].text(
            0.72, 0.9, '', transform=self.axes[0].transAxes)
        if not lineprops:
            lineprops = dict(color='gray', lw=0.5, ls='dashdot')
        if single not in [0, 1]:
            raise ValueError(
                'Unrecognized single value: ' +
                str(single) +
                ', must be 0 or 1')

        xmin, xmax = axes[0].get_xlim()
        ymin, ymax = axes[0].get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)

        self.background = None
        self.needclear = False

        lineprops['animated'] = True  # for blt
        self.lines = [
            [ax.axhline(ymid, visible=False, **lineprops) for ax in axes],
            [ax.axvline(xmid, visible=False, **lineprops) for ax in axes]
        ]

        self.canvas.mpl_connect('motion_notify_event', self.onmove)
        self.canvas.mpl_connect('draw_event', self.clear)

    def clear(self, event):
        self.background = (self.canvas.copy_from_bbox(self.canvas.figure.bbox))
        for line in self.lines[0] + self.lines[1]:
            line.set_visible(False)
        self.text.set_visible(False)

    def onmove(self, event):
        if event.inaxes is None:
            return
        if not self.canvas.widgetlock.available(self):
            return

        self.needclear = True

        for i in range(len(self.axes)):
            if event.inaxes == self.axes[i]:
                if self.single == 0:
                    for line in self.lines[1]:
                        line.set_xdata((event.xdata, event.xdata))
                        line.set_visible(True)

                    line = self.lines[0][i]
                    line.set_ydata((event.ydata, event.ydata))
                    line.set_visible(True)
                else:
                    for line in self.lines[0]:
                        line.set_ydata((event.ydata, event.ydata))
                        line.set_visible(True)

                    line = self.lines[1][i]
                    line.set_xdata((event.xdata, event.xdata))
                    line.set_visible(True)
            else:
                self.lines[self.single][i].set_visible(False)

        if self.background is not None:
            # self.text.set_visible(False)
            self.text.set_visible(True)
            x, y = event.xdata, event.ydata
            self.text.set_text('x=%1.2f, y=%1.2f' % (x, y))

            self.canvas.restore_region(self.background)

        for lines in self.lines:
            for line in lines:
                if line.get_visible():
                    line.axes.draw_artist(line)

        self.canvas.blit()
class TCursor:
    """单子图更快的光标"""

    def __init__(self, ax):
        self.ax = ax
        self.background = None

        xmin, xmax = ax.get_xlim()
        ymin, ymax = ax.get_ylim()
        xmid = 0.5 * (xmin + xmax)
        ymid = 0.5 * (ymin + ymax)
        self.horizontal_line = ax.axhline(ymid, color='k', lw=0.8, ls='--')
        self.vertical_line = ax.axvline(xmid, color='k', lw=0.8, ls='--')

        # 轴坐标中的文本位置
        bbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        self.text = ax.text(0.72, 0.9, '', transform=ax.transAxes, fontsize=8,
                            verticalalignment='top', bbox=bbox, clip_on=False)
        self._creating_background = False
        ax.figure.canvas.mpl_connect('draw_event', self.on_draw)

    def on_draw(self, event):
        self.create_new_background()

    def set_cross_hair_visible(self, visible):
        need_redraw = self.horizontal_line.get_visible() != visible
        self.horizontal_line.set_visible(visible)
        self.vertical_line.set_visible(visible)
        self.text.set_visible(visible)
        return need_redraw

    def create_new_background(self):
        if not self._creating_background:
            self._creating_background = True
            self.set_cross_hair_visible(False)
            self.ax.figure.canvas.draw()
            self.background = self.ax.figure.canvas.copy_from_bbox(
                self.ax.bbox)
            self.set_cross_hair_visible(True)
            self._creating_background = False

    def on_mouse_move(self, event):
        if self.background is None:
            self.create_new_background()

        if not event.inaxes:
            need_redraw = self.set_cross_hair_visible(False)
            if need_redraw:
                self.ax.figure.canvas.restore_region(self.background)
                self.ax.figure.canvas.blit(self.ax.bbox)
        else:
            self.set_cross_hair_visible(True)
            # update the line positions
            x, y = int(event.xdata), int(event.ydata)
            xmin, xmax = self.ax.get_xlim()
            ymin, ymax = self.ax.get_ylim()
            xmid = 0.5 * (xmin + xmax)
            ymid = 0.5 * (ymin + ymax)

            self.horizontal_line.set_ydata([y])
            self.vertical_line.set_xdata([x])
            idx = int(event.xdata)
            _text = '\n'.join((
                r'open: %.1f' % CURDATA["open"].iloc[idx],
                r'high: %.1f' % CURDATA["high"].iloc[idx],
                r'low %.1f' % CURDATA["low"].iloc[idx],
                r'close: %.1f' % CURDATA["close"].iloc[idx],
                r'date:%s' % CURDATA.index[idx].strftime('%Y%m%d')))
            self.text.set_text(_text)
            self.ax.figure.canvas.restore_region(self.background)
            self.ax.draw_artist(self.horizontal_line)
            self.ax.draw_artist(self.vertical_line)
            self.ax.draw_artist(self.text)
            self.ax.figure.canvas.blit(self.ax.bbox)
class Candle:

    @classmethod
    def get_font(cls):
        return {
            '常用': 'Times New Roman',
            # 中文字体
            '黑体': 'SimHei',
            '微软雅黑': 'Microsoft YaHei',
            '微软正黑体': 'Microsoft JhengHei',
            '新宋体': 'NSimSun',
            '新细明体': 'PMingLiU',
            '细明体': 'MingLiU',
            '华文新魏': 'STXinwei',
            '华文行楷': 'STXingkai',
            '华文隶书': 'STLliti',
            '花纹琥珀': 'STHupo',
            '华文彩云': 'STCaiyun',
            '方正姚体': 'FZYaoti',
            '方正舒体': 'FZShuTi',
            '标楷体': 'DFKai-SB',
            '华文仿宋': 'STFangsong',
            '华文中宋': 'STZhongsong',
            '华文宋体': 'STSong',
            '华文楷体': 'STKaiti',
            '华文细黑': 'STXihei',
            '幼圆': 'YouYuan',
            '隶书': 'LiSu',
            '楷体_GB 2313': 'Kaiti_GB2313',
            '仿宋_GB2313': 'FangSong_GB2313',
            '仿宋': 'FangSong'
        }

    @classmethod
    def _get_mystyle_(cls):
        my_color = mpf.make_marketcolors(
            up='r', down='g', edge='inherit',
            wick='inherit', volume='inherit')
        color = '(0.82, 0.83, 0.85)'
        return mpf.make_mpf_style(
            marketcolors=my_color, figcolor=color, gridcolor=color)

    def _init_(self):
        def set_fig():
            font = {'fontname': 'simHei', 'size': '16', 'color': 'black',
                    'weight': 'bold', 'va': 'bottom', 'ha': 'center'}
            fs, fc = (12, 8), (0.82, 0.83, 0.85)
            self.fig = mpf.figure(style=self.style, figsize=fs, facecolor=fc)
            self.fig.text(0.50, 0.94, 'kline day', **font)

        def set_axis():
            axkline = self.fig.add_axes([0.08, 0.25, 0.88, 0.60])
            axvol = self.fig.add_axes([0.08, 0.15, 0.88, 0.10], sharex=axkline)
            axvol.set_ylabel('vol')
            self.axes = {'axkline': axkline, 'axvol': axvol}

        set_fig()
        set_axis()

        self.fig.canvas.mpl_connect('axes_enter_event', self.enter_axes)
        self.fig.canvas.mpl_connect('axes_leave_event', self.leave_axes)
        self.fig.canvas.mpl_connect('button_press_event', self.on_butpress)
        # 拖动-鼠标按下移动到新位置后释放
        self.fig.canvas.mpl_connect('button_release_event', self.on_butrelease)
        # 鼠标按下拖动
        self.fig.canvas.mpl_connect('motion_notify_event', self.on_motion)
        self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)  # 放大缩小
        self.fig.canvas.mpl_connect('key_press_event', self.on_key_press)

        # plt.tight_layout()
        plt.subplots_adjust(hspace=0.2)

    def __init__(self):
        self.style = self._get_mystyle_()
        self.fig = None
        self.axes = {}
        self.cursor = None

        self.df = None
        self.n_data = 0
        self.curdata = None
        self.n_curdata = 0
        self.idx_start = None
        self.idx_end = None
        self.x_curdata = -1
        self.btnpressed = False
        self._init_()

    def _plot_(self):
        """ 根据最新的参数,重新绘制整个图表"""
        def get_ma(cols=[5, 10]):
            def op(x): return talib.MA(self.curdata['close'], timeperiod=x)
            d = {('MA%s' % v): op(v) for v in cols}
            return pd.DataFrame(d)

        def format_date(x, pos):
            if x < 0 or x > len(elf.curdata.index) - 1:
                return ''
            return show_data.index[int(x)]

        global CURDATA
        self.curdata = self.df.iloc[self.idx_start: self.idx_end]
        CURDATA = self.curdata
        self.n_curdata = len(self.curdata)
        data = get_ma(cols=[5, 10])
        ap = [mpf.make_addplot(data, ax=self.axes['axkline'])]
        # color=(0.6, 0.75, 0.6),ylabel=''
        self.axes['axkline'].xaxis.set_major_formatter(
            tk.FuncFormatter(format_date))
        self.axes['axkline'].xaxis.set_major_locator(tk.MultipleLocator(
            5 if self.n_curdata <= 90 else int(self.n_curdata / 18)))

        # 绘制图表
        mpf.plot(self.curdata,
                 ax=self.axes['axkline'],
                 volume=self.axes['axvol'],
                 addplot=ap,
                 type='candle',
                 style=self.style,
                 datetime_format='%Y%m%d',
                 ylabel='',
                 ylabel_lower='',
                 xrotation=45, warn_too_much_data=self.n_data)
        # self.draw_cross()
        plt.show()

    def plot(self, df):
        """ 根据最新的参数,重新绘制整个图表"""

        def set_idx_startend():
            if self.idx_start is None or self.idx_end is None:
                if self.n_data <= 200:
                    self.idx_start = 0
                    self.idx_end = self.n_data
                elif 200 < self.n_data:
                    self.idx_start = 100
                    self.idx_end = self.idx_start + 100

        self.df = df
        self.n_data = len(self.df)
        set_idx_startend()
        self._plot_()

    def enter_axes(self, event):
        ax = event.inaxes
        if ax is None:
            return
        if self.cursor is None:
            self.cursor = TCursor(self.axes['axkline'])
            self.fig.canvas.mpl_connect(
                'motion_notify_event', self.cursor.on_mouse_move)
            self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)

        event.canvas.draw_idle()

    def leave_axes(self, event):
        self.cursor = None
        event.canvas.draw_idle()

    def on_butpress(self, event):
        # 在松开鼠标后才真正移动K线,需记录鼠标按下时的坐标,并标记目前鼠标已按下
        self.cursor = None
        event.canvas.draw_idle()

        if not (event.inaxes == self.axes['axkline']):
            return
        if event.button != 1:
            return
        if event.xdata is None:
            self.x_curdata = -1
            self.btnpressed = False
        else:
            self.btnpressed = True
            self.x_curdata = int(event.xdata)

    # 鼠标拖动:-鼠标按下移动到新位置后释放
    def on_butrelease(self, event):
        # 松开鼠标拖动到新位置与之前鼠标按下时的位置做差求得距离,
        # 在显示90天的基础上修改起止日期,重新获取数据,对子图和画布都做清理后重新显示
        self.cursor = None
        event.canvas.draw_idle()

        if event.xdata is None:
            return

        diff = int(event.xdata) - self.x_curdata
        self.idx_start -= diff
        self.idx_end -= diff
        if self.idx_start < 0:
            self.idx_start = 0
        if self.idx_end > self.n_data:
            self.idx_end = self.n_data

        for ax in self.axes.values():
            ax.cla()
            # plt.clf()

        self._plot_()
        self.btnpressed = False
        self.x_curdata = -1

    def on_motion(self, event):  # 鼠标按下拖动

        if not self.btnpressed:
            ax = event.inaxes
            if ax is None:
                return
            # idx = int(event.xdata)
            # if idx < 0 or idx >= self.n_curdata:
            #     return

            if self.cursor is None:
                self.cursor = TCursor(self.axes['axkline'])
                self.fig.canvas.mpl_connect(
                    'motion_notify_event', self.cursor.on_mouse_move)
                self.fig.canvas.mpl_connect('draw_event', self.cursor.on_draw)
        else:
            self.cursor = None
            if not event.inaxes == self.axes['axkline']:
                return
            if event.xdata is None:
                return

            dx = int(event.xdata - self.x_curdata)
            self.idx_start -= dx
            self.idx_end -= dx
            # 设定平移的左右界限,如果平移后超出界限,则不再平移
            if self.idx_start <= 0:
                self.idx_start = 0
            if self.idx_end > self.n_data:
                self.idx_end = self.n_data

            self.axes['axkline'].clear()
            self.axes['axvol'].clear()
            self._plot_()

    # 设置默认显示90天的数据,当放大缩小时,每次在当前基础上变化10 %,
    # 然后用新的起止日期获取数据,清理当前画布后重新显示新数据。
    def on_scroll(self, event):
        ax = event.inaxes
        if ax is None:
            return

        diff = int(self.n_curdata * 0.1 / 2)
        if event.button == 'down':
            self.idx_start += diff
            self.idx_end -= diff
        elif event.button == 'up':
            self.idx_start -= diff
            self.idx_end += diff
        if self.idx_end > self.n_data:
            self.idx_end = self.n_data
        if self.idx_start < 0:
            self.idx_start = 0
        self.curdata = self.df.iloc[self.idx_start: self.idx_end]
        global CURDATA
        CURDATA = self.curdata
        for ax in self.axes.values():
            ax.cla()
            # plt.clf()
        self._plot_()

        self.cursor = None
        event.canvas.draw_idle()

    # 键盘按下处理
    def on_key_press(self, event):
        self.cursor = None
        event.canvas.draw_idle()

        diff = int(self.n_curdata * 0.1 / 2)
        if diff == 0:
            diff = 1
        if event.key == 'up':  # 向上,看仔细1倍
            self.idx_start -= diff
            self.idx_end += diff
        elif event.key == 'down':  # 向下,看多1倍标的
            self.idx_start += diff
            self.idx_end -= diff
        elif event.key == 'left':
            self.idx_start -= 1
            self.idx_end -= 1
        elif event.key == 'right':
            self.idx_start += 1
            self.idx_end += 1

        if self.idx_start < 0:
            self.idx_start = 0
        if self.idx_end > self.n_data:
            self.idx_end = self.n_data

        self.axes['axkline'].clear()
        self.axes['axvol'].clear()
        self._plot_()


if __name__ == '__main__':
    # 读取示例数据
    df = pd.read_csv(r'D:\futuresdata_cn\data\day\okday\A.csv')
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')

    candle = Candle()
    candle.plot(df)

?

文章来源:https://blog.csdn.net/tcy23456/article/details/135212065
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。