Skip to content

Commit 146f210

Browse files
authored
Add decoding to Ch.stream/4 (#277)
* add decoding to Ch.stream/4 * more info in changelog * note CSVWithNames and other formats are not affected * fix formatting * cleanup * cleanup tests * assume 0.5.7 is released * update changelog * cleanup * shorter * shorter
1 parent e87a8f0 commit 146f210

File tree

8 files changed

+978
-232
lines changed

8 files changed

+978
-232
lines changed

CHANGELOG.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,49 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
- added **automatic decoding** to `Ch.stream/4` when using the `RowBinaryWithNamesAndTypes` format: https://github.com/plausible/ch/pull/277.
6+
7+
Previously, this function returned raw bytes.
8+
9+
To **restore the previous behavior** (raw bytes/no automatic decoding), pass `decode: false` in the options (**fourth** argument).
10+
11+
**Example of required change to preserve the previous behavior**
12+
13+
```elixir
14+
# before, no decoding by default
15+
DBConnection.run(pool, fn conn ->
16+
conn
17+
|> Ch.stream("select number from numbers(10)")
18+
|> Enum.into([])
19+
end)
20+
21+
# after: to keep the same behavior, add the `decode: false` option
22+
DBConnection.run(pool, fn conn ->
23+
conn
24+
|> Ch.stream("select number from numbers(10)", _params = %{}, decode: false)
25+
|> Enum.into([])
26+
end)
27+
```
28+
29+
Queries using other explicit formats like `CSVWithNames` are **unaffected** and can remain as they are.
30+
31+
**Examples of unaffected queries**
32+
33+
```elixir
34+
DBConnection.run(pool, fn conn ->
35+
conn
36+
|> Ch.stream("select number from numbers(10) format CSVWithNames")
37+
|> Enum.into([])
38+
end)
39+
40+
DBConnection.run(pool, fn conn ->
41+
conn
42+
|> Ch.stream("select number from numbers(10)", _params = %{}, format: "CSVWithNames")
43+
|> Enum.into([])
44+
end)
45+
```
46+
347
## 0.5.7 (2025-11-26)
448

549
- fix type decoding for strings containing newlines https://github.com/plausible/ch/pull/278

bench/stream.exs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,24 @@ Benchee.run(
3232
fn conn ->
3333
conn
3434
|> Ch.stream(
35-
"SELECT number FROM system.numbers_mt LIMIT {limit:UInt64} FORMAT RowBinary",
36-
%{"limit" => limit}
35+
"SELECT number FROM system.numbers_mt LIMIT {limit:UInt64}",
36+
%{"limit" => limit},
37+
decode: false
3738
)
3839
|> Stream.run()
3940
end,
4041
timeout: :infinity
4142
)
4243
end,
43-
"Ch.stream with manual RowBinary decoding" => fn %{pool: pool, limit: limit} ->
44+
"Ch.stream with decoding" => fn %{pool: pool, limit: limit} ->
4445
DBConnection.run(
4546
pool,
4647
fn conn ->
4748
conn
4849
|> Ch.stream(
49-
"SELECT number FROM system.numbers_mt LIMIT {limit:UInt64} FORMAT RowBinary",
50+
"SELECT number FROM system.numbers_mt LIMIT {limit:UInt64}",
5051
%{"limit" => limit}
5152
)
52-
|> Stream.each(fn %Ch.Result{data: data} ->
53-
data |> IO.iodata_to_binary() |> Ch.RowBinary.decode_rows([:u64])
54-
end)
5553
|> Stream.run()
5654
end,
5755
timeout: :infinity

lib/ch/connection.ex

Lines changed: 101 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule Ch.Connection do
22
@moduledoc false
33
use DBConnection
44
require Logger
5-
alias Ch.{Error, Query, Result}
5+
alias Ch.{Error, Query, Result, RowBinary}
66
alias Mint.HTTP1, as: HTTP
77

88
@user_agent "ch/" <> Mix.Project.config()[:version]
@@ -104,31 +104,42 @@ defmodule Ch.Connection do
104104
@impl true
105105
def handle_declare(query, params, opts, conn) do
106106
conn = maybe_reconnect(conn)
107-
%Query{command: command} = query
107+
%Query{command: command, decode: decode} = query
108108
{query_params, extra_headers, body} = params
109109

110110
path = path(conn, query_params, opts)
111111
headers = headers(conn, extra_headers, opts)
112+
timeout = timeout(conn, opts)
112113

113114
with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body),
114-
{:ok, conn} <- eat_ok_status_and_headers(conn, timeout(conn, opts)) do
115-
{:ok, query, %Result{command: command}, conn}
115+
{:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do
116+
result = %Result{
117+
command: command,
118+
columns: columns,
119+
rows: [],
120+
num_rows: 0,
121+
headers: headers,
122+
data: []
123+
}
124+
125+
{:ok, query, result, {conn, reader}}
116126
end
117127
end
118128

119-
@spec eat_ok_status_and_headers(conn, timeout) ::
120-
{:ok, %{conn: conn, buffer: [Mint.Types.response()]}}
121-
| {:error, Ch.Error.t(), conn}
122-
| {:disconnect, Mint.Types.error(), conn}
123-
defp eat_ok_status_and_headers(conn, timeout) do
129+
defp recv_declare(conn, decode, timeout) do
130+
acc = %{decode: decode, step: :status, buffer: [], headers: []}
131+
recv_declare_continue(conn, acc, timeout)
132+
end
133+
134+
defp recv_declare_continue(conn, acc, timeout) do
124135
case HTTP.recv(conn, 0, timeout) do
125136
{:ok, conn, responses} ->
126-
case eat_ok_status_and_headers(responses) do
127-
{:ok, data} ->
128-
{:ok, %{conn: conn, buffer: data}}
137+
case handle_recv_declare(responses, acc) do
138+
{:ok, columns, headers, reader} ->
139+
{:ok, conn, columns, headers, reader}
129140

130-
:more ->
131-
eat_ok_status_and_headers(conn, timeout)
141+
{:more, acc} ->
142+
recv_declare_continue(conn, acc, timeout)
132143

133144
:error ->
134145
all_responses_result =
@@ -155,49 +166,102 @@ defmodule Ch.Connection do
155166
end
156167
end
157168

158-
defp eat_ok_status_and_headers([{:status, _ref, 200} | rest]) do
159-
eat_ok_status_and_headers(rest)
169+
# Status step of the declare response: a 200 status moves the accumulator to
# the :headers step and continues with the remaining responses; any other
# status returns :error.
defp handle_recv_declare([{:status, _ref, status} | responses], %{step: :status} = acc) do
170+
case status do
171+
200 -> handle_recv_declare(responses, %{acc | step: :headers})
172+
_other -> :error
173+
end
174+
end
175+
176+
# Headers step of the declare response.
#
# When the accumulator has `decode: true` and the "x-clickhouse-format" header
# is "RowBinaryWithNamesAndTypes", the headers are stored in the accumulator
# and processing continues in the :columns step.
#
# In every other case this returns immediately with `nil` columns, the
# response headers, and a reader marked `decode: false` that holds the
# responses that have not been consumed yet.
defp handle_recv_declare([{:headers, _ref, headers} | responses], %{step: :headers} = acc) do
177+
with %{decode: true} <- acc,
178+
"RowBinaryWithNamesAndTypes" <- get_header(headers, "x-clickhouse-format") do
179+
handle_recv_declare(responses, %{acc | headers: headers, step: :columns})
180+
else
181+
_ ->
182+
reader = %{decode: false, responses: responses}
183+
{:ok, _columns = nil, headers, reader}
184+
end
160185
end
161186

162-
defp eat_ok_status_and_headers([{:status, _ref, _status} | _rest]), do: :error
163-
defp eat_ok_status_and_headers([{:headers, _ref, _headers} | data]), do: {:ok, data}
164-
defp eat_ok_status_and_headers([]), do: :more
187+
# Columns step of the declare response: joins the incoming data chunk with the
# bytes buffered so far and passes the result to RowBinary.decode_header/1.
#
# On `{:ok, names, types, buffer}` it returns the names as the columns, the
# headers stored in the accumulator, and a reader carrying the leftover
# buffer, the types, a `nil` state, and the responses not consumed yet.
#
# On `:more` it keeps the joined buffer in the accumulator and continues with
# the remaining responses.
#
# NOTE(review): none of the clauses visible here match a `{:done, _ref}`
# response while in the :columns step; confirm that this cannot occur.
defp handle_recv_declare([{:data, _ref, data} | responses], %{step: :columns} = acc) do
188+
buffer = maybe_concat_buffer(acc.buffer, data)
189+
190+
case RowBinary.decode_header(buffer) do
191+
{:ok, names, types, buffer} ->
192+
reader = %{buffer: buffer, types: types, state: nil, responses: responses}
193+
{:ok, names, acc.headers, reader}
194+
195+
:more ->
196+
handle_recv_declare(responses, %{acc | buffer: buffer})
197+
end
198+
end
199+
200+
# No responses left to process: return the accumulator tagged with :more so
# the caller can receive further responses and resume from the same step.
defp handle_recv_declare([], acc), do: {:more, acc}
201+
202+
# Joins the bytes buffered so far with a newly received chunk.
# An empty buffer (either "" or []) returns the chunk unchanged; any other
# binary buffer is concatenated with the chunk using <>.
@compile inline: [maybe_concat_buffer: 2]
203+
defp maybe_concat_buffer("", data), do: data
204+
defp maybe_concat_buffer(buffer, data) when is_binary(buffer), do: buffer <> data
205+
defp maybe_concat_buffer([], data), do: data
165206

166207
@impl true
167-
def handle_fetch(query, %Result{} = result, opts, %{conn: conn, buffer: buffer}) do
168-
case buffer do
169-
[] -> handle_fetch(query, result, opts, conn)
170-
_not_empty -> {halt_or_cont(buffer), %{result | data: extract_data(buffer)}, conn}
208+
# Produces the next element of the stream from the `{conn, reader}` state.
#
# The reader holds the responses that were received but not consumed yet:
#
#   * no pending responses - delegates to handle_fetch_recv/5
#   * `decode: false` reader - the next data chunk is placed in `result.data`
#     unchanged and returned with :cont
#   * decoding reader - the next data chunk is joined with the buffered bytes
#     and passed to RowBinary.decode_rows_continue/3; the returned rows and
#     their count are placed in the result together with the raw chunk
#
# In both reader modes a pending list of exactly `[{:done, _ref}]` clears the
# pending responses and returns :halt with the result unchanged.
def handle_fetch(query, %Result{} = result, opts, {conn, reader}) do
209+
case reader do
210+
%{responses: []} ->
211+
handle_fetch_recv(query, result, opts, conn, reader)
212+
213+
%{decode: false, responses: responses} ->
214+
case responses do
215+
[{:data, _ref, data} | responses] ->
216+
result = %Result{result | data: data}
217+
reader = %{reader | responses: responses}
218+
{:cont, result, {conn, reader}}
219+
220+
[{:done, _ref}] ->
221+
reader = %{reader | responses: []}
222+
{:halt, result, {conn, reader}}
223+
end
224+
225+
%{buffer: buffer, types: types, state: state, responses: responses} ->
226+
case responses do
227+
[{:data, _ref, data} | responses] ->
228+
buffer = maybe_concat_buffer(buffer, data)
229+
# the buffer and state returned here are stored in the reader for the next fetch
{rows, buffer, state} = RowBinary.decode_rows_continue(buffer, types, state)
230+
result = %Result{result | data: data, rows: rows, num_rows: length(rows)}
231+
reader = %{reader | buffer: buffer, state: state, responses: responses}
232+
{:cont, result, {conn, reader}}
233+
234+
[{:done, _ref}] ->
235+
reader = %{reader | responses: []}
236+
{:halt, result, {conn, reader}}
237+
end
171238
end
172239
end
173240

174-
def handle_fetch(_query, %Result{} = result, opts, conn) do
175-
case HTTP.recv(conn, 0, timeout(conn, opts)) do
241+
defp handle_fetch_recv(query, result, opts, conn, reader) do
242+
timeout = timeout(conn, opts)
243+
244+
case HTTP.recv(conn, 0, timeout) do
176245
{:ok, conn, responses} ->
177-
{halt_or_cont(responses), %{result | data: extract_data(responses)}, conn}
246+
reader = %{reader | responses: responses}
247+
handle_fetch(query, result, opts, {conn, reader})
178248

179249
{:error, conn, reason, _responses} ->
180250
{:disconnect, reason, conn}
181251
end
182252
end
183253

184-
defp halt_or_cont([{:done, _ref}]), do: :halt
185-
defp halt_or_cont([_ | rest]), do: halt_or_cont(rest)
186-
defp halt_or_cont([]), do: :cont
187-
188-
defp extract_data([{:data, _ref, data} | rest]), do: [data | extract_data(rest)]
189-
defp extract_data([] = empty), do: empty
190-
defp extract_data([{:done, _ref}]), do: []
191-
192254
@impl true
193-
def handle_deallocate(_query, %Result{} = result, _opts, conn) do
255+
def handle_deallocate(_query, %Result{} = result, _opts, {conn, _reader}) do
194256
case HTTP.open_request_count(conn) do
195257
0 ->
196-
# TODO data: [], anything else?
197258
{:ok, %{result | data: []}, conn}
198259

199260
1 ->
200-
{:disconnect, Error.exception("cannot stop stream before receiving full response"), conn}
261+
error =
262+
Error.exception("stopping stream before receiving full response by closing connection")
263+
264+
{:disconnect, error, conn}
201265
end
202266
end
203267

0 commit comments

Comments
 (0)