diff --git a/Manifest.toml b/Manifest.toml index 37ac6ca3060927cdcc276964ba2086670795374f..4444f7757b32c2637a11b70b331defd66f35f3df 100644 --- a/Manifest.toml +++ b/Manifest.toml @@ -11,6 +11,14 @@ uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb" deps = ["Libdl"] uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +[[Outsource]] +deps = ["Distributed"] +git-tree-sha1 = "9303d4dd03e26e32e8ca0f87d39dfdefc7be27f2" +repo-rev = "master" +repo-url = "/home/stev47/stuff/Outsource" +uuid = "ce4b2b2b-baef-434e-9229-2c3161aca78b" +version = "0.1.0" + [[Random]] deps = ["Serialization"] uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" @@ -27,14 +35,14 @@ uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf" [[StaticArrays]] deps = ["LinearAlgebra", "Random", "Statistics"] -git-tree-sha1 = "5c06c0aeb81bef54aed4b3f446847905eb6cbda0" +git-tree-sha1 = "da4cf579416c81994afd6322365d00916c79b8ae" uuid = "90137ffa-7385-5640-81b9-e52037218182" -version = "0.12.3" +version = "0.12.5" [[StaticKernels]] -git-tree-sha1 = "d8d5f496e25ff848afc94da260223b9374dd06db" +git-tree-sha1 = "84a49458d75b4a64850a71b0bf364cd94ffd4aae" uuid = "4c63dfa8-a427-4548-bd2f-4c19e87a7dc7" -version = "0.5.0" +version = "0.5.1" [[Statistics]] deps = ["LinearAlgebra", "SparseArrays"] diff --git a/Project.toml b/Project.toml index 44131a3c5556528c8b9ec68c429e541a5ed273da..6fba6d1054c89b9644be0728aa99e8b6e527f957 100644 --- a/Project.toml +++ b/Project.toml @@ -6,6 +6,7 @@ version = "0.1.0" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +Outsource = "ce4b2b2b-baef-434e-9229-2c3161aca78b" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" StaticKernels = "4c63dfa8-a427-4548-bd2f-4c19e87a7dc7" diff --git a/src/dualtvdd.jl b/src/dualtvdd.jl index 191469e50874414f6efe9c29b508bdb380f9a06c..6836fa1d137d8d7bad649acc262ea1485c7334af 100644 --- a/src/dualtvdd.jl +++ b/src/dualtvdd.jl @@ -1,4 +1,16 @@ -using Distributed: @everywhere, pmap +using Distributed: nworkers, workers +using Outsource: Connector, outsource + +#import Serialization +## We override SubArray serialization in order to preserve array data +## references. TODO: this should actually be julia-default! +#function Serialization.serialize( +# s::Serialization.AbstractSerializer, +# a::SubArray{T,N,A}) where {T,N,A<:Array} +# +# Serialization.serialize_any(s, a) +#end + struct DualTVDDAlgorithm{P,d} <: Algorithm{P} problem::P @@ -15,22 +27,23 @@ struct DualTVDDAlgorithm{P,d} <: Algorithm{P} ninner::Int "prob -> Algorithm(::Problem, ...)" subalg::Function - function DualTVDDAlgorithm(problem; M, overlap, parallel=true, σ=1/4, ninner=10, subalg=x->ProjectedGradient(x)) + function DualTVDDAlgorithm(problem; M, overlap, parallel=true, σ=parallel ? 
1/4 : 1., ninner=10, subalg=x->ProjGradAlgorithm(x))
+        if parallel && σ > 1/4
+            @warn "parallel domain decomposition needs σ <= 1/4 for theoretical convergence"
+        end
         return new{typeof(problem), length(M)}(problem, M, overlap, parallel, σ, ninner, subalg)
     end
 end
 
-struct DualTVDDState{A,d,V,SV,SAx,SC}
+struct DualTVDDState{A,d,V,SAx,SC}
     algorithm::A
     "global variable"
     p::V
-    "local buffer" # TODO: get rid of this
-    q::Array{SV,d}
     "subdomain axes wrt global indices"
     subax::SAx
-    "context for subproblems"
-    subctx::Array{SC,d}
+    "connectors to subworkers"
+    cons::Array{SC,d}
 end
 
 function init(alg::DualTVDDAlgorithm{<:DualTVL1ROFOpProblem})
@@ -48,19 +61,22 @@
 
     # global dual variable
     p = zeros(SVector{d,eltype(g)}, size(g))
 
-    # local buffer variables
-    q = [extend(zeros(SVector{d,eltype(g)}, length.(x)), StaticKernels.ExtensionNothing()) for x in subax]
 
     # create subproblem contexts
-    li = LinearIndices(ax)
-    subprobs = [DualTVL1ROFOpProblem(subg[i], op_restrict(alg.problem.B, ax, subax[i]), subλ[i])
-        for i in CartesianIndices(subax)]
-
-    subalg = [alg.subalg(subprobs[i]) for i in CartesianIndices(subax)]
-
-    subctx = [init(x) for x in subalg]
+    cids = chessboard_coloring(size(subax))
+    cons = Array{Connector, d}(undef, size(subax))
+    for (color, sidxs) in enumerate(cids)
+        for (i, sidx) in enumerate(sidxs)
+            sax = subax[sidx]
+
+            subprob = DualTVL1ROFOpProblem(subg[sidx], op_restrict(alg.problem.B, ax, subax[sidx]), subλ[sidx])
+            wf = subworker(alg, alg.subalg(subprob))
+            wid = workers()[mod1(i, nworkers())]
+            cons[sidx] = outsource(wf, wid)
+        end
+    end
 
-    return DualTVDDState(alg, p, q, subax, subctx)
+    return DualTVDDState(alg, p, subax, cons)
 end
 
 function intersectin(a, b)
@@ -71,8 +87,8 @@
 end
 
 function chessboard_coloring(sz)
-    binli = LinearIndices((2, 2))
-    coloring = [Int[] for _ in 1:4]
+    binli = LinearIndices(ntuple(_->2, length(sz)))
+    coloring = [Int[] for _ in 1:2^length(sz)]
 
     li = LinearIndices(sz)
     for I in CartesianIndices(sz)
@@ -82,14 +98,23 @@
     return coloring
 end
 
-function subrun!(subctx, maxiters)
-    #fetch(subctx) .= Ref(zero(eltype(fetch(subctx))) .+ 1)
-    display("uiae")
-    step!(subctx)
-    #for j in 1:maxiters
-    #    step!(subctx)
-    #end
-    return subctx
+function subworker(alg, subalg)
+    ninner = alg.ninner
+    return function(con)
+        subst = init(subalg)
+        while isopen(con)
+            # fetch new data
+            subg = take!(con)
+            subalg.problem.g .= subg
+            # run algorithm
+            for _ in 1:ninner
+                step!(subst)
+            end
+            # write result
+            subp = fetch(subst)
+            put!(con, subp)
+        end
+    end
 end
 
 function step!(ctx::DualTVDDState)
@@ -99,57 +124,48 @@
     ax = axes(ctx.p)
     overlap = ctx.algorithm.overlap
 
-    # call run! on each cell (this can be threaded)
+
+    p_rem = copy(ctx.p)
+    p_don = zeros(eltype(ctx.p), size(ctx.p))
+
+    # subdomain loop (in coloring order)
     cids = chessboard_coloring(size(ctx.subax))
     for (color, ids) in enumerate(cids)
-
-        # prepare data g for subproblems
         for i in ids
             sax = ctx.subax[i]
-            li = LinearIndices(ctx.subax)[i]
-            sg = ctx.subctx[i].algorithm.problem.g # julia-bug workaround
-            sq = ctx.q[i] # julia-bug workaround
 
-            sg .= view(alg.problem.g, sax...)
+            # update remaining old contribution
+            view(p_rem, sax...) 
.-=
+                theta.(Ref(ax), Ref(sax), Ref(overlap), CartesianIndices(sax)) .* view(ctx.p, sax...)
+
+            # data: g - Λ(p_don + p_rem)
+            sg = copy(view(alg.problem.g, sax...))
+            sp = extend(similar(ctx.p, length.(sax)), StaticKernels.ExtensionNothing())
             if alg.parallel
-                sq .= (1 .- theta.(Ref(ax), Ref(sax), Ref(overlap), CartesianIndices(sax))) .* view(ctx.p, sax...)
+                sp .= (1 .- theta.(Ref(ax), Ref(sax), Ref(overlap), CartesianIndices(sax))) .* view(ctx.p, sax...)
             else
-                sq .= Ref(zero(eltype(sq)))
-                # contributions from previous domains
-                for (pcolor, pids) in enumerate(cids)
-                    # TODO: only adjacent ones needed
-                    for lj in pids
-                        saxj = ctx.subax[lj]
-                        pids, pidsi, pidsj = intersectin(CartesianIndices(sax), CartesianIndices(saxj))
-                        if pcolor < color
-                            sq[pidsi] .+= view(ctx.subctx[lj].p, pidsj)
-                        elseif pcolor > color
-                            sq[pidsi] .+= theta.(Ref(ax), Ref(saxj), Ref(overlap), pids) .* view(ctx.p, pids)
-                        end
-                    end
-                end
+                sp .= view(p_don, sax...) .+ view(p_rem, sax...)
             end
 
-            @inline kfΛ(pw) = @inbounds sg[pw.position] + divergence(pw)
-            kΛ = Kernel{ntuple(_->-1:1, d)}(kfΛ)
+            @inline kf(spw) = @inbounds sg[spw.position] + divergence(spw)
+            kern = Kernel{ntuple(_->-1:1, d)}(kf)
+            map!(kern, sg, sp)
 
-            map!(kΛ, sg, sq)
+            # start computation
+            put!(ctx.cons[i], sg)
         end
+        for i in ids
+            sax = ctx.subax[i]
 
-        # actually run subalgorithms
-        ctx.subctx[ids] .= map(subrun!, deepcopy(ctx.subctx[ids]), [1 for _ in ids])
-        #ctx.subctx[ids] .= map(subrun!, deepcopy(ctx.subctx[ids]), [alg.ninner for _ in ids])
-
-        #for i in ids
-        #    subrun!(ctx.subctx[i])
-        #end
+            sp = take!(ctx.cons[i])
+            # weighted update for new contribution
+            view(p_don, sax...) .+=
+                (1 .- σ) .* theta.(Ref(ax), Ref(sax), Ref(overlap), CartesianIndices(sax)) .* view(ctx.p, sax...) .+ σ .* sp
+        end
     end
 
-    # aggregate (not thread-safe!)
-    ctx.p .*= 1 - σ
-    for (i, sax) in pairs(ctx.subax)
-        view(ctx.p, sax...) .+= σ .* ctx.subctx[i].p
-    end
+    ctx.p .= p_don
 
     return ctx
 end
diff --git a/test/runtests.jl b/test/runtests.jl
index e2debb324629c3ee87f20ee63d8d7e694bf11aa9..3a0b97ea346147bcce6359a8ff14fd8076eaf61c 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -1,8 +1,8 @@
 using Test, BenchmarkTools
 using LinearAlgebra
 using DualTVDD:
-    DualTVL1ROFOpProblem, ProjGradAlgorithm, ChambolleAlgorithm,
-    init, step!, fetch, recover_u
+    DualTVL1ROFOpProblem, ProjGradAlgorithm, ChambolleAlgorithm, DualTVDDAlgorithm,
+    init, step!, fetch_u
 
 @testset "B = I" begin
     g = Float64[0 2; 1 0]
@@ -10,11 +10,11 @@
 
     @testset for alg in (ProjGradAlgorithm(prob, τ=1/8), ChambolleAlgorithm(prob))
         ctx = init(alg)
-        @test 0 == @ballocated step!($ctx)
+        #@test 0 == @ballocated step!($ctx)
         for i in 1:100
             step!(ctx)
         end
-        u = recover_u(fetch(ctx), ctx.algorithm.problem)
+        u = fetch_u(ctx)
         @test u ≈ g
     end
 end
@@ -26,11 +26,34 @@
 
     @testset for alg in (ProjGradAlgorithm(prob, τ=1/8), ChambolleAlgorithm(prob))
         ctx = init(alg)
-        @test 0 == @ballocated step!($ctx)
+        #@test 0 == @ballocated step!($ctx)
         for i in 1:100
             step!(ctx)
         end
-        u = recover_u(fetch(ctx), ctx.algorithm.problem)
+        u = fetch_u(ctx)
         @test vec(u) ≈ B * vec(g)
     end
 end
+
+@testset "DualTVDDAlgorithm" begin
+    n = 5
+    ninner = 100
+    g = rand(n, n)
+    B = Diagonal(rand(n^2))
+    # a big λ is fine here, since we are testing inter-subdomain communication
+    prob = DualTVL1ROFOpProblem(g, B, 100.)
+
+    algref = ChambolleAlgorithm(prob)
+    alg = DualTVDDAlgorithm(prob; M=(2,2), overlap=(2,2), ninner, parallel = false, σ = 0.25, subalg = x -> ChambolleAlgorithm(x))
+
+    stref = init(algref)
+    st = init(alg)
+    #@test 0 == @ballocated step!($st)
+    for i in 1:1000*ninner
+        step!(stref)
+    end
+    for i in 1:1000
+        step!(st)
+    end
+    @test fetch_u(st) ≈ fetch_u(stref)
+end
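
Usage sketch (illustrative, not part of the patch): driving the new DualTVDDAlgorithm end to end through the API the added test exercises (init, step!, fetch_u). The grid size, λ, iteration counts and the addprocs call are assumptions for this example; with parallel=true the subdomain solves of each chessboard color are dispatched to worker processes via Outsource connectors, so workers should exist before init is called.

using Distributed
addprocs(2)                  # assumed worker setup; subworkers are outsourced here
@everywhere using DualTVDD
using LinearAlgebra
using DualTVDD: DualTVL1ROFOpProblem, DualTVDDAlgorithm, ChambolleAlgorithm,
    init, step!, fetch_u

n = 16
g = rand(n, n)                          # example data
B = Diagonal(ones(n^2))                 # B = I, as in the tests
prob = DualTVL1ROFOpProblem(g, B, 0.1)  # λ = 0.1, illustrative

alg = DualTVDDAlgorithm(prob; M=(2,2), overlap=(2,2), ninner=10,
    parallel=true, σ=1/4, subalg = x -> ChambolleAlgorithm(x))

st = init(alg)       # spawns one connected subworker per subdomain
for _ in 1:100       # outer iterations; each runs ninner inner steps per subdomain
    step!(st)
end
u = fetch_u(st)      # primal solution recovered from the dual variable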