KISS 🇺🇦

Stop the war!

Stop the war in Ukraine! Fuck putin!

More information is at: https://war.ukraine.ua/.

There is a fund to support the Ukrainian Army: https://savelife.in.ua/en/donate/, and there is a special bank account that accepts funds in multiple currencies: https://bank.gov.ua/en/about/support-the-armed-forces. I donated to them. Please donate if you can!

Killer putin

Killer putin. Source: politico.eu.

Arrested putin

"It hasn't happened yet, but it will happen sooner or later. Beautiful photo, isn't it?" Source: twitter.

Showing download progress with pipes

I’m trying to learn stream processing in Haskell. There are multiple libraries available for that: pipes, conduit, streamly, etc. I know little about them and decided to start learning pipes.

I’m interested in processing data from a network socket. The pipes-http library does that, and its documentation has a basic example at https://hackage.haskell.org/package/pipes-http-1.0.6/docs/Pipes-HTTP.html. The example streams the body of an HTTP response to stdout, and it works great. To start with, I’d like to extend it to display the total number of bytes downloaded so far. The response body is a Producer of ByteString chunks; it doesn’t matter how big those chunks are. I need to keep track of the total number of bytes downloaded (a job for State) and print it upon receiving each chunk. The task turned out to be more complicated than I thought.

Base

The code here is available in the pipes-streaming-size repo.

I’m starting with the sample code from the docs:

#!/usr/bin/env stack
-- stack --resolver lts-19.6 script --package pipes,pipes-bytestring,pipes-http

import Pipes
import Pipes.HTTP
import qualified Pipes.ByteString as PB

type Size = Int

saveStream :: String -> IO ()
saveStream url = do
  manager <- newManager tlsManagerSettings
  req <- parseUrlThrow url

  withHTTP req manager $ \resp ->
    runEffect
      $ responseBody resp
      >-> PB.stdout

main :: IO ()
main = saveStream "https://httpbin.org/drip?duration=2&numbytes=4&code=200&delay=0"

Running it works fine:

$ ./pipes-streaming-size.hs
****

The API returns four stars over the course of two seconds.

Failure

StateT?

The first solution I came up with for tracking the size is using a StateT Size IO like this:

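import Control.Monad (forever)
import Control.Monad.State.Strict (StateT, evalStateT, get, modify')
-- Assumed to be the strict ByteString despite the BL alias: responseBody
-- yields strict chunks, and BL.length must return an Int to match Size.
import qualified Data.ByteString as BL

type Size = Int

-- | Prints the total size of the downloaded stream so far when each new piece
-- is received.
printTotalSize :: Pipe BL.ByteString BL.ByteString (StateT Size IO) ()
printTotalSize = forever $ do
  bs <- await

  -- Add this chunk's length to the running total kept in StateT.
  modify' (+ BL.length bs)
  size <- get
  liftIO $ print size

  yield bs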

The function awaits a value, updates the state, prints the size and yields the received value. It should be used between the producer and consumer:

withHTTP req manager $ \resp ->
  flip evalStateT 0 $ runEffect
    $ responseBody resp
    >-> printTotalSize
    >-> PB.stdout

However, this doesn’t compile:

~/haskell/pipes-streaming-size/pipes-streaming-size.hs:32:11: error:
     Couldn't match type StateT Size IO with IO
      Expected: Pipes.Proxy () PB.ByteString () PB.ByteString IO ()
        Actual: Pipe PB.ByteString PB.ByteString (StateT Size IO) ()
     In the second argument of (>->), namely printTotalSize
      In the first argument of (>->), namely
        responseBody resp >-> printTotalSize
      In the second argument of ($), namely
        responseBody resp >-> printTotalSize >-> PB.stdout

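That’s because the type of the producer responseBody resp is Producer ByteString IO (), with IO hardcoded as the base monad, while printTotalSize runs in StateT Size IO, and >-> requires both sides to share the same base monad. My code would work if withHTTP had a MonadIO constraint instead.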
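
One way to line the types up, which this post doesn’t explore further, is to lift the IO producer into the transformer stack with hoist lift (hoist is re-exported by Pipes). Below is a minimal sketch under assumptions: chunks is a hypothetical stand-in for responseBody resp, String replaces ByteString, and the totals are collected into a list rather than printed, so that it runs without a network connection.

```haskell
import Control.Monad (forever)
import Control.Monad.State.Strict (StateT, evalStateT, get, modify')
import Pipes
import qualified Pipes.Prelude as P

-- Emits the running total of chunk lengths; the total lives in StateT.
countLengths :: Monad m => Pipe String Int (StateT Int m) r
countLengths = forever $ do
  chunk <- await
  lift $ modify' (+ length chunk)
  total <- lift get
  yield total

-- Hypothetical stand-in for `responseBody resp`: a producer pinned to IO.
chunks :: Producer String IO ()
chunks = each ["*", "**", "*"]

-- `hoist lift` turns the IO producer into a StateT Int IO producer,
-- so both sides of (>->) share the same base monad.
runningTotals :: IO [Int]
runningTotals =
  flip evalStateT 0 $ P.toListM (hoist lift chunks >-> countLengths)

main :: IO ()
main = runningTotals >>= print
```

The cost of this approach is that every stage of the pipeline, including the final consumer, has to run in StateT Size IO.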

Unlift IO?

I stumbled upon the unliftio library, which describes a very similar issue with functions like bracket and catch: https://github.com/fpco/unliftio. However, I had two issues with it:

  • How to apply it to a Pipe? I couldn’t implement a function that would “unlift” an IO pipe into a MonadIO pipe.
  • Even if I could, would it work with StateT? The limitations section says no.

Splitting a pipe

I thought about splitting the incoming stream in two: one branch printing the data to stdout as is, and another keeping track of the state. I believe the tee function can do that; however, it requires both branches to share the same base monad, so it doesn’t help here.

Success

Pipes.Lift

I came across the Pipes.Lift module by accident. It’s even part of the base pipes library! The module has functions that let you “hide” a StateT layer inside a pipe. I don’t need the final state, so I can use evalStateP, whose type specialized to a Pipe is evalStateP :: Monad m => s -> Pipe a b (StateT s m) r -> Pipe a b m r. Wrapping printTotalSize is enough:

import Pipes.Lift (evalStateP)

withHTTP req manager $ \resp ->
  runEffect
    $ responseBody resp
    >-> evalStateP 0 printTotalSize
    >-> PB.stdout

This works:

$ ./pipes-streaming-size.hs
1
*2
*3
*4
*

Those numbers are how many bytes the pipe has received so far.
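
The same idea can be tried without a network connection. This is only a sketch under assumptions: runningTotal and totals are hypothetical names, String stands in for ByteString, and Identity replaces IO so that the result can be inspected as a plain list.

```haskell
import Control.Monad (forever)
import Control.Monad.State.Strict (StateT, get, modify')
import Pipes
import Pipes.Lift (evalStateP)
import qualified Pipes.Prelude as P

-- Emits the running total of chunk lengths; the total lives in StateT.
runningTotal :: Monad m => Pipe String Int (StateT Int m) r
runningTotal = forever $ do
  chunk <- await
  lift $ modify' (+ length chunk)
  total <- lift get
  yield total

-- evalStateP removes the StateT layer, so the pipe composes with a producer
-- over the plain base monad (Identity here, IO in the post).
totals :: [Int]
totals = P.toList (each ["*", "**", "*"] >-> evalStateP 0 runningTotal)

main :: IO ()
main = print totals
```

Only the stateful stage knows about StateT here; the producer and everything downstream keep their original base monad.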

All the functions in that module are built on the distribute function. I have no clue how it works, but I see that it uses unsafeHoist inside…

Looks like this is the way to use a pipe written over a monad transformer on top of IO (such as StateT Size IO) inside a pipeline based on plain IO.

scanM

While researching this and related questions, I saw an answer on Stack Overflow suggesting scanM. It wasn’t clear from the type signature whether it would forward all the intermediate values in real time. My code:

import qualified Pipes.Prelude as P

type SizeState = (Size, BL.ByteString)

scanTotalSize :: SizeState -> BL.ByteString -> IO SizeState
scanTotalSize (size, _) bs = let newSize = size + (BL.length bs)
  in print newSize >> pure (newSize, bs)

withHTTP req manager $ \resp ->
  runEffect
    $ responseBody resp
    >-> P.scanM scanTotalSize (pure (0, BL.empty)) (pure . snd)
    >-> PB.stdout

Turns out it works as well! The code of scanTotalSize is shorter because it doesn’t need to await and yield; on the other hand, it has to carry the previous chunk in its state even though it never looks at it.
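
To see how a scan forwards intermediate values, here is a small sketch with the pure P.scan instead of scanM. The names totals and step are hypothetical, and String stands in for ByteString. Both scan and scanM emit the value derived from the seed before awaiting any input, which is why the version above sends one empty chunk downstream first (harmless when writing to stdout).

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Running total of chunk lengths; the accumulator is just the Int total.
totals :: [Int]
totals = P.toList (each ["*", "**", "*"] >-> P.scan step 0 id)
  where
    step total chunk = total + length chunk

main :: IO ()
main = print totals  -- [0,1,3,4]: the seed comes out before any input
```

Each new accumulator value is yielded as soon as its input chunk arrives, so nothing is buffered until the end of the stream.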

Explicit recursion

Finally, this Stack Overflow answer to “Stateful generators with Haskell pipes” suggested using explicit recursion. I tried this:

printTotalSize :: Pipe BL.ByteString BL.ByteString IO ()
printTotalSize = iter 0
  where
    iter :: Size -> Pipe BL.ByteString BL.ByteString IO ()
    iter size = do
      bs <- await

      let newSize = size + (BL.length bs)
      liftIO $ print newSize

      yield bs
      iter newSize

withHTTP req manager $ \resp ->
  runEffect
    $ responseBody resp
    >-> printTotalSize
    >-> PB.stdout

Guess what? It works as well. There is no StateT, so there is no need to lift the pipe, but threading the state by hand may get unwieldy in more complex cases.

The end

These are the solutions that I’ve found. There could be better ways to do it; I barely understand the library and its concepts. For example, there is no way to figure out when a pipe ends in a Consumer because await doesn’t return a Maybe value; apparently the pipes work in a way that doesn’t require this?!
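
For what it’s worth, pipes does offer a way to observe the end of a stream from outside a Consumer: the next function from Pipes steps through a Producer and returns Left with the producer’s result once it is exhausted. A minimal sketch, assuming a hypothetical totalLength helper and String chunks in place of ByteString:

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes

-- Consumes a producer step by step; Left means the stream has ended.
totalLength :: Monad m => Producer String m r -> m (Int, r)
totalLength = go 0
  where
    go total producer = do
      step <- next producer
      case step of
        Left result -> pure (total, result)
        Right (chunk, rest) -> go (total + length chunk) rest

main :: IO ()
main = print (runIdentity (totalLength (each ["*", "**", "*"])))
```

Inside a pipeline built with >->, upstream termination simply ends the whole pipeline, which would explain why await has no need to return a Maybe.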
