[Python] 漫画のWikipediaの説明文から発表年を推定する

こんにちは。Link-Uの町屋敷です。

今回は、テキストデータを解析する一例として、

前回抽出した漫画のWikipediaの文章データを使って、

入力データを説明文、出力データを発表年として、入力データから出力データを推定して行きたいと思います。

また、入力データのどの要素(今回なら単語)がその回帰や分類に効力があるのかを調べる方法も紹介していきたいです。

インフォボックスから発表年のデータを取得する

まず、正解データとして使用する発表年データをインフォボックスから収集しよう。

Wikipediaのインフォボックスはjsonで要素infoboxに文字列として前回保存した。

これを適当なパーサーで処理したら完了! …とはいかない。

まず、前回保存した本文とインフォボックスjsonを結合する。

結合方法は両方のファイルにtitle情報が含まれているので紐づけするだけ。

def LoadTextJsonGenerator(files):
    for jf in files:
        with open(jf) as f:
            json_data = pd.read_json(f, lines=True)
            yield json_data.to_dict()
            
def JoinJsonData(info_json_data):
    text_json_files = sorted(glob(TEXT_JSON_DIR + '/*/*'))
    
    ltjg = LoadTextJsonGenerator(text_json_files)

    text_data = ltjg.__next__()
    for i, ijd in enumerate(info_json_data):    
        while True:
            if not ijd['title']:
                ijd['title'] = 'NULL'
            id = int(ijd['id'])
            if id in text_data['id'].values():
                data_index = list(text_data['id'].values()).index(id)
                info_json_data[i]['text'] = text_data['text'][data_index]
                if ijd['title'] == 'NULL':
                    info_json_data[i]['title'] = text_data['title'][data_index]
                break
            else:
                #print((i,id)) #確認用
                text_data = ltjg.__next__() #StopIterationしたらどっかバグってる

def Preprocess():
    with open(INFOBOX_JSON_DIR + '/' + INFOBOX_FILE_NAME) as infof:
        info_json_data = json.load(infof)
        
    JoinJsonData(info_json_data)
    with open(WRITE_JSON_DIR  + '/' + WRITE_JSON_FILE_NAME, 'w', encoding='utf_8') as jw: 
        json.dump(info_json_data, jw, ensure_ascii=False)

 

どうやらインフォボックスの要素名は微妙な表記ブレや要素名だけあってデータが入っていないことが割とよくあるので、

表記ブレに対応しつつ、もったいないけどデータが入っていない漫画のデータを捨てる必要がある。

このへんは正規表現で頑張ったらなんとかなる。今回は開始、発表期間、連載期間、発表号に表記ブレしていた。(ソース全文は一番下)

infobox = j['infobox'][0] #複数ある場合でも最初のもののみを使う
publication_year = re.search('\| *[開始|発表期間|連載期間|発表号].*?([1|2][8|9|0]\d\d)[年|\.]', infobox, re.MULTILINE | re.DOTALL) #@UndefinedVariable
if not publication_year:
    labels[i] = -1
    vain_count += 1
else:
    labels[i] = publication_year[1]

これで、labelsに年数が入った。データが入っていないときlabelsに-1を入れっておくことで後で対応する文章を消すことができる。

なかなかきれいなポアソン分布。

まあ一定時間で区切ったカウントデータだしね……

テキストを学習機に入れられる形に変形する

次に入力データを加工する

labelsの使えるデータの数を数えると5000ちょいだった。

この数でニューラルネットにに直接ぶち込んでLSTMとかを使って解析するのは厳しいので、BoWに加工する。

具体的には、文章を単語単位に分離して、そのうちの名詞、形容詞、動詞に番号を付けて、文章中のその各単語の出現回数を数える。

with MeCab() as mecab:

文章を単語単位に分離する手法は形態素解析と言われるが、ライブラリで簡単にできる。有名なのはChasenとかMecabだが、今回はMecabとpythonでMecabを使えるようなするNatto-pyを使う。インストール方法は調べたら山程出てくるので省略。

            words = []
            text = j['text']
            for mp in mecab.parse(text, as_nodes=True):
                if not mp.is_eos():
                    feature_splits = mp.feature.split(',')
                    if feature_splits[0] in ['名詞', '動詞', '形容詞']:
                        if feature_splits[1] in ['数']:
                            continue
                        elif feature_splits[2] in ['人名']:
                            continue
                        elif feature_splits[6] in ['*']:
                            continue
                        words.append(feature_splits[6])

if feature_splits[0] in [‘名詞’, ‘動詞’, ‘形容詞’]:の行で品詞を絞って、

その後、出現する単語のうち3とか2001などの数は、答えを書いている可能性があるので除去、

また、人名もその人が活躍する時期はある程度偏ってるはずなので、ほぼ答えになるじゃんと言う事で削除した。

def MakeDict(all_words):
    dictionary = corpora.Dictionary(all_words)
    print(dictionary.token2id)
    for no_below in [5,20,40]:
        for no_above in [0.1,0.3,0.5]:
            dictionary.filter_extremes(no_below=no_below, no_above=no_above)
            dictionary.save_as_text('filtered_dic_below{0}_above{1}.txt'.format(no_below, no_above))

単語への番号付けは、専用の辞書を作って行う。

これもgensimというライブラリがある。Mecabと同様これも情報は大量にあるので省略(例えばここここ

def MakeDict(all_words):
    dictionary = corpora.Dictionary(all_words)
    print(dictionary.token2id)
    for no_below in [5,20,40]:
        for no_above in [0.1,0.3,0.5]:
            dictionary.filter_extremes(no_below=no_below, no_above=no_above)
            dictionary.save_as_text('filtered_dic_below{0}_above{1}.txt'.format(no_below, no_above))

辞書を作る部分がここで、no_belowやno_aboveで作り分けたが今回のデータセットでは違いはほぼなかったので、no_below=5, no_above=0.1を使うことにする。

def MakeFeatures(make_dict = False, dict_param = [5, 0.1]):
    
    all_words = joblib.load('{0}/all_wordss.pkl'.format(WRITE_JOBLIB_DIR))
    labels = joblib.load('{0}/publication_years.pkl'.format(WRITE_JOBLIB_DIR))
    if (make_dict):
        MakeDict(all_words)
    
    dictionary = corpora.Dictionary.load_from_text('filtered_dic_below{0}_above{1}.txt'.format(dict_param[0], dict_param[1]))
    
    dl = len(dictionary)
    features = []
    for w, l in zip(all_words, labels):
        tmp = dictionary.doc2bow(w)
        dense = list(matutils.corpus2dense([tmp], num_terms=len(dictionary)).T[0])
        if not l == -1:
            features.append(dense)
    features = np.array(features)
    features = np.reshape(features, (-1, dl))
    labels = [int(v) for v in labels if v != -1]
    joblib.dump(features, '{0}/features.pkl'.format(WRITE_JOBLIB_DIR))
    joblib.dump(labels, '{0}/labels.pkl'.format(WRITE_JOBLIB_DIR))

ここで、先程作った辞書を使ってBoWに変換し、labelが-1のデータを削除する

ここでTFIDFを使ってもいいが、今回はパス。これでデータセットの成形は完了。

集計した単語のうち数が多かったTop50をおいておく。

データの学習

本来はTPOTとかを駆使していろいろな学習機で調査するべきだが、時間がないので決定木グラディアントブースティングマシーン(以下、GBM)を用いた回帰と比較対象用に線形回帰を行う。

グラディアントブースティングを使用できるライブラリはsklearn,XGBoost,lightgbmと大きく3つあるが、機能の多さと実行の速さを考えるとlightgbmが良い。

def TuneXgboostRgr():
    
    features = joblib.load('{0}/features.pkl'.format(WRITE_JOBLIB_DIR))
    labels  = joblib.load('{0}/labels.pkl'.format(WRITE_JOBLIB_DIR))
    
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.1, random_state=1234)
    
    params = { 
        'learning_rate' : [0.3,0.2,0.1]
    }
    
    gs = GridSearchCV(estimator = LGBMRegressor(
                            num_leaves=31,
                            learning_rate =0.1, 
                            n_estimators=1000,
                            max_depth=9,
                            objective='regression',
                            min_sum_hessian_in_leaf=1
                        ),
                        param_grid = params, 
                        cv=5)

    gs.fit(X_train, y_train)
    print('\n')
    print(gs.cv_results_)
    print('\n')
    print(gs.best_params_)
    print('\n')
    print(gs.best_score_)

まず、ハイパーパラメーターのチューニングをする。paramsにチューニングしたい変数と値を配列で入れるとクロスバリデーションもして一番いいパラメーターを調査してくれる。

ただし、paramsに大量の変数を設定すると永遠に終わらなくなるので、1,2種類ずついれて何回もやる。詳しくはグリッドサーチで検索!

def LearnRegressor(clf_name = 'gbm'):
    from sklearn.linear_model import LinearRegression
    features = joblib.load('{0}/features.pkl'.format(WRITE_JOBLIB_DIR))
    labels  = joblib.load('{0}/labels.pkl'.format(WRITE_JOBLIB_DIR))
    
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.1, random_state=1234)
    
    if clf_name == 'gbm':
        rgr = LGBMRegressor( 
            num_leaves=1000, 
            max_depth=9, 
            learning_rate=0.06, 
            n_estimators=100, 
            objective='regression',
        )
    elif clf_name == 'linear':
        rgr = LinearRegression()
    #rgr = XGBRegressor( n_estimators = 100, learning_rate=0.1)
    
    print('Learning Start')
    time = timer()
    rgr.fit(X_train,y_train)
    time = timer() - time
    print('Learning Finish\n Time: {0}'.format(time))
    y_pred = rgr.predict(X_test)
    
    joblib.dump(rgr, '{0}/{1}_regressor.pkl'.format(WRITE_JOBLIB_DIR, clf_name))
    for yt, yp in zip (y_test, y_pred):
        print((yt,yp))
    print(mean_squared_error(y_test, y_pred))

 

ハイパーパラメーターのチューニングが終了したら、テストデータで評価する。

結果がこちら、左が正解、中央がGBM,右が線形回帰

比べてみると線形回帰はたまに未来や19世紀などのありえない数値を予測している。

実際誤差を計算するとGBM 59.1に対して線形回帰は329.5と大差を付けている。

(  正解, GBM , linear)

(2012, 2008, 2011)
(2008, 2011, 2007)
(2004, 2001, 2002)
(2010, 2006, 2025)
(2004, 2006, 2003)
(2006, 2004, 2012)
(2017, 2012, 2041)
(2006, 2007, 2005)
(2008, 2006, 1999)
(1977, 1985, 1987)
(2003, 2006, 2011)
(2008, 2006, 1986)
(2015, 2005, 1998)
(1992, 1998, 2003)
(2017, 2008, 2012)
(2003, 2006, 2000)
(2010, 2006, 2011)
(2010, 2006, 2011)
(2004, 2003, 2002)
(2010, 2006, 1958)
(1985, 2002, 1997)
(1994, 2003, 1988)
(1990, 1996, 2004)
(1999, 2002, 2006)
(1984, 1995, 2010)
(1987, 2000, 1993)
(1962, 2001, 2000)
(1993, 1992, 2005)
(1997, 2006, 2009)
(1988, 1982, 1989)
(1972, 1989, 1959)
(1987, 1999, 1989)
(1968, 1973, 1967)
(1970, 1982, 1970)
(1991, 2000, 2000)
(1998, 2007, 1994)
(1999, 1998, 1996)
(1994, 1993, 2001)
(1968, 1970, 1872)

回帰を行う学習器を生成できました。

これで、入力データのどの要素がその回帰や分類に効力があるのかを調べることが出来ます。

線形回帰は相関係数を計算すればすぐに見れます。

実はGBMのようなブースティングモデルの場合でも同様に簡単に調査できて、

そのまま重要度として保存されています。(参考)

両者の結果を見比べてみましょう。

 

 

線形回帰の方は法則がそんなに見えません、いんするとかいう謎単語も含まれてるし……

でも、機械学習で出した結果です!って言われたらこんなもんかとは思いそう。

GBMのほうは、昭和や平成、ビデオ、インターネットや現像といった明らかに時代に関係のあるものが紛れているのがわかります。

やたら社名が入っているのは何故だろう……

でもどちらの信用度が高いかは明らかでしょう。

まとめ

今回は漫画のWikipediaの文章データを使って、発表年を推定する方法を紹介しました。

学習器がどの要素(今回なら単語)を注視しているかも調べましたがある単語が強く関係あるとわかったところであまり使い道がありません。

次回では、別の方法でもっといい調査の方法を紹介します。

プログラム全文

import json
from glob import glob
from functools import reduce
import re
from timeit import default_timer as timer

import numpy as np
import pandas as pd
from natto import MeCab
import joblib
from gensim import corpora, matutils

import matplotlib
from matplotlib import pyplot as plt
from matplotlib.font_manager import FontProperties

from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import GradientBoostingClassifier

#Xgboostは重いので使わない(5倍くらい違う)
#from xgboost import XGBRegressor
#from xgboost import XGBClassifier

from lightgbm import LGBMRegressor
from lightgbm import LGBMClassifier

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

TEXT_JSON_DIR = '../WikipediaComic/whole_data'
INFOBOX_JSON_DIR = '.'
INFOBOX_FILE_NAME = 'wiki_infobox_Infobox_animanga_Manga.json'
WRITE_JSON_DIR = '.'
WRITE_JSON_FILE_NAME = 'joined.json' 

WRITE_JOBLIB_DIR = '.'

#Matplotlibの日本語設定
font_path = '/usr/share/fonts/truetype/takao-gothic/TakaoPGothic.ttf'
font_prop = FontProperties(fname=font_path)
matplotlib.rcParams['font.family'] = font_prop.get_name()

def LoadTextJsonGenerator(files):
    for jf in files:
        with open(jf) as f:
            json_data = pd.read_json(f, lines=True)
            yield json_data.to_dict()
            
def JoinJsonData(info_json_data):
    text_json_files = sorted(glob(TEXT_JSON_DIR + '/*/*'))
    
    ltjg = LoadTextJsonGenerator(text_json_files)

    text_data = ltjg.__next__()
    for i, ijd in enumerate(info_json_data):    
        while True:
            if not ijd['title']:
                ijd['title'] = 'NULL'
            id = int(ijd['id'])
            if id in text_data['id'].values():
                data_index = list(text_data['id'].values()).index(id)
                info_json_data[i]['text'] = text_data['text'][data_index]
                if ijd['title'] == 'NULL':
                    info_json_data[i]['title'] = text_data['title'][data_index]
                break
            else:
                #print((i,id)) #確認用
                text_data = ltjg.__next__() #StopIterationしたらどっかバグってる

def Preprocess():
    with open(INFOBOX_JSON_DIR + '/' + INFOBOX_FILE_NAME) as infof:
        info_json_data = json.load(infof)
        
    JoinJsonData(info_json_data)
    with open(WRITE_JSON_DIR  + '/' + WRITE_JSON_FILE_NAME, 'w', encoding='utf_8') as jw: 
        json.dump(info_json_data, jw, ensure_ascii=False)
    
def ExtractWords():
    with open(WRITE_JSON_DIR  + '/' + WRITE_JSON_FILE_NAME, 'r', encoding='utf_8') as jw: 
        json_data = json.load(jw)
    
    all_words = [[0]] * len(json_data) 
    with MeCab() as mecab:
        for i, j in enumerate(json_data):
            if i % 100 == 0:
                print(i)
            words = []
            text = j['text']
            for mp in mecab.parse(text, as_nodes=True):
                if not mp.is_eos():
                    feature_splits = mp.feature.split(',')
                    if feature_splits[0] in ['名詞', '動詞', '形容詞']:
                        if feature_splits[1] in ['数']:
                            continue
                        elif feature_splits[2] in ['人名']:
                            continue
                        elif feature_splits[6] in ['*']:
                            continue
                        words.append(feature_splits[6])
            all_words[i] = words
            
    joblib.dump(all_words, '{0}/all_wordss.pkl'.format(WRITE_JOBLIB_DIR), compress = True)

def ExtractLabelFromInfobox():
    with open(WRITE_JSON_DIR  + '/' + WRITE_JSON_FILE_NAME, 'r', encoding='utf_8') as jw: 
        json_data = json.load(jw)    
        labels = [0] * len(json_data)
        vain_count = 0
        for i, j in enumerate(json_data):
            if i % 100 == 0:
                print(i)
            infobox = j['infobox'][0] #複数ある場合でも最初のもののみを使う
            publication_year = re.search('\| *[開始|発表期間|連載期間|発表号].*?([1|2][8|9|0]\d\d)[年|\.]', infobox, re.MULTILINE | re.DOTALL) #@UndefinedVariable
            if not publication_year:
                labels[i] = -1
                vain_count += 1
            else:
                labels[i] = publication_year[1]
        joblib.dump(labels, '{0}/publication_years.pkl'.format(WRITE_JOBLIB_DIR), compress = True)
    print(vain_count)
     
def MakeDict(all_words):
    dictionary = corpora.Dictionary(all_words)
    print(dictionary.token2id)
    for no_below in [5,20,40]:
        for no_above in [0.1,0.3,0.5]:
            dictionary.filter_extremes(no_below=no_below, no_above=no_above)
            dictionary.save_as_text('filtered_dic_below{0}_above{1}.txt'.format(no_below, no_above))
            
def MakeFeatures(make_dict = False, dict_param = [5, 0.1]):
    
    all_words = joblib.load('{0}/all_wordss.pkl'.format(WRITE_JOBLIB_DIR))
    labels = joblib.load('{0}/publication_years.pkl'.format(WRITE_JOBLIB_DIR))
    if (make_dict):
        MakeDict(all_words)
    
    dictionary = corpora.Dictionary.load_from_text('filtered_dic_below{0}_above{1}.txt'.format(dict_param[0], dict_param[1]))
    
    dl = len(dictionary)
    features = []
    for w, l in zip(all_words, labels):
        tmp = dictionary.doc2bow(w)
        dense = list(matutils.corpus2dense([tmp], num_terms=len(dictionary)).T[0])
        if not l == -1:
            features.append(dense)
    features = np.array(features)
    features = np.reshape(features, (-1, dl))
    labels = [int(v) for v in labels if v != -1]
    joblib.dump(features, '{0}/features.pkl'.format(WRITE_JOBLIB_DIR))
    joblib.dump(labels, '{0}/labels.pkl'.format(WRITE_JOBLIB_DIR))
            


def LearnRegressor(clf_name = 'gbm'):
    from sklearn.linear_model import LinearRegression
    features = joblib.load('{0}/features.pkl'.format(WRITE_JOBLIB_DIR))
    labels  = joblib.load('{0}/labels.pkl'.format(WRITE_JOBLIB_DIR))
    
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.1, random_state=1234)
    
    if clf_name == 'gbm':
        rgr = LGBMRegressor( 
            num_leaves=1000, 
            max_depth=9, 
            learning_rate=0.06, 
            n_estimators=100, 
            objective='regression',
        )
    elif clf_name == 'linear':
        rgr = LinearRegression()
    #rgr = XGBRegressor( n_estimators = 100, learning_rate=0.1)
    
    print('Learning Start')
    time = timer()
    rgr.fit(X_train,y_train)
    time = timer() - time
    print('Learning Finish\n Time: {0}'.format(time))
    y_pred = rgr.predict(X_test)
    
    joblib.dump(rgr, '{0}/{1}_regressor.pkl'.format(WRITE_JOBLIB_DIR, clf_name))
    for yt, yp in zip (y_test, y_pred):
        print((yt,yp))
    print(mean_squared_error(y_test, y_pred))

def TuneXgboostRgr():
    
    features = joblib.load('{0}/features.pkl'.format(WRITE_JOBLIB_DIR))
    labels  = joblib.load('{0}/labels.pkl'.format(WRITE_JOBLIB_DIR))
    
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.1, random_state=1234)
    
    params = { 
        'learning_rate' : [0.3,0.2,0.1]
    }
    
    gs = GridSearchCV(estimator = LGBMRegressor(
                            num_leaves=31,
                            learning_rate =0.1, 
                            n_estimators=1000,
                            max_depth=9,
                            objective='regression',
                            min_sum_hessian_in_leaf=1
                        ),
                        param_grid = params, 
                        cv=5)

    gs.fit(X_train, y_train)
    print('\n')
    print(gs.cv_results_)
    print('\n')
    print(gs.best_params_)
    print('\n')
    print(gs.best_score_)

if __name__ == '__main__':
    #Preprocess()
    #ExtractWords()
    #ExtractLabelFromInfobox()
    #MakeFeatures()
    #LearnRegressor()
    InspectRegressor()
    #TuneXgboostRgr()
    
    
    
    
    

 

参考サイト

https://qiita.com/conta_/items/4b031a44acceb137ec73
https://yubais.net/doc/matplotlib/bar.html

https://qiita.com/buruzaemon/items/975027cea6371b2c5ec3
https://qiita.com/hoto17296/items/e1f80fef8536a0e5e7db
https://qiita.com/yasunori/items/31a23eb259482e4824e2