web后台架构设计

白俊遥博客

docker-compose部署msyql主从同步,mysql-master,mysql-slave,redis,elasticsearch

白俊遥博客

项目服务:

  1. elasticsearch:用于存储系统日志,在kibana上可视化;mysql数据同步es ,用于app上filter查询

        系统日志:1.存储在es  2.存储在服务器log文件

            aws上es管理:https://ap-south-1.console.aws.amazon.com/es/home?region=ap-south-1

            kibana地址:https://search-******-d4wrxzi5nw5xlaboupwtl26aka.ap-south-1.es.amazonaws.com/_plugin/kibana/

白俊遥博客


定期清理一个月前的日志:(es的具体用法详见有道云笔记)

def es_log_clean():
    delete_query = {
        "query": {
            "range": {
                "now": {
                    "lt": "now-30d/d"
                }
            }
        }
    }
    res = es.delete_by_query(index=index_name, body=delete_query)
    print(res)
    
或者:   
POST /pro_api_log/_delete_by_query
{
    "query": {
        "range" : {
            "now" : {
                "lt" :  "now-30d/d"
            }
        }
    }
}

       服务器日志(按照天来切分): 

def init_logger(logger_name, config, spe_name=None):
    # [%(pathname)s:%(lineno)d]
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")

    # create a logger
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(formatter)

    # File Handler
    if spe_name and isinstance(spe_name, str):
        file_handler = logging.handlers.TimedRotatingFileHandler(spe_name, 'D', 1, 10, 'UTF-8')
    else:
        file_handler = logging.handlers.TimedRotatingFileHandler('/var/log/wemore.log', 'D', 1, 30, 'UTF-8')
    file_handler.setLevel(config.log_level)
    file_handler.setFormatter(formatter)

    # email_handler = logging.handlers.SMTPHandler(
    #     (config.email["Server"], config.email["Port"]),
    #     config.email["From"],
    #     config.email["To"],
    #     config.email["Subject"],
    #     credentials=(config.email["From"], config.email["Credit"])
    # )
    # email_handler.setLevel(config.email['Level'])

    # 给logger添加Handler
    # logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    # logger.addHandler(email_handler)

    return logger


log = init_logger('flask_app', conf)

    mysql数据同步到es:实现原理:python-mysql-replication是基于MySQL复制原理实现的,把自己伪装成一个slave不断的从MySQL数据库获取binlog并解析。

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import *
from pymysqlreplication.event import RotateEvent

因为mysql-master,mysql-slave的server_id为1,2,所以设置为3

# 如果是同步集群的binlog日志,这里的server_id不能与集群中的server_id相同
stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                            server_id=3,
                            blocking=True,
                            only_events=only_events,
                            only_schemas=only_schemas,
                            only_tables=only_tables,
                            log_file=log_file,
                            log_pos=log_pos,
                            resume_stream=True)  # log_file与log_pos参数只有在resume_stream为True的时候才有效
for binlogevent in stream:
    ct = (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    print("%s : --%s--" % (ct, binlogevent.__class__.__name__), flush=True)
    print(binlogevent.dump())
    handler = None
    if binlogevent.__class__.__name__ == "UpdateRowsEvent":
        handler = UpdateRowsEventHandler(binlogevent.table, 'update', binlogevent.rows)
    elif binlogevent.__class__.__name__ == "WriteRowsEvent":
        handler = WriteRowsEventHandler(binlogevent.table, 'write', binlogevent.rows)
    elif binlogevent.__class__.__name__ == "DeleteRowsEvent":
        handler = DeleteRowsEventHandler(binlogevent.table, 'delete', binlogevent.rows)
    else:
        NotImplemented

    if not handler:
        continue


打赏,支持一下

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

Powered by WEIXIA.XIN,学的不仅是技术,更是梦想!!!

Davidvivi博客
请先登录后发表评论
  • 最新评论
  • 总共 0条评论