Defines the PIPE function that works as a pipe between two ports, streaming data from the first to the second.
A powerful feature of Unix systems is the ability to connect commands so that the output of one becomes the input of the next; this is called "piping", since it resembles joining the two commands with a pipe. We introduce the same concept in REBOL: the pipe function connects one port to another, reading all the data from the first port and writing it to the second. A number of options make it flexible, so users can copy data without knowing the details of how ports work.
The pipe function takes two arguments: the source port and the destination port. Either argument may also be a file! or url!, in which case it is opened and closed automatically (in /direct/binary mode). Data is copied from source to destination, by default 256 KB at a time, until the source port returns none (i.e. end of file). The /with refinement overrides the default 256 KB chunk size, and the /part refinement limits the total amount of data copied.
The /thru refinement specifies a "filter" port to stream the data through. For example, you can provide a crypt:// port to encrypt or decrypt all data while it is being copied. This makes it possible to apply all kinds of transformations to the data as it is copied, without having to read it all into memory.
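As a rough illustration of the calls described above, the following sketch assumes that the pipe function from this document has already been loaded. All file names are hypothetical.

```rebol
; Assumes pipe (as defined in this document) is already loaded.
; All file names below are hypothetical.

; Copy a whole file using the default 256 KB chunk size
pipe %input.dat %output.dat

; Copy only the first 1000 bytes
pipe/part %input.dat %head.dat 1000

; Copy in 4 KB chunks instead of 256 KB
pipe/with %input.dat %copy.dat 4096

; Refinements combine; extra arguments follow the order of the refinements
pipe/part/with %input.dat %head.dat 1000 100

; A port opened by the caller is not closed by pipe
src: open/binary/direct/read %input.dat
pipe src %copy2.dat
close src
```

The /thru refinement is used the same way, with the filter port (or its url!) passed as the extra argument.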
〈Overview〉 ≡
pipe: func [
    {Create a pipe between two ports and stream all data from source to dest}
    [catch]
    source [port! file! url!]
    dest [port! file! url!]
    /thru filter [port! url!]
    /part size [integer!]
    ;/skip length [integer!]
    /with chunk-size [integer!]
    /local 〈pipe's locals〉
] [
    〈Stream all data from source to dest, filtering thru filter, etc.〉
]
The core of the function is a while loop. The condition and body are built dynamically depending on the mode of operation (presence of a filter, /part being used etc.); this makes the function both flexible and efficient.
〈Stream all data from source to dest, filtering thru filter, etc.〉 ≡
〈Check arguments〉
;if length [source: system/words/skip source length]
〈Build body and condition〉
while condition body
〈Update ports and clean up〉
Both condition and body are local to the function.
〈pipe's locals〉 ≡
condition body
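The idea of building the loop's condition and body as blocks before running the loop can be tried in isolation. The following is only a stand-alone sketch: the words limit, count and steps are illustrative and are not part of pipe.

```rebol
; Stand-alone sketch; limit, count and steps are illustrative names only.
limit: 3            ; set to none to select the "unlimited" variant
count: 0
steps: copy []

; Choose the condition block once, before the loop starts
condition: either limit [
    [count < limit]
] [
    [count < 10]
]

; compose evaluates the paren and splices the resulting block into body
body: compose [
    count: count + 1
    (either limit [
        [append steps 'limited]
    ] [
        [append steps 'unlimited]
    ])
]

while condition body
print count         ; prints 3
```

Because the choice between variants is made once, while the blocks are built, the loop itself contains no per-iteration mode tests.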
First, the arguments are checked: chunk-size defaults to 256 KB; size and chunk-size, if specified, must be greater than zero; and source, dest and filter are opened if they are given as files or URLs. (filter cannot be a file!, since files are not filters.)
〈Check arguments〉 ≡
chunk-size: any [chunk-size 256 * 1024] ; default 256k
unless any [not size size > 0] [
    throw make error! compose [script invalid-arg (join size " (size must be greater than zero)")]
]
unless chunk-size > 0 [
    throw make error! compose [script invalid-arg (join chunk-size " (chunk-size must be greater than zero)")]
]
unless port? source [
    source: open/binary/direct/read source
    close-source?: yes
]
unless port? dest [
    dest: open/binary/direct/write/new dest
    close-dest?: yes
]
if url? filter [
    filter: open/binary filter
    close-filter?: yes
]
As you can see, we need close-source?, close-dest? and close-filter? as locals.
〈pipe's locals〉 +≡
close-source? close-dest? close-filter?
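The defaulting and validation idioms used in the argument check can be looked at on their own. This is a minimal sketch; default-chunk and valid-size? are hypothetical helper names introduced only for illustration and do not appear in pipe.

```rebol
; Hypothetical helpers, for illustration only.

; any returns the first value that is not none or false,
; so an unset refinement argument (none) falls back to the default
default-chunk: func [chunk-size [integer! none!]] [
    any [chunk-size 256 * 1024]
]

; Accept "not specified" (none) or a positive integer
valid-size?: func [size [integer! none!]] [
    either any [not size  size > 0] [true] [false]
]

print default-chunk none    ; prints 262144
print default-chunk 4096    ; prints 4096
print valid-size? none      ; prints true
print valid-size? 0         ; prints false
```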
In /part mode, condition just checks size (which we decrement as we copy data); if /part was not specified, we loop as long as copy on the source port does not return none.
In the body, we need to do the copy and decrement size in the /part case, and if we're using a filter, we need to filter the data coming from source before inserting it into dest. Notice that the filtering is done in a seemingly backward order (first we copy from the filter, then we insert data into it): this ensures that we always call update on the filter port before calling copy on it for the last time; otherwise, ports such as crypt:// in decryption mode will not work correctly. Calling copy on the filter before we have inserted any data is not a problem, because it just returns none as there is no data yet.
〈Build body and condition〉 ≡
condition: either size [
    [size > 0]
] [
    [data: copy/part source chunk-size]
]
body: compose [
    (either size [
        [
            data: copy/part source min size chunk-size
            either data [size: size - length? data] [break]
        ]
    ] [
        [ ]
    ])
    (either filter [
        [
            ; done this way to work around crypt:// problem
            if filtered: copy filter [insert dest filtered]
            insert filter data
        ]
    ] [
        [
            insert dest data
        ]
    ])
]
We need to add data and filtered to the list of local words:
〈pipe's locals〉 +≡
data filtered
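The bookkeeping done in /part mode can be followed with an in-memory series standing in for the source port. This is only a sketch under that assumption: unlike a direct port, a string series does not advance by itself and copy/part on it returns an empty string rather than none at the end, so the sketch skips forward and tests for emptiness explicitly.

```rebol
; Sketch only: a string stands in for the source port.
source: "abcdefghij"
dest: copy ""
size: 7             ; total amount to copy, as with /part
chunk-size: 3       ; amount per iteration, as with /with

while [size > 0] [
    ; never read more than what is still allowed by size
    data: copy/part source min size chunk-size
    if empty? data [break]              ; source exhausted early
    source: skip source length? data    ; a direct port advances on its own
    append dest data
    size: size - length? data
]

print dest          ; prints abcdefg
```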
Once all the data has been copied, we need to call update on the filter port and copy over any remaining data, then call update on the destination port (the attempt is there because file ports don't like update). Finally, we close any ports that we opened ourselves.
〈Update ports and clean up〉 ≡
if filter [
    update filter
    if data: copy filter [insert dest data]
]
attempt [update dest]
case/all [
    close-source? [close source]
    close-dest? [close dest]
    close-filter? [close filter]
]