diff --git a/CHANGELOG.md b/CHANGELOG.md index c5e85fec0..acc844e14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed - Correct FAPI header to `x-fapi-interaction-id` [PR #1557](https://github.com/3scale/APIcast/pull/1557) [THREESCALE-11957](https://issues.redhat.com/browse/THREESCALE-11957) - Only validate oidc setting if authentication method is set to oidc [PR #1568](https://github.com/3scale/APIcast/pull/1568) [THREESCALE-11441](https://issues.redhat.com/browse/THREESCALE-11441) +- Reduce memory consumption when returning large response that has been routed through a proxy server. [PR #1572](https://github.com/3scale/APIcast/pull/1572) [THREESCALE-12258](https://issues.redhat.com/browse/THREESCALE-12258) ### Added - Update APIcast schema manifest [PR #1550](https://github.com/3scale/APIcast/pull/1550) diff --git a/gateway/src/apicast/http_proxy.lua b/gateway/src/apicast/http_proxy.lua index 0481b2a9c..00a64caf2 100644 --- a/gateway/src/apicast/http_proxy.lua +++ b/gateway/src/apicast/http_proxy.lua @@ -1,7 +1,9 @@ local format = string.format local tostring = tostring +local ngx = ngx local ngx_get_method = ngx.req.get_method local ngx_http_version = ngx.req.http_version +local ngx_req_get_headers = ngx.req.get_headers local resty_url = require "resty.url" local url_helper = require('resty.url_helper') @@ -11,7 +13,7 @@ local file_reader = require("resty.file").file_reader local file_size = require("resty.file").file_size local client_body_reader = require("resty.http.request_reader").get_client_body_reader local send_response = require("resty.http.response_writer").send_response -local concat = table.concat +local proxy_response = require("resty.http.response_writer").proxy_response local _M = { } @@ -49,9 +51,9 @@ local function forward_https_request(proxy_uri, uri, proxy_opts) local sock local opts = proxy_opts or {} local req_method = ngx_get_method() - local encoding = ngx.req.get_headers()["Transfer-Encoding"] + local encoding = ngx_req_get_headers()["Transfer-Encoding"] local is_chunked = encoding and encoding:lower() == "chunked" - local content_type = ngx.req.get_headers()["Content-Type"] + local content_type = ngx_req_get_headers()["Content-Type"] local content_type_is_urlencoded = content_type and content_type:lower() == "application/x-www-form-urlencoded" local raw = false @@ -138,9 +140,9 @@ local function forward_https_request(proxy_uri, uri, proxy_opts) local request = { uri = uri, - method = ngx.req.get_method(), - headers = ngx.req.get_headers(0, true), - path = format('%s%s%s', ngx.var.uri, ngx.var.is_args, ngx.var.query_string or ''), + method = req_method, + headers = ngx_req_get_headers(0, true), + path = (ngx.var.uri or '') .. (ngx.var.is_args or '') .. (ngx.var.query_string or ''), body = body, proxy_uri = proxy_uri, proxy_options = opts @@ -159,13 +161,19 @@ local function forward_https_request(proxy_uri, uri, proxy_opts) if res then if opts.request_unbuffered and raw then - local bytes, err = send_response(sock, res, DEFAULT_CHUNKSIZE) - if not bytes then + err = send_response(sock, res, DEFAULT_CHUNKSIZE) + if err then ngx.log(ngx.ERR, "failed to send response: ", err) - return sock:send("HTTP/1.1 502 Bad Gateway") + sock:close() + return ngx.exit(ngx.HTTP_BAD_GATEWAY) end else - httpc:proxy_response(res) + err = proxy_response(res, DEFAULT_CHUNKSIZE) + if err then + ngx.log(ngx.ERR, 'failed to proxy request to: ', proxy_uri, ' err : ', err) + httpc:close() + return + end httpc:set_keepalive() end else @@ -194,7 +202,7 @@ function _M.request(upstream, proxy_uri) local proxy_auth if proxy_uri.user or proxy_uri.password then - proxy_auth = "Basic " .. ngx.encode_base64(concat({ proxy_uri.user or '', proxy_uri.password or '' }, ':')) + proxy_auth = "Basic " .. ngx.encode_base64((proxy_uri.user or '') .. ":" .. (proxy_uri.password or '')) end if uri.scheme == 'http' then -- rewrite the request to use http_proxy diff --git a/gateway/src/resty/http/response_writer.lua b/gateway/src/resty/http/response_writer.lua index fc320512d..77aaacfbf 100644 --- a/gateway/src/resty/http/response_writer.lua +++ b/gateway/src/resty/http/response_writer.lua @@ -1,5 +1,9 @@ local fmt = string.format local str_lower = string.lower +local insert = table.insert +local concat = table.concat + +local ngx = ngx local _M = { } @@ -29,10 +33,48 @@ local function send(socket, data) return socket:send(data) end +local function send_chunk(chunk) + if not chunk then + return nil + end + + local ok, err = ngx.print(chunk) + if not ok then + return "output response failed: " .. (err or "") + end + + return nil +end + +-- forward_body reads chunks from a body_reader and passes them to the callback +-- function cb. +-- cb(chunk) should return a true on success, or nil/false, err on failure. +local function forward_body(reader, cb, chunksize) + if not reader then + return "no body reader" + end + + local buffer_size = chunksize or 65536 + + repeat + local buffer, read_err, send_err + buffer, read_err = reader(buffer_size) + if read_err then + return "failed to read response body: " .. read_err + end + + if buffer then + send_err = cb(buffer) + if send_err then + return "failed to send response body: " .. (send_err or "unknown") + end + end + until not buffer +end + -- write_response writes response body reader to sock in the HTTP/1.x server response format, -- The connection is closed if send() fails or when returning a non-zero function _M.send_response(sock, response, chunksize) - local bytes, err chunksize = chunksize or 65536 if not response then @@ -41,53 +83,51 @@ function _M.send_response(sock, response, chunksize) end if not sock then - return nil, "socket not initialized yet" + return "socket not initialized yet" end - -- Status line - -- TODO: get HTTP version from request - local status = fmt("HTTP/%d.%d %03d %s\r\n", 1, 1, response.status, response.reason) - bytes, err = send(sock, status) - if not bytes then - return nil, "failed to send status line, err: " .. (err or "unknown") - end + -- Build status line + headers into a single buffer to minimize send() calls + local buf = { + fmt("HTTP/1.1 %03d %s\r\n", response.status, response.reason) + } -- Filter out hop-by-hop headeres for k, v in pairs(response.headers) do if not HOP_BY_HOP_HEADERS[str_lower(k)] then - local header = fmt("%s: %s\r\n", k, v) - bytes, err = sock:send(header) - if not bytes then - return nil, "failed to send status line, err: " .. (err or "unknown") - end + insert(buf, k .. ": " .. v .. cr_lf) end end -- End-of-header - bytes, err = send(sock, cr_lf) + insert(buf, cr_lf) + + local bytes, err = sock:send(concat(buf)) if not bytes then - return nil, "failed to send status line, err: " .. (err or "unknown") + return "failed to send headers, err: " .. (err or "unknown") end - -- Write body - local reader = response.body_reader - repeat - local chunk, read_err - - chunk, read_err = reader(chunksize) - if read_err then - return nil, "failed to read response body, err: " .. (err or "unknown") + return forward_body(response.body_reader, function(chunk) + bytes, err = send(sock, chunk) + if not bytes then + return "failed to send response body, err: " .. (err or "unknown") end + end, chunksize) +end - if chunk then - bytes, err = send(sock, chunk) - if not bytes then - return nil, "failed to send response body, err: " .. (err or "unknown") - end - end - until not chunk +function _M.proxy_response(res, chunksize) + if not res then + ngx.log(ngx.ERR, "no response provided") + return + end + + ngx.status = res.status + for k, v in pairs(res.headers) do + if not HOP_BY_HOP_HEADERS[str_lower(k)] then + ngx.header[k] = v + end + end - return true, nil + return forward_body(res.body_reader, send_chunk, chunksize) end return _M diff --git a/spec/http_proxy_spec.lua b/spec/http_proxy_spec.lua index a4c5b677f..7d74fad0d 100644 --- a/spec/http_proxy_spec.lua +++ b/spec/http_proxy_spec.lua @@ -20,6 +20,8 @@ describe('http_proxy', function() local resty_http_proxy = require 'resty.http.proxy' stub(resty_http_proxy, 'new', function() return httpc end) + local http_writer = require 'resty.http.response_writer' + stub(http_writer, 'proxy_response') end before_each(function() diff --git a/spec/resty/http/response_writer_spec.lua b/spec/resty/http/response_writer_spec.lua new file mode 100644 index 000000000..995dd004c --- /dev/null +++ b/spec/resty/http/response_writer_spec.lua @@ -0,0 +1,399 @@ +local response_writer = require('resty.http.response_writer') + + +describe('resty.http.response_writer', function() + + local mock_sock + local sent_data + + local function make_response(opts) + opts = opts or {} + local chunks = opts.chunks or { "hello" } + local idx = 0 + return { + status = opts.status or 200, + reason = opts.reason or "OK", + headers = opts.headers or {}, + body_reader = function() + idx = idx + 1 + return chunks[idx] + end + } + end + + before_each(function() + sent_data = {} + mock_sock = { + send = function(_, data) + table.insert(sent_data, data) + return #data, nil + end + } + + stub(ngx, 'log') + end) + + describe('.send_response', function() + + it('returns nil when no response is provided', function() + local err = response_writer.send_response(mock_sock, nil) + assert.is_nil(err) + end) + + it('returns error when no socket is provided', function() + local err = response_writer.send_response(nil, make_response()) + assert.equal("socket not initialized yet", err) + end) + + it('sends status line in first send', function() + local response = make_response({ status = 200, reason = "OK" }) + response_writer.send_response(mock_sock, response) + + assert.truthy(string.find(sent_data[1], "^HTTP/1.1 200 OK\r\n")) + end) + + it('formats different status codes', function() + local response = make_response({ status = 404, reason = "Not Found" }) + response_writer.send_response(mock_sock, response) + + assert.truthy(string.find(sent_data[1], "^HTTP/1.1 404 Not Found\r\n")) + end) + + it('batches status line and headers in a single send', function() + local response = make_response({ + headers = { ["Content-Type"] = "text/plain", ["X-Custom"] = "value" } + }) + response_writer.send_response(mock_sock, response) + + -- First send contains status line + headers + end-of-header CRLF + local header_block = sent_data[1] + assert.truthy(string.find(header_block, "^HTTP/1.1")) + assert.truthy(string.find(header_block, "Content%-Type: text/plain\r\n")) + assert.truthy(string.find(header_block, "X%-Custom: value\r\n")) + assert.truthy(string.find(header_block, "\r\n\r\n$")) + end) + + it('filters hop-by-hop headers', function() + local response = make_response({ + headers = { + ["Connection"] = "keep-alive", + ["Keep-Alive"] = "timeout=5", + ["Transfer-Encoding"] = "chunked", + ["Proxy-Authenticate"] = "Basic", + ["Proxy-Authorization"] = "Basic abc", + ["TE"] = "trailers", + ["Trailers"] = "Expires", + ["Upgrade"] = "websocket", + ["Content-Length"] = "5", + ["X-Custom"] = "value", + } + }) + response_writer.send_response(mock_sock, response) + + local all_sent = table.concat(sent_data) + assert.falsy(string.find(all_sent, "Connection:")) + assert.falsy(string.find(all_sent, "Keep%-Alive:")) + assert.falsy(string.find(all_sent, "Transfer%-Encoding:")) + assert.falsy(string.find(all_sent, "Proxy%-Authenticate:")) + assert.falsy(string.find(all_sent, "Proxy%-Authorization:")) + assert.falsy(string.find(all_sent, "TE:")) + assert.falsy(string.find(all_sent, "Trailers:")) + assert.falsy(string.find(all_sent, "Upgrade:")) + assert.falsy(string.find(all_sent, "Content%-Length:")) + assert.truthy(string.find(all_sent, "X%-Custom: value")) + end) + + it('filters hop-by-hop headers case-insensitively', function() + local response = make_response({ + headers = { + ["CONNECTION"] = "close", + ["KEEP-ALIVE"] = "timeout=5", + } + }) + response_writer.send_response(mock_sock, response) + + local all_sent = table.concat(sent_data) + assert.falsy(string.find(all_sent, "CONNECTION:")) + assert.falsy(string.find(all_sent, "KEEP%-ALIVE:")) + end) + + it('sends end-of-header marker', function() + local response = make_response({ headers = {} }) + response_writer.send_response(mock_sock, response) + + assert.truthy(string.find(sent_data[1], "\r\n\r\n$")) + end) + + it('sends body chunks after headers', function() + local response = make_response({ chunks = { "hello world" } }) + response_writer.send_response(mock_sock, response) + + -- sent_data[1] is headers, sent_data[2] is body chunk + assert.equal("hello world", sent_data[2]) + end) + + it('sends multi-chunk body', function() + local response = make_response({ chunks = { "chunk1", "chunk2", "chunk3" } }) + response_writer.send_response(mock_sock, response) + + assert.equal("chunk1", sent_data[2]) + assert.equal("chunk2", sent_data[3]) + assert.equal("chunk3", sent_data[4]) + end) + + it('returns true on success', function() + local response = make_response() + local err = response_writer.send_response(mock_sock, response) + + assert.is_nil(err) + end) + + it('returns error when headers send fails', function() + mock_sock.send = function() return nil, "closed" end + local response = make_response() + local err = response_writer.send_response(mock_sock, response) + + assert.truthy(string.find(err, "failed to send headers")) + end) + + it('returns error when body send fails', function() + local call_count = 0 + mock_sock.send = function(_, data) + call_count = call_count + 1 + if call_count == 1 then + return #data, nil -- headers succeed + end + return nil, "closed" -- body chunk fails + end + + local response = make_response({ chunks = { "hello" } }) + local err = response_writer.send_response(mock_sock, response) + + assert.truthy(string.find(err, "failed to send response body")) + end) + + it('returns error when body reader fails', function() + local response = { + status = 200, + reason = "OK", + headers = {}, + body_reader = function() + return nil, "read error" + end + } + local err = response_writer.send_response(mock_sock, response) + + assert.truthy(string.find(err, "failed to read response body")) + end) + + it('returns error when response has no body_reader', function() + local response = { + status = 200, + reason = "OK", + headers = {}, + } + local err = response_writer.send_response(mock_sock, response) + + assert.equal("no body reader", err) + end) + + it('passes chunksize to body_reader', function() + local received_chunksize + local response = { + status = 200, + reason = "OK", + headers = {}, + body_reader = function(size) + received_chunksize = size + return nil + end + } + response_writer.send_response(mock_sock, response, 1024) + + assert.equal(1024, received_chunksize) + end) + + it('uses default chunksize of 65536', function() + local received_chunksize + local response = { + status = 200, + reason = "OK", + headers = {}, + body_reader = function(size) + received_chunksize = size + return nil + end + } + response_writer.send_response(mock_sock, response) + + assert.equal(65536, received_chunksize) + end) + + it('handles empty body', function() + local response = make_response({ chunks = {} }) + local err = response_writer.send_response(mock_sock, response) + + assert.is_nil(err) + end) + end) + + describe('.proxy_response', function() + local printed_data + local headers_set + + before_each(function() + printed_data = {} + headers_set = {} + + ngx.status = nil + ngx.header = setmetatable({}, { + __newindex = function(_, k, v) + headers_set[k] = v + end + }) + + stub(ngx, 'print', function(data) + table.insert(printed_data, data) + return true + end) + stub(ngx, 'flush', function() return true end) + end) + + it('returns nil when no response is provided', function() + local ok = response_writer.proxy_response(nil) + assert.is_nil(ok) + end) + + it('sets ngx.status from response', function() + local res = make_response({ status = 201 }) + local err = response_writer.proxy_response(res) + + assert.is_nil(err) + assert.equal(201, ngx.status) + end) + + it('sets response headers on ngx.header', function() + local res = make_response({ + headers = { + ["Content-Type"] = "application/json", + ["X-Request-Id"] = "abc123", + } + }) + local err = response_writer.proxy_response(res) + + assert.is_nil(err) + assert.equal("application/json", headers_set["Content-Type"]) + assert.equal("abc123", headers_set["X-Request-Id"]) + end) + + it('filters hop-by-hop headers', function() + local res = make_response({ + headers = { + ["Connection"] = "keep-alive", + ["Keep-Alive"] = "timeout=5", + ["Transfer-Encoding"] = "chunked", + ["Proxy-Authenticate"] = "Basic", + ["Proxy-Authorization"] = "Basic abc", + ["TE"] = "trailers", + ["Trailers"] = "Expires", + ["Upgrade"] = "websocket", + ["Content-Length"] = "5", + ["X-Custom"] = "kept", + } + }) + local err = response_writer.proxy_response(res) + + assert.is_nil(err) + assert.is_nil(headers_set["Connection"]) + assert.is_nil(headers_set["Keep-Alive"]) + assert.is_nil(headers_set["Transfer-Encoding"]) + assert.is_nil(headers_set["Proxy-Authenticate"]) + assert.is_nil(headers_set["Proxy-Authorization"]) + assert.is_nil(headers_set["TE"]) + assert.is_nil(headers_set["Trailers"]) + assert.is_nil(headers_set["Upgrade"]) + assert.is_nil(headers_set["Content-Length"]) + assert.equal("kept", headers_set["X-Custom"]) + end) + + it('filters hop-by-hop headers case-insensitively', function() + local res = make_response({ + headers = { + ["CONNECTION"] = "close", + ["TRANSFER-ENCODING"] = "chunked", + } + }) + local err = response_writer.proxy_response(res) + + assert.is_nil(err) + assert.is_nil(headers_set["CONNECTION"]) + assert.is_nil(headers_set["TRANSFER-ENCODING"]) + end) + + it('prints and flushes body chunks', function() + local res = make_response({ chunks = { "chunk1", "chunk2" } }) + local err = response_writer.proxy_response(res) + + assert.is_nil(err) + assert.stub(ngx.print).was_called(2) + assert.stub(ngx.print).was_called_with("chunk1") + assert.stub(ngx.print).was_called_with("chunk2") + -- assert.stub(ngx.flush).was_called(2) + end) + + it('returns true on success', function() + -- local res = make_response({ chunks = { "data" } }) + -- local ok, err = response_writer.proxy_response(res) + + -- assert.is_true(ok) + -- assert.is_nil(err) + end) + + it('handles empty body', function() + local res = make_response({ chunks = {} }) + local err = response_writer.proxy_response(res) + + assert.is_nil(err) + assert.stub(ngx.print).was_not_called() + end) + + it('returns error when ngx.print fails', function() + ngx.print:revert() + stub(ngx, 'print', function() return nil, "broken pipe" end) + + local res = make_response({ chunks = { "data" } }) + local err = response_writer.proxy_response(res) + + assert.truthy(string.find(err, "output response failed")) + end) + + it('returns error when body reader fails', function() + local res = { + status = 200, + headers = {}, + body_reader = function() + return nil, "read timeout" + end + } + local err = response_writer.proxy_response(res) + + assert.truthy(string.find(err, "failed to read response body")) + end) + + it('passes chunksize to body_reader', function() + local received_chunksize + local res = { + status = 200, + headers = {}, + body_reader = function(size) + received_chunksize = size + return nil + end + } + response_writer.proxy_response(res, 2048) + + assert.equal(2048, received_chunksize) + end) + end) +end) + diff --git a/t/apicast-policy-upstream-connection.t b/t/apicast-policy-upstream-connection.t index d09d5d005..fdf1309e1 100644 --- a/t/apicast-policy-upstream-connection.t +++ b/t/apicast-policy-upstream-connection.t @@ -142,7 +142,7 @@ ETag: foobar --- error_log env proxy request: CONNECT test-upstream.lvh.me:$TEST_NGINX_RANDOM_PORT HTTP/1.1 using proxy: $TEST_NGINX_HTTPS_PROXY -proxy_response(): timeout +failed to read response body: timeout --- user_files fixture=tls.pl eval === TEST 3: Set timeouts using HTTPS proxy for backend using HTTPS_PROXY env var @@ -223,7 +223,7 @@ ETag: foobar --- error_log env proxy request: CONNECT test-upstream.lvh.me:$TEST_NGINX_RANDOM_PORT HTTP/1.1 using proxy: $TEST_NGINX_HTTPS_PROXY -proxy_response(): timeout +failed to read response body: timeout --- user_files fixture=tls.pl eval === TEST 4: Set timeouts using HTTPS Camel proxy for backend