diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b067edd --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/Manifest.toml diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c258f94 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Fredrik Ekre + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Project.toml b/Project.toml new file mode 100644 index 0000000..8a09adf --- /dev/null +++ b/Project.toml @@ -0,0 +1,18 @@ +name = "TeeStreams" +uuid = "c24d529b-c83e-40e0-9fc3-05832c05a0ba" +version = "1.0.0" + +[compat] +julia = "1.5" + +[extras] +CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" +CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2" +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" +SimpleBufferStream = "777ac1f9-54b0-4bf8-805c-2214025038e7" +Tar = "a4e569a6-e804-4fa4-b0f3-eef7a1d5b13e" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[targets] +test = ["CodecZlib", "CodecZstd", "HTTP", "SHA", "SimpleBufferStream", "Tar", "Test"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..01e8259 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# TeeStreams + +Simplify writing to multiple streams at once. + +### Example: Compress with multiple encodings + +```julia +using TeeStreams, CodecZlib, CodecZstd + +function compress(file) + open(file, "r") do src + teeopen(tee -> write(tee, src), + (GzipCompressorStream, file * ".gz", "w"), + (ZstdCompressorStream, file * ".zst", "w") + ) + end +end + +compress("Project.toml") +``` + +### Example: Pass data to checksum function and to disk + +```julia +using TeeStreams, SHA, SimpleBufferStream + +function download_verify(url, expected_shasum) + filename = split(url, '/')[end] + buf = BufferStream() + dl_task = @async begin + teeopen(buf, (filename, "w")) do tee + write(tee, open(`curl -fsSL $url`)) + end + end + shasum = fetch(@async bytes2hex(SHA.sha256(buf))) + if shasum != expected_shasum + error("something went wrong") + end + wait(dl_task) +end + +url = "https://julialang-s3.julialang.org/bin/linux/x64/1.5/julia-1.5.3-linux-x86_64.tar.gz" +expected_shasum = "f190c938dd6fed97021953240523c9db448ec0a6760b574afd4e9924ab5615f1" + +download_verify(url, expected_shasum) +``` diff --git a/src/TeeStreams.jl b/src/TeeStreams.jl new file mode 100644 index 0000000..99f71c1 --- /dev/null +++ b/src/TeeStreams.jl @@ -0,0 +1,89 @@ +module TeeStreams + +export TeeStream, teeopen, teeclose + +struct TeeStream{T <: NTuple{<:Any, IO}} <: IO + streams::T + opened_idx::Union{<:NTuple{<:Any,Bool}, Nothing} + function TeeStream{T}(ios::T, opened_idx=nothing) where T <: NTuple{<:Any, IO} + tee = new{T}(ios, opened_idx) + # check_writable(tee) + return tee + end +end +TeeStream(ios::IO...) = TeeStream{typeof(ios)}(ios) + +# See https://docs.julialang.org/en/v1/base/io-network/#Base.unsafe_write +function Base.unsafe_write(tee::TeeStream, p::Ptr{UInt8}, nb::UInt) + # check_writable(tee) + @sync for s in tee.streams + @async begin + # TODO: Is it enough to rely on write locks on each s? + n = unsafe_write(s, p, nb) + check_written(n, nb) + end + end + return Int(nb) +end +function Base.write(tee::TeeStream, b::UInt8) + # check_writable(tee) + for s in tee.streams + n = write(s, b) + check_written(n, 1) + end + return 1 +end + +maybe_open(io) = io +maybe_open(io::Tuple) = open(io...) +function teeopen(args::Union{IO, Tuple}...) + opened_idx = ntuple(i -> args[i] isa Tuple, length(args)) + streams = map(maybe_open, args) + return TeeStream{typeof(streams)}(streams, opened_idx) +end + +function teeopen(f::Function, args::Union{IO,Tuple}...) + tee = teeopen(args...) + try + f(tee) + finally + close(tee) + # teeclose(tee) + end +end + +function teeclose(tee::TeeStream) + if tee.opened_idx === nothing + return + end + for i in 1:length(tee.streams) + tee.opened_idx[i] || continue + close(tee.streams[i]) + end +end + +Base.close(tee::TeeStream) = foreach(close, tee.streams) +# function Base.close(tee::TeeStream) +# for s in tee.streams +# close(s) +# end +# end +Base.flush(tee::TeeStream) = foreach(flush, tee.streams) +Base.isreadable(tee::TeeStream) = false +Base.isopen(tee::TeeStream) = all(isopen, tee.streams) +Base.iswritable(tee::TeeStream) = all(iswritable, tee.streams) +# All streams do not define iswritable reliably so just try and throw if things doesn't work +# function check_writable(tee::TeeStream) +# if !(isopen(tee) && iswritable(tee)) +# error("stream is closed or not writeable") +# end +# end +function check_written(n, m) + if n != m + error("could not write the requested bytes: stream is closed or not writeable?") + end + return nothing +end + +end # module + diff --git a/test/runtests.jl b/test/runtests.jl new file mode 100644 index 0000000..fb1da3b --- /dev/null +++ b/test/runtests.jl @@ -0,0 +1,119 @@ +using TeeStreams, Test, SimpleBufferStream, HTTP, + CodecZlib, CodecZstd, SHA, Tar + + +function write_things(io) + # String + write(io, "hello, string\n") + # Single byte and byte arrays + for b in Vector{UInt8}("hello, ") + write(io, b) + end + write(io, Vector{UInt8}("bytes\n")) + # Write from file (IOStream) + f, s = mktemp() + write(s, "hello, file\n"); close(s) + open(io1 -> write(io, io1), f) + # Read from external process (Base.Process) + write(io, open(`$(Base.julia_cmd()[1]) -e 'println("hello, process")'`)) + # print(ln) + print(io, "hello, "); println(io, "print") + return io +end + +@testset "TeeStreams" begin + correct = String(take!(write_things(IOBuffer()))) + + # write(::TeeStream, x) for different x and tee'd streams + iob = IOBuffer() + ioc = IOContext(IOBuffer()) + f, iof = mktemp() + bs = BufferStream() + tee = TeeStream(iob, ioc, iof, bs) + write_things(tee) + close(iof); close(bs) + @test String(take!(iob)) == String(take!(ioc.io)) == + read(f, String) == read(bs, String) == correct + try + close(tee) + write_things(tee) + catch err + @test err isa CompositeException + @test length(err.exceptions) == 4 + @test all(x -> x isa TaskFailedException, err.exceptions) + end + + mktempdir() do tmpd; f = joinpath(tmpd, "file") + # tee = teeopen() + ## with teeclose + iob = IOBuffer() + ioc = IOContext(IOBuffer()) + tee = teeopen(iob, ioc, (f, "w")) + write_things(tee) + teeclose(tee) + @test String(take!(iob)) == String(take!(ioc.io)) == read(f, String) == correct + ## with close + iob = IOBuffer() + ioc = IOContext(IOBuffer()) + tee = teeopen(iob, ioc, (f, "w")) + write_things(tee) + close(tee) + @test_throws ArgumentError String(take!(iob)) + @test_throws ArgumentError String(take!(ioc.io)) + @test read(f, String) == correct + + # teeopen() do tee + iob = IOBuffer() + ioc = IOContext(IOBuffer()) + teeopen(iob, ioc, (f, "w")) do tee + write_things(tee) + end + @test_throws ArgumentError String(take!(iob)) + @test_throws ArgumentError String(take!(ioc.io)) + @test read(f, String) == correct + end + + # Test some integration with other packages + mktempdir() do tmpd + url = "https://julialang-s3.julialang.org/bin/linux/x64/1.5/julia-1.5.3-linux-x86_64.tar.gz" + expected_shasum = "f190c938dd6fed97021953240523c9db448ec0a6760b574afd4e9924ab5615f1" + + # HTTP.Stream + # + + # | + # +--------+----------+ + # | | + # v v + # BufferStream GzipDecompressor + # + + + # | | + # v v + # SHA.sha256 BufferStream + # + + # | + # v + # Tar.rewrite + # + + # | + # +--------+---------+ + # | | + # v v + # GzipCompressor ZstdCompressor + + tee = teeopen( + (GzipCompressorStream, joinpath(tmpd, "julia.tar.gz"), "w"), + (ZstdCompressorStream, joinpath(tmpd, "julia.tar.zst"), "w"), + ) + bs = BufferStream() + bs2 = BufferStream() + gzd = GzipDecompressorStream(bs2) + tee2 = teeopen(bs, gzd) + @sync begin + tar_task = @async Tar.rewrite(bs2, tee) + dl_task = @async HTTP.get(url; response_stream=tee2) + sha_task = @async bytes2hex(SHA.sha256(bs)) + @test fetch(sha_task) == expected_shasum + end + end + +end #testset