Browse Source

Some docstrings and examples.

pull/1/head
Fredrik Ekre 5 years ago
parent
commit
a1eae2b093
  1. 43
      README.md
  2. 100
      src/TeeStreams.jl
  3. 66
      test/runtests.jl

43
README.md

@ -2,6 +2,26 @@ @@ -2,6 +2,26 @@
Simplify writing to multiple streams at once.
## Usage
```julia
tee = TeeStream(io::IO...)
```
Construct a tee stream by wrapping multiple writable IO objects.
```julia
TeeStream(f::Function, io::IO...) do tee
# ...
end
```
Construct a tee stream by wrapping multiple writable IO objects and
call function `f` on the tee. Automatically calls `close` on the tee
before returning.
`close` and `flush` on a tee stream closes/flushes all the wrapped streams.
### Example: Compress with multiple encodings
```julia
@ -9,17 +29,19 @@ using TeeStreams, CodecZlib, CodecZstd @@ -9,17 +29,19 @@ using TeeStreams, CodecZlib, CodecZstd
function compress(file)
open(file, "r") do src
teeopen(tee -> write(tee, src),
(GzipCompressorStream, file * ".gz", "w"),
(ZstdCompressorStream, file * ".zst", "w")
)
TeeStream(
GzipCompressorStream(open(file * ".gz", "w")),
ZstdCompressorStream(open(file * ".zst", "w"))
) do tee
write(tee, src)
end
end
end
compress("Project.toml")
```
### Example: Pass data to checksum function and to disk
### Example: Write data to checksum function and to disk
```julia
using TeeStreams, SHA, SimpleBufferStream
@ -27,16 +49,19 @@ using TeeStreams, SHA, SimpleBufferStream @@ -27,16 +49,19 @@ using TeeStreams, SHA, SimpleBufferStream
function download_verify(url, expected_shasum)
filename = split(url, '/')[end]
buf = BufferStream()
dl_task = @async begin
teeopen(buf, (filename, "w")) do tee
@sync begin
@async begin
TeeStream(buf, open(filename, "w")) do tee
write(tee, open(`curl -fsSL $url`))
end
end
shasum = fetch(@async bytes2hex(SHA.sha256(buf)))
@async begin
shasum = bytes2hex(SHA.sha256(buf))
if shasum != expected_shasum
error("something went wrong")
end
wait(dl_task)
end
end
end
url = "https://julialang-s3.julialang.org/bin/linux/x64/1.5/julia-1.5.3-linux-x86_64.tar.gz"

100
src/TeeStreams.jl

@ -1,7 +1,23 @@ @@ -1,7 +1,23 @@
module TeeStreams
export TeeStream, teeopen, teeclose
export TeeStream
"""
```julia
tee = TeeStream(io::IO...)
```
Construct a tee stream by wrapping multiple writable IO objects.
```julia
TeeStream(f::Function, io::IO...) do tee
# ...
end
```
Construct a tee stream by wrapping multiple writable IO objects and
call function `f` on the tee. Automatically calls `close` on the tee
before returning.
"""
struct TeeStream{T <: NTuple{<:Any, IO}} <: IO
streams::T
opened_idx::Union{<:NTuple{<:Any,Bool}, Nothing}
@ -11,11 +27,19 @@ struct TeeStream{T <: NTuple{<:Any, IO}} <: IO @@ -11,11 +27,19 @@ struct TeeStream{T <: NTuple{<:Any, IO}} <: IO
return tee
end
end
TeeStream(ios::IO...) = TeeStream{typeof(ios)}(ios)
function TeeStream(f::Function, ios::IO...)
tee = TeeStream(ios...)
try
f(tee)
finally
close(tee)
end
end
# See https://docs.julialang.org/en/v1/base/io-network/#Base.unsafe_write
function Base.unsafe_write(tee::TeeStream, p::Ptr{UInt8}, nb::UInt)
# check_writable(tee)
@sync for s in tee.streams
@async begin
# TODO: Is it enough to rely on write locks on each s?
@ -34,44 +58,24 @@ function Base.write(tee::TeeStream, b::UInt8) @@ -34,44 +58,24 @@ function Base.write(tee::TeeStream, b::UInt8)
return 1
end
maybe_open(io) = io
maybe_open(io::Tuple) = open(io...)
function teeopen(args::Union{IO, Tuple}...)
opened_idx = ntuple(i -> args[i] isa Tuple, length(args))
streams = map(maybe_open, args)
return TeeStream{typeof(streams)}(streams, opened_idx)
end
"""
close(tee::TeeStream)
function teeopen(f::Function, args::Union{IO,Tuple}...)
tee = teeopen(args...)
try
f(tee)
finally
close(tee)
# teeclose(tee)
end
end
Close all streams wrapped in the tee stream.
"""
Base.close(tee::TeeStream) = foreach(close, tee.streams)
function teeclose(tee::TeeStream)
if tee.opened_idx === nothing
return
end
for i in 1:length(tee.streams)
tee.opened_idx[i] || continue
close(tee.streams[i])
end
end
"""
flush(tee::TeeStream)
Base.close(tee::TeeStream) = foreach(close, tee.streams)
# function Base.close(tee::TeeStream)
# for s in tee.streams
# close(s)
# end
# end
Flush all streams wrapped in the tee stream.
"""
Base.flush(tee::TeeStream) = foreach(flush, tee.streams)
Base.isreadable(tee::TeeStream) = false
Base.isopen(tee::TeeStream) = all(isopen, tee.streams)
Base.iswritable(tee::TeeStream) = all(iswritable, tee.streams)
# All streams do not define iswritable reliably so just try and throw if things doesn't work
# function check_writable(tee::TeeStream)
# if !(isopen(tee) && iswritable(tee))
@ -85,5 +89,35 @@ function check_written(n, m) @@ -85,5 +89,35 @@ function check_written(n, m)
return nothing
end
# maybe_open(io) = io
# maybe_open(io::Tuple) = open(io...)
# function teeopen(args::Union{IO, Tuple}...)
# opened_idx = ntuple(i -> args[i] isa Tuple, length(args))
# streams = map(maybe_open, args)
# return TeeStream{typeof(streams)}(streams, opened_idx)
# end
# function teeopen(f::Function, args::Union{IO,Tuple}...)
# tee = teeopen(args...)
# try
# f(tee)
# finally
# close(tee)
# # teeclose(tee)
# end
# end
# function teeclose(tee::TeeStream)
# if tee.opened_idx === nothing
# return
# end
# for i in 1:length(tee.streams)
# tee.opened_idx[i] || continue
# close(tee.streams[i])
# end
# end
end # module

66
test/runtests.jl

@ -15,16 +15,20 @@ function write_things(io) @@ -15,16 +15,20 @@ function write_things(io)
write(s, "hello, file\n"); close(s)
open(io1 -> write(io, io1), f)
# Read from external process (Base.Process)
write(io, open(`$(Base.julia_cmd()[1]) -e 'println("hello, process")'`))
open(`$(Base.julia_cmd()[1]) -e 'println("hello, process")'`) do proc
write(io, proc)
end
# print(ln)
print(io, "hello, "); println(io, "print")
return io
end
@testset "TeeStreams" begin
# write(::TeeStream, x) for different x and tee'd streams
correct = String(take!(write_things(IOBuffer())))
# write(::TeeStream, x) for different x and tee'd streams
# tee = TeeStream(...)
iob = IOBuffer()
ioc = IOContext(IOBuffer())
f, iof = mktemp()
@ -43,34 +47,20 @@ end @@ -43,34 +47,20 @@ end
@test all(x -> x isa TaskFailedException, err.exceptions)
end
mktempdir() do tmpd; f = joinpath(tmpd, "file")
# tee = teeopen()
## with teeclose
iob = IOBuffer()
ioc = IOContext(IOBuffer())
tee = teeopen(iob, ioc, (f, "w"))
write_things(tee)
teeclose(tee)
@test String(take!(iob)) == String(take!(ioc.io)) == read(f, String) == correct
## with close
iob = IOBuffer()
ioc = IOContext(IOBuffer())
tee = teeopen(iob, ioc, (f, "w"))
write_things(tee)
close(tee)
@test_throws ArgumentError String(take!(iob))
@test_throws ArgumentError String(take!(ioc.io))
@test read(f, String) == correct
# teeopen() do tee
iob = IOBuffer()
ioc = IOContext(IOBuffer())
teeopen(iob, ioc, (f, "w")) do tee
# TeeStream(...) do tee
mktempdir() do tmpd;
f1 = joinpath(tmpd, "file")
io1 = open(f1, "w")
f2 = joinpath(tmpd, "file2")
io2 = open(f2, "w")
@test isopen(io1)
@test isopen(io2)
TeeStream(io1, io2) do tee
write_things(tee)
end
@test_throws ArgumentError String(take!(iob))
@test_throws ArgumentError String(take!(ioc.io))
@test read(f, String) == correct
@test !isopen(io1)
@test !isopen(io2)
@test read(f1, String) == read(f2, String) == correct
end
# Test some integration with other packages
@ -100,18 +90,18 @@ end @@ -100,18 +90,18 @@ end
# v v
# GzipCompressor ZstdCompressor
tee = teeopen(
(GzipCompressorStream, joinpath(tmpd, "julia.tar.gz"), "w"),
(ZstdCompressorStream, joinpath(tmpd, "julia.tar.zst"), "w"),
buffer_shasum = BufferStream()
buffer_tar = BufferStream()
decompressor = GzipDecompressorStream(buffer_tar)
tee = TeeStream(buffer_shasum, decompressor)
compressors = TeeStream(
GzipCompressorStream(open(joinpath(tmpd, "julia.tar.gz"), "w")),
ZstdCompressorStream(open(joinpath(tmpd, "julia.tar.zst"), "w")),
)
bs = BufferStream()
bs2 = BufferStream()
gzd = GzipDecompressorStream(bs2)
tee2 = teeopen(bs, gzd)
@sync begin
tar_task = @async Tar.rewrite(bs2, tee)
dl_task = @async HTTP.get(url; response_stream=tee2)
sha_task = @async bytes2hex(SHA.sha256(bs))
dl_task = @async HTTP.get(url; response_stream=tee)
sha_task = @async bytes2hex(SHA.sha256(buffer_shasum))
tar_task = @async Tar.rewrite(buffer_tar, compressors)
@test fetch(sha_task) == expected_shasum
end
end

Loading…
Cancel
Save