Ways to load a file to Postgres

To write a post about async queries I first need to load some data into Postgres. The data came from https://www.kaggle.com/CooperUnion/anime-recommendations-database. I wanted to use real data, and I'm an anime fan, so it was the perfect fit. It's a CSV file, so there is no normalization. That's ok; if there were, this would be a short post. I decided to try several ways of loading the data to see which would be fastest. To be honest, I already knew which one was going to win; I wanted to see how close I could get using other methods without over-engineering or getting cute with the code. I ended up going there anyway, but the method that came in a close second was a simple solution.

The data

A little bit about the data before diving into the solutions. The file has seven columns: id, name, genres, type, episodes, rating, and members. Each anime can have zero or more genres, with occasional duplicates, packed into a single quoted, comma-delimited list. At first I thought all columns had values, but I quickly learned only id, name, and members were guaranteed. I loaded the data into a staging table using psql's \copy and then tried several ways to load it into the tables below.
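The staging table itself isn't shown in the post; a minimal sketch of what it might look like, with the episodes column kept as text because the file uses "Unknown" for missing values (table and column names are assumptions based on the queries later in the post):

```sql
-- Hypothetical staging table mirroring the CSV. episodes stays text so
-- rows containing 'Unknown' load without a type error.
create table anime_staging(
    anime_id integer,
    name text,
    genres text,
    type text,
    episodes text,
    rating numeric(4,2),
    members integer);

-- From inside psql; \copy reads the file client-side, unlike server-side COPY.
\copy anime_staging from 'anime.csv' with (format csv, header)
```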

create table type(
    id integer primary key generated by default as identity,
    name varchar(10) unique not null);

create table anime(
    id integer primary key,
    name varchar(100) not null,
    type_id integer references type (id),
    episodes integer,
    rating numeric(4,2),
    members integer,
    constraint name_type unique(name, type_id));

create table genre(
    id integer primary key generated by default as identity,
    name varchar(20) unique not null);

create table anime_genre(
    id integer primary key generated by default as identity,
    anime_id integer not null references anime (id),
    genre_id integer not null references genre (id),
    constraint anime_genre_uc unique (anime_id, genre_id));

Pure SQL

Having spent a lot of time working with relational databases, my first idea was to use insert statements to load the data: four SQL statements, one to insert the unique types, one to insert the anime, one to insert the genres, and finally one to insert the records mapping anime to genre. Nothing too exciting here. The source used "Unknown" when the number of episodes wasn't known, so I had to convert that to null. Dealing with the genres was a little trickier. Fortunately, Postgres can convert a string to an array and then the array to rows. I use that below to create the genres and to map anime to genre.
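To see what that string-to-rows conversion does in isolation, here's a toy example (the genre string is made up):

```sql
-- string_to_array splits the quoted list on commas; unnest turns the
-- array into one row per element; trim removes the stray spaces.
select trim(genre)
  from unnest(string_to_array('Action, Comedy, Action', ',')) genre;
-- returns three rows: Action, Comedy, Action
-- (the distinct in the real inserts removes the duplicate)
```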

insert into type(
    name) 
(select distinct type 
   from anime_staging 
  where type is not null);

insert into anime (
    select anime_id, 
    name, 
    (select type.id 
       from type 
      where anime_staging.type = type.name),
    case 
      when episodes = 'Unknown' then null 
      else episodes::integer 
    end,
    rating,
    members
    from anime_staging);

insert into genre (
    name) 
(select distinct trim(genre) 
   from anime_staging, 
        lateral unnest(string_to_array(genres, ',')) genre);

insert into anime_genre(
    anime_id, 
    genre_id) 
(select distinct anime_id, 
        (select genre.id 
           from genre 
          where genre.name = trim(genre)) 
   from anime_staging, 
        lateral unnest(string_to_array(genres, ',')) genre);

The pure SQL approach ran in just over a second on average. That's the time to beat in C#. The closest I came was around three seconds; the rest were significantly slower.

The C# Approach

I tried five approaches: three were variations on when to insert the row, one used async database calls, and the last used a bulk insert. I'm only going to go into depth on two of them here; all of them are available at the GitHub link at the bottom.

The first three all followed a similar pattern, and all performed about the same: horribly, taking between 90 and 110 seconds. Two read all the rows into a list and then inserted them into the table, one opening a connection for each insert, the other inserting them all in one connection. The third, as a precursor to the async version, used yield in the read instead of returning the list all at once. The timings are in the table below. It shouldn't come as a surprise that the version that writes them all in one connection was slightly faster than the other two.
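For reference, a condensed sketch of the "insert all in one connection" variation; the Anime class matches the one used in the async example later, but the method body here is illustrative, not the post's actual code:

```csharp
// Sketch: one open connection, one parameterized insert per row.
// Nullable fields are boxed so that a missing value becomes DBNull.
public void SaveAll(List<Anime> animes)
{
    using (var connection = new NpgsqlConnection(configuration["connectionString"]))
    {
        connection.Open();
        foreach (var anime in animes)
        {
            using (var command = connection.CreateCommand())
            {
                command.CommandText =
                    "insert into anime (id, name, type_id, episodes, rating, members) " +
                    "values (@id, @name, @type_id, @episodes, @rating, @members)";
                command.Parameters.AddWithValue("id", anime.Id);
                command.Parameters.AddWithValue("name", anime.Name);
                command.Parameters.AddWithValue("type_id", (object)anime.TypeId ?? DBNull.Value);
                command.Parameters.AddWithValue("episodes", (object)anime.Episodes ?? DBNull.Value);
                command.Parameters.AddWithValue("rating", (object)anime.Rating ?? DBNull.Value);
                command.Parameters.AddWithValue("members", (object)anime.Members ?? DBNull.Value);
                command.ExecuteNonQuery();
            }
        }
    }
}
```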

Version                 time (seconds)
One Row at a Time       109
Insert all at once      93
Insert one at a time    110

The Async Approach

I decided I wanted to test an async version, so I started with the one-row-at-a-time approach. The difference is that this version uses the async versions of the database methods. I was also going to make it multi-threaded, but Parallel.ForEach and async don't mix, so for now this one is just async. Getting async and yield to work together required a little extra work. You can't use async/await with yield, but you can yield a task. Read returns IEnumerable&lt;Task&lt;Anime&gt;&gt; and calls an async method that returns Task&lt;Anime&gt;. The important code is below.

        private async Task<Anime> ReadAsync(DbDataReader reader)
        {
            var hasNextRow = await reader.ReadAsync();
            if (!hasNextRow)
            {
                return null;
            }
            var id = reader.GetInt32(0);
            var episodes = reader.GetString(4);
            return new Anime
            {
                Id = id,
                Name = reader.GetString(1),
                GenreIds = reader.IsDBNull(2) ? new List<int?>() : reader.GetString(2).Split(',').Select(x => genres.GetId(x.Trim())).Distinct().ToList(),
                TypeId = reader.IsDBNull(3) ? (int?)null : types.GetId(reader.GetString(3)),
                Episodes = episodes == "Unknown" ? (int?)null : Int32.Parse(episodes),
                Rating = reader.IsDBNull(5) ? (double?)null : reader.GetDouble(5),
                Members = reader.IsDBNull(6) ? (int?)null : reader.GetInt32(6)
            };
        }

        public IEnumerable<Task<Anime>> Read()
        {
            using (var connection = new NpgsqlConnection(configuration["connectionString"]))
            {
                connection.Open();
                using (var command = connection.CreateCommand())
                {
                    command.CommandText = "select anime_id, name, genres, type, episodes, rating, members from anime_staging";
                    using (var reader = command.ExecuteReader())
                    {
                        var row = ReadAsync(reader);
                        while (reader.IsOnRow)
                        {
                            yield return row;
                            row = ReadAsync(reader);
                        }
                    }
                }
            }
        }

In the main method, I iterate over the rows, calling an async lambda, and then use Task.WhenAll to wait for all the inserts to complete.

                var saves = animeStaging.Read().Select(async (animeAsync) =>
                {
                    var anime = await animeAsync;
                    await animes.Save(anime);
                    await animeGenres.Save(anime.Id, anime.GenreIds);
                });
                await Task.WhenAll(saves);

The call to animeStaging.Read yields a task per result. Awaiting animeAsync waits for each task to complete. The next statement saves the anime object and awaits the insert, and the one after it does the same for the genre ids. The result of the async lambda is a Task. Select repeats this for each yielded result and returns an IEnumerable of Task. That IEnumerable is passed to Task.WhenAll, which returns a Task that completes when every Task in the IEnumerable completes. Even though all of this is still single-threaded, releasing the thread while waiting for I/O cuts the run time in half, to 45 seconds. That makes this the second-fastest C# version. The fastest C# version doesn't use async calls at all.

Binary Copy

This version uses Npgsql's BeginBinaryImport, which uses the Postgres COPY command to do a bulk insert of the rows. The staging data is read into a List of Anime and then bulk inserted into the anime and anime_genre tables. If you must use code to load the data, this is your best bet. As you can see from the code below, it's easy and, best of all, it only took three seconds to run.

        public void Save(List<Anime> animes)
        {
            using (var connection = new NpgsqlConnection(configuration["connectionString"]))
            {
                connection.Open();
                using (var writer = connection.BeginBinaryImport("COPY anime (id, name, type_id, episodes, rating, members) from STDIN (FORMAT BINARY)"))
                {
                    foreach (var anime in animes)
                    {
                        writer.StartRow();
                        writer.Write(anime.Id, NpgsqlTypes.NpgsqlDbType.Integer);
                        writer.Write(anime.Name, NpgsqlTypes.NpgsqlDbType.Varchar);
                        if (anime.TypeId.HasValue)
                            writer.Write(anime.TypeId.Value, NpgsqlTypes.NpgsqlDbType.Integer);
                        else
                            writer.WriteNull();
                        if (anime.Episodes.HasValue)
                            writer.Write(anime.Episodes.Value, NpgsqlTypes.NpgsqlDbType.Integer);
                        else
                            writer.WriteNull();
                        if (anime.Rating.HasValue)
                            writer.Write(anime.Rating.Value, NpgsqlTypes.NpgsqlDbType.Numeric);
                        else
                            writer.WriteNull();
                        if (anime.Members.HasValue)
                            writer.Write(anime.Members.Value, NpgsqlTypes.NpgsqlDbType.Integer);
                        else
                            writer.WriteNull();
                    }
                    writer.Complete();
                }
            }
        }
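The matching anime_genre import isn't shown above; under the same pattern it would be a second binary copy, roughly like this (a sketch, with the method body assumed rather than taken from the repo):

```csharp
// Sketch of the anime_genre bulk insert. Assumes each Anime carries the
// genre ids resolved earlier (GenreIds is a List<int?>). The id column
// is an identity column, so it is omitted from the COPY column list.
public void Save(List<Anime> animes)
{
    using (var connection = new NpgsqlConnection(configuration["connectionString"]))
    {
        connection.Open();
        using (var writer = connection.BeginBinaryImport(
            "COPY anime_genre (anime_id, genre_id) from STDIN (FORMAT BINARY)"))
        {
            foreach (var anime in animes)
            {
                foreach (var genreId in anime.GenreIds)
                {
                    if (!genreId.HasValue) continue; // skip unresolved genres
                    writer.StartRow();
                    writer.Write(anime.Id, NpgsqlTypes.NpgsqlDbType.Integer);
                    writer.Write(genreId.Value, NpgsqlTypes.NpgsqlDbType.Integer);
                }
            }
            writer.Complete();
        }
    }
}
```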

Summary

The binary copy version was the fastest of the .NET attempts, but it was still three times slower than the pure SQL approach. A pure SQL approach will almost always beat procedural code. If you must use procedural code, then binary copy is your best bet. The async version was fun and frustrating to write, but I don't think it was a good fit for this problem. If you are importing a lot of data and must use procedural code, a combination of binary copy and threading will be your best bet. I plan to write a post about threading; finding a larger dataset and combining binary copy with threading sounds like it would be fun. My next post will be on using async database calls in a REST API, which should see the same performance gains as in the Why async is important to a web service post.

The code for this post is at https://github.com/jamesdalton/ways-to-load-a-file-to-postgres
