Amplicon Mimosa#

This document contains a collection of functions for amplicon analysis used at the UNLOCK FAIR data platform.

Note

Please note that this document is a work in progress.

MIMOSA Data retrieval using iBridges#

This page assumes you have already installed iBridges and created an iRODS environment JSON file. Check the iBridges Python client documentation for the latest information on how to install, authenticate, and retrieve data with iBridges.

First, fill in your SRAM username and password (token):

from pathlib import Path
from ibridges import Session
from getpass import getpass

# Set iRODS environment directory and environment file
irods_env_dir = "~/.irods"
irods_env_file = "irods_environment.json"

# Provide your SRAM username
if 'username' not in locals():
    username = input("Enter your SRAM username: ")

env_loc = Path(irods_env_dir) / irods_env_file
env_file = Path.expanduser(Path(env_loc))

if 'password' not in locals():
    password = getpass("Enter your SRAM password: ")

try:
    session = Session(env_file, password=password)
except Exception as e:
    print("Failed to establish session:", e)
    del password
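
To confirm that the session works before searching for data, you can check that your iRODS home collection is reachable. This is a minimal sketch, assuming that IrodsPath resolves "~" to your home collection, as in recent iBridges releases:

from ibridges import IrodsPath

# Sanity check: does the home collection exist for this session?
home = IrodsPath(session, "~")
print("Home collection:", home, "- exists:", home.collection_exists())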

Set the investigation and study variables in the next cell to the names of your Investigation and Study.

This will download all NG-Tax Turtle (.ttl) data from the iRODS instance.

from ibridges import IrodsPath, search_data, download
from pathlib import Path
import os

# Set investigation and study variables. Investigation name corresponds to the folder located at /unlock/home/wur.<name> and study to a subfolder /stu_<name>
investigation = "SET_INVESTIGATION" 
study = "SET_STUDY"

# Define where to download files locally
download_path = f"./unlock_downloads/{investigation}/{study}"

# Create the directory if it doesn't exist yet
download_dir = Path.expanduser(Path(download_path))
download_dir.mkdir(parents=True, exist_ok=True)

# File pattern to search for; '%' is a wildcard, so %.ttl matches all Turtle files
pattern = "%.ttl"
# Define search path
search = f"/unlock/home/wur.{investigation}/stu_{study}/"
data = search_data(session, path=IrodsPath(session, search), path_pattern=pattern)

# Set options and counters
overwrite = False
downloaded, skipped = 0, 0

# Iterate over search results and download files
for item in data:
    local_destination = Path(download_path)  # Base local directory
    file_destination = local_destination / item.name  # Construct local file path

    if item.dataobject_exists():  # Check if file
        local_destination.mkdir(parents=True, exist_ok=True)
        if not file_destination.exists() or overwrite:  # Download only if not present
            download(session, item, file_destination, overwrite=overwrite)
            downloaded += 1
        else:
            skipped += 1

# Print download summary
print("\nDownloaded: ", downloaded)
print("Skipped: ", skipped)
import requests

def create_repository(label):
  # RDF4J repository configuration file; the repository ID is filled in below
  content = """
#
# RDF4J configuration template for a GraphDB repository
#
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
@prefix rep: <http://www.openrdf.org/config/repository#>.
@prefix sr: <http://www.openrdf.org/config/repository/sail#>.
@prefix sail: <http://www.openrdf.org/config/sail#>.
@prefix graphdb: <http://www.ontotext.com/config/graphdb#>.

[] a rep:Repository ;
    rep:repositoryID "XXXX" ;
    rdfs:label "" ;
    rep:repositoryImpl [
        rep:repositoryType "graphdb:SailRepository" ;
        sr:sailImpl [
            sail:sailType "graphdb:Sail" ;

            graphdb:read-only "false" ;

            # Inference and Validation
            graphdb:ruleset "rdfsplus-optimized" ;
            graphdb:disable-sameAs "true" ;
            graphdb:check-for-inconsistencies "false" ;

            # Indexing
            graphdb:entity-id-size "32" ;
            graphdb:enable-context-index "false" ;
            graphdb:enablePredicateList "true" ;
            graphdb:enable-fts-index "false" ;
            graphdb:fts-indexes ("default" "iri") ;
            graphdb:fts-string-literals-index "default" ;
            graphdb:fts-iris-index "none" ;

            # Queries and Updates
            graphdb:query-timeout "0" ;
            graphdb:throw-QueryEvaluationException-on-timeout "false" ;
            graphdb:query-limit-results "0" ;

            # Settable in the file but otherwise hidden in the UI and in the RDF4J console
            graphdb:base-URL "http://example.org/owlim#" ;
            graphdb:defaultNS "" ;
            graphdb:imports "" ;
            graphdb:repository-type "file-repository" ;
            graphdb:storage-folder "storage" ;
            graphdb:entity-index-size "10000000" ;
            graphdb:in-memory-literal-properties "true" ;
            graphdb:enable-literal-index "true" ;
        ]
    ].
  """
  content = content.replace("XXXX", label)
  # Write the repository configuration to a local file
  with open("config.ttl", "w") as writer:
    writer.write(content)

  # Post the configuration to GraphDB's RDF4J repositories endpoint.
  # Let requests build the multipart body (and its boundary) itself.
  with open("config.ttl", "rb") as config_file:
    files = {
        'config': config_file,
    }
    response = requests.post('http://localhost:7200/rest/repositories', files=files)
  print("Repository creation:", response.status_code, response.reason)
# Process the Turtle files and load them into GraphDB
import requests, json
import time

# Check if GraphDB runs:...
# docker volume create graphdb
# docker run -v graphdb:/opt/graphdb/dist/data --name graphdb -p 7200:7200 ontotext/graphdb:10.1.3 -Xmx6g
graphdb = 'http://localhost:7200'

# GraphDB restricts repository IDs, so replace dots with underscores
REPO = investigation.replace(".", "_")

def check_graphdb():
    headers = {
        'Accept': 'application/json',
    }

    response = requests.get(graphdb + '/rest/repositories', headers=headers)
    content = json.loads(response.content.decode())
    repo_available = False
    print("Available repositories:")
    for repository in content:
        if repository['id'] == REPO:
            print(repository['id'])
            repo_available = True
    if not repo_available:
        # Create repository
        create_repository(REPO)

def load_data(data_file):
    headers = {
        'Content-Type': 'application/x-turtle',
        'Accept': 'application/json'
    }

    with open(data_file, 'rb') as f:
        response  = requests.post(graphdb + '/repositories/'+REPO+'/statements', data=f, headers=headers)
        if "Unknown repository:" in response.content.decode():
            raise Exception(response.content.decode())

# Collect the downloaded Turtle files and load them into GraphDB
to_be_loaded = set()
check_graphdb()

search_path = f"./unlock_downloads/{investigation}/{study}"
for current_dir, subdirs, files in os.walk(search_path):
    for file in files:
        if file.endswith(".ttl") and investigation in current_dir:
            to_be_loaded.add(current_dir + "/" + file)

if len(to_be_loaded) == 0:
    print("No files to be loaded")

for index, file_path in enumerate(to_be_loaded, start=1):
    current = time.time()
    load_data(file_path)
    end = time.time()
    diff = int(end - current)
    print("Loaded file", index, "of", len(to_be_loaded), "in", diff, "seconds with", file_path.split("/")[-1], " " * 10, end="\r")
            

MIMOSA table generator#

from SPARQLWrapper import SPARQLWrapper, JSON
# Connect to the SPARQL endpoint containing all NG-Tax 2.0 data
sparql = SPARQLWrapper("http://localhost:7200/repositories/" + REPO)
sparql.setReturnFormat(JSON)

# Get all sample name mappings
# Query for column headers to sample name
query = """
PREFIX gbol: <http://gbol.life/0.1/>
PREFIX schema: <http://schema.org/>
PREFIX jerm: <http://jermontology.org/ontology/JERMOntology#>
SELECT DISTINCT ?sample_name ?sample_id WHERE { 
	?ngtax_sample gbol:name ?sample_name .
    ?ngtax_sample gbol:metadata ?mappingFile .
    ?mappingFile a gbol:MappingFile .
    ?mappingFile gbol:fastQforward ?fwd .
    ?mappingFile gbol:fastQreverse ?rev .
    ?sample jerm:hasPart/jerm:hasPart/schema:identifier ?fwd .
    ?sample jerm:hasPart/jerm:hasPart/schema:identifier ?rev .
    ?sample a jerm:Sample .
    ?sample schema:identifier ?sample_id .
}"""

sparql.setQuery(query)
ret = sparql.queryAndConvert()
# Iterate over the sample names
sample_name_lookup = {}
for r in ret["results"]["bindings"]:
    sample_name_lookup[r['sample_name']['value']] = r['sample_id']['value']
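
As a quick check you can print a few of the recovered mappings; this just peeks at the lookup dictionary built above:

# Show a handful of column-header-to-sample-identifier mappings
for sample_name, sample_id in list(sample_name_lookup.items())[:5]:
    print(sample_name, "->", sample_id)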
    
from SPARQLWrapper import SPARQLWrapper, JSON
# Connect to the SPARQL endpoint containing all NG-Tax 2.0 data
sparql = SPARQLWrapper("http://localhost:7200/repositories/" + REPO)
sparql.setReturnFormat(JSON)

# Obtain all samples to make it more scalable
query = """
PREFIX gbol:<http://gbol.life/0.1/>
SELECT DISTINCT ?sample
WHERE {
    ?sample a gbol:Sample .
}
"""

sparql.setQuery(query)
ret = sparql.queryAndConvert()
# Create containers for the forward and reverse ASV counts and the set of sample names
container_fseq = {}
container_rseq = {}
all_samples = set()
# Iterate over the samples
for index, r in enumerate(ret["results"]["bindings"]):
    # For each sample get all ASVs and abundance information
    sample = r['sample']['value']
    print("Processed sample number", index, end='\r')
    query = """
    PREFIX gbol:<http://gbol.life/0.1/>
    SELECT DISTINCT ?name ?fseq ?rseq ?count
    WHERE {
    VALUES ?sample { <"""+sample+"""> }
    ?sample a gbol:Sample .
    ?sample gbol:name ?name .
    ?sample gbol:asv ?asv .
    ?asv a gbol:ASVSet . 
    ?asv gbol:forwardASV ?fasv .
    ?fasv gbol:sequence ?fseq .
    ?asv gbol:reverseASV ?rasv . 
    ?rasv gbol:sequence ?rseq .
    ?asv gbol:clusteredReadCount ?count .
    }"""
    # Set query and iterate over results
    sparql.setQuery(query)
    ret = sparql.queryAndConvert()
    for r in ret["results"]["bindings"]:
        raw_name = r['name']['value']
        # look up, but if missing just use the raw name
        name = sample_name_lookup.get(raw_name, raw_name)
        fseq = r['fseq']['value']
        rseq = r['rseq']['value']
        count = int(r['count']['value'])
        all_samples.add(name)
        # Accumulate counts per sample for the forward ASV sequence
        if fseq not in container_fseq:
            container_fseq[fseq] = {}
        if name not in container_fseq[fseq]:
            container_fseq[fseq][name] = 0
        container_fseq[fseq][name] += count

        # Accumulate counts per sample for the reverse ASV sequence
        if rseq not in container_rseq:
            container_rseq[rseq] = {}
        if name not in container_rseq[rseq]:
            container_rseq[rseq][name] = 0
        container_rseq[rseq][name] += count
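
Before building the tables it can help to see how much was collected; this only summarizes the containers filled in the loop above:

# Summarize the collected ASVs and samples
print("Forward ASVs:", len(container_fseq))
print("Reverse ASVs:", len(container_rseq))
print("Samples:", len(all_samples))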
import numpy as np
import pandas as pd

# Create an abundance matrix using numpy
def create_abundance_matrix(name, container, all_samples):
    matrix = np.zeros((len(container), len(all_samples)), dtype=int)
    all_samples = sorted(list(all_samples))
    asvs = sorted(container.keys())
    for row_index, asv in enumerate(asvs):
        for sample in container[asv]:
            col_index = all_samples.index(sample)
            value = container[asv][sample]
            matrix[row_index, col_index] = int(value)

    # Write to file as a pandas DataFrame with column and row names
    df = pd.DataFrame(matrix, columns=all_samples, index=asvs)
    df.index.name = "OTU"
    df.to_csv(name, sep="\t")
# Create the FSEQ tsv file
create_abundance_matrix("fseq.tsv", container_fseq, all_samples)
# Create the RSEQ tsv file
create_abundance_matrix("rseq.tsv", container_rseq, all_samples)
# Print the first three lines of the forward ASV table
with open("fseq.tsv") as f:
    print(f.readline(), end="")
    print(f.readline(), end="")
    print(f.readline(), end="")
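
For downstream analysis the tables can be read back with pandas; a minimal example, assuming you want the OTU column as the row index again:

# Read the forward ASV abundance table back into a DataFrame
fseq_table = pd.read_csv("fseq.tsv", sep="\t", index_col="OTU")
print(fseq_table.shape)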