Amplicon Mimosa#

This document contains many functions for all kinds of amplicon analysis used in unlock

Connection settings#

from irods.session import iRODSSession
import ssl
from irods.meta import iRODSMeta
import os, sys

# iRODS authentication information
host = "unlock-icat.irods.surfsara.nl"
port = 1247
zone = "unlock"
user = os.getenv('irodsUserName')
password = os.getenv('irodsPassword')

# SSL settings
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=None, capath=None, cadata=None)

ssl_settings = {'irods_client_server_negotiation': 'request_server_negotiation',
                'irods_client_server_policy': 'CS_NEG_REQUIRE',
                'irods_encryption_algorithm': 'AES-256-CBC',
                'irods_encryption_key_size': 32,
                'irods_encryption_num_hash_rounds': 16,
                'irods_encryption_salt_size': 8,
                "irods_authentication_scheme": "pam",
                'ssl_context': context}
                

MIMOSA data preparation#

All NG-Tax TURTLE data needs to be downloaded from the iRODS instance

# ...
from irods.models import Collection, DataObject
from irods.column import Criterion
import os

GROUP = "wur.unlock.hivghana_drp0070"

# Standard authentication
with iRODSSession(host = host, port = port, user = user, password = password, zone = zone, **ssl_settings) as session:
    # Set the filters to select the files needed for downloading
    results = session.query(Collection, DataObject).filter( \
        Criterion('like', Collection.name, '%'+GROUP+'%')).filter( \
        Criterion('like', DataObject.name, '%ttl'))
    # Download the result files
    for r in results:
        # Collection and data object
        data_file = r[Collection.name] + "/" + r[DataObject.name]
        # Original path in irods is local path from current location
        if not os.path.exists("." + data_file):
            # Create the directory if it does not exist
            os.makedirs("." + r[Collection.name], exist_ok=True)
            print("Downloading: " + data_file + " "*10, end="\r")
            # Download the file
            session.data_objects.get(data_file, "." + data_file)
        else:
            print("Already received: " + data_file + " "*10, end="\r")
Already received: /unlock/trash/home/jkoehorst/wur.unlock.hivghana_drp0070/demo-13.ttl          76960/amplicon/ASY_DRR243924/NGTAX_Silva138.1-picrust_100_DRR243924/2_Classification/DRR243924_NG-Tax_100.ttl          
def create_repository(label):
  import requests
  # RDF4J Repository config file
  content = """
#
# RDF4J configuration template for a GraphDB repository
#
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
@prefix rep: <http://www.openrdf.org/config/repository#>.
@prefix sr: <http://www.openrdf.org/config/repository/sail#>.
@prefix sail: <http://www.openrdf.org/config/sail#>.
@prefix graphdb: <http://www.ontotext.com/config/graphdb#>.

[] a rep:Repository ;
    rep:repositoryID "XXXX" ;
    rdfs:label "" ;
    rep:repositoryImpl [
        rep:repositoryType "graphdb:SailRepository" ;
        sr:sailImpl [
            sail:sailType "graphdb:Sail" ;

            graphdb:read-only "false" ;

            # Inference and Validation
            graphdb:ruleset "rdfsplus-optimized" ;
            graphdb:disable-sameAs "true" ;
            graphdb:check-for-inconsistencies "false" ;

            # Indexing
            graphdb:entity-id-size "32" ;
            graphdb:enable-context-index "false" ;
            graphdb:enablePredicateList "true" ;
            graphdb:enable-fts-index "false" ;
            graphdb:fts-indexes ("default" "iri") ;
            graphdb:fts-string-literals-index "default" ;
            graphdb:fts-iris-index "none" ;

            # Queries and Updates
            graphdb:query-timeout "0" ;
            graphdb:throw-QueryEvaluationException-on-timeout "false" ;
            graphdb:query-limit-results "0" ;

            # Settable in the file but otherwise hidden in the UI and in the RDF4J console
            graphdb:base-URL "http://example.org/owlim#" ;
            graphdb:defaultNS "" ;
            graphdb:imports "" ;
            graphdb:repository-type "file-repository" ;
            graphdb:storage-folder "storage" ;
            graphdb:entity-index-size "10000000" ;
            graphdb:in-memory-literal-properties "true" ;
            graphdb:enable-literal-index "true" ;
        ]
    ].
  """
  content = content.replace("XXXX", label)
  writer = open("config.ttl","w")
  print(content, file=writer)
  writer.close()

  headers = {
      # requests won't add a boundary if this header is set when you pass files=
      # 'Content-Type': 'multipart/form-data',
  }

  files = {
      'config': open('./config.ttl', 'rb'),
  }

  response = requests.post('http://localhost:7200/rest/repositories', headers=headers, files=files)
  print(response.__dict__)
# Process the turtle files
from rdflib import Graph
import requests, json
import time  

# Check if GraphDB runs:...
# docker volume create graphdb
# docker run -v graphdb:/opt/graphdb/dist/data --name graphdb -p 7200:7200 ontotext/graphdb:10.1.3 -Xmx6g
graphdb = 'http://localhost:7200'

# GraphDB restrictions
REPO = GROUP.replace("wur.unlock.", "wur_unlock_")

def check_graphdb():
    headers = {
        'Accept': 'application/json',
    }

    response = requests.get(graphdb + '/rest/repositories', headers=headers)
    content = json.loads(response.content.decode())
    repo_available = False
    print("Available repositories:")
    for repository in content:
        if repository['id'] == REPO:
            print(repository['id'])
            repo_available = True
    if not repo_available:
        # Create repository
        create_repository(REPO)

def load_data(data_file):
    headers = {
    'Content-Type': 'application/x-turtle',
    'Accept': 'application/json'
    }

    with open(data_file, 'rb') as f:
        response  = requests.post(graphdb + '/repositories/'+REPO+'/statements', data=f, headers=headers)
        if "Unknown repository:" in response.content.decode():
            raise Exception(response.content.decode())

###
to_be_loaded = set()
check_graphdb()

for current_dir, subdirs, files in os.walk( './unlock/' ):
    for file in files:
        if (file.endswith(".ttl")) and GROUP in current_dir:
            to_be_loaded.add(current_dir + "/" + file)

if len(to_be_loaded) == 0:
    print("No files to be loaded")

for index, file_path in enumerate(to_be_loaded):
    current = time.time()
    load_data(file_path)
    end = time.time()
    diff = int(end - current)
    print("Loaded file ",index, "of", len(to_be_loaded),"in", diff, "seconds with",file_path.split("/")[-1], " "*10, end="\r")
            
Available repositories:
wur_unlock_hivghana_drp0070
Loaded file  28 of 29 in 0 seconds with DRR243845.ttl                      

MIMOSA table generator#

from SPARQLWrapper import SPARQLWrapper, JSON
# Make connection with sparql endpoint containing all NG-Tax 2.0 data
sparql = SPARQLWrapper("http://localhost:7200/repositories/" + REPO)
sparql.setReturnFormat(JSON)

# Get all sample name mappings
# Query for column headers to sample name
query = """
PREFIX gbol: <http://gbol.life/0.1/>
PREFIX schema: <http://schema.org/>
PREFIX sschema: <https://schema.org/>
PREFIX jerm: <http://jermontology.org/ontology/JERMOntology#>
select distinct ?sample_name ?sample_id where { 
	?ngtax_sample gbol:name ?sample_name .
    ?ngtax_sample gbol:metadata ?mappingFile .
    ?mappingFile a gbol:MappingFile .
    ?mappingFile gbol:fastQforward ?fwd .
    ?mappingFile gbol:fastQreverse ?rev .
    ?sample jerm:hasPart/jerm:hasPart/schema:identifier ?fwd .
    ?sample jerm:hasPart/jerm:hasPart/schema:identifier ?rev .
    ?sample a jerm:Sample .
    ?sample schema:identifier ?sample_id .
}"""

sparql.setQuery(query)
ret = sparql.queryAndConvert()
# Iterate over the sample names
sample_name_lookup = {}
for index, r in enumerate(ret["results"]["bindings"]):
    sample_name_lookup[r['sample_name']['value']] = r['sample_id']['value']
    
from SPARQLWrapper import SPARQLWrapper, JSON
# Make connection with sparql endpoint containing all NG-Tax 2.0 data
sparql = SPARQLWrapper("http://localhost:7200/repositories/" + REPO)
sparql.setReturnFormat(JSON)

# Obtain all samples to make it more scalable
query = """
PREFIX gbol:<http://gbol.life/0.1/>
SELECT DISTINCT ?sample
WHERE {
    ?sample a gbol:Sample .
}
"""

sparql.setQuery(query)
ret = sparql.queryAndConvert()
# Create containers for the forward, reverse ASV and the sample list
container_fseq = {}
container_rseq = {}
all_samples = set()
# Iterate over the samples
for index, r in enumerate(ret["results"]["bindings"]):
    # For each sample get all ASVs and abundance information
    sample = r['sample']['value']
    print("Processed sample number", index, end='\r')
    query = """
    PREFIX gbol:<http://gbol.life/0.1/>
    SELECT DISTINCT ?name ?fseq ?rseq ?count
    WHERE {
    VALUES ?sample { <"""+sample+"""> }
    ?sample a gbol:Sample .
    ?sample gbol:name ?name .
    ?sample gbol:asv ?asv .
    ?asv a gbol:ASVSet . 
    ?asv gbol:forwardASV ?fasv .
    ?fasv gbol:sequence ?fseq .
    ?asv gbol:reverseASV ?rasv . 
    ?rasv gbol:sequence ?rseq .
    ?asv gbol:clusteredReadCount ?count .
    }"""
    # Set query and iterate over results
    sparql.setQuery(query)
    ret = sparql.queryAndConvert()
    for r in ret["results"]["bindings"]:
        name = r['name']['value']
        fseq = r['fseq']['value']
        rseq = r['rseq']['value']
        count = int(r['count']['value'])
        # Sample renaming using the previous lookup
        name = sample_name_lookup[name]
        # List of sample names
        all_samples.add(name)
        # Fseq object fill method
        if fseq not in container_fseq:
            container_fseq[fseq] = {}
        if name not in container_fseq[fseq]:
            container_fseq[fseq][name] = 0
        container_fseq[fseq][name] += count

        # Rseq object fill method
        if rseq not in container_rseq:
            container_rseq[rseq] = {}
        if name not in container_rseq[rseq]:
            container_rseq[rseq][name] = 0
        container_rseq[rseq][name] += count
Processed sample number 8
import numpy as np
import pandas as pd

# Create an abundance matrix using numpy
def create_abundance_matrix(name, container, all_samples):
    matrix = np.zeros((len(container), len(all_samples)), dtype=int)
    all_samples = sorted(list(all_samples))
    asvs = sorted(container.keys())
    for row_index, asv in enumerate(asvs):
        for sample in container[asv]:
            coll_index = all_samples.index(sample)
            value = container[asv][sample]
            matrix[row_index,coll_index] = int(value)

    # Write to file as panda frame with column and row names
    df = pd.DataFrame(matrix, columns=all_samples, index=asvs)
    df.index.name="OTU"
    df.to_csv(name, sep="\t")
# Creating the RSEQ tsv file
create_abundance_matrix("fseq.tsv", container_fseq, all_samples)
# Creating the FSEQ tsv file
create_abundance_matrix("rseq.tsv", container_rseq, all_samples)
# First three lines of the file
with open("fseq.tsv") as f:
    print(f.readline(), end="")
    print(f.readline(), end="")
    print(f.readline(), end="")
OTU	DRS176881	DRS176886	DRS176892	DRS176899	DRS176917	DRS176930	DRS176935	DRS176942	DRS176960
AGGGGAATATTGGACAATGGGCGCAAGCCTGATCCAGCCATACCGCGTGGGTGAAGAAGGCCTTCGGGTTGTAAAGCCCTTTTGTTGGGAAAGAAATCCA	0	0	113	0	0	0	0	0	0
AGGGGGATATTGCACAATGGGGGAAACCCTGATGCAGCGACGCCGCGTGGAGGAAGAAGGTTTTCGGATTGTAAACTCCTGTCGTTAGGGACGATAATGA	0	222	0	0	0	0	0	0	0