Dart IO - Streaming Strings in a Nutshell

Starting Point

Let's start with the example from the io-library-tour on Streaming file Contents:

import 'dart:async';
import 'dart:io';
import 'dart:convert';

Future main() async {
  var config = File('config.txt');
  Stream<List<int>> inputStream = config.openRead();

  var lines = inputStream
      .transform(utf8.decoder)
      .transform(LineSplitter());
  try {
    await for (var line in lines) {
      print('Got ${line.length} characters from stream');
    }
    print('file is now closed');
  } catch (e) {
    print(e);
  }
}

What you can see there is that a 'config.txt' file is processed in a streamed fashion. As part of the processing there are 2 transformations going on.

  1. utf8.decoder that converts a list of unsigned 8-bit integers to a string
  2. LineSplitter that splits the one string into single pieces line by line

The await for will then process the stream line by line. Note that LineSplitter strips the line terminator, so the EOL characters are not part of the yielded strings.
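To try the two transformations without a file on disk, here is a minimal sketch that feeds an in-memory byte stream through the same pipeline. The helper name readLines and the sample text are assumptions made for illustration. The chunk boundary deliberately falls in the middle of a line, to show that the pipeline still yields whole lines and that the line terminators are not included in the reported lengths.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical helper: decodes UTF-8 byte chunks and splits the text into lines.
Future<List<String>> readLines(Stream<List<int>> bytes) {
  return bytes
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .toList();
}

Future<void> main() async {
  // Two chunks; the boundary falls in the middle of the second line.
  final chunks = Stream<List<int>>.fromIterable([
    utf8.encode('first line\nsec'),
    utf8.encode('ond line\n'),
  ]);

  final lines = await readLines(chunks);
  for (final line in lines) {
    print('Got ${line.length} characters: "$line"');
  }
  // Got 10 characters: "first line"
  // Got 11 characters: "second line"
}
```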

Let's dive in

So how does this transform work? To find out, we are going to write a small transformer that turns every string into an UPPER CASED string.

Cool, how to start this?

Let's check the API for transform on Stream. It expects a StreamTransformer<T, S>. After some digging, it turns out that there is a higher-level concept that implements this interface and simplifies a lot: Converter<S, T>. So our implementation could look like this:

class UpperCase extends Converter<String, String> {
  @override
  String convert(String input) => input.toUpperCase();
}

Well, that was easy! Let's run the whole program and check how it looks:

import 'dart:async';
import 'dart:io';
import 'dart:convert';

class UpperCase extends Converter<String, String> {
  @override
  String convert(String input) => input.toUpperCase();
}

Future main() async {
  var config = File(Platform.script.toFilePath());
  Stream<List<int>> inputStream = config.openRead();

  var lines = inputStream
      .transform(utf8.decoder)
      .transform(LineSplitter())
      .transform(UpperCase());
  try {
    await for (var line in lines) {
      print('Got ${line.length} characters from stream');
      print(line);
    }
    print('file is now closed');
  } catch (e) {
    print(e);
  }
}

$ dart io_expedition_iter0.dart

Unsupported operation: This converter does not support chunked conversions: Instance of 'UpperCase'

Oooops!

What the hell are chunked conversions?

Let's find out where this exception originates. It is thrown in Converter<S, T>:

  /**
   * Starts a chunked conversion.
   *
   * The returned sink serves as input for the long-running conversion. The
   * given [sink] serves as output.
   */
  Sink<S> startChunkedConversion(Sink<T> sink) {
    throw new UnsupportedError(
        "This converter does not support chunked conversions: $this");
  }

It shows us at least that a Converter can operate in two ways: directly, through convert, or chunked, through startChunkedConversion.

The doc comment indicates that the chunked mode is meant for long-running conversions. It is still unclear how or why the runtime chooses this path.
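A small experiment can at least make the two paths visible. The sketch below uses a hypothetical TracingConverter (not part of the SDK) that records which entry point gets called. As far as the SDK source shows, Stream.transform hands the stream to the transformer's bind method, and Converter builds bind on top of startChunkedConversion, so treat this as an observation rather than a full explanation.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical converter that records which entry point was used.
class TracingConverter extends Converter<String, String> {
  final List<String> calls = [];

  @override
  String convert(String input) {
    calls.add('convert');
    return input;
  }

  @override
  Sink<String> startChunkedConversion(Sink<String> sink) {
    calls.add('startChunkedConversion');
    return sink; // Pass all chunks through unchanged.
  }
}

Future<void> main() async {
  // Path 1: calling convert ourselves is a direct conversion.
  final direct = TracingConverter();
  direct.convert('a');
  print(direct.calls); // [convert]

  // Path 2: using the converter as a stream transformer.
  final streamed = TracingConverter();
  await Stream.fromIterable(['a', 'b']).transform(streamed).toList();
  print(streamed.calls); // [startChunkedConversion]
}
```

In this run, convert is never invoked on the streamed path, which matches the exception seen above: a converter that only overrides convert has nothing to offer once it is used with transform.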

Let's focus on how to solve that

As you can see from the signature, a Sink<S> is expected to be returned; in our case a Sink<String>, which is simply a destination for sending Strings to. So let's intercept the streaming with a small decorator class like the one below:

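class UpperCaseConversionSink extends StringConversionSinkBase {
  // Typed as Sink<String> to match what startChunkedConversion receives.
  final Sink<String> wrapped;

  UpperCaseConversionSink(this.wrapped);

  @override
  void addSlice(String str, int start, int end, bool isLast) {
    // Convert only the requested slice; add() passes the whole string.
    wrapped.add(str.substring(start, end).toUpperCase());
    if (isLast) close();
  }

  @override
  void close() {
    wrapped.close();
  }
}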

and let's implement the start of chunked conversion in the UpperCase Converter like this:

  @override
  Sink<String> startChunkedConversion(Sink<String> sink) {
    return UpperCaseConversionSink(sink);
  }

$ dart io_expedition_iter1.dart

Got 19 characters from stream
LIBRARY IO_TESTING;
Got 0 characters from stream

Got 20 characters from stream
IMPORT 'DART:ASYNC';
Got 17 characters from stream
IMPORT 'DART:IO';
Got 22 characters from stream

# [...]

Nice! That works.
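One detail is easy to miss: addSlice also receives a start and an end index plus an isLast flag. In the stream case add() forwards the whole string, so ignoring these arguments goes unnoticed, but a caller may pass only a part of a larger string. The following sketch shows a variant of the sink that honours the slice. SlicingUpperCaseSink and CollectingSink are hypothetical names used only for this illustration.

```dart
import 'dart:convert';

/// Variant of the upper-casing sink that honours the requested slice.
class SlicingUpperCaseSink extends StringConversionSinkBase {
  final Sink<String> wrapped;

  SlicingUpperCaseSink(this.wrapped);

  @override
  void addSlice(String str, int start, int end, bool isLast) {
    // Only the characters in the range [start, end) belong to this chunk.
    wrapped.add(str.substring(start, end).toUpperCase());
    if (isLast) close();
  }

  @override
  void close() {
    wrapped.close();
  }
}

/// Hypothetical helper sink that collects everything it receives.
class CollectingSink implements Sink<String> {
  final List<String> chunks = [];
  bool closed = false;

  @override
  void add(String data) {
    chunks.add(data);
  }

  @override
  void close() {
    closed = true;
  }
}

void main() {
  final out = CollectingSink();
  final sink = SlicingUpperCaseSink(out);

  sink.add('whole chunk'); // forwards to addSlice(str, 0, str.length, false)
  sink.addSlice('hello world', 0, 5, true); // only "hello", then close

  print(out.chunks); // [WHOLE CHUNK, HELLO]
  print(out.closed); // true
}
```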

Let's refactor a bit

As you can see, the small decorator sink UpperCaseConversionSink now knows about the conversion technique, just like the UpperCase converter itself. That duplication can be cleaned up by introducing a more generic sink that accepts a converter and delegates the concrete conversion back to it. Let's see how this might look:

class StringEventConverterSink extends StringConversionSinkBase {
  final Sink<String> innerSink;
  final Converter<String, String> converter;

  // [innerSink] is wrapped and [converter] knows about the concrete conversion algorithm
  StringEventConverterSink(this.innerSink, this.converter);

  @override
  void addSlice(String str, int start, int end, bool isLast) {
    // Delegate the conversion of the requested slice to the converter.
    innerSink.add(converter.convert(str.substring(start, end)));
    if (isLast) close();
  }

  @override
  void close() {
    innerSink.close();
  }
}

The usage then looks like this:

class UpperCaseConverter extends Converter<String, String> {
  @override
  String convert(String input) => input.toUpperCase();

  @override
  Sink<String> startChunkedConversion(Sink<String> sink) {
    return StringEventConverterSink(sink, this);
  }
}

The full final code can be found on my GitHub page.

What about closures

Sure, we can simplify even further and make the Converter itself more generic, so that it only accepts a closure to do the job. Our usage would then look as simple as this:

  .transform(StringConverter((String x) => x.toUpperCase()));

So we will introduce a generic StringConverter that accepts this closure:

class StringConverter extends Converter<String, String> {
  String Function(String x) convertFunction;

  StringConverter(this.convertFunction);

  @override
  String convert(String input) => 
      convertFunction(input);

  @override
  Sink<String> startChunkedConversion(Sink<String> sink) => 
      StringEventConverterSink(sink, this);
}

The full code is on my GitHub page too.
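For a quick experiment without a file, the closure-based converter can be combined with the generic sink and run against an in-memory stream. This is a minimal sketch, assuming the two classes look roughly as described above; the sample strings are made up. It exercises both paths: a direct convert call and a chunked conversion driven by transform.

```dart
import 'dart:async';
import 'dart:convert';

/// Sink that delegates each chunk to [converter] and forwards the result.
class StringEventConverterSink extends StringConversionSinkBase {
  final Sink<String> innerSink;
  final Converter<String, String> converter;

  StringEventConverterSink(this.innerSink, this.converter);

  @override
  void addSlice(String str, int start, int end, bool isLast) {
    innerSink.add(converter.convert(str.substring(start, end)));
    if (isLast) close();
  }

  @override
  void close() {
    innerSink.close();
  }
}

/// Converter whose behaviour is supplied as a closure.
class StringConverter extends Converter<String, String> {
  final String Function(String) convertFunction;

  StringConverter(this.convertFunction);

  @override
  String convert(String input) => convertFunction(input);

  @override
  Sink<String> startChunkedConversion(Sink<String> sink) =>
      StringEventConverterSink(sink, this);
}

Future<void> main() async {
  final upper = StringConverter((x) => x.toUpperCase());

  // Direct conversion of a single value.
  print(upper.convert('direct')); // DIRECT

  // Chunked conversion, driven by Stream.transform.
  final lines = await Stream.fromIterable(['import', "'dart:io';"])
      .transform(upper)
      .toList();
  print(lines); // [IMPORT, 'DART:IO';]
}
```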

Round up

For me the only open question is: how does Dart decide whether a conversion happens directly or in a chunked fashion?

If you can clarify this, feel free to leave a comment or share resources that illustrate that further.

Thanks for reading

$ dart --version
Dart VM version: 2.0.0 (Fri Aug 3 10:53:23 2018 +0200) on "macos_x64"