prototypes ensureDelegatedNamespace: #streams.

streams addPrototype: #Stream derivedFrom: {Cloneable}.
"The shared protocol of all Stream objects, also providing a common
instantiation protocol."

s@(Stream traits) newOn: c
"Create a new stream of the same kind targeted on the given object."
[| new |
  new: s clone.
  new on: c.
  new
].

s@(Stream traits) on: _
"(Re-)target the stream to some object. This modifies it in-place.
Overriding this will also customize newOn: for most uses."
[overrideThis].

s@(Stream traits) flush
"Do nothing by default."
[s].

s@(Stream traits) isAtEnd
"Answer whether the end of the Stream has been reached."
[overrideThis].

s@(Stream traits) elementType
"Returns a prototype for an appropriate element type for the Stream.
This is the most generic type."
[Root].

s@(Stream traits) arrayType
"Returns a prototype for an appropriate array type for the Stream.
This is the most generic type."
[Array].

Stream addPrototype: #Condition derivedFrom: {Condition}.
"Streaming-specific errors."
Stream Condition addSlot: #stream.

c@(Stream Condition traits) newStream: s
"Answer a new condition of this kind with its stream slot set to the argument."
[| newC |
  newC: c new.
  newC stream: s.
  newC
].

Stream addPrototype: #Exhaustion derivedFrom: {Stream Condition. Error}.
"Stream or resource exhaustion, ie #isAtEnd Conditions."

e@(Stream Exhaustion traits) describe
"Report the exhaustion on the DebugConsole."
[
  DebugConsole ; 'The stream has reached its end, unhandled.'
].

s@(Stream traits) exhausted
"Signal an Exhaustion condition concerning this stream."
[
  (s Exhaustion newStream: s) signal
].

s@(Stream traits) endCheck
"Signal exhaustion if the stream is at its end."
[
  s isAtEnd ifTrue: [s exhausted]
].

streams addPrototype: #ReadStream derivedFrom: {Stream}.
"Streams that read from some source."

s@(ReadStream traits) newOn: source
"Answer whatever on: answers when sent to a fresh clone, so on: overrides
are expected to answer the stream."
[
  s clone on: source
].

s@(ReadStream traits) reader
"A ReadStream is already its own resource."
[s].

s@(ReadStream traits) next
"Obtain and answer the next element from the Stream."
[overrideThis].

s@(ReadStream traits) next: n putInto: seq startingAt: start
"Read up to N elements into seq, beginning at index start. Answers N when
all of them were read. Returns a partial copy if not all elements can be
read into the other collection, which is detected by next answering Nil."
[| obj |
  0 below: n do:
    [| :index |
      (obj: s next) ifNil: [^ (seq copyFrom: 0 to: start + index)].
      seq at: start + index put: obj].
  n
].

s@(ReadStream traits) nextPutInto: seq startingAt: start
"Fill seq from index start up to its end with the next elements."
[
  "Only seq size - start slots remain from the offset; asking for seq size
   elements would store past the end of seq whenever start is not 0."
  s next: seq size - start putInto: seq startingAt: start
].

s@(ReadStream traits) nextPutInto: seq
"Fill the whole of seq with the next elements."
[
  s next: seq size putInto: seq startingAt: 0
].

s@(ReadStream traits) next: n putInto: seq@(Sequence traits)
"Places the next N elements into a Sequence at the starting indices."
[
  s next: n putInto: seq startingAt: 0
].

s@(ReadStream traits) next: n putInto: seq@(ExtensibleArray traits)
"Places the next N elements at the end of a given ExtensibleArray."
[
  seq addAllLast: (s next: n).
  n
].

s@(ReadStream traits) next: n
"Answer the next N elements."
[| arr |
  arr: (s arrayType newSize: n).
  s next: n putInto: arr startingAt: 0.
  arr
].

s@(ReadStream traits) upToSatisfying: testBlock
"Answer all objects up to one satisfying the block test. The satisfying
element is consumed but not included in the answer."
[| result elem |
  result: s arrayType newEmpty writer.
  [s isAtEnd or: [testBlock applyWith: (elem: s next)]]
    whileFalse: [result nextPut: elem].
  result contents
].

s@(ReadStream traits) upTo: obj
"Answer all objects up to one equal to the argument."
[s upToSatisfying: [| :each | each = obj]].

s@(ReadStream traits) upToAnyOf: c@(Collection traits)
"Answer all objects up to one contained by the argument."
[s upToSatisfying: [| :each | c includes: each]].

s@(ReadStream traits) upToAll: seq@(Sequence traits)
"Answer all the elements up to the first occurrence of the whole pattern,
consuming the pattern itself but leaving it out of the answer. When the
pattern never occurs, everything up to the end of the Stream is answered."
[| result matched each shift fits |
  result: s arrayType newEmpty writer.
  "matched counts how many leading pattern elements the latest input matches."
  matched: 0.
  [matched >= seq size or: [s isAtEnd]] whileFalse:
    [each: s next.
     each = (seq at: matched)
       ifTrue: [matched: matched + 1]
       ifFalse:
         ["The unflushed input is the first matched pattern elements followed
           by each. The source cannot be rewound, so slide the candidate
           start forward over that text until what is left is again a
           pattern prefix, and flush what was slid over."
          shift: 0.
          fits: False.
          [fits] whileFalse:
            [shift: shift + 1.
             fits: True.
             0 below: matched + 1 - shift do:
               [| :index |
                 ((shift + index = matched
                     ifTrue: [each]
                     ifFalse: [seq at: shift + index]) = (seq at: index))
                   ifFalse: [fits: False]]].
          0 below: shift do:
            [| :index |
              result nextPut:
                (index = matched ifTrue: [each] ifFalse: [seq at: index])].
          matched: matched + 1 - shift]].
  "Input that ran out in the middle of a candidate match is ordinary content."
  matched < seq size
    ifTrue: [0 below: matched do: [| :index | result nextPut: (seq at: index)]].
  result contents
].

s@(ReadStream traits) upToEnd
"Supply all the elements up to the end of the stream."
[| result |
  result: s arrayType newEmpty writer.
  [s isAtEnd] whileFalse: [result nextPut: s next].
  result contents
].

s@(ReadStream traits) do: block
"Call the block on all the elements in turn until the Stream is empty."
[
  [s isAtEnd] whileFalse: [block applyWith: s next].
  s
].

streams addPrototype: #WriteStream derivedFrom: {Stream}.
"Streams that write to some target."

s@(WriteStream traits) writer
"A WriteStream is already its own resource."
[s].

s@(WriteStream traits) nextPut: _
"Place the given element on the Stream."
[overrideThis].

s@(WriteStream traits) next: n put: obj
"Make the next N values the argument object."
[
  n timesRepeat: [s nextPut: obj].
  obj
].

s@(WriteStream traits) nextPutAll: c
"Place the Collection's contents into the stream. Answers the stream."
[
  s next: c size putAll: c startingAt: 0.
  s
].

s@(WriteStream traits) nextPutAll: c@(SubSequence traits)
"Place the Collection's contents into the stream, reading directly from
the underlying sequence between the SubSequence's start and end."
[
  s next: c end - c start putAll: c sequence startingAt: c start.
  s
].

s@(WriteStream traits) nextPutAll: seq from: start to: end
"Place a range of the Sequence's contents into the Stream: end - start
elements beginning at index start. Answers the stream."
[
  "next:putAll:startingAt: is the bulk-write selector this file defines."
  s next: end - start putAll: seq startingAt: start.
  s
].

s@(WriteStream traits) next: n putAll: seq startingAt: start
"Write N elements of seq, beginning at index start. Answers seq."
[
  0 below: n do: [| :index | s nextPut: (seq at: start + index)].
  seq
].

s@(WriteStream traits) next: n putAll: seq
"Write the first N elements of seq."
[
  s next: n putAll: seq startingAt: 0
].

s@(WriteStream traits) ; c
"Syntactic sugaring to make collection-insertion similar to concatenation."
[
  s nextPutAll: c
].

source@(Stream traits) >> sink
"Write the contents from source to target one element at a time."
[
  [source isAtEnd] whileFalse: [sink nextPut: source next].
  sink
].

sink@(Stream traits) << source
"The mirror image of >>."
[source >> sink].

source@(Stream traits) copyTo: sink chunkSize: n
"Write the contents from source to target in blocks of at most N elements.
Answers the total number of elements transferred."
"TODO: switch back to a bulk next:putInto: read once that method answers
the actual number of elements transferred."
[| buffer count total |
  "The buffer has to be an array-like prototype; elementType describes a
   single element."
  buffer: (source arrayType newSize: n).
  total: 0.
  [source isAtEnd] whileFalse:
    [count: 0.
     "Fill element-wise so the size of a short final chunk is known exactly."
     [count < n and: [source isAtEnd not]] whileTrue:
       [buffer at: count put: source next.
        count: count + 1].
     sink next: count putAll: buffer startingAt: 0.
     total: total + count].
  total
].

source@(Stream traits) copyTo: sink
"Write the contents from source to target in big blocks."
"TODO: implement this as >>/<< for ExternalStreams."
[source copyTo: sink chunkSize: 4096].

streams addPrototype: #ReadWriteStream derivedFrom: {ReadStream. WriteStream}.

s@(ReadWriteStream traits) iterator
"A ReadWriteStream is already its own resource."
[s].

streams addPrototype: #PeekableStream derivedFrom: {Stream}.
"A mixin supporting protocols which rely on peek alone, but not full
positionability."

s@(PeekableStream traits) peek
"Answer the next element without consuming it."
[overrideThis].

s@(PeekableStream traits) nextWhile: testBlock
"Answer the next elements for which the test block is True. Note that this
relies on a Collection iterator Stream."
[| result |
  result: s collectionType newEmpty writer.
  [s isAtEnd ifTrue: [^ result contents].
   (testBlock applyWith: s peek)]
    whileTrue: [result nextPut: s next].
  result contents
].

s@(PeekableStream traits) nextUntil: testBlock
"Answer the next elements for which the test block is not True."
[
  s nextWhile: [| :each | (testBlock applyWith: each) not]
].

s@(PeekableStream traits) nextDelimitedBy: separatorBlock
"Answer the next elements between segments where the separator test is True."
[
  "NOTE(review): skipWhile: is not defined in this file; presumably it is
   provided elsewhere - confirm."
  s skipWhile: separatorBlock.
  s nextUntil: separatorBlock
].

streams addPrototype: #PositionableStream derivedFrom: {PeekableStream}.
"PositionableStreams have an index and iterate over some sequenced collection,
but with a specific limit on the stream."
PositionableStream addSlot: #position valued: 0.
PositionableStream addSlot: #readLimit valued: 0.

s@(PositionableStream traits) newOn: c
"Override on: in derived objects in order to customize this."
[
  s clone on: c
].

s@(PositionableStream traits) newWith: c
"Answer the result of sending with: to a fresh clone."
[
  s clone with: c
].

s@(PositionableStream traits) newOn: c from: start to: end
"Answer the result of sending on:from:to: to a fresh clone."
[
  s clone on: c from: start to: end
].
s@(PositionableStream traits) on: _
"Reset the position."
[s reset].

s@(PositionableStream traits) contents
"Answer the contents of the target up to the readLimit."
[overrideThis].

s@(PositionableStream traits) collectionType
"Answer the default collection prototype to dump contents into."
[String].

s@(PositionableStream traits) last
[overrideThis].

s@(PositionableStream traits) next: n
"Answer the next N elements in a new collection of the collectionType."
[| buffer outcome |
  buffer: (s collectionType newSize: n).
  outcome: (s next: n putInto: buffer).
  "The ReadStream default of next:putInto: answers the count N after a
   complete read and a partial copy otherwise. Callers here need the
   elements, so answer the filled buffer instead of the count."
  outcome == n ifTrue: [buffer] ifFalse: [outcome]
].

s@(PositionableStream traits) nextMatchAll: c
"Whether the next N objects in the Stream are in the other collection (which
generally should be a Sequence, ie have linear order). The position is
restored when the match fails and left after the matched elements otherwise."
[| pos |
  pos: s position.
  c do: [| :each | s next = each ifFalse: [s position: pos. ^ False]].
  True
].

s@(PositionableStream traits) upTo: obj
"Answer all objects up to one equal to the argument. The matching element
is consumed but not included."
[| newS elem |
  newS: (s collectionType newSize: 100) writer.
  [s isAtEnd or: [(elem: s next) = obj]] whileFalse: [newS nextPut: elem].
  newS contents
].

s@(PositionableStream traits) upToEnd
"Answer the remaining elements, reading until next answers Nil."
"NOTE(review): upToEnd is defined again for the same traits further down in
this file with an overrideThis body; presumably the later definition
replaces this one when the file is loaded - confirm which is intended."
[| newS obj |
  newS: (s collectionType newSize: 100) writer.
  [(obj: s next) isNil] whileFalse: [newS nextPut: obj].
  newS contents
].

s@(PositionableStream traits) peek
"Returns the results of next without advancing the stream. Answers Nil at
the end of the stream."
[| obj |
  s isAtEnd ifTrue: [^ Nil].
  obj: s next.
  s position: s position - 1.
  obj
].

s@(PositionableStream traits) peek: n
"Answer the next N results without advancing the stream."
[| result origPosition |
  origPosition: s position.
  result: (s next: n).
  s position: origPosition.
  result
].

s@(PositionableStream traits) peekFor: obj
"Returns whether the object is next in the stream. Advances if true."
[| next |
  s isAtEnd ifTrue: [^ False].
  next: s next.
  "Both outcomes need an explicit answer; a lone ifFalse: would leave the
   matching case without a Boolean result."
  obj = next
    ifTrue: [True]
    ifFalse: [s position: s position - 1. False]
].

s@(PositionableStream traits) peekForwardBy: offset
"Answers the element the given number of positions after the current position."
[overrideThis].
s@(PositionableStream traits) peekBackBy: offset
"Answers the element the given number of positions before the current position."
[s peekForwardBy: offset negated].

s@(PositionableStream traits) upToAnyOf: c
"Answer all objects up to the first occurrence of something in the
collection. The matching element is consumed but not included."
[| newS elem |
  "Same shape as upTo:, testing membership instead of equality."
  newS: (s collectionType newSize: 100) writer.
  [s isAtEnd or: [c includes: (elem: s next)]] whileFalse: [newS nextPut: elem].
  newS contents
].

s@(PositionableStream traits) upToAll: pattern@(Collection traits)
"Answer all the elements up to the occurrence of the pattern in the Stream,
but not including anything from the pattern. On success the Stream is left
positioned at the start of the pattern, which throughAll: relies on. When
the pattern does not occur, everything from the original position up to the
end is answered."
[| patternSize origin scan |
  patternSize: pattern size.
  "origin stays fixed at the starting position; scan is the candidate start."
  origin: s position.
  scan: origin.
  [s readLimit - scan < patternSize
     "Not enough room left to match."
     ifTrue: [s position: origin. ^ s upToEnd].
   s position: scan.
   (s next: patternSize) = pattern
     "The pattern has been found. Answer up to the pattern."
     ifTrue: [s position: origin. ^ (s next: scan - origin)].
   "Move the candidate start one element forward."
   scan: scan + 1] loop
].

s@(PositionableStream traits) upToEnd
"Answer all the elements up to the limit by a copy."
"NOTE(review): upToEnd is also defined for the same traits earlier in this
file with a working body; presumably this later definition replaces it when
the file is loaded - confirm which is intended."
[overrideThis].

s@(PositionableStream traits) through: obj
"Answer the next elements up to and including the object given."
"This default implementation should be overridden for efficiency."
[(s upTo: obj) copyWith: obj].

s@(PositionableStream traits) throughAll: pattern
"Answer the next elements up to and including the pattern: the result of
upToAll: followed, unless the Stream is then at its end, by the next
pattern size elements."
[| result |
  result: (s collectionType newSize: 40) writer.
  result ; (s upToAll: pattern).
  s isAtEnd ifFalse: [result ; (s next: pattern size)].
  result contents
].

s@(PositionableStream traits) isAtEnd
"The Stream is at its end once the position reaches the readLimit."
[
  s position >= s readLimit
].

s@(PositionableStream traits) reset
"Move back to position 0. Answers the stream."
[
  s position: 0.
  s
].
s@(PositionableStream traits) resetContents
"Set both the position and the readLimit back to 0. Answers the stream."
[
  s position: 0.
  s readLimit: 0.
  s
].

s@(PositionableStream traits) setToEnd
"Position the Stream after the last readable element."
[
  s position: s readLimit.
  s
].

s@(PositionableStream traits) skip: n
"Move the position by N elements. Answers the stream."
[
  s position: s position + n.
  s
].

s@(PositionableStream traits) skipTo: obj
"Advance just past the next element equal to obj and answer True, or
consume the rest of the Stream and answer False."
[
  [s isAtEnd] whileFalse: [s next = obj ifTrue: [^ True]].
  False
].

s@(PositionableStream traits) setFrom: start to: end
"Set the position to start and the readLimit to one past end."
[
  s position: start.
  s readLimit: end + 1.
  s
].

s@(PositionableStream traits) beginsWith: seq
"Answer whether the Stream's next elements match the Sequence. The position
is left unchanged."
[
  s size >= seq size
    and: [| starters oldPosition |
          oldPosition: s position.
          starters: (s next: seq size).
          s position: oldPosition.
          starters = seq]
].

s@(PositionableStream traits) retract: n
"Retract N elements: move the position back by N and lower the readLimit
by N."
[
  s skip: n negated.
  s readLimit: (s readLimit - n).
  s
].

s@(PositionableStream traits) retract
"Retract one element."
[s retract: 1].

streams addPrototype: #PositionableReadStream
  derivedFrom: {PositionableStream. ReadStream}.
streams addPrototype: #PositionableWriteStream
  derivedFrom: {PositionableStream. WriteStream}.
streams addPrototype: #PositionableReadWriteStream
  derivedFrom: {PositionableStream. ReadStream. WriteStream}.

streams addPrototype: #DummyStream derivedFrom: {ReadWriteStream}.
"DummyStreams always return Nil's and can't be repositioned or written to,
but pretend that they can, eating up input and providing no output."

s@(Stream traits) newFrom: _@Nil
[DummyStream].

_@Nil iterator
[DummyStream].

_@Nil reader
[DummyStream].

_@Nil writer
[DummyStream].

_@(DummyStream traits) next
[].

_@(DummyStream traits) nextPut: _
[].

_@(DummyStream traits) nextPutAll: _
[].

_@(DummyStream traits) position
[0].

_@(DummyStream traits) position: _
[].

Method traits addPrototype: #ReadStream derivedFrom: {ReadStream}.
"Method ReadStreams take their next element from the recalculation of a code
closure with no arguments. This is effectively a means to poll some condition
or to generate a sequence."
Method ReadStream addSlot: #block valued: [].
"The calculation for generating the Stream elements."

s@(Stream traits) newFrom: block@(Method traits)
"Answer a Method ReadStream that draws its elements from the block."
[block ReadStream newOn: block].

m@(Method traits) iterator
"Mimics the Collection interface for making iterators that target them."
[m reader].

m@(Method traits) reader
"Mimics the Collection interface for making ReadStreams that target them."
[m ReadStream newOn: m].

s@(Method ReadStream traits) on: block
"Target the stream on the block. Answers the stream, like the other on:
methods in this file, since ReadStream newOn: answers whatever on: answers."
[
  s block: block.
  s
].

s@(Method ReadStream traits) next
"Answer the result of evaluating the block again."
[
  s block do
].

s@(Method ReadStream traits) isAtEnd
"A Method ReadStream never reports its end."
[False].

Method traits addPrototype: #WriteStream derivedFrom: {WriteStream}.
"Method WriteStream applies each fed element into the block as its sole
argument."
Method WriteStream addSlot: #block valued: [| :_ |].
"The operation to be performed on each element."

s@(Stream traits) newTo: block@(Method traits)
"Answer a Method WriteStream that feeds its elements to the block."
[block WriteStream newOn: block].

m@(Method traits) writer
"Mimics the Collection interface for making WriteStreams that target them."
[m WriteStream newOn: m].

s@(Method WriteStream traits) on: block
"Target the stream on the block. Answers the stream, like the other on:
methods in this file."
[
  s block: block.
  s
].

s@(Method WriteStream traits) nextPut: obj
"Apply the block to the element and answer the block's result."
[
  s block applyWith: obj
].

s@(Method WriteStream traits) isAtEnd
"A Method WriteStream never reports its end."
[False].

streams addPrototype: #StreamProcessor derivedFrom: {ReadStream}.
StreamProcessor addSlot: #source valued: Stream clone.

s@(StreamProcessor traits) on: source
"Target the processor on the source stream. Answers the processor."
[
  s source: source.
  s
].

s@(StreamProcessor traits) arrayType
"Use the source's arrayType."
[s source arrayType].

s@(StreamProcessor traits) isAtEnd
"The processor is at its end when its source is."
[s source isAtEnd].

streams addPrototype: #FilterStream derivedFrom: {StreamProcessor}.
"FilterStreams take source streams and apply a test block to each element,
only returning or acting on elements that satisfy the test."
"TODO: make a Write- or ReadWrite- variant?"
FilterStream addSlot: #block valued: [| :_ | True].
"The default filter, which filters nothing."
FilterStream addSlot: #nextValue valued: Nil.
"The result of sending #next is pre-computed, to determine isAtEnd while not
relying on the source stream being repositionable."

s@(Stream traits) select: block
"Answer a FilterStream over the receiver that keeps the elements for which
the block answers True."
[| newS |
  newS: (FilterStream newOn: s).
  newS block: block.
  newS
].

s@(Stream traits) reject: block
"Answer a FilterStream over the receiver that drops the elements for which
the block answers True."
[| newS |
  newS: (FilterStream newOn: s).
  "The filter block is applied with one argument, so the negating wrapper
   has to accept the element and pass it on."
  newS block: [| :each | (block applyWith: each) not].
  newS
].

s@(FilterStream traits) on: source
"Target the filter on the source and forget any pre-computed element."
[
  s nextValue: Nil.
  s source: source.
  s
].

s@(FilterStream traits) isAtEnd
"Answer whether no further element passes the filter. When nothing is
buffered, this reads ahead in the source and stores the first passing
element in nextValue."
[
  s nextValue ifNil:
    [s source do:
       [| :each |
         (s block applyWith: each) ifTrue: [s nextValue: each. ^ False]]].
  s nextValue isNil
].

s@(FilterStream traits) next
"Answer the next element passing the filter, and pre-compute the one after
it. Signals exhaustion when no element is left."
[| result |
  "isAtEnd primes nextValue from the source when nothing is buffered yet, so
   it has to run before the buffered value is picked up."
  s isAtEnd ifTrue: [s exhausted].
  result: s nextValue.
  s nextValue: Nil.
  s source do:
    [| :each |
      (s block applyWith: each) ifTrue: [s nextValue: each. ^ result]].
  result
].

streams addPrototype: #CollectStream derivedFrom: {StreamProcessor}.
"CollectStreams take source streams and apply a block to each element,
returning the block's result for each."
CollectStream addSlot: #block valued: [| :x | x].
"The default action, which is an identity."

s@(Stream traits) collect: block
"Answer a CollectStream over the receiver using the block."
[| newS |
  newS: (CollectStream newOn: s).
  newS block: block.
  newS
].

s@(CollectStream traits) next
"Answer the block applied to the source's next element."
[
  s block applyWith: s source next
].

streams addPrototype: #ConcatenatedReadStream
  derivedFrom: {PositionableReadStream}.
"A Stream acting as the result of concatenating all the source streams'
sources into a single source to be read."
ConcatenatedReadStream addSlot: #sources valued: ExtensibleArray newEmpty.
"The Streams which are concatenated."
ConcatenatedReadStream addSlot: #currentSource.
"The current Stream being read."
ConcatenatedReadStream addSlot: #position valued: 0.
"The overall position of the stream, taken by tabulating."

s1@(Stream traits) ; s2@(Stream traits)
"Answer a ConcatenatedReadStream over the two streams, in order."
[
  ConcatenatedReadStream newOn: {s1. s2}
].
cs@(ConcatenatedReadStream traits) ; s@(Stream traits)
"Append another source stream to the concatenation. Answers the
concatenated stream."
[
  "Add in place; the answer of addLast: is not the collection to keep, so
   it must not be stored back into sources."
  cs sources addLast: s.
  cs
].

s@(ConcatenatedReadStream traits) on: sources
"Target the stream on a collection of source streams, starting with the
first of them."
[
  s sources: (sources as: s sources).
  s currentSource: s sources first.
  s position: 0.
  s
].

s@(ConcatenatedReadStream traits) isAtEnd
"The concatenation is at its end when the last source is the current one
and is itself at its end."
[
  s currentSource = s sources last and: [s currentSource isAtEnd]
].

s@(ConcatenatedReadStream traits) next
"Answer the next element, moving on to the following source whenever the
current one is used up. Answers Nil once every source is used up."
[
  "Loop rather than test once, so that sources with nothing in them are
   stepped over."
  [s currentSource isAtEnd] whileTrue:
    [| nextSrcIdx |
      "The index has to be looked up in the sources collection."
      nextSrcIdx: (s sources indexOf: s currentSource) + 1.
      nextSrcIdx >= s sources size ifTrue: [^ Nil].
      s currentSource: (s sources at: nextSrcIdx)].
  s currentSource next
].

Stream traits addPrototype: #EchoStream derivedFrom: {ReadWriteStream}.
"An EchoStream wraps some original stream and duplicates any interaction,
reading or writing, done on it to another stream. This relies on EchoStream
having defined all of the stream interaction methods that the client relies
upon."
"Inspired by Henry Lieberman's 1986 paper to the first OOPSLA, titled:
_Using Prototypical Objects to Implement Shared Behavior in Object Oriented
Systems_ and archived at:
http://lieber.www.media.mit.edu/people/lieber/Lieberary/OOP/Delegation/Delegation.html"
Stream EchoStream addSlot: #original.
"The stream to echo."
Stream EchoStream addSlot: #dribble "valued: Console writer".
"The target for the echo'ing operation, called dribble after the Lieberman /
Lisp terminology."

s@(Stream traits) echoTo: log
"Creates and returns a new EchoStream from the first to the log Stream."
[| echo |
  echo: (s EchoStream newOn: s).
  echo echoTo: log
].

s@(Stream traits) echo
"Creates and returns a new EchoStream to the console."
[s echoTo: Console writer].

e@(Stream EchoStream traits) newOn: s
"Answer a fresh clone wrapping the given stream."
[
  e clone on: s
].

e@(Stream EchoStream traits) on: s
"Make the given stream the original to be echoed. Answers the EchoStream."
[
  e original: s.
  e
].

e@(Stream EchoStream traits) echoTo: s
"Chooses another Stream to dribble to, ensuring that echoTo: is not repeated."
[
  e dribble: s.
  e
].

e@(Stream EchoStream traits) next
"Read the next element from the original, copy it to the dribble, and
answer the element."
[| elem |
  elem: e original next.
  "Answer the element itself; what the dribble's nextPut: answers is up to
   the dribble."
  e dribble nextPut: elem.
  elem
].
e@(Stream EchoStream traits) next: n
"Read the next N elements from the original, copy them to the dribble, and
answer the elements."
[| elems |
  elems: (e original next: n).
  "Answer the elements read; nextPutAll: on the dribble would otherwise make
   this answer the dribble's own result."
  e dribble nextPutAll: elems.
  elems
].

e@(Stream EchoStream traits) nextPut: obj
"Write the object to the original and copy it to the dribble. Answers the
object."
[
  "Echo the object itself, not whatever the original's nextPut: answers."
  e original nextPut: obj.
  e dribble nextPut: obj.
  obj
].

e@(Stream EchoStream traits) nextPutAll: seq
"Write the sequence to the original and copy it to the dribble. Answers the
EchoStream."
[
  "Echo the sequence itself, not whatever the original's nextPutAll: answers."
  e original nextPutAll: seq.
  e dribble nextPutAll: seq.
  e
].

PositionableStream traits addPrototype: #EchoStream
  derivedFrom: {Stream EchoStream}.

e@(PositionableStream EchoStream traits) position: n
"Reposition the original, then set the dribble's position to whatever that
answered."
"NOTE(review): this presumably expects the dribble to be positionable as
well - confirm against callers."
[
  e dribble position: (e original position: n)
].

Stream traits addPrototype: #WrapperStream derivedFrom: {ReadWriteStream}.
"A WrapperStream wraps some stream and forwards operations to it where
applicable. This is an abstraction to be specialized."
Stream WrapperStream addSlot: #original valued: Stream clone.
"The stream to be wrapped."

w@(Stream WrapperStream traits) newOn: s
"Answer a fresh clone wrapping the given stream."
[
  w clone on: s
].

w@(Stream WrapperStream traits) on: s
"Make the given stream the one being wrapped. Answers the WrapperStream."
[
  w original: s.
  w
].

w@(Stream WrapperStream traits) next
"Forward to the original."
[
  w original next
].

w@(Stream WrapperStream traits) next: n
"Forward to the original."
[
  w original next: n
].

w@(Stream WrapperStream traits) nextPut: obj
"Forward to the original."
[
  w original nextPut: obj
].

w@(Stream WrapperStream traits) nextPutAll: seq
"Forward to the original."
[
  w original nextPutAll: seq
].

PositionableStream traits addPrototype: #WrapperStream
  derivedFrom: {Stream WrapperStream}.

w@(PositionableStream WrapperStream traits) position: n
"Forward to the original."
[
  w original position: n
].

w@(PositionableStream WrapperStream traits) contents
"Forward to the original."
[
  w original contents
].