cassava-conduit
Conduit interface for cassava package
https://github.com/domdere/cassava-conduit
LTS Haskell 23.4: 0.6.6
Stackage Nightly 2025-01-12: 0.6.6
Latest on Hackage: 0.6.6
cassava-conduit-0.6.6@sha256:bec74b430edd8488a1e11c8fe1e52f7a259a0ac9401d046d2882f62a36b27770,2693
Streaming to CSV is not fully complete at this stage; in particular, encoding to CSV with a header is not supported yet.
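Until header support exists, one possible workaround is to emit the header row yourself before handing over to `toCsv`. The sketch below is not part of the package: `toCsvWithHeader` is a hypothetical helper, and it assumes `toCsv` has the type `(Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString` and that the `cassava`, `conduit` and `cassava-conduit` packages are available.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Csv (EncodeOptions, ToRecord, defaultEncodeOptions, encodeWith)
import Data.Csv.Conduit (toCsv)

-- Hypothetical helper: yield one encoded header row, then stream the
-- records through 'toCsv' as usual.
toCsvWithHeader
  :: (Monad m, ToRecord a)
  => [BS.ByteString] -> EncodeOptions -> ConduitT a BS.ByteString m ()
toCsvWithHeader header opts = do
  yield (BL.toStrict (encodeWith opts [header]))
  toCsv opts

main :: IO ()
main = BS.putStr . BS.concat $ runConduitPure $
     CL.sourceList [(1 :: Int, "one" :: BS.ByteString), (2, "two")]
  .| toCsvWithHeader ["id", "name"] defaultEncodeOptions
  .| CL.consume
```

Note that the header is written even when no records arrive, which may or may not be what you want.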
Example Usage
The examples project
The examples project contains some usage examples, but the gist is:
{-# LANGUAGE FlexibleContexts #-}

import Prelude hiding (map)

import Control.Monad.Except (MonadError, runExceptT, withExceptT)
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import Data.Char (ord)
import Data.Conduit
import Data.Conduit.Combinators (map, sinkFile, sourceFile)
import Data.Csv
import Data.Csv.Conduit
import Data.Word (Word8)

data InputRecord = ...
instance FromRecord InputRecord where
  ...
data OutputRecord = ...
instance ToRecord OutputRecord where
  ...
decodeOpts :: Word8 -> DecodeOptions
encodeOpts :: Word8 -> EncodeOptions
processInput :: InputRecord -> OutputRecord
-- |
-- A Conduit pipeline that streams from '../exampledata/sampleinput.psv', decodes it from a pipe-separated format,
-- processes it with 'processInput', then encodes it to pipe-separated format and streams it out to '../exampledata/sampleoutput.psv'.
-- The first time it encounters a parse error, it stops streaming and returns the error, also dropping any decoded records that came through in that batch.
--
conduitPipeline :: (MonadError CsvParseError m, MonadResource m) => m ()
conduitPipeline = runConduit $
     sourceFile "../exampledata/sampleinput.psv"
  .| fromCsv (decodeOpts $ fromIntegral $ ord '|') HasHeader
  .| map processInput
  .| toCsv (encodeOpts $ fromIntegral $ ord '|')
  .| sinkFile "../exampledata/sampleoutput.psv"

main :: IO ()
main = do
  -- 'showError' (not shown) renders a CsvParseError as a String.
  -- ExceptT sits on top of ResourceT because 'runResourceT' (resourcet >= 1.2) needs MonadUnliftIO.
  res <- runResourceT $ runExceptT $ withExceptT showError conduitPipeline
  either putStrLn return res
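The elided definitions in the gist could be filled in along the following lines. Everything here is illustrative and assumed rather than taken from the examples project: the two-field layout of `InputRecord` and `OutputRecord` and the doubling in `processInput` are made up. Only the `cassava` calls (`defaultDecodeOptions`, `decDelimiter`, `defaultEncodeOptions`, `encDelimiter`, `record`, `toField`, `.!`) are real.

```haskell
import Data.Csv
  ( DecodeOptions (..), EncodeOptions (..), FromRecord (..), ToRecord (..)
  , defaultDecodeOptions, defaultEncodeOptions, record, toField, (.!) )
import Data.Word (Word8)

-- Hypothetical input row: a name and a quantity.
data InputRecord = InputRecord String Int

instance FromRecord InputRecord where
  parseRecord v
    | length v == 2 = InputRecord <$> v .! 0 <*> v .! 1
    | otherwise     = fail "expected exactly two fields"

-- Hypothetical output row: the name and the doubled quantity.
data OutputRecord = OutputRecord String Int

instance ToRecord OutputRecord where
  toRecord (OutputRecord name doubled) = record [toField name, toField doubled]

-- Default options with only the field delimiter overridden.
decodeOpts :: Word8 -> DecodeOptions
decodeOpts delim = defaultDecodeOptions { decDelimiter = delim }

encodeOpts :: Word8 -> EncodeOptions
encodeOpts delim = defaultEncodeOptions { encDelimiter = delim }

-- Made-up transformation, just to have something between decode and encode.
processInput :: InputRecord -> OutputRecord
processInput (InputRecord name qty) = OutputRecord name (qty * 2)

main :: IO ()
main = print (decDelimiter (decodeOpts 124), encDelimiter (encodeOpts 124))
```

Passing the delimiter as a `Word8` matches the `fromIntegral $ ord '|'` calls in the pipeline; 124 is the byte value of `|`.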
Building the examples project
$ cd examples
$ cabal sandbox init
$ cabal sandbox add-source ../
$ cabal install --only-dependencies
$ cabal build
Building the project
./mafia build
Running Unit Tests
./mafia test
Changes
0.6.1
- Add `fromCsvStreamErrorNoThrow` and `fromNamedCsvStreamErrorNoThrow`, functions that also put halting errors on the conduit instead of throwing in `MonadError`.
0.6.0
- `streamParser` and functions using it would swallow certain runaway errors. This is fixed.
0.5.x
0.4.0.2 -> 0.5.0
- `EitherT` -> `ExceptT`
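For code written against the older releases, the change mostly amounts to swapping the runner and the error-mapping function. The sketch below uses only `transformers`; `DemoError`, `showDemoError` and `failingStep` are hypothetical stand-ins for `CsvParseError`, an error renderer and a running pipeline.

```haskell
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE, withExceptT)

-- Hypothetical stand-in for a pipeline error such as CsvParseError.
newtype DemoError = DemoError String

-- Hypothetical renderer that turns the error into a printable message.
showDemoError :: DemoError -> String
showDemoError (DemoError msg) = "parse failed: " ++ msg

-- A computation that fails, standing in for running the conduit.
failingStep :: ExceptT DemoError IO ()
failingStep = throwE (DemoError "unexpected end of input")

main :: IO ()
main = do
  -- With EitherT this read: runEitherT $ bimapEitherT showDemoError id failingStep
  res <- runExceptT $ withExceptT showDemoError failingStep
  either putStrLn return res
```

`withExceptT f` covers the common `bimapEitherT f id` case; if the success value also needs mapping, `fmap` does that part.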
0.4.x
0.4.0.1 -> 0.4.0.2
- Bumped `conduit` to `1.3.*` (from `1.2.*`)
- The `conduit-extra` dep was unnecessary
0.3.5.1 -> 0.4.0.0
- `cassava` was bumped from 0.4.* to 0.5.*.
There were some semantic changes going from cassava 0.4.* -> 0.5.* (see here), but I don’t think they interfere with any of the explicit cassava-conduit semantics:
- `QuoteMinimal` semantics are not defined by `cassava-conduit`, so it will change behaviour, but `cassava-conduit` won’t compensate for it (hence the bump from `0.3` to `0.4`).
- It doesn’t use `foldl'`
- cassava-conduit doesn’t use `encodeByNamedWith` so the `encIncludeHeader` flag shouldn’t have any effect.
0.3.x
0.2.2 -> 0.3.0
Some new error types, and errors now contain `T.Text` rather than `String`:
data CsvParseError =
CsvParseError BS.ByteString T.Text
| IncrementalError T.Text
deriving (Show, Eq)
-- | When you want to include errors in the stream, this error type represents errors that halt the stream.
-- They do not appear inside the conduit and will instead get returned from running the conduit.
--
data CsvStreamHaltParseError = HaltingCsvParseError BS.ByteString T.Text -- ^ the remaining bytestring that was read in but not parsed yet, and the stringy error msg describing the fail.
deriving (Show, Eq)
-- | When you want to include errors in the stream, these are the errors that can be included in the stream,
-- they are usually problems restricted to individual records, and streaming can resume from the next record
-- you just have to decide on something sensible to do with the per record errors.
--
data CsvStreamRecordParseError = CsvStreamRecordParseError T.Text deriving (Show, Eq) -- ^ The stringy error describing why this record could not be parsed.
The new error types separate errors that stop streaming (and hence imply that some valid records might be omitted) from errors that can be skipped, allowing the valid records after them to be processed:
-- |
-- Same as `fromCsv` but allows for errors to be handled in the pipeline instead
--
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromCsvStreamError opts h f = {-# SCC fromCsvStreamError_p #-} streamParser f $ decodeWith opts h
-- |
-- Like `fromNamedCsvStream` but allows for errors to be handled in the pipeline itself.
--
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromNamedCsvStreamError opts f = {-# SCC fromCsvStreamError_p #-} streamHeaderParser f $ decodeByNameWith opts
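A minimal sketch of how `fromCsvStreamError` might be used, assuming the signature shown above and the `cassava`, `conduit` and `cassava-conduit` packages. The input rows and the name `decodeRows` are made up for illustration. Running the conduit in the `Either CsvStreamHaltParseError` monad (with `id` as the lifting function) means a halting error comes back as a `Left` from `runConduit`, while per-record failures travel through the stream as `Left` values.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Csv (HasHeader (NoHeader), defaultDecodeOptions)
import Data.Csv.Conduit
  (CsvStreamHaltParseError, CsvStreamRecordParseError, fromCsvStreamError)
import Data.Either (partitionEithers)

-- Decode (Int, Int) rows, keeping per-record failures in the stream.
-- The second chunk has a non-numeric first field on purpose.
decodeRows
  :: Either CsvStreamHaltParseError
            [Either CsvStreamRecordParseError (Int, Int)]
decodeRows = runConduit $
     CL.sourceList ["1,2\n", "oops,3\n", "4,5\n"]
  .| fromCsvStreamError defaultDecodeOptions NoHeader id
  .| CL.consume

main :: IO ()
main = case decodeRows of
  Left halt  -> putStrLn ("stream halted: " ++ show halt)
  Right rows -> do
    -- Split the stream into record-level errors and good records.
    let (bad, good) = partitionEithers rows
    putStrLn ("skipped records: " ++ show (length bad))
    print good
```

Collecting everything with `CL.consume` is only for demonstration; in a real pipeline the `Left` values would more likely be logged or routed to a separate sink as they arrive.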
0.2.x
0.1.0 -> 0.2.0
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a
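Going by these signatures, the `LiftError` variants take a function that maps `CsvParseError` into an error type of your choosing, so the conduit can run in a monad whose error type is not `CsvParseError` itself. A small sketch under that reading, where `AppError` and `loadPairs` are hypothetical names and the `cassava`, `conduit` and `cassava-conduit` packages are assumed:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Csv (HasHeader (NoHeader), defaultDecodeOptions)
import Data.Csv.Conduit (CsvParseError, fromCsvLiftError)

-- Hypothetical application-wide error type; CSV failures are one case of it.
data AppError
  = CsvFailure CsvParseError
  | OtherFailure String
  deriving (Show)

-- Decode in the 'Either AppError' monad, wrapping any CsvParseError
-- in the 'CsvFailure' constructor.
loadPairs :: Either AppError [(Int, Int)]
loadPairs = runConduit $
     CL.sourceList ["1,2\n3,4\n"]
  .| fromCsvLiftError CsvFailure defaultDecodeOptions NoHeader
  .| CL.consume

main :: IO ()
main = print loadPairs
```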
0.1.x
0.0.1 -> 0.1.0
fromNamedCsv :: (Show a, Monad m, FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsvStreamError :: (Monad m, FromNamedRecord a) => DecodeOptions -> Conduit BS.ByteString m (Either CsvParseError a)
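A minimal sketch of decoding by column name with `fromNamedCsv`, assuming the current release keeps the shape of the signature listed above. The `Person` type, its column names and the sample input are made up for illustration; the `cassava`, `conduit` and `cassava-conduit` packages are assumed.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Csv (FromNamedRecord (..), defaultDecodeOptions, (.:))
import Data.Csv.Conduit (CsvParseError, fromNamedCsv)

-- Hypothetical record, looked up by the "name" and "age" columns.
data Person = Person { name :: String, age :: Int }
  deriving (Show, Eq)

instance FromNamedRecord Person where
  parseNamedRecord r = Person <$> r .: "name" <*> r .: "age"

-- The first line of the input is treated as the header row.
people :: Either CsvParseError [Person]
people = runConduit $
     CL.sourceList ["name,age\nada,36\ngrace,45\n"]
  .| fromNamedCsv defaultDecodeOptions
  .| CL.consume

main :: IO ()
main = print people
```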