From a1eae2b093a53f730c08f14130ae4758ac290790 Mon Sep 17 00:00:00 2001 From: Fredrik Ekre Date: Sun, 31 Jan 2021 00:21:23 +0100 Subject: [PATCH] Some docstrings and examples. --- README.md | 51 +++++++++++++++++------ src/TeeStreams.jl | 100 +++++++++++++++++++++++++++++++--------------- test/runtests.jl | 66 +++++++++++++----------------- 3 files changed, 133 insertions(+), 84 deletions(-) diff --git a/README.md b/README.md index 01e8259..e1af82d 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,26 @@ Simplify writing to multiple streams at once. +## Usage + +```julia +tee = TeeStream(io::IO...) +``` + +Construct a tee stream by wrapping multiple writable IO objects. + +```julia +TeeStream(f::Function, io::IO...) do tee + # ... +end +``` + +Construct a tee stream by wrapping multiple writable IO objects and +call function `f` on the tee. Automatically calls `close` on the tee +before returning. + +`close` and `flush` on a tee stream closes/flushes all the wrapped streams. + ### Example: Compress with multiple encodings ```julia @@ -9,17 +29,19 @@ using TeeStreams, CodecZlib, CodecZstd function compress(file) open(file, "r") do src - teeopen(tee -> write(tee, src), - (GzipCompressorStream, file * ".gz", "w"), - (ZstdCompressorStream, file * ".zst", "w") - ) + TeeStream( + GzipCompressorStream(open(file * ".gz", "w")), + ZstdCompressorStream(open(file * ".zst", "w")) + ) do tee + write(tee, src) + end end end compress("Project.toml") ``` -### Example: Pass data to checksum function and to disk +### Example: Write data to checksum function and to disk ```julia using TeeStreams, SHA, SimpleBufferStream @@ -27,16 +49,19 @@ using TeeStreams, SHA, SimpleBufferStream function download_verify(url, expected_shasum) filename = split(url, '/')[end] buf = BufferStream() - dl_task = @async begin - teeopen(buf, (filename, "w")) do tee - write(tee, open(`curl -fsSL $url`)) + @sync begin + @async begin + TeeStream(buf, open(filename, "w")) do tee + write(tee, open(`curl -fsSL $url`)) + end + end + @async begin + shasum = bytes2hex(SHA.sha256(buf)) + if shasum != expected_shasum + error("something went wrong") + end end end - shasum = fetch(@async bytes2hex(SHA.sha256(buf))) - if shasum != expected_shasum - error("something went wrong") - end - wait(dl_task) end url = "https://julialang-s3.julialang.org/bin/linux/x64/1.5/julia-1.5.3-linux-x86_64.tar.gz" diff --git a/src/TeeStreams.jl b/src/TeeStreams.jl index 99f71c1..42c5e38 100644 --- a/src/TeeStreams.jl +++ b/src/TeeStreams.jl @@ -1,7 +1,23 @@ module TeeStreams -export TeeStream, teeopen, teeclose +export TeeStream +""" +```julia +tee = TeeStream(io::IO...) +``` +Construct a tee stream by wrapping multiple writable IO objects. + +```julia +TeeStream(f::Function, io::IO...) do tee + # ... +end +``` + +Construct a tee stream by wrapping multiple writable IO objects and +call function `f` on the tee. Automatically calls `close` on the tee +before returning. +""" struct TeeStream{T <: NTuple{<:Any, IO}} <: IO streams::T opened_idx::Union{<:NTuple{<:Any,Bool}, Nothing} @@ -11,11 +27,19 @@ struct TeeStream{T <: NTuple{<:Any, IO}} <: IO return tee end end + TeeStream(ios::IO...) = TeeStream{typeof(ios)}(ios) +function TeeStream(f::Function, ios::IO...) + tee = TeeStream(ios...) + try + f(tee) + finally + close(tee) + end +end # See https://docs.julialang.org/en/v1/base/io-network/#Base.unsafe_write function Base.unsafe_write(tee::TeeStream, p::Ptr{UInt8}, nb::UInt) - # check_writable(tee) @sync for s in tee.streams @async begin # TODO: Is it enough to rely on write locks on each s? @@ -34,44 +58,24 @@ function Base.write(tee::TeeStream, b::UInt8) return 1 end -maybe_open(io) = io -maybe_open(io::Tuple) = open(io...) -function teeopen(args::Union{IO, Tuple}...) - opened_idx = ntuple(i -> args[i] isa Tuple, length(args)) - streams = map(maybe_open, args) - return TeeStream{typeof(streams)}(streams, opened_idx) -end +""" + close(tee::TeeStream) -function teeopen(f::Function, args::Union{IO,Tuple}...) - tee = teeopen(args...) - try - f(tee) - finally - close(tee) - # teeclose(tee) - end -end +Close all streams wrapped in the tee stream. +""" +Base.close(tee::TeeStream) = foreach(close, tee.streams) -function teeclose(tee::TeeStream) - if tee.opened_idx === nothing - return - end - for i in 1:length(tee.streams) - tee.opened_idx[i] || continue - close(tee.streams[i]) - end -end +""" + flush(tee::TeeStream) -Base.close(tee::TeeStream) = foreach(close, tee.streams) -# function Base.close(tee::TeeStream) -# for s in tee.streams -# close(s) -# end -# end +Flush all streams wrapped in the tee stream. +""" Base.flush(tee::TeeStream) = foreach(flush, tee.streams) + Base.isreadable(tee::TeeStream) = false Base.isopen(tee::TeeStream) = all(isopen, tee.streams) Base.iswritable(tee::TeeStream) = all(iswritable, tee.streams) + # All streams do not define iswritable reliably so just try and throw if things doesn't work # function check_writable(tee::TeeStream) # if !(isopen(tee) && iswritable(tee)) @@ -85,5 +89,35 @@ function check_written(n, m) return nothing end + +# maybe_open(io) = io +# maybe_open(io::Tuple) = open(io...) +# function teeopen(args::Union{IO, Tuple}...) +# opened_idx = ntuple(i -> args[i] isa Tuple, length(args)) +# streams = map(maybe_open, args) +# return TeeStream{typeof(streams)}(streams, opened_idx) +# end + +# function teeopen(f::Function, args::Union{IO,Tuple}...) +# tee = teeopen(args...) +# try +# f(tee) +# finally +# close(tee) +# # teeclose(tee) +# end +# end + +# function teeclose(tee::TeeStream) +# if tee.opened_idx === nothing +# return +# end +# for i in 1:length(tee.streams) +# tee.opened_idx[i] || continue +# close(tee.streams[i]) +# end +# end + + end # module diff --git a/test/runtests.jl b/test/runtests.jl index fb1da3b..214cf43 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -15,16 +15,20 @@ function write_things(io) write(s, "hello, file\n"); close(s) open(io1 -> write(io, io1), f) # Read from external process (Base.Process) - write(io, open(`$(Base.julia_cmd()[1]) -e 'println("hello, process")'`)) + open(`$(Base.julia_cmd()[1]) -e 'println("hello, process")'`) do proc + write(io, proc) + end # print(ln) print(io, "hello, "); println(io, "print") return io end @testset "TeeStreams" begin + # write(::TeeStream, x) for different x and tee'd streams + correct = String(take!(write_things(IOBuffer()))) - # write(::TeeStream, x) for different x and tee'd streams + # tee = TeeStream(...) iob = IOBuffer() ioc = IOContext(IOBuffer()) f, iof = mktemp() @@ -43,34 +47,20 @@ end @test all(x -> x isa TaskFailedException, err.exceptions) end - mktempdir() do tmpd; f = joinpath(tmpd, "file") - # tee = teeopen() - ## with teeclose - iob = IOBuffer() - ioc = IOContext(IOBuffer()) - tee = teeopen(iob, ioc, (f, "w")) - write_things(tee) - teeclose(tee) - @test String(take!(iob)) == String(take!(ioc.io)) == read(f, String) == correct - ## with close - iob = IOBuffer() - ioc = IOContext(IOBuffer()) - tee = teeopen(iob, ioc, (f, "w")) - write_things(tee) - close(tee) - @test_throws ArgumentError String(take!(iob)) - @test_throws ArgumentError String(take!(ioc.io)) - @test read(f, String) == correct - - # teeopen() do tee - iob = IOBuffer() - ioc = IOContext(IOBuffer()) - teeopen(iob, ioc, (f, "w")) do tee + # TeeStream(...) do tee + mktempdir() do tmpd; + f1 = joinpath(tmpd, "file") + io1 = open(f1, "w") + f2 = joinpath(tmpd, "file2") + io2 = open(f2, "w") + @test isopen(io1) + @test isopen(io2) + TeeStream(io1, io2) do tee write_things(tee) end - @test_throws ArgumentError String(take!(iob)) - @test_throws ArgumentError String(take!(ioc.io)) - @test read(f, String) == correct + @test !isopen(io1) + @test !isopen(io2) + @test read(f1, String) == read(f2, String) == correct end # Test some integration with other packages @@ -100,18 +90,18 @@ end # v v # GzipCompressor ZstdCompressor - tee = teeopen( - (GzipCompressorStream, joinpath(tmpd, "julia.tar.gz"), "w"), - (ZstdCompressorStream, joinpath(tmpd, "julia.tar.zst"), "w"), + buffer_shasum = BufferStream() + buffer_tar = BufferStream() + decompressor = GzipDecompressorStream(buffer_tar) + tee = TeeStream(buffer_shasum, decompressor) + compressors = TeeStream( + GzipCompressorStream(open(joinpath(tmpd, "julia.tar.gz"), "w")), + ZstdCompressorStream(open(joinpath(tmpd, "julia.tar.zst"), "w")), ) - bs = BufferStream() - bs2 = BufferStream() - gzd = GzipDecompressorStream(bs2) - tee2 = teeopen(bs, gzd) @sync begin - tar_task = @async Tar.rewrite(bs2, tee) - dl_task = @async HTTP.get(url; response_stream=tee2) - sha_task = @async bytes2hex(SHA.sha256(bs)) + dl_task = @async HTTP.get(url; response_stream=tee) + sha_task = @async bytes2hex(SHA.sha256(buffer_shasum)) + tar_task = @async Tar.rewrite(buffer_tar, compressors) @test fetch(sha_task) == expected_shasum end end