1. Introduction

A powerful feature of Unix systems is the ability to connect commands together so that the output of one becomes the input of the next; this is called "piping", since the commands are joined as if by a pipe. We introduce the same concept in REBOL: the pipe function connects one port to another, reading all the data from the first port and writing it to the second. A number of options make it flexible, so users can do what they need without knowing the details of how ports work.

2. Overview

The pipe function takes two arguments: the source port and the destination port. Either may also be given as a file! or url!, in which case it is opened (in /direct/binary mode) and closed automatically. Data is copied from source to destination, by default 256 KB at a time, until the source port returns none (i.e. end of file). The /with refinement overrides the default 256 KB chunk size, and the /part refinement limits the amount of data to copy.
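
As a rough illustration of the calls described above, the following sketch assumes pipe has already been defined as in the rest of this document. The file names and the URL are placeholders chosen for the example.

```rebol
; Sketch only: assumes `pipe` is defined as in this document.
; The file names and URL below are hypothetical placeholders.

; Copy a whole file, 256 KB at a time (the default)
pipe %input.dat %output.dat

; Download a URL to a local file
pipe http://www.example.com/archive.zip %archive.zip

; Copy at most 1 MB (1048576), reading 64 KB (65536) at a time.
; Refinement arguments follow the normal ones, in the order the
; refinements are written: /part's size first, then /with's chunk-size.
pipe/part/with %input.dat %output.dat 1048576 65536
```

Because files and URLs are opened by pipe itself, they are also closed by it; ports passed in already open are left open for the caller.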

The /thru refinement specifies a "filter" port to stream the data through. For example, you can provide a crypt:// port to encrypt or decrypt the data while it is being copied. This allows all kinds of transformations to be applied to the data in transit, without having to read it all into memory.
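
A filtered copy might look like the sketch below. It assumes pipe is defined as in this document and that the interpreter provides the crypt scheme (not every REBOL product does). The port specification fields shown are an assumption based on the usual crypt port idiom; the key and the file names are placeholders.

```rebol
; Sketch only: assumes `pipe` is defined as in this document and that
; the crypt scheme is available. Key and file names are hypothetical.
secret: #{0123456789ABCDEF0123456789ABCDEF}   ; placeholder 128-bit key

; Open the filter port ourselves. Note the variable is not called `key`:
; inside the spec block, `key: key` would refer to the port's own field.
enc: open [
    scheme: 'crypt
    direction: 'encrypt
    key: secret
    padding: true
]

; Stream the file through the encrypting filter
pipe/thru %plain.dat %encrypted.dat enc

; pipe only closes ports it opened itself, so we close the filter here
close enc
```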

Overview

pipe: func [
 {Create a pipe between two ports and stream all data from source to dest} [catch]
 source [port! file! url!]
 dest [port! file! url!]
 /thru filter [port! url!]
 /part size [integer!]
 ;/skip length [integer!]
 /with chunk-size [integer!]

 /local pipe's locals
] [
 Stream all data from source to dest, filtering thru filter, etc.
]

3. Stream all data from source to dest, filtering thru filter, etc.

The core of the function is a while loop. The condition and body are built dynamically depending on the mode of operation (whether a filter is present, whether /part is used, etc.), so those choices are made once, before the loop starts, instead of being tested on every iteration.
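
The technique of assembling a loop from blocks can be seen in isolation in the following minimal sketch. The words verbose? and n are hypothetical and exist only for this example; they are not part of pipe.

```rebol
; Sketch only: `verbose?` and `n` are hypothetical names for illustration.
verbose?: true
n: 3

; The condition is an ordinary block, evaluated before each iteration
condition: [n > 0]

; compose evaluates the paren and splices the resulting block in place;
; an empty block splices to nothing
body: compose [
    (either verbose? [[print n]] [[ ]])
    n: n - 1
]

probe body              ; [print n n: n - 1]
while condition body    ; prints 3, 2 and 1
```

With verbose? set to false, body would contain only n: n - 1, so the loop carries no trace of the option it was built without.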

Stream all data from source to dest, filtering thru filter, etc.

Check arguments
;if length [source: system/words/skip source length]
Build body and condition
while condition body
Update ports and clean up

3.1 pipe's locals

condition and body are local to the function.

pipe's locals

condition body

3.2 Check arguments

First of all, the arguments are checked: chunk-size defaults to 256 KB; size and chunk-size, if specified, must be greater than zero; and source, dest and filter are opened if they were given as files or URLs. (filter cannot be a file!, since files are not filters.)
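
The defaulting and validation idioms used in this step can be tried on their own. In the sketch below, chunk and valid-size? are hypothetical names introduced for illustration; they do not appear in pipe.

```rebol
; Sketch only: `chunk` and `valid-size?` are hypothetical names.

; Defaulting: any returns the first value that is neither none nor false
chunk: none
chunk: any [chunk 256 * 1024]
print chunk                    ; 262144

; Validation: an absent size (none) is accepted; otherwise it must be
; positive. found? turns the result of any into true or false.
valid-size?: func [size [integer! none!]] [
    found? any [not size size > 0]
]
print valid-size? none         ; true
print valid-size? 10           ; true
print valid-size? 0            ; false
```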

Check arguments

chunk-size: any [chunk-size 256 * 1024] ; default 256k
unless any [not size size > 0] [throw make error! compose [script invalid-arg (join size " (size must be greater than zero)")]]
unless chunk-size > 0 [throw make error! compose [script invalid-arg (join chunk-size " (chunk-size must be greater than zero)")]]
unless port? source [
 source: open/binary/direct/read source
 close-source?: yes
]
unless port? dest [
 dest: open/binary/direct/write/new dest
 close-dest?: yes
]
if url? filter [
 filter: open/binary filter
 close-filter?: yes
]

As you can see, we need close-source?, close-dest? and close-filter? as locals.

pipe's locals +≡

close-source? close-dest? close-filter?

3.3 Build body and condition

In /part mode, condition simply checks that size is still greater than zero (size is decremented as data is copied). Without /part, condition itself performs the copy, so the loop runs for as long as copy on the source port does not return none.

In /part mode the body must do the copy itself and decrement size, breaking out of the loop if the source runs out of data first. If a filter is in use, the data coming from source must also pass through it before being inserted into dest. Notice that the filtering is done somewhat backward (first we copy from the filter, then we insert new data into it): this ensures that update is always called on the filter port before the final copy from it; otherwise, ports such as crypt:// in decryption mode will not work correctly. Calling copy on the filter before any data has been inserted is not a problem, because it just returns none.

Build body and condition

condition: either size [
 [size > 0]
] [
 [data: copy/part source chunk-size]
]
body: compose [
 (either size [
  [
   data: copy/part source min size chunk-size
   either data [size: size - length? data] [break]
  ]
 ] [
  [ ]
 ])
 (either filter [
  [
   ; done this way to work around crypt:// problem
   if filtered: copy filter [insert dest filtered]
   insert filter data
  ]
 ] [
  [
   insert dest data
  ]
 ])
]
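
To make the result of the composition concrete, these are the blocks that the code above should produce in its two extreme modes. They are written out by hand for illustration, with line breaks and comments added for readability.

```rebol
; Illustration only: blocks as built by the chunk above.

; Neither /part nor /thru: the condition does the reading
condition: [data: copy/part source chunk-size]
body: [insert dest data]

; Both /part and /thru: the body does the reading and the filtering
condition: [size > 0]
body: [
    data: copy/part source min size chunk-size
    either data [size: size - length? data] [break]
    if filtered: copy filter [insert dest filtered]   ; output of earlier chunks
    insert filter data                                ; feed the current chunk
]
```

In the filtered case the output of the last chunk is still inside the filter when the loop ends, which is why the clean-up step has to flush it.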

We need to add data and filtered to the list of local words:

pipe's locals +≡

data filtered

3.4 Update ports and clean up

Once all the data has been copied, we call update on the filter port and copy over any data still buffered in it, then call update on the destination port (wrapped in attempt, because file ports don't like update). Finally, we close any ports that we opened ourselves.

Update ports and clean up

if filter [
 update filter
 if data: copy filter [insert dest data]
]
attempt [update dest]
case/all [
 close-source? [close source]
 close-dest? [close dest]
 close-filter? [close filter]
]
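
The two control functions used in the clean-up can be tried separately. This is a minimal sketch; the opened-a?, opened-b? and opened-c? flags are hypothetical stand-ins for close-source?, close-dest? and close-filter?.

```rebol
; Sketch only: the opened-...? flags are hypothetical stand-ins.

; attempt evaluates a block and returns none if an error occurs
print mold attempt [1 / 0]     ; none
print mold attempt [1 + 1]     ; 2

; case/all evaluates every branch whose condition is true
; (plain case would stop after the first match)
opened-a?: yes
opened-b?: none
opened-c?: yes
case/all [
    opened-a? [print "closing a"]
    opened-b? [print "closing b"]
    opened-c? [print "closing c"]
]
; prints "closing a" then "closing c"
```

Since the close-...? words are locals, they are none unless the corresponding port was opened by pipe, so only those ports are closed.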