6 changed files with 294 additions and 0 deletions
@ -0,0 +1,21 @@ |
|||||||
|
MIT License |
||||||
|
|
||||||
|
Copyright (c) 2021 Fredrik Ekre |
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||||
|
of this software and associated documentation files (the "Software"), to deal |
||||||
|
in the Software without restriction, including without limitation the rights |
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||||
|
copies of the Software, and to permit persons to whom the Software is |
||||||
|
furnished to do so, subject to the following conditions: |
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all |
||||||
|
copies or substantial portions of the Software. |
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
||||||
|
SOFTWARE. |
||||||
@ -0,0 +1,18 @@ |
|||||||
|
name = "TeeStreams" |
||||||
|
uuid = "c24d529b-c83e-40e0-9fc3-05832c05a0ba" |
||||||
|
version = "1.0.0" |
||||||
|
|
||||||
|
[compat] |
||||||
|
julia = "1.5" |
||||||
|
|
||||||
|
[extras] |
||||||
|
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" |
||||||
|
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2" |
||||||
|
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" |
||||||
|
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" |
||||||
|
SimpleBufferStream = "777ac1f9-54b0-4bf8-805c-2214025038e7" |
||||||
|
Tar = "a4e569a6-e804-4fa4-b0f3-eef7a1d5b13e" |
||||||
|
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" |
||||||
|
|
||||||
|
[targets] |
||||||
|
test = ["CodecZlib", "CodecZstd", "HTTP", "SHA", "SimpleBufferStream", "Tar", "Test"] |
||||||
@ -0,0 +1,46 @@ |
|||||||
|
# TeeStreams |
||||||
|
|
||||||
|
Simplify writing to multiple streams at once. |
||||||
|
|
||||||
|
### Example: Compress with multiple encodings |
||||||
|
|
||||||
|
```julia |
||||||
|
using TeeStreams, CodecZlib, CodecZstd |
||||||
|
|
||||||
|
function compress(file) |
||||||
|
open(file, "r") do src |
||||||
|
teeopen(tee -> write(tee, src), |
||||||
|
(GzipCompressorStream, file * ".gz", "w"), |
||||||
|
(ZstdCompressorStream, file * ".zst", "w") |
||||||
|
) |
||||||
|
end |
||||||
|
end |
||||||
|
|
||||||
|
compress("Project.toml") |
||||||
|
``` |
||||||
|
|
||||||
|
### Example: Pass data to checksum function and to disk |
||||||
|
|
||||||
|
```julia |
||||||
|
using TeeStreams, SHA, SimpleBufferStream |
||||||
|
|
||||||
|
function download_verify(url, expected_shasum) |
||||||
|
filename = split(url, '/')[end] |
||||||
|
buf = BufferStream() |
||||||
|
dl_task = @async begin |
||||||
|
teeopen(buf, (filename, "w")) do tee |
||||||
|
write(tee, open(`curl -fsSL $url`)) |
||||||
|
end |
||||||
|
end |
||||||
|
shasum = fetch(@async bytes2hex(SHA.sha256(buf))) |
||||||
|
if shasum != expected_shasum |
||||||
|
error("something went wrong") |
||||||
|
end |
||||||
|
wait(dl_task) |
||||||
|
end |
||||||
|
|
||||||
|
url = "https://julialang-s3.julialang.org/bin/linux/x64/1.5/julia-1.5.3-linux-x86_64.tar.gz" |
||||||
|
expected_shasum = "f190c938dd6fed97021953240523c9db448ec0a6760b574afd4e9924ab5615f1" |
||||||
|
|
||||||
|
download_verify(url, expected_shasum) |
||||||
|
``` |
||||||
@ -0,0 +1,89 @@ |
|||||||
|
module TeeStreams |
||||||
|
|
||||||
|
export TeeStream, teeopen, teeclose |
||||||
|
|
||||||
|
struct TeeStream{T <: NTuple{<:Any, IO}} <: IO |
||||||
|
streams::T |
||||||
|
opened_idx::Union{<:NTuple{<:Any,Bool}, Nothing} |
||||||
|
function TeeStream{T}(ios::T, opened_idx=nothing) where T <: NTuple{<:Any, IO} |
||||||
|
tee = new{T}(ios, opened_idx) |
||||||
|
# check_writable(tee) |
||||||
|
return tee |
||||||
|
end |
||||||
|
end |
||||||
|
TeeStream(ios::IO...) = TeeStream{typeof(ios)}(ios) |
||||||
|
|
||||||
|
# See https://docs.julialang.org/en/v1/base/io-network/#Base.unsafe_write |
||||||
|
function Base.unsafe_write(tee::TeeStream, p::Ptr{UInt8}, nb::UInt) |
||||||
|
# check_writable(tee) |
||||||
|
@sync for s in tee.streams |
||||||
|
@async begin |
||||||
|
# TODO: Is it enough to rely on write locks on each s? |
||||||
|
n = unsafe_write(s, p, nb) |
||||||
|
check_written(n, nb) |
||||||
|
end |
||||||
|
end |
||||||
|
return Int(nb) |
||||||
|
end |
||||||
|
function Base.write(tee::TeeStream, b::UInt8) |
||||||
|
# check_writable(tee) |
||||||
|
for s in tee.streams |
||||||
|
n = write(s, b) |
||||||
|
check_written(n, 1) |
||||||
|
end |
||||||
|
return 1 |
||||||
|
end |
||||||
|
|
||||||
|
maybe_open(io) = io |
||||||
|
maybe_open(io::Tuple) = open(io...) |
||||||
|
function teeopen(args::Union{IO, Tuple}...) |
||||||
|
opened_idx = ntuple(i -> args[i] isa Tuple, length(args)) |
||||||
|
streams = map(maybe_open, args) |
||||||
|
return TeeStream{typeof(streams)}(streams, opened_idx) |
||||||
|
end |
||||||
|
|
||||||
|
function teeopen(f::Function, args::Union{IO,Tuple}...) |
||||||
|
tee = teeopen(args...) |
||||||
|
try |
||||||
|
f(tee) |
||||||
|
finally |
||||||
|
close(tee) |
||||||
|
# teeclose(tee) |
||||||
|
end |
||||||
|
end |
||||||
|
|
||||||
|
function teeclose(tee::TeeStream) |
||||||
|
if tee.opened_idx === nothing |
||||||
|
return |
||||||
|
end |
||||||
|
for i in 1:length(tee.streams) |
||||||
|
tee.opened_idx[i] || continue |
||||||
|
close(tee.streams[i]) |
||||||
|
end |
||||||
|
end |
||||||
|
|
||||||
|
Base.close(tee::TeeStream) = foreach(close, tee.streams) |
||||||
|
# function Base.close(tee::TeeStream) |
||||||
|
# for s in tee.streams |
||||||
|
# close(s) |
||||||
|
# end |
||||||
|
# end |
||||||
|
Base.flush(tee::TeeStream) = foreach(flush, tee.streams) |
||||||
|
Base.isreadable(tee::TeeStream) = false |
||||||
|
Base.isopen(tee::TeeStream) = all(isopen, tee.streams) |
||||||
|
Base.iswritable(tee::TeeStream) = all(iswritable, tee.streams) |
||||||
|
# All streams do not define iswritable reliably so just try and throw if things doesn't work |
||||||
|
# function check_writable(tee::TeeStream) |
||||||
|
# if !(isopen(tee) && iswritable(tee)) |
||||||
|
# error("stream is closed or not writeable") |
||||||
|
# end |
||||||
|
# end |
||||||
|
function check_written(n, m) |
||||||
|
if n != m |
||||||
|
error("could not write the requested bytes: stream is closed or not writeable?") |
||||||
|
end |
||||||
|
return nothing |
||||||
|
end |
||||||
|
|
||||||
|
end # module |
||||||
|
|
||||||
@ -0,0 +1,119 @@ |
|||||||
|
using TeeStreams, Test, SimpleBufferStream, HTTP, |
||||||
|
CodecZlib, CodecZstd, SHA, Tar |
||||||
|
|
||||||
|
|
||||||
|
function write_things(io) |
||||||
|
# String |
||||||
|
write(io, "hello, string\n") |
||||||
|
# Single byte and byte arrays |
||||||
|
for b in Vector{UInt8}("hello, ") |
||||||
|
write(io, b) |
||||||
|
end |
||||||
|
write(io, Vector{UInt8}("bytes\n")) |
||||||
|
# Write from file (IOStream) |
||||||
|
f, s = mktemp() |
||||||
|
write(s, "hello, file\n"); close(s) |
||||||
|
open(io1 -> write(io, io1), f) |
||||||
|
# Read from external process (Base.Process) |
||||||
|
write(io, open(`$(Base.julia_cmd()[1]) -e 'println("hello, process")'`)) |
||||||
|
# print(ln) |
||||||
|
print(io, "hello, "); println(io, "print") |
||||||
|
return io |
||||||
|
end |
||||||
|
|
||||||
|
@testset "TeeStreams" begin |
||||||
|
correct = String(take!(write_things(IOBuffer()))) |
||||||
|
|
||||||
|
# write(::TeeStream, x) for different x and tee'd streams |
||||||
|
iob = IOBuffer() |
||||||
|
ioc = IOContext(IOBuffer()) |
||||||
|
f, iof = mktemp() |
||||||
|
bs = BufferStream() |
||||||
|
tee = TeeStream(iob, ioc, iof, bs) |
||||||
|
write_things(tee) |
||||||
|
close(iof); close(bs) |
||||||
|
@test String(take!(iob)) == String(take!(ioc.io)) == |
||||||
|
read(f, String) == read(bs, String) == correct |
||||||
|
try |
||||||
|
close(tee) |
||||||
|
write_things(tee) |
||||||
|
catch err |
||||||
|
@test err isa CompositeException |
||||||
|
@test length(err.exceptions) == 4 |
||||||
|
@test all(x -> x isa TaskFailedException, err.exceptions) |
||||||
|
end |
||||||
|
|
||||||
|
mktempdir() do tmpd; f = joinpath(tmpd, "file") |
||||||
|
# tee = teeopen() |
||||||
|
## with teeclose |
||||||
|
iob = IOBuffer() |
||||||
|
ioc = IOContext(IOBuffer()) |
||||||
|
tee = teeopen(iob, ioc, (f, "w")) |
||||||
|
write_things(tee) |
||||||
|
teeclose(tee) |
||||||
|
@test String(take!(iob)) == String(take!(ioc.io)) == read(f, String) == correct |
||||||
|
## with close |
||||||
|
iob = IOBuffer() |
||||||
|
ioc = IOContext(IOBuffer()) |
||||||
|
tee = teeopen(iob, ioc, (f, "w")) |
||||||
|
write_things(tee) |
||||||
|
close(tee) |
||||||
|
@test_throws ArgumentError String(take!(iob)) |
||||||
|
@test_throws ArgumentError String(take!(ioc.io)) |
||||||
|
@test read(f, String) == correct |
||||||
|
|
||||||
|
# teeopen() do tee |
||||||
|
iob = IOBuffer() |
||||||
|
ioc = IOContext(IOBuffer()) |
||||||
|
teeopen(iob, ioc, (f, "w")) do tee |
||||||
|
write_things(tee) |
||||||
|
end |
||||||
|
@test_throws ArgumentError String(take!(iob)) |
||||||
|
@test_throws ArgumentError String(take!(ioc.io)) |
||||||
|
@test read(f, String) == correct |
||||||
|
end |
||||||
|
|
||||||
|
# Test some integration with other packages |
||||||
|
mktempdir() do tmpd |
||||||
|
url = "https://julialang-s3.julialang.org/bin/linux/x64/1.5/julia-1.5.3-linux-x86_64.tar.gz" |
||||||
|
expected_shasum = "f190c938dd6fed97021953240523c9db448ec0a6760b574afd4e9924ab5615f1" |
||||||
|
|
||||||
|
# HTTP.Stream |
||||||
|
# + |
||||||
|
# | |
||||||
|
# +--------+----------+ |
||||||
|
# | | |
||||||
|
# v v |
||||||
|
# BufferStream GzipDecompressor |
||||||
|
# + + |
||||||
|
# | | |
||||||
|
# v v |
||||||
|
# SHA.sha256 BufferStream |
||||||
|
# + |
||||||
|
# | |
||||||
|
# v |
||||||
|
# Tar.rewrite |
||||||
|
# + |
||||||
|
# | |
||||||
|
# +--------+---------+ |
||||||
|
# | | |
||||||
|
# v v |
||||||
|
# GzipCompressor ZstdCompressor |
||||||
|
|
||||||
|
tee = teeopen( |
||||||
|
(GzipCompressorStream, joinpath(tmpd, "julia.tar.gz"), "w"), |
||||||
|
(ZstdCompressorStream, joinpath(tmpd, "julia.tar.zst"), "w"), |
||||||
|
) |
||||||
|
bs = BufferStream() |
||||||
|
bs2 = BufferStream() |
||||||
|
gzd = GzipDecompressorStream(bs2) |
||||||
|
tee2 = teeopen(bs, gzd) |
||||||
|
@sync begin |
||||||
|
tar_task = @async Tar.rewrite(bs2, tee) |
||||||
|
dl_task = @async HTTP.get(url; response_stream=tee2) |
||||||
|
sha_task = @async bytes2hex(SHA.sha256(bs)) |
||||||
|
@test fetch(sha_task) == expected_shasum |
||||||
|
end |
||||||
|
end |
||||||
|
|
||||||
|
end #testset |
||||||
Loading…
Reference in new issue