Twitter API v2の使い方2 (ツイートのCSV出力からAWS上で定期実行するまで)

Twitter API v2の使い方2 (ツイートのCSV出力からAWS上で定期実行するまで)

取得ツイートのCSV出力からAWS上で定期実行するまで

Twitter APIというものを使うとTwitter上の情報を集めたりできて便利らしいので実験する。

取得ツイートのCSV出力 → AWS上で実行 → AWS上で定期実行
までの作業を一通り行う。

最終的に以下のようにする。



前回(ツイート取得まで)はこれ。

1. 取得ツイートのCSV出力

前回作成したツイート取得スクリプトを改造。
取得したスクリプトをCSVファイルに出力する。
Excelで開くことを見越したファイル形式にする。

1.1 コード

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import requests
import os
import csv
import re

#### FUNCTIONS
# query:検索ワード tweet_fields:取得データ APIリクエスト用のURLを作成
def create_url(query, tweet_fields):
if(any(tweet_fields)):
formatted_tweet_fields = "tweet.fields=" + ",".join(tweet_fields)
else:
formatted_tweet_fields = ""

url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}".format(query, formatted_tweet_fields)
return url

# bearer_token:APIキー APIリクエスト用のヘッダーを作成
def create_headers(bearer_token):
headers = {"Authorization": "Bearer {}".format(bearer_token)}
return headers

# url:APIリクエスト用のURL headers:APIリクエスト用のヘッダー APIリクエスト
def connect_to_endpoint(url, headers):
response = requests.request("GET", url, headers=headers)
print(response.status_code)
if response.status_code != 200: # HTTPレスポンスステータスコードが200以外ならエラー
raise Exception(response.status_code, response.text)
return response.json()

# filename:出力ファイル名 header:ヘッダー contents_list:中身
def save_csvfile(filename, header, contents_list):
if(not os.path.isfile(filename)): # 既存の同名ファイルがなければ新規作成
with open(filename, "w", newline="", encoding="utf-16") as f:
w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL)
w.writeheader()

with open(filename, "a", newline="", encoding="utf-16") as f:
w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL)
for contents in contents_list:
w.writerow(contents)

#### MAIN
def main():
#### VARIABLES
# APIキー https://developer.twitter.com から取得
BEARER_TOKEN = r"さっき取得したBEARER_TOKENをペースト"
# 検索ワード e.g. query = "テスト" / query = "テスト OR test"
# OR 検索 AND検索 -検索 などしたい場合はそのように書く
query = 'ボボボーボ・ボーボボ'
# 取得データ e.g. tweet_fields = [["id", "text", "created_at", "author_id"]
# created_at(投稿時刻), author_id(アカウントID)などの情報が欲しい場合はtweet_fieldsに書く
tweet_fields = ["id", "text", "created_at", "author_id"]

#### データ取得
url = create_url(query, tweet_fields)
headers = create_headers(BEARER_TOKEN)
json_response = connect_to_endpoint(url, headers)
data = json_response["data"]

#### 取得データ保存
csv_file_name = re.sub(r'[\\/:*?"<>|.]+','',query) + ".csv" # ファイルに使えない文字削除
save_csvfile(csv_file_name, tweet_fields, data)

if __name__ == "__main__":
main()

1.2 実行結果を良い感じにする

queryの中身とマッチした検索結果が取得できる。
バズったツイートがあると同一ツイートのリツイートで埋まるので

1
query = 'ボボボーボ・ボーボボ  -is:retweet'

のように-is:retweetを使ってリツイートを除くと良い。

tweet_fieldsの中身を増やすとID、テキスト以外に様々な情報が手に入る。

queryで使用できる検索オプション、tweet_fieldsで取得できる情報に関しては以下の記事にまとめてある。

2 AWS上で実行する

作成したツイート取得プログラムを定期的に実行しようと思うとPCをつけっぱなしにする必要があり不便。
そこで、作成したプログラムをAWS上で実行する。

具体的にはAWS Lambdaを使用する。

2.1 AWS S3 バケット作成

Amazon S3にアクセス
https://s3.console.aws.amazon.com/

「バケットを作成」



バケット名決定



他は全てデフォルト設定のまま。

「バケットを作成」



2.2. AWS Lambda 関数作成

AWS Lambdaの関数に移動。

「関数を作成」



デフォルトで「一から作成」になってる、そのまま。

「関数名」を決定。
「ランタイム」をPythonにする。



「関数の作成」



こんなかんじになる。



下の方にスクロールすると「関数コード」という実行する関数を用意する場所がある。

「TEST」を押して「Configure test event」を選択。



「イベント名」を決定して「作成」




再度「TEST」を押してこんなかんじになればOK。



3 AWS Lambda に置く関数をローカルPCで作成

「関数コード」のサンプルを参考に既存のプログラムをLambda用に改造する。
「TEST」を押すなどして「AWS Lambda」に実行リクエストが送られると「lambda_function.py」内の「lambda_handler()」が実行される。

「AWS Lambda」で動くようにして、ついでに出力したCSVファイルを「AWS S3」に保存するようにしたものが以下のコード。

3.1. コード

lambda_function.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import requests
import csv
import re
import boto3
from botocore.errorfactory import ClientError

#### FUNCTIONS
# query:検索ワード tweet_fields:取得データ APIリクエスト用のURLを作成
def create_url(query, tweet_fields):
if(any(tweet_fields)):
formatted_tweet_fields = "tweet.fields=" + ",".join(tweet_fields)
else:
formatted_tweet_fields = ""
url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}".format(query, formatted_tweet_fields)
return url

# bearer_token:APIキー APIリクエスト用のヘッダーを作成
def create_headers(bearer_token):
headers = {"Authorization": "Bearer {}".format(bearer_token)}
return headers

# url:APIリクエスト用のURL headers:APIリクエスト用のヘッダー APIリクエスト
def connect_to_endpoint(url, headers):
response = requests.request("GET", url, headers=headers)
print(response.status_code)
if response.status_code != 200:
raise Exception(response.status_code, response.text)
return response.json()

# s3:s3のクライアント bucketname:バケット名 filepath:保存ファイル名 同名ファイルが存在したらTrue
def s3_key_exists(s3, bucketname, filepath):
try:
s3.head_object(Bucket=bucketname, Key=filepath)
return True
except ClientError:
return False

# bucketname:バケット名 filepath:保存ファイル名 s3からCSVファイルを読み込む
def read_csv_from_s3(bucketname, filepath):
s3 = boto3.client("s3")
if(s3_key_exists(s3, bucketname, filepath)): # s3に目当てのファイルがあれば読み込み、なければNone
response = s3.get_object(Bucket=bucketname, Key=filepath)
contents = response['Body'].read().decode('utf-16')
contents = contents.strip().replace("\t",",").splitlines() # 「csv.DictReader」に合わせて整形
old_data = csv.DictReader(contents, quoting=csv.QUOTE_ALL) #辞書のジェネレータみたいなの返す
return old_data
else:
return None

# filepath:保存ファイル名 header:列名 old_data:既存データ add_data:追記データ LambdaにCSV出力
def write_csv_to_lambda(filepath, header, old_data, add_data):
if(old_data is None):
with open(filepath, "w", newline="", encoding="utf-16") as f:
w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL)
w.writeheader()
for data in add_data:
w.writerow(data)
else:
with open(filepath, "w", newline="", encoding="utf-16") as f:
w = csv.DictWriter(f, fieldnames=header, dialect="excel-tab", quoting=csv.QUOTE_ALL)
w.writeheader()
for data in old_data:
w.writerow(data)
for data in add_data:
w.writerow(data)

# lambda_filepath:保存ファイル名(Lambda) bucketname:バケット名 s3_filepath:保存ファイル名(S3) LambdaからS3にファイル転送
def upload_from_lambda_to_s3(lambda_filepath, bucketname, s3_filepath):
s3 = boto3.resource("s3")
s3.meta.client.upload_file(lambda_filepath, bucketname, s3_filepath)

#### MAIN
def main():
#### VARIABLES
# 保存先のAWS S3 バケット名
bucketname = "twitter-test-bucket-20210308"
# APIキー https://developer.twitter.com から取得
BEARER_TOKEN = r"さっき取得したBEARER_TOKENをペースト"
# 検索ワード e.g. query = "テスト" / query = "テスト OR test"
# OR 検索 AND検索 -検索 などしたい場合はそのように書く
query = 'ボボボーボ・ボーボボ -is:retweet'
# 取得データ e.g. tweet_fields = [["id", "text", "created_at", "author_id"]
# created_at(投稿時刻), author_id(アカウントID)などの情報が欲しい場合はtweet_fieldsに書く
tweet_fields = ["id", "text", "created_at", "author_id"]

#### データ取得
url = create_url(query, tweet_fields)
headers = create_headers(BEARER_TOKEN)
json_response = connect_to_endpoint(url, headers)
add_data = json_response["data"]

#### 取得データ保存
s3_filepath = re.sub(r'[\\/:*?"<>|.]+','',query) + ".csv" # ファイルに使えない文字削除
lambda_filepath = "/tmp/" + s3_filepath
old_data = read_csv_from_s3(bucketname, s3_filepath)
write_csv_to_lambda(lambda_filepath, tweet_fields, old_data, add_data)
upload_from_lambda_to_s3(lambda_filepath, bucketname, s3_filepath)

def lambda_handler(event, context):
main()

3.2. AWS Lambda上でrequestsを使用する

上記のコードを「関数コード」に貼り付けて「Deploy」(変更を反映させる)。



その後「Test」を押して実行すると以下のようなエラーが発生する。

1
2
3
4
5
Response
{
"errorMessage": "Unable to import module 'lambda_function': No module named 'requests'",
"errorType": "Runtime.ImportModuleError"
}

曰くrequestsがない。

requestsはPythonの標準ライブラリでないためローカルPCで動かす際にはrequestsを別途インストールする必要があった。
今回コードを実行しているAWS Lamnbdaは素のPython環境であるためrequestsはない。
したがって動かない。

ここで、requestsを AWS Lambdaにインストールする必要がある。

3.2.1. requestsごとZipファイルにまとめてアップロード

「関数コード」右上の「アクション」に「.zip ファイルをアップロード」というコマンドがある。



これを利用してrequestsを AWS Lambdaにインストールすることができる。
そのためにrequestsを含むアップロード用のzipファイルを用意する。

  • requestsファイルを手に入れる

pip install 〇〇として〇〇ライブラリをインストールすると関連ファイルがPython環境のあたりに良い感じにダウンロードされる。

ここで、pip install -t ×× 〇〇とするとダウンロード先がx×フォルダになる。

これを利用してrequestsの関連ファイルを取得する。

1
2
mkdir uploads
pip install -t uploads requests

するとこんなフォルダができる。



  • lambda_function.pyを入れる

先ほど作ったフォルダにlambda_function.py(さっき作ったコード)も一緒に入れる。

するとこんなフォルダができる。



  • zipファイルにする

さっきのフォルダの中身を全選択して「圧縮」→「.zip」



  • AWS Lambdaにアップロード

AWS Lambdaの画面に戻って「関数コード」右上の「アクション」に「.zip ファイルをアップロード」をクリック。

「アップロード」でさっき作ったzipファイルを選択。



「保存」を押して「OK」を押すとこんなかんじになる。



その後、「Test」を押して実行すると以下のようなエラーが発生する。

1
2
3
4
5
Response
{
"errorMessage": "Failed to upload /tmp/ボボボーボ・ボーボボ -isretweet.csv to twitter-test-bucket-20210308/ボボボーボ・ボーボボ -isretweet.csv: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied",
"errorType": "S3UploadFailedError",
}

AWS S3へのファイルアップロードに失敗したらしい。
これは「AWS Lambda」の関数に「AWS S3」へのアクセス権が与えられていないことにより発生している。

したがって「AWS IAM」により「AWS S3」へのアクセス権を「AWS Lambda」に与える必要がある

3.3 IAMアクセス権を与える

「AWS S3」へのアクセス権を「AWS Lambda」に与えるためには、

  1. 「AWS IAM」によって「AWS S3」へのアクセス権を持つロールを作成する

  2. 作成したロールを「AWS Lambda」の関数に与える

の2手順が必要となる。

  • 「AWS IAM」によって「AWS S3」へのアクセス権を持つロールを作成する

https://console.aws.amazon.com/iam
にアクセス。

画面左の「ロール」を選択後、「ロールの作成」をクリック。



信頼されたエンティティの種類は最初から「AWSサービス」になっている。

ユースケースの選択で「Lambda」を選択。



「次のステップ:アクセス権限」をクリック。



ポリシーのフィルタで「s3」と検索すると出てくる「AmazonS3FullAccess」を選択。



「次のステップ:タグ」をクリック。



「次のステップ:確認」をクリック。



「ロール名」を適当につける。



「ロールの作成」をクリック。



  • 作成したロールを「AWS Lambda」の関数に与える

「AWS Lambda」の画面に戻る。
先ほどの関数の「アクセス権限」クリック。



実行ロールの「編集」をクリック。



実行ロールのチェックを「既存のロールを使用する」にする。

既存のロールを先ほど作成したロールにする。



「保存」をクリック。



あらためて関数コードから「Test」を実行。
エラーが発生しなければうまくいってる。

3.4 S3への出力結果を確認する

再度Amazon S3にアクセス
https://s3.console.aws.amazon.com/

さっき作ったバケットをクリック。

CSVファイルが出力されている。

ファイルを選択して「アクション」→「ダウンロード」とするとダウンロードできる。



4. AWS上で定期実行

今の状態だと「TEST」をクリックするたびにツイートを取得することができる。
これを改善し、何もしなくても自動的にツイートを取得するようにする。

4.1. トリガーを追加

「AWS Lambda」のさっき作った関数の「デザイナー」にある「トリガーを追加」をクリック。



  • トリガーを選択

「EventBridge(CloudWatch Events)」を選択



  • ルール

新規ルールの作成を選択。
適当な「ルール名」を付ける。



  • スケジュール式

定期実行したい間隔を指定。
5分おきならrate(5 minutes)、5時間おきならrate(5 hours)
単数のときは単数形にする。(rate(1 minute)rate(1 hour))



「追加」をクリックする。



5. エラー

フォルダの中身ではなくフォルダごとzipファイルにしてアップロード後に実行するとエラー。
lambda_function.pyが見つからないと以下のエラーが発生する。
今回はフォルダの階層が1つ増えたためlambda_function.pyが見つからなかったことが原因。

1
2
3
4
5
Response
{
"errorMessage": "Unable to import module 'lambda_function': No module named 'lambda_function'",
"errorType": "Runtime.ImportModuleError"
}

6. Twitter API リンク

タグ , ,

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×