diff --git a/build.sbt b/build.sbt index 141677e..96cff94 100644 --- a/build.sbt +++ b/build.sbt @@ -24,4 +24,6 @@ case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) -} \ No newline at end of file +} + +assemblyJarName in assembly := "csv2rdf.jar" \ No newline at end of file diff --git a/src/main/scala/uk/org/floop/csv2rdf/Run.scala b/src/main/scala/uk/org/floop/csv2rdf/Run.scala index 8ad9953..b9102dd 100644 --- a/src/main/scala/uk/org/floop/csv2rdf/Run.scala +++ b/src/main/scala/uk/org/floop/csv2rdf/Run.scala @@ -1,5 +1,6 @@ package uk.org.floop.csv2rdf +import java.io.File import java.net.URI import java.nio.file.Paths @@ -7,31 +8,35 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpRequest import akka.stream.alpakka.csv.scaladsl.CsvParsing -import akka.stream.scaladsl.{FileIO, Sink, Source} +import akka.stream.scaladsl.{Compression, FileIO, Sink, Source, StreamConverters} +import akka.util.ByteString import com.damnhandy.uri.template.UriTemplate import org.apache.jena.rdf.model.{ModelFactory, Property, RDFList} import org.apache.jena.riot.{Lang, RDFDataMgr} +import org.apache.jena.vocabulary.XSD import scala.concurrent.{ExecutionContextExecutor, Future} import scala.jdk.CollectionConverters._ -case class Config(inputCSV: String = "") +case class Config(inputCSV: String = "", outputRDF: Option[File] = None) object CSVW { val namespace = "http://www.w3.org/ns/csvw#" private val model = ModelFactory.createDefaultModel + private def makeProperty(n: String): Property = model.createProperty(namespace, n) + val url: Property = makeProperty("url") - val tableSchema: Property = makeProperty( "tableSchema") - val aboutUrl: Property = makeProperty( "aboutUrl") - val column: Property = makeProperty( "column") - val datatype: Property = makeProperty( "datatype") - val name: Property = makeProperty( "name") - val propertyUrl: Property = makeProperty( "propertyUrl") - val required: Property = makeProperty( "required") - val title: Property = makeProperty( "title") - val valueUrl: Property = makeProperty( "valueUrl") - val virtual: Property = makeProperty( "virtual") + val tableSchema: Property = makeProperty("tableSchema") + val aboutUrl: Property = makeProperty("aboutUrl") + val column: Property = makeProperty("column") + val datatype: Property = makeProperty("datatype") + val name: Property = makeProperty("name") + val propertyUrl: Property = makeProperty("propertyUrl") + val required: Property = makeProperty("required") + val title: Property = makeProperty("title") + val valueUrl: Property = makeProperty("valueUrl") + val virtual: Property = makeProperty("virtual") def tableSpec(csvUrl: URI, metadataUrl: URI): (UriTemplate, List[Column]) = { val metadataModel = RDFDataMgr.loadModel(metadataUrl.toString, Lang.JSONLD) @@ -55,20 +60,70 @@ } yield Column( columnResource.getProperty(CSVW.name).getObject.asLiteral.getLexicalForm, columnResource.listProperties(CSVW.title).asScala.map(_.getObject.asLiteral.getLexicalForm).toList, - Option(columnResource.getProperty(CSVW.datatype)).map(x => new URI(x.getObject.asResource.getURI)), + Option(columnResource.getProperty(CSVW.datatype)).map(x => new URI(expandPrefix(x.getObject.asResource.getURI))), Option(columnResource.getProperty(CSVW.required)).map(_.getObject.asLiteral.getBoolean), Option(columnResource.getProperty(CSVW.virtual)).map(_.getObject.asLiteral.getBoolean), Option(columnResource.getProperty(CSVW.propertyUrl)).map(x => - UriTemplate.fromTemplate(x.getObject.asLiteral.getLexicalForm) + UriTemplate.fromTemplate(expandPrefix(x.getObject.asLiteral.getLexicalForm)) ), Option(columnResource.getProperty(CSVW.valueUrl)).map(x => - UriTemplate.fromTemplate(x.getObject.asLiteral.getLexicalForm) + UriTemplate.fromTemplate(expandPrefix(x.getObject.asLiteral.getLexicalForm)) ) ) }.toList (aboutTemplate, columns) } }.next + + val prefixes = Map( + "as" -> "https://www.w3.org/ns/activitystreams#", + "csvw" -> "http://www.w3.org/ns/csvw#", + "dc" -> "http://purl.org/dc/terms/", + "dcat" -> "http://www.w3.org/ns/dcat#", + "dqv" -> "http://www.w3.org/ns/dqv#", + "duv" -> "https://www.w3.org/ns/duv#", + "grddl" -> "http://www.w3.org/2003/g/data-view#", + "ldp" -> "http://www.w3.org/ns/ldp#", + "ma" -> "http://www.w3.org/ns/ma-ont#", + "oa" -> "http://www.w3.org/ns/oa#", + "odrl" -> "http://www.w3.org/ns/odrl/2/", + "org" -> "http://www.w3.org/ns/org#", + "owl" -> "http://www.w3.org/2002/07/owl#", + "prov" -> "http://www.w3.org/ns/prov#", + "qb" -> "http://purl.org/linked-data/cube#", + "rdf" -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "rdfa" -> "http://www.w3.org/ns/rdfa#", + "rdfs" -> "http://www.w3.org/2000/01/rdf-schema#", + "rif" -> "http://www.w3.org/2007/rif#", + "rr" -> "http://www.w3.org/ns/r2rml#", + "sd" -> "http://www.w3.org/ns/sparql-service-description#", + "skos" -> "http://www.w3.org/2004/02/skos/core#", + "skosxl" -> "http://www.w3.org/2008/05/skos-xl#", + "ssn" -> "http://www.w3.org/ns/ssn/", + "sosa" -> "http://www.w3.org/ns/sosa/", + "time" -> "http://www.w3.org/2006/time#", + "void" -> "http://rdfs.org/ns/void#", + "wdr" -> "http://www.w3.org/2007/05/powder#", + "wdrs" -> "http://www.w3.org/2007/05/powder-s#", + "xhv" -> "http://www.w3.org/1999/xhtml/vocab#", + "xml" -> "http://www.w3.org/XML/1998/namespace", + "xsd" -> "http://www.w3.org/2001/XMLSchema#" + ) + + def expandPrefix(s: String): String = { + val colon = s.indexOf(':') + if (colon != -1) { + prefixes.get(s.substring(0, colon)) match { + case Some(namespace) => s"${namespace}${s.substring(colon + 1)}" + case None => s + } + } else { + s + } + } + + val xsdString: String = XSD.xstring.getURI + } case class Column(name: String, titles: List[String], @@ -83,30 +138,38 @@ val packageVersion: String = getClass.getPackage.getImplementationVersion val parser = new scopt.OptionParser[Config]("csv2rdf") { head("csv2rdf", packageVersion) + opt[File]('o', "out") + .action((x, c) => c.copy(outputRDF = Some(x))) + .text("Optionally send output to file instead of stdout. Add .gz to compress.") arg[String]("") action { (x, c) => c.copy(inputCSV = x) } text "URI of input CSV, can be local or remote" } parser.parse(args, Config()) match { case Some(config) => - val csv = new URI(config.inputCSV) - val metadata = new URI(config.inputCSV + "-metadata.json") + val gzipped = config.inputCSV.endsWith(".gz") + val csv = new URI(if (gzipped) config.inputCSV.substring(0, config.inputCSV.length - 3) else config.inputCSV) + val metadata = new URI(csv.toString + "-metadata.json") val (aboutTemplate, columns) = CSVW.tableSpec(csv, metadata) val csvFlow = { if (csv.isAbsolute) { Source.futureSource { Http() - .singleRequest(HttpRequest(uri = csv.toString)) + .singleRequest(HttpRequest(uri = config.inputCSV)) .map { response => response.entity.withoutSizeLimit.dataBytes } } } else { - val file = Paths.get(config.inputCSV) - FileIO.fromPath(file) + val fileFlow = FileIO.fromPath(Paths.get(config.inputCSV)) + if (gzipped) fileFlow.via(Compression.gunzip()) else fileFlow } }.via(CsvParsing.lineScanner()) - csvFlow + val (compressSink, sink) = config.outputRDF match { + case Some(out) => (out.getName.endsWith(".gz"), FileIO.toPath(out.toPath)) + case None => (false, StreamConverters.fromOutputStream(() => System.out)) + } + val toRDF = csvFlow .async .prefixAndTail(1) .flatMapConcat { case (headers, tail) => @@ -136,11 +199,24 @@ prop = propertyTemplate.expand(javaMap) value <- c.valueUrl match { case Some(valueTemplate) => Some(s"<${valueTemplate.expand(javaMap)}>") - case None => Option(javaMap.get(c.name)) + case None => + val v = javaMap.get(c.name) + c.datatype.map(_.toString) match { + case Some(CSVW.xsdString) => Some(s""""${v}"""") + case Some(uri) => Some(s""""${v}"^^<${uri}>""") + case None => Some(s""""${v}"""") + } } } yield (aboutUrl, prop, value) } - }.runWith(Sink.foreach(println)) + } + .mapConcat(identity) + .map( stmt => ByteString(s"<${stmt._1}> <${stmt._2}> ${stmt._3} .\n") ) + if (compressSink) { + toRDF.via(Compression.gzip).runWith(sink) + } else { + toRDF.runWith(sink) + } case None => } }