Stored Procedure to Import Data

A while back I showed you how to write an extension to import the MaxMind city data set. Today is a repeat of that exercise, but instead of using an extension, we will use a stored procedure.

The documentation spells out how to write your own procedures in Chapter 6, so I’m not going to go over that again, but I do want to point out a few things.

One is that a stored procedure runs inside an existing transaction, so if you are just reading or writing a little bit of data you don’t even have to worry about it. This is very convenient, except when you want to load large amounts of data. So what I like to do is have the “top level” procedure do nothing but spawn another thread where the actual work happens, and return when it is done. You’ll notice I grab a GraphDatabaseService (and a Log) from the context that gets passed in (use GraphDatabaseAPI if using Neo4j version 3.1.x). Code below:

 
    @Context
    public GraphDatabaseService db;

    @Context
    public Log log;

    @Procedure(name = "com.maxdemarzi.import.locations", mode = Mode.WRITE)
    @Description("CALL com.maxdemarzi.import.locations(file)")
    public Stream<StringResult> importLocations(@Name("file") String file) throws InterruptedException {
        long start = System.nanoTime();

        // Do the actual work in a separate thread so it can manage its own
        // transactions, then wait for it to finish.
        Thread t1 = new Thread(new ImportLocationsRunnable(file, db, log));
        t1.start();
        t1.join();

        return Stream.of(new StringResult("Locations imported in " + TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) + " seconds"));
    }
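
Once the jar is deployed, you can call the procedure from Cypher like any other. Here is a minimal sketch of kicking it off with the official Java driver; the bolt URL, credentials, and file name are placeholders for your own setup:

    import org.neo4j.driver.v1.AuthTokens;
    import org.neo4j.driver.v1.Driver;
    import org.neo4j.driver.v1.GraphDatabase;
    import org.neo4j.driver.v1.Session;
    import org.neo4j.driver.v1.StatementResult;

    public class CallImportLocations {
        public static void main(String[] args) {
            // Placeholder connection details; point these at your own server.
            try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                                                      AuthTokens.basic("neo4j", "password"));
                 Session session = driver.session()) {
                // The file name is a placeholder for your MaxMind locations CSV.
                StatementResult result = session.run(
                        "CALL com.maxdemarzi.import.locations('GeoLite2-City-Locations-en.csv')");
                System.out.println(result.single().get(0).asString());
            }
        }
    }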

In the Runnable is where all the magic happens. I put a hard limit of 1000 here, but your server may do better with 5000 or 10000 changes per transaction. What we want to do is batch our writes: if we make one change at a time, we will destroy our performance, since Neo4j is an ACID database and even a tiny change of a property from true to false will force a write to disk.

 
public class ImportLocationsRunnable implements Runnable {

    // Changes per transaction; tune this to your server.
    private static final int TRANSACTION_LIMIT = 1000;
    private final String file;
    private final GraphDatabaseService db;
    private final Log log;

    public ImportLocationsRunnable(String file, GraphDatabaseService db, Log log) {
        this.file = file;
        this.db = db;
        this.log = log;
    }

In our actual “run” method we check to make sure the file exists, bailing out if it does not…

 
    @Override
    public void run() {
        Reader in;
        Iterable<CSVRecord> records;
        try {
            in = new FileReader("/" + file);
            records = CSVFormat.EXCEL.withHeader().parse(in);
        } catch (FileNotFoundException e) {
            log.error("ImportLocationsRunnable Import - File not found: " + file, e);
            return;
        } catch (IOException e) {
            log.error("ImportLocationsRunnable Import - IO Exception: " + file, e);
            return;
        }

…and begin our Transaction OUTSIDE the try block, then loop over each record in the CSV file.

 
        Transaction tx = db.beginTx();
        try {
            int count = 0;

            for (CSVRecord record : records) {
                count++;
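
                // The per-record write is elided in this post; what you create per
                // row depends on your model. A hypothetical sketch, assuming a
                // "Location" label and the geoname_id and city_name columns of the
                // MaxMind locations file, might look like this:
                Node location = db.createNode(Label.label("Location"));
                location.setProperty("geoname_id", record.get("geoname_id"));
                location.setProperty("city_name", record.get("city_name"));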

As we import along, we check to see if we have reached our transaction limit and then commit, and refresh our transaction with a new one. This will take the hit to disk once for the 1000 changes, as well as relieve memory pressure from having to keep all that work in memory.

 
                // Commit every TRANSACTION_LIMIT records and start a fresh transaction.
                if (count % TRANSACTION_LIMIT == 0) {
                    tx.success();
                    tx.close();
                    tx = db.beginTx();
                }
            }

            // Commit whatever is left over from the last batch.
            tx.success();
        } finally {
            tx.close();
        }
    }
}

…and yes, it’s single-threaded, but if your data is only a few million records, it will be fine. Once you start talking about loading a billion records or more, then it may be time to multi-thread this code or use a different import mechanism.

As always, the code is on GitHub.
