openresty - lua API(7) - cosocket,请求转发

2018-08-02 18:22:06

Openresty 提供了强大的 cosocket 功能,它源自 nginx 的 upstream 机制,但又超越了 upstream 机制,结合了 nginx 的事件机制和 Lua 协程特性,以同步非阻塞模式实现 socket 编程,可以高效的与后端服务器通信,而且还支持连接池功能。

ngx.socket.tcp

syntax: tcpsock = ngx.socket.tcp()
context: rewrite_by_lua*, access_by_lua*, content_by_lua*, ngx.timer.*, \
        ssl_certificate_by_lua*, ssl_session_fetch_by_lua*

创建一个 tcp 的 cosocket 对象,使用完后最好手动关闭或者通过 setkeepalive 存入连接池。如果没有关闭或者存入连接池,那么如下2中情况发生也会关闭。

1.当前请求的 Lua 句柄结束。
2.被 Lua GC 垃圾回收。

tcpsock:connect

syntax: ok, err = tcpsock:connect(host, port, options_table?)

向远程发起 tcp 或者 unix domain 连接,首先会检查连接池,如果有会优先使用而不会创建新连接,出错返回 nil。

如果使用域名,那么需要配置 resolver,如果返回多个ip,那么会随机选择一个。

resolver 8.8.8.8;  # use Google's public DNS nameserver
location /test {
     resolver 8.8.8.8;

     content_by_lua_block {
         local sock = ngx.socket.tcp()
         local ok, err = sock:connect("www.google.com", 80)
         if not ok then
             ngx.say("failed to connect to google: ", err)
             return
         end
         ngx.say("successfully connected to google!")
         sock:close()
     }
 }

unix domain 例子

 local sock = ngx.socket.tcp()
 local ok, err = sock:connect("unix:/tmp/memcached.sock")
 if not ok then
     ngx.say("failed to connect to the memcached unix domain socket: ", err)
     return
 end

连接超时可以由 lua_socket_connect_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

 local sock = ngx.socket.tcp()
 sock:settimeout(1000)  -- one second timeout
 local ok, err = sock:connect(host, port)

如果重复调用连接,那么之前的连接将会关闭。

tcpsock:sslhandshake

syntax: session, err = tcpsock:sslhandshake(reused_session?, server_name?, 
                        ssl_verify?, send_status_req?)

发起 ssl/tls 握手

tcpsock:send

syntax: bytes, err = tcpsock:send(data)

同步非阻塞发送数据,返回发送的字节数,失败返回 nil。data可以是字符串或者是包含多个字符串的表格,如果表格里有多个字符串待发送,直接放在表格里发送可以省去在 lua 里连接字符串的开销。

发送超时可以由 lua_socket_send_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

 sock:settimeout(1000)  -- one second timeout
 local bytes, err = sock:send(request)

发生错误会自动关闭连接。

tcpsock:receive

syntax: data, err, partial = tcpsock:receive(size)

同步非阻塞接收数据,成功返回接收到的数据,失败返回 nil。

如果 size 是数字,那么读取到 size 个字节后才返回,或者报错返回。

如果 size 为以下值:

*a:读取数据直到连接关闭。
*l:读取一行,换行符不会返回

如果没指定,默认为读取一行。

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

sock:settimeout(1000)  -- one second timeout
 local line, err, partial = sock:receive()
 if not line then
     ngx.say("failed to read a line: ", err)
     return
 end
 ngx.say("successfully read a line: ", line)

超时错误不会关闭连接,其他错误会自动关闭连接。

tcpsock:receiveany

syntax: data, err = tcpsock:receiveany(max)

同步非阻塞接收数据,最大为 max 字节,成功返回接收到的数据,失败返回 nil。

如果接收的数据超过了 max,那么值返回 max 字节,其他的数据在下次读取时返回。

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。 

sock:settimeouts(1000, 1000, 1000)  -- 1秒超时,同时设置3个 connect/read/write
 local data, err = sock:receiveany(10 * 1024 * 1024) -- read any data, at most 10K
 if not data then
     ngx.say("failed to read any data:",err)
     return
 end
 ngx.say("successful:", data)

超时错误不会关闭连接,其他错误会自动关闭连接。

 tcpsock:receiveuntil

syntax: iterator = tcpsock:receiveuntil(pattern, options?)

返回迭代器函数,可以持续调用来读取数据。

 local reader = sock:receiveuntil("\r\n--abcedhb")
 local data, err, partial = reader()
 if not data then
     ngx.say("failed to read the data stream: ", err)
 end
 ngx.say("read the data stream: ", data)

如果接收到的数据为 'hello, world! -agentzh\r\n--abcedhb blah blah',那么将返回 'hello, world! -agentzh'。

迭代器函数可以指定 size ,暗示读取多少个字节。

 local reader = sock:receiveuntil("\r\n--abcedhb")

 while true do
     local data, err, partial = reader(4)
     if not data then
         if err then
             ngx.say("failed to read the data stream: ", err)
             break
         end

         ngx.say("read done")
         break
     end
     ngx.say("read chunk: [", data, "]")
 end
read chunk: [hell]
read chunk: [o, w]
read chunk: [orld]
read chunk: [! -a]
read chunk: [gent]
read chunk: [zh]
read done

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。  

 local readline = sock:receiveuntil("\r\n")

 sock:settimeout(1000)  -- one second timeout
 line, err, partial = readline()
 if not line then
     ngx.say("failed to read a line: ", err)
     return
 end
 ngx.say("successfully read a line: ", line)

如果在 option 参数里指定,inclusive = true,那么里面的分割字符串也会返回。

 local reader = tcpsock:receiveuntil("_END_", { inclusive = true })
 local data = reader()
 ngx.say(data)

比如发送 'hello world _END_ blah blah blah',那么将受到 'hello world _END_'。

超时错误不会关闭连接,其他错误会自动关闭连接。

tcpsock:close

syntax: ok, err = tcpsock:close()

主动关闭连接,成功返回 1,失败返回 nil。

如果调用了 setkeepalive,则无需调用 close。

tcpsock:settimeout

syntax: tcpsock:settimeout(time)

设置超时时间,单位毫秒,优先级高于全局设置 lua_socket_connect_timeout、lua_socket_send_timeout、and lua_socket_read_timeout。

这个函数不会影响 lua_socket_keepalive_timeout 设置,应该由 setkeepalive(timeout) 来设置。

tcpsock:settimeouts

syntax: tcpsock:settimeouts(connect_timeout, send_timeout, read_timeout)

同时设置连接、发送、接收的超时时间,单位毫秒。

tcpsock:setoption

syntax: tcpsock:setoption(option, value?)

设置连接参数,目前暂时没有实现。

tcpsock:setkeepalive

syntax: ok, err = tcpsock:setkeepalive(timeout?, size?)

将连接存入连接池,成功返回 1,失败返回 nil。调用该函数了,就没必要再调用 close()。

注意:这个设置是针对单个 worker 进程的。

timeout  指定在连接池里多久没被使用则关闭的超时时间,单位毫秒,默认为 lua_socket_keepalive_timeout 设置的时间。

size         连接池可以存放多少个相同key的连接。tcp下key为host-port。unix domain下key为文件路径。超过了该数量,最旧的连接将会被关闭以便腾出空间。默认为 lua_socket_pool_size 设置的大小。

 如果当前连接缓冲区还有未读取完的数据,那么将会报错,err为 'connection in dubious state'。

tcpsock:getreusedtimes

syntax: count, err = tcpsock:getreusedtimes()

返回当前连接被重用的次数,如果是新连接或者是更本没有存入到连接池,总是返回0。如果该连接是从连接池里拿出来的,肯定返回大于0的数值。所以可以用来判断当前连接是否来自连接池。失败返回 nil。

ngx.socket.connect

syntax: tcpsock, err = ngx.socket.connect(host, port)
syntax: tcpsock, err = ngx.socket.connect("unix:/path/to/unix-domain.socket")

将 ngx.socket.tcp() 和 connect() 组合成一个函数完成,类似于下面代码的组合

 local sock = ngx.socket.tcp()
 local ok, err = sock:connect(...)
 if not ok then
     return nil, err
 end
 return sock

调用此函数就不能通过 settimeout 函数来设置连接超时时间了,必须使用 lua_socket_connect_timeout 指令。

例子

location /tcp {
    content_by_lua_block {
        local sock = ngx.socket.tcp()
        local ok, err =	sock:connect("127.0.0.1", 8888)
        if not ok then
            ngx.say("failed to connect to 8888:", err)
            return
        end
        ngx.say("successfully connected")
		
        local bytes, err = sock:send("hello\n")
		
        while true do
            local line,	err, partial = sock:receive()
            if line then
                ngx.say("received:", line)
                ngx.flush()  #为了实时看到输出
            else
                ngx.say("err:",	err)
                ngx.say("line:", line)
                break;
            end
			
        end
		
        sock:close()
    }
}

接下来利用linux命令 nc 来建立 tcp 服务器,监听 8888 端口。

[root@192 ~]# nc -l 8888
hello
aaa
bbb
ccc
ddd
eee
^C

然后开始请求,此时会阻塞,上面会受到 hello,然后在上面nc命令里可以发送数据。

[root@192 ~]# curl localhost/tcp
successfully connected
received:aaa
received:bbb
received:ccc
received:ddd
received:eee
err:closed
line:nil


ngx.socket.udp

syntax: udpsock = ngx.socket.udp()

创建 udp 或者 基于数据报的 unix domain cosocket 对象。

udpsock:setpeername

syntax: ok, err = udpsock:setpeername(host, port)

syntax: ok, err = udpsock:setpeername("unix:/path/to/unix-domain.socket")

成功返回1,失败返回 nil。 因为 udp 是无连接,这个只是用来标记,以便接下的读取发送。 

如果使用域名,那么需要配置 resolver,如果返回多个ip,那么会随机选择一个。  

resolver 8.8.8.8;  # use Google's public DNS nameserver
location /test {
     resolver 8.8.8.8;

     content_by_lua_block {
         local sock = ngx.socket.udp()
         local ok, err = sock:setpeername("my.memcached.server.domain", 11211)
         if not ok then
             ngx.say("failed to connect to memcached: ", err)
             return
         end
         ngx.say("successfully connected to memcached!")
         sock:close()
     }
 }

unix domain

 local sock = ngx.socket.udp()
 local ok, err = sock:setpeername("unix:/tmp/some-datagram-service.sock")
 if not ok then
     ngx.say("failed to connect to the datagram unix domain socket: ", err)
     return
 end

注意,多次调用将会覆盖前面的。

udpsock:send

syntax: ok, err = udpsock:send(data)

发送数据,成功返回1,失败返回 nil。data可以是字符串或者是包含多个字符串的表格,如果表格里有多个字符串待发送,直接放在表格里发送可以省去在 lua 里连接字符串的开销。

udpsock:receive

syntax: data, err = udpsock:receive(size?)

接收数据,成功返回数据,失败返回 nil。

如果指定 size,那么 接收缓冲区大小就为 size,如果超过 8192 则会使用 8192。默认就为8192。

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。 

 sock:settimeout(1000)  -- one second timeout
 local data, err = sock:receive()
 if not data then
     ngx.say("failed to read a packet: ", err)
     return
 end
 ngx.say("successfully read a packet: ", data)

udpsock:close

syntax: ok, err = udpsock:close()

主动关闭,成功返回1,失败返回nil。

udpsock:settimeout

syntax: udpsock:settimeout(time)

设置读取超时时间,单位毫秒,优先级高于 lua_socket_read_timeout。


备注

1.测试环境centos7 64位,openresty 版本为 1.13.6.2。
2..原文地址http://www.freecls.com/a/2712/f5


©著作权归作者所有
收藏
推荐阅读
简介
天降大任于斯人也,必先苦其心志。