Reading a stream of JsonDocuments separated by new lines (ndjson)
Some formats require you to read a stream of JSON documents separated by a character such as a new line. For instance, you may need to read data from an ndjson (newline-delimited JSON) response. In this post, I'll show you how to read such a stream.
A basic solution would be to split the stream by the separator and then parse each part. However, a JSON document may itself contain the separator character. For instance, if you use \n as the separator and the documents are indented, they will contain \n characters, so you cannot just split the stream by the separator. The same issue can occur with other separator characters, as they may appear in string values.
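To make the problem concrete, here is a small sketch (not part of the final solution) that serializes two indented documents separated by \n, splits the text on \n, and tries to parse each fragment with JsonDocument.Parse. The variable names are only illustrative.

```csharp
using System;
using System.Text.Json;

// Two documents serialized with indentation and separated by a new line
var options = new JsonSerializerOptions { WriteIndented = true };
string ndjson = JsonSerializer.Serialize(new { Nickname = "meziantou" }, options)
    + "\n"
    + JsonSerializer.Serialize(new { Nickname = "alice" }, options);

// Naive approach: split on the separator and parse each fragment on its own
string[] fragments = ndjson.Split('\n');
int invalidFragments = 0;
foreach (string fragment in fragments)
{
    try
    {
        using JsonDocument document = JsonDocument.Parse(fragment);
    }
    catch (JsonException)
    {
        // Lines such as "{" or "}" are not complete JSON documents
        invalidFragments++;
    }
}

Console.WriteLine($"{fragments.Length} fragments, {invalidFragments} invalid");
```

Each indented document spans several lines, so every fragment holds only a piece of a document and none of them can be parsed.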
A better solution is to use a PipeReader to read the stream and try to parse a JsonDocument from the data received so far. If parsing stops because the data is not a complete document yet, we need to wait for more data. If parsing succeeds, we can return the document and continue reading the stream.
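This approach relies on JsonDocument.TryParseValue telling "not enough data yet" apart from "a complete value". The following sketch, built around a hypothetical TryParseFirst helper, creates a Utf8JsonReader with isFinalBlock: false over a truncated document, and over a complete document followed by the beginning of the next one.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical helper: tries to parse the first JSON value of `data`.
// isFinalBlock: false tells the reader that more data may arrive later.
static bool TryParseFirst(byte[] data, out JsonDocument? document, out long bytesConsumed)
{
    var reader = new Utf8JsonReader(data, isFinalBlock: false, state: default);
    bool parsed = JsonDocument.TryParseValue(ref reader, out document);
    bytesConsumed = parsed ? reader.BytesConsumed : 0;
    return parsed;
}

// Only part of the first document has arrived: TryParseValue returns false instead of throwing
byte[] partial = Encoding.UTF8.GetBytes("{\"Nickname\":\"mezi");
bool partialOk = TryParseFirst(partial, out _, out _);

// The first document is complete and is followed by the start of the next one
byte[] complete = Encoding.UTF8.GetBytes("{\"Nickname\":\"meziantou\"}\n{\"Nick");
bool completeOk = TryParseFirst(complete, out JsonDocument? doc, out long consumed);

Console.WriteLine($"partial: {partialOk}");
Console.WriteLine($"complete: {completeOk}, consumed {consumed} bytes");
doc?.Dispose();
```

On the complete input, BytesConsumed stops right after the closing brace of the first document, which is exactly where the next document starts. Note that invalid JSON, as opposed to incomplete JSON, still throws a JsonException.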
If you are not familiar with PipeReader and System.IO.Pipelines, I recommend reading the following post: System.IO.Pipelines: High-performance IO in .NET.
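As a quick refresher on the PipeReader read loop, here is a minimal sketch showing how ReadAsync and AdvanceTo(consumed, examined) work together. The CountLinesAsync helper is hypothetical and only counts '\n'-terminated lines; it is not a JSON parser.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: counts '\n'-terminated lines to illustrate the PipeReader loop
static async Task<int> CountLinesAsync(PipeReader reader)
{
    int lines = 0;
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Consume every complete line currently in the buffer
        SequencePosition? newLine;
        while ((newLine = buffer.PositionOf((byte)'\n')) != null)
        {
            lines++;
            buffer = buffer.Slice(buffer.GetPosition(1, newLine.Value)); // skip the line and its '\n'
        }

        // consumed = start of the unprocessed data, examined = end of the buffer
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
            break;
    }

    await reader.CompleteAsync();
    return lines;
}

var stream = new MemoryStream(Encoding.UTF8.GetBytes("a\nbb\nccc\n"));
int count = await CountLinesAsync(PipeReader.Create(stream));
Console.WriteLine($"{count} lines");
```

Bytes after the last \n stay unconsumed but are marked as examined, so the next ReadAsync call only completes once more data is available or the stream ends.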
using System.IO.Pipelines;
using System.Text.Json;

// Create a stream with 3 JSON documents separated by a new line
var stream = new MemoryStream();
JsonSerializer.Serialize(stream, new { Nickname = "meziantou" });
stream.WriteByte((byte)'\n');
JsonSerializer.Serialize(stream, new { Nickname = "alice" });
stream.WriteByte((byte)'\n');
JsonSerializer.Serialize(stream, new { Nickname = "bob" });
stream.Position = 0;

// Read all the documents
var reader = PipeReader.Create(stream);
await foreach (var jsonDocument in JsonDocumentStream.ParseJsonDocumentStream(reader))
{
    // JsonDocument uses pooled memory, so dispose it once you are done with it
    using (jsonDocument)
    {
        // Use the JsonDocument
        Console.WriteLine(jsonDocument.RootElement);

        // Deserialize the JsonDocument
        Console.WriteLine(jsonDocument.Deserialize<User>());
    }
}

// Type used to deserialize each document (declared after the top-level statements)
record User(string Nickname);
You need to install the System.IO.Pipelines package to use PipeReader:
dotnet add package System.IO.Pipelines
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipelines;
using System.Text.Json;

static class JsonDocumentStream
{
    public static IAsyncEnumerable<JsonDocument> ParseJsonDocumentStream(Stream stream, byte? separator = null)
    {
        var reader = PipeReader.Create(stream);
        return ParseJsonDocumentStream(reader, separator);
    }

    public static async IAsyncEnumerable<JsonDocument> ParseJsonDocumentStream(PipeReader reader, byte? separator = null)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (!buffer.IsEmpty)
                {
                    // Support custom separator
                    // Note that whitespace separators, such as new lines, are skipped by
                    // Utf8JsonReader, so there is no need to trim them from the buffer.
                    if (separator.HasValue && buffer.First.Span[0] == separator.Value)
                    {
                        buffer = buffer.Slice(1);
                        continue;
                    }

                    // The document is not complete yet: wait for more data
                    if (!TryParseJson(ref buffer, out var jsonDocument))
                        break;

                    yield return jsonDocument;
                }

                // Remove the processed data from the pipe and mark the remaining data
                // as examined, so the next ReadAsync waits for more data
                reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted)
                    break;
            }
        }
        finally
        {
            // Also runs when the caller stops enumerating early
            await reader.CompleteAsync().ConfigureAwait(false);
        }
    }

    // Cannot be inlined in the async method: Utf8JsonReader is a ref struct
    private static bool TryParseJson(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true)] out JsonDocument? jsonDocument)
    {
        var reader = new Utf8JsonReader(buffer, isFinalBlock: false, state: default);
        if (JsonDocument.TryParseValue(ref reader, out jsonDocument))
        {
            // TryParseValue copies the data, so the consumed bytes can be released
            buffer = buffer.Slice(reader.BytesConsumed);
            return true;
        }

        return false;
    }
}
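The comment in the separator branch says that whitespace separators need no special handling. This sketch checks that claim with a hypothetical ParseFirst helper: leading new lines are skipped by Utf8JsonReader and counted in BytesConsumed, while a non-whitespace separator such as ',' makes the reader throw, which is why the method strips a custom separator before parsing.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical helper: parses the first JSON value of `text`, treating it as a non-final block
static (bool Parsed, long Consumed) ParseFirst(string text)
{
    var reader = new Utf8JsonReader(Encoding.UTF8.GetBytes(text), isFinalBlock: false, state: default);
    if (JsonDocument.TryParseValue(ref reader, out JsonDocument? document))
    {
        document.Dispose();
        return (true, reader.BytesConsumed);
    }
    return (false, 0);
}

// Leading new lines are whitespace: the reader skips them and includes them in BytesConsumed
var (newLineParsed, newLineConsumed) = ParseFirst("\n\n{\"a\":1}\n");

// A leading ',' is not whitespace: the reader rejects it with a JsonException
bool commaRejected = false;
try
{
    ParseFirst(",{\"a\":1}");
}
catch (JsonException)
{
    commaRejected = true;
}

Console.WriteLine($"new line: parsed={newLineParsed}, consumed={newLineConsumed}");
Console.WriteLine($"comma rejected: {commaRejected}");
```

Because BytesConsumed already covers the skipped whitespace, slicing the buffer by BytesConsumed is enough to move past both the separator and the document.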