requires: {}.
provides: {#Stream. #ReadStream. #WriteStream. #PositionableStream. #BlockStream. #DummyStream. #LineNumberedStream. #FilterStream. #CollectStream. #EchoStream. #WrapperStream}.

prototypes addPrototype: #Stream derivedFrom: {Cloneable}.
"The protocol shared by every Stream object, including the common way of
instantiating one."

stream@(Stream traits) newOn: target
"Answer a fresh stream of the same kind as the receiver, targeted on the
given object."
[| fresh |
  fresh: stream clone.
  fresh on: target.
  fresh
].

stream@(Stream traits) on: _
"Point the stream at some object, modifying the stream in place. Derived
streams override this, which in turn customizes newOn: for most uses."
[overrideThis].

stream@(Stream traits) flush
"Nothing to flush by default; answers the stream."
[stream].

stream@(Stream traits) atEnd
"Answer whether the Stream has run out. Derived streams must override this."
[overrideThis].

stream@(Stream traits) elementType
"Answer a prototype suitable as the element type of the Stream. Root is the
most generic choice."
[Root].

stream@(Stream traits) arrayType
"Answer a prototype suitable as the array type of the Stream. Array is the
most generic choice."
[Array].

Stream addPrototype: #Condition derivedFrom: {Condition}.
"Errors specific to streaming."
Stream Condition addSlot: #stream.

condition@(Stream Condition traits) newStream: s
"Answer a new condition of this kind which records the given stream."
[| fresh |
  fresh: condition new.
  fresh stream: s.
  fresh
].

Stream addPrototype: #Exhaustion derivedFrom: {Stream Condition. Error}.
"Signalled when a stream or resource is exhausted, ie on #atEnd conditions."

_@(Stream Exhaustion traits) describe
"Write the description of the condition to the DebugConsole."
[
  DebugConsole ; 'The stream has reached its end, unhandled.'
].

stream@(Stream traits) exhausted
"Signal an Exhaustion condition carrying this stream."
[
  (stream Exhaustion newStream: stream) signal
].

stream@(Stream traits) endCheck
"Signal exhaustion when the stream is at its end."
[
  stream atEnd ifTrue: [stream exhausted]
].

prototypes addPrototype: #ReadStream derivedFrom: {Stream}.
"Streams which read from some source."

stream@(ReadStream traits) newOn: source
"Answer the result of targeting a clone of the receiver on the source."
[
  stream clone on: source
].

stream@(ReadStream traits) reader
"A ReadStream already is its own reading resource."
[stream].

stream@(ReadStream traits) next
"Obtain and answer the next element of the Stream. Derived streams must
override this."
[overrideThis].
s@(ReadStream traits) next: n putInto: seq startingAt: start
"Read up to N elements into seq, storing them from index start onwards.
Answers N when all N elements were read. When the stream answers Nil first,
answers a copy of seq which stops just before the slot that could not be
filled (the end index of copyFrom:to: is taken to be inclusive)."
[| obj |
  0 below: n do:
    [| :index |
     "The slot at start + index was never written, so leave it out of the copy."
     (obj: s next) ifNil: [^ (seq copyFrom: 0 to: start + index - 1)].
     seq at: start + index put: obj].
  n
].

s@(ReadStream traits) nextPutInto: seq startingAt: start
"Fill seq from index start up to its end with the next elements. Only the
room left after start is requested, so the write never runs past seq."
[s next: (seq size - start) putInto: seq startingAt: start].

s@(ReadStream traits) nextPutInto: seq
"Fill all of seq with the next elements."
[s next: seq size putInto: seq startingAt: 0].

s@(ReadStream traits) next: n putInto: seq@(Sequence traits)
"Places the next N elements into a Sequence, starting at its first index."
[s next: n putInto: seq startingAt: 0].

s@(ReadStream traits) next: n putInto: seq@(ExtensibleArray traits)
"Places the next N elements at the end of a given ExtensibleArray. Answers N."
[
  n timesRepeat: [seq addLast: s next].
  n
].

s@(ReadStream traits) next: n
"Answer the next N elements in a new array of the stream's arrayType."
[| arr |
  arr: (s arrayType newSize: n).
  s next: n putInto: arr startingAt: 0.
  arr
].

s@(ReadStream traits) upTo: obj
"Answer all objects up to one equal to the argument. The matching object is
consumed but not included."
[| newES elem |
  newES: ExtensibleArray newEmpty.
  [s atEnd or: [(elem: s next) = obj]]
    whileFalse: [newES add: elem].
  newES
].

s@(ReadStream traits) upToEnd
"Supply all the elements up to the end of the stream."
[| newES |
  newES: ExtensibleArray newEmpty.
  [s atEnd] whileFalse: [newES add: s next].
  newES
].

s@(ReadStream traits) do: block
"Call the block on all the elements in turn until the Stream is empty.
Answers the stream."
[
  [s atEnd] whileFalse: [block applyWith: s next].
  s
].

prototypes addPrototype: #WriteStream derivedFrom: {Stream}.
"Streams that write to some target."

s@(WriteStream traits) writer
"A WriteStream is already its own resource."
[s].

s@(WriteStream traits) nextPut: _
"Place the given element on the Stream. Derived streams must override this."
[overrideThis].

s@(WriteStream traits) next: n put: obj
"Make the next N values the argument object. Answers the object.
This writes through nextPut:, so it is dispatched on WriteStream; it was
previously attached to ReadStream, where nextPut: is not part of the protocol."
[
  n timesRepeat: [s nextPut: obj].
  obj
].
s@(WriteStream traits) nextPutAll: c
"Place the Collection's contents into the stream. Answers the stream."
[
  c do: [| :obj | s nextPut: obj].
  s
].

s@(WriteStream traits) nextPutAll: seq from: start to: end
"Place a range of the Sequence's contents into the Stream."
[
  seq from: start to: end do: [| :each | s nextPut: each]
].

s@(WriteStream traits) next: n putAll: seq startingAt: start
"Write N elements of seq, beginning with the one at index start. Answers seq."
[
  0 below: n do: [| :index | s nextPut: (seq at: start + index)].
  seq
].

s@(WriteStream traits) next: n putAll: seq
"Write the first N elements of seq."
[
  s next: n putAll: seq startingAt: 0
].

s@(WriteStream traits) ; c
"Syntactic sugaring to make collection-insertion similar to concatenation."
[
  s nextPutAll: c
].

source@(Stream traits) >> sink
"Write the contents from source to target one element at a time."
[
  [source atEnd] whileFalse: [sink nextPut: source next]
].

sink@(Stream traits) << source
"The mirror image of >>: drain source into the receiver."
[source >> sink].

source@(Stream traits) copyTo: sink chunkSize: n
"Write the contents from source to sink in chunks of at most N elements.
Answers the total number of elements transferred.
The chunk is filled element by element so that the count handed to the sink
is the number actually read, even for a short final chunk."
[| buffer count total |
  "arrayType answers a collection prototype; elementType does not."
  buffer: (source arrayType newSize: n).
  total: 0.
  [source atEnd] whileFalse:
    [count: 0.
     [count < n and: [source atEnd not]] whileTrue:
       [buffer at: count put: source next.
        count: count + 1].
     sink next: count putAll: buffer startingAt: 0.
     total: total + count].
  total
].

source@(Stream traits) copyTo: sink
"Write the contents from source to target in big blocks."
"TODO: implement this as >>/<< for ExternalStreams."
[source copyTo: sink chunkSize: 4096].

prototypes addPrototype: #ReadWriteStream derivedFrom: {ReadStream. WriteStream}.

s@(ReadWriteStream traits) iterator
"A ReadWriteStream is already its own resource."
[s].

prototypes addPrototype: #PeekableStream derivedFrom: {Stream}.
"A mixin supporting protocols which rely on peek alone, but not full
positionability."

s@(PeekableStream traits) peek
"Answer the next element without consuming it. Derived streams must override
this."
[overrideThis].

s@(PeekableStream traits) nextWhile: testBlock
"Answer the next elements for which the test block is True. Note that this
relies on a Collection iterator Stream: the receiver must answer
collectionType."
[| result |
  result: s collectionType newEmpty writer.
  [s atEnd ifTrue: [^ result contents].
   (testBlock applyWith: s peek)]
    whileTrue: [result nextPut: s next].
  result contents
].

s@(PeekableStream traits) nextUntil: testBlock
"Answer the next elements for which the test block is not True."
[
  s nextWhile: [| :each | (testBlock applyWith: each) not]
].

s@(PeekableStream traits) nextDelimitedBy: separatorBlock
"Answer the next elements between segments where the separator test is True."
[
  s skipWhile: separatorBlock.
  s nextUntil: separatorBlock
].

prototypes addPrototype: #PositionableStream derivedFrom: {PeekableStream}.
"PositionableStreams have an index and iterate over some sequenced
collection, but with a specific limit on the stream."
PositionableStream addSlot: #position valued: 0.
PositionableStream addSlot: #readLimit valued: 0.

s@(PositionableStream traits) newOn: c
"Override on: in derived objects in order to customize this."
[
  s clone on: c
].

s@(PositionableStream traits) newWith: c
"Answer the result of sending with: to a clone of the receiver."
[
  s clone with: c
].

s@(PositionableStream traits) newOn: c from: start to: end
"Answer the result of sending on:from:to: to a clone of the receiver."
[
  s clone on: c from: start to: end
].

s@(PositionableStream traits) on: _
"Reset the position."
[s reset].

s@(PositionableStream traits) contents
"Answer the contents of the target up to the readLimit."
[overrideThis].

s@(PositionableStream traits) collectionType
"Answer the default collection prototype to dump contents into."
[String].

s@(PositionableStream traits) last
[overrideThis].

s@(PositionableStream traits) next: n
"Answer the next N elements in a new collection of the stream's
collectionType. If the read comes up short, the partial copy produced by
next:putInto: is answered instead."
[| seq result |
  seq: (s collectionType newSize: n).
  result: (s next: n putInto: seq).
  "A complete read answers the count N, not the collection that was filled."
  result == n ifTrue: [seq] ifFalse: [result]
].

s@(PositionableStream traits) nextMatchAll: c
"Whether the next N objects in the Stream are in the other collection (which
generally should be a Sequence, ie have linear order). The position is
restored when the match fails."
[| pos |
  pos: s position.
  c do: [| :each | s next = each ifFalse: [s position: pos. ^ False]].
  True
].
s@(PositionableStream traits) upTo: obj
"Answer all objects up to one equal to the argument. The matching object is
consumed but not included."
[| newS elem |
  newS: (s collectionType newSize: 100) writer.
  [s atEnd or: [(elem: s next) = obj]]
    whileFalse: [newS nextPut: elem].
  newS contents
].

s@(PositionableStream traits) upToEnd
"Answer all the elements up to the limit by a copy.
This is the single default definition; an [overrideThis] stub which used to
follow upToAll: replaced it on load and has been dropped. The loop is driven
by atEnd rather than by waiting for next to answer Nil."
[| newS |
  newS: (s collectionType newSize: 100) writer.
  [s atEnd] whileFalse: [newS nextPut: s next].
  newS contents
].

s@(PositionableStream traits) peek
"Returns the results of next without advancing the stream. Answers Nil at
the end."
[| obj |
  s atEnd ifTrue: [^ Nil].
  obj: s next.
  s position: s position - 1.
  obj
].

s@(PositionableStream traits) peek: n
"Answer the next N results without advancing the stream."
[| result origPosition |
  origPosition: s position.
  result: (s next: n).
  s position: origPosition.
  result
].

s@(PositionableStream traits) peekFor: obj
"Returns whether the object is next in the stream. Advances if true, and
steps back over the element that was read otherwise."
[| elem |
  s atEnd ifTrue: [^ False].
  elem: s next.
  obj = elem
    ifTrue: [True]
    ifFalse: [s position: s position - 1. False]
].

s@(PositionableStream traits) peekForwardBy: offset
"Answers the element the given number of positions after the current
position."
[overrideThis].

s@(PositionableStream traits) peekBackBy: offset
"Answers the element the given number of positions before the current
position."
[s peekForwardBy: offset negated].

s@(PositionableStream traits) upToAnyOf: c
"Answer all objects up to the first occurrence of something in the
collection. NOTE(review): this depends on match:, which is not defined in
this file; confirm where the position is left after a successful match."
[| start endMatch newC |
  start: s position.
  (s match: c)
    ifTrue: [endMatch: s position.
             s position: start.
             newC: (s next: endMatch - start - c size).
             s position: endMatch.
             newC]
    ifFalse: [s position: start.
              s upToEnd]
].

s@(PositionableStream traits) upToAll: pattern@(Collection traits)
"Answer all the elements up to the occurrence of the pattern in the Stream,
but not including anything from the pattern. On a match the stream is left
positioned at the start of the pattern (throughAll: relies on this). When
the pattern does not occur, everything from the original position up to the
end is answered."
[| patternSize origin cursor |
  patternSize: pattern size.
  origin: s position.
  "origin stays fixed; cursor is the candidate start of the pattern."
  cursor: origin.
  [s readLimit - cursor < patternSize
     ifTrue: ["Not enough room left to match."
              s position: origin.
              ^ s upToEnd].
   s position: cursor.
   (s next: patternSize) = pattern
     ifTrue: ["The pattern has been found. Answer up to the pattern."
              s position: origin.
              ^ (s next: cursor - origin)].
   cursor: cursor + 1] loop
].

s@(PositionableStream traits) through: obj
"Answer the next elements up to and including the object given."
"This default implementation should be overridden for efficiency."
[(s upTo: obj) copyWith: obj].

s@(PositionableStream traits) throughAll: pattern
"Answer the elements up to the pattern, followed by the next pattern-sized
run of elements unless the stream ended first."
[| result |
  result: (s collectionType newSize: 40) writer.
  result ; (s upToAll: pattern).
  s atEnd ifFalse: [result ; (s next: pattern size)].
  result contents
].

s@(PositionableStream traits) atEnd
"The stream is at its end once the position reaches the readLimit."
[
  s position >= s readLimit
].

s@(PositionableStream traits) reset
"Move back to the start. Answers the stream."
[
  s position: 0.
  s
].

s@(PositionableStream traits) resetContents
"Move back to the start and set the readLimit to zero. Answers the stream."
[
  s position: 0.
  s readLimit: 0.
  s
].

s@(PositionableStream traits) setToEnd
"Position the Stream after the last readable element."
[
  s position: s readLimit.
  s
].

s@(PositionableStream traits) skip: n
"Move the position by N, which may be negative. Answers the stream."
[
  s position: s position + n.
  s
].

s@(PositionableStream traits) skipTo: obj
"Advance just past the next element equal to obj and answer True, or answer
False with the stream at its end when there is none."
[
  [s atEnd] whileFalse: [s next = obj ifTrue: [^ True]].
  False
].

s@(PositionableStream traits) setFrom: start to: end
"Restrict the stream to the indices from start to end inclusive."
[
  s position: start.
  s readLimit: end + 1.
  s
].

s@(PositionableStream traits) beginsWith: seq
"Answer whether the Stream's next elements match the Sequence. The position
is left unchanged. The room check counts the elements left before the
readLimit, as upToAll: does."
[
  (s readLimit - s position) >= seq size
    and: [| starters oldPosition |
          oldPosition: s position.
          starters: (s next: seq size).
          s position: oldPosition.
          starters = seq]
].

s@(PositionableStream traits) retract: n
"Retract N elements: step the position back by N and lower the readLimit by
N. Answers the stream."
[
  s skip: n negated.
  s readLimit: (s readLimit - n).
  s
].

s@(PositionableStream traits) retract
"Retract one element."
[s retract: 1].

prototypes addPrototype: #PositionableReadStream derivedFrom: {PositionableStream. ReadStream}.
prototypes addPrototype: #PositionableWriteStream derivedFrom: {PositionableStream. WriteStream}.
prototypes addPrototype: #PositionableReadWriteStream derivedFrom: {PositionableStream. ReadStream. WriteStream}.

prototypes addPrototype: #DummyStream derivedFrom: {ReadWriteStream}.
"DummyStreams always return Nil and cannot be repositioned or written to, but
pretend that they can, eating up input and providing no output."

s@(Stream traits) newFrom: _@Nil
[DummyStream].

_@Nil iterator
[DummyStream].

_@Nil reader
[DummyStream].

_@Nil writer
[DummyStream].

_@(DummyStream traits) next
[].

_@(DummyStream traits) nextPut: _
[].

_@(DummyStream traits) nextPutAll: _
[].

_@(DummyStream traits) position
[0].

_@(DummyStream traits) position: _
"Ignore repositioning. (This was defined twice in a row; once is enough.)"
[].

Method traits addPrototype: #ReadStream derivedFrom: {ReadStream}.
"Method ReadStreams take their next element from the recalculation of a code
closure with no arguments. This is effectively a means to poll some condition
or to generate a sequence."
Method ReadStream addSlot: #block valued: [].
"The calculation for generating the Stream elements."

s@(Stream traits) newFrom: block@(Method traits)
[block ReadStream newOn: block].

m@(Method traits) iterator
"Mimics the Collection interface for making iterators that target them."
[m reader].

m@(Method traits) reader
"Mimics the Collection interface for making ReadStreams that target them."
[m ReadStream newOn: m].

s@(Method ReadStream traits) on: block
"Target the stream on the block. Answers the stream, as the other on:
methods do, because ReadStream newOn: answers whatever on: answers."
[
  s block: block.
  s
].

s@(Method ReadStream traits) next
"Evaluate the block and answer its result."
[
  s block do
].

s@(Method ReadStream traits) atEnd
"A block can always be evaluated again."
[False].

Method traits addPrototype: #WriteStream derivedFrom: {WriteStream}.
"Method WriteStream applies each fed element into the block as its sole
argument."
Method WriteStream addSlot: #block valued: [| :_ |].
"The operation to be performed on each element."

s@(Stream traits) newTo: block@(Method traits)
[block WriteStream newOn: block].

m@(Method traits) writer
"Mimics the Collection interface for making WriteStreams that target them."
[m WriteStream newOn: m].

s@(Method WriteStream traits) on: block
"Target the stream on the block. Answers the stream, as the other on:
methods in this file do."
[
  s block: block.
  s
].

s@(Method WriteStream traits) nextPut: obj
"Apply the block to the element."
[
  s block applyWith: obj
].

s@(Method WriteStream traits) atEnd
"A block can always accept another element."
[False].

prototypes addPrototype: #FilterStream derivedFrom: {ReadStream}.
"FilterStreams take source streams and apply a test block to each element,
only returning or acting on elements that satisfy the test."
"TODO: make a Write- or ReadWrite- variant?"
FilterStream addSlot: #source valued: Stream clone.
"The stream which gets filtered."
FilterStream addSlot: #block valued: [| :_ | True].
"The default filter, which filters nothing."
FilterStream addSlot: #nextValue valued: Nil.
"The element prefetched by atEnd, or Nil when none is pending. A Nil element
in the source is therefore indistinguishable from no element."

s@(Stream traits) select: block
"Answer a FilterStream over the receiver which only yields the elements for
which the block answers True."
[| newS |
  newS: (FilterStream newOn: s).
  newS block: block.
  newS
].

s@(FilterStream traits) on: source
"Target the filter on a source stream, discarding any element prefetched from
a previous source. Answers the stream."
[
  s source: source.
  s nextValue: Nil.
  s
].

s@(FilterStream traits) atEnd
"Answer whether no further element passes the filter. When nothing is
pending, this reads ahead in the source and keeps the first passing element
in nextValue."
[
  s nextValue ifNil:
    [s source do:
       [| :each |
        (s block applyWith: each) ifTrue: [s nextValue: each. ^ False]]].
  s nextValue isNil
].

s@(FilterStream traits) next
"Answer the next element which passes the filter, signalling exhaustion when
there is none. The read-ahead is left to atEnd, so the first passing element
is answered even when the caller never asked atEnd beforehand."
[| result |
  s atEnd ifTrue: [s exhausted].
  result: s nextValue.
  "Clear the pending element so that the next atEnd reads ahead again."
  s nextValue: Nil.
  result
].

prototypes addPrototype: #CollectStream derivedFrom: {ReadStream}.
"CollectStreams take source streams and apply a block to each element,
returning the block's result for each."
CollectStream addSlot: #source valued: Stream clone.
"The stream whose elements get transformed."
CollectStream addSlot: #block valued: [| :x | x].
"The default action, which is an identity."
s@(Stream traits) collect: block
"Answer a CollectStream over the receiver which yields the block's result
for each element."
[| newS |
  newS: (CollectStream newOn: s).
  newS block: block.
  newS
].

s@(CollectStream traits) on: source
"Target the stream on a source stream. Answers the stream.
Only the source is set: CollectStream has no nextValue slot, and atEnd is a
method here rather than a slot, so there is nothing else to reset."
[
  s source: source.
  s
].

s@(CollectStream traits) next
"Answer the block applied to the next element of the source."
[
  s block applyWith: s source next
].

s@(CollectStream traits) atEnd
"At the end exactly when the source is."
[
  s source atEnd
].

prototypes addPrototype: #ConcatenatedReadStream derivedFrom: {PositionableReadStream}.
"A Stream acting as the result of concatenating all the source streams'
sources into a single source to be read."
ConcatenatedReadStream addSlot: #sources valued: ExtensibleArray newEmpty.
"The Streams which are concatenated."
ConcatenatedReadStream addSlot: #currentSource.
"The current Stream being read."
ConcatenatedReadStream addSlot: #position valued: 0.
"The overall position of the stream, taken by tabulating."

s1@(Stream traits) ; s2@(Stream traits)
"Answer a stream which reads all of s1 and then all of s2."
[
  ConcatenatedReadStream newOn: {s1. s2}
].

cs@(ConcatenatedReadStream traits) ; s@(Stream traits)
"Append another stream to the ones being concatenated. Answers the receiver.
The sources slot is left untouched, so it keeps holding the collection
whatever addLast: answers."
[
  cs sources addLast: s.
  cs
].

s@(ConcatenatedReadStream traits) on: sources
"Target the stream on a collection of source streams, starting with the
first one. Answers the stream."
[
  s sources: (sources as: s sources).
  s currentSource: s sources first.
  s position: 0.
  s
].

s@(ConcatenatedReadStream traits) atEnd
"At the end when the last source is current and is itself at its end."
[
  s currentSource = s sources last and: [s currentSource atEnd]
].

s@(ConcatenatedReadStream traits) next
"Answer the next element, moving on to the following source whenever the
current one runs out. Sources which are already empty are skipped. Answers
Nil once every source is used up."
[
  [s currentSource atEnd] whileTrue:
    [| nextSrcIdx |
     "The lookup belongs to the sources collection, not to the stream."
     nextSrcIdx: (s sources indexOf: s currentSource) + 1.
     nextSrcIdx >= s sources size ifTrue: [^ Nil].
     s currentSource: (s sources at: nextSrcIdx)].
  s currentSource next
].

Stream traits addPrototype: #EchoStream derivedFrom: {ReadWriteStream}.
"An EchoStream wraps some original stream and duplicates any interaction,
reading or writing, done on it to another stream. This relies on EchoStream
having defined all of the stream interaction methods that the client relies
upon."
"Inspired by Henry Lieberman's 1986 paper to the first OOPSLA, titled:
_Using Prototypical Objects to Implement Shared Behavior in Object Oriented
Systems_ and archived at:
http://lieber.www.media.mit.edu/people/lieber/Lieberary/OOP/Delegation/Delegation.html"
Stream EchoStream addSlot: #original.
"The stream to echo."
Stream EchoStream addSlot: #dribble "valued: Console writer".
"The target for the echoing operation, called dribble after the Lieberman /
Lisp terminology."

s@(Stream traits) echoTo: log
"Creates and returns a new EchoStream from the first to the log Stream."
[| echo |
  echo: (s EchoStream newOn: s).
  echo echoTo: log
].

s@(Stream traits) echo
"Creates and returns a new EchoStream to the console."
[s echoTo: Console writer].

e@(Stream EchoStream traits) newOn: s
[
  e clone on: s
].

e@(Stream EchoStream traits) on: s
"Wrap the given stream. Answers the EchoStream."
[
  e original: s.
  e
].

e@(Stream EchoStream traits) echoTo: s
"Chooses another Stream to dribble to, ensuring that echoTo: is not
repeated."
[
  e dribble: s.
  e
].

e@(Stream EchoStream traits) next
"Read the next element from the original, copy it to the dribble, and answer
the element itself rather than whatever the dribble answers."
[| elem |
  elem: e original next.
  e dribble nextPut: elem.
  elem
].

e@(Stream EchoStream traits) next: n
"Read the next N elements from the original, copy them to the dribble, and
answer the elements."
[| elems |
  elems: (e original next: n).
  e dribble nextPutAll: elems.
  elems
].

e@(Stream EchoStream traits) nextPut: obj
"Write the object to the original and to the dribble. Answers the object.
The object is handed to both streams directly, since nextPut: on the
original need not answer its argument."
[
  e original nextPut: obj.
  e dribble nextPut: obj.
  obj
].

e@(Stream EchoStream traits) nextPutAll: seq
"Write the collection to the original and to the dribble. Answers the
EchoStream."
[
  e original nextPutAll: seq.
  e dribble nextPutAll: seq.
  e
].

PositionableStream traits addPrototype: #EchoStream derivedFrom: {Stream EchoStream}.

e@(PositionableStream EchoStream traits) position: n
"Reposition the original and the dribble alike."
[
  e original position: n.
  e dribble position: n
].

Stream traits addPrototype: #WrapperStream derivedFrom: {ReadWriteStream}.
"A WrapperStream wraps some stream and forwards operations to it where
applicable. This is mainly to be used by deriving from it and overriding
certain definitions for specific situations."
Stream WrapperStream addSlot: #original valued: Stream clone.
"The stream to be wrapped."

w@(Stream WrapperStream traits) newOn: s
[
  w clone on: s
].
wrapper@(Stream WrapperStream traits) on: target
"Remember the stream being wrapped. Answers the wrapper."
[
  wrapper original: target.
  wrapper
].

wrapper@(Stream WrapperStream traits) next
"Forwarded to the wrapped stream."
[
  wrapper original next
].

wrapper@(Stream WrapperStream traits) next: count
"Forwarded to the wrapped stream."
[
  wrapper original next: count
].

wrapper@(Stream WrapperStream traits) nextPut: element
"Forwarded to the wrapped stream."
[
  wrapper original nextPut: element
].

wrapper@(Stream WrapperStream traits) nextPutAll: elements
"Forwarded to the wrapped stream."
[
  wrapper original nextPutAll: elements
].

PositionableStream traits addPrototype: #WrapperStream derivedFrom: {Stream WrapperStream}.

wrapper@(PositionableStream WrapperStream traits) position: index
"Forwarded to the wrapped stream."
[
  wrapper original position: index
].

wrapper@(PositionableStream WrapperStream traits) contents
"Forwarded to the wrapped stream."
[
  wrapper original contents
].