diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..141677e
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,27 @@
+name := "csv2rdf.scala"
+
+version := "0.1"
+
+scalaVersion := "2.13.1"
+
+val AkkaVersion = "2.6.4"
+
+libraryDependencies ++= Seq(
+  "org.apache.jena" % "jena-arq" % "3.14.0",
+  "com.github.scopt" %% "scopt" % "3.7.1",
+  "com.github.tototoshi" %% "scala-csv" % "1.3.6",
+  "org.slf4j" % "slf4j-simple" % "1.7.26",
+  "com.damnhandy" % "handy-uri-templates" % "2.1.8",
+  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "2.0.0-RC2",
+  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
+  "com.typesafe.akka" %% "akka-http" % "10.1.11"
+)
+
+excludeDependencies += "commons-logging" % "commons-logging"
+
+assemblyMergeStrategy in assembly := {
+  case "module-info.class" => MergeStrategy.discard
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
\ No newline at end of file
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..f37b0aa
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version = 1.3.10
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..197ebde
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1,2 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")
\ No newline at end of file
diff --git a/src/main/scala/uk/org/floop/csv2rdf/Run.scala b/src/main/scala/uk/org/floop/csv2rdf/Run.scala
new file mode 100644
index 0000000..8ad9953
--- /dev/null
+++ b/src/main/scala/uk/org/floop/csv2rdf/Run.scala
@@ -0,0 +1,146 @@
+package uk.org.floop.csv2rdf
+
+import java.net.URI
+import java.nio.file.Paths
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.HttpRequest
+import akka.stream.alpakka.csv.scaladsl.CsvParsing
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import com.damnhandy.uri.template.UriTemplate
+import org.apache.jena.rdf.model.{ModelFactory, Property, RDFList}
+import org.apache.jena.riot.{Lang, RDFDataMgr}
+
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.jdk.CollectionConverters._
+
+case class Config(inputCSV: String = "")
+
+object CSVW {
+  val namespace = "http://www.w3.org/ns/csvw#"
+  private val model = ModelFactory.createDefaultModel
+  private def makeProperty(n: String): Property = model.createProperty(namespace, n)
+  val url: Property = makeProperty("url")
+  val tableSchema: Property = makeProperty("tableSchema")
+  val aboutUrl: Property = makeProperty("aboutUrl")
+  val column: Property = makeProperty("column")
+  val datatype: Property = makeProperty("datatype")
+  val name: Property = makeProperty("name")
+  val propertyUrl: Property = makeProperty("propertyUrl")
+  val required: Property = makeProperty("required")
+  val title: Property = makeProperty("title")
+  val valueUrl: Property = makeProperty("valueUrl")
+  val virtual: Property = makeProperty("virtual")
+
+  def tableSpec(csvUrl: URI, metadataUrl: URI): (UriTemplate, List[Column]) = {
+    val metadataModel = RDFDataMgr.loadModel(metadataUrl.toString, Lang.JSONLD)
+    for {
+      tableStatement <- metadataModel.listStatements(null, CSVW.url, null).asScala
+      if tableStatement.getObject.isLiteral
+      resolvedTable = metadataUrl.resolve(tableStatement.getObject.asLiteral.getString)
+      if csvUrl == resolvedTable
+      table = tableStatement.getSubject
+      tableSchema <- metadataModel.listObjectsOfProperty(table, CSVW.tableSchema).asScala
+      if tableSchema.isResource
+      about <- metadataModel.listObjectsOfProperty(tableSchema.asResource, CSVW.aboutUrl).asScala.map(_.asLiteral.getLexicalForm)
+      aboutTemplate = UriTemplate.fromTemplate(about)
+      columnList <- metadataModel.listObjectsOfProperty(tableSchema.asResource, CSVW.column).asScala
+      if columnList.canAs(classOf[RDFList])
+    } yield {
+      val columns = {
+        for {
+          columnList <- columnList.as(classOf[RDFList]).iterator.asScala
+          columnResource = columnList.asResource
+        } yield Column(
+          columnResource.getProperty(CSVW.name).getObject.asLiteral.getLexicalForm,
+          columnResource.listProperties(CSVW.title).asScala.map(_.getObject.asLiteral.getLexicalForm).toList,
+          Option(columnResource.getProperty(CSVW.datatype)).map(x => new URI(x.getObject.asResource.getURI)),
+          Option(columnResource.getProperty(CSVW.required)).map(_.getObject.asLiteral.getBoolean),
+          Option(columnResource.getProperty(CSVW.virtual)).map(_.getObject.asLiteral.getBoolean),
+          Option(columnResource.getProperty(CSVW.propertyUrl)).map(x =>
+            UriTemplate.fromTemplate(x.getObject.asLiteral.getLexicalForm)
+          ),
+          Option(columnResource.getProperty(CSVW.valueUrl)).map(x =>
+            UriTemplate.fromTemplate(x.getObject.asLiteral.getLexicalForm)
+          )
+        )
+      }.toList
+      (aboutTemplate, columns)
+    }
+  }.next
+}
+
+case class Column(name: String, titles: List[String],
+                  datatype: Option[URI], required: Option[Boolean],
+                  virtual: Option[Boolean], propertyUrl: Option[UriTemplate],
+                  valueUrl: Option[UriTemplate]
+                 )
+
+object Run extends App {
+  implicit val system: ActorSystem = ActorSystem("csv2rdf")
+  implicit val executionContext: ExecutionContextExecutor = system.dispatcher
+  val packageVersion: String = getClass.getPackage.getImplementationVersion
+  val parser = new scopt.OptionParser[Config]("csv2rdf") {
+    head("csv2rdf", packageVersion)
+    arg[String]("") action { (x, c) =>
+      c.copy(inputCSV = x)
+    } text "URI of input CSV, can be local or remote"
+  }
+  parser.parse(args, Config()) match {
+    case Some(config) =>
+      val csv = new URI(config.inputCSV)
+      val metadata = new URI(config.inputCSV + "-metadata.json")
+      val (aboutTemplate, columns) = CSVW.tableSpec(csv, metadata)
+      val csvFlow = {
+        if (csv.isAbsolute) {
+          Source.futureSource {
+            Http()
+              .singleRequest(HttpRequest(uri = csv.toString))
+              .map { response =>
+                response.entity.withoutSizeLimit.dataBytes
+              }
+          }
+        } else {
+          val file = Paths.get(config.inputCSV)
+          FileIO.fromPath(file)
+        }
+      }.via(CsvParsing.lineScanner())
+      csvFlow
+        .async
+        .prefixAndTail(1)
+        .flatMapConcat { case (headers, tail) =>
+          val orderedColumns = for {
+            headerTitleBytes <- headers.head // there can only be one
+          } yield {
+            val filteredColumns = for {
+              column <- columns
+              columnTitle <- column.titles
+              if columnTitle.equalsIgnoreCase(headerTitleBytes.utf8String)
+            } yield column
+            filteredColumns.headOption
+          }
+          tail.map { row =>
+            for {
+              (v, optCol) <- (row, orderedColumns).zipped
+              c <- optCol
+            } yield (c.name, v.utf8String)
+          }
+        }.mapAsyncUnordered(4) { rowNameValues =>
+          val javaMap = rowNameValues.toMap[String, AnyRef].asJava
+          val aboutUrl = aboutTemplate.expand(javaMap)
+          Future {
+            for {
+              c <- columns
+              propertyTemplate <- c.propertyUrl
+              prop = propertyTemplate.expand(javaMap)
+              value <- c.valueUrl match {
+                case Some(valueTemplate) => Some(s"<${valueTemplate.expand(javaMap)}>")
+                case None => Option(javaMap.get(c.name))
+              }
+            } yield (aboutUrl, prop, value)
+          }
+        }.runWith(Sink.foreach(println))
+    case None =>
+  }
+}
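
Note: the row-to-triple step in Run.scala works by expanding RFC 6570 URI templates (csvw:aboutUrl, csvw:propertyUrl, csvw:valueUrl) against a map of a row's column values, using the handy-uri-templates dependency declared in build.sbt. The snippet below is a minimal, self-contained sketch of that expansion step only; the template strings and column values are invented for illustration and are not taken from any real metadata file.

import com.damnhandy.uri.template.UriTemplate
import scala.jdk.CollectionConverters._

object TemplateExpansionSketch extends App {
  // Hypothetical templates of the kind a csvw:aboutUrl / csvw:valueUrl would hold.
  val aboutTemplate = UriTemplate.fromTemplate("http://example.org/observation/{year}")
  val valueTemplate = UriTemplate.fromTemplate("http://example.org/area/{area}")

  // One parsed CSV row keyed by column name, analogous to javaMap in Run.scala.
  val row = Map[String, AnyRef]("year" -> "2020", "area" -> "E92000001").asJava

  // expand(java.util.Map) substitutes the variables, as Run.scala does per row.
  println(aboutTemplate.expand(row)) // http://example.org/observation/2020
  println(valueTemplate.expand(row)) // http://example.org/area/E92000001
}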