# -*- coding: utf-8 -*-
"""
CEFR level classifier for Swedish as a second language.
Both learner and expert written texts can be assessed.
"""
from __future__ import division, with_statement

import os
import codecs
import cPickle
import json
import sys
#sys.path.append("/export/cgi-bin_sb/icall/HitEx")

# Activate the virtualenv to get access to the Python packages
# (numpy, sklearn, ...) installed there.
activate_this = os.path.join("/export/cgi-bin_sb/larkalabb",
                             'venv/bin/activate_this.py')
execfile(activate_this, dict(__file__=activate_this))

import numpy as np
from sklearn import svm

import call_sparv
from kelly import process_csv, cefr_scale
from ling_units import Text
from dataset import Dataset
from item_collector import ItemCollector
from auxiliaries.dset_proc_aux import put_feature_value as addict
from auxiliaries.dset_proc_aux import put_feature_value_list as addict_list

global hitex_path
hitex_path = "/export/cgi-bin_sb/larkalabb/HitEx/"


def set_ref_level(ref_level):
    """Return the reference CEFR level, defaulting to B1 if none given.

    @param ref_level: a CEFR label (e.g. "A1", "B2") or a falsy value.
    @return:          'ref_level' itself, or "B1" when 'ref_level' is falsy.
    """
    if ref_level:
        return ref_level
    return "B1"  # default


def create_dataset(dset_path, analysis_level, dset_type, input_files,
                   dset_file, level):
    """Process the input text into a Dataset instance.

    Creates a Dataset from the Sparv-annotated input and extracts the
    statistics later used as input for feature extraction.  The set is
    also pickled to 'dset_file' as a side effect.

    @return: the populated Dataset instance.
    """
    collector = ItemCollector(dset_path, analysis_level, dset_type,
                              input_files, level=level)
    dset_inst = Dataset(dset_path, analysis_level, dset_type)
    dset_inst.get_set(collector)
    #dset_inst.print_info()
    dset_inst.save_set(dset_file)
    return dset_inst


def extract_features(dset_inst, dset_path, analysis_level, dset_type,
                     input_files, dset_file, level):
    """Extract feature values for the input text.

    Feature set described in Pilán et al. (2015).

    @return: a (1, n_features) numpy array of floats, suitable as
             classifier input.
    """
    num_label = False       # 'False' for categorical (A1, A2 etc.),
                            # 'True' for numerical labels (1, 2 etc.)
    with_relevance = False  # GDEX type filtering - not relevant here
    save_info = False       # whether to save extracted features etc.
    # Load word lists (TO DO: load from a cPickled file instead?)
    kelly_list = process_csv(hitex_path + "word_lists/kelly_sv.csv")
    #TO DO: keep only the other version of SVALex?
    svalex_list = process_csv(hitex_path + "word_lists/SVALex_final.csv")
    arff_labels = "{%s}" % level  # check if necessary
    values_line = dset_inst.extract_features(dset_file, kelly_list,
                                             svalex_list, num_label,
                                             arff_labels, save_info,
                                             with_relevance)
    # 'values_line[0]' is a space-separated string of feature values.
    feature_values = [float(v) for v in values_line[0].split(" ")]
    return np.array([feature_values])


def load_classifier(produced_by):
    """Load a saved classifier based on who the text was written by.

    @param produced_by: "learner" or "expert".
    @return:            the unpickled scikit-learn classifier.
    @raise ValueError:  for any other 'produced_by' value.  (The
        original code only printed a message and then crashed with a
        NameError on the undefined 'saved_clf'.)
    """
    os.chdir(hitex_path + "classifiers/")
    if produced_by == "learner":
        saved_clf = "essay_classifier.pkl"
    elif produced_by == "expert":
        saved_clf = "readingtext_classifier.pkl"
    else:
        raise ValueError("'produced_by' must equal 'learner' or 'expert'")
    with open(saved_clf, 'rb') as fid:
        loaded_clf = cPickle.load(fid)
    return loaded_clf


def classify(loaded_clf, instance):
    """Classify 'instance' with a previously saved classifier (SVM from
    scikit-learn).

    @return: the predicted CEFR level as a string (e.g. "B1").
    """
    pred = loaded_clf.predict(instance)[0]
    # Map the classifier's numeric prediction back to a CEFR label.
    conv_lbl = {"A1": 1, "A2": 2, "B1": 3, "B2": 4, "C1": 5, "C2": 6}
    pred_cefr = [k for k, v in conv_lbl.items() if v == pred][0]
    return pred_cefr


def compute_stats(dset_inst, feature_values):
    """Compute human-interpretable statistics for the dataset instance.

    Features must have been extracted before calling this (the
    'feature_values' array is read for the smoothed nominal-ratio and
    pronoun-to-noun columns).

    @return: dict of averaged statistics for the (single) text,
             including LIX, per-wordlist CEFR distributions and the
             per-token levelled text.
    """
    stats = dset_inst.stats_objects
    # Frequent words that Sparv leaves without lemmas; counted as A1.
    sparv_fix = ["som", "än", "att", "många", "fler", "flera",
                 "flest", "flesta"]

    # Load the L2 word lists (lemma, POS) -> CEFR level mappings.
    l2_lists = {}
    for lname in ["svalex", "swell"]:
        with codecs.open(hitex_path + "word_lists/" + lname + ".pkl") as f:
            l2_lists[lname] = cPickle.load(f)

    avg_stats = {}
    for text in stats:
        # NOTE(fix): 'punct' is initialized once per text here.  The
        # original reset it inside the per-token loop, so only the last
        # token was ever counted, corrupting 'nr_words' and LIX.
        summed_stats = {"svalex_CEFR": {}, "swell_CEFR": {},
                        "levelled_text": [], "punct": 0}
        nr_sents = len(text)

        # Accumulate statistics per sentence.
        for sent_stats in text:
            tokens = sent_stats["tokens"]
            nr_tokens = len(sent_stats["tokens"])
            addict(summed_stats, "nr_tokens", nr_tokens)
            avg_tok_len = (sum(sent_stats["tok_len"])
                           / len(sent_stats["tok_len"]))
            addict(summed_stats, "avg_tok_len", avg_tok_len)
            addict(summed_stats, "long_tokens", sent_stats.get("long_w", 0))
            # Average length of dependency arcs.
            avg_dep_len = round(sum(sent_stats["dep_len"]) / nr_tokens, 2)
            addict(summed_stats, "avg_dep_len", avg_dep_len)

            # Kelly - only the amount is saved in SentStatistics,
            # not the actual tokens.
            if "kelly_CEFR" in summed_stats:
                for k, v in sent_stats["voc_cefr"].items():
                    addict(summed_stats["kelly_CEFR"], k, int(v))
            else:
                summed_stats["kelly_CEFR"] = sent_stats["voc_cefr"]

            # CEFR levels from SweLL and SVALex per token.
            for token in tokens:
                token_info = [token.word]
                if token.pos in ["MID", "MAD", "PAD"]:
                    addict(summed_stats, "punct", 1)
                for listname, l2_list in sorted(l2_lists.items()):
                    if token.lemma:
                        lemma = token.lemma[0]
                        k = (lemma, token.pos)
                        if k in l2_list:
                            level = l2_list[k]
                        else:
                            level = "?"  # lemmatized but not in the list
                    elif token.pos in ["RG", "RO"]:
                        # Digits counted as A1.
                        level = "A1"
                    elif token.pos in ["MID", "MAD", "PAD"]:
                        level = ""   # punctuation: no level
                    elif token.word in sparv_fix:
                        level = "A1"
                    else:
                        # Non-lemmatized tokens, same for both resources.
                        level = "-"
                    token_info.append(level)
                    if level:
                        addict_list(summed_stats[listname + "_CEFR"],
                                    level, token.word)
                # word, svalex_CEFR, swell_CEFR
                summed_stats["levelled_text"].append(tuple(token_info))

        # Average the summed statistics over sentences.
        for k, v in summed_stats.items():
            if type(v) == int and "avg" in k:
                avg_stats[k] = round(v / nr_sents, 2)
            elif type(v) == list:  # levelled_text
                avg_stats[k] = v
            elif type(v) == dict and "CEFR" in k:
                avg_stats[k] = {}
                for cefr, words in v.items():
                    try:
                        avg_stats[k][cefr] = len(words)
                    except TypeError:  # kelly_CEFR stores counts directly
                        avg_stats[k][cefr] = int(words)

        nr_words = summed_stats["nr_tokens"] - summed_stats["punct"]
        # LIX readability: avg sentence length + % of long words.
        avg_stats["LIX"] = int((nr_words / nr_sents) +
                               (summed_stats["long_tokens"] * 100 / nr_words))
        # Smoothed values taken from the extracted feature vector.
        avg_stats["nominal_ratio"] = round(feature_values[0][42], 2)
        # Smoothed value when non-zero.
        avg_stats["PNtoNN"] = round(feature_values[0][13], 2)
        avg_stats["nr_sents"] = nr_sents
        avg_stats["avg_sent_len"] = round(
            summed_stats["nr_tokens"] / nr_sents, 2)
        avg_stats["nr_tokens"] = summed_stats["nr_tokens"]
        avg_stats["avg_tok_len"] = round(
            summed_stats["avg_tok_len"] / nr_sents, 2)
        avg_stats["avg_dep_len"] = round(
            summed_stats["avg_dep_len"] / nr_sents, 2)
        avg_stats["non-lemmatized"] = avg_stats["svalex_CEFR"].get("-", 0)
        # Drop the non-lemmatized bucket from the per-list distributions.
        for wl in ["kelly", "svalex", "swell"]:
            if "-" in avg_stats[wl + "_CEFR"]:
                del avg_stats[wl + "_CEFR"]["-"]
            # TO DO: implement wordlist-based suggested CEFR level only
            #avg = round(sum([v for k,v in avg_stats[wl + "_CEFR"].items()])/avg_stats["nr_tokens"])
            #avg_stats["CEFR_" + wl + "_avg"] = [k for k,v in cefr_scale.items() if v == avg][0]
    return avg_stats


def analyze_lg_complexity(text, ref_level, produced_by, CEFR_ML=True):
    """Analyze a text for linguistic complexity in terms of CEFR levels
    and some human-interpretable indicators.

    @param text:        the (unicode) text to assess.
    @param ref_level:   reference CEFR level of the test or student,
                        used for features prefixed "diff_"; defaults
                        to "B1" when falsy.
    @param produced_by: "learner" or "expert" - selects the classifier.
    @param CEFR_ML:     when True, also predict a CEFR level with the
                        saved machine-learning classifier.
    @return:            the analysis as a JSON-formatted string.
    """
    text_analysis = {}
    # Annotate with Sparv.
    annotated_text = call_sparv.call_sparv(text)
    input_files = [annotated_text]

    # Process and classify the text.
    dset_path = hitex_path + "datasets/"
    analysis_level = "single_text"
    dset_type = "all"
    dset_file = analysis_level + "_" + dset_type + "_dset.pkl"
    #os.chmod(dset_path+analysis_level+"/"+dset_type+"/"+dset_file, 755)
    level = set_ref_level(ref_level)
    dset_inst = create_dataset(dset_path, analysis_level, dset_type,
                               input_files, dset_file, level)
    feature_values = extract_features(dset_inst, dset_path, analysis_level,
                                      dset_type, input_files, dset_file,
                                      level)
    stats = compute_stats(dset_inst, feature_values)
    for k in stats:
        text_analysis[k] = stats[k]
    if CEFR_ML:
        loaded_clf = load_classifier(produced_by)
        cefr_level = classify(loaded_clf, feature_values)
        text_analysis["CEFR_ML"] = cefr_level
    jsonized_result = json.dumps(text_analysis, sort_keys=True, indent=4)
    return jsonized_result  # text_analysis


# ## Example run
# text = u"Du är hungrig. Han sover. De läser en bok."
# ref_level = "A1"  # level of test or student if known
#                   # used for features with prefix "diff_"
# produced_by = "learner"  # learner or expert
# result = analyze_lg_complexity(text, ref_level, produced_by)
# print result