-
Notifications
You must be signed in to change notification settings - Fork 0
/
audioApi.py
144 lines (131 loc) · 5.01 KB
/
audioApi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import datetime
import csv
import ijson
import itertools
import pylast
from models import ListenHistory, Track
# Module-level cache for the OAuth token so the service is only asked once.
access_token = None


def auth(request):
    """Return an OAuth access token for api.zenobase.com, fetching it once.

    The first successful call performs a password-grant request and stores
    the token in the module-level ``access_token``; later calls return the
    cached value without touching the network.

    Args:
        request: Mapping with ``'username'`` and ``'password'`` entries.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        KeyError: If a credential is missing from ``request`` or the
            response carries no ``access_token``.
        urllib.error.URLError: If the HTTP request fails.
    """
    global access_token
    if access_token:
        return access_token

    # The file header imports neither an HTTP client nor ``json``, so the
    # names this function needs are brought into scope here (stdlib only).
    import json
    import urllib.parse
    import urllib.request

    url = 'https://api.zenobase.com/oauth/token'
    params = {
        'grant_type': 'password',
        'username': request['username'],
        'password': request['password'],
    }
    # The parameters travel in the query string of a body-less POST.
    # NOTE(review): this puts the password in the URL; confirm whether the
    # endpoint also accepts the credentials in the form body.
    token_request = urllib.request.Request(
        url + '?' + urllib.parse.urlencode(params),
        data=b'',
        method='POST',
    )
    with urllib.request.urlopen(token_request) as response:
        payload = json.loads(response.read().decode('utf-8'))
    access_token = payload['access_token']
    return access_token
def trimDurations(items):
    """Yield items in timestamp order, capping each listen at the next start.

    Each item's ``'listenDuration'`` is reduced (in place) so that it never
    exceeds the gap between its own ``'timestamp'`` and the timestamp of the
    item that follows it. The final item is yielded unchanged because there
    is nothing after it to cap against.

    Args:
        items: Iterable of dicts with ``'timestamp'`` and ``'listenDuration'``.

    Yields:
        The same dict objects, ordered by ``'timestamp'``. Nothing is
        yielded for empty input.
    """
    previous = None
    for current in sorted(items, key=lambda entry: entry['timestamp']):
        if previous is not None:
            gap = current['timestamp'] - previous['timestamp']
            if previous['listenDuration'] > gap:
                previous['listenDuration'] = gap
            yield previous
        previous = current
    # Guard the tail: with no input there is no last item, and yielding
    # None would break consumers that index into every yielded value.
    if previous is not None:
        yield previous
def mergeItems(items):
    """Collapse every sequence from ``findSequences`` into a single item.

    For each sequence the earliest event is kept and given a
    ``'listenDuration'``: the smaller of the wall-clock span of the
    sequence and the progress span (difference between the largest and
    smallest ``'percentage'``, divided by 100, times the track duration).
    When that estimate is under 20 seconds the track's full ``'duration'``
    is used instead.

    Args:
        items: Iterable of event dicts (``None`` entries are tolerated by
            ``findSequences``).

    Yields:
        The earliest dict of each sequence, mutated to carry
        ``'listenDuration'``.
    """
    for group in findSequences(items):
        first = min(group, key=lambda entry: entry['timestamp'])
        stamps = [entry['timestamp'] for entry in group]
        progress = [entry['percentage'] for entry in group]

        wallClock = max(stamps) - min(stamps)
        played = (max(progress) - min(progress)) / 100 * first['duration']

        listened = min(wallClock, played)
        if listened.total_seconds() < 20:
            # Too short to be a believable estimate; fall back to the
            # track's full duration.
            listened = first['duration']

        first['listenDuration'] = listened
        yield first
def formatPhoneData(item):
    """Convert one phone-log record into the module's common event dict.

    Args:
        item: Sequence of eight fields in the order action, app, album,
            artist, title, duration, progress, timestamp. ``duration`` is
            divided by 1000 to obtain seconds; ``timestamp`` must be
            accepted by ``datetime.fromisoformat``.

    Returns:
        The event dict, or ``None`` when the record does not come from
        ``com.bambuna.podcastaddict``.
    """
    action, app, album, artist, title, duration, progress, timestamp = tuple(item)
    # Exact comparison: ``app not in ('...')`` without a trailing comma is a
    # substring test on a string, so ids such as 'com' or '' slipped through.
    if app != 'com.bambuna.podcastaddict':
        return None

    total = float(duration)
    # mergeItems divides percentage differences by 100, so report progress
    # on a 0-100 scale here; a zero-length track counts as 0% rather than
    # raising ZeroDivisionError.
    percentage = float(progress) / total * 100 if total else 0.0

    return {
        'title': title,
        'album': album,
        'artist': artist,
        'action': action,
        'isPodcast': True,
        'timestamp': datetime.datetime.fromisoformat(timestamp),
        'duration': datetime.timedelta(seconds=int(duration) / 1000),
        'percentage': percentage,
    }
def formatZenobaseData(item):
    """Convert one Zenobase CSV row into the module's common event dict.

    Args:
        item: Sequence of eight fields in the order duration, title, album,
            artist, percentage, action, app, timestamp. ``duration`` is
            divided by 1000 to obtain seconds. The last character of
            ``timestamp`` is dropped before parsing (presumably a trailing
            'Z' - confirm against the export format).

    Returns:
        The event dict, or ``None`` when the app is neither
        ``com.bambuna.podcastaddict`` nor ``com.spotify.music``.
    """
    duration, title, album, artist, percentage, action, app, timestamp = tuple(item)

    fromPodcastApp = app == 'com.bambuna.podcastaddict'
    if not fromPodcastApp and app != 'com.spotify.music':
        return None

    event = {
        'title': title,
        'album': album,
        'artist': artist,
        'action': action,
        'isPodcast': fromPodcastApp,
    }
    event['timestamp'] = datetime.datetime.fromisoformat(timestamp[:-1])
    event['duration'] = datetime.timedelta(seconds=int(duration) / 1000)
    event['percentage'] = float(percentage)
    return event
def formatLastFmTrack(track):
    """Convert a Last.fm played-track record into the common event dict.

    Args:
        track: Object exposing ``track.title``, ``track.artist.name``,
            ``album`` and ``timestamp`` (epoch seconds, as int or numeric
            string).

    Returns:
        Event dict with ``'action'`` fixed to ``'start'``, ``'isPodcast'``
        ``False``, ``'percentage'`` ``1`` and a ``'duration'`` of -1 second
        (no real duration is available from this source).
    """
    # Convert the epoch value as UTC and then drop the tzinfo. A plain
    # fromtimestamp() would yield host-local time, which is not comparable
    # with the naive timestamps produced by formatZenobaseData (those are
    # parsed with their trailing zone character removed - presumably 'Z').
    playedAt = datetime.datetime.fromtimestamp(
        int(track.timestamp), tz=datetime.timezone.utc
    ).replace(tzinfo=None)

    return {
        'title': track.track.title,
        'album': track.album,
        'artist': track.track.artist.name,
        'action': 'start',
        'isPodcast': False,
        'timestamp': playedAt,
        'duration': datetime.timedelta(seconds=-1),
        'percentage': 1,
    }
def sameTrack(track1, track2):
    """Tell whether two event dicts refer to the same track.

    Two events match when their titles are equal, or when the first one is
    a podcast and both share the same artist and duration.

    Args:
        track1: Event dict; its ``'isPodcast'`` flag enables the
            artist/duration comparison.
        track2: Event dict to compare against.

    Returns:
        ``True`` or ``False``.
    """
    titlesMatch = track1['title'] == track2['title']
    return bool(
        titlesMatch
        or (
            track1['isPodcast']
            and track1['artist'] == track2['artist']
            and track1['duration'] == track2['duration']
        )
    )
def findSequences(items):
    """Group events into runs of consecutive events for the same track.

    ``None`` entries are discarded, the rest are ordered by ``'timestamp'``
    and split into podcasts and music. Within each of those two streams a
    new run starts whenever ``sameTrack`` says an event differs from the
    one right before it.

    Args:
        items: Iterable of event dicts and/or ``None``.

    Yields:
        Non-empty lists of event dicts; all podcast runs first, then all
        music runs.
    """
    present = [entry for entry in items if entry is not None]
    if not present:
        return

    ordered = sorted(present, key=lambda entry: entry['timestamp'])
    podcastStream = [entry for entry in ordered if entry['isPodcast']]
    musicStream = [entry for entry in ordered if not entry['isPodcast']]

    for stream in (podcastStream, musicStream):
        previous = None
        run = []
        for entry in stream:
            # Compare against the immediately preceding event, not the
            # first event of the run.
            if previous and not sameTrack(previous, entry):
                yield run
                run = []
            previous = entry
            run.append(entry)
        if run:
            yield run
def callLastFm(credentials, start, end):
    """Fetch a user's recent Last.fm tracks as common event dicts.

    Args:
        credentials: Mapping with ``'apikey'``, ``'secret'`` and
            ``'username'`` entries.
        start: Passed to pylast as ``time_from``.
        end: Passed to pylast as ``time_to``.

    Returns:
        A lazy ``map`` of ``formatLastFmTrack`` over the tracks returned by
        ``get_recent_tracks`` (called with ``limit=None``).
    """
    scrobbles = pylast.LastFMNetwork(
        api_key=credentials['apikey'],
        api_secret=credentials['secret'],
    ).get_user(
        credentials['username'],
    ).get_recent_tracks(
        limit=None,
        time_from=start,
        time_to=end,
    )
    return map(formatLastFmTrack, scrobbles)
def call(sources, start, end):
    """Yield Track / ListenHistory pairs built from all listening sources.

    Rows from the Zenobase CSV export and tracks from Last.fm are chained,
    merged with ``mergeItems`` and capped with ``trimDurations``. For every
    resulting item a ``Track`` is yielded first, immediately followed by a
    ``ListenHistory`` that refers to it through ``track.id``.

    NOTE(review): ``track.id`` is read only after the Track has been
    yielded, so this presumably relies on the consumer persisting the Track
    (and thereby assigning its id) before asking for the next value -
    confirm against the caller.

    Args:
        sources: Mapping with ``'zenobase'`` (path of the CSV export) and
            ``'lastfm'`` (credentials mapping for ``callLastFm``).
        start: Lower time bound forwarded to ``callLastFm``.
        end: Upper time bound forwarded to ``callLastFm``.

    Yields:
        Alternating ``Track`` and ``ListenHistory`` objects.
    """
    # Keep the file inside a context manager so the handle is released when
    # the generator finishes or is closed; newline='' is what the csv module
    # expects for files handed to csv.reader.
    with open(sources['zenobase'], 'r', encoding="utf8", newline='') as csvfile:
        items = map(formatZenobaseData, csv.reader(csvfile, delimiter=',', quotechar='"'))
        # TODO: phone logs are not wired in yet; the intended shape was
        # for phoneFile in sources['phone']:
        #     with open(phoneFile, 'r') as audiofile:
        #         items = itertools.chain(items, map(formatPhoneData, ijson.items(audiofile)))
        items = itertools.chain(items, callLastFm(sources['lastfm'], start, end))
        for item in trimDurations(mergeItems(items)):
            track = Track(
                name=item['title'],
                artist=item['artist'],
                album=item['album'],
                is_podcast=bool(item['isPodcast']),
                duration=item['duration'].total_seconds(),
            )
            yield track
            yield ListenHistory(
                timestamp=item['timestamp'],
                duration=item['listenDuration'].total_seconds(),
                track_id=track.id,
            )