Amplicon PiCrust#
This document contains many functions for all kinds of amplicon analysis used in unlock
Connection settings#
from irods.session import iRODSSession
import ssl
from irods.meta import iRODSMeta
import os, sys
# iRODS authentication information
host = "unlock-icat.irods.surfsara.nl"
port = 1247
zone = "unlock"
user = os.getenv('irodsUserName')
password = os.getenv('irodsPassword')
# SSL settings
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=None, capath=None, cadata=None)
ssl_settings = {'irods_client_server_negotiation': 'request_server_negotiation',
'irods_client_server_policy': 'CS_NEG_REQUIRE',
'irods_encryption_algorithm': 'AES-256-CBC',
'irods_encryption_key_size': 32,
'irods_encryption_num_hash_rounds': 16,
'irods_encryption_salt_size': 8,
"irods_authentication_scheme": "pam",
'ssl_context': context}
Data retrieval#
from irods.models import Collection, DataObject
from irods.column import Criterion
import os
GROUP = "wur.unlock.hivghana_drp0070"
# Standard authentication
with iRODSSession(host = host, port = port, user = user, password = password, zone = zone, **ssl_settings) as session:
# Set the filters to select the files needed for downloading
results = session.query(Collection, DataObject).filter( \
Criterion('like', Collection.name, '%'+GROUP+'%')).filter( \
Criterion('like', DataObject.name, '%_predicted.tsv.gz'))
for r in results:
# Download the result files
data_file = r[Collection.name] + "/" + r[DataObject.name]
# Original path in irods is local path from current location
if not os.path.exists("." + data_file):
os.makedirs("." + r[Collection.name], exist_ok=True)
print("Downloading: " + data_file, end="\r")
session.data_objects.get(data_file, "." + data_file)
else:
print("Already received: " + data_file + " "*10, end="\r")
Downloading: /unlock/home/wur.unlock.hivghana_drp0070/STU_DRP007092/OBS_XDRS176960/SAM_DRS176960/amplicon/ASY_DRR243924/NGTAX_Silva138.1-picrust_100_DRR243924/3_PICRUSt2/TIGRFAM_predicted.tsv.gz
MIMOSA data preparation#
All NG-Tax TURTLE data needs to be downloaded from the iRODS instance
# ...
from irods.models import Collection, DataObject
from irods.column import Criterion
import os
# Standard authentication
with iRODSSession(host = host, port = port, user = user, password = password, zone = zone, **ssl_settings) as session:
# Set the filters to select the files needed for downloading
results = session.query(Collection, DataObject).filter( \
Criterion('like', Collection.name, '%'+GROUP+'%')).filter( \
Criterion('like', DataObject.name, '%ttl'))
# Download the result files
for r in results:
# Collection and data object
data_file = r[Collection.name] + "/" + r[DataObject.name]
# Original path in irods is local path from current location
if not os.path.exists("." + data_file):
# Create the directory if it does not exist
os.makedirs("." + r[Collection.name], exist_ok=True)
print("Downloading: " + data_file + " "*10, end="\r")
# Download the file
session.data_objects.get(data_file, "." + data_file)
else:
print("Already received: " + data_file + " "*10, end="\r")
Already received: /unlock/trash/home/jkoehorst/wur.unlock.hivghana_drp0070/demo-13.ttl 76960/amplicon/ASY_DRR243924/NGTAX_Silva138.1-picrust_100_DRR243924/2_Classification/DRR243924_NG-Tax_100.ttl
def create_repository(label):
import requests
# RDF4J Repository config file
content = """
#
# RDF4J configuration template for a GraphDB repository
#
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
@prefix rep: <http://www.openrdf.org/config/repository#>.
@prefix sr: <http://www.openrdf.org/config/repository/sail#>.
@prefix sail: <http://www.openrdf.org/config/sail#>.
@prefix graphdb: <http://www.ontotext.com/config/graphdb#>.
[] a rep:Repository ;
rep:repositoryID "XXXX" ;
rdfs:label "" ;
rep:repositoryImpl [
rep:repositoryType "graphdb:SailRepository" ;
sr:sailImpl [
sail:sailType "graphdb:Sail" ;
graphdb:read-only "false" ;
# Inference and Validation
graphdb:ruleset "rdfsplus-optimized" ;
graphdb:disable-sameAs "true" ;
graphdb:check-for-inconsistencies "false" ;
# Indexing
graphdb:entity-id-size "32" ;
graphdb:enable-context-index "false" ;
graphdb:enablePredicateList "true" ;
graphdb:enable-fts-index "false" ;
graphdb:fts-indexes ("default" "iri") ;
graphdb:fts-string-literals-index "default" ;
graphdb:fts-iris-index "none" ;
# Queries and Updates
graphdb:query-timeout "0" ;
graphdb:throw-QueryEvaluationException-on-timeout "false" ;
graphdb:query-limit-results "0" ;
# Settable in the file but otherwise hidden in the UI and in the RDF4J console
graphdb:base-URL "http://example.org/owlim#" ;
graphdb:defaultNS "" ;
graphdb:imports "" ;
graphdb:repository-type "file-repository" ;
graphdb:storage-folder "storage" ;
graphdb:entity-index-size "10000000" ;
graphdb:in-memory-literal-properties "true" ;
graphdb:enable-literal-index "true" ;
]
].
"""
content = content.replace("XXXX", label)
writer = open("config.ttl","w")
print(content, file=writer)
writer.close()
headers = {
# requests won't add a boundary if this header is set when you pass files=
# 'Content-Type': 'multipart/form-data',
}
files = {
'config': open('./config.ttl', 'rb'),
}
response = requests.post('http://localhost:7200/rest/repositories', headers=headers, files=files)
# print(response.__dict__)
# Process the turtle files
from rdflib import Graph
import requests, json
import time
# Check if GraphDB runs:...
# docker volume create graphdb
# docker run -v graphdb:/opt/graphdb/dist/data --name graphdb -p 7200:7200 ontotext/graphdb:10.1.3 -Xmx6g
graphdb = 'http://localhost:7200'
# GraphDB restrictions
REPO = GROUP.replace("wur.unlock.", "wur_unlock_")
def check_graphdb():
headers = {
'Accept': 'application/json',
}
response = requests.get(graphdb + '/rest/repositories', headers=headers)
content = json.loads(response.content.decode())
repo_available = False
print("Available repositories:")
for repository in content:
if repository['id'] == REPO:
print(repository['id'])
repo_available = True
if not repo_available:
# Create repository
create_repository(REPO)
def load_data(data_file):
headers = {
'Content-Type': 'application/x-turtle',
'Accept': 'application/json'
}
with open(data_file, 'rb') as f:
response = requests.post(graphdb + '/repositories/'+REPO+'/statements', data=f, headers=headers)
if "Unknown repository:" in response.content.decode():
raise Exception(response.content.decode())
###
to_be_loaded = set()
check_graphdb()
for current_dir, subdirs, files in os.walk( './unlock/' ):
for file in files:
if (file.endswith(".ttl")) and GROUP in current_dir:
to_be_loaded.add(current_dir + "/" + file)
for index, file_path in enumerate(to_be_loaded):
current = time.time()
load_data(file_path)
end = time.time()
diff = int(end - current)
print("Loaded file ",index, "of", len(to_be_loaded),"in", diff, "seconds with",file_path.split("/")[-1], end="\r")
Available repositories:
test
gbol
Workflow
refine
ngtax
envo
example
geo
bacdive
Gyan2
wur_unlock_hivghana_drp0070
isa
bla
observations
mgnify
meesike
ena
playground
step
nl-restaurants
Gyan
Loaded file 28 of 29 in 2 seconds with DRR243850_NG-Tax_100.ttll
MIMOSA table generator#
from SPARQLWrapper import SPARQLWrapper, JSON
# Make connection with sparql endpoint containing all NG-Tax 2.0 data
sparql = SPARQLWrapper("http://localhost:7200/repositories/" + REPO)
sparql.setReturnFormat(JSON)
# Get all sample name mappings
# Query for column headers to sample name
query = """
PREFIX gbol: <http://gbol.life/0.1/>
PREFIX schema: <http://schema.org/>
PREFIX sschema: <https://schema.org/>
PREFIX jerm: <http://jermontology.org/ontology/JERMOntology#>
select distinct ?sample_name ?sample_id where {
?ngtax_sample gbol:name ?sample_name .
?ngtax_sample gbol:metadata ?mappingFile .
?mappingFile a gbol:MappingFile .
?mappingFile gbol:fastQforward ?fwd .
?mappingFile gbol:fastQreverse ?rev .
?sample jerm:hasPart/jerm:hasPart/schema:identifier ?fwd .
?sample jerm:hasPart/jerm:hasPart/schema:identifier ?rev .
?sample a jerm:Sample .
?sample schema:identifier ?sample_id .
}"""
sparql.setQuery(query)
ret = sparql.queryAndConvert()
# Iterate over the sample names
sample_name_lookup = {}
for index, r in enumerate(ret["results"]["bindings"]):
sample_name_lookup[r['sample_name']['value']] = r['sample_id']['value']
from SPARQLWrapper import SPARQLWrapper, JSON
# Make connection with sparql endpoint containing all NG-Tax 2.0 data
sparql = SPARQLWrapper("http://localhost:7200/repositories/" + REPO)
sparql.setReturnFormat(JSON)
# Obtain all samples to make it more scalable
query = """
PREFIX gbol:<http://gbol.life/0.1/>
SELECT DISTINCT ?sample
WHERE {
?sample a gbol:Sample .
}
"""
sparql.setQuery(query)
ret = sparql.queryAndConvert()
# Create containers for the forward, reverse ASV and the sample list
container_fseq = {}
container_rseq = {}
all_samples = set()
# Iterate over the samples
for index, r in enumerate(ret["results"]["bindings"]):
# For each sample get all ASVs and abundance information
sample = r['sample']['value']
print("Processed sample number", index, end='\r')
query = """
PREFIX gbol:<http://gbol.life/0.1/>
SELECT DISTINCT ?name ?fseq ?rseq ?count
WHERE {
VALUES ?sample { <"""+sample+"""> }
?sample a gbol:Sample .
?sample gbol:name ?name .
?sample gbol:asv ?asv .
?asv a gbol:ASVSet .
?asv gbol:forwardASV ?fasv .
?fasv gbol:sequence ?fseq .
?asv gbol:reverseASV ?rasv .
?rasv gbol:sequence ?rseq .
?asv gbol:clusteredReadCount ?count .
}"""
# Set query and iterate over results
sparql.setQuery(query)
ret = sparql.queryAndConvert()
for r in ret["results"]["bindings"]:
name = r['name']['value']
fseq = r['fseq']['value']
rseq = r['rseq']['value']
count = int(r['count']['value'])
# Sample renaming using the previous lookup
name = sample_name_lookup[name]
# List of sample names
all_samples.add(name)
# Fseq object fill method
if fseq not in container_fseq:
container_fseq[fseq] = {}
if name not in container_fseq[fseq]:
container_fseq[fseq][name] = 0
container_fseq[fseq][name] += count
# Rseq object fill method
if rseq not in container_rseq:
container_rseq[rseq] = {}
if name not in container_rseq[rseq]:
container_rseq[rseq][name] = 0
container_rseq[rseq][name] += count
Processed sample number 8
import numpy as np
import pandas as pd
# Create an abundance matrix using numpy
def create_abundance_matrix(name, container, all_samples):
matrix = np.zeros((len(container), len(all_samples)), dtype=int)
all_samples = sorted(list(all_samples))
asvs = sorted(container.keys())
for row_index, asv in enumerate(asvs):
for sample in container[asv]:
coll_index = all_samples.index(sample)
value = container[asv][sample]
matrix[row_index,coll_index] = int(value)
# Write to file as panda frame with column and row names
df = pd.DataFrame(matrix, columns=all_samples, index=asvs)
df.index.name="OTU"
df.to_csv(name, sep="\t")
# Creating the RSEQ tsv file
create_abundance_matrix("fseq.tsv", container_fseq, all_samples)
# Creating the FSEQ tsv file
create_abundance_matrix("rseq.tsv", container_rseq, all_samples)