取得ツイートのCSV出力からAWS上で定期実行するまで Twitter API
というものを使うとTwitter上の情報を集めたりできて便利らしいので実験する。
取得ツイートのCSV出力 → AWS上で実行 → AWS上で定期実行 までの作業を一通り行う。
最終的に以下のようにする。
前回(ツイート取得まで)はこれ。
Twitter API v2の使い方1 (Twitterアカウント作成からツイート取得まで)
Twitterアカウントの作成からAPIを使用してツイート取得するまで。Twitter APIというものを使うとTwitter上の情報を集めたりできて便利。すご…
1. 取得ツイートのCSV出力 前回作成したツイート取得スクリプトを改造。 取得したスクリプトをCSVファイルに出力する。 Excelで開くことを見越したファイル形式にする。
1.1 コード 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import requests import os import csv import re #### FUNCTIONS # query:検索ワード tweet_fields:取得データ APIリクエスト用のURLを作成 def create_url(query, tweet_fields): if(any(tweet_fields)): formatted_tweet_fields = "tweet.fields=" + ",".join(tweet_fields) else: formatted_tweet_fields = "" url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}".format(query, formatted_tweet_fields) return url # bearer_token:APIキー APIリクエスト用のヘッダーを作成 def create_headers(bearer_token): headers = {"Authorization": "Bearer {}".format(bearer_token)} return headers # url:APIリクエスト用のURL headers:APIリクエスト用のヘッダー APIリクエスト def connect_to_endpoint(url, headers): response = requests.request("GET", url, headers=headers) print(response.status_code) if response.status_code != 200: # HTTPレスポンスステータスコードが200以外ならエラー raise Exception(response.status_code, response.text) return response.json() # filename:出力ファイル名 header:ヘッダー contents_list:中身 def save_csvfile(filename, header, contents_list): if(not os.path.isfile(filename)): # 既存の同名ファイルがなければ新規作成 with open(filename, "w", newline="", encoding="utf-16") as f: w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL) w.writeheader() with open(filename, "a", newline="", encoding="utf-16") as f: w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL) for contents in contents_list: w.writerow(contents) #### MAIN def main(): #### VARIABLES # APIキー https://developer.twitter.com から取得 BEARER_TOKEN = r"さっき取得したBEARER_TOKENをペースト" # 検索ワード e.g. query = "テスト" / query = "テスト OR test" # OR 検索 AND検索 -検索 などしたい場合はそのように書く query = 'ボボボーボ・ボーボボ' # 取得データ e.g. tweet_fields = [["id", "text", "created_at", "author_id"] # created_at(投稿時刻), author_id(アカウントID)などの情報が欲しい場合はtweet_fieldsに書く tweet_fields = ["id", "text", "created_at", "author_id"] #### データ取得 url = create_url(query, tweet_fields) headers = create_headers(BEARER_TOKEN) json_response = connect_to_endpoint(url, headers) data = json_response["data"] #### 取得データ保存 csv_file_name = re.sub(r'[\\/:*?"<>|.]+','',query) + ".csv" # ファイルに使えない文字削除 save_csvfile(csv_file_name, tweet_fields, data) if __name__ == "__main__": main()
1.2 実行結果を良い感じにする query
の中身とマッチした検索結果が取得できる。 バズったツイートがあると同一ツイートのリツイートで埋まるので
1 query = 'ボボボーボ・ボーボボ -is:retweet'
のように-is:retweet
を使ってリツイートを除くと良い。
tweet_fields
の中身を増やすとID、テキスト以外に様々な情報が手に入る。
query
で使用できる検索オプション、tweet_fields
で取得できる情報に関しては以下の記事にまとめてある。
2 AWS上で実行する 作成したツイート取得プログラムを定期的に実行しようと思うとPCをつけっぱなしにする必要があり不便。 そこで、作成したプログラムをAWS上で実行する。
具体的にはAWS Lambdaを使用する。
2.1 AWS S3 バケット作成 Amazon S3にアクセスhttps://s3.console.aws.amazon.com/
「バケットを作成」
バケット名決定
他は全てデフォルト設定のまま。
「バケットを作成」
2.2. AWS Lambda 関数作成 AWS Lambdaの関数に移動。
「関数を作成」
デフォルトで「一から作成」になってる、そのまま。
「関数名」を決定。 「ランタイム」をPythonにする。
「関数の作成」
こんなかんじになる。
下の方にスクロールすると「関数コード」という実行する関数を用意する場所がある。
「TEST」を押して「Configure test event」を選択。
「イベント名」を決定して「作成」
再度「TEST」を押してこんなかんじになればOK。
3 AWS Lambda に置く関数をローカルPCで作成 「関数コード」のサンプルを参考に既存のプログラムをLambda用に改造する。 「TEST」を押すなどして「AWS Lambda」に実行リクエストが送られると「lambda_function.py」内の「lambda_handler()」が実行される。
「AWS Lambda」で動くようにして、ついでに出力したCSVファイルを「AWS S3」に保存するようにしたものが以下のコード。
3.1. コード lambda_function.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 import requests import csv import re import boto3 from botocore.errorfactory import ClientError #### FUNCTIONS # query:検索ワード tweet_fields:取得データ APIリクエスト用のURLを作成 def create_url(query, tweet_fields): if(any(tweet_fields)): formatted_tweet_fields = "tweet.fields=" + ",".join(tweet_fields) else: formatted_tweet_fields = "" url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}".format(query, formatted_tweet_fields) return url # bearer_token:APIキー APIリクエスト用のヘッダーを作成 def create_headers(bearer_token): headers = {"Authorization": "Bearer {}".format(bearer_token)} return headers # url:APIリクエスト用のURL headers:APIリクエスト用のヘッダー APIリクエスト def connect_to_endpoint(url, headers): response = requests.request("GET", url, headers=headers) print(response.status_code) if response.status_code != 200: raise Exception(response.status_code, response.text) return response.json() # s3:s3のクライアント bucketname:バケット名 filepath:保存ファイル名 同名ファイルが存在したらTrue def s3_key_exists(s3, bucketname, filepath): try: s3.head_object(Bucket=bucketname, Key=filepath) return True except ClientError: return False # bucketname:バケット名 filepath:保存ファイル名 s3からCSVファイルを読み込む def read_csv_from_s3(bucketname, filepath): s3 = boto3.client("s3") if(s3_key_exists(s3, bucketname, filepath)): # s3に目当てのファイルがあれば読み込み、なければNone response = s3.get_object(Bucket=bucketname, Key=filepath) contents = response['Body'].read().decode('utf-16') contents = contents.strip().replace("\t",",").splitlines() # 「csv.DictReader」に合わせて整形 old_data = csv.DictReader(contents, quoting=csv.QUOTE_ALL) #辞書のジェネレータみたいなの返す return old_data else: return None # filepath:保存ファイル名 header:列名 old_data:既存データ add_data:追記データ LambdaにCSV出力 def write_csv_to_lambda(filepath, header, old_data, add_data): if(old_data is None): with open(filepath, "w", newline="", encoding="utf-16") as f: w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL) w.writeheader() for data in add_data: w.writerow(data) else: with open(filepath, "w", newline="", encoding="utf-16") as f: w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL) w.writeheader() for data in old_data: w.writerow(data) for data in add_data: w.writerow(data) # lambda_filepath:保存ファイル名(Lambda) bucketname:バケット名 s3_filepath:保存ファイル名(S3) LambdaからS3にファイル転送 def upload_from_lambda_to_s3(lambda_filepath, bucketname, s3_filepath): s3 = boto3.resource("s3") s3.meta.client.upload_file(lambda_filepath, bucketname, s3_filepath) #### MAIN def main(): #### VARIABLES # 保存先のAWS S3 バケット名 bucketname = "twitter-test-bucket-20210308" # APIキー https://developer.twitter.com から取得 BEARER_TOKEN = r"さっき取得したBEARER_TOKENをペースト" # 検索ワード e.g. query = "テスト" / query = "テスト OR test" # OR 検索 AND検索 -検索 などしたい場合はそのように書く query = 'ボボボーボ・ボーボボ -is:retweet' # 取得データ e.g. tweet_fields = [["id", "text", "created_at", "author_id"] # created_at(投稿時刻), author_id(アカウントID)などの情報が欲しい場合はtweet_fieldsに書く tweet_fields = ["id", "text", "created_at", "author_id"] #### データ取得 url = create_url(query, tweet_fields) headers = create_headers(BEARER_TOKEN) json_response = connect_to_endpoint(url, headers) add_data = json_response["data"] #### 取得データ保存 s3_filepath = re.sub(r'[\\/:*?"<>|.]+','',query) + ".csv" # ファイルに使えない文字削除 lambda_filepath = "/tmp/" + s3_filepath old_data = read_csv_from_s3(bucketname, s3_filepath) write_csv_to_lambda(lambda_filepath, tweet_fields, old_data, add_data) upload_from_lambda_to_s3(lambda_filepath, bucketname, s3_filepath) def lambda_handler(event, context): main()
3.2. AWS Lambda上でrequestsを使用する 上記のコードを「関数コード」に貼り付けて「Deploy」(変更を反映させる)。
その後「Test」を押して実行すると以下のようなエラーが発生する。
1 2 3 4 5 Response { "errorMessage": "Unable to import module 'lambda_function': No module named 'requests'", "errorType": "Runtime.ImportModuleError" }
曰くrequests
がない。
requests
はPythonの標準ライブラリでないためローカルPCで動かす際にはrequests
を別途インストールする必要があった。 今回コードを実行しているAWS Lamnbdaは素のPython環境であるためrequests
はない。 したがって動かない。
ここで、requests
を AWS Lambdaにインストールする必要がある。
3.2.1. requestsごとZipファイルにまとめてアップロード 「関数コード」右上の「アクション」に「.zip ファイルをアップロード」というコマンドがある。
これを利用してrequests
を AWS Lambdaにインストールすることができる。 そのためにrequests
を含むアップロード用のzipファイルを用意する。
pip install 〇〇
として〇〇ライブラリをインストールすると関連ファイルがPython環境のあたりに良い感じにダウンロードされる。
ここで、pip install -t ×× 〇〇
とするとダウンロード先がx×フォルダになる。
これを利用してrequests
の関連ファイルを取得する。
1 2 mkdir uploads pip install -t uploads requests
するとこんなフォルダができる。
先ほど作ったフォルダにlambda_function.py
(さっき作ったコード)も一緒に入れる。
するとこんなフォルダができる。
さっきのフォルダの中身を全選択して「圧縮」→「.zip」
AWS Lambdaの画面に戻って「関数コード」右上の「アクション」に「.zip ファイルをアップロード」をクリック。
「アップロード」でさっき作ったzipファイルを選択。
「保存」を押して「OK」を押すとこんなかんじになる。
その後、「Test」を押して実行すると以下のようなエラーが発生する。
1 2 3 4 5 Response { "errorMessage": "Failed to upload /tmp/ボボボーボ・ボーボボ -isretweet.csv to twitter-test-bucket-20210308/ボボボーボ・ボーボボ -isretweet.csv: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied", "errorType": "S3UploadFailedError", }
AWS S3へのファイルアップロードに失敗したらしい。 これは「AWS Lambda」の関数に「AWS S3」へのアクセス権が与えられていないことにより発生している。
したがって「AWS IAM」により「AWS S3」へのアクセス権を「AWS Lambda」に与える必要がある
3.3 IAMアクセス権を与える 「AWS S3」へのアクセス権を「AWS Lambda」に与えるためには、
「AWS IAM」によって「AWS S3」へのアクセス権を持つロールを作成する
作成したロールを「AWS Lambda」の関数に与える
の2手順が必要となる。
「AWS IAM」によって「AWS S3」へのアクセス権を持つロールを作成する https://console.aws.amazon.com/iam にアクセス。
画面左の「ロール」を選択後、「ロールの作成」をクリック。
信頼されたエンティティの種類は最初から「AWSサービス」になっている。
ユースケースの選択で「Lambda」を選択。
「次のステップ:アクセス権限」をクリック。
ポリシーのフィルタで「s3」と検索すると出てくる「AmazonS3FullAccess」を選択。
「次のステップ:タグ」をクリック。
「次のステップ:確認」をクリック。
「ロール名」を適当につける。
「ロールの作成」をクリック。
作成したロールを「AWS Lambda」の関数に与える 「AWS Lambda」の画面に戻る。 先ほどの関数の「アクセス権限」クリック。
実行ロールの「編集」をクリック。
実行ロールのチェックを「既存のロールを使用する」にする。
既存のロールを先ほど作成したロールにする。
「保存」をクリック。
あらためて関数コードから「Test」を実行。 エラーが発生しなければうまくいってる。
3.4 S3への出力結果を確認する 再度Amazon S3にアクセスhttps://s3.console.aws.amazon.com/
さっき作ったバケットをクリック。
CSVファイルが出力されている。
ファイルを選択して「アクション」→「ダウンロード」とするとダウンロードできる。
4. AWS上で定期実行 今の状態だと「TEST」をクリックするたびにツイートを取得することができる。 これを改善し、何もしなくても自動的にツイートを取得するようにする。
4.1. トリガーを追加 「AWS Lambda」のさっき作った関数の「デザイナー」にある「トリガーを追加」をクリック。
「EventBridge(CloudWatch Events)」を選択
新規ルールの作成を選択。 適当な「ルール名」を付ける。
定期実行したい間隔を指定。 5分おきならrate(5 minutes)
、5時間おきならrate(5 hours)
。 単数のときは単数形にする。(rate(1 minute)
、rate(1 hour)
)
「追加」をクリックする。
5. エラー フォルダの中身ではなくフォルダごとzipファイルにしてアップロード後に実行するとエラー。lambda_function.py
が見つからないと以下のエラーが発生する。 今回はフォルダの階層が1つ増えたためlambda_function.py
が見つからなかったことが原因。
1 2 3 4 5 Response { "errorMessage": "Unable to import module 'lambda_function': No module named 'lambda_function'", "errorType": "Runtime.ImportModuleError" }
Twitter API v2の使い方1 (Twitterアカウント作成からツイート取得まで)
Twitterアカウントの作成からAPIを使用してツイート取得するまで。Twitter APIというものを使うとTwitter上の情報を集めたりできて便利。すご…
Twitter API v2の使い方3[Tweet情報取得編](検索オプション・属性のまとめ)
Twitter API v2の使い方Twitter API v2は下の表のようなことができる。機能説明Tweet lookupツイートのID からツイートを検索…
Twitter API v2の使い方4[ユーザー情報取得編]
Twitter API v2の使い方Twitter API v2は大きく分けると2つ、ツイートユーザー2つの情報が取得できる。以前はツイート検索をしたかったため…
Twitter API v2の使い方5[フォロワードクラウド]
フォロワードクラウドワードクラウドというものがある。これを使うと文章中の出現頻度が高い単語を大きく表示して文章の内容を画像1枚で図示することができる。前回の記事…
Twitter API v2の使い方6[コウメ太夫氏の#まいにちチクショーを取得する]
Twitter API v2を使って特定アカウントのツイートを取得Twitter API v2の機能の中にUser Tweet timelineというものがあり…