
Tools for integrating OSM data into HDFS as CSV or Avro files, using Flink (implements PBF stream reading). This project reconstructs ways, polygons, and relations so they can be used in GIS analysis.

MIT License



Tools for converting large OSM PBF data files into CSV or Avro files, with the geometry (lines, polygons) reconstructed.

Goal - Ease the use of OSM data in Flink / Hadoop clusters

This project provides an InputFormat for reading OSM PBF files and creating lines, polygons, and relations, so that OSM data can be used in big data stacks.

Polylines (ways) and polygons are reconstructed for direct use. ESRI-Geometry provides geometry primitives for buffers, intersections, quadtrees, etc.

This project uses Flink as its main framework. Because it is built on MapReduce-style objects, it can be ported to the standard Spark or Hadoop frameworks.

Design And Output file format

We use the ESRI-Geometry Java API (https://github.com/Esri/geometry-api-java) for efficient serialization and deserialization.

A detailed explanation of the output file format is available here: Output File Format Description

Using the tool as a standalone command line (5 mins startup)

This tool can now be used as a single command line, as well as a job on a cluster.

Using the jar as a standalone jar :

Make sure at least Java 7 or 8 is on your path.

java -jar osm-flink-tools-[version]-all.jar rhone-alpes-latest.osm.pbf .\

This command creates 4 folders containing the reconstructed geometries, as described here: Output File Format Description
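As an illustration of consuming that output, here is a minimal sketch that reads node rows back in plain Java. It assumes the node folder holds comma-separated lines in the column order written by the ProcessOSM listing in this document (id, x, y, attributes), that the files are UTF-8, and that strings are not quoted. The class name NodeCsvReader and the sample attribute text are hypothetical; the attribute column is kept as an opaque string because its encoding is covered by the Output File Format Description, not here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of osm-flink-tools.
class NodeCsvReader {

    /** One row of the node output: id, x, y and the raw attribute string. */
    static final class NodeRow {
        final long id;
        final double x;
        final double y;
        final String attributes; // left unparsed on purpose

        NodeRow(long id, double x, double y, String attributes) {
            this.id = id;
            this.x = x;
            this.y = y;
            this.attributes = attributes;
        }
    }

    /** Parses "id,x,y,attributes"; the limit of 4 keeps any commas inside the last column. */
    static NodeRow parseLine(String line) {
        String[] parts = line.split(",", 4);
        if (parts.length < 4) {
            throw new IllegalArgumentException("expected 4 columns: " + line);
        }
        return new NodeRow(Long.parseLong(parts[0]), Double.parseDouble(parts[1]),
                Double.parseDouble(parts[2]), parts[3]);
    }

    /** Reads a single file, or every visible regular file directly inside a folder. */
    static List<NodeRow> readAll(Path output) throws IOException {
        List<Path> files;
        if (Files.isDirectory(output)) {
            try (Stream<Path> listing = Files.list(output)) {
                files = listing
                        .filter(Files::isRegularFile)
                        // assumption: names starting with '.' or '_' are markers, not data
                        .filter(p -> !p.getFileName().toString().startsWith(".")
                                && !p.getFileName().toString().startsWith("_"))
                        .sorted()
                        .collect(Collectors.toList());
            }
        } else {
            files = Collections.singletonList(output);
        }
        List<NodeRow> rows = new ArrayList<>();
        for (Path file : files) {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                if (!line.isEmpty()) {
                    rows.add(parseLine(line));
                }
            }
        }
        return rows;
    }
}
```

Splitting with a limit of 4 is a deliberate choice: if the attribute column itself contains commas, everything after the third comma stays in one piece instead of being spread over extra columns.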

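To generate CSV files in UTF-8 or another specific charset, add -Dfile.encoding=UTF-8 (or the charset you need) to the command line. The output files are then written with that charset, which helps when integrating worldwide extracts or data from other countries. The option must come before -jar, for example: java -Dfile.encoding=UTF-8 -jar osm-flink-tools-[version]-all.jar rhone-alpes-latest.osm.pbf .\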
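To see why the charset matters for place names, the following sketch encodes the same accented name with two charsets and compares the byte counts. It is only an illustration in plain Java: the class name EncodingDemo is hypothetical and the code does not touch the tool itself. It also prints the JVM default charset, which is what -Dfile.encoding changes on the Java 7 and 8 runtimes mentioned above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of osm-flink-tools.
class EncodingDemo {

    /** Number of bytes the text occupies once encoded with the given charset. */
    static int encodedLength(String text, Charset charset) {
        return text.getBytes(charset).length;
    }

    public static void main(String[] args) {
        // "Rhône-Alpes", written with a Unicode escape so the source file encoding does not matter
        String name = "Rh\u00f4ne-Alpes";
        System.out.println("default charset  : " + Charset.defaultCharset());
        System.out.println("UTF-8 bytes      : " + encodedLength(name, StandardCharsets.UTF_8));
        System.out.println("ISO-8859-1 bytes : " + encodedLength(name, StandardCharsets.ISO_8859_1));
    }
}
```

The two byte counts differ because the accented letter takes two bytes in UTF-8 and one in ISO-8859-1, so a file written with one charset and read with the other shows garbled names.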

Using the Tool in a flink cluster or flink Job

The ProcessOSM class can be used as a single process (YARN or Flink cluster):

public static void main(String[] args) throws Exception {

	if (args.length < 2)
		throw new Exception("not enough parameters");
	String inputPbf = args[0];

	System.out.println(" input pbf :" + inputPbf);

	String outputResultFolder = args[1];
	System.out.println(" output result folder :" + outputResultFolder);

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	OSMResultsStreams rs = constructOSMStreams(env, inputPbf);

	// nodes: id, x, y, attributes
	rs.retNodesWithAttributes.map(new MapFunction<NodeEntity, Tuple4<Long, Double, Double, String>>() {
		@Override
		public Tuple4<Long, Double, Double, String> map(NodeEntity value) throws Exception {
			return new Tuple4<>(value.id, value.x, value.y, MapStringTools.convertToString(value.fields));
		}
	}).writeAsCsv(outputResultFolder + "/nodes.csv");

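	// polygons: id, geometry as ASCII text, attributes
	rs.retPolygons.map(new MapFunction<ComplexEntity, Tuple3<Long, String, String>>() {
		@Override
		public Tuple3<Long, String, String> map(ComplexEntity value) throws Exception {
			// third column assumed to be the attributes, serialized as for nodes
			return new Tuple3<>(value.id, GeometryTools.toAscii(value.shapeGeometry),
					MapStringTools.convertToString(value.fields));
		}
	}).writeAsCsv(outputResultFolder + "/polygons.csv");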

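	// ways: id, geometry as ASCII text, attributes
	rs.retWaysEntities.map(new MapFunction<ComplexEntity, Tuple3<Long, String, String>>() {
		@Override
		public Tuple3<Long, String, String> map(ComplexEntity value) throws Exception {
			// third column assumed to be the attributes, serialized as for nodes
			return new Tuple3<>(value.id, GeometryTools.toAscii(value.shapeGeometry),
					MapStringTools.convertToString(value.fields));
		}
	}).writeAsCsv(outputResultFolder + "/ways.csv");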

	// relations: id, attributes, serialized list of related objects
	rs.retRelations.map(new MapFunction<Relation, Tuple3<Long, String, String>>() {
		@Override
		public Tuple3<Long, String, String> map(Relation value) throws Exception {

			StringBuilder sb = new StringBuilder();
			if (value.relatedObjects != null) {

				for (RelatedObject r : value.relatedObjects) {
					HashMap<String, Object> h = new HashMap<>();
					h.put("relid", r.relatedId);
					h.put("role", r.role);
					h.put("type", r.type);
					if (sb.length() > 0) {
						// assumption: "||" as the separator between related objects
						sb.append("||");
					}
					// assumption: each related object is serialized like an attribute map
					sb.append(MapStringTools.convertToString(h));
				}
			}

			return new Tuple3<>(value.id, MapStringTools.convertToString(value.fields), sb.toString());
		}
	}).writeAsCsv(outputResultFolder + "/rels.csv");
