rexx-things/samples/oorexx/pipe.cls
2025-03-12 20:50:48 +00:00

1079 lines
42 KiB
OpenEdge ABL
Executable File

/*----------------------------------------------------------------------------*/
/* */
/* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */
/* Copyright (c) 2005-2022 Rexx Language Association. All rights reserved. */
/* */
/* This program and the accompanying materials are made available under */
/* the terms of the Common Public License v1.0 which accompanies this */
/* distribution. A copy is also available at the following address: */
/* https://www.oorexx.org/license.html */
/* */
/* Redistribution and use in source and binary forms, with or */
/* without modification, are permitted provided that the following */
/* conditions are met: */
/* */
/* Redistributions of source code must retain the above copyright */
/* notice, this list of conditions and the following disclaimer. */
/* Redistributions in binary form must reproduce the above copyright */
/* notice, this list of conditions and the following disclaimer in */
/* the documentation and/or other materials provided with the distribution. */
/* */
/* Neither the name of Rexx Language Association nor the names */
/* of its contributors may be used to endorse or promote products */
/* derived from this software without specific prior written permission. */
/* */
/* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */
/* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */
/* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */
/* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */
/* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
/* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */
/* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */
/* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */
/* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */
/* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */
/* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
/* */
/*----------------------------------------------------------------------------*/
/******************************************************************************/
/* pipe.rex Open Object Rexx Samples */
/* */
/* A pipeline implementation */
/* */
/* -------------------------------------------------------------------------- */
/* */
/* Description: */
/* This program demonstrates the use of ::class and ::method directives to */
/* create a simple implementation of a CMS-like pipeline function. */
/******************************************************************************/
/**
* Base pipeStage class. Most sub classes need only override the process() method to
* implement a pipeStage. The transformed results are passed down the pipeStage chain
* by calling the write method.
*/
::class pipeStage public -- base pipeStage class
::method init
expose next secondary
next = .nil
secondary = .nil -- all pipeStages have a secondary output potential
::method '|' class -- concatenate an instance of a pipeStage with following pipeStage
use strict arg follower
me = self~new -- create a new pipeStage instance
return me|follower -- perform the hook up
::method '>' class -- concatenate an instance of a pipeStage with following pipeStage
use strict arg follower
me = self~new -- create a new pipeStage instance
return me>follower -- perform the hook up
::method '>>' class -- concatenate an instance of a pipeStage to the secondary stream with following pipeStage
use strict arg follower
me = self~new -- create a new pipeStage instance
return me>>follower -- perform the hook up
::method '|' -- concatente two pipe stream
use strict arg follower
follower = follower~new -- make sure this is an instance
self~append(follower) -- do the chain append logic
return self -- we're our own return value
::method '>' -- synonym for '|' usually reserved for branchout connections
use strict arg follower
follower = follower~new -- make sure this is an instance
self~append(follower) -- do the chain append logic
return self -- we're our own return value
::method '>>' -- attach a pipe stage to the secondary socket
use strict arg follower
follower = follower~new -- make sure this is an instance
self~appendSecondary(follower) -- do the chain append logic. but to the secondary this time
return self -- we're our own return value
::method append -- append a pipeStage to the entire chain
expose next
use strict arg follower
if .nil == next then do -- if we're the end already, just update the next
next = follower
end
else do
next~append(follower) -- have our successor append it.
end
::method appendSecondary -- append a stage to the secondary output of entire chain
expose next secondary
use strict arg follower
if .nil == secondary then do -- if we're the end already, just update the next
secondary = follower -- append this to the secondary port.
end
else do
secondary~appendSecondary(follower) -- have our successor append it.
end
::method insert -- insert a pipeStage after this one, but before the next
expose next
user strict arg newpipeStage
newpipeStage~next = next -- just hook into the chain
next = newpipeStage
::method '[]' class -- create a pipeStage instance with arguments
forward to (self) message('NEW') -- just forward this as a new message
::method go -- execute using a provided object
expose source -- get the source supplier
use strict arg source -- set to the supplied object
self~begin -- have the stages initize any state data
engine = source~supplier -- get a data supplier
do while engine~available -- while more data
self~process(engine~item) -- pump this down the pipe
engine~next -- get the next data item
end
self~eof -- signal that processing is finished
::method secondary attribute -- a potential secondary attribute
::method next attribute -- next stage of the pipeStage
::method source attribute -- source of the initial data
-- we implement a new method on the instance
-- so that we can issue a new method during the pipeline
-- chaining process. this just returns the instance unchanged
::method new
return self -- just return ourself
::method process -- default data processing
use strict arg value -- get the data item
self~write(value) -- send this down the line
::method write -- handle the result from a process method
expose next
use strict arg data
if .nil <> next then do
next~process(data) -- only forward if we have a successor
end
::method writeSecondary -- handle a secondary output result from a process method
expose secondary
use strict arg data
if .nil <> secondary then do
secondary~process(data) -- only forward if we have a successor
end
::method processSecondary -- handle a secondary output result from a process method
forward message('PROCESS') -- this by default is a merge operation
::method begin -- process pipe stage initialization
expose next secondary
if .nil <> next then do
next~begin -- only forward if we have a successor
end
if .nil <> secondary then do -- and do the same with the seconcary stream
secondary~begin
end
::method eof -- process "end-of-pipe" condition
expose next secondary
if .nil <> next then do
next~eof -- only forward if we have a successor
end
if .nil <> secondary then do
secondary~eof -- only forward if we have a successor
end
::method secondaryEof -- process "end-of-pipe" condition
-- we just ignore this one, and rely on the secondary
::method secondaryBegin -- process startup condition
-- we just ignore this one, and rely on the secondary
::method secondaryConnector -- retrieve a secondary connector for a pipeStage
return new .SecondaryConnector(self)
::class SecondaryConnector subclass pipeStage
::method init
expose pipeStage
use strict arg pipeStage -- this just hooks up
self~init:super -- forward the initialization
::method process -- processing operations connect with pipeStage secondaries
expose pipeStage
forward to(pipeStage) message('processSecondary')
::method begin -- processing operations connect with pipeStage secondaries
expose pipeStage
forward to(pipeStage) message('secondaryBegin')
::method eof -- processing operations connect with pipeStage secondaries
expose pipeStage
forward to(pipeStage) message('secondaryEof')
-- base class for pipe stages that need to accumulate data with actual processing
-- deferred until the eof event is received. Sorting is a good example of this type of
-- stage
::class bufferingStage public subclass pipeStage
::method begin -- sorter initialization method
expose items -- list of sorted items
items = .array~new -- create a new list
forward class(super) -- forward the initialization
::attribute items private -- access to the accumulated items
::method process -- process sorter piped data item
expose items -- access internal state data
use strict arg value -- access the passed value
items~append(value) -- append the value to the accumulator array
::method eof -- end of processing clean up
expose items
items = .nil
forward class(super) -- propagate the event
::method writeItems private -- send along the items
expose items
do i = 1 to items~items -- copy all sorted items to the primary stream
self~write(items[i])
end
::class sort public subclass bufferingStage -- sort piped data
::method eof -- process the "end-of-pipe"
self~items~sort -- sort the accumulated items
self~writeItems -- now release the Kraken
forward class(super) -- make sure we propagate the done message
::class sortWith public subclass pipeStage -- sort piped data
::method init
expose comparator
use strict arg comparator
::method eof -- process the "end-of-pipe"
expose comparator
self~items~sortWith(comparator) -- sort the accumulated items
self~writeItems -- now release the Kraken
forward class(super) -- make sure we propagate the done message
::class reverse public subclass pipeStage -- a string reversal pipeStage
::method process -- pipeStage processing item
use strict arg value -- get the data item
self~write(value~reverse) -- send it along in reversed form
::class split public subclass pipeStage -- split the record at a give delimiter
::method init
expose delimiter
use strict arg delimiter
forward class(super) -- make sure we propagate the done message
::method process -- pipeStage processing item
expose delimiter
use strict arg value -- get the data item
loop line over value~makeArray(delimiter) -- split the record into smaller chunks
self~write(line) -- send each piece down the pipe
end
::class strip public subclass pipeStage -- strip leading or trailing blanks
::method init
expose option chars
use strict arg option = 'B', chars = '2009'x -- default is blanks and horizontal tabs
forward class(super) -- make sure we propagate the done message
::method process
expose option chars
use strict arg value
self~write(value~strip(option, chars)) -- strip and send along
::class upper public subclass pipeStage -- a uppercasing pipeStage
::method process -- pipeStage processing item
use strict arg value -- get the data item
self~write(value~upper) -- send it along in upper form
::class lower public subclass pipeStage -- a lowercasing pipeStage
::method process -- pipeStage processing item
use strict arg value -- get the data item
self~write(value~lower) -- send it along in lower form
::class changestr public subclass pipeStage -- a string replacement pipeStage
::method init
expose old new count
use strict arg old, new, count = 999999999 -- old and new are required, default count is max value
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose old new count
use strict arg value -- get the data item
self~write(value~changestr(old, new, count)) -- send it along in altered form
::class delstr public subclass pipeStage -- a string deletion pipeStage
::method init
expose offset length
use strict arg offset, length -- both are required.
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose offset length
use strict arg value -- get the data item
self~write(value~delstr(offset, length)) -- send it along in altered form
::class left public subclass pipeStage -- a splitter pipeStage
::method init
expose length
use strict arg length -- the length is the left part
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose offset length
use strict arg value -- get the data item
self~write(value~left(length)) -- send the left portion along the primary stream
self~writeSecondary(value~substr(length + 1)) -- the secondary gets the remainder portion
::class right public subclass pipeStage -- a splitter pipeStage
::method init
expose length
use strict arg length -- the length is the left part
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose offset length
use strict arg value -- get the data item
self~write(value~substr(length + 1)) -- the remainder portion goes down main pipe
self~writeSecondary(value~left(length)) -- send the left portion along the secondary stream
::class insert public subclass pipeStage -- insert a string into each line
::method init
expose insert offset
use strict arg insert, offset -- we need an offset and an insertion string
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose insert offset
use strict arg value -- get the data item
self~write(value~insert(insert, offset)) -- send the left portion along the primary stream
::class overlay public subclass pipeStage -- insert a string into each line
::method init
expose overlay offset
use strict arg overlay, offset -- we need an offset and an insertion string
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose insert offset
use strict arg value -- get the data item
self~write(value~overlay(overlay, offset)) -- send the left portion along the primary stream
::class dropnull public subclass pipeStage -- drop null records
::method process -- pipeStage processing item
use strict arg value -- get the data item
if value \== '' then do -- forward along non-null records
self~write(value)
end
::class dropFirst public subclass pipeStage -- drop the first n records
::method init
expose count
use strict arg count
self~init:super -- forward the initialization
::method begin
expose counter
counter = 0
forward class(super)
::method process
expose count counter
use strict arg value
counter += 1 -- if we've dropped our quota, start forwarding
if counter > count then do
self~write(value)
end
else do
self~writeSecondary(value) -- non-selected records go down the secondary stream
end
::class takeLast public subclass bufferingStage -- take the last n records
::method init
expose count
use strict arg count
self~init:super -- forward the initialization
::method eof
expose count
array = self~items
if array~items < count then do -- didn't even receive that many items?
loop line over array
self~write(line) -- send everything down the main pipe
end
end
else do
first = array~items - count -- this is the count of discarded items
loop i = 1 to first
self~writeSecondary(array[i]) -- the discarded ones go to the secondary pipe
end
loop i = first + 1 to array~items
self~write(array[i]) -- the remainder ones go down the main pipe
end
end
forward class(super) -- finish the eof process
::class takeFirst public subclass pipeStage -- take the first n records
::method init
expose count
use strict arg count
self~init:super -- forward the initialization
::method begin
expose counter
counter = 0
forward class(super)
::method process
expose count counter
use strict arg value
counter += 1 -- if we've dropped our quota, stop forwarding
if counter > count then do
self~writeSecondary(value)
end
else do
self~write(value) -- still in the first bunch, send to main pipe
end
::class dropLast public subclass bufferingStage -- drop the last n records
::method init
expose count
use strict arg count
self~init:super -- forward the initialization
::method eof
expose count
array = self~items -- get the buffered records
if array~items < count then do -- didn't even receive that many items?
loop line over array
self~writeSecondary(line) -- send everything down the secondary pipe
end
end
else do
first = array~items - count -- this is the count of selected items
loop i = 1 to first
self~write(array[i]) -- the selected go to the main pipe
end
loop i = first + 1 to array~items
self~writeSecondary(array[i]) -- the discarded ones go down the secondary pipe
end
end
forward class(super) -- finish the eof process
::class x2c public subclass pipeStage -- translate records to hex characters
::method process -- pipeStage processing item
use strict arg value -- get the data item
self~write(value~x2c)
::class bitbucket public subclass pipeStage -- just consume the records
::method process -- pipeStage processing item
nop -- do nothing with the data
::class fanout public subclass pipeStage -- write records to both output streams
::method process -- pipeStage processing item
use strict arg value -- get the data item
self~write(value) -- and write to both branches of the pipe
self~writeSecondary(value)
::class merge public subclass pipeStage -- merge the results from primary and secondary streams
::method begin
expose mainDone secondaryEof -- need pair of EOF conditions
mainDone = .false
secondaryEof = .false
forward class(super)
::method eof
expose mainDone secondaryEof -- need interlock flags
if secondaryEof then do -- the other input hit EOF already?
forward class(super) -- handle as normal
end
mainDone = .true -- mark this branch as finished.
::method secondaryBegin -- begin on the secondary input
forward message('BEGIN')
::method secondaryEof -- eof on the seconary input
expose mainDone secondaryEof -- need interlock flags
secondaryEof = .true -- mark ourselves finished
if mainDone then do -- if both branches finished, do normal end work
forward message('EOF')
end
::class fanin public subclass pipeStage -- process main stream, then secondary stream
::method begin
expose mainDone secondaryEof array -- need pair of EOF conditions
mainDone = .false
secondaryEof = .false
array = .array~new -- accumulator for secondary
forward class(super) -- forward the initialization
::method processSecondary -- handle the secondary input
expose array
use strict arg value
array~append(value) -- just append to the end of the array
::method eof
expose mainDone secondaryEof array -- need interlock flags
if secondaryEof then do -- the other input hit EOF already?
loop i = 1 to array~items -- need to write out the deferred items
self~write(array[i])
end
forward class(super) -- handle as normal
end
mainDone = .true -- mark this branch as finished.
::method secondaryEof -- eof on the seconary input
expose mainDone secondaryEof -- need interlock flags
secondaryEof = .true -- mark ourselves finished
if mainDone then do -- if both branches finished, do normal done.
forward message('EOF')
end
::class duplicate public subclass pipeStage -- duplicate each record N times
::method init
expose copies
use strict arg copies = 1 -- by default, we do one duplicate
self~init:super -- forward the initialization
::method process -- pipeStage processing item
expose copies
use strict arg value -- get the data item
loop copies + 1 -- write this out with the duplicate count
self~write(value)
end
::class nodup public subclass pipeStage -- remove any duplicate items
::method begin
expose alreadySeen
alreadySeen = .set~new
forward class(super)
::method process -- pipeStage processing item
expose alreadySeen
use strict arg value -- get the data item
if !alreadySeen~hasItem(value) then do -- only process items we have never seen before
alreadySeen~put(value) -- remember we've seen this one an pass it along
self~write(value)
end
::method eof -- handle pipe completion
expose alreadySeen
alreadySeen = .nil
forward class(super)
-- select all records that completely matches a regex pattern
::class regexMatch subclass pipeStage public
::method init
expose pattern
usg strict arg pattern
if \pattern~isA(.RegularExpression) then do -- the pattern can be provided as a RegularExpression instance or a string
p = .RegularExpression~new(pattern)
pattern = p
end
::attribute pattern private
::method process
expose pattern
use strict arg value
if pattern~match(value) then do -- if an exact match, send it along
self~write(value)
end
else do
self~writeSecondary(value) -- handle on the secondary pipe if not selected
end
-- select on whether a record contains a regular expression anywhere within the string
::class regexContains subclass regexMatch public
::method process
use strict arg value
if self~pattern~position(value) \= 0 then do -- if found in the string, selected
self~write(value)
end
else do
self~writeSecondary(value) -- handle on the secondary pipe if not selected
end
::class displayer subclass pipeStage public
::method process -- process a data item
use strict arg value -- get the data value
say value -- display this item
forward class(super)
::class any public subclass pipeStage -- a string selector pipeStage
::method init -- process initial strings
expose patterns -- access the exposed item
patterns = arg(1,'a') -- get the patterns list
self~init:super -- forward the initialization
::method process -- process a selection pipeStage
expose patterns -- expose the pattern list
use strict arg value -- access the data item
do i = 1 to patterns~size -- loop through all the patterns
-- this pattern in the data?
if (value~pos(patterns[i]) <> 0) then do
self~write(value) -- send it along
return -- stop the loop
end
end
self~writeSecondary(value) -- send all mismatches down the other branch, if there
::class all public subclass pipeStage -- a string selector pipeStage
::method init -- process initial strings
expose patterns -- access the exposed item
patterns = arg(1,'a') -- get the patterns list
self~init:super -- forward the initialization
::method process -- process a selection pipeStage
expose patterns -- expose the pattern list
use strict arg value -- access the data item
do i = 1 to patterns~size -- loop through all the patterns
-- this pattern in the data?
if (value~pos(patterns[i]) == 0) then do
self~writeSecondary(value) -- send it along
return -- stop the loop
end
end
self~write(value) -- all patterns were found, so this goes to the main branch
::class startsWith public subclass pipeStage -- a string selector pipeStage
::method init -- process initial strings
expose match -- access the exposed item
use strict arg match -- get the patterns list
self~init:super -- forward the initialization
::method process -- process a selection pipeStage
expose match -- expose the pattern list
use strict arg value -- access the data item
if (value~pos(match) == 1) then do -- match string occur in first position?
self~write(value) -- send it along
end
else do
self~writeSecondary(value) -- send all mismatches down the other branch, if there
end
::class notall public subclass pipeStage -- a string de-selector pipeStage
::method init -- process initial strings
expose patterns -- access the exposed item
patterns = arg(1,'a') -- get the patterns list
self~init:super -- forward the initialization
::method process -- process a selection pipeStage
expose patterns -- expose the pattern list
use strict arg value -- access the data item
do i = 1 to patterns~size -- loop through all the patterns
-- this pattern in the data?
if (value~pos(patterns[i]) <> 0) then do
self~writeSecondary(value) -- send it along the secondary...don't want this one
return -- stop the loop
end
end
self~write(value) -- none found, this is main branch
::class stemcollector subclass pipeStage public -- collect items in a stem
::method init -- initialize a collector
expose stem. -- expose target stem
use strict arg stem. -- get the stem variable target
self~init:super -- forward the initialization
::method begin
expose stem.
stem.~empty -- empty this out
stem.0 = 0 -- start with zero items
::method process -- process a stem pipeStage item
expose stem. -- expose the stem
use strict arg value -- get the data item
stem.0 = stem.0 + 1 -- stem the item count
stem.[stem.0] = value -- save the value
self~write(value) -- there may be following stages
::class arraycollector subclass pipeStage public -- collect items in an array
::method init -- initialize a collector
expose array -- expose target stem
use strict arg array -- get the stem variable target
self~init:super -- forward the initialization
::method begin
expose array
array~empty
::method process -- process a stem pipeStage item
expose array
use strict arg value -- get the data item
array~append(value) -- append the item
self~write(value) -- there may be following stages
::class streamCollector subclass pipeStage public -- collect items in a stream
::method init -- initialize a collector
expose stream -- expose target stream
use strict arg stream -- get the stem variable target
if \stream~isA(.outputStream) then do -- if not already a stream object, make it one
stream = .stream~new(stream)
end
self~init:super -- forward the initialization
::method process -- process a stem pipeStage item
expose stream
use strict arg value -- get the data item
stream~lineout(value) -- append the item
self~write(value) -- there may be following stages
::class between subclass pipeStage public -- write only records from first trigger record
-- up to a matching record
::method init
expose startString endString
use strict arg startString, endString
self~init:super -- forward the initialization
::method begin
expose started finished
started = .false -- not processing any lines yet
finished = .false
forward class(super)
::method process
expose startString endString started finished
use strict arg value
if \started then do -- not turned on yet? see if we've hit the trigger
if value~pos(startString) > 0 then do
started = .true
self~write(value) -- pass along
end
else do
self~writeSecondary(value) -- non-selected lines go to the secondary bucket
end
return
end
if \finished then do -- still processing?
if value~pos(endString) > 0 then do -- check for the end position
finished = .true
end
self~write(value) -- pass along
end
else do
self~writeSecondary(value) -- non-selected lines go to the secondary bucket
end
::class after subclass pipeStage public -- write only records from first trigger record
::method init
expose startString
use strict arg startString
self~init:super -- forward the initialization
::method begin
expose started
started = .false
forward class(super)
::method process
expose startString started
use strict arg value
if \started then do -- not turned on yet? see if we've hit the trigger
if value~pos(startString) = 0 then do
self~writeSecondary(value) -- pass along the secondary stream
return
end
started = .true
end
self~write(value) -- pass along
::class before subclass pipeStage public -- write only records before first trigger record
::method init
expose endString
use strict arg endString
self~init:super -- forward the initialization
::method begin
expose finished
finished = .false
forward class(super)
::method process
expose endString finished
use strict arg value
if \finished then do -- still processing?
if value~pos(endString) > 0 then do -- check for the end position
finished = .true
end
self~write(value) -- pass along
end
else do
self~writeSecondary(value) -- non-selected lines go to the secondary bucket
end
::class buffer subclass bufferingStage public -- write only records before first trigger record
::method init
expose count delimiter
use strict arg count = 1, delimiter = ("")
self~init:super -- forward the initialization
::method eof
expose count delimiter
buffer = self~items -- get the buffered records
loop i = 1 to count -- now write copies of the set to the stream
if i > 1 then do
self~write(delimiter) -- put a delimiter between the sets
end
loop j = 1 to buffer~items -- and send along the buffered lines
self~write(buffer[i])
end
end
forward class(super) -- and send the done message along
::class lineCount subclass pipeStage public -- count number of records passed through the pipeStage
::method begin
expose counter
counter = 0
forward class(super)
::method process
expose counter
use strict arg value
counter += 1 -- just bump the counter on each record
::method eof
expose counter
self~write(counter); -- write out the counter message
forward class(super) -- and send the done message along
::class charCount subclass pipeStage public -- count number of characters passed through the pipeStage
::method begin
expose counter
counter = 0
forward class(super)
::method process
expose counter
use strict arg value
counter += value~length -- just bump the counter for the length of each record
::method eof
expose counter
self~write(counter); -- write out the counter message
forward class(super) -- and send the done message along
::class wordCount subclass pipeStage public -- count number of characters passed through the pipeStage
::method begin
expose counter
counter = 0
forward class(super) -- forward the initialization
::method process
expose counter
use strict arg value
counter += value~words -- just bump the counter for the number of words
::method eof
expose counter
self~write(counter); -- write out the counter message
forward class(super) -- and send the done message along
/**
* A simple splitter sample that splits the stream based on a pivot value.
* strings that compare < the pivot value are routed to pipeStage 1. All other
* strings are routed to pipeStage 2
*/
::class pivot subclass pipeStage public
::method init
expose pivotvalue
self~init:super -- forward the initialization
-- we did the initialization first, as we're about to override the pipeStages
-- store the pipeStage value and hook up the two output streams
use strict arg pivotvalue, self~next, self~secondary
::method process -- process the split
expose pivotvalue
use strict arg value
if value < pivotvalue then do -- simple split test
self~write(value)
end
else do
self~writeSecondary(value)
end
/**
* a base class for pipeStages that split the processing stream into two or more
* pipeStages. The default behavior is to broadcast each line down all of the branches.
* To customize, override process() and route the transformed lines down the
* appropriate branch(es) using result with a target index specified. If you wish
* to use the default broadcast behavior, just call self~process:super(newValue) to
* perform the broadcast.
*/
::class splitter subclass pipeStage public
::method init
expose stages
stages = arg(1, 'A') -- just save the arguments as an array
self~init:super -- forward the initialization
::method append -- override for the single append version
expose stages
use strict arg follower
do stage over stages -- append the follower to each of the filter chains
stage~append(follower)
end
::method insert -- this doesn't make sense for a fan out
raise syntax 93.963 -- Can't do this, so raise an unsupported error
::method write -- broadcast a result to a particular filter
expose stages
use strict arg which, value -- which is the fiter index, value is the result
stages[which]~process(value); -- have the filter handle this
::method begin -- broadcast a start message down all of the branches
expose stages
do stage over stages
stage~begin
end
::method eof -- broadcast a done message down all of the branches
expose stages
do stage over stages
stage~eof
end
::method process -- process the stage stream
expose stages
use strict arg value
do stage over stages -- send this down all of the branches
stage~process(value)
end
::class commandSupplier public -- a supplier to feed the results of a command into a pipeline
::method '[]' class -- create a pipeStage instance with arguments
forward to (self) message('NEW') -- just forward this as a new message
::method init
expose command
use strict arg command
::method supplier
expose command
output = .array~new -- capture in an array
address command command with output using (output) error using (output)
return output~supplier -- supplier from the array and return that
::class stemSupplier public
::method '[]' class -- create a pipeStage instance with arguments
forward to (self) message('NEW') -- just forward this as a new message
::method init
expose stem.
use strict arg stem.
::method supplier -- create a supplier object using stem.0 conventions
expose stem.
output = .array~new(stem.0)
loop i = 1 to stem.0 -- copy the stem elements into an array
output~append(stem.i)
end
return output~supplier -- get a supplier from the array copy
::requires "rxregexp.cls"