Read Cypher query results into Apache Flink and write datasets to Neo4j using Cypher batches.
Contains Apache Flink-specific input and output formats that read Cypher query results from Neo4j and write data back to Neo4j in parallel using batches of Cypher statements.
// Obtain the Flink batch execution environment.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Input format that runs the Cypher query against the Neo4j REST endpoint
// (authenticating as neo4j/password) and yields the returned rows as
// Tuple3 values: (id(n), n.name, n.born). Both timeouts are 10_000.
Neo4jInputFormat<Tuple3<Integer, String, Integer>> input =
    Neo4jInputFormat.buildNeo4jInputFormat()
        .setRestURI("http://localhost:7475/db/data/")
        .setUsername("neo4j")
        .setPassword("password")
        .setCypherQuery("MATCH (n:User) RETURN id(n), n.name, n.born")
        .setConnectTimeout(10_000)
        .setReadTimeout(10_000)
        .finish();

// The produced element type is passed to createInput explicitly; the field
// types follow the order of the RETURN clause (INT, STRING, INT).
TupleTypeInfo<Tuple3<Integer, String, Integer>> rowType =
    new TupleTypeInfo<Tuple3<Integer, String, Integer>>(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO);

DataSet<Tuple3<Integer, String, Integer>> vertices = env.createInput(input, rowType);

// ... apply further transformations to `vertices` here ...
// Obtain the Flink batch execution environment.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Cypher statement executed per batch: each element of the `inserts` list is
// unwound into `i`, and a :User node is created from its name/born entries.
// NOTE(review): `{inserts}` is the legacy Cypher parameter syntax; newer Neo4j
// versions expect `$inserts` — confirm against the target server version.
String insertQuery =
    "UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})";

// Output format that writes tuples to Neo4j in Cypher batches.
// Tuple field 0 is bound to the key "name" and field 1 to "born", matching the
// i.name / i.born accesses in the query. The per-task batch size is 1000, and
// both timeouts are 1_000.
// NOTE(review): the read example authenticates against this same URI, but no
// credentials are set here — if the server requires auth, configure them too
// (confirm which builder methods the output format offers).
Neo4jOutputFormat<Tuple2<String, Integer>> outputFormat =
    Neo4jOutputFormat.buildNeo4jOutputFormat()
        .setRestURI("http://localhost:7475/db/data/")
        .setConnectTimeout(1_000)
        .setReadTimeout(1_000)
        .setCypherQuery(insertQuery)
        .addParameterKey(0, "name")
        .addParameterKey(1, "born")
        .setTaskBatchSize(1000)
        .finish();

// Build a small DataSet of two users and route it into the Neo4j output format.
env.fromElements(
        new Tuple2<>("Alice", 1984),
        new Tuple2<>("Bob", 1976))
    .output(outputFormat);

// Trigger execution of the Flink job.
env.execute();
<!-- Add to your pom.xml: the Maven repository of the Database Group at
     Leipzig University. Both release and snapshot resolution are enabled;
     snapshot resolution is required for SNAPSHOT versions such as
     flink-neo4j 0.1-SNAPSHOT. -->
<repositories>
  <repository>
    <id>dbleipzig</id>
    <name>Database Group Leipzig University</name>
    <url>https://wdiserv1.informatik.uni-leipzig.de:443/archiva/repository/dbleipzig/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
<!-- Add inside the <dependencies> section of your pom.xml. -->
<dependency>
  <groupId>org.s1ck</groupId>
  <artifactId>flink-neo4j</artifactId>
  <version>0.1-SNAPSHOT</version>
</dependency>
Licensed under the Apache License, Version 2.0.