I’m trying to learn stream processing in Haskell. There are multiple libraries available for that: pipes, conduit, streamly, etc. I know little about them and decided to start learning pipes.
I’m interested in processing data from a network socket. There is the pipes-http library for that and its doc has the basic example at https://hackage.haskell.org/package/pipes-http-1.0.6/docs/Pipes-HTTP.html. The example streams the body of an HTTP response to stdout — it works great. To start with, I’d like to extend it to display the total number of bytes downloaded so far. The pipe is of type ByteString, so it’ll produce chunks of data; it doesn’t matter how big those chunks are. I need to keep track of the total number of bytes downloaded (a job for State) and print it upon receiving each chunk. The task turned out to be more complicated than I thought.
There are four stars returned by the API over two seconds.
Failure
StateT?
The first solution I came up with for tracking the size was to use a StateT Size IO like this:
type Size = Int

-- | Prints the total size of the downloaded stream so far when each new piece
-- is received.
printTotalSize :: Pipe BL.ByteString BL.ByteString (StateT Size IO) ()
printTotalSize = forever $ do
  bs <- await
  modify' (+ (BL.length bs))
  size <- get
  liftIO $ print size
  yield bs
The function awaits a value, updates the state, prints the size, and yields the received value. It is meant to sit between the producer and the consumer, but that composition does not type-check: the producer responseBody resp has type Producer ByteString IO (), with IO hardcoded, while my pipe runs in StateT Size IO. My code would work if withHTTP had a MonadIO constraint instead.
Unlift IO?
I stumbled upon the unliftio library, which describes a very similar issue with functions like bracket and catch: https://github.com/fpco/unliftio. However, I had two issues with it:
How to apply it to a Pipe? I couldn’t implement a function that would “unlift” an IO pipe into a MonadIO pipe.
Even if I could, would it work with StateT? The limitations section says no.
Splitting a pipe
I thought about splitting the incoming pipe into two: one for printing data to stdout as is, and another for keeping track of the state. I believe the tee function can do that; however, it requires both pipes to have the same base monad, so this doesn’t help me a bit.
Those numbers are how many bytes the pipe has received so far.
All the functions there use the distribute function. I have no clue how it works; however, I can see that it uses unsafeHoist inside…
Looks like this is the way to lift IO-based pipes into pipes based on IO monad transformers.
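For reference, here is a minimal sketch of how the StateT-based pipe can be made to fit an IO producer. It assumes the module in question is Pipes.Lift and uses its evalStateP to run the state layer of the pipe, leaving a plain IO pipe. The fakeBody producer is a hypothetical stand-in for responseBody resp so that the snippet runs without a network connection, and strict ByteString is an assumption as well. It needs the pipes, mtl, and bytestring packages.

```haskell
import Control.Monad (forever)
import Control.Monad.State.Strict (StateT, get, modify')
import qualified Data.ByteString.Char8 as B
import Pipes
import Pipes.Lift (evalStateP)
import qualified Pipes.Prelude as P

type Size = Int

-- Same idea as printTotalSize in the post: count bytes in StateT, print the
-- running total, and forward each chunk unchanged.
printTotalSize :: Pipe B.ByteString B.ByteString (StateT Size IO) ()
printTotalSize = forever $ do
  bs <- await
  modify' (+ B.length bs)
  size <- get
  liftIO $ print size
  yield bs

-- Hypothetical stand-in for `responseBody resp` (an IO-based producer).
fakeBody :: Producer B.ByteString IO ()
fakeBody = each (map B.pack ["ab", "cde", "f"])

-- evalStateP runs the StateT layer with an initial total of 0, so the
-- resulting pipe is based on IO and composes with the IO producer.
counted :: Producer B.ByteString IO ()
counted = fakeBody >-> evalStateP 0 printTotalSize

main :: IO ()
main = runEffect (counted >-> P.mapM_ B.putStr)
```

The state lives only inside the wrapped pipe here; the producer and the consumer stay in IO and never see the StateT layer.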
scanM
Researching this and any related questions, I saw an answer on StackOverflow with a suggestion to use scanM. It wasn’t clear from the type signature whether it would forward all the intermediate values in real time. My code:
Turns out it works as well! The code of scanTotalSize is shorter because it doesn’t need to await and yield, but on the other hand it receives the bytestring from the previous iteration even though it doesn’t care about it.
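A sketch of what a scanM-based version might look like, assuming Pipes.Prelude.scanM and strict ByteString. The accumulator is a pair of the latest chunk and the running total, which is why the step function receives the previous chunk without using it. The chunks list is a hypothetical stand-in for the HTTP response body.

```haskell
import qualified Data.ByteString.Char8 as B
import Pipes
import qualified Pipes.Prelude as P

type Size = Int

-- The accumulator carries (latest chunk, running total). The final argument
-- of scanM extracts the value to send downstream, here the chunk itself.
scanTotalSize :: Pipe B.ByteString B.ByteString IO r
scanTotalSize = P.scanM step (pure (B.empty, 0)) (pure . fst)
  where
    step :: (B.ByteString, Size) -> B.ByteString -> IO (B.ByteString, Size)
    step (_previous, size) bs = do
      let size' = size + B.length bs
      print size'
      pure (bs, size')

-- Hypothetical stand-in for the chunks of a response body.
chunks :: [B.ByteString]
chunks = map B.pack ["ab", "cde", "f"]

main :: IO ()
main = runEffect (each chunks >-> scanTotalSize >-> P.mapM_ B.putStr)
```

One thing to be aware of: scanM yields the initial accumulator before it awaits anything, so this version first sends an empty chunk downstream. That is harmless when the consumer just writes bytes out.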
Guess what? It works as well. No StateT, so there is no need to lift the pipe, but keeping track of state manually may make code more complicated in more complex cases.
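A minimal sketch of keeping the state manually, assuming the running total is passed as an argument to a recursive pipe. The name printTotalSizeFrom and the chunks list are hypothetical and only serve the illustration.

```haskell
import qualified Data.ByteString.Char8 as B
import Pipes
import qualified Pipes.Prelude as P

type Size = Int

-- The running total is an ordinary function argument, so the pipe stays in
-- plain IO and composes directly with an IO-based producer.
printTotalSizeFrom :: Size -> Pipe B.ByteString B.ByteString IO r
printTotalSizeFrom size = do
  bs <- await
  let size' = size + B.length bs
  lift (print size')
  yield bs
  printTotalSizeFrom size'

-- Hypothetical stand-in for the chunks of a response body.
chunks :: [B.ByteString]
chunks = map B.pack ["ab", "cde", "f"]

main :: IO ()
main = runEffect (each chunks >-> printTotalSizeFrom 0 >-> P.mapM_ B.putStr)
```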
The end
These are the solutions that I’ve found. There could be better ways to do it; I barely understand the library and its concepts. For example, there is no way to figure out when a pipe ends in a Consumer because await doesn’t return a Maybe value; apparently the pipes work in a way that doesn’t require this?!