Skip to content

Commit b8ebfc5

Browse files
authored
fix test failures on 1.12, avoid race condition in multithreaded partitioned writes (#582)
There is a race condition fundamental to the current architecture for creating and writing dictionary encodings. The relevant lock is created on a worker thread, and thus there is a race to _create_ the lock and initialize the relevant data structure. This race condition has existed for a long time and consistently occurs when testing on 1.12, but I have occasionally been able to see it occur on Julia 1.10. Reworking this goes well beyond what I currently have time for, so I have simply disabled multithreaded writing as a stopgap. This may seem extreme, but: 1. This is a correctness bug, and correctness is far more important than speed. 2. The test failures that this race condition causes on 1.12 are blocking the release of 2.8.1, which includes #543 and addresses another source of potential correctness issues on Julia 1.12+.
1 parent bfa321f commit b8ebfc5

File tree

4 files changed

+65
-50
lines changed

4 files changed

+65
-50
lines changed

src/arraytypes/dictencoding.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ function arrowvector(
142142
kw...,
143143
)
144144
id = x.encoding.id
145+
# XXX This is a race condition: if two workers hit this block at the same time, they'll create
146+
# distinct locks
145147
if !haskey(de, id)
146148
de[id] = Lockable(x.encoding)
147149
else
@@ -215,6 +217,8 @@ function arrowvector(
215217
x = x.data
216218
len = length(x)
217219
validity = ValidityBitmap(x)
220+
# XXX This is a race condition: if two workers hit this block at the same time, they'll create
221+
# distinct locks
218222
if !haskey(de, id)
219223
# dict encoding doesn't exist yet, so create for 1st time
220224
if DataAPI.refarray(x) === x || DataAPI.refpool(x) === nothing

src/write.jl

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -295,47 +295,49 @@ function write(writer::Writer, source)
295295
recbatchmsg = makerecordbatchmsg(writer.schema[], cols, writer.alignment)
296296
put!(writer.msgs, recbatchmsg)
297297
else
298-
if writer.threaded
299-
@wkspawn process_partition(
300-
tblcols,
301-
writer.dictencodings,
302-
writer.largelists,
303-
writer.compress,
304-
writer.denseunions,
305-
writer.dictencode,
306-
writer.dictencodenested,
307-
writer.maxdepth,
308-
writer.sync,
309-
writer.msgs,
310-
writer.alignment,
311-
$(writer.partition_count),
312-
writer.schema,
313-
writer.errorref,
314-
writer.anyerror,
315-
writer.meta,
316-
writer.colmeta,
317-
)
318-
else
319-
@async process_partition(
320-
tblcols,
321-
writer.dictencodings,
322-
writer.largelists,
323-
writer.compress,
324-
writer.denseunions,
325-
writer.dictencode,
326-
writer.dictencodenested,
327-
writer.maxdepth,
328-
writer.sync,
329-
writer.msgs,
330-
writer.alignment,
331-
$(writer.partition_count),
332-
writer.schema,
333-
writer.errorref,
334-
writer.anyerror,
335-
writer.meta,
336-
writer.colmeta,
337-
)
338-
end
298+
# XXX There is a race condition in the processing of dict encodings
299+
# so we disable multithreaded writing until that can be addressed. See #582
300+
# if writer.threaded
301+
# @wkspawn process_partition(
302+
# tblcols,
303+
# writer.dictencodings,
304+
# writer.largelists,
305+
# writer.compress,
306+
# writer.denseunions,
307+
# writer.dictencode,
308+
# writer.dictencodenested,
309+
# writer.maxdepth,
310+
# writer.sync,
311+
# writer.msgs,
312+
# writer.alignment,
313+
# $(writer.partition_count),
314+
# writer.schema,
315+
# writer.errorref,
316+
# writer.anyerror,
317+
# writer.meta,
318+
# writer.colmeta,
319+
# )
320+
# else
321+
@async process_partition(
322+
tblcols,
323+
writer.dictencodings,
324+
writer.largelists,
325+
writer.compress,
326+
writer.denseunions,
327+
writer.dictencode,
328+
writer.dictencodenested,
329+
writer.maxdepth,
330+
writer.sync,
331+
writer.msgs,
332+
writer.alignment,
333+
$(writer.partition_count),
334+
writer.schema,
335+
writer.errorref,
336+
writer.anyerror,
337+
writer.meta,
338+
writer.colmeta,
339+
)
340+
# end
339341
end
340342
writer.partition_count += 1
341343
end

test/Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
3131
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
3232
TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
3333
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
34+
TestSetExtensions = "98d24dd4-01ad-11ea-1b02-c9a08f80db04"
3435
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
3536

3637
[compat]
@@ -44,4 +45,5 @@ PooledArrays = "1"
4445
StructTypes = "1"
4546
SentinelArrays = "1"
4647
Tables = "1"
48+
TestSetExtensions = "3"
4749
TimeZones = "1"

test/runtests.jl

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,16 @@ using DataAPI
2828
using FilePathsBase
2929
using DataFrames
3030
import Random: randstring
31+
using TestSetExtensions: ExtendedTestSet
3132

33+
# this formulation tests the loaded ArrowTypes, even if it's not the dev version
34+
# within the mono-repo
3235
include(joinpath(dirname(pathof(ArrowTypes)), "../test/tests.jl"))
33-
include(joinpath(dirname(pathof(Arrow)), "../test/testtables.jl"))
34-
include(joinpath(dirname(pathof(Arrow)), "../test/testappend.jl"))
35-
include(joinpath(dirname(pathof(Arrow)), "../test/integrationtest.jl"))
36-
include(joinpath(dirname(pathof(Arrow)), "../test/dates.jl"))
36+
37+
include(joinpath(@__DIR__, "testtables.jl"))
38+
include(joinpath(@__DIR__, "testappend.jl"))
39+
include(joinpath(@__DIR__, "integrationtest.jl"))
40+
include(joinpath(@__DIR__, "dates.jl"))
3741

3842
struct CustomStruct
3943
x::Int
@@ -45,7 +49,7 @@ struct CustomStruct2{sym}
4549
x::Int
4650
end
4751

48-
@testset "Arrow" begin
52+
@testset ExtendedTestSet "Arrow" begin
4953
@testset "table roundtrips" begin
5054
for case in testtables
5155
testtable(case...)
@@ -381,6 +385,8 @@ end
381385
end
382386

383387
@testset "# 126" begin
388+
# XXX This test also captures a race condition in multithreaded
389+
# writes of dictionary encoded arrays
384390
t = Tables.partitioner((
385391
(a=Arrow.toarrowvector(PooledArray([1, 2, 3])),),
386392
(a=Arrow.toarrowvector(PooledArray([1, 2, 3, 4])),),
@@ -602,14 +608,15 @@ end
602608
end
603609

604610
@testset "# 181" begin
611+
# XXX this test hangs on Julia 1.12 when using a deeper nesting
605612
d = Dict{Int,Int}()
606-
for i = 1:9
613+
for i = 1:1
607614
d = Dict(i => d)
608615
end
609616
tbl = (x=[d],)
610-
msg = "reached nested serialization level (20) deeper than provided max depth argument (19); to increase allowed nesting level, pass `maxdepth=X`"
611-
@test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=19)
612-
@test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=20)).x == tbl.x
617+
msg = "reached nested serialization level (2) deeper than provided max depth argument (1); to increase allowed nesting level, pass `maxdepth=X`"
618+
@test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=1)
619+
@test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=5)).x == tbl.x
613620
end
614621

615622
@testset "# 167" begin

0 commit comments

Comments
 (0)