本章解读mycat接收到一条sql命令后使如何处理的
本部分紧接mycat 登录部分的代码阅读.
接下来就是具体的sql指令处理。
从该段代码中可以看到连接成功后,handler指向了FrontendCommandHandler,这里的source是FronendConnection,也就是我们的连接。
登录成功之后的请求均由FrontendCommandHandler处理,再看FrontendCommandHandler的handler方法.
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465public void handle(byte[] data){if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData()){MySQLMessage mm = new MySQLMessage(data);int packetLength = mm.readUB3();if(packetLength+4==data.length){source.loadDataInfileData(data);}return;}switch (data[4]){case MySQLPacket.COM_INIT_DB:commands.doInitDB();source.initDB(data);break;case MySQLPacket.COM_QUERY:commands.doQuery();source.query(data);break;case MySQLPacket.COM_PING:commands.doPing();source.ping();break;case MySQLPacket.COM_QUIT:commands.doQuit();source.close("quit cmd");break;case MySQLPacket.COM_PROCESS_KILL:commands.doKill();source.kill(data);break;case MySQLPacket.COM_STMT_PREPARE:commands.doStmtPrepare();source.stmtPrepare(data);break;case MySQLPacket.COM_STMT_SEND_LONG_DATA:commands.doStmtSendLongData();source.stmtSendLongData(data);break;case MySQLPacket.COM_STMT_RESET:commands.doStmtReset();source.stmtReset(data);break;case MySQLPacket.COM_STMT_EXECUTE:commands.doStmtExecute();source.stmtExecute(data);break;case MySQLPacket.COM_STMT_CLOSE:commands.doStmtClose();source.stmtClose(data);break;case MySQLPacket.COM_HEARTBEAT:commands.doHeartbeat();source.heartbeat(data);break;default:commands.doOther();source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,"Unknown command");}}此步骤根据不同的包头解析请求,我们以一次查询为例继续往下看。查询的部分走的代码是
1234case MySQLPacket.COM_QUERY:commands.doQuery();source.query(data);break;其中commands.doQuery()只是做了一次计数器上的变化,告知查询次数+1,无其他动作,看`source.query(data);
123456789101112131415public void query(String sql) {//一大堆的校验啊,安全啊什么的代码......// 执行查询if (queryHandler != null) {queryHandler.setReadOnly(privileges.isReadOnly(user));queryHandler.query(sql);} else {writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");}}此处的queryHandler为ServerQueryHandler.query.
1234567891011121314151617181920212223public void query(String sql) {ServerConnection c = this.source;if (LOGGER.isDebugEnabled()) {LOGGER.debug(new StringBuilder().append(c).append(sql).toString());}//int rs = ServerParse.parse(sql);int sqlType = rs & 0xff;switch (sqlType) {//一大堆的case分支,我们的查询直接走到default部分//explain sqldefault:if(readOnly){LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");break;}c.execute(sql, rs & 0xff);}}
sql交给ServerConecction来execute
routeEndExecuteSQL(sql, type, schema);
这个方法会根据当前session和sql语句来得出路由信息,RouteResultset rrs 。ServerConnection会根据路由计算结果来执行。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758@Overridepublic void execute(RouteResultset rrs, int type) {// clear prev execute resourcesclearHandlesResources();if (LOGGER.isDebugEnabled()) {StringBuilder s = new StringBuilder();LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");}// 检查路由结果是否为空RouteResultsetNode[] nodes = rrs.getNodes();if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) {source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,"No dataNode found ,please check tables defined in schema:" + source.getSchema());return;}boolean autocommit = source.isAutocommit();final int initCount = target.size();if (nodes.length == 1) {singleNodeHandler = new SingleNodeHandler(rrs, this);if (this.isPrepared()) {singleNodeHandler.setPrepared(true);}try {if(initCount > 1){checkDistriTransaxAndExecute(rrs,1,autocommit);}else{singleNodeHandler.execute();}} catch (Exception e) {LOGGER.warn(new StringBuilder().append(source).append(rrs).toString(), e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());}} else {multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);if (this.isPrepared()) {multiNodeHandler.setPrepared(true);}try {if(((type == ServerParse.DELETE || type == ServerParse.INSERT || type == ServerParse.UPDATE) && !rrs.isGlobalTable() && nodes.length > 1)||initCount > 1) {checkDistriTransaxAndExecute(rrs,2,autocommit);} else {multiNodeHandler.execute();}} catch (Exception e) {LOGGER.warn(new StringBuilder().append(source).append(rrs).toString(), e);source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());}}if (this.isPrepared()) {this.setPrepared(false);}}这里有两个分支,singleNodeHandler和MultiNodeQueryHandler。根据路由计算结果需要多少node决定走哪个分支。singleNodeHandler和mulitNoderHandler都继承phsyicNodeHandler,我们现在的sql只对应1个mysql实例,走向了singleNodeHandler,看看他的execute方法
1234567891011121314151617181920212223242526public void execute() throws Exception {startTime=System.currentTimeMillis();ServerConnection sc = session.getSource();this.isRunning = true;this.packetId = 0;final BackendConnection conn = session.getTarget(node);LOGGER.debug("rrs.getRunOnSlave() " + rrs.getRunOnSlave());node.setRunOnSlave(rrs.getRunOnSlave()); // 实现 master/slave注解LOGGER.debug("node.getRunOnSlave() " + node.getRunOnSlave());if (session.tryExistsCon(conn, node)) {_execute(conn);} else {// create new connectionMycatConfig conf = MycatServer.getInstance().getConfig();LOGGER.debug("node.getRunOnSlave() " + node.getRunOnSlave());node.setRunOnSlave(rrs.getRunOnSlave()); // 实现 master/slave注解LOGGER.debug("node.getRunOnSlave() " + node.getRunOnSlave());PhysicalDBNode dn = conf.getDataNodes().get(node.getName());dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this, node);}}
final BackendConnection conn = session.getTarget(node);
这行代码能根据node数获取到对应的BackendConnection,也就是一个mysqlConnection,这个connection相当于一个mysqlClient,具体的执行singleNodeHandler会通过这个conn来执行。
下一篇讲mysqlConnection是如何被获取并执行的