mycat源码解读2-sql处理

本章解读mycat接收到一条sql命令后使如何处理的

本部分紧接mycat 登录部分的代码阅读.

接下来就是具体的sql指令处理。
从该段代码中可以看到连接成功后,handler指向了FrontendCommandHandler,这里的source是FronendConnection,也就是我们的连接。

  • 登录成功之后的请求均由FrontendCommandHandler处理,再看FrontendCommandHandler的handler方法.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    public void handle(byte[] data)
    {
    if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
    {
    MySQLMessage mm = new MySQLMessage(data);
    int packetLength = mm.readUB3();
    if(packetLength+4==data.length)
    {
    source.loadDataInfileData(data);
    }
    return;
    }
    switch (data[4])
    {
    case MySQLPacket.COM_INIT_DB:
    commands.doInitDB();
    source.initDB(data);
    break;
    case MySQLPacket.COM_QUERY:
    commands.doQuery();
    source.query(data);
    break;
    case MySQLPacket.COM_PING:
    commands.doPing();
    source.ping();
    break;
    case MySQLPacket.COM_QUIT:
    commands.doQuit();
    source.close("quit cmd");
    break;
    case MySQLPacket.COM_PROCESS_KILL:
    commands.doKill();
    source.kill(data);
    break;
    case MySQLPacket.COM_STMT_PREPARE:
    commands.doStmtPrepare();
    source.stmtPrepare(data);
    break;
    case MySQLPacket.COM_STMT_SEND_LONG_DATA:
    commands.doStmtSendLongData();
    source.stmtSendLongData(data);
    break;
    case MySQLPacket.COM_STMT_RESET:
    commands.doStmtReset();
    source.stmtReset(data);
    break;
    case MySQLPacket.COM_STMT_EXECUTE:
    commands.doStmtExecute();
    source.stmtExecute(data);
    break;
    case MySQLPacket.COM_STMT_CLOSE:
    commands.doStmtClose();
    source.stmtClose(data);
    break;
    case MySQLPacket.COM_HEARTBEAT:
    commands.doHeartbeat();
    source.heartbeat(data);
    break;
    default:
    commands.doOther();
    source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
    "Unknown command");
    }
    }
  • 此步骤根据不同的包头解析请求,我们以一次查询为例继续往下看。查询的部分走的代码是

    1
    2
    3
    4
    case MySQLPacket.COM_QUERY:
    commands.doQuery();
    source.query(data);
    break;
  • 其中commands.doQuery()只是做了一次计数器上的变化,告知查询次数+1,无其他动作,看`source.query(data);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public void query(String sql) {
    //一大堆的校验啊,安全啊什么的代码
    ...
    ...
    // 执行查询
    if (queryHandler != null) {
    queryHandler.setReadOnly(privileges.isReadOnly(user));
    queryHandler.query(sql);
    } else {
    writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
    }
    }
  • 此处的queryHandler为ServerQueryHandler.query.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public void query(String sql) {
    ServerConnection c = this.source;
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
    }
    //
    int rs = ServerParse.parse(sql);
    int sqlType = rs & 0xff;
    switch (sqlType) {
    //一大堆的case分支,我们的查询直接走到default部分
    //explain sql
    default:
    if(readOnly){
    LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
    c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
    break;
    }
    c.execute(sql, rs & 0xff);
    }
    }

sql交给ServerConecction来execute

1
2
3
4
5
6
7
8
public void execute(String sql, int type) {
...
...
//前面是一大堆的状态检查之类的东西,直接看最关键的一句
routeEndExecuteSQL(sql, type, schema);
}

routeEndExecuteSQL(sql, type, schema);

  • 这个方法会根据当前session和sql语句来得出路由信息,RouteResultset rrs 。ServerConnection会根据路由计算结果来执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    @Override
    public void execute(RouteResultset rrs, int type) {
    // clear prev execute resources
    clearHandlesResources();
    if (LOGGER.isDebugEnabled()) {
    StringBuilder s = new StringBuilder();
    LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
    }
    // 检查路由结果是否为空
    RouteResultsetNode[] nodes = rrs.getNodes();
    if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) {
    source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
    "No dataNode found ,please check tables defined in schema:" + source.getSchema());
    return;
    }
    boolean autocommit = source.isAutocommit();
    final int initCount = target.size();
    if (nodes.length == 1) {
    singleNodeHandler = new SingleNodeHandler(rrs, this);
    if (this.isPrepared()) {
    singleNodeHandler.setPrepared(true);
    }
    try {
    if(initCount > 1){
    checkDistriTransaxAndExecute(rrs,1,autocommit);
    }else{
    singleNodeHandler.execute();
    }
    } catch (Exception e) {
    LOGGER.warn(new StringBuilder().append(source).append(rrs).toString(), e);
    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
    }
    } else {
    multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);
    if (this.isPrepared()) {
    multiNodeHandler.setPrepared(true);
    }
    try {
    if(((type == ServerParse.DELETE || type == ServerParse.INSERT || type == ServerParse.UPDATE) && !rrs.isGlobalTable() && nodes.length > 1)||initCount > 1) {
    checkDistriTransaxAndExecute(rrs,2,autocommit);
    } else {
    multiNodeHandler.execute();
    }
    } catch (Exception e) {
    LOGGER.warn(new StringBuilder().append(source).append(rrs).toString(), e);
    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
    }
    }
    if (this.isPrepared()) {
    this.setPrepared(false);
    }
    }
  • 这里有两个分支,singleNodeHandler和MultiNodeQueryHandler。根据路由计算结果需要多少node决定走哪个分支。singleNodeHandler和mulitNoderHandler都继承phsyicNodeHandler,我们现在的sql只对应1个mysql实例,走向了singleNodeHandler,看看他的execute方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public void execute() throws Exception {
    startTime=System.currentTimeMillis();
    ServerConnection sc = session.getSource();
    this.isRunning = true;
    this.packetId = 0;
    final BackendConnection conn = session.getTarget(node);
    LOGGER.debug("rrs.getRunOnSlave() " + rrs.getRunOnSlave());
    node.setRunOnSlave(rrs.getRunOnSlave()); // 实现 master/slave注解
    LOGGER.debug("node.getRunOnSlave() " + node.getRunOnSlave());
    if (session.tryExistsCon(conn, node)) {
    _execute(conn);
    } else {
    // create new connection
    MycatConfig conf = MycatServer.getInstance().getConfig();
    LOGGER.debug("node.getRunOnSlave() " + node.getRunOnSlave());
    node.setRunOnSlave(rrs.getRunOnSlave()); // 实现 master/slave注解
    LOGGER.debug("node.getRunOnSlave() " + node.getRunOnSlave());
    PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
    dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this, node);
    }
    }

final BackendConnection conn = session.getTarget(node);这行代码能根据node数获取到对应的BackendConnection,也就是一个mysqlConnection,这个connection相当于一个mysqlClient,具体的执行singleNodeHandler会通过这个conn来执行。

下一篇讲mysqlConnection是如何被获取并执行的