mycat源码解读1-登录mycat

mycat的启动过程及mycat登录部分代码

mycat的启动类是MycatStartup,启动的时候会启动两个Server。分别是FrontendServer和ManagerServer。ManageServer先不管,暂时关注于FrontendServer.

  • FrontendServer的初始化为

    1
    2
    3
    4
    5
    ServerConnectionFactory sf = new ServerConnectionFactory();
    ...
    ...
    server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
    + "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool);
  • 再看ServerConnectionFactory的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class ServerConnectionFactory extends FrontendConnectionFactory {
    @Override
    protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
    SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
    //使用该connection,channel丢给该connection。
    ServerConnection c = new ServerConnection(channel);
    MycatServer.getInstance().getConfig().setSocketParams(c, true);
    c.setPrivileges(MycatPrivileges.instance());
    c.setQueryHandler(new ServerQueryHandler(c));
    c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));
    c.setPrepareHandler(new ServerPrepareHandler(c));
    c.setTxIsolation(sys.getTxIsolation());
    c.setSession2(new NonBlockingSession(c));
    return c;
    }
    }
  • 每次请求获得的Connection为ServerConnection。ServerConnection直接使用父类的构造方法.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public FrontendConnection(NetworkChannel channel) throws IOException {
    super(channel);
    InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
    InetSocketAddress remoteAddr = null;
    if (channel instanceof SocketChannel) {
    remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();
    } else if (channel instanceof AsynchronousSocketChannel) {
    remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
    }
    this.host = remoteAddr.getHostString();
    this.port = localAddr.getPort();
    this.localPort = remoteAddr.getPort();
    //将handler指向为FrontendAuthenticator
    this.handler = new FrontendAuthenticator(this);
    }
  • 可以看到使用的handler为FrontendAuthenticator,该handler起个认证的作用。看该类的handle方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    public void handle(byte[] data) {
    // check quit packet
    if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) {
    source.close("quit packet");
    return;
    }
    AuthPacket auth = new AuthPacket();
    auth.read(data);
    ...
    ...
    default:
    success(auth);
    }
    }
    protected void success(AuthPacket auth) {
    source.setAuthenticated(true);
    source.setUser(auth.user);
    source.setSchema(auth.database);
    source.setCharsetIndex(auth.charsetIndex);
    //成功后将handler指向FrontendCommandHandler
    source.setHandler(new FrontendCommandHandler(source));
    if (LOGGER.isInfoEnabled()) {
    StringBuilder s = new StringBuilder();
    s.append(source).append('\'').append(auth.user).append("' login success");
    byte[] extra = auth.extra;
    if (extra != null && extra.length > 0) {
    s.append(",extra:").append(new String(extra));
    }
    LOGGER.info(s.toString());
    }
    ByteBuffer buffer = source.allocate();
    source.write(source.writeToBuffer(AUTH_OK, buffer));
    boolean clientCompress = Capabilities.CLIENT_COMPRESS==(Capabilities.CLIENT_COMPRESS & auth.clientFlags);
    boolean usingCompress= MycatServer.getInstance().getConfig().getSystem().getUseCompression()==1 ;
    if(clientCompress&&usingCompress)
    {
    source.setSupportCompress(true);
    }
    }
  • 到此步骤为止,完成登录验证。登录成功后FrontendCOnnection的handler指向了FrontendCommandHandler。后续继续阅读mycat 请求sql部分代码。