/*----------------------------------------------------------------------------*/ /* */ /* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */ /* Copyright (c) 2005-2022 Rexx Language Association. All rights reserved. */ /* */ /* This program and the accompanying materials are made available under */ /* the terms of the Common Public License v1.0 which accompanies this */ /* distribution. A copy is also available at the following address: */ /* https://www.oorexx.org/license.html */ /* */ /* Redistribution and use in source and binary forms, with or */ /* without modification, are permitted provided that the following */ /* conditions are met: */ /* */ /* Redistributions of source code must retain the above copyright */ /* notice, this list of conditions and the following disclaimer. */ /* Redistributions in binary form must reproduce the above copyright */ /* notice, this list of conditions and the following disclaimer in */ /* the documentation and/or other materials provided with the distribution. */ /* */ /* Neither the name of Rexx Language Association nor the names */ /* of its contributors may be used to endorse or promote products */ /* derived from this software without specific prior written permission. */ /* */ /* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */ /* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */ /* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */ /* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */ /* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */ /* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */ /* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */ /* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */ /* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ /* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */ /* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /* */ /*----------------------------------------------------------------------------*/ /******************************************************************************/ /* pipe.rex Open Object Rexx Samples */ /* */ /* A pipeline implementation */ /* */ /* -------------------------------------------------------------------------- */ /* */ /* Description: */ /* This program demonstrates the use of ::class and ::method directives to */ /* create a simple implementation of a CMS-like pipeline function. */ /******************************************************************************/ /** * Base pipeStage class. Most sub classes need only override the process() method to * implement a pipeStage. The transformed results are passed down the pipeStage chain * by calling the write method. */ ::class pipeStage public -- base pipeStage class ::method init expose next secondary next = .nil secondary = .nil -- all pipeStages have a secondary output potential ::method '|' class -- concatenate an instance of a pipeStage with following pipeStage use strict arg follower me = self~new -- create a new pipeStage instance return me|follower -- perform the hook up ::method '>' class -- concatenate an instance of a pipeStage with following pipeStage use strict arg follower me = self~new -- create a new pipeStage instance return me>follower -- perform the hook up ::method '>>' class -- concatenate an instance of a pipeStage to the secondary stream with following pipeStage use strict arg follower me = self~new -- create a new pipeStage instance return me>>follower -- perform the hook up ::method '|' -- concatente two pipe stream use strict arg follower follower = follower~new -- make sure this is an instance self~append(follower) -- do the chain append logic return self -- we're our own return value ::method '>' -- synonym for '|' usually reserved for branchout connections use strict arg follower follower = follower~new -- make sure this is an instance self~append(follower) -- do the chain append logic return self -- we're our own return value ::method '>>' -- attach a pipe stage to the secondary socket use strict arg follower follower = follower~new -- make sure this is an instance self~appendSecondary(follower) -- do the chain append logic. but to the secondary this time return self -- we're our own return value ::method append -- append a pipeStage to the entire chain expose next use strict arg follower if .nil == next then do -- if we're the end already, just update the next next = follower end else do next~append(follower) -- have our successor append it. end ::method appendSecondary -- append a stage to the secondary output of entire chain expose next secondary use strict arg follower if .nil == secondary then do -- if we're the end already, just update the next secondary = follower -- append this to the secondary port. end else do secondary~appendSecondary(follower) -- have our successor append it. end ::method insert -- insert a pipeStage after this one, but before the next expose next user strict arg newpipeStage newpipeStage~next = next -- just hook into the chain next = newpipeStage ::method '[]' class -- create a pipeStage instance with arguments forward to (self) message('NEW') -- just forward this as a new message ::method go -- execute using a provided object expose source -- get the source supplier use strict arg source -- set to the supplied object self~begin -- have the stages initize any state data engine = source~supplier -- get a data supplier do while engine~available -- while more data self~process(engine~item) -- pump this down the pipe engine~next -- get the next data item end self~eof -- signal that processing is finished ::method secondary attribute -- a potential secondary attribute ::method next attribute -- next stage of the pipeStage ::method source attribute -- source of the initial data -- we implement a new method on the instance -- so that we can issue a new method during the pipeline -- chaining process. this just returns the instance unchanged ::method new return self -- just return ourself ::method process -- default data processing use strict arg value -- get the data item self~write(value) -- send this down the line ::method write -- handle the result from a process method expose next use strict arg data if .nil <> next then do next~process(data) -- only forward if we have a successor end ::method writeSecondary -- handle a secondary output result from a process method expose secondary use strict arg data if .nil <> secondary then do secondary~process(data) -- only forward if we have a successor end ::method processSecondary -- handle a secondary output result from a process method forward message('PROCESS') -- this by default is a merge operation ::method begin -- process pipe stage initialization expose next secondary if .nil <> next then do next~begin -- only forward if we have a successor end if .nil <> secondary then do -- and do the same with the seconcary stream secondary~begin end ::method eof -- process "end-of-pipe" condition expose next secondary if .nil <> next then do next~eof -- only forward if we have a successor end if .nil <> secondary then do secondary~eof -- only forward if we have a successor end ::method secondaryEof -- process "end-of-pipe" condition -- we just ignore this one, and rely on the secondary ::method secondaryBegin -- process startup condition -- we just ignore this one, and rely on the secondary ::method secondaryConnector -- retrieve a secondary connector for a pipeStage return new .SecondaryConnector(self) ::class SecondaryConnector subclass pipeStage ::method init expose pipeStage use strict arg pipeStage -- this just hooks up self~init:super -- forward the initialization ::method process -- processing operations connect with pipeStage secondaries expose pipeStage forward to(pipeStage) message('processSecondary') ::method begin -- processing operations connect with pipeStage secondaries expose pipeStage forward to(pipeStage) message('secondaryBegin') ::method eof -- processing operations connect with pipeStage secondaries expose pipeStage forward to(pipeStage) message('secondaryEof') -- base class for pipe stages that need to accumulate data with actual processing -- deferred until the eof event is received. Sorting is a good example of this type of -- stage ::class bufferingStage public subclass pipeStage ::method begin -- sorter initialization method expose items -- list of sorted items items = .array~new -- create a new list forward class(super) -- forward the initialization ::attribute items private -- access to the accumulated items ::method process -- process sorter piped data item expose items -- access internal state data use strict arg value -- access the passed value items~append(value) -- append the value to the accumulator array ::method eof -- end of processing clean up expose items items = .nil forward class(super) -- propagate the event ::method writeItems private -- send along the items expose items do i = 1 to items~items -- copy all sorted items to the primary stream self~write(items[i]) end ::class sort public subclass bufferingStage -- sort piped data ::method eof -- process the "end-of-pipe" self~items~sort -- sort the accumulated items self~writeItems -- now release the Kraken forward class(super) -- make sure we propagate the done message ::class sortWith public subclass pipeStage -- sort piped data ::method init expose comparator use strict arg comparator ::method eof -- process the "end-of-pipe" expose comparator self~items~sortWith(comparator) -- sort the accumulated items self~writeItems -- now release the Kraken forward class(super) -- make sure we propagate the done message ::class reverse public subclass pipeStage -- a string reversal pipeStage ::method process -- pipeStage processing item use strict arg value -- get the data item self~write(value~reverse) -- send it along in reversed form ::class split public subclass pipeStage -- split the record at a give delimiter ::method init expose delimiter use strict arg delimiter forward class(super) -- make sure we propagate the done message ::method process -- pipeStage processing item expose delimiter use strict arg value -- get the data item loop line over value~makeArray(delimiter) -- split the record into smaller chunks self~write(line) -- send each piece down the pipe end ::class strip public subclass pipeStage -- strip leading or trailing blanks ::method init expose option chars use strict arg option = 'B', chars = '2009'x -- default is blanks and horizontal tabs forward class(super) -- make sure we propagate the done message ::method process expose option chars use strict arg value self~write(value~strip(option, chars)) -- strip and send along ::class upper public subclass pipeStage -- a uppercasing pipeStage ::method process -- pipeStage processing item use strict arg value -- get the data item self~write(value~upper) -- send it along in upper form ::class lower public subclass pipeStage -- a lowercasing pipeStage ::method process -- pipeStage processing item use strict arg value -- get the data item self~write(value~lower) -- send it along in lower form ::class changestr public subclass pipeStage -- a string replacement pipeStage ::method init expose old new count use strict arg old, new, count = 999999999 -- old and new are required, default count is max value self~init:super -- forward the initialization ::method process -- pipeStage processing item expose old new count use strict arg value -- get the data item self~write(value~changestr(old, new, count)) -- send it along in altered form ::class delstr public subclass pipeStage -- a string deletion pipeStage ::method init expose offset length use strict arg offset, length -- both are required. self~init:super -- forward the initialization ::method process -- pipeStage processing item expose offset length use strict arg value -- get the data item self~write(value~delstr(offset, length)) -- send it along in altered form ::class left public subclass pipeStage -- a splitter pipeStage ::method init expose length use strict arg length -- the length is the left part self~init:super -- forward the initialization ::method process -- pipeStage processing item expose offset length use strict arg value -- get the data item self~write(value~left(length)) -- send the left portion along the primary stream self~writeSecondary(value~substr(length + 1)) -- the secondary gets the remainder portion ::class right public subclass pipeStage -- a splitter pipeStage ::method init expose length use strict arg length -- the length is the left part self~init:super -- forward the initialization ::method process -- pipeStage processing item expose offset length use strict arg value -- get the data item self~write(value~substr(length + 1)) -- the remainder portion goes down main pipe self~writeSecondary(value~left(length)) -- send the left portion along the secondary stream ::class insert public subclass pipeStage -- insert a string into each line ::method init expose insert offset use strict arg insert, offset -- we need an offset and an insertion string self~init:super -- forward the initialization ::method process -- pipeStage processing item expose insert offset use strict arg value -- get the data item self~write(value~insert(insert, offset)) -- send the left portion along the primary stream ::class overlay public subclass pipeStage -- insert a string into each line ::method init expose overlay offset use strict arg overlay, offset -- we need an offset and an insertion string self~init:super -- forward the initialization ::method process -- pipeStage processing item expose insert offset use strict arg value -- get the data item self~write(value~overlay(overlay, offset)) -- send the left portion along the primary stream ::class dropnull public subclass pipeStage -- drop null records ::method process -- pipeStage processing item use strict arg value -- get the data item if value \== '' then do -- forward along non-null records self~write(value) end ::class dropFirst public subclass pipeStage -- drop the first n records ::method init expose count use strict arg count self~init:super -- forward the initialization ::method begin expose counter counter = 0 forward class(super) ::method process expose count counter use strict arg value counter += 1 -- if we've dropped our quota, start forwarding if counter > count then do self~write(value) end else do self~writeSecondary(value) -- non-selected records go down the secondary stream end ::class takeLast public subclass bufferingStage -- take the last n records ::method init expose count use strict arg count self~init:super -- forward the initialization ::method eof expose count array = self~items if array~items < count then do -- didn't even receive that many items? loop line over array self~write(line) -- send everything down the main pipe end end else do first = array~items - count -- this is the count of discarded items loop i = 1 to first self~writeSecondary(array[i]) -- the discarded ones go to the secondary pipe end loop i = first + 1 to array~items self~write(array[i]) -- the remainder ones go down the main pipe end end forward class(super) -- finish the eof process ::class takeFirst public subclass pipeStage -- take the first n records ::method init expose count use strict arg count self~init:super -- forward the initialization ::method begin expose counter counter = 0 forward class(super) ::method process expose count counter use strict arg value counter += 1 -- if we've dropped our quota, stop forwarding if counter > count then do self~writeSecondary(value) end else do self~write(value) -- still in the first bunch, send to main pipe end ::class dropLast public subclass bufferingStage -- drop the last n records ::method init expose count use strict arg count self~init:super -- forward the initialization ::method eof expose count array = self~items -- get the buffered records if array~items < count then do -- didn't even receive that many items? loop line over array self~writeSecondary(line) -- send everything down the secondary pipe end end else do first = array~items - count -- this is the count of selected items loop i = 1 to first self~write(array[i]) -- the selected go to the main pipe end loop i = first + 1 to array~items self~writeSecondary(array[i]) -- the discarded ones go down the secondary pipe end end forward class(super) -- finish the eof process ::class x2c public subclass pipeStage -- translate records to hex characters ::method process -- pipeStage processing item use strict arg value -- get the data item self~write(value~x2c) ::class bitbucket public subclass pipeStage -- just consume the records ::method process -- pipeStage processing item nop -- do nothing with the data ::class fanout public subclass pipeStage -- write records to both output streams ::method process -- pipeStage processing item use strict arg value -- get the data item self~write(value) -- and write to both branches of the pipe self~writeSecondary(value) ::class merge public subclass pipeStage -- merge the results from primary and secondary streams ::method begin expose mainDone secondaryEof -- need pair of EOF conditions mainDone = .false secondaryEof = .false forward class(super) ::method eof expose mainDone secondaryEof -- need interlock flags if secondaryEof then do -- the other input hit EOF already? forward class(super) -- handle as normal end mainDone = .true -- mark this branch as finished. ::method secondaryBegin -- begin on the secondary input forward message('BEGIN') ::method secondaryEof -- eof on the seconary input expose mainDone secondaryEof -- need interlock flags secondaryEof = .true -- mark ourselves finished if mainDone then do -- if both branches finished, do normal end work forward message('EOF') end ::class fanin public subclass pipeStage -- process main stream, then secondary stream ::method begin expose mainDone secondaryEof array -- need pair of EOF conditions mainDone = .false secondaryEof = .false array = .array~new -- accumulator for secondary forward class(super) -- forward the initialization ::method processSecondary -- handle the secondary input expose array use strict arg value array~append(value) -- just append to the end of the array ::method eof expose mainDone secondaryEof array -- need interlock flags if secondaryEof then do -- the other input hit EOF already? loop i = 1 to array~items -- need to write out the deferred items self~write(array[i]) end forward class(super) -- handle as normal end mainDone = .true -- mark this branch as finished. ::method secondaryEof -- eof on the seconary input expose mainDone secondaryEof -- need interlock flags secondaryEof = .true -- mark ourselves finished if mainDone then do -- if both branches finished, do normal done. forward message('EOF') end ::class duplicate public subclass pipeStage -- duplicate each record N times ::method init expose copies use strict arg copies = 1 -- by default, we do one duplicate self~init:super -- forward the initialization ::method process -- pipeStage processing item expose copies use strict arg value -- get the data item loop copies + 1 -- write this out with the duplicate count self~write(value) end ::class nodup public subclass pipeStage -- remove any duplicate items ::method begin expose alreadySeen alreadySeen = .set~new forward class(super) ::method process -- pipeStage processing item expose alreadySeen use strict arg value -- get the data item if !alreadySeen~hasItem(value) then do -- only process items we have never seen before alreadySeen~put(value) -- remember we've seen this one an pass it along self~write(value) end ::method eof -- handle pipe completion expose alreadySeen alreadySeen = .nil forward class(super) -- select all records that completely matches a regex pattern ::class regexMatch subclass pipeStage public ::method init expose pattern usg strict arg pattern if \pattern~isA(.RegularExpression) then do -- the pattern can be provided as a RegularExpression instance or a string p = .RegularExpression~new(pattern) pattern = p end ::attribute pattern private ::method process expose pattern use strict arg value if pattern~match(value) then do -- if an exact match, send it along self~write(value) end else do self~writeSecondary(value) -- handle on the secondary pipe if not selected end -- select on whether a record contains a regular expression anywhere within the string ::class regexContains subclass regexMatch public ::method process use strict arg value if self~pattern~position(value) \= 0 then do -- if found in the string, selected self~write(value) end else do self~writeSecondary(value) -- handle on the secondary pipe if not selected end ::class displayer subclass pipeStage public ::method process -- process a data item use strict arg value -- get the data value say value -- display this item forward class(super) ::class any public subclass pipeStage -- a string selector pipeStage ::method init -- process initial strings expose patterns -- access the exposed item patterns = arg(1,'a') -- get the patterns list self~init:super -- forward the initialization ::method process -- process a selection pipeStage expose patterns -- expose the pattern list use strict arg value -- access the data item do i = 1 to patterns~size -- loop through all the patterns -- this pattern in the data? if (value~pos(patterns[i]) <> 0) then do self~write(value) -- send it along return -- stop the loop end end self~writeSecondary(value) -- send all mismatches down the other branch, if there ::class all public subclass pipeStage -- a string selector pipeStage ::method init -- process initial strings expose patterns -- access the exposed item patterns = arg(1,'a') -- get the patterns list self~init:super -- forward the initialization ::method process -- process a selection pipeStage expose patterns -- expose the pattern list use strict arg value -- access the data item do i = 1 to patterns~size -- loop through all the patterns -- this pattern in the data? if (value~pos(patterns[i]) == 0) then do self~writeSecondary(value) -- send it along return -- stop the loop end end self~write(value) -- all patterns were found, so this goes to the main branch ::class startsWith public subclass pipeStage -- a string selector pipeStage ::method init -- process initial strings expose match -- access the exposed item use strict arg match -- get the patterns list self~init:super -- forward the initialization ::method process -- process a selection pipeStage expose match -- expose the pattern list use strict arg value -- access the data item if (value~pos(match) == 1) then do -- match string occur in first position? self~write(value) -- send it along end else do self~writeSecondary(value) -- send all mismatches down the other branch, if there end ::class notall public subclass pipeStage -- a string de-selector pipeStage ::method init -- process initial strings expose patterns -- access the exposed item patterns = arg(1,'a') -- get the patterns list self~init:super -- forward the initialization ::method process -- process a selection pipeStage expose patterns -- expose the pattern list use strict arg value -- access the data item do i = 1 to patterns~size -- loop through all the patterns -- this pattern in the data? if (value~pos(patterns[i]) <> 0) then do self~writeSecondary(value) -- send it along the secondary...don't want this one return -- stop the loop end end self~write(value) -- none found, this is main branch ::class stemcollector subclass pipeStage public -- collect items in a stem ::method init -- initialize a collector expose stem. -- expose target stem use strict arg stem. -- get the stem variable target self~init:super -- forward the initialization ::method begin expose stem. stem.~empty -- empty this out stem.0 = 0 -- start with zero items ::method process -- process a stem pipeStage item expose stem. -- expose the stem use strict arg value -- get the data item stem.0 = stem.0 + 1 -- stem the item count stem.[stem.0] = value -- save the value self~write(value) -- there may be following stages ::class arraycollector subclass pipeStage public -- collect items in an array ::method init -- initialize a collector expose array -- expose target stem use strict arg array -- get the stem variable target self~init:super -- forward the initialization ::method begin expose array array~empty ::method process -- process a stem pipeStage item expose array use strict arg value -- get the data item array~append(value) -- append the item self~write(value) -- there may be following stages ::class streamCollector subclass pipeStage public -- collect items in a stream ::method init -- initialize a collector expose stream -- expose target stream use strict arg stream -- get the stem variable target if \stream~isA(.outputStream) then do -- if not already a stream object, make it one stream = .stream~new(stream) end self~init:super -- forward the initialization ::method process -- process a stem pipeStage item expose stream use strict arg value -- get the data item stream~lineout(value) -- append the item self~write(value) -- there may be following stages ::class between subclass pipeStage public -- write only records from first trigger record -- up to a matching record ::method init expose startString endString use strict arg startString, endString self~init:super -- forward the initialization ::method begin expose started finished started = .false -- not processing any lines yet finished = .false forward class(super) ::method process expose startString endString started finished use strict arg value if \started then do -- not turned on yet? see if we've hit the trigger if value~pos(startString) > 0 then do started = .true self~write(value) -- pass along end else do self~writeSecondary(value) -- non-selected lines go to the secondary bucket end return end if \finished then do -- still processing? if value~pos(endString) > 0 then do -- check for the end position finished = .true end self~write(value) -- pass along end else do self~writeSecondary(value) -- non-selected lines go to the secondary bucket end ::class after subclass pipeStage public -- write only records from first trigger record ::method init expose startString use strict arg startString self~init:super -- forward the initialization ::method begin expose started started = .false forward class(super) ::method process expose startString started use strict arg value if \started then do -- not turned on yet? see if we've hit the trigger if value~pos(startString) = 0 then do self~writeSecondary(value) -- pass along the secondary stream return end started = .true end self~write(value) -- pass along ::class before subclass pipeStage public -- write only records before first trigger record ::method init expose endString use strict arg endString self~init:super -- forward the initialization ::method begin expose finished finished = .false forward class(super) ::method process expose endString finished use strict arg value if \finished then do -- still processing? if value~pos(endString) > 0 then do -- check for the end position finished = .true end self~write(value) -- pass along end else do self~writeSecondary(value) -- non-selected lines go to the secondary bucket end ::class buffer subclass bufferingStage public -- write only records before first trigger record ::method init expose count delimiter use strict arg count = 1, delimiter = ("") self~init:super -- forward the initialization ::method eof expose count delimiter buffer = self~items -- get the buffered records loop i = 1 to count -- now write copies of the set to the stream if i > 1 then do self~write(delimiter) -- put a delimiter between the sets end loop j = 1 to buffer~items -- and send along the buffered lines self~write(buffer[i]) end end forward class(super) -- and send the done message along ::class lineCount subclass pipeStage public -- count number of records passed through the pipeStage ::method begin expose counter counter = 0 forward class(super) ::method process expose counter use strict arg value counter += 1 -- just bump the counter on each record ::method eof expose counter self~write(counter); -- write out the counter message forward class(super) -- and send the done message along ::class charCount subclass pipeStage public -- count number of characters passed through the pipeStage ::method begin expose counter counter = 0 forward class(super) ::method process expose counter use strict arg value counter += value~length -- just bump the counter for the length of each record ::method eof expose counter self~write(counter); -- write out the counter message forward class(super) -- and send the done message along ::class wordCount subclass pipeStage public -- count number of characters passed through the pipeStage ::method begin expose counter counter = 0 forward class(super) -- forward the initialization ::method process expose counter use strict arg value counter += value~words -- just bump the counter for the number of words ::method eof expose counter self~write(counter); -- write out the counter message forward class(super) -- and send the done message along /** * A simple splitter sample that splits the stream based on a pivot value. * strings that compare < the pivot value are routed to pipeStage 1. All other * strings are routed to pipeStage 2 */ ::class pivot subclass pipeStage public ::method init expose pivotvalue self~init:super -- forward the initialization -- we did the initialization first, as we're about to override the pipeStages -- store the pipeStage value and hook up the two output streams use strict arg pivotvalue, self~next, self~secondary ::method process -- process the split expose pivotvalue use strict arg value if value < pivotvalue then do -- simple split test self~write(value) end else do self~writeSecondary(value) end /** * a base class for pipeStages that split the processing stream into two or more * pipeStages. The default behavior is to broadcast each line down all of the branches. * To customize, override process() and route the transformed lines down the * appropriate branch(es) using result with a target index specified. If you wish * to use the default broadcast behavior, just call self~process:super(newValue) to * perform the broadcast. */ ::class splitter subclass pipeStage public ::method init expose stages stages = arg(1, 'A') -- just save the arguments as an array self~init:super -- forward the initialization ::method append -- override for the single append version expose stages use strict arg follower do stage over stages -- append the follower to each of the filter chains stage~append(follower) end ::method insert -- this doesn't make sense for a fan out raise syntax 93.963 -- Can't do this, so raise an unsupported error ::method write -- broadcast a result to a particular filter expose stages use strict arg which, value -- which is the fiter index, value is the result stages[which]~process(value); -- have the filter handle this ::method begin -- broadcast a start message down all of the branches expose stages do stage over stages stage~begin end ::method eof -- broadcast a done message down all of the branches expose stages do stage over stages stage~eof end ::method process -- process the stage stream expose stages use strict arg value do stage over stages -- send this down all of the branches stage~process(value) end ::class commandSupplier public -- a supplier to feed the results of a command into a pipeline ::method '[]' class -- create a pipeStage instance with arguments forward to (self) message('NEW') -- just forward this as a new message ::method init expose command use strict arg command ::method supplier expose command output = .array~new -- capture in an array address command command with output using (output) error using (output) return output~supplier -- supplier from the array and return that ::class stemSupplier public ::method '[]' class -- create a pipeStage instance with arguments forward to (self) message('NEW') -- just forward this as a new message ::method init expose stem. use strict arg stem. ::method supplier -- create a supplier object using stem.0 conventions expose stem. output = .array~new(stem.0) loop i = 1 to stem.0 -- copy the stem elements into an array output~append(stem.i) end return output~supplier -- get a supplier from the array copy ::requires "rxregexp.cls"