Newer
Older
example_classification / owl_classification.py
#!/usr/bin/env python3
import argparse
import csv
from urllib.parse import urljoin

from rdflib import Graph, URIRef, RDF, OWL, BNode, Literal, RDFS
from rdflib.collection import Collection

# Module-level graph that the rest of the script fills with triples; the
# 'owl' prefix is bound to the OWL namespace.
g = Graph()
g.bind('owl', OWL)

# Command line: the codelist CSV followed by three positional strings.
parser = argparse.ArgumentParser(
    description='Create statistical classification as OWL',
)
# argparse opens the codelist for reading, so args.codelist is a file object.
parser.add_argument('codelist', type=argparse.FileType('r'), help='Codelist CSV file.')
parser.add_argument('classification', help='Base URI for this classification.')
parser.add_argument('codes', help='Base URI for the codelist.')
parser.add_argument('property', help='Defining property.')

args = parser.parse_args()
# Index the codelist CSV by notation:
#   parent2children   parent notation -> list of child notations (file order)
#   notation2label    notation -> contents of the 'Label' column
#   notation2comment  notation -> contents of 'Description', non-empty only
reader = csv.DictReader(args.codelist)
parent2children = {}
notation2label = {}
notation2comment = {}
for row in reader:
    # Rows are only indexed when the file has a 'Parent Notation' column.
    if 'Parent Notation' not in row:
        continue
    notation = row['Notation']
    notation2label[notation] = row['Label']
    description = row.get('Description')
    if description:
        notation2comment[notation] = description
    # DictReader fills the missing trailing cells of a short row with None,
    # so "no parent" is tested by truthiness rather than by comparing with ''
    # (a None parent would otherwise be recorded as a real parent key).
    pn = row['Parent Notation']
    if not pn:
        continue
    parent2children.setdefault(pn, []).append(notation)

# Class URIs whose definition triples have already been added to the graph.
defined = set()


def _define_class(notation):
    """Return the class URI for *notation*, defining it in ``g`` at most once.

    The first time a notation is seen the class is typed as owl:Class, given
    an rdfs:label (and an rdfs:comment when a description was recorded), and
    declared equivalent to a restriction on ``args.property`` whose value is
    the code URI built from ``args.codes``.

    Raises:
        KeyError: if *notation* has no entry in ``notation2label``.
    """
    node = URIRef(urljoin(args.classification, notation))
    if node in defined:
        return node
    g.add((node, RDF.type, OWL.Class))
    g.add((node, RDFS.label, Literal(notation2label[notation], lang='en-gb')))
    if notation in notation2comment:
        g.add((node, RDFS.comment, Literal(notation2comment[notation], lang='en-gb')))
    restriction = BNode()
    g.add((node, OWL.equivalentClass, restriction))
    g.add((restriction, RDF.type, OWL.Restriction))
    g.add((restriction, OWL.onProperty, URIRef(args.property)))
    g.add((restriction, OWL.hasValue, URIRef(urljoin(args.codes, notation))))
    defined.add(node)
    return node


# Each parent class is declared the disjoint union of its child classes.
for parent, children in parent2children.items():
    parent_node = _define_class(parent)
    children_node = BNode()
    g.add((parent_node, OWL.disjointUnionOf, children_node))
    child_nodes = [_define_class(child) for child in children]
    # Constructing the Collection writes the RDF list rooted at children_node
    # into the graph; the Collection object itself is not needed afterwards.
    Collection(g, children_node, child_nodes)

# Graph.serialize() returns bytes on older rdflib releases and str from
# rdflib 6 onwards when no destination or encoding is given; decode only
# when bytes come back so the script works with both.
turtle = g.serialize(format='turtle')
if isinstance(turtle, bytes):
    turtle = turtle.decode('utf-8')
print(turtle)